はじめに
Fluent BitはFluentdファミリーを構成するソフトウェアの一つです。 Fluent BitはGo Pluginプロキシが提供されており、Golangにて共有ライブラリを作成することにより、プラグインとして振る舞わせることのできるインターフェースが提供されています。 この機能については、fluent-bit-go-s3とfluent-bitのGo Pluginプロキシの話でも解説しました。 Fluent BitのGolang製のプラグインのDockerfileを作った話にて突然fluent-bit-go-lokiプラグインが登場してしまっていたので、そのプラグインについての解説を書きます。
Grafana Lokiとは
Lokiとは、新しく開発されたGrafanaのデータソースです。 Lokiにはログを入力するためのAPIが整備されています。
Lokiにレコードを送信するには
ログをPushするのであればPOST /api/prom/push
がAPIのエンドポイントになります。
このAPIのエンドポイントにはJSONまたはProtocol BufferでログをPushできます。 JSON形式でログをLokiに送るにはlabelsを用意するのが少々面倒だったため、fluent-bit-go-lokiではProtocol Bufferでやり取りを行うLokiのクライアントライブラリを使用することにしました。
これをGolangのコードで表現すると次のようになります。
package main
import "github.com/grafana/loki/pkg/promtail/client"
import "github.com/sirupsen/logrus"
import kit "github.com/go-kit/kit/log/logrus"
import "github.com/cortexproject/cortex/pkg/util/flagext"
import "github.com/prometheus/common/model"
import "fmt"
import "time"
func main() {
cfg := client.Config{}
// Init everything with default values.
flagext.RegisterFlags(&cfg)
var clientURL flagext.URLValue
url := "http://localhost:3100/api/prom/push"
// Override some of those defaults
err := clientURL.Set(url)
if err != nil {
fmt.Println("Failed to parse client URL")
return
}
cfg.URL = clientURL
cfg.BatchWait = 1
cfg.BatchSize = 10 * 1024
log := logrus.New()
loki, err := client.New(cfg, kit.NewLogrusLogger(log))
line := `{"message": "Sent from Golang!"}`
labelValue := "from-golang"
labelSet := model.LabelSet{"lang": model.LabelValue(labelValue)}
err = loki.Handle(labelSet, time.Now(), line)
if err != nil {
fmt.Println("Failed to send Loki")
} else {
fmt.Println("Success")
}
// Ensure to send record into Loki.
time.Sleep(3 * time.Second)
}
このLoki向けのクライアントライブラリはバッチ単位で送るため、Handleを呼び出してもすぐにはLokiのAPIエンドポイントには送られないことに注意してください。
Fluent BitのGolang製のプラグインでLokiへイベントを送る
前節でLokiへアクセスするためのGolangのクライアントライブラリの使い方が分かったので、実際にfluent-bit-go-lokiへ組み込んでみます。
FLBPluginInit
でLokiにアクセスするための設定を組み立て、FLBPluginFlush
でLokiに一行づつイベントを送信するためのバッファに溜めています。
また、Fluent Bitのレコードの情報を余さずLokiに送信するためにJSONへエンコードし直しています。
package main
import "github.com/fluent/fluent-bit-go/output"
import "github.com/grafana/loki/pkg/promtail/client"
import "github.com/sirupsen/logrus"
import kit "github.com/go-kit/kit/log/logrus"
import "github.com/prometheus/common/model"
import "github.com/cortexproject/cortex/pkg/util/flagext"
import "github.com/json-iterator/go"
import (
"C"
"fmt"
"log"
"time"
"unsafe"
)
var loki *client.Client
var ls model.LabelSet
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "loki", "Loki GO!")
}
//export FLBPluginInit
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
// Example to retrieve an optional configuration parameter
url := output.FLBPluginConfigKey(ctx, "url")
var clientURL flagext.URLValue
err := clientURL.Set(url)
if err != nil {
log.Fatalf("Failed to parse client URL")
}
fmt.Printf("[flb-go] plugin URL parameter = '%s'\n", url)
cfg := client.Config{}
// Init everything with default values.
flagext.RegisterFlags(&cfg)
// Override some of those defaults
cfg.URL = clientURL
cfg.BatchWait = 10 * time.Millisecond
cfg.BatchSize = 10 * 1024
log := logrus.New()
loki, err = client.New(cfg, kit.NewLogrusLogger(log))
if err != nil {
log.Fatalf("client.New: %s\n", err)
}
ls = model.LabelSet{"job": "fluent-bit"}
return output.FLB_OK
}
//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
var ret int
var ts interface{}
var record map[interface{}]interface{}
dec := output.NewDecoder(data, int(length))
for {
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}
// Get timestamp
timestamp := ts.(output.FLBTime).Time
js, err := createJSON(timestamp, record)
if err != nil {
fmt.Errorf("error creating message for Grafana Loki: %v", err)
continue
}
err = loki.Handle(ls, timestamp, string(js))
if err != nil {
fmt.Errorf("error sending message for Grafana Loki: %v", err)
return output.FLB_RETRY
}
}
// Return options:
//
// output.FLB_OK = data have been processed.
// output.FLB_ERROR = unrecoverable error, do not try this again.
// output.FLB_RETRY = retry to flush later.
return output.FLB_OK
}
func createJSON(timestamp time.Time, record map[interface{}]interface{}) (string, error) {
m := make(map[string]interface{})
for k, v := range record {
switch t := v.(type) {
case []byte:
// prevent encoding to base64
m[k.(string)] = string(t)
default:
m[k.(string)] = v
}
}
js, err := jsoniter.Marshal(m)
if err != nil {
return "{}", err
}
return string(js), nil
}
//export FLBPluginExit
func FLBPluginExit() int {
loki.Stop()
return output.FLB_OK
}
func main() {
}
このファイルをout_loki.goとして保存します。 依存関係のパッケージを準備した後1、以下のコマンドを実行するとFluent Bit用のLokiプラグインの振る舞いをする共有オブジェクトが作成できます。
$ go build -buildmode=c-shared -o out_loki.so .
Golang製のプラグインの動かし方
Fluent BitのGolang製の共有オブジェクトのプラグインを動かすには例えば、以下のような設定ファイルとコマンドが必要です。
[INPUT]
Name cpu
Tag cpu.local
# Interval Sec
# ====
# Read interval (sec) Default: 1
Interval_Sec 1
[OUTPUT]
Name loki
Match *
Url http://localhost:3100/api/prom/push
$ fluent-bit -c /path/to/fluent-bit.conf -e /path/to/out_loki.so
Fluent Bitが以下のようなログを吐き出していれば読み込みに成功して動作しています。
Fluent Bit v1.2.2
Copyright (C) Treasure Data
[2019/07/31 12:15:20] [ info] [storage] initializing...
[2019/07/31 12:15:20] [ info] [storage] in-memory
[2019/07/31 12:15:20] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2019/07/31 12:15:20] [ info] [engine] started (pid=13346)
[flb-go] plugin URL parameter = 'http://localhost:3100/api/prom/push'
[2019/07/31 12:15:20] [ info] [sp] stream processor started
まとめ
Fluent BitのGo製の共有オブジェクトでのプラグインについてまとまった解説を書きました。 実際のfluent-bit-go-lokiはlabelSetsが複数指定できるようになっていたり、テストが書きやすいようにFluent Bitが関わる部分をinterfaceに分離しています。2 GolangでもFluent Bitのプラグインを書くことが出来ますからぜひ試してみてください。
-
筆者は執筆時点ではGolangの依存関係を管理するパッケージマネージャーはdepを使用しています。depでの依存パッケージの管理の開始方法はdepのドキュメントを参照してください。 ↩
-
実際のコードはGitHubリポジトリを参照してください。 ↩