PLAZMA OSS Day: TD Tech Talk 2018 にて登壇しました。 fluent-plugin-kafkaのスループット問題を解消するために開発したkafka-connect-fluentdの紹介とfluent-plugins-nurseryに代表されるFluentdのプラグインを引き取り、メンテナンスしている事例の概要を紹介しました。
を紹介しました。
スライドにもありますが、Fluentdのプラグインの作者によるメンテナンスが滞っているプラグインがよく見かけられます。そのため、よく使われているプラグインのうち、作者がメンテナンスをする時間が取れないものに関しては引き取ることにしました。その引き取り場所として、fluent-plugins-nursery という organization を作成しました。
PLAZMA OSS Day: TD Tech Talk 2018で先日リリースしたkafka-connect-fluentdの紹介と、Fluentd周りを良くして行く活動の実績を紹介しました。
Firefox ESR60以降のバージョンでは、Policy Engineと呼ばれる新しいポリシー設定の仕組みが導入されます。近いうちに公式なドキュメントが用意されるものと予想されますが、Firefoxを法人で利用中の場合、一日も早く実際の動作を検証したいというニーズもある事でしょう。この記事では、現時点で実装されているPolicy Engineの具体的な使用手順を解説します。
Policy EngineはNightly 60で既に使用可能な状態になっています。Firefox開発版のダウンロードページから最新の開発版であるNightlyをダウンロードし、インストールしておきます。
次に、Nightlyの起動用ショートカットを編集します。Firefoxは通常版と開発版(Nightly)でプロファイルの内容に互換性が無い場合があり、普段使いのFirefoxのプロファイルでNightlyを起動してしまうと、不可逆的な移行処理が行われてしまって、以後通常版のFirefoxでそのプロファイルを使用できなくなる可能性があります。Firefoxのショートカットのプロパティを開き、「リンク先」欄の末尾にC:\Program Files\Nightly\firefox.exe -profile %temp%\NightlyProfile -no-remote
のように起動プロファイルを明示しておく事で、普段使いのプロファイルとは別の専用プロファイルでNightlyを起動できるようになります。
Nightlyの準備が終わったら、Policy Engineの設定の準備です。
Policy Engine用のポリシー設定は、Windowsの場合はレジストリ経由(グループポリシーオブジェクトでの設定)とJSONファイル経由での設定の2通りの方法があります。 簡単のため、ここではJSONファイルを使う方法のみ解説します。
ポリシー設定のためのJSONファイルは、Firefoxのインストール先フォルダ配下にdistribution
という名前でフォルダを作成し、さらにその中にpolicies.json
という名前で以下の内容のテキストファイル(JSON形式)を設置します。
{
"policies": {
}
}
以上で準備は完了です。
JSONファイルでのポリシー設定はpolicies.json
のpolicies
のプロパティとして記述します。現時点で使用できるポリシー設定にどのような物かがあるかはpolicies.json
のスキーマ定義に列挙されています。
例えば、about:config
で設定を変更される事とFirefoxの自動更新のそれぞれを禁止したい場合、policies.json
は以下のように記述します。
{
"policies": {
"BlockAboutConfig": true,
"DisableAppUpdate": true
}
}
2018年3月9日現在、以下のポリシー設定が存在します。これらは仕様が変更または削除される可能性がある事、あるいは新しいポリシー設定が今後追加される可能性がある事に注意して下さい。
"BlockAboutAddons": true
:about:addons
(アドオンマネージャ)の使用を禁止する。間接的に、アドオンのインストールを禁止する効果がある。"BlockAboutConfig": true
:about:config
の使用を禁止する。同時に、副作用として"DisableDeveloperTools": true
の効果も反映される。"BlockAboutProfiles": true
:about:profiles
の使用を禁止する。"BlockAboutSupport": true
:about:support
の使用を禁止する。"BlockSetDesktopBackground": true
:画像をコンテキストメニューからデスクトップの壁紙に設定する機能の使用を禁止する。"Bookmarks": [{"Title": "...", "URL": "...", "Favicon": "...", "Placement": "toolbar", "Folder": true/false }, ...]
:ブックマークツールバーに既定のブックマーク項目を追加する。"Bookmarks": [{"Title": "...", "URL": "...", "Favicon": "...", "Placement": "menu", "Folder": true/false }, ...]
:ブックマークメニューに既定のブックマーク項目を追加する。"Cookies": { "Allow": ["http://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でCookie、IndexedDB、Web Storage、およびService Worker用Cacheを保存する(任意に無効化はできない)。"Cookies": { "Block": ["http://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でCookie、IndexedDB、Web Storage、およびService Worker用Cacheを保存しない(任意に有効化はできない)。また、これらのホストに保存済みのCookieがあった場合、それらは削除される。"CreateMasterPassword": false
: マスターパスワードを設定する事を禁止する。"DisableAppUpdate": true
:Firefoxの自動更新を停止する。"DisableDeveloperTools": true
:開発ツールの使用を禁止する。"DisableFirefoxAccounts": true
:Firefoxアカウントの使用を禁止する(ひいては、Firefox Syncの使用も禁止される)。"DisableFirefoxScreenshots": true
:Firefox Screenshotsの使用を禁止する。"DisableFirefoxStudies": true
:Firefoxの新機能のテストへの参加を禁止する。"DisableFormHistory": true
:フォームの入力履歴の保存とオートコンプリートを禁止する。"DisablePocket": true
:Pocketの使用を禁止する。"DisablePrivateBrowsing": true
:プライベートブラウジング機能の使用を禁止する。"DisableSysAddonUpdate": true
:システムアドオンの更新を禁止する。"DisplayBookmarksToolbar": true
:初期状態でブックマークツールバーを表示する。(ただしこの設定は強制でなく、ユーザーが任意に非表示にする事もでき、非表示にした場合は次回以降の起動時も非表示のままとなる。)"DisplayMenuBar": true
:初期状態でメニューバーを表示する。(ただしこの設定は強制でなく、ユーザーが任意に非表示にする事もでき、非表示にした場合は次回以降の起動時も非表示のままとなる。)"DontCheckDefaultBrowser": true
:起動時に既定のブラウザにするかどうかを確認しない。"FlashPlugin": { "Allow": ["http://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でAdobe FlashをClick to Play無しで自動実行する(任意に無効化はできない)。"FlashPlugin": { "Block": ["http://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でAdobe Flashの実行を禁止する(任意に有効化はできない)。"Homepage": { "URL": "http://example.com", "Locked": true/false, "Additional": ["https://example.org:8080", ...] }
:既定のホームページを設定する。"Locked"
がtrue
の場合はホームページを固定する。また、"Additional"
を指定した場合は2番目以降のホームページとして設定する。"InstallAddons": { "Allow": ["https://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でアドオンのインストール時に警告しない(https
のみ指定可能)。"Popups": { "Allow": ["http://example.com", "https://example.org:8080"] }
:指定のWebサイト(オリジンで指定)でwindow.open()
によるポップアップを常に自動的に開く(任意に無効化はできない)。"RememberPasswords": true/false
:パスワードマネージャの使用または使用禁止を強制する。ポリシー設定はFirefoxの起動時に読み込まれます。Firefoxの動作中に変更したポリシー設定は、次回のFirefox起動時から反映されます。
以上、Firefox ESR60から使用可能になるPolicy Engineによるポリシー設定の手順を解説しました。
従来Firefoxでは、AutoConfigの一般的な設定でできないカスタマイズが必要な場合、アドオンやAutoConfig内に埋め込まれたスクリプトによってそれらを強引に実施するという方法を取る事ができました。ESR60以降のバージョンではXULアドオンが廃止され、またAutoConfigのスクリプト内での特権が必要なコードの実行が禁止される見込みであることから、それらの方法は取れなくなります。Policy Engineで実現可能な部分はPolicy Engineで設定するように、今のうちに備えておくようにしましょう。
2017年7月6日の記事で紹介した通り、クリアコードは組み込みLinux向けにMozilla Firefoxを移植するプロジェクトGecko EmbeddedをWebDINO Japan(旧Mozilla Japan)様と共同で立ち上げ、開発を進めております。Yoctoを使用してFirefoxをビルドしたりハードウェアクセラレーションを有効化する際のノウハウを蓄積して公開することで、同じ問題に悩む開発者の助けになることを目指しています。
この記事では、Gecko Embedded本体ではなく周辺のミドルウェアのNode-REDをRZ/G1で動かす話を書きます。 Node-REDは、ハードウェアデバイス/APIおよびオンラインサービスを接続するためのツールです。
2月時点では、Node-REDのビルド及び動作は以下のボードで確認しています。
レシピはGitHubにて公開されているものを使用します。
Yoctoに組み込むには、meta-nodejs-contribを git clone
したのち、(bitbakeのビルドディレクトリ)/conf/local.confへ
IMAGE_INSTALL_append = " node-red nodejs nodejs-npm "
PREFERRED_VERSION_nodejs = "6.11.2"
PREFERRED_VERSION_nodejs-native = "6.11.2"
(bitbakeのビルドディレクトリ)/conf/bblayers.confへ
BBLAYEYS += " ${TOPDIR}/../meta-nodejs "
BBLAYEYS += " ${TOPDIR}/../meta-nodejs-contrib "
をそれぞれ追加し、bitbakeを実行します。
ここで、RZ/G1向けのYoctoではNode.js 6.11.2の動作確認が取れているため、 PREFERRED_VERSION_nodejs
には 6.11.2
を指定しています。
Node-REDを上記の設定でビルドした場合、node-red
コマンドが起動イメージにインストールされます。
$ node-red
21 Feb 04:53:01 - [info]
Welcome to Node-RED
===================
21 Feb 04:53:01 - [info] Node-RED version: v0.17.5
21 Feb 04:53:01 - [info] Node.js version: v6.11.2
21 Feb 04:53:01 - [info] Linux 4.4.55-cip3 arm LE
21 Feb 04:53:07 - [info] Loading palette nodes
21 Feb 04:53:12 - [warn] ------------------------------------------------------
21 Feb 04:53:12 - [warn] [rpi-gpio] Info : Ignoring Raspberry Pi specific node
21 Feb 04:53:12 - [warn] ------------------------------------------------------
21 Feb 04:53:12 - [info] Settings file : /home/root/.node-red/settings.js
21 Feb 04:53:12 - [info] User directory : /home/root/.node-red
21 Feb 04:53:12 - [info] Flows file : /home/root/.node-red/flows_iwg20m.json
21 Feb 04:53:12 - [info] Creating new flow file
21 Feb 04:53:12 - [info] Starting flows
21 Feb 04:53:12 - [info] Started flows
21 Feb 04:53:12 - [info] Server now running at http://127.0.0.1:1880/
となれば動作確認は完了です。 Node-REDの使い方に関してはNode-REDの公式サイトのドキュメントを参照してください。
Node-REDのYoctoレシピを用いてRZ/Gシリーズのボードに載せた話を紹介しました。
これはRuby25周年へのメッセージです。
クリアコードの社長の須藤です。そんなにコミットしていないのであんまり自分から言わないのですが、Rubyのコミット権を持っています。
同い年のRubyコミッター8人の中では一番最初にコミット権をもらいました。2004年の1月のことなので、なんともう14年前!当時は大学生だったのですが、すごくドキドキしたことを覚えています。私がコミット権をもらったのは自分が作っているライブラリーがRuby本体に取り込まれたからなんですが、会ったこともない人がしっくりくると推薦してくれたことがうれしかったです。Rubyが使いやすいなぁと思って使っていたので、他のRubyistからしっくりくると思ってもらえる使い勝手のライブラリーを作れているのがわかったのが嬉しかったんでしょうねぇ。
クリアコードが始まったのは私が社会人になった2006年の7月で、私は(たしか)9月からクリアコードに合流しました。私の社会人歴のほぼすべてはクリアコードでのものです。
クリアコードはフリーソフトウェアを推進するのが一番大事な会社であって、Rubyを応援するのが一番大事な会社ではないので、クリアコードのメンバーみんながRubyistというわけではありません。クリアコードを始めた当時、Rubyistは私だけでした。
私はフリーソフトウェアも推進したいしRubyも応援したかったので、なにかしらRubyを活かせる場所をみつけてRubyを活用していました。たとえば、独立行政法人情報処理推進機構(IPA)平成20年度オープンソフトウェア利用促進事業(リンク切れ)が「迷惑メール対策でなにか」みたいなテーマで募集していたときは、「Rubyを組み込んだ迷惑メール対策システム」を応募しました。それに採択されてお金をもらって開発したのがmilter managerというフリーソフトウェアです。「Rubyを組み込むと動的にいろいろできて捗るよ!」というようにRubyを活かす場所を考えました。
milter managerの開発を始めたのが10年前なのですが、実は、milter managerきっかけでいくつかRubyをよくしたことがあります。1つがメモリーリークの修正で、もう1つが拡張ライブラリーのメモリー使用量を抑えやすくするAPIの追加です。
前者は再現スクリプトを作るのに数週間とか使った気がする(もちろん業務時間内でやっていた)ので、なかなか大変だったなぁという記憶があります。作業していたのは私ではないですが。
後者はmilter managerを開発し始めてから8年後の東京Ruby会議11での発表がきっかけで話が進みました。なにがつながるかわからないものですね。
直接お金を稼いでいるわけではないですが、仕事ですごくRubyを活用している例があります。それはRabbitという私がRubyを使って開発しているプレゼンテーションツールです。
クリアコードは「お客さんを探す」ではなく「お客さんに見つけてもらう」という仕事の探し方をしているので、クリアコードがいろいろ情報発信をすることはお金を稼ぐためにとても大事なことです。ここにいろいろ記事を書くこともそうですし、イベントで発表することもそうです。そして、イベントを発表するときに役立つのがRabbitです。
Rabbitは私が大学にいたときに研究関係の発表をするために作り始めたものです。私にとってはRubyで作ることが大事だったので、プレゼンテーションツールを作るためにRubyでできないことがあれば、それらをRubyでできるようにしながら作ってきました。たとえば、PDF出力機能やGUI・画像処理・マルチメディア機能などです。これらの機能があるからRubyを使っているという人がいるといいな。
Rabbitはすごくヒットしているツールではありません。RubyのイベントでもRabbitを使っている人は極少数派です。ただ、まつもとさんがMagicPointからRabbitに乗り換えたので、ヒットしていなくても私は満足です。みんながまつもとさんのいい話を聴けるのは私のおかげでもあるはず!(RubyKaigi 2017でまつもとさんに教えてもらったRabbitの問題の修正は25周年イベントには間に合わなかったなぁ。残念。)
本当のところを言うと、私は自分が使うために作っているのでユーザーが私だけでも満足だったりします。言い方を変えると、ヒットしていようがしていまいが私は別にどうでもいいです。そういえば、ここ数年、なぜか私以外のクリアコードのメンバーもRabbitを使っているのが不思議です。特に強制していないはずなんですが。。。
Ruby25周年ということで、クリアコードでのRubyの関わり方を一部紹介しました。Rubyを積極的に使っていく(人がいる)し、Rubyを使う中で得られた知見はRuby本体にフィードバックするし、まつもとさんのプレゼンをツールでサポートする、とかやっています。そうそう、RubyKaigiのスポンサーとしてお金を出すというのもやっていました。
これからも引き続き同じような感じでRubyと関わっていくつもりです。近い将来、Red Data Tools関連のことでも稼げるようになるといいなぁと思っています。今はあまり稼げていませんが、開発中に気づいたKeyError
の改良案をRuby 2.6に入れたり、Rubyのcsvをよくしたり、Rubyでできることを増やしたり、といった点でRubyをよくすることはでき始めています。
kafka-connect-fluentdを開発したので、その際に得た知見をまとめます。
Kafka Connectとは、簡単に説明するとKafka Consumersまたは、Kafka Producersの一種です。
あるデータソースからKafkaにデータを投入(Kafka Producers)したり、Kafkaから取り出したデータを別のところに流し(Kafka Consumers)たりするときにKafka ConsumersやKafka Producersでは、定型的なコードをたくさん書く必要がありました。 それを汎用化して、データを取り扱う箇所のコードのみを書けばいいようにまとめたものかKafka Connect APIです。
Connector Developer Guideを読めば、Kafka Connectorを開発する方法が書いてあります。 しかし、プロジェクトの初期設定は面倒なものです。定型コードを自動生成できるコマンドを使うと開発を始めるのが楽になります。
$ mvn -e archetype:generate -B -DarchetypeGroupId=io.confluent.maven \
-DarchetypeArtifactId=kafka-connect-quickstart \
-DarchetypeVersion=0.10.0.0 \
-Dpackage=org.fluentd.kafka \
-DgroupId=org.fluentd.kafka \
-DartifactId=kafka-connect-fluentd \
-Dversion=0.0.1
参考: https://github.com/jcustenborder/kafka-connect-archtype
@jcustenborderさんは、Confluentに所属し、多くのKafka Connectorを公開している方です。
これでpom.xmlやソースコードの雛形などが一通り生成されるので、開発を始めることができます。
kafka-connect-fluentdの場合は、自動生成されたpom.xmlをbuild.gradleに変換して利用しています。
Kafka Producerに対応するConnectorがSourceです。あるデータソースからKafkaにデータを書き込むために使用します。
kafka-connect-fluentd の場合は、Fluentdのout_forwardからkafka-connect-fluentdのSourceにデータを投げてKafkaにデータを流します。
Sourceを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。
SourceConnectorを継承し、必要なメソッドを実装します。
public String version()
public List<Map<String, String>> taskConfigs(int taskMax)
taskMax
の値に応じて返り値のList
にMap<String,String>
を詰め込みますpublic void start(Map<String, String> properties)
public Class<? extends Task> taskClass()
FluentdSourceTaks.class
) を返しますpublic void stop()
public ConfigDef config()
FluentdSourceConnectorConfig.conf()
)を返しますpublic static ConfigDef conf()
をConfigDef
のAPIを使って設定を定義します。
public static ConfigDef conf() {
return new ConfigDef()
.define(FLUENTD_PORT, Type.INT, 24224, Importance.HIGH,
"Port number to listen. Default: 24224")
.define(FLUENTD_BIND, Type.STRING, "0.0.0.0", Importance.HIGH,
"Bind address to listen. Default: 0.0.0.0");
}
define()
は引数が5つあり、前から順に名前、型、デフォルト値、重要度、説明です。引数の数が異なるdefine()
もありますが、詳細はjavadocを参照してください。
Sourceを実装する際に最も重要なクラスです。以下のメソッドを実装します。
public void start(Map<String, String> properties)
poll()
から定期的にアクセスされる共通のリソースを準備します。daemonやスレッドが必要な場合は、ここで起動します。public List<SourceRecord> poll()
FluentdSourceTask
ではキューに溜めたデータをList<SourceRecord>
に詰めて返します。public void stop()
start
で起動したdaemonやスレッドをここで停止します。public String version()
このクラスは複数のスレッドから使用される可能性があります。特にpoll()
でアクセスするインスタンス変数などについてはマルチスレッドを意識したコードを書く必要があります。
Kafka Consumerに対応するConnectorがSinkです。Kafkaから取り出したデータをどこかに書き込みます。
kafka-connect-fluentd の場合は、Kafkaからデータを取り出してkafka-connect-fluentdのSinkからFluentdのin_forwardへデータを流します。
Sinkを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。
SinkConnectorを継承し必要なメソッドを実装します。
public String version()
public Class<? extends Task> taskClass()
FluentdSinkTask.class
)を返しますpublic List<Map<String, String>> taskConfigs(int maxTasks)
taskMax
の値に応じて返り値のList
にMap<String,String>
を詰め込みますpublic void stop()
public ConfigDef config()
FluentdSourceConnectorConfig
と同じようにpublic static ConfigDef conf()
をConfigDef
のAPIを使って設定を定義します。
Sinkを実装する際に最も重要なクラスです。以下のメソッドを実装します。
public String version()
public void start(Map<String, String> properties)
FluentdSinkTask
ではFluency
のインスタンスを作成しています。public void put(Collection<SinkRecord> collection)
FluentdSinkTask
ではFluencyでデータをFluentdに送信しています。public void flush(Map<TopicPartition, OffsetAndMetadata> map)
public void stop()
FluentdSinkTask
ではFluencyが全てのバッファをflushするのを待ちます。SchemaBuilder
とStruct
を利用すれば任意のデータ構造をスキーマにマッピングできるのですが、MessagePackとKafka ConnectのSchemaで表現できる型に違いがあるようです。
サポートしている型の違いついて理解を深め、差異をうまく吸収できるような実装をしたいと考えています。
GradleでMaven Centralにライブラリを公開する - たごもりすメモを参考にしてGradleでやりました。
依存関係にKafkaが入っているけど、配布するjarにはKafka関係のファイルを入れてはいけないことに注意が必要でした。
思っていたよりも簡単に実装できたのでKafkaにデータを投入したり、Kafkaからデータを取り込みたいのだけどProducer/Consumerはちょっと難しいという場合にKafka Connectを検討してみるとよいのではないでしょうか。
PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。
kafka-connect-fluentdとfluent-pluguin-kafkaの性能を比較したいと考えたときに、高い負荷を与える簡単に使えるツールがありませんでした。
dummerとFluentd組み込みのin_tailを使用すれば、負荷をかけることができるのはわかっていましたが、高い負荷を与えるためには複数のFluentdを動作させる必要があり、セットアップや管理が煩雑になることがわかっていました。
例えば、以下のようにたくさんのFluentdプロセスで1つのFluentdに負荷をかけるような感じになります。
参考: https://github.com/okumin/influent-benchmark
これはこれできちんと動きますが、以下の点で手軽ではありませんでした。
そこでdummer + in_tailよりも簡単に使うことができる fluent-benchmark-client というコマンドラインツールを作りました。
Fluentd Forward Protocolを扱う部分はFluencyを採用しました。*1
Kotlinを採用した理由は以下の通りです。
初めて書くので慣れてないという以外にデメリットはありませんでした。
fluent-benchmark-clientを使うと、1つのプロセスから大量のイベントをFluentdに送信することができます。
0.5.0がリリース済みなので使えます。
$ tar xf fluent-benchmark-client-0.5.0.tar
$ cd fluent-benchmark-client
$ ./bin/fluent-benchmark-client --max-buffer-size=4g --period=1m --n-events-per-sec=1000
これで、localhost:24224
に1000 events/secを1分間送ることができます。
送る内容は、{ "message": "Hello, Fluentd! This is a test message." }
です。
--max-buffer-size=4g
はFluencyのバッファのサイズを指定しています。これを指定することにより大きなワークロードにも対応可能です。
送信するワークロードの内容はコマンドラインオプションで変更することができます。 また、ファイルから読むこともできるので複雑なワークロードにも対応可能です。対応しているフォーマットはLTSVとJSONL*2です。 ここで利用するファイルはdummerで生成することを想定しています。
初期の実装では、以下のようにビジーループの中でMap<String, Object>
を作っていました。さらにemit
の内部でMessagePackへの変換が行われていたため速度が出ていませんでした。
約50万 events/sec 出てたかどうかも怪しいという記録が手元に残っていました。
while (true) {
fluency.emit(tag, mapOf("message" to "Hello Kotlin!"))
}
これを以下のように、ループの外でMessagePackに変換してからemitすると、手元のマシンで200万 events/secくらいの速度でemitできるようになりました。
val buffer = ByteArrayOutputStream()
val packer = MessagePack.newDefaultPacker(buffer)
packer.packMapHeader(1)
packer.packString("message")
packer.packString("Hello Kotlin!")
packer.flush()
packer.close()
val mapValue = buffer.toByteArray()
while (true) {
fluency.emit(tag, mapValue)
}
ファイルからLTSVやJSONLを読み込む場合も、ファイルを全部読み込んでMessagePackに変換してからemitするようにしているので、速度は変わりません。 しかし、全データをメモリーに載せるので注意が必要です。
まだ、できていないことがいくつかあります。
kafka-connect-fluentdとfluent-plugin-kafkaの性能を比較するために、ちょうどよいベンチマークツールがなかったのでfluent-benchmark-clientを開発しました。
PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。
kafka-connect-fluentdとfluent-pluguin-kafkaの性能を比較したいと考えたときに、同じ指標で性能を比較したいと考えていました。 kafka-connect-fluentdのFluentdSourceConnectorとfluent-plugin-kafkaのoutputプラグインはどちらもKafkaに書き込むので、Kafkaのスループットを見ればよいはずです。
幸いKafkaにはメトリックスを取るためのAPIが提供されていました。
しかしKafkaに組み込みのKafkaCSVMetricsReporterは、動きませんでした*1。他にもいくつか既存のMetricsReporterを探してみましたがちょうどよいものはありませんでした。 Kafkaのメトリックスも、同じフィールドに型の異なる値が入っていることがあり、そのままでは扱いづらいものでした。
そこで、全てのメトリックスをFluentdに流せば、それ以降はFluentdのプラグインで好きなように加工できるので素早くデータを可視化するための環境を作ることができそうだと考えました。 またKafkaの提供するメトリックスは大量にあるので、kafka-connect-fluentdとfluent-plugin-kafkaの性能比較に使えるものだけを選別することも簡単にできそうだと考えていました。
kafka-fluent-metrics-reporterを作りました。
KafkaのプラグインはScalaで書かれていたり、Javaで書かれていたり、Kotlinで書かれていたり様々なので、これもKotlinで実装しました。
特に工夫したところはなく、Kafkaの提供してくれるメトリックスをmapにつめてFluentdに送るだけでした。
こちらも未リリースなので自分でビルドする必要があります。
$ git clone https://github.com/okkez/kafka-fluent-metrics-reporter.git
$ cd kafka-fluent-metrics-reporter
$ ./gradlew shadowJar
$ cp build/libs/kafka-fluent-metrics-reporter-1.0-SNAPSHOT-all.jar /path/to/kafka_2.11-1.0.0/libs
以下の設定をKafka Serverの設定ファイル server.properties に追加します。
kafka.metrics.reporters=org.fluentd.kafka.metrics.KafkaFluentMetricsReporter
kafka.metrics.polling.interval.secs=5
kafka.fluent.metrics.enabled=true
kafka.fluent.metrics.host=localhost
kafka.fluent.metrics.port=24224
kafka.fluent.metrics.tagPrefix=kafka-metrics
Fluentd側は以下のように設定します。このように設定してしばらく流してみると、大体様子がわかると思います。
<source>
@type forward
port 24224
</source>
<match kafka-metrics.**>
@type copy
<store>
@type file
path log/${tag}
<buffer tag>
</buffer>
</store>
<store>
@type stdout
</store>
</match>
タグにメトリックスの名前が入ってくるので、必要なメトリックスをタグで絞り込むことができます。
kafka-fluent-metrics-reporterを使うことで、Fluentdを経由してKafkaのメトリックスを簡単に可視化することができました。
InfluxDBに直接流すものやPrometheus用のexporterなどもありましたが、Fluentdに流すものはなかったので作りました。
他の Kafka metrics reporter の実装例です。
*1 問題は報告済みです