PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。
背景
kafka-connect-fluentdとfluent-pluguin-kafkaの性能を比較したいと考えたときに、高い負荷を与える簡単に使えるツールがありませんでした。
dummerとFluentd組み込みのin_tailを使用すれば、負荷をかけることができるのはわかっていましたが、高い負荷を与えるためには複数のFluentdを動作させる必要があり、セットアップや管理が煩雑になることがわかっていました。
例えば、以下のようにたくさんのFluentdプロセスで1つのFluentdに負荷をかけるような感じになります。
参考: https://github.com/okumin/influent-benchmark
これはこれできちんと動きますが、以下の点で手軽ではありませんでした。
-
dummer でファイルを作る必要があるのでクライアント側にも Fluentd(in_tail + out_forward) が必須になってしまう
- in_tailはposファイルを作るので、条件を変えながら繰り返し測定しようとすると、セットアップとクリーンアップが必要となる
-
dummerで流量制御しようとするとI/Oの速度が上限になってしまう
- I/Oの速度を越えようとすると、上の図のように複数のFluentdプロセスを起動する必要があります
fluent-benchmark-client
そこでdummer + in_tailよりも簡単に使うことができる fluent-benchmark-client というコマンドラインツールを作りました。
Fluentd Forward Protocolを扱う部分はFluencyを採用しました。1
Kotlinを採用した理由は以下の通りです。
-
Javaで書くのと性能は変わらない
-
Javaで書かれたライブラリーは全部使える
-
IntelliJ IDEAのサポートがある
-
Null安全を試してみたかった
-
型推論を試したかった
-
コルーチンどうなの?というのを試したかった
初めて書くので慣れてないという以外にデメリットはありませんでした。
fluent-benchmark-clientを使うと、1つのプロセスから大量のイベントをFluentdに送信することができます。
使い方
0.5.0がリリース済みなので使えます。
$ tar xf fluent-benchmark-client-0.5.0.tar
$ cd fluent-benchmark-client
$ ./bin/fluent-benchmark-client --max-buffer-size=4g --period=1m --n-events-per-sec=1000
これで、localhost:24224
に1000 events/secを1分間送ることができます。
送る内容は、{ "message": "Hello, Fluentd! This is a test message." }
です。
--max-buffer-size=4g
はFluencyのバッファのサイズを指定しています。これを指定することにより大きなワークロードにも対応可能です。
送信するワークロードの内容はコマンドラインオプションで変更することができます。 また、ファイルから読むこともできるので複雑なワークロードにも対応可能です。対応しているフォーマットはLTSVとJSONL2です。 ここで利用するファイルはdummerで生成することを想定しています。
工夫したところ
初期の実装では、以下のようにビジーループの中でMap<String, Object>
を作っていました。さらにemit
の内部でMessagePackへの変換が行われていたため速度が出ていませんでした。
約50万 events/sec 出てたかどうかも怪しいという記録が手元に残っていました。
while (true) {
fluency.emit(tag, mapOf("message" to "Hello Kotlin!"))
}
これを以下のように、ループの外でMessagePackに変換してからemitすると、手元のマシンで200万 events/secくらいの速度でemitできるようになりました。
val buffer = ByteArrayOutputStream()
val packer = MessagePack.newDefaultPacker(buffer)
packer.packMapHeader(1)
packer.packString("message")
packer.packString("Hello Kotlin!")
packer.flush()
packer.close()
val mapValue = buffer.toByteArray()
while (true) {
fluency.emit(tag, mapValue)
}
ファイルからLTSVやJSONLを読み込む場合も、ファイルを全部読み込んでMessagePackに変換してからemitするようにしているので、速度は変わりません。 しかし、全データをメモリーに載せるので注意が必要です。
まだできていないこと
まだ、できていないことがいくつかあります。
-
マルチワーカー化
- Kotlinのコルーチンへの理解が進んだら着手したいです
-
SSL/TLS対応
- Fluencyが対応したら対応したいです
まとめ
kafka-connect-fluentdとfluent-plugin-kafkaの性能を比較するために、ちょうどよいベンチマークツールがなかったのでfluent-benchmark-clientを開発しました。