- 投稿日:2020-10-13T23:15:18+09:00
【Golang】mapのvalueに配列を指定する
概要
mapの中にvalueに配列をいれるみたいなのをGoでしたかったので自分用メモ
実装
package main import "fmt" type Hoge struct { Say string } func main() { a := map[string][]Hoge{} a["hoge"] = append(a["hoge"], Hoge{Say: "hello1"}) a["hoge"] = append(a["hoge"], Hoge{Say: "hello2"}) a["hoge1"] = append(a["hoge1"], Hoge{Say: "hello3"}) fmt.Println(a) }上記をgo runすると
map[hoge:[{hello1} {hello2}] hoge1:[{hello3}]]のようにmapのvalueの中にarrayが入ります
- 投稿日:2020-10-13T10:00:27+09:00
golang profiling
参考資料をメモするだけ
- https://github.com/adjust/go-wrk
- https://golang.org/pkg/net/http/pprof/
- https://github.com/uber-archive/go-torch (すでにGo1.11から含まれてる)
Video
- GopherCon 2019: Dave Cheney - Two Go Programs, Three Different Profiling Techniques https://youtu.be/nok0aYiGiYA
- Profiling and Optimizing Go(古いけど) https://youtu.be/N3PWzBeLX2M
- 投稿日:2020-10-13T02:42:43+09:00
Cloud Pub/Subの「メッセージの順序指定」によりFIFOキューが実現できるのか?
お題
Cloud Pub/Sub使ってますか?
大量のメッセージを捌きたい時に使いますね。
これ、メッセージ投入時の順序、保証したい時、ないですか?
AWSの場合であればSQSのFIFOキューを使うのかなと思います。
(過去に使おうとしたことがあったものの、まだ東京リージョンなかったのと1秒に最大3000メッセージという制約に尻込みして、結局使わなかった記憶が。。。)
SQSに相当するといったらGCPではCloud Pub/Subだと思いますが、残念ながらFIFOキューは無い。
と思っていたら、
(Beta版ですが)「メッセージの順序指定」という機能が出来ていたようで。
これで、メッセージがPublishした順にSubscribeできるのかと思い、試してみました。とはいえ、だいぶアバウトな試し方なので、とりあえず、こういう試し方をしたところ、こういう結果になったというレベルの捉え方をしてもらえれば。
想定する読者
- GCPについては知っている。
- Cloud Pub/Subを使ったことがある。ないし、おおよそどういうものかは知っている。
- Golangもそれなりに書ける。
前提
- ローカルにGoの開発環境構築済み。
- GCP契約済み。
- ローカルでCloud SDKのセットアップ済み。
- ローカルの環境変数
GOOGLE_APPLICATION_CREDENTIALS
に(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。開発環境
# OS - Linux(Ubuntu)
$ cat /etc/os-release NAME="Ubuntu" VERSION="18.04.5 LTS (Bionic Beaver)"# バックエンド
# 言語 - Golang
$ go version go version go1.15.2 linux/amd64IDE - Goland
GoLand 2020.2.3 Build #GO-202.7319.61, built on September 16, 2020今回の全ソース
https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0ソース抜粋解説
go-publisher
5つのエンドポイントを用意。
topic.EnableMessageOrdering = true
とOrderingKey: operationSequence,
の部分は、実は「メッセージの順序指定」のために必要なコード。
ただし、このソースでメッセージをPublishするTopic("my-normal-topic"
)は「メッセージの順序指定」用ではないSubscriptionと対応しているので、「メッセージの順序指定」は機能しない。main.gopackage main import ( "fmt" "log" "net/http" "os" "time" "cloud.google.com/go/pubsub" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ) func main() { project := os.Getenv("PUB_PROJECT") e := echo.New() e.Use(middleware.Logger()) e.Use(middleware.Recover()) e.GET("/order01", handler(project, "order01")) e.GET("/order02", handler(project, "order02")) e.GET("/order03", handler(project, "order03")) e.GET("/order04", handler(project, "order04")) e.GET("/order05", handler(project, "order05")) e.Logger.Fatal(e.Start(":8080")) } func handler(project, path string) func(c echo.Context) error { return func(c echo.Context) error { ctx := c.Request().Context() operationSequence := createOperationSequence() client, err := pubsub.NewClient(ctx, project) if err != nil { log.Fatal(err) } topic := client.Topic("my-normal-topic") defer topic.Stop() topic.EnableMessageOrdering = true message := &pubsub.Message{ OrderingKey: operationSequence, Data: []byte(path + ":" + operationSequence), } r := topic.Publish(ctx, message) if r == nil { log.Fatal("failed to topic.Publish!") } log.Printf("%+v", r) return c.String(http.StatusOK, path+":"+operationSequence) } } func createOperationSequence() string { return fmt.Sprintf("%d", time.Now().UnixNano()) }go-subscriber
main.gopackage main import ( "encoding/json" "io" "io/ioutil" "log" "net/http" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ) func main() { e := echo.New() e.Use(middleware.Logger()) e.Use(middleware.Recover()) e.POST("/", func(c echo.Context) error { m, err := unmarshal(c.Request().Body) if err != nil { return c.String(http.StatusBadRequest, err.Error()) } data := string(m.Message.Data) log.Printf("fs_Received__Data:%s", data) return c.String(http.StatusOK, "OK") }) e.Logger.Fatal(e.Start(":8080")) } type PubSubMessage struct { Message struct { Data []byte `json:"data,omitempty"` ID string `json:"id"` } `json:"message"` Subscription string `json:"subscription"` } func unmarshal(r io.ReadCloser) (*PubSubMessage, error) { var m PubSubMessage body, err := ioutil.ReadAll(r) if err != nil { log.Printf("ioutil.ReadAll: %v", err) return nil, err } if err := json.Unmarshal(body, &m); err != nil { log.Printf("json.Unmarshal: %v", err) return nil, err } return &m, nil }5つのエンドポイントを順繰り叩くシェル
かなり雑なつくり。
#!/usr/bin/env bash set -euox pipefail for i in {0..15} do for j in {1..5} do curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}" sleepenh 0.005 done sleepenh 0.005 done実践
イメージ
RESTの口を持つサービスをCloud Runに載せておいて、アクセスが来たらCloud Pub/SubにPublishし、Pushタイプで用意しておいたSubscriptionが、これまたCloud Runに載せておいたRESTの口を持つサービスにリクエストを飛ばす。
通常のTopicとSubscriptionの場合
TopicとSubscriptionの設定
メッセージ投入結果
メッセージは、以下の順に繰り返しエンドポイントを叩いていて、Publisherのログ上もその順番になっているものの、Subscriberのログでは順不同になっている。
/order01
/order02
/order03
/order04
/order05エンドポイントを叩くシェルが出したログ上はもちろん順序通り。
「メッセージの順序指定」に対応したTopicとSubscriptionの場合
TopicとSubscriptionの設定
同じリージョンに存在している
という条件があるのでTopicのリージョンを指定。「メッセージの順序指定」は当然、「有効」にする。
メッセージ投入結果
何度か試してみた感じでは、立ち上がりに一部だけSubscribeする順番が入れ替わるものが発生するケースがあったが、それ以降は順序通りになっていた。
わりと順不同になっていた通常のTopic/Subscriptionの組み合わせの時と比べると安定した順番にはなっている。まとめ
「メッセージの順序指定」モードにしても、順番が入れ替わる事象が起きたので、お題に対して「実現できる」と言い切れず。。。
これが、試し方の問題なのか、まあBeta版だからなのか、気が向いたら、、、突き詰めてみようか。。。
- 投稿日:2020-10-13T01:37:12+09:00
goのgqlgenでgqlgenコマンドが効かない
事象
- goでGraphQLを実装するのにgqlgenを使っていた
- https://github.com/99designs/gqlgen
- READMEに書いてあるコマンド
go run github.com/99designs/gqlgen [args]
が実行できたり、できなかったりした発生条件
- go.modファイルに書いてあるgoのバージョンが1.12以下の人はおそらくREADME通りにやっても動作する
- go.modファイルに書いてあるgoのバージョンが1.13以上の人は
go mod vendor
後に実行するとおそらくエラーになる対処方法
go get github.com/99designs/gqlgen
した段階でおそらく$GOPATH/bin/gqlgen
が生成されているはずgqlgen [args]
で実行できる。($GOPATH/bin
にパスが通っているはず)最後に
- go run [package]は、実行する場所($GOPATH内かそうではないか)、go.modのバージョン、./vendorが存在するか否か、によって参照しにいく場所が変わるのでややこしい
- goのmoduleは1.12前と1.13以降で挙動が違うので、さっさとバージョンをあげてしまった方がよさそう
- 投稿日:2020-10-13T01:10:22+09:00
【続】tcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話
はじめに
以前、tcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話という記事をかいたのですが、そのきっかけは
以前、アプリからDBにSQLを投げたところ、コネクションがinvalidだというエラーが起きました。この原因自体はとても簡単でサーバ側(DB側)のコネクションを保持するタイムアウト設定がクライアントよりも短かったというだけなのですが、「これってクライアントライブラリ側でソケットにwriteした時点でエラーになるんだからハンドリングしてコネクションプールに保持している他のコネクションをよしなに使ってよ!!」と思ったのでした。
でした。
これはgoのmysqlドライバーを利用した場合に発生していたのですが、まさにこの問題をGitHubの中の人が去年に修正しており、それをテーマにブログを書かれていました。
それが非常に勉強になったので、わかりずらい部分を補足しつつ紹介したいと思います。
ブログを読む
背景
Three bugs in the Go MySQL Driverです。
背景とかの話も非常に興味深かったので若干主旨とずれる部分も紹介します。
GitHubのサービスはRailsのモノリスだったのを、ここ数年で少しずつ、速度や信頼性が必要な部分を中心に切り出していってGolangで書き換えていっているようです。
その中の一つのサービスが2019年に稼働した
authzd
というサービスで、GitHub社のGoで書かれたWebアプリでMySQLに接続する初めてのサービスだったようです。そのブログはその際に経験したバグに対してGitHub社が修正した3つのPRをもとに修正が紹介されています。今回は最初に紹介されている
The crash
の部分を紹介します。ちなみに
resulting in our first “9” of availability for the service
と書いてあったのでThe crash
の修正によってサービス可用性90%突破したようです。業務でサービス可用性目標をあげるところもあると思いますが、OSSがボトルネックになっていたのでOSSを修正するというのは素晴らしいですね!!
ちなみにブログに添付されているスクリーンショットはDatadogのmonitorのようなので、GitHub社もDatadogを利用しているようですね(どうでもいい)
The crash
どんな話なのかを先にざっくり書いてしまうと、MySQLのサーバ側の
idle timeout
がクライアントのそれよりも短い場合、クライアントからクエリを送ろうとしたとき実はそのコネクションはサーバ側がcloseしていたということがおこりえます。その場合にクライアントとしては強制的にエラーになることを経験します。この問題への対応としては簡単で
(*DB).SetConnMaxLifetime
をサーバのidle timeout
より小さくすればいいだけです。
ただ、SetConnMaxLifetime
であってSetIdleConnMaxLifetime
ではないので、idleではなくactiveなコネクションも不必要にcloseされてしまい、イケていないです。これは全てのDBサーバのコネクションにidle
という概念があるわけではないため、database/sql
側が用意していないという背景があるようです。自分はまさに上記の対応を行って(参考までにDBの
idle timeout
はAWSのAuroraの場合、デフォルトで8h
に設定されているようです。GitHub社では30s
に設定しているようです。短い!!)おり、そのときにmysqlドライバー側でよしなにできないの?と思い、調査したことを以前記事にしたわけですが、修正してくれたようです。さて、詳細に入っていきます。
記事の序盤はtcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話とほぼ同じことがTCPの遷移図とともに書かれています。
TCPの仕様上、サーバがFINパケットを送ってもそれはあくまでサーバ側がwriteしないことのみを意味し、クライアントからはサーバへwriteし、サーバがreadし処理をするのはありえます。そして、サーバがwriteもreadも全くなにもしない(例えばソケットをcloseするなど)ことを安全にクライアントへ伝える方法はtcpのプロトコルに存在しません。
わかりやすいので以下引用しますが、TCPの上記のような特性はほとんどのプロトコルの場合は問題にならないようですが、MySQLのプロトコルは「クライアントが送りサーバがそれに返答する」という流れが決まっており、クライアントは
write
するまでread
することがないようです。In most network protocols on top of TCP, this isn’t an issue. The client is performing reads from the server, and as soon as it receives a [SYN, ACK], the next read returns an EOF error, because the Kernel knows that the server won’t write more data to this connection. However, as discussed earlier, once a MySQL connection is in its Command Phase, the MySQL protocol is client-managed. The client only reads from the server after it sends a request, because the server only sends data in response to requests from the client.
そういえば、この特性はHTTP/1.x(pipeliningは除く)も同様かと思いますが、以前、Goのhttp.Requestのキャンセルの仕組みを理解するという記事で書いたようにGoのhttpサーバの実装ではリクエストボディを読み切ったタイミングでソケットをReadするgoルーチンが作成され、サーバ処理中にクライアント側のcloseに気づけるようになっています。こちらはサーバ側の話ですが。
ここまで話を聞いて、エラーだったらretryしてくれよと思う方もいるかもしれません。
実は、retryの仕組みはdatabase/sql
に用意されており、ErrBadConn
を返却するようにすれば、maxBadConnRetries(2回)リトライし、それでもエラーになればコネクションプールを利用せずに新規のコネクションを作成する実装になっています。以下は
QueryContext
の例ですが、database/sql
のあらゆる処理に同じようなリトライの処理があり、また、driver側(goのmysqlドライバーも)でもdatabase/sql/driver
をimport
して、driver.ErrBadConn
を返却しているケースがあるようです。database/sql/driver/driver.go// ErrBadConn should be returned by a driver to signal to the sql // package that a driver.Conn is in a bad state (such as the server // having earlier closed the connection) and the sql package should // retry on a new connection. // // To prevent duplicate operations, ErrBadConn should NOT be returned // if there's a possibility that the database server might have // performed the operation. Even if the server sends back an error, // you shouldn't return ErrBadConn. var ErrBadConn = errors.New("driver: bad connection")database/sql/sql.go// QueryContext executes a query that returns rows, typically a SELECT. // The args are for any placeholder parameters in the query. func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error for i := 0; i < maxBadConnRetries; i++ { rows, err = db.query(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.query(ctx, query, args, alwaysNewConn) } return rows, err }今回のも同じように
ErrBadConn
を返却するようにしていればそもそも問題にならない(リトライに失敗しても最後にはコネクションプールを必ず使わないから)のですが、エラーが発覚する箇所がwrite
である(Goのhttpserver実装のようになんらかの仕組みを用意しない限りwriteで初めてサーバのcloseに気付く)ので、常に安全にリトライできないという事情があるようです。ブログに紹介されている以下のケースは、まさに
ErrBadConn
のコメントにあるTo prevent duplicate operations, ErrBadConn should NOT be returned if there's a possibility that the database server might have performed the operation
のケースなので、ErrBadConn
は返却してはいけないということになります。What would happen if we performed an UPDATE in a perfectly healthy connection, MySQL executed it, and then our network went down before it could reply to us? The Go MySQL driver would also receive an EOF after a valid write. But if it were to return driver.ErrBadConn, database/sql would
では、
write
するまえにnon-blockingでread
してEOFであればErrBadConn
をなげればいいのでは?と思うかもしれませんが、まさにそれがPRで対応されていることです!
いやー、事情が複雑ですね。。
PRを読む
packets: Check connection liveness before writing queryを実際によんでいきましょう。
修正方針を前章で把握するのだけでもお腹いっぱいですが、PRも100行ほどの小さいPRにもかかわらず、なかなか勉強になりました。勉強になった点を3つ紹介します。
チェックを行うときには生のファイルディスクリプタを参照する
やることは前章で整理したように
write
する直前にソケットをnon-blockingでread
してすでにサーバがclose済みであればErrBadConn
を返すだけです。が、Goのネットワーク処理はAPIとしては同期的なAPIを提供しているが、実は内部ではnon-blockingな処理がされています。
簡単に説明すると、netpollerと呼ばれる仕組みでネットワークの待ちになった際に、goroutineが元処理から切り離されepollなどのシステムコールで非同期にソケットに対するイベントを把握し、処理可能になったら再度goroutineを割り当てる仕組みがgoのランタイムには備わっています(といっても自分は該当部分のソースを読んだことがないです)
これは本当に素晴らしい仕組みだと思うのですが、今回のようにブロックされることがないことが確定している場合には生のファイルディスクリタを利用したシステムコールを使った方が好ましいです。というわけで以下のような実装がされています。
明示的にnon-blockingにしていないのは、生のファイルディスクリタはGoのランタイム側ですでに
O_NONBLOCK
指定されているためだと思います。conncheck.gosconn, ok := c.(syscall.Conn) if !ok { return nil } rc, err := sconn.SyscallConn() if err != nil { return err } rerr := rc.Read(func(fd uintptr) bool { n, err = syscall.Read(int(fd), buff[:]) return true }) switch { case rerr != nil: return rerr case n == 0 && err == nil: return io.EOF case n > 0: return errUnexpectedRead case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK: return nil default: return err }チェックを行う回数をできるだけ少なくする
ResetSession
がsql/driver
側でinterfaceで定義されており、この処理は処理済みのコネクションをコネクションプールに戻す時にsql/driver
が呼びます。これにより実装するdriver側が処理を行う機会を得ます。今回のPRではこのinterface実装でコネクションに設けたフラグをonにし、
write
時にこのフラグがあるときのみチェックを行い、チェック後フラグをoffにするという工夫がされています。これにより、コネクションプールから取得したコネクションが最初に通信する時のみにチェックがされることになります。すごい!!
database/sql/driver/driver.go// SessionResetter may be implemented by Conn to allow drivers to reset the // session state associated with the connection and to signal a bad connection. type SessionResetter interface { // ResetSession is called while a connection is in the connection // pool. No queries will run on this connection until this method returns. // // If the connection is bad this should return driver.ErrBadConn to prevent // the connection from being returned to the connection pool. Any other // error will be discarded. ResetSession(ctx context.Context) error }windowsでは何もしない
PRではwindowsで動作確認がとれておらず、CIも存在しない。どうやって動作確認しようか
と議論がつまりかけたのですが、// +build !windows
が指定されているconncheck.go
とconncheck_windows.go
の両方のファイルでconnCheck
関数を実装し、conncheck_windows.go
側ではnil
を返すというだけという技を使って議論を進めていました。これによってwindows側には何の変更もなしに修正したことになります。すごい!!
おわりに
PRを確認すると最初にあげる時点でかなり詳しく説明し、パフォーマンス遅延などへの影響等も検証されていてすごいと思いました。OSSでPRあげる時はどうしても低姿勢になりがちな気がしますが、自分の修正内容に自信をもっていて、フローが遅いぞと圧をかけたり、こんな大変なissueも残っているんだと言われた場合もokそっちは来週PR作るねといって実際にmergeされているのでやばい
内容が素晴らしく、自分も努力せねばと思ったので紹介させていただきました。