はじめに
fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 取り掛かりとして、fluent-plugin-elasticsearchの構造をまず軽く説明します。fluent-plugin-elasticsearchのElasticsearchのAPIリクエストは自前で実装しているのではなく、elasticsearch, elasticsearch-api, elasticsearch-transportというgemに依存しています。それぞれ、ElasticsearchのRubyクライアントライブラリをカプセル化して共通のインターフェースで使用できるようにgem化したもの、APIリクエストをgem化したもの、HTTPリクエストの方式をgem化したものです。
elasticsearch-transport
この中で、今回はelasticsearch-transport
について取り上げます。elasticsearch-transport
は複数のHTTPバックエンドを切り替えて使用することができます。
fluent-plugin-elasticsearchでは、これまで以下のようにHTTPリクエストのバックエンドライブラリとしてexcon
が固定で使われていました。
def client
@_es ||= begin
excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
adapter_conf = lambda {|f| f.adapter :excon, excon_options } # f.adaptorに ':excon' 固定
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
options: {
reload_connections: @reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
retry_on_failure: 5,
logger: @transport_logger,
transport_options: {
headers: { 'Content-Type' => @content_type.to_s },
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
},
http: {
user: @user,
password: @password
}
}), &adapter_conf)
es = Elasticsearch::Client.new transport: transport
# ...
exconの問題点
exconはHTTPのバックエンドライブラリとしては申し分がないのですが、keepaliveがデフォルトで有効にならないという問題がありました。 nginxなどのproxy配下ではkeepaliveが有効でないと接続が頻繁に切れ、効率的な転送が行えないという問題が報告されました。
elasticsearch-transportでFaradyアダプターのHTTPバックエンドを切り替えられるようにする
elasticsearch-transport
はいくつかのHTTPバックエンドを使用することができます。その一つがFaradayアダプターを使用するものです。
Faradayはそれ単体ではHTTPを扱う統一的なインターフェースを提供するだけですが、実際のHTTPリクエストはHTTPを扱うライブラリに担当させます。
例えば、exconを使ってHTTPリクエストを出すには以下のようにします。
require 'excon'
client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
f.response :logger
f.adapter :excon
end
この状態では、exconのみしかHTTPのバックエンドに使用することができません。
例えば、typhoeus
を使ってHTTPリクエストを投げるようにするには、
require 'typhoeus'
require 'typhoeus/adapters/faraday'
client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
f.response :logger
f.adapter :typhoeus
end
のようにすると、HTTPのリクエストはTyphoeusを使って投げられるようになります。
実際のプラグインに適用する
実際のfluent-plugin-elasticsearchにHTTPバックエンドを変更できるようにしたパッチは以下の通りです。 (out_elasticsearch部分のみ示します。)
diff --git a/lib/fluent/plugin/out_elasticsearch.rb b/lib/fluent/plugin/out_elasticsearch.rb
index 42e1a16..cb5f1c0 100644
--- a/lib/fluent/plugin/out_elasticsearch.rb
+++ b/lib/fluent/plugin/out_elasticsearch.rb
@@ -107,6 +107,7 @@ elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elas
see: https://github.com/elastic/elasticsearch-ruby/pull/514
EOC
config_param :include_index_in_url, :bool, :default => false
+ config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
@@ -128,6 +129,7 @@ EOC
raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
@time_parser = create_time_parser
+ @backend_options = backend_options
if @remove_keys
@remove_keys = @remove_keys.split(/\s*,\s*/)
@@ -207,6 +209,18 @@ EOC
end
end
+ def backend_options
+ case @http_backend
+ when :excon
+ { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
+ when :typhoeus
+ require 'typhoeus'
+ { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
+ end
+ rescue LoadError
+ raise Fluent::ConfigError, "You must install #{@http_backend} gem."
+ end
+
def detect_es_major_version
@_es_info ||= client.info
@_es_info["version"]["number"].to_i
@@ -257,8 +271,7 @@ EOC
def client
@_es ||= begin
- excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
- adapter_conf = lambda {|f| f.adapter :excon, excon_options }
+ adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
options: {
reload_connections: @reload_connections,
前述の通り、f.adapter
の部分へ切り替えたいHTTPバックエンドのgem名のシンボルを渡してあげれば良いことになります。
この記事では解説していませんが、バックエンドによってはTLSの設定方法に違いがある場合があるので新しいバックエンドを追加した際には無効なハッシュキーを渡さないように注意してください。
まとめ
fluent-plugin-elasticsearchのHTTPバックエンドをexconだけではなくtyphoeusも扱えるように改修したお話を書きました。
記事で引用したパッチはfluent-plugin-elasticsearchのv2.11.4に取り込んでリリース済みです。
fluent-plugin-elasticsearchでkeepaliveが有効にならず、困っている場合はtyphoeus gemをインストールした後、http_backend typhoeus
の設定値を加えてkeepaliveが有効になるHTTPバックエンドをぜひ試してみてください。