はじめに
クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。
Fluentdのv0.14はv0.12とある程度の後方互換性が保たれているメジャーバージョンアップです。
v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent
以下のクラスではなく、Fluent::Plugin
以下のクラスを継承し、実装する必要が出てきました。
また、v0.14からはプラグインでよく書かれるコードをカプセル化し共通に使えるヘルパーを提供することで、よりプラグイン開発者が簡潔で良くテストされたコードを使ってプラグインが開発出来るようになる、とアナウンスされています。1
Inputプラグインの場合
Inputプラグインをv0.14のプラグインに移行する際には Fluent::Input
の継承を止め、Fluent::Plugin::Input
を継承するようにします。
また、Fluentdのテストを読むと、v0.14でのInputプラグインのテストにはfluent/test/driver/input.rbにある Fluent::Test::Driver::Input
クラスのテストドライバを用いるようにすると良いことがわかります。
driver#run の書き方がv0.12と比べて変わっていると言う点に注意しなければなりません。
v0.14のテストドライバでは driver#run
の終了条件が確定しない場合は例外が上がるようになっています。 2
そのため、v0.14向けのInputプラグインのテストでは driver#run
へブロックを渡すか、 driver#end_if
で終了条件を指定することが必要です。
-
requireするInputクラスを
fluent/input
からfluent/plugin/input
へ変更する -
Time.now
をナノ秒に対応した現在時刻を返すFluent::EventTime.now
に置き換える 3 -
前述のプラグインヘルパーを使う事のできる箇所は置き換える
-
テストドライバをv0.14のものを使用するようにする。
という点に注意してInputプラグインのv0.14への移行作業を行います。
実際に、in_object_spaceプラグインをv0.14化したプルリクエストを見てみましょう。
1.と2.は見ての通りほぼそのままなので、とくにここでは詳細に解説しません。 Inputプラグインのv0.14への移行作業は3.の作業と4.の作業が特に重い作業です。
3.の作業に該当するものは、in_object_spaceプラグインをv0.14化したプルリクエストでは、timerヘルパーの使用にあたります。 v0.14ではタイマーを用いてイベントを発生させるのに汎用的なtimerヘルパーが提供されており、v0.12の頃はCool.ioのクラスを継承したクラスを作成してこの手の処理を行うコードを書く必要がありました。v0.14ではヘルパーを使うだけになっています。
また、プラグインヘルパーは helpers
に :inject
のように使いたいプラグインヘルパー名をシンボルで渡す形式になっています。
4.の作業では、既に driver#run
にブロックが渡されていたため、driver#emits
を driver#events
に書き換える作業と、v0.14のInput Driverクラスの Fluent::Test::Driver::Input
を用い、プラグインのクラスをv0.14のテストドライバに渡す作業のみでした。v0.14でのInputプラグインのテストドライバを利用するには fluent/test/driver/input
をrequireする必要があります。
Outputプラグインの場合
v0.14のOutputプラグインが継承するべき Fluent::Plugin::Output
クラスは実装されているメソッドによって
-
バッファリングしないOutputプラグイン (non-Buffered Output)
-
バッファリングし、同期的にバッファをコミットするOutputプラグイン (Buffered Synchronous Output)
-
バッファリングし、非同期的にバッファをコミットするOutputプラグイン (Buffered Asynchronous Output)
の3つのOutputプラグインの性質を持つようになります。
また、v0.14でのOutputプラグインのテストドライバを利用するには fluent/test/driver/output
をrequireする必要があります。
driver#run
の書き方が変更になっており、例えば
driver.run(default_tag: 'test') do
driver.feed(time, {"a"=>1})
driver.feed(time, {"a"=>2})
end
のように、driver#run
のdefault_tag:
キーワード引数にemitする時のタグを渡したり、driver#emit
ではなくdriver#feed
を用いてイベントを流し込む必要があるのに注意してください。
流し込まれたイベントはInputと同様に driver#events
で取得することができます。
non-Buffered Output
まずは、1.の場合のv0.14への移行のプルリクエストを見ていきます。
この場合は #process
メソッドのみOutputプラグインが実装する必要があります。
また、driver#run
の新しい書き方に対応させました。
Buffered Synchronous Output
out_fileのv0.14化対応のプルリクエストを見ていきます。
Buffered Synchronous Outputのv0.14のプラグインは #write
メソッドと <buffer>
セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。
このプルリクエストではTimeSlicedOutputを継承したOutputプラグインのv0.14化対応をしています。 また、Outputプラグインからfomatterプラグインやbufferプラグインを使う事ができるため、それに関するプラグインヘルパーのconfig sectionの追加も行っています。 v0.14形式とv0.12形式のconfigは書き方が大幅に異なっています。4
ですが、その差異を埋めるcompat_parametersプラグインヘルパーもあります。
このプラグインヘルパーの compat_parameters_convert
メソッドを使う事により、v0.12形式のconfigでもv0.14のFluentdで引き続き使う事ができます。
Buffered Asynchronous Output
Buffered Asynchronous Outputのv0.14のプラグインは #try_write
メソッドと <buffer>
セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。
また、 prefer_delayed_commit
でtrueを返すようにします。
まだこの非同期コミットに絞ったv0.14のプラグインはこの記事の執筆時点では書かれていません。
Buffered Synchronous/Asynchronous Output
実は、Buffered Outputプラグインはsynchronousとasynchronousの両方の機能をconfigで切り替えられるように書く事ができます。
Buffered Outputに対応したテスト用のプラグインを追加したプルリクエストを見てみます。
ここでは、Fluent::Plugin::Output
を継承したOutputプラグインを追加しています。そこで#write
、#try_write
、そして、#prefer_delayed_commit
をそれぞれ実装しています。
このプラグインではバッファのコミットを非同期にする設定を追加してはいませんが、その設定をconfigからできるようにすることでバッファを同期的または非同期的にコミットする動作をconfigで切り替えられるプラグインを書く事ができます。
Outputプラグインのcustom format
また、v0.14のOutputプラグインはレコードのformatを行うためにプラグイン固有の#format
メソッドも定義しておく事が可能です。
Parserプラグインの場合
ParserプラグインはParserプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。
これらParserプラグインのインスタンスはv0.14ではparserヘルパーの parser_create
メソッドを用いて作成することが推奨されます。
v0.14のParserプラグインは Fluent::Plugin::Parser
クラスを継承する必要があります。
Parserプラグインが持つべきメソッドは #parse
のみです。また、v0.14対応をするには fluent/test/driver/parser
にある Fluent::Test::Driver::Parser
クラスのParserテストドライバを用いる必要があります。
v0.12では driver#parse
にてパース結果をテストする形になっていましたが、v0.14では driver#instance
によりParserプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #parse
メソッドを呼ぶような規約に変更になったので注意が必要です。
Formatterプラグインの場合
FormatterプラグインはFormatterプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。
v0.14のFormatterプラグインは Fluent::Plugin::Formatter
クラスを継承する必要があります。
これらFormatterプラグインのインスタンスはv0.14ではformatterヘルパーの formatter_create
メソッドを用いて作成することが推奨されます。
v0.14において、Formatterプラグインが持つべきメソッドは #format
です。
また、v0.14対応をするには fluent/test/driver/formatter
にある Fluent::Test::Driver::Formatter
クラスのFormatterテストドライバを用いる必要があります。
v0.12ではテストドライバを使う事は少なかったのですが、
v0.14では driver#instance
によりFormatterプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #format
メソッドを呼ぶとformat後の値が取得し、テストするようになったので注意が必要です。
Filterプラグインの場合
v0.14のFilterプラグインは Fluent::Plugin::Filter
クラスを継承する必要があります。
また、v0.14でのFilterプラグインのテストドライバは fluent/test/driver/filter
をrequireする必要があります。
v0.12の頃は #filter
よりも #filter_stream
の方が10%程度高速だったため高速化のために利用されていましたが、v0.14ではfilterをパイプライン化できる時はする機能がサポートされることになり、 #filter
を使うようにすることが推奨されます。 #filter_stream
は互換性のために残されますが、非推奨扱いになります。
driver#run
の書き方が変更になっており、例えば
driver.run do
driver.feed('test', time, {"a"=>1})
driver.feed('test', time, {"a"=>2})
end
のように、driver#emit
ではなくdriver#feed
を用いてイベントを流し込む必要があるのに注意してください。
流し込まれてFilterされた後のイベントは driver#filtered_records
で取得することができます。
まとめ
Fluentdのv0.12向けに書かれたプラグインのv0.14対応の概要について説明しました。Fluentd向けに多くのプラグインが公開されていますが、v0.12の書き方のままで公開されているプラグインが多くあるのが現状です。 Fluentdのプラグインのv0.14化対応はやり方を把握すれば挑戦できないことはないので、v0.14らしい書き方をしてみたいFluentdのプラグイン開発者やプラグインを良くしたいユーザーの方々はv0.14のAPIを使ってみるといいのではないでしょうか。 この記事ではプラグインヘルパーについては深入りする事ができませんでした。プラグインヘルパーについてはまたの機会に説明することとします。 最後に、v0.14でこのプラグインが動いていないというIssueを上げるだけでもありがたいのでそのようなフィードバックをして頂けると幸いです。
-
http://www.slideshare.net/tagomoris/fluentd-overview-now-and-then ↩
-
https://github.com/fluent/fluentd/blob/12718a218a1e78126108a573b85d4b18e8bd56d5/lib/fluent/test/driver/base.rb#L134 ↩
-
Fluent::Engine.now
は内部的にFluent::EventTime.now
を呼んでいるのでこのままでよいようです。 ↩ -
例えば、formatterやbufferも一つのconfigセクションとして書けるようになっています。 ↩