はじめに
fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 また、このプラグインはRed Hat社がメンテナンスしているOpenShiftのログコンポーネントの一部としても使われています。
elasticsearch-transportのSnifferクラスとは
elasticsearch-transportには定期的にクラスタの状況を監視するSnifferクラスがあります。このクラスではGET _nodes/http
というクラスタの状況を返答するAPIを叩いており、大抵の場合はこのAPIを叩いておけばElasticsearchクラスタの状況がfluent-plugin-elasticsearchが使っているelasticsearchクライアントに通知されます。
そのため、X-Packを用いない通常の使用方法では問題になりません。
k8sサービス化されたElasticsearchクラスタに接続する
k8sのサービスとはPodから生成したノードを一まとめにしたアクセス手段を提供します。k8sの世界観ではサービスのアクセス先は一定です。しかし、サービスを構成するノードの構成要素はある時は起動していますが、またある時は停止または破棄されています。このノード一つ一つにElasticsearchが立っていても通知速度よりも起動・破棄のサイクルが速ければGET _nodes/http
を使用しても欠点が目立つようになります。
そのため、k8sのサービス化されたElasticsearchクラスタには新たなSnifferクラスの実装が必要になります。
そこで、元々のSnifferクラスのhostsメソッドの実装を見てみると、以下のようになっています。
# Retrieves the node list from the Elasticsearch's
# [_Nodes Info API_](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-info/)
# and returns a normalized Array of information suitable for passing to transport.
#
# Shuffles the collection before returning it when the `randomize_hosts` option is set for transport.
#
# @return [Array<Hash>]
# @raise [SnifferTimeoutError]
#
def hosts
Timeout::timeout(timeout, SnifferTimeoutError) do
nodes = transport.perform_request('GET', '_nodes/http').body
hosts = nodes['nodes'].map do |id,info|
if info[PROTOCOL]
host, port = info[PROTOCOL]['publish_address'].split(':')
{ :id => id,
:name => info['name'],
:version => info['version'],
:host => host,
:port => port,
:roles => info['roles'],
:attributes => info['attributes'] }
end
end.compact
hosts.shuffle! if transport.options[:randomize_hosts]
hosts
end
end
nodes = transport.perform_request('GET', '_nodes/http').body
の行でElasticsearchクラスタの情報を取りに行き、取りに行った情報から再度クラスタの情報を再構築しています。
もし、接続先のURLやIPアドレスが固定であれば、以下のようなSnifferクラスを作成し、ホスト情報を使い回す振る舞いをさせた方が良いです。
require 'elasticsearch'
class Fluent::Plugin::ElasticsearchIdempotenceSniffer < Elasticsearch::Transport::Transport::Sniffer
def hosts
@transport.hosts
end
end
elasticsearchクライアントは独自Snifferを渡してそのクラスを元にクラスタ情報を再構築するようなカスタマイズをすることができます。
@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
これらの変更をfluent-plugin-elasticsearchで扱うには以下のようにすると独自のSnifferクラスを用いてElasticsearchクラスタとやりとりできるようになります。
config_param :pipeline, :string, :default => nil
config_param :with_transporter_log, :bool, :default => false
config_param :emit_error_for_missing_id, :bool, :default => false
+ config_param :sniffer_class_name, :string, :default => nil
config_param :content_type, :enum, list: [:"application/json", :"application/x-ndjson"], :default => :"application/js
on",
:deprecated => <<EOC
elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elasticserach gem and stop to use this option
.
#...
def client
@_es ||= begin
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
options: {
reload_connections: @reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
retry_on_failure: 5,
@@ -287,7 +300,8 @@ EOC
http: {
user: @user,
password: @password
- }
+ },
+ sniffer_class: @sniffer_class,
}), &adapter_conf)
es = Elasticsearch::Client.new transport: transport
まとめ
fluent-plugin-elasticsearchのElasticsearchへのリクエストに関わるelasticsearch-transportのSnifferに関するお話を書きました。
記事と同様の働きをするパッチはfluent-plugin-elasticsearchのv2.11.5に取り込んでリリース済みです。
fluent-plugin-elasticsearchでk8sやnginxのプロキシを設置していて接続のリロード時に正常なElasticsearchクラスタの情報が取得できずに困っている場合はsniffer_class_name
の設定項目を初期値から変えてみたり、独自のSnifferクラスを定義したりしてみてください。