ククログ

株式会社クリアコード > ククログ > Fluent BitからGrafana Lokiに転送するには

Fluent BitからGrafana Lokiに転送するには

はじめに

Fluent BitはFluentdファミリーを構成するソフトウェアの一つです。 Fluent BitはGo Pluginプロキシが提供されており、Golangにて共有ライブラリを作成することにより、プラグインとして振る舞わせることのできるインターフェースが提供されています。 この機能については、fluent-bit-go-s3とfluent-bitのGo Pluginプロキシの話でも解説しました。 Fluent BitのGolang製のプラグインのDockerfileを作った話にて突然fluent-bit-go-lokiプラグインが登場してしまっていたので、そのプラグインについての解説を書きます。

Grafana Lokiとは

Lokiとは、新しく開発されたGrafanaのデータソースです。 Lokiにはログを入力するためのAPIが整備されています。

Lokiにレコードを送信するには

ログをPushするのであればPOST /api/prom/pushがAPIのエンドポイントになります。

このAPIのエンドポイントにはJSONまたはProtocol BufferでログをPushできます。 JSON形式でログをLokiに送るにはlabelsを用意するのが少々面倒だったため、fluent-bit-go-lokiではProtocol Bufferでやり取りを行うLokiのクライアントライブラリを使用することにしました。

これをGolangのコードで表現すると次のようになります。

package main

import "github.com/grafana/loki/pkg/promtail/client"
import "github.com/sirupsen/logrus"
import kit "github.com/go-kit/kit/log/logrus"
import "github.com/cortexproject/cortex/pkg/util/flagext"
import "github.com/prometheus/common/model"

import "fmt"
import "time"

func main() {
	cfg := client.Config{}
	// Init everything with default values.
	flagext.RegisterFlags(&cfg)
	var clientURL flagext.URLValue

	url := "http://localhost:3100/api/prom/push"
	// Override some of those defaults
	err := clientURL.Set(url)
	if err != nil {
		fmt.Println("Failed to parse client URL")
		return
	}
	cfg.URL = clientURL
	cfg.BatchWait = 1
	cfg.BatchSize = 10 * 1024

	log := logrus.New()

	loki, err := client.New(cfg, kit.NewLogrusLogger(log))

	line := `{"message": "Sent from Golang!"}`

	labelValue := "from-golang"
	labelSet := model.LabelSet{"lang": model.LabelValue(labelValue)}
	err = loki.Handle(labelSet, time.Now(), line)
	if err != nil {
		fmt.Println("Failed to send Loki")
	} else {
		fmt.Println("Success")
	}
	// Ensure to send record into Loki.
	time.Sleep(3 * time.Second)
}

このLoki向けのクライアントライブラリはバッチ単位で送るため、Handleを呼び出してもすぐにはLokiのAPIエンドポイントには送られないことに注意してください。

Fluent BitのGolang製のプラグインでLokiへイベントを送る

前節でLokiへアクセスするためのGolangのクライアントライブラリの使い方が分かったので、実際にfluent-bit-go-lokiへ組み込んでみます。 FLBPluginInitでLokiにアクセスするための設定を組み立て、FLBPluginFlushでLokiに一行づつイベントを送信するためのバッファに溜めています。 また、Fluent Bitのレコードの情報を余さずLokiに送信するためにJSONへエンコードし直しています。

package main

import "github.com/fluent/fluent-bit-go/output"
import "github.com/grafana/loki/pkg/promtail/client"
import "github.com/sirupsen/logrus"
import kit "github.com/go-kit/kit/log/logrus"
import "github.com/prometheus/common/model"
import "github.com/cortexproject/cortex/pkg/util/flagext"
import "github.com/json-iterator/go"

import (
	"C"
	"fmt"
	"log"
	"time"
	"unsafe"
)

var loki *client.Client
var ls model.LabelSet

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
	return output.FLBPluginRegister(ctx, "loki", "Loki GO!")
}

