20210513のGoに関する記事は2件です。

fireworqのメトリクスを収集する方法について考えた

こんにちは。qudoを打倒している者です。 以前上記の記事を書いたんですが、最近fireworqをサービスインしていくつかのqudoジョブをリプレイスしました。 動作は問題ないんですが、滞留してるジョブの数など、各種メトリクスがとれていない状態なので、今回はそれらのメトリクスをモニタリングする方法を考えます。 mackerelを利用する mackerelをモニタリングツールとして利用している場合、fireworq公式のmackerel-agent-pluginが存在するため、それを利用するだけで済みます。(fireworqははてな社が開発しているOSSです) cloudwatch + telegrafを利用する cloudwatchにメトリクスを保管する場合を考えます。 弊社はお金ないのでこちらです。 メトリクスの収集パート メトリクスの収集にはtelegrafを利用します。 telegrafはinfxludb社がOSSとして開発しているメトリクス収集用のエージェントです。 telegrafはプラグイン方式をとっており、様々なミドルウェアに対応したプラグインが存在します。 fireworq対応のプラグインは存在しないのですが、コマンド実行結果を入力として取り込むプラグイン「Exec」がありますので、こちらを利用してメトリクスを取り込みます。 The Exec input plugin parses supported Telegraf input data formats (line protocol, JSON, Graphite, Value, Nagios, Collectd, and Dropwizard) into metrics Execインプットプラグインでは、データフォーマットとして複数種のフォーマットに対応しているようです。ここではシンプルにJSONで取り込むことにします。 fireworqのメトリクス取得エンドポイントについて fireworqにはGETリクエストでメトリクスを取得できるエンドポイントが用意されています。 たとえば、 /queues/stats を叩くと、以下のように各キューのメトリクスがJSON形式で返却されます curl ドメイン/queues/stats | jq . { "default": { "total_pushes": 0, "total_pops": 0, "total_successes": 0, "total_failures": 0, "total_permanent_failures": 0, "total_completes": 0, "total_elapsed": 0, "pushes_per_second": 0, "pops_per_second": 0, "outstanding_jobs": 0, "total_workers": 20, "idle_workers": 20, "active_nodes": 1 } } この例ですと、「default」がキュー名に相当します。 あとはゴニョゴニョ加工するなりして、Execインプットプラグインに入力すればよさそうです。 mackerel-plugin-fireworqを流用できそう 上述のエンドポイントを自分で叩いてもいいんですが、mackerel-plugin-fireworqを流用する方法を考えます。 というのも、mackerel-plugin-fireworqは上述のメトリクスをいい感じに集約してmackerelに出力しているためです。 どういう風にデータを集約するかなど、なるべく自分で考えたくありません!!(怠惰 で、具体的にどう流用するかですが、mackerelのプラグインは単体実行できるコマンドとして提供されているため、mackerel-plugin-fireworqの実行結果をtelegrafのExecインプットプラグインに入力すればいいことになります。 しかし、mackerelプラグインは 項目名\t値\tタイムスタンプ という複数行のTSVの形式でメトリクスを標準出力に出すため、そのままではExecインプットプラグインには取り込めません。 mackerel-plugin-fireworqの場合、以下のような出力になります。 fireworq.jobs.elapsed.jobs_average_elapsed_time 0 1620055407 fireworq.node.active_nodes 1 1620055407 fireworq.node.active_nodes_percentage 100 1620055407 fireworq.queue.workers.queue_idle_workers 20 1620055407 fireworq.queue.workers.queue_running_workers 0 1620055407 fireworq.queue.buffer.queue_outstanding_jobs 0 1620055407 fireworq.jobs.jobs_failure 0 1620055407 fireworq.jobs.jobs_success 0 1620055407 fireworq.jobs.jobs_outstanding 0 1620055407 fireworq.jobs.jobs_waiting 0 1620055407 fireworq.jobs.events.jobs_events_pushed 0 1620055407 fireworq.jobs.events.jobs_events_popped 0 1620055407 fireworq.jobs.events.jobs_events_failed 0 1620055407 fireworq.jobs.events.jobs_events_succeeded 0 1620055407 fireworq.jobs.events.jobs_events_completed 0 1620055407 上記のデータをJSONに加工できればよさそうです。 そこでgolangでちょっとしたCLIをしたためました。 canning(缶詰)をもじってkanningと命名しました。 mackerelがサバなので、それを加工するイメージです。(サバ缶的な kanningはmackerel-agent-pluginの出力を標準入力からくわせると、JSONに整形して出力します。 たとえば、上記のmackerel-plugin-fireworqの出力を入力すると、 cat _test/fireworq_metrics.tsv | kanning | jq . 以下のようになります。 { "jobs": { "elapsed": { "jobs_average_elapsed_time": 0 }, "events": { "jobs_events_completed": 0, "jobs_events_failed": 0, "jobs_events_popped": 0, "jobs_events_pushed": 0, "jobs_events_succeeded": 0 }, "jobs_failure": 0, "jobs_outstanding": 0, "jobs_success": 0, "jobs_waiting": 0 }, "node": { "active_nodes": 1, "active_nodes_percentage": 100 }, "queue": { "buffer": { "queue_outstanding_jobs": 0 }, "workers": { "queue_idle_workers": 20, "queue_running_workers": 0 } } } 現在、rootのfireworqとタイムスタンプは出力から落としています。 (kanningはテストまだ書いてないので、利用しないでください。しないと思いますが) 最終的なtelegrafのinput設定 mackerel-plugin-fireworqとkanningを組み合わせると、Execインプットプラグインの設定は以下のようになります。 [[inputs.exec]] ## Commands array commands = [ "/tmp/fireworq.sh" ] ## Timeout for each command to complete. timeout = "5s" ## measurement name suffix (for separating different commands) name_suffix = "_mycollector" ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" /tmp/fireworq.sh #!/bin/sh mackerel-plugin-fireworq | kanning メトリクスの入力パート(cloudwatchへの出力) cloudwatchへの出力用プラグインが存在しますので、そちらを利用しましょう。 余談 cloudwatch-agentはtelegrafのforkのようです。 go.modでreplaceをかけてる telegrafのExecインプットプラグインのようなものがあるかと期待したのですが、独自にメトリクスを取り込むような機能はオミットされているようなので、今回は利用を見送りました。 最後に メトリクスコレクターって何を使うのがいいんでしょうか? mackerelやdatadogを利用している場合、それぞれで提供されているエージェントを利用すればいいと思うのですが、cloudwatchを利用している場合、選択に悩みます。 今回は入力、出力に対応するプラグインが存在するということでtelegrafを利用しましたが、他に選択肢があればコメント欄でご教示いただけますと幸いです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

GO言語基礎②(defer/map/range/Interface/goroutine/channel)

前回の記事 GO言語基礎①(package/function/for/if/switch/struct/point/arrays/slice) 基礎文法 Defer:遅延実行 deferへ渡した関数の実行を呼び出し元の関数の終わり(returnする)まで遅延させる。Finallyのようなもの。 deferが複数ある場合、渡した順にスタックされていき、最後から順に実行されていきます。 defer内での処理でエラーが発生した場合、エラーハンドリングのreturnは無視されます。 対策は以下記事を参考にしてください。 【Go】 deferに渡した関数内のエラーを呼び出し元に返す 宣言方法 defer <処理> 下記の順でdeferに処理が渡されていた場合 defer func() { fmt.Println("処理1") }() defer func() { fmt.Println("処理2") }() defer fmt.Println("処理3") 実行結果 処理3 処理2 処理1 後勝ちで処理が実行される。 deferに渡した関数ないでファイル処理、変換処理等でエラーが発生した場合、エラーを拾うことができない。(関数内で拾うことは可能) ファイル編集後のファイルを閉じる処理等で利用します。 Maps:連想配列 Mapsはキー (key)というデータを使って要素を指定するデータ構造である。 複数の宣言方法が存在する。 初期値を指定しない場合、変数はnil(nil マップ)に初期化される。 要素の挿入や削除が行える。 宣言方法 var m map[キーの型]要素の型 var m map[キーの型]要素の型{"key":value,"key":value} m := make(map[キーの型]要素の型, キャパシティの初期値(省略可能)) // 初期値を指定しない宣言(空のmap値のmakeで生成したときのような追加等できないため、型宣言のみ) var m1 map[string]int fmt.Println(m1) m1 = map[string]int{"Go": 1} fmt.Println(m1) // 初期化+予め要素数分のキャパシティの確保ができる m2 := make(map[string]int) mk := make(map[string]int, 2) fmt.Println(m2, mk) // 値の追加 m2["Go"] = 1 m2["Python"] = 2 fmt.Println(m2) // 削除 delete(m2, "Python") fmt.Println(m2) // 初期値を指定して宣言 m3 := map[string]int{"Go": 1, "JavaScript": 2} fmt.Println(m3) 実行結果 map[] map[Go:1] map[] map[] map[Go:1 Python:2] map[Go:1] map[Go:1 JavaScript:2] Range(forのrange節) rangeは拡張for文のようなものです。 map、Slices(スライス)の要素を順に出力できます。 スライスの場合は、インデックス番号と値の取得 mapの場合は、keyとvalueの取得が可能です。 switch文のようにcontenue、bleakにて処理の継続、中断も可能です。 宣言方法 // スライスの場合 for インデックス番号, 値 := range スライス // mapの場合 for key, value := range map s := []string{"Go", "Python", "Ruby", "Java"} m := map[int]string{0: "Go", 2: "Python", 3: "Ruby", 4: "Java"} // スライスのrange for i, v := range s { fmt.Println("スライスの場合:", i, v) } // mapのrange for k, v := range m { fmt.Println("mapの場合:", k, v) } // スライス:indexを取得しない場合 for _, v := range s { fmt.Println(v) } // map:valueを取得しない場合 for k, _ := range m { fmt.Println(k) } // 処理の中断、スキップ for _, v := range s { if "Go" == v { fmt.Println("静的言語") continue // 次の処理にスキップ } if "Ruby" == v { fmt.Println("動的言語") break // ループから抜ける } fmt.Println("学習中。。") } 実行結果 スライスの場合: 0 Go スライスの場合: 1 Python スライスの場合: 2 Ruby スライスの場合: 3 Java mapの場合: 2 Python mapの場合: 3 Ruby mapの場合: 4 Java mapの場合: 0 Go Go Python Ruby Java 0 2 3 4 静的言語 学習中。。 動的言語 Interface interface(インターフェース) とはメソッドの型だけを定義した型のことです。 interface(インターフェース)を利用することで、オブジェクト指向言語でいうところのポリモーフィズムと同様の機能を実現できます。 前回紹介したEmbeddedを利用したストラクトの継承の際にメソッドの実行漏れを防ぐために利用します。 宣言方法 type 型名 interface { メソッド名1(引数の型, ...) (返り値の型, ...) ..... メソッド名N(引数の型, ...) (返り値の型, ...) } 空のInterface Go言語には、全ての型と互換性を持っているinterface{}型(空インターフェース) というものが存在しています。 interface{}(空インターフェース)は文字通りゼロ個のメソッドを指定されたインターフェース型のことなので、以下の様に定義する事ができます。 // 空のインターフェース var obj interface{} func main() { obj = 1 obj = "string" obj = 1.2 obj = []string{"Go", "Python", "Ruby"} obj = map[int]string{1: "Go", 2: "Java"} obj = func(val int) string { return fmt.Sprintf("number is %d", val) } } 型アサーション どんな型の値でも受け取れるinterface{}ですが、interface{}型の引数で受け渡された値は、元の型の情報が欠落しています。 Go言語ではこのような局面で利用するための型アサーションを提供しており、型アサーションにより実体の型が何であるかを動的にチェックすることができます。(以下構文を使用します) 記述方法 value, ok := <変数>.(<型>) // 第二引数を省略した型。比較結果が異なる場合はエラーを投げる value := <変数>.(<型>) valueには実際に格納されている値。okには型の比較結果が格納されます。 また、第二引数の比較結果を取得しないこともできるのですが、型が異なる場合エラーを投げるようになります。 func main() { typeCheck(1) typeCheck("stirng") typeCheck([]string{"Go", "Python", "Ruby"}) } func typeCheck(obj interface{}) { if value, ok := obj.(int); ok { fmt.Printf("parameter is integer. %d\n", value) } if value, ok := obj.(string); ok { fmt.Printf("parameter is string. %s\n", value) } if value, ok := obj.([]string); ok { fmt.Printf("parameter is slice. %s\n", value) } } 型switch 型アサーションと分岐を組み合わせた処理を手軽に記述するための型switchが提供されています。 型switchを利用すると前述のprintIf関数は以下のように記述できます。 宣言方法 swich 比較値 := インタフェース型で定義された値.(type) { case 型:  処理 func main() { obj = 1 switchCheck(obj) obj = "a" switchCheck(obj) obj = []string{"Go", "Python", "Ruby"} switchCheck(obj) obj = map[int]string{1: "Go", 2: "Java"} switchCheck(obj) } func switchCheck(obj interface{}) { switch value := obj.(type) { case int: fmt.Printf("parameter is integer %d\n", value) case string: fmt.Printf("parameter is string %s\n", value) case []string: fmt.Printf("parameter is slice %s\n", value) default: fmt.Printf("parameter is unknow type %v\n", value) } } 実行結果 parameter is integer 1 parameter is string a parameter is slice [Go Python Ruby] parameter is unknow type map[1:Go 2:Java] 構造体にインターフェースの実装 以下の様に書く事で構造体にインターフェースを実装する事ができます。 実装方法 func (引数 構造体名) 関数名(){ 関数の中身 } type People interface { getName() } type Person struct { name string } func (per Person) getName() { fmt.Println(per.name) } func main() { per := Person{"Mike"} per.getName() } 出力結果 Mike PersonのストラクトにgetNameメソッドを実装して実行することができていることがわかる。 goroutine(ゴルーチン):並列処理 goroutineとはGo言語の処理を平行で行うことができます。 マルチスレッドってやつです。 関数の前にgoをつけることにより、異なるgoroutine`で実行することができます。 実装方法 go 関数名() { } goは新しいgoroutineを生成して、そのgoroutine内で指定された関数を実行します。 新しいgoroutineは並行に動作するので、関数の実行終了を待つことなく、goのあとに記述されているプログラムを実行します。 goroutineの終了条件 関数の処理終了 returnで抜ける runtime.Goexit()を実行する 現在起動しているgoroutine(ゴルーチン)の数の取得はruntime.NumGoroutine()を利用することで行うことができる。 func main() { log.Println(runtime.NumGoroutine()) } goroutine(ゴルーチン)の実装例 まずgoroutineなしのプログラムを記述し、実行例を記述します。 goroutineなし func main() { fmt.Println("Strat") roop(2, "firtst") roop(2, "second") fmt.Println("Finish") } func roop(num int, s string) { for i := 0; i <= num; i++ { time.Sleep(1 * time.Second) fmt.Println(i, s) } } 実行結果 Strat 0 firtst 1 firtst 2 firtst 0 second 1 second 2 second Finish 単純にforループ処理が1秒ごとに行われ、次のループ処理の表示がされていることがわかります。 次はgoroutineを利用してroop関数の処理を並列実行してみます。 goroutineあり、main待ち時間なし func main() { fmt.Println("Strat") go roop(2, "firtst") go roop(2, "second") fmt.Println("Finish") } func roop(num int, s string) { for i := 0; i <= num; i++ { time.Sleep(1 * time.Second) fmt.Println(i, s) } } 実行結果 Strat Finish 単純に関数処理を行う場合は上記のようにgoroutineの実行結果を待たない状態でmain処理が終了してしまうため、roop関数の処理を行うことができていません。 実行するようにするにはgoroutineの実行が終わるまでmainの処理を止めておく必要があります。 今回の処理では「0〜3」の3秒が最低でもかかるのでmain処理が4秒後に終えるように修正すれば並列処理を確認できます。 goroutineあり、main待ち時間あり func main() { fmt.Println("Strat") go roop(2, "firtst") go roop(2, "second") time.Sleep(4 * time.Second) fmt.Println("Finish") } func roop(num int, s string) { for i := 0; i <= num; i++ { time.Sleep(1 * time.Second) fmt.Println(i, s) } } 実行結果 Strat 0 second 0 firtst 1 firtst 1 second 2 second 2 firtst Finish 上記のように待ち時間を記載することで並列処理を確認することができました。 今回は簡潔な処理であったため、待ち時間を指定して強引に並列処理を確認しましたが、本来であればchannelを利用してgoroutine処理終了後にmain処理に戻すことができます。 channel(チャネル):同期処理 channelはmain処理とgoroutine間の並列処理中のデータのやり取りをする場合、2つ以上のgoroutineの処理でのデータのやり取りを行う場合に利用されます。 これにより値の交換及び同期を行うことができ、予期せぬ処理結果が起こらないことを保証します。 channelは以下の方法にて生成します。 宣言方法 make(chan 型) make(chan 型, バッファサイズ) バッファサイズはchannelで利用するデータ量を表すものです。これによりやり取りできるデータサイズが決まります。 channel(チャネル)での値の送受信方法 値の送受信 // channelにデータを入れる ch <- data // channelからデータを取得する result := <- ch 簡単な例は以下です。 func main() { c := make(chan int) go sum(c) fmt.Println(<-c) } func sum(c chan int) { sum := 0 for i := 0; i < 5; i++ { sum += i } c <- sum } 実行結果 10 計算結果が取得できていることがわかります。 channelの実装例 上記の記述を踏まえた上で、実装例をみていきましょう。 先程のgoroutineの例では並列処理中の値を取得して、別のgoroutineにて並列処理を行うということができませんでした。 time.Sleepでmain処理を継続させて並列処理を確認するのではなく、channelを利用して処理の同期を取るように修正しましょう。 func main() { c1 := make(chan bool) c2 := make(chan bool) fmt.Println("Strat") go roop(2, "firtst", c1) go roop(2, "second", c2) <-c1 <-c2 fmt.Println("Finish") } func roop(num int, s string, c chan bool) { for i := 0; i <= num; i++ { time.Sleep(1 * time.Second) fmt.Println(i, s) } c <- true } 実行結果 Strat 0 firtst 0 second 1 second 1 firtst 2 second 2 firtst Finish channelをroop関数に渡すことで、出力処理が終わったタイミングでchannelを返却して、main処理にgoroutineの処理が終了したことを同期します。 これにより二つのgoroutineの並列処理をmainに同期することで、シングルスレッドではなく、マルチスレッドの処理が行われていることがわかります。 参考記事 【Go】基本文法⑥(インターフェース) 【Go】基本文法⑤(連想配列・ Range) 【Go】基本文法⑦(並行処理) Go言語 - 空インターフェースと型アサーション
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む