kafka-connect-fluentdを開発したので、その際に得た知見をまとめます。
Kafka Connectとは、簡単に説明するとKafka Consumersまたは、Kafka Producersの一種です。
あるデータソースからKafkaにデータを投入(Kafka Producers)したり、Kafkaから取り出したデータを別のところに流し(Kafka Consumers)たりするときにKafka ConsumersやKafka Producersでは、定型的なコードをたくさん書く必要がありました。 それを汎用化して、データを取り扱う箇所のコードのみを書けばいいようにまとめたものかKafka Connect APIです。
Connector Developer Guideを読めば、Kafka Connectorを開発する方法が書いてあります。 しかし、プロジェクトの初期設定は面倒なものです。定型コードを自動生成できるコマンドを使うと開発を始めるのが楽になります。
$ mvn -e archetype:generate -B -DarchetypeGroupId=io.confluent.maven \
-DarchetypeArtifactId=kafka-connect-quickstart \
-DarchetypeVersion=0.10.0.0 \
-Dpackage=org.fluentd.kafka \
-DgroupId=org.fluentd.kafka \
-DartifactId=kafka-connect-fluentd \
-Dversion=0.0.1
参考: https://github.com/jcustenborder/kafka-connect-archtype
@jcustenborderさんは、Confluentに所属し、多くのKafka Connectorを公開している方です。
これでpom.xmlやソースコードの雛形などが一通り生成されるので、開発を始めることができます。
kafka-connect-fluentdの場合は、自動生成されたpom.xmlをbuild.gradleに変換して利用しています。
Kafka Connector Source
Kafka Producerに対応するConnectorがSourceです。あるデータソースからKafkaにデータを書き込むために使用します。
kafka-connect-fluentd の場合は、Fluentdのout_forwardからkafka-connect-fluentdのSourceにデータを投げてKafkaにデータを流します。
Sourceを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。
-
FluentdSourceConnector (SourceConnectorを継承)
-
FluentdSourceConnectorConfig (AbstractConfigを継承)
-
FluentdSourceTask (SourceTaskを継承)
FluentdSourceConnector
SourceConnectorを継承し、必要なメソッドを実装します。
-
public String version()
- バージョン文字列を返します
-
public List<Map<String, String>> taskConfigs(int taskMax)
-
マルチスレッドに対応させたい場合は、
taskMax
の値に応じて返り値のList
にMap<String,String>
を詰め込みます -
設定ファイル(properties)に指定した値をそのままセットします
-
-
public void start(Map<String, String> properties)
- Connector開始時にやることを書きます
-
public Class<? extends Task> taskClass()
- taskクラス (
FluentdSourceTaks.class
) を返します
- taskクラス (
-
public void stop()
- Connectorの終了時にやることを書きます
-
public ConfigDef config()
- 設定の定義(
FluentdSourceConnectorConfig.conf()
)を返します
- 設定の定義(
FluentdSourceConnectorConfig
public static ConfigDef conf()
をConfigDef
のAPIを使って設定を定義します。
public static ConfigDef conf() {
return new ConfigDef()
.define(FLUENTD_PORT, Type.INT, 24224, Importance.HIGH,
"Port number to listen. Default: 24224")
.define(FLUENTD_BIND, Type.STRING, "0.0.0.0", Importance.HIGH,
"Bind address to listen. Default: 0.0.0.0");
}
define()
は引数が5つあり、前から順に名前、型、デフォルト値、重要度、説明です。引数の数が異なるdefine()
もありますが、詳細はjavadocを参照してください。
FluentdSourceTask
Sourceを実装する際に最も重要なクラスです。以下のメソッドを実装します。
-
public void start(Map<String, String> properties)
poll()
から定期的にアクセスされる共通のリソースを準備します。daemonやスレッドが必要な場合は、ここで起動します。
-
public List<SourceRecord> poll()
- Kafka Connectのフレームワーク側から定期的に呼ばれます。
FluentdSourceTask
ではキューに溜めたデータをList<SourceRecord>
に詰めて返します。
- Kafka Connectのフレームワーク側から定期的に呼ばれます。
-
public void stop()
- このタスクを止めるときに、必要な処理があれば書きます。
start
で起動したdaemonやスレッドをここで停止します。
- このタスクを止めるときに、必要な処理があれば書きます。
-
public String version()
- バージョン文字列を返します。定型のコードなので詳細は省略します。
このクラスは複数のスレッドから使用される可能性があります。特にpoll()
でアクセスするインスタンス変数などについてはマルチスレッドを意識したコードを書く必要があります。
Kafka Connector Sink
Kafka Consumerに対応するConnectorがSinkです。Kafkaから取り出したデータをどこかに書き込みます。
kafka-connect-fluentd の場合は、Kafkaからデータを取り出してkafka-connect-fluentdのSinkからFluentdのin_forwardへデータを流します。
Sinkを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。
-
FluentdSinkConnector (SinkConnectorを継承)
-
FluentdSinkConnectorConfig (AbstractConfigを継承)
-
FluentdSinkTask (SinkTaskを継承)
FluentdSinkConnector
SinkConnectorを継承し必要なメソッドを実装します。
-
public String version()
- バージョン文字列を返します
-
public Class<? extends Task> taskClass()
- taskクラス(
FluentdSinkTask.class
)を返します
- taskクラス(
-
public List<Map<String, String>> taskConfigs(int maxTasks)
-
マルチスレッドに対応させたい場合は、
taskMax
の値に応じて返り値のList
にMap<String,String>
を詰め込みます -
設定ファイル(properties)に指定した値をそのままセットします
-
-
public void stop()
- Connectorの終了時にやることがあれば書きます
-
public ConfigDef config()
- 設定の定義('FluentdSinkConnectorConfig.conf()')を返します
FluentdSinkConnectorConfig
FluentdSourceConnectorConfig
と同じようにpublic static ConfigDef conf()
をConfigDef
のAPIを使って設定を定義します。
FluentdSinkTask
Sinkを実装する際に最も重要なクラスです。以下のメソッドを実装します。
-
public String version()
- バージョン文字列を返します。定型のコードなので詳細は省略します。
-
public void start(Map<String, String> properties)
- このタスクを初期化します。
FluentdSinkTask
ではFluency
のインスタンスを作成しています。
- このタスクを初期化します。
-
public void put(Collection<SinkRecord> collection)
- Kafkaから定期的に呼ばれます。
FluentdSinkTask
ではFluencyでデータをFluentdに送信しています。
- Kafkaから定期的に呼ばれます。
-
public void flush(Map<TopicPartition, OffsetAndMetadata> map)
- これをきちんと実装すれば exactly-once に対応できるらしいです。しかし Fluentd は exactly-once に対応していないので、今のところ単にFluencyのバッファをflushするだけにしています。
-
public void stop()
- タスクの停止にやることがあれば、中身を実装します。
FluentdSinkTask
ではFluencyが全てのバッファをflushするのを待ちます。
- タスクの停止にやることがあれば、中身を実装します。
スキーマについて
SchemaBuilder
とStruct
を利用すれば任意のデータ構造をスキーマにマッピングできるのですが、MessagePackとKafka ConnectのSchemaで表現できる型に違いがあるようです。
サポートしている型の違いついて理解を深め、差異をうまく吸収できるような実装をしたいと考えています。
パッケージング
GradleでMaven Centralにライブラリを公開する - たごもりすメモを参考にしてGradleでやりました。
依存関係にKafkaが入っているけど、配布するjarにはKafka関係のファイルを入れてはいけないことに注意が必要でした。
まとめ
思っていたよりも簡単に実装できたのでKafkaにデータを投入したり、Kafkaからデータを取り込みたいのだけどProducer/Consumerはちょっと難しいという場合にKafka Connectを検討してみるとよいのではないでしょうか。