ククログ

株式会社クリアコード > ククログ > Apache Flume NGへFluentdからレコードを送るには

Apache Flume NGへFluentdからレコードを送るには

はじめに

クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

また、Fluentdにはプラグインのしくみがあり、たくさんのFluentdのプラグインが開発されています。

同じようなログ収集や収集したデータ分配・集約のソフトウェアとして、Apache Flume NGというものもあります。

Flume NGも同じく、その仕組みがあります。Flume NGのSourceにはThriftプロトコルを利用したものが存在し、これを利用してThriftが利用できる言語であれば気軽にFlume NGへの取り込み(Source)ができる仕組みとなっています。

これらのソフトウェアで相互にデータの分配・集約をする目的で開発されたFluentdのプラグインがfluent-plugin-flumeです。しかし、Apache Flume側の開発が進んでいた事によりfluent-plguin-flumeを追従させる必要がありました。

この記事ではfluent-plugin-flumeを最新のFlume NGのthriftプロトコルにどう対応させたかを解説します。

Flume Legacy と Flume NG (Next generation)

fluent-plugin-flumeが何故今のバージョンのflumeで使用出来なくなっていたかを解説するにはFlumeの歴史をひも解く必要があります。

Flumeはソフトウェアの歴史上、2つのバージョン系統があります。

一つはFlume Legacyと呼ばれる0.9.0までのもの。もうひとつは1.0以上のNG (Next Generation) と呼ばれるものです。

まずは、Flume LegacyからFlume NGでThriftプロトコル周りがどのように変わったかという事を見ていきます。

Flume Legacy時代のThriftプロトコル

Flume Legacy時代のThriftプロトコルは次のようになっていました。

https://github.com/fluent/fluent-plugin-flume/blob/c1d3d3308618e3d447d67f4f78678de8ee64793e/lib/fluent/plugin/thrift/flume.thrift から抜粋:

namespace java com.cloudera.flume.handlers.thrift

typedef i64 Timestamp

enum Priority { 
  FATAL = 0,
  ERROR = 1,
  WARN = 2,
  INFO = 3,
  DEBUG = 4,
  TRACE = 5
}

enum EventStatus {
  ACK = 0,
  COMMITED = 1,
  ERR = 2
}

struct ThriftFlumeEvent {
  1: Timestamp timestamp,
  2: Priority priority,
  3: binary body,
  4: i64 nanos,
  5: string host,
  6: map<string,binary> fieldss
}

# Instead of using thrift's serialization, we just assume the contents are serialized already.
struct RawEvent {
  1: binary raw
}

service ThriftFlumeEventServer {
  oneway void append( 1:ThriftFlumeEvent evt ),
  oneway void rawAppend( 1:RawEvent evt),
  EventStatus ackedAppend( 1: ThriftFlumeEvent evt ), 
    
  void close(), 
}

このプロトコルはThriftFlumeEventServerというserviceが付いています。また、ackedAppendなるものが見えます。これらによりFlume Legacyの時代はFlumeへの取り込み(Source)、Flumeからの送出(Sink)がどちらもThriftプロトコルで実装できるようになっていました。

ではFlume NGのThriftプロトコルの定義を見てみます。

https://github.com/apache/flume/blob/88b3fee10f1ec10dc33872710a4d4084c86b5e7d/flume-ng-sdk/src/main/thrift/flume.thrift から抜粋:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

おや?かなりThriftプロトコルでできることが限られているように見えます。 例えば、appendappendBatch はThriftプロトコルの上に載せることは可能ですが、ackedAppend がありません。

このメソッドはfluent-plugin-flumeのin_flumeで使われていたものです:https://github.com/fluent/fluent-plugin-flume/blob/c1d3d3308618e3d447d67f4f78678de8ee64793e/lib/fluent/plugin/in_flume.rb#L146

また、Thriftプロトコルの名称が変わり、ThriftSourceProtocol となっていることから、このThriftプロトコルはFlume NGへのSource専用ということが読み取れます。

FluentdからFlumeへレコードを送信する

やっと本題です。FluentdからFlumeへレコードを送信するにはこの新しい ThriftSourceProtocol に対応しないといけないことがわかりました。

そうして対応させたものをPull Requestしました。このPull Requestにより、Fluentd からFlume NGへのSourceが動くようになりました。また、Flume LegacyとFlume NGとでは ThriftFlumeEvent の構造体が変わっていることにも注意が必要でした。

また、残念ながらFlume NGからFluentdへの送出はThriftプロトコルに載せることができなかったので、先のPull Requestでは制限事項としました。

おわりに

この変更はfluent-plugin-flume 0.1.2に取り込まれました。FluentdからFlumeに送れるようになったfluent-plugin-flumeを是非試してみてください!