ククログ

株式会社クリアコード > ククログ > fluent-plugin-elasticsearchのHTTPバックエンドを切り替えられるようにするには

fluent-plugin-elasticsearchのHTTPバックエンドを切り替えられるようにするには

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 取り掛かりとして、fluent-plugin-elasticsearchの構造をまず軽く説明します。fluent-plugin-elasticsearchのElasticsearchのAPIリクエストは自前で実装しているのではなく、elasticsearch, elasticsearch-api, elasticsearch-transportというgemに依存しています。それぞれ、ElasticsearchのRubyクライアントライブラリをカプセル化して共通のインターフェースで使用できるようにgem化したもの、APIリクエストをgem化したもの、HTTPリクエストの方式をgem化したものです。

elasticsearch-transport

この中で、今回はelasticsearch-transportについて取り上げます。elasticsearch-transportは複数のHTTPバックエンドを切り替えて使用することができます。

fluent-plugin-elasticsearchでは、これまで以下のようにHTTPリクエストのバックエンドライブラリとしてexconが固定で使われていました。

    def client
      @_es ||= begin
        excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
        adapter_conf = lambda {|f| f.adapter :excon, excon_options } # f.adaptorに ':excon' 固定
        transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                            options: {
                                                                              reload_connections: @reload_connections,
                                                                               reload_on_failure: @reload_on_failure,
                                                                               resurrect_after: @resurrect_after,
                                                                               retry_on_failure: 5,
                                                                               logger: @transport_logger,
                                                                               transport_options: {
                                                                                 headers: { 'Content-Type' => @content_type.to_s },
                                                                                 request: { timeout: @request_timeout },
                                                                                 ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
                                                                               },
                                                                               http: {
                                                                                 user: @user,
                                                                                 password: @password
                                                                               }
                                                                             }), &adapter_conf)
      es = Elasticsearch::Client.new transport: transport
# ...

exconの問題点

exconはHTTPのバックエンドライブラリとしては申し分がないのですが、keepaliveがデフォルトで有効にならないという問題がありました。 nginxなどのproxy配下ではkeepaliveが有効でないと接続が頻繁に切れ、効率的な転送が行えないという問題が報告されました。

elasticsearch-transportでFaradyアダプターのHTTPバックエンドを切り替えられるようにする

elasticsearch-transportはいくつかのHTTPバックエンドを使用することができます。その一つがFaradayアダプターを使用するものです。 Faradayはそれ単体ではHTTPを扱う統一的なインターフェースを提供するだけですが、実際のHTTPリクエストはHTTPを扱うライブラリに担当させます。 例えば、exconを使ってHTTPリクエストを出すには以下のようにします。

require 'excon'

client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
  f.response :logger
  f.adapter  :excon
end

この状態では、exconのみしかHTTPのバックエンドに使用することができません。

例えば、typhoeus を使ってHTTPリクエストを投げるようにするには、

require 'typhoeus'
require 'typhoeus/adapters/faraday'

client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
  f.response :logger
  f.adapter  :typhoeus
end

のようにすると、HTTPのリクエストはTyphoeusを使って投げられるようになります。

実際のプラグインに適用する

実際のfluent-plugin-elasticsearchにHTTPバックエンドを変更できるようにしたパッチは以下の通りです。 (out_elasticsearch部分のみ示します。)

diff --git a/lib/fluent/plugin/out_elasticsearch.rb b/lib/fluent/plugin/out_elasticsearch.rb
index 42e1a16..cb5f1c0 100644
--- a/lib/fluent/plugin/out_elasticsearch.rb
+++ b/lib/fluent/plugin/out_elasticsearch.rb
@@ -107,6 +107,7 @@ elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elas
 see: https://github.com/elastic/elasticsearch-ruby/pull/514
 EOC
     config_param :include_index_in_url, :bool, :default => false
+    config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
 
     config_section :buffer do
       config_set_default :@type, DEFAULT_BUFFER_TYPE
@@ -128,6 +129,7 @@ EOC
       raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
 
       @time_parser = create_time_parser
+      @backend_options = backend_options
 
       if @remove_keys
         @remove_keys = @remove_keys.split(/\s*,\s*/)
@@ -207,6 +209,18 @@ EOC
       end
     end
 
+    def backend_options
+      case @http_backend
+      when :excon
+        { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
+      when :typhoeus
+        require 'typhoeus'
+        { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
+      end
+    rescue LoadError
+      raise Fluent::ConfigError, "You must install #{@http_backend} gem."
+    end
+
     def detect_es_major_version
       @_es_info ||= client.info
       @_es_info["version"]["number"].to_i
@@ -257,8 +271,7 @@ EOC
 
     def client
       @_es ||= begin
-        excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
-        adapter_conf = lambda {|f| f.adapter :excon, excon_options }
+        adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
         transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                             options: {
                                                                               reload_connections: @reload_connections,

前述の通り、f.adapter の部分へ切り替えたいHTTPバックエンドのgem名のシンボルを渡してあげれば良いことになります。 この記事では解説していませんが、バックエンドによってはTLSの設定方法に違いがある場合があるので新しいバックエンドを追加した際には無効なハッシュキーを渡さないように注意してください。

まとめ

fluent-plugin-elasticsearchのHTTPバックエンドをexconだけではなくtyphoeusも扱えるように改修したお話を書きました。 記事で引用したパッチはfluent-plugin-elasticsearchのv2.11.4に取り込んでリリース済みです。 fluent-plugin-elasticsearchでkeepaliveが有効にならず、困っている場合はtyphoeus gemをインストールした後、http_backend typhoeusの設定値を加えてkeepaliveが有効になるHTTPバックエンドをぜひ試してみてください。