//export FLBPluginInit
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
	// Example to retrieve an optional configuration parameter
	url := output.FLBPluginConfigKey(ctx, "url")
	var clientURL flagext.URLValue
	err := clientURL.Set(url)
	if err != nil {
		log.Fatalf("Failed to parse client URL")
	}
	fmt.Printf("[flb-go] plugin URL parameter = '%s'\n", url)

	cfg := client.Config{}
	// Init everything with default values.
	flagext.RegisterFlags(&cfg)

	// Override some of those defaults
	cfg.URL = clientURL
	cfg.BatchWait = 10 * time.Millisecond
	cfg.BatchSize = 10 * 1024

	log := logrus.New()

	loki, err = client.New(cfg, kit.NewLogrusLogger(log))
	if err != nil {
		log.Fatalf("client.New: %s\n", err)
	}
	ls = model.LabelSet{"job": "fluent-bit"}

	return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
	var ret int
	var ts interface{}
	var record map[interface{}]interface{}

	dec := output.NewDecoder(data, int(length))

	for {
		ret, ts, record = output.GetRecord(dec)
		if ret != 0 {
			break
		}

		// Get timestamp
		timestamp := ts.(output.FLBTime).Time

		js, err := createJSON(timestamp, record)
		if err != nil {
			fmt.Errorf("error creating message for Grafana Loki: %v", err)
			continue
		}

		err = loki.Handle(ls, timestamp, string(js))
		if err != nil {
			fmt.Errorf("error sending message for Grafana Loki: %v", err)
			return output.FLB_RETRY
		}
	}

	// Return options:
	//
	// output.FLB_OK    = data have been processed.
	// output.FLB_ERROR = unrecoverable error, do not try this again.
	// output.FLB_RETRY = retry to flush later.
	return output.FLB_OK
}

func createJSON(timestamp time.Time, record map[interface{}]interface{}) (string, error) {
	m := make(map[string]interface{})

	for k, v := range record {
		switch t := v.(type) {
		case []byte:
			// prevent encoding to base64
			m[k.(string)] = string(t)
		default:
			m[k.(string)] = v
		}
	}

	js, err := jsoniter.Marshal(m)
	if err != nil {
		return "{}", err
	}

	return string(js), nil
}

//export FLBPluginExit
func FLBPluginExit() int {
	loki.Stop()
	return output.FLB_OK
}

func main() {
}

このファイルをout_loki.goとして保存します。 依存関係のパッケージを準備した後1、以下のコマンドを実行するとFluent Bit用のLokiプラグインの振る舞いをする共有オブジェクトが作成できます。

$ go build -buildmode=c-shared -o out_loki.so .

Golang製のプラグインの動かし方

Fluent BitのGolang製の共有オブジェクトのプラグインを動かすには例えば、以下のような設定ファイルとコマンドが必要です。

[INPUT]
    Name cpu
    Tag  cpu.local
    # Interval Sec
    # ====
    # Read interval (sec) Default: 1
    Interval_Sec 1

[OUTPUT]
    Name  loki
    Match *
    Url http://localhost:3100/api/prom/push
$ fluent-bit -c /path/to/fluent-bit.conf -e /path/to/out_loki.so

Fluent Bitが以下のようなログを吐き出していれば読み込みに成功して動作しています。


Fluent Bit v1.2.2
Copyright (C) Treasure Data

[2019/07/31 12:15:20] [ info] [storage] initializing...
[2019/07/31 12:15:20] [ info] [storage] in-memory
[2019/07/31 12:15:20] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2019/07/31 12:15:20] [ info] [engine] started (pid=13346)
[flb-go] plugin URL parameter = 'http://localhost:3100/api/prom/push'
[2019/07/31 12:15:20] [ info] [sp] stream processor started

まとめ

Fluent BitのGo製の共有オブジェクトでのプラグインについてまとまった解説を書きました。 実際のfluent-bit-go-lokiはlabelSetsが複数指定できるようになっていたり、テストが書きやすいようにFluent Bitが関わる部分をinterfaceに分離しています。2 GolangでもFluent Bitのプラグインを書くことが出来ますからぜひ試してみてください。

  1. 筆者は執筆時点ではGolangの依存関係を管理するパッケージマネージャーはdepを使用しています。depでの依存パッケージの管理の開始方法はdepのドキュメントを参照してください。

  2. 実際のコードはGitHubリポジトリを参照してください。