はじめに
クリアコードはFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。
v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent
以下のクラスではなく、Fluent::Plugin
以下のクラスを継承し、実装する必要が出てきました。
また、v0.14のOutputプラグインはv0.12とは異なり、Fluent::Plugin::Output
クラスに様々な機能が入っています。これらの機能をプラグイン開発者向けに解説することを目指します。
この記事はv0.14.8以降が対象です。 まずは、Outputプラグインが必ず実装するべきメソッドについてのおさらいです。
non-buffered
def emit(tag, es, chain)
# ...
chain.next
end
を
def process(tag, es)
# ...
end
と読み替えます。 output#process(tag, es)
だけを実装するとnon-bufferedプラグインになります。
例えば、out_relabel の使用例があります。
buffered synchronous
output#write(chunk)
を実装するとbuffered outputプラグインになります。
def write(chunk)
# ...
end
例えば、out_stdout の使用例があります。
buffered asynchronous
output#try_write(chunk)
を実装するとbuffered asynchronous outputプラグインになります。
def try_write(chunk)
# ...
end
out_stdout の使用例があります。ただし、これはテスト用の実装のため、実用のものとは異なることに注意してください。
また、#commit_write(chunk_id)
を呼び、chunkのwriteを確定させることが必要です。
rollback_write
は commit_write
が行われないまま指定秒数が経過した chunk に対して自動的に呼ばれるので、プラグイン開発者が明示的に呼ぶ必要は通常はありません(秒数は delayed_commit_timeout
で設定から制御可能)。
ここまでがv0.14のOutputプラグインの基本的な事柄です。
では、さらにv0.14のプラグイン開発者にとって必要なことを順々に見ていきましょう。
custom format
#format(tag, time, record)
を実装すると、bufferのchunkでmsgpack以外のformatが使用できるようになります。
#format
を使用すると、
def formatted_to_msgpack_binary
true
end
としてtrueを返すようにしなければ chunk#msgpack_each
メソッドは使用できません。
chunk#msgpack_each
v0.12のObjectBufferedOutput互換になるのは #format
を実装していない場合です。
#format
の有無や、 #formatted_to_msgpack_binary
の返り値によって挙動が異なってくるのに注意してください。
standard format
chunk#msgpack_each
でyieldされてくる値は #format
を実装している時とそうでない時で異なります。
def write(chunk)
chunk.msgpack_each do |time, record|
# ...
end
end
ただし、#msgpack_each
は互換性のために残されているものです。
通常は chunk.each
を使ってください。msgpack_each
も(主に互換性の関係から) alias が定義されていますが、本来 chunk の内部フォーマット(msgpack)を意識させたメソッドを使うのは好ましくありません。
tagが必要な場合は、
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
config_set_default :chunk_keys, ['tag']
end
のようなbufferのdefault confを足し、chunk.metadata.tag
で取得してください。
また、tag が必要な場合 config_set_default :chunk_keys, ['tag']
を指定しておくのはよいですが、これは設定で上書きされる可能性があるため #configure
でチェックを行うべきです。
def configure(conf)
super
raise Fluent::ConfigError, "chunk keys must include 'tag' for this plugin" unless @chunk_key_tag
# ...
end
custom format
#format(tag, time, record)
を実装した場合は、to_msgpackでmsgpackへパックした順にmsgpack_eachをすると得られます。
また、#formatted_to_msgpack_binary
をオーバーライドしてtrueを返すようにしてください。
def format(tag, time, record)
[tag, time, record].to_msgpack
end
def formatted_to_msgpack_binary
true
end
def write(chunk)
chunk.msgpack_each do |tag, time, record|
# ...
end
end
injectヘルパーを使う場合は #format(tag, time, record)
を通すことでより見通しが良くなります。そのため、 #format
を実装し、その中で inject_values_to_record(tag, time, record)
を呼ぶようにしてください。
発展形
v0.14のOutputプラグインはオーバーライドするメソッドや実装するメソッドにより、confの設定により実行時に3種の異なる種別のOutputプラグインへ切り替えることができます。
non-bufferedとbufferedの切り替え
これは以下の優先順位で行われます:
-
実装メソッドによる分岐 (例:
#process
しか実装されていない → non-buffered) -
両方実装されている場合で、かつ設定において
<buffer>
セクションが指定されている場合 → buffered -
両方実装されており設定に
セクションが指定されていない場合 → #prefer_buffered_processing
を呼んで判定
buffered synchronous/asynchronousの切り替え
output#write
と output#try_write
を実装して #prefer_delayed_commit
の返り値のtrue/falseでbuffered synchronousとbuffered asynchronousを切り替えられます。
-
true -> buffered asynchronous
-
false -> buffered synchronous
output#write
と output#try_write
のどちらか一方だけ実装している場合は、#prefer_delayed_commit
は呼ばれません。
bufferedプラグインの注意点
#write
, #try_write
を実装していないOutputプラグインへのconfigには <buffer>
ディレクティブが使用できません。
複合形
#prefer_delayed_commit |
#prefer_buffered_processing |
結果 |
---|---|---|
false | false | non-buffered |
false | true | buffered synchronous |
true | true | buffered asynchronous |
true | false | 選択不可 |
secondaryの扱い
secondaryに指定されたプラグインはbufferingのサポートが必要です。out_fileなどのbufferingをサポートしたoutputプラグインを指定できます。
bufferディレクティブのCHUNK_KEYSアトリビュート
<buffer CHUNK_KEYS>
のようにbufferディレクティブにはCHUNK_KEYSのアトリビュートの指定が可能です。
tag, timekey, variablesの指定ができるようになっています。これはこのアトリビュートによってチャンクをひとまとめにするためにあります。
-
tag →タグごとにチャンクがまとめられる
-
timekey →time formatごとにチャンクがまとめられる
-
variables →レコードの中のキーごとのチャンクがまとめられる
buffered outputプラグインのflushで用いられるthread
start時に <buffer>
ディレクティブにある flush_thread_count
で指定されている数のスレッドを作ります。#submit_flush_once
は単にそれらのスレッドを明示的にアクティブにしているだけです。
v0.12のbuffered outputプラグインの自前スレッドの書き換え
プラグインが自前で作成していたスレッドは以下のようにできるはずです。
-
定期的にある処理を行う必要があった場合 → timer plugin helper を使う
-
Fluent::Output
プラグインを継承していたが(ある設定が有効なときのみ)バックグラウンドでflushするような処理を自前で書いていた →#process
および#write
両方を実装して設定により挙動を切り替える -
socketをlistenしていた → socket/server plugin helper を使う(これから実装される)
それ以外の場合は thread plugin helper を使います。自前で Thread.new
するべきではありません。thread plugin helperを使う場合、plugin test driverがそのスレッドの状態管理などの面倒を見てくれるため、たまに失敗するテスト、などの危険性が大幅に低下します。
プレースホルダ
chunk.metadata
が実際にどの値を有しているかは <buffer CHUNK_KEYS>
の CHUNK_KEYS
に何をユーザが指定したか(あるいは config_set_default
で何が指定されていたか)により異なります。
が、プラグイン作者が独自にチェックするべきではなく #configure
内で #placeholder_validate!("name_of_parameter", @name_of_parameter)
を使うべきです。使われているプレースホルダと chunk key の間に不整合があれば configuration error が上がります。
(もっと細かい制御もやろうと思えばできますが、コーナーケースです。こちらの議論を参照してください。)
つまりプラグイン作者は #configure
内で #placeholder_validate!
し、そこが通っているならあとは #write
で extract_placeholder(@name_of_parameter, chunk.metadata)
するだけでよいです。
${tag}
chunkに含まれるタグに展開されます。 また、tag1.tag2.tag3.... のようなタグとなっている場合、 ${tag[0]}, ${tag[1]}, ${tag[2]},...のようにタグの添え字を指定することで個別に取り出すことができます。
strftime形式(%Y%m%dなど)
strftimeのフォーマットに準じて展開されます。
variable_%Y-%m-%dT%H:%M:%S.%N
のように用います。
これは variable_2015-12-25T12:34:56.123450000
のように展開されます。
まとめ
v0.14のOutputプラグインの仕様をFluentdの開発者の協力を仰ぎ1書き出してみました。v0.12のoutputプラグインと変わっている箇所も多く、単純にv0.14への移行は難しい箇所もあります。 v0.14のAPIを使うように移行するとプラグインヘルパーやプレースホルダーの機能により、より柔軟なconfの設定を書くことが可能になります。例えば、プレースホルダーの機能を使ったものとしては、fluent-plugin-mysql のテーブル名へのプレースホルダーを指定可能にする機能2 を実装したものがあります。このようにタグや日付ごとのデータ集計をサポートする機能を簡単に実装できるようになるというメリットがあるため、v0.14のAPIを使うように移行を試みてみるのはいかがでしょうか?
-
この記事を書くに当たって @tagomoris さんのレビューの協力を仰ぎました。ありがとうございます。 ↩
-
https://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-tag-placeholder-for-table-name や https://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-time-format-placeholder-for-table-name を参照。 ↩