はじめに
クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。
また、Fluentdにはプラグインのしくみがあり、たくさんのFluentdのプラグインが開発されています。
同じようなログ収集や収集したデータ分配・集約のソフトウェアとして、Apache Flume NGというものもあります。
Flume NGも同じく、その仕組みがあります。Flume NGのSourceにはThriftプロトコルを利用したものが存在し、これを利用してThriftが利用できる言語であれば気軽にFlume NGへの取り込み(Source)ができる仕組みとなっています。
これらのソフトウェアで相互にデータの分配・集約をする目的で開発されたFluentdのプラグインがfluent-plugin-flumeです。しかし、Apache Flume側の開発が進んでいた事によりfluent-plguin-flumeを追従させる必要がありました。
この記事ではfluent-plugin-flumeを最新のFlume NGのthriftプロトコルにどう対応させたかを解説します。
Flume Legacy と Flume NG (Next generation)
fluent-plugin-flumeが何故今のバージョンのflumeで使用出来なくなっていたかを解説するにはFlumeの歴史をひも解く必要があります。
Flumeはソフトウェアの歴史上、2つのバージョン系統があります。
一つはFlume Legacyと呼ばれる0.9.0までのもの。もうひとつは1.0以上のNG (Next Generation) と呼ばれるものです。
まずは、Flume LegacyからFlume NGでThriftプロトコル周りがどのように変わったかという事を見ていきます。
Flume Legacy時代のThriftプロトコル
Flume Legacy時代のThriftプロトコルは次のようになっていました。
namespace java com.cloudera.flume.handlers.thrift
typedef i64 Timestamp
enum Priority {
FATAL = 0,
ERROR = 1,
WARN = 2,
INFO = 3,
DEBUG = 4,
TRACE = 5
}
enum EventStatus {
ACK = 0,
COMMITED = 1,
ERR = 2
}
struct ThriftFlumeEvent {
1: Timestamp timestamp,
2: Priority priority,
3: binary body,
4: i64 nanos,
5: string host,
6: map<string,binary> fieldss
}
# Instead of using thrift's serialization, we just assume the contents are serialized already.
struct RawEvent {
1: binary raw
}
service ThriftFlumeEventServer {
oneway void append( 1:ThriftFlumeEvent evt ),
oneway void rawAppend( 1:RawEvent evt),
EventStatus ackedAppend( 1: ThriftFlumeEvent evt ),
void close(),
}
このプロトコルはThriftFlumeEventServerというserviceが付いています。また、ackedAppendなるものが見えます。これらによりFlume Legacyの時代はFlumeへの取り込み(Source)、Flumeからの送出(Sink)がどちらもThriftプロトコルで実装できるようになっていました。
ではFlume NGのThriftプロトコルの定義を見てみます。
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}
おや?かなりThriftプロトコルでできることが限られているように見えます。
例えば、append
や appendBatch
はThriftプロトコルの上に載せることは可能ですが、ackedAppend
がありません。
このメソッドはfluent-plugin-flumeのin_flumeで使われていたものです:https://github.com/fluent/fluent-plugin-flume/blob/c1d3d3308618e3d447d67f4f78678de8ee64793e/lib/fluent/plugin/in_flume.rb#L146
また、Thriftプロトコルの名称が変わり、ThriftSourceProtocol
となっていることから、このThriftプロトコルはFlume NGへのSource専用ということが読み取れます。
FluentdからFlumeへレコードを送信する
やっと本題です。FluentdからFlumeへレコードを送信するにはこの新しい ThriftSourceProtocol
に対応しないといけないことがわかりました。
そうして対応させたものをPull Requestしました。このPull Requestにより、Fluentd
からFlume NGへのSourceが動くようになりました。また、Flume LegacyとFlume NGとでは ThriftFlumeEvent
の構造体が変わっていることにも注意が必要でした。
また、残念ながらFlume NGからFluentdへの送出はThriftプロトコルに載せることができなかったので、先のPull Requestでは制限事項としました。
おわりに
この変更はfluent-plugin-flume 0.1.2に取り込まれました。FluentdからFlumeに送れるようになったfluent-plugin-flumeを是非試してみてください!