はじめに
Fluentdの周辺のIssueチケットを見ていたらProtocol BuffersをFluentdのInputプラグインで扱えると面白そう1ということで、対応を考えてみた畑ケです。
クリアコードはFluentdの開発に参加しています。
Fluentdにはプラグインというしくみがあり、たくさんのプラグインが開発されています。 Fluentdのプラグインには、Parserプラグインという種類のプラグインがあります。 この種類のプラグインはInputプラグインが受け取ったテキストやバイナリーデータをFluentdで扱いやすくするために使用されます。
Protocol Buffersとは
Protocol Buffersとは、プログラミング言語間の差異を吸収してデータのやり取りを行うデータ形式です。 データ構造を定義して、それをいくつかの言語のProtocol Buffersを扱える表現にコンパイルすることでProtocol Buffersの形式にシリアライズしたり、デシリアライズできます。
例えば、Goで書いたプログラムでProtocol Buffers形式でシリアライズされたデータをC++で書いたプログラムからProtocol Buffersのデータ定義を用いて元のデータを復元できます。
FluentdにProtocol Buffersを組み込む
Fluentdにはin_tcp
やin_http
のような<parse>
ディレクティブを処理できるInputプラグインがあります。InputプラグインはParserプラグインとしてテキストやバイナリーのパースの処理をプラグインとして持ってこれるものがあります。この任意のパース処理を差し替えることができるようにする仕組みがParserプラグインです。
まとめると、FluentdでProtocol Buffersを処理するのに最適な場所はParserです。
そこで、InputプラグインでProtocol Buffersでシリアライズされたバイナリデータを処理できるようにProtocol Buffersを処理するParserプラグインを作成しました。
例えば、このParserプラグインを使ったin_http
プラグインを使ったHTTPエンドポイントを立てると、
FluentdがProtocol BuffersのAPIのHTTPエンドポイントとして振る舞うことができ、色んなシステムと直接つなげることができます。
fluent-plugin-parser-protobufの使い方
fluent-plugin-parser-protobufはProtocol Buffersのコンパイラーが必要です。この記事ではProtocol Buffers v3の場合について解説します。
Protocol Buffers v3のコンパイラーや各言語ごとのライブラリーは https://github.com/protocolbuffers/protobuf/releases からダウンロードできます。 Protocol Buffers v3をFluentdで使う手順では、Protocol Buffersのコンパイラーであるprotocがインストール済みであると仮定して解説をします。
Protocol BuffersのIDLの文法はProtocol Buffersの公式ドキュメントの概要を参照してください。
例えば、下記のようなprotobufのIDLを作成します。
simple.proto
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
google.protobuf.Timestamp timestamp = 5;
}
これを、protocでコンパイルします。
$ protoc --proto_path=/path/to/idl --ruby_out=/path/to/output simple.proto
すると、下記のRubyのクラスが生成されます。
simple_pb.rb
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: simple.proto
require 'google/protobuf'
require 'google/protobuf/timestamp_pb'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_file("simple.proto", :syntax => :proto3) do
add_message "SearchRequest" do
optional :query, :string, 1
optional :page_number, :int32, 2
optional :result_per_page, :int32, 3
optional :corpus, :enum, 4, "SearchRequest.Corpus"
optional :timestamp, :message, 5, "google.protobuf.Timestamp"
end
add_enum "SearchRequest.Corpus" do
value :UNIVERSAL, 0
value :WEB, 1
value :IMAGES, 2
value :LOCAL, 3
value :NEWS, 4
value :PRODUCTS, 5
value :VIDEO, 6
end
end
end
SearchRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("SearchRequest").msgclass
SearchRequest::Corpus = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("SearchRequest.Corpus").enummodule
このprotobufクラスがあれば、fluent-plugin-parser-protobufにProtocol Buffersの定義を読み込ませることができます。
<parse>
ディレクティブの中にprotobufパーサーに関連する設定を書きます。
fluent.conf
<source>
@type http
port 8080
<parse>
@type protobuf
class_name SearchRequest
class_file "#{File.expand_path(File.join('path', 'to', 'simple_pb.rb'))}"
protobuf_version protobuf3
</parse>
</source>
<match protobuf>
@type stdout
</match>
疎通確認テストとしてHTTPでProtocol Buffersでシリアライズしたリクエストを送るようにします。 そのため、以下のRubyスクリプトを用意します。
test_http.rb
require 'google/protobuf'
require "net/http"
require_relative "path/to/simple_pb"
def encoded_simple_binary
request = SearchRequest.new(query: "q=Fluentd",
page_number: 404,
result_per_page: 10,
corpus: :WEB,
timestamp: Time.now)
SearchRequest.encode(request)
end
uri = URI.parse("http://localhost:8080/protobuf")
params = encoded_simple_binary
req = Net::HTTP.new(uri.host, uri.port)
req.post(uri.path, params.to_s)
実行結果
Fluentdとテストスクリプトはそれぞれ、別の端末で実行します。
$ bundle exec ruby test_http.rb
$ bundle exec fluentd -c fluent.conf -p lib/fluent/plugin
<snip>
2020-06-01 16:45:43 +0900 [info]: #0 fluentd worker is now running worker=0
2020-06-01 16:45:46.377515000 +0900 protobuf: {"query":"q=Fluentd","page_number":404,"result_per_page":10,"corpus":"WEB","timestamp":{"seconds":1590997546,"nanos":371940000}}
最後の2020-06-01 16:45:46.377515000 +0900 protobuf: {"query":"q=Fluentd","page_number":404,"result_per_page":10,"corpus":"WEB","timestamp":{"seconds":1590997546,"nanos":371940000}}
により、Protocol BuffersでシリアライズされていたHTTPリクエストbodyがfluent-plugin-parser-protobufによりパースされ、Hashオブジェクトに分解されているのが確認できます。
まとめ
FluentdでProtocol Buffersを扱うにはどのようにしたら良いのかの方針を立て、実際にProtocol Buffersをパースするための手順を解説しました。 記事で解説した方法でFluentdがProtocol BuffersのAPIのHTTPエンドポイントとして振る舞うことができればより色んなシステムと直接つなげることができます。 今回のParserプラグインでは取り込むだけですが、FluentdのProtocol BuffersのFormatterプラグインを作成するとProtocol Buffersを用いてデータのやり取りを行うシステムに直接データを送ることができるようになるでしょう。
当社では、お客さまからの技術的なご質問・ご依頼に有償にて対応するFluentdサポートサービスを提供しています。Fluentd/Fluent Bitをエンタープライズ環境において導入/運用されるSIer様、サービス提供事業者様は、お問い合わせフォームよりお問い合わせください。