20201013のGoに関する記事は6件です。

【Golang】mapのvalueに配列を指定する

概要

mapの中にvalueに配列をいれるみたいなのをGoでしたかったので自分用メモ

実装

package main

import "fmt"

type Hoge struct {
    Say string
}

func main()  {
    a := map[string][]Hoge{}
    a["hoge"] = append(a["hoge"], Hoge{Say: "hello1"})
    a["hoge"] = append(a["hoge"], Hoge{Say: "hello2"})
    a["hoge1"] = append(a["hoge1"], Hoge{Say: "hello3"})
    fmt.Println(a)
}

上記をgo runすると

map[hoge:[{hello1} {hello2}] hoge1:[{hello3}]]

のようにmapのvalueの中にarrayが入ります

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

テスト記事1

test1

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

golang profiling

参考資料をメモするだけ

Video

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Cloud Pub/Subの「メッセージの順序指定」によりFIFOキューが実現できるのか?

お題

Cloud Pub/Sub使ってますか?
大量のメッセージを捌きたい時に使いますね。
これ、メッセージ投入時の順序、保証したい時、ないですか?
AWSの場合であればSQSのFIFOキューを使うのかなと思います。
(過去に使おうとしたことがあったものの、まだ東京リージョンなかったのと1秒に最大3000メッセージという制約に尻込みして、結局使わなかった記憶が。。。)
SQSに相当するといったらGCPではCloud Pub/Subだと思いますが、残念ながらFIFOキューは無い。
と思っていたら、
(Beta版ですが)「メッセージの順序指定」という機能が出来ていたようで。
これで、メッセージがPublishした順にSubscribeできるのかと思い、試してみました。

とはいえ、だいぶアバウトな試し方なので、とりあえず、こういう試し方をしたところ、こういう結果になったというレベルの捉え方をしてもらえれば。

想定する読者

  • GCPについては知っている。
  • Cloud Pub/Subを使ったことがある。ないし、おおよそどういうものかは知っている。
  • Golangもそれなりに書ける。

前提

  • ローカルにGoの開発環境構築済み。
  • GCP契約済み。
  • ローカルでCloud SDKのセットアップ済み。
  • ローカルの環境変数GOOGLE_APPLICATION_CREDENTIALSに(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。

開発環境

# OS - Linux(Ubuntu)

$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"

# バックエンド

# 言語 - Golang

$ go version
go version go1.15.2 linux/amd64

IDE - Goland

GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020

今回の全ソース

https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0

ソース抜粋解説

go-publisher

5つのエンドポイントを用意。
topic.EnableMessageOrdering = trueOrderingKey: operationSequence,の部分は、実は「メッセージの順序指定」のために必要なコード。
ただし、このソースでメッセージをPublishするTopic("my-normal-topic")は「メッセージの順序指定」用ではないSubscriptionと対応しているので、「メッセージの順序指定」は機能しない。

main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
    "github.com/labstack/echo/v4"
    "github.com/labstack/echo/v4/middleware"
)

func main() {
    project := os.Getenv("PUB_PROJECT")

    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())

    e.GET("/order01", handler(project, "order01"))
    e.GET("/order02", handler(project, "order02"))
    e.GET("/order03", handler(project, "order03"))
    e.GET("/order04", handler(project, "order04"))
    e.GET("/order05", handler(project, "order05"))

    e.Logger.Fatal(e.Start(":8080"))
}

func handler(project, path string) func(c echo.Context) error {
    return func(c echo.Context) error {
        ctx := c.Request().Context()
        operationSequence := createOperationSequence()

        client, err := pubsub.NewClient(ctx, project)
        if err != nil {
            log.Fatal(err)
        }

        topic := client.Topic("my-normal-topic")
        defer topic.Stop()

        topic.EnableMessageOrdering = true

        message := &pubsub.Message{
            OrderingKey: operationSequence,
            Data:        []byte(path + ":" + operationSequence),
        }
        r := topic.Publish(ctx, message)
        if r == nil {
            log.Fatal("failed to topic.Publish!")
        }
        log.Printf("%+v", r)

        return c.String(http.StatusOK, path+":"+operationSequence)
    }
}

func createOperationSequence() string {
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

go-subscriber

main.go
package main

import (
    "encoding/json"
    "io"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/labstack/echo/v4"
    "github.com/labstack/echo/v4/middleware"
)

func main() {
    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())

    e.POST("/", func(c echo.Context) error {
        m, err := unmarshal(c.Request().Body)
        if err != nil {
            return c.String(http.StatusBadRequest, err.Error())
        }
        data := string(m.Message.Data)
        log.Printf("fs_Received__Data:%s", data)
        return c.String(http.StatusOK, "OK")
    })

    e.Logger.Fatal(e.Start(":8080"))
}

type PubSubMessage struct {
    Message struct {
        Data []byte `json:"data,omitempty"`
        ID   string `json:"id"`
    } `json:"message"`
    Subscription string `json:"subscription"`
}

func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
    var m PubSubMessage
    body, err := ioutil.ReadAll(r)
    if err != nil {
        log.Printf("ioutil.ReadAll: %v", err)
        return nil, err
    }
    if err := json.Unmarshal(body, &m); err != nil {
        log.Printf("json.Unmarshal: %v", err)
        return nil, err
    }
    return &m, nil
}

5つのエンドポイントを順繰り叩くシェル

かなり雑なつくり。

#!/usr/bin/env bash
set -euox pipefail

for i in {0..15}
do
    for j in {1..5}
    do
        curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
        sleepenh 0.005
    done
    sleepenh 0.005
done

実践

イメージ

RESTの口を持つサービスをCloud Runに載せておいて、アクセスが来たらCloud Pub/SubにPublishし、Pushタイプで用意しておいたSubscriptionが、これまたCloud Runに載せておいたRESTの口を持つサービスにリクエストを飛ばす。

screenshot-app.cloudskew.com-2020.10.13-01_50_36.png

通常のTopicとSubscriptionの場合

TopicとSubscriptionの設定

screenshot-console.cloud.google.com-2020.10.12-23_04_10.png

screenshot-console.cloud.google.com-2020.10.12-23_06_00.png

メッセージ投入結果

メッセージは、以下の順に繰り返しエンドポイントを叩いていて、Publisherのログ上もその順番になっているものの、Subscriberのログでは順不同になっている。
/order01
/order02
/order03
/order04
/order05

screenshot-console.cloud.google.com-2020.10.13-00_48_38.png

エンドポイントを叩くシェルが出したログ上はもちろん順序通り。

Screenshot at 2020-10-13 00-52-17.png

「メッセージの順序指定」に対応したTopicとSubscriptionの場合

TopicとSubscriptionの設定

同じリージョンに存在しているという条件があるのでTopicのリージョンを指定。

screenshot-console.cloud.google.com-2020.10.12-23_14_13.png
screenshot-console.cloud.google.com-2020.10.12-23_24_06.png

「メッセージの順序指定」は当然、「有効」にする。

screenshot-console.cloud.google.com-2020.10.12-23_22_08.png

メッセージ投入結果

何度か試してみた感じでは、立ち上がりに一部だけSubscribeする順番が入れ替わるものが発生するケースがあったが、それ以降は順序通りになっていた。
わりと順不同になっていた通常のTopic/Subscriptionの組み合わせの時と比べると安定した順番にはなっている。

Screenshot at 2020-10-13 01-30-45.png

まとめ

「メッセージの順序指定」モードにしても、順番が入れ替わる事象が起きたので、お題に対して「実現できる」と言い切れず。。。
これが、試し方の問題なのか、まあBeta版だからなのか、気が向いたら、、、突き詰めてみようか。。。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

goのgqlgenでgqlgenコマンドが効かない

事象

  • goでGraphQLを実装するのにgqlgenを使っていた
  • https://github.com/99designs/gqlgen
  • READMEに書いてあるコマンド go run github.com/99designs/gqlgen [args]が実行できたり、できなかったりした

発生条件

  • go.modファイルに書いてあるgoのバージョンが1.12以下の人はおそらくREADME通りにやっても動作する
  • go.modファイルに書いてあるgoのバージョンが1.13以上の人はgo mod vendor後に実行するとおそらくエラーになる

対処方法

  • go get github.com/99designs/gqlgenした段階でおそらく$GOPATH/bin/gqlgenが生成されているはず
  • gqlgen [args]で実行できる。($GOPATH/binにパスが通っているはず)

最後に

  • go run [package]は、実行する場所($GOPATH内かそうではないか)、go.modのバージョン、./vendorが存在するか否か、によって参照しにいく場所が変わるのでややこしい
  • goのmoduleは1.12前と1.13以降で挙動が違うので、さっさとバージョンをあげてしまった方がよさそう
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【続】tcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話

はじめに

以前、tcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話という記事をかいたのですが、そのきっかけは

以前、アプリからDBにSQLを投げたところ、コネクションがinvalidだというエラーが起きました。この原因自体はとても簡単でサーバ側(DB側)のコネクションを保持するタイムアウト設定がクライアントよりも短かったというだけなのですが、「これってクライアントライブラリ側でソケットにwriteした時点でエラーになるんだからハンドリングしてコネクションプールに保持している他のコネクションをよしなに使ってよ!!」と思ったのでした。

でした。

これはgoのmysqlドライバーを利用した場合に発生していたのですが、まさにこの問題をGitHubの中の人が去年に修正しており、それをテーマにブログを書かれていました。

それが非常に勉強になったので、わかりずらい部分を補足しつつ紹介したいと思います。

ブログを読む

背景

Three bugs in the Go MySQL Driverです。

背景とかの話も非常に興味深かったので若干主旨とずれる部分も紹介します。

GitHubのサービスはRailsのモノリスだったのを、ここ数年で少しずつ、速度や信頼性が必要な部分を中心に切り出していってGolangで書き換えていっているようです。

その中の一つのサービスが2019年に稼働したauthzdというサービスで、GitHub社のGoで書かれたWebアプリでMySQLに接続する初めてのサービスだったようです。

そのブログはその際に経験したバグに対してGitHub社が修正した3つのPRをもとに修正が紹介されています。今回は最初に紹介されているThe crashの部分を紹介します。

ちなみにresulting in our first “9” of availability for the serviceと書いてあったのでThe crashの修正によってサービス可用性90%突破したようです。

業務でサービス可用性目標をあげるところもあると思いますが、OSSがボトルネックになっていたのでOSSを修正するというのは素晴らしいですね!!

ちなみにブログに添付されているスクリーンショットはDatadogのmonitorのようなので、GitHub社もDatadogを利用しているようですね(どうでもいい)

The crash

どんな話なのかを先にざっくり書いてしまうと、MySQLのサーバ側のidle timeoutがクライアントのそれよりも短い場合、クライアントからクエリを送ろうとしたとき実はそのコネクションはサーバ側がcloseしていたということがおこりえます。その場合にクライアントとしては強制的にエラーになることを経験します。

この問題への対応としては簡単で(*DB).SetConnMaxLifetimeをサーバのidle timeoutより小さくすればいいだけです。
ただ、SetConnMaxLifetimeであってSetIdleConnMaxLifetimeではないので、idleではなくactiveなコネクションも不必要にcloseされてしまい、イケていないです。これは全てのDBサーバのコネクションにidleという概念があるわけではないため、database/sql側が用意していないという背景があるようです。

自分はまさに上記の対応を行って(参考までにDBのidle timeoutはAWSのAuroraの場合、デフォルトで8hに設定されているようです。GitHub社では30sに設定しているようです。短い!!)おり、そのときにmysqlドライバー側でよしなにできないの?と思い、調査したことを以前記事にしたわけですが、修正してくれたようです。

さて、詳細に入っていきます。

記事の序盤はtcpの仕様上、接続先がコネクションをcloseしているかはパケットを一度は実際に送るまでわからないよという話とほぼ同じことがTCPの遷移図とともに書かれています。

TCPの仕様上、サーバがFINパケットを送ってもそれはあくまでサーバ側がwriteしないことのみを意味し、クライアントからはサーバへwriteし、サーバがreadし処理をするのはありえます。そして、サーバがwriteもreadも全くなにもしない(例えばソケットをcloseするなど)ことを安全にクライアントへ伝える方法はtcpのプロトコルに存在しません。

わかりやすいので以下引用しますが、TCPの上記のような特性はほとんどのプロトコルの場合は問題にならないようですが、MySQLのプロトコルは「クライアントが送りサーバがそれに返答する」という流れが決まっており、クライアントはwriteするまでreadすることがないようです。

In most network protocols on top of TCP, this isn’t an issue. The client is performing reads from the server, and as soon as it receives a [SYN, ACK], the next read returns an EOF error, because the Kernel knows that the server won’t write more data to this connection. However, as discussed earlier, once a MySQL connection is in its Command Phase, the MySQL protocol is client-managed. The client only reads from the server after it sends a request, because the server only sends data in response to requests from the client.

そういえば、この特性はHTTP/1.x(pipeliningは除く)も同様かと思いますが、以前、Goのhttp.Requestのキャンセルの仕組みを理解するという記事で書いたようにGoのhttpサーバの実装ではリクエストボディを読み切ったタイミングでソケットをReadするgoルーチンが作成され、サーバ処理中にクライアント側のcloseに気づけるようになっています。こちらはサーバ側の話ですが。

ここまで話を聞いて、エラーだったらretryしてくれよと思う方もいるかもしれません。
実は、retryの仕組みはdatabase/sqlに用意されており、ErrBadConnを返却するようにすれば、maxBadConnRetries(2回)リトライし、それでもエラーになればコネクションプールを利用せずに新規のコネクションを作成する実装になっています。

以下はQueryContextの例ですが、database/sqlのあらゆる処理に同じようなリトライの処理があり、また、driver側(goのmysqlドライバーも)でもdatabase/sql/driverimportして、driver.ErrBadConnを返却しているケースがあるようです。

database/sql/driver/driver.go
// ErrBadConn should be returned by a driver to signal to the sql
// package that a driver.Conn is in a bad state (such as the server
// having earlier closed the connection) and the sql package should
// retry on a new connection.
//
// To prevent duplicate operations, ErrBadConn should NOT be returned
// if there's a possibility that the database server might have
// performed the operation. Even if the server sends back an error,
// you shouldn't return ErrBadConn.
var ErrBadConn = errors.New("driver: bad connection")
database/sql/sql.go
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
    var rows *Rows
    var err error
    for i := 0; i < maxBadConnRetries; i++ {
        rows, err = db.query(ctx, query, args, cachedOrNewConn)
        if err != driver.ErrBadConn {
            break
        }
    }
    if err == driver.ErrBadConn {
        return db.query(ctx, query, args, alwaysNewConn)
    }
    return rows, err
}

今回のも同じようにErrBadConnを返却するようにしていればそもそも問題にならない(リトライに失敗しても最後にはコネクションプールを必ず使わないから)のですが、エラーが発覚する箇所がwriteである(Goのhttpserver実装のようになんらかの仕組みを用意しない限りwriteで初めてサーバのcloseに気付く)ので、常に安全にリトライできないという事情があるようです。

ブログに紹介されている以下のケースは、まさにErrBadConnのコメントにあるTo prevent duplicate operations, ErrBadConn should NOT be returned if there's a possibility that the database server might have performed the operationのケースなので、ErrBadConnは返却してはいけないということになります。

What would happen if we performed an UPDATE in a perfectly healthy connection, MySQL executed it, and then our network went down before it could reply to us? The Go MySQL driver would also receive an EOF after a valid write. But if it were to return driver.ErrBadConn, database/sql would

では、writeするまえにnon-blockingでreadしてEOFであればErrBadConnをなげればいいのでは?

と思うかもしれませんが、まさにそれがPRで対応されていることです!

いやー、事情が複雑ですね。。

PRを読む

packets: Check connection liveness before writing queryを実際によんでいきましょう。
修正方針を前章で把握するのだけでもお腹いっぱいですが、PRも100行ほどの小さいPRにもかかわらず、なかなか勉強になりました。

勉強になった点を3つ紹介します。

チェックを行うときには生のファイルディスクリプタを参照する

やることは前章で整理したようにwriteする直前にソケットをnon-blockingでreadしてすでにサーバがclose済みであればErrBadConnを返すだけです。

が、Goのネットワーク処理はAPIとしては同期的なAPIを提供しているが、実は内部ではnon-blockingな処理がされています。

簡単に説明すると、netpollerと呼ばれる仕組みでネットワークの待ちになった際に、goroutineが元処理から切り離されepollなどのシステムコールで非同期にソケットに対するイベントを把握し、処理可能になったら再度goroutineを割り当てる仕組みがgoのランタイムには備わっています(といっても自分は該当部分のソースを読んだことがないです)

これは本当に素晴らしい仕組みだと思うのですが、今回のようにブロックされることがないことが確定している場合には生のファイルディスクリタを利用したシステムコールを使った方が好ましいです。というわけで以下のような実装がされています。

明示的にnon-blockingにしていないのは、生のファイルディスクリタはGoのランタイム側ですでにO_NONBLOCK指定されているためだと思います。

conncheck.go
    sconn, ok := c.(syscall.Conn)
    if !ok {
        return nil
    }
    rc, err := sconn.SyscallConn()
    if err != nil {
        return err
    }
    rerr := rc.Read(func(fd uintptr) bool {
        n, err = syscall.Read(int(fd), buff[:])
        return true
    })
    switch {
    case rerr != nil:
        return rerr
    case n == 0 && err == nil:
        return io.EOF
    case n > 0:
        return errUnexpectedRead
    case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
        return nil
    default:
        return err
    }

チェックを行う回数をできるだけ少なくする

ResetSessionsql/driver側でinterfaceで定義されており、この処理は処理済みのコネクションをコネクションプールに戻す時にsql/driverが呼びます。これにより実装するdriver側が処理を行う機会を得ます。

今回のPRではこのinterface実装でコネクションに設けたフラグをonにし、write時にこのフラグがあるときのみチェックを行い、チェック後フラグをoffにするという工夫がされています。

これにより、コネクションプールから取得したコネクションが最初に通信する時のみにチェックがされることになります。すごい!!

database/sql/driver/driver.go
// SessionResetter may be implemented by Conn to allow drivers to reset the
// session state associated with the connection and to signal a bad connection.
type SessionResetter interface {
    // ResetSession is called while a connection is in the connection
    // pool. No queries will run on this connection until this method returns.
    //
    // If the connection is bad this should return driver.ErrBadConn to prevent
    // the connection from being returned to the connection pool. Any other
    // error will be discarded.
    ResetSession(ctx context.Context) error
}

windowsでは何もしない

PRではwindowsで動作確認がとれておらず、CIも存在しない。どうやって動作確認しようか
と議論がつまりかけたのですが、// +build !windowsが指定されているconncheck.goconncheck_windows.goの両方のファイルでconnCheck関数を実装し、conncheck_windows.go側ではnilを返すというだけという技を使って議論を進めていました。これによってwindows側には何の変更もなしに修正したことになります。

すごい!!

おわりに

PRを確認すると最初にあげる時点でかなり詳しく説明し、パフォーマンス遅延などへの影響等も検証されていてすごいと思いました。OSSでPRあげる時はどうしても低姿勢になりがちな気がしますが、自分の修正内容に自信をもっていて、フローが遅いぞと圧をかけたり、こんな大変なissueも残っているんだと言われた場合もokそっちは来週PR作るねといって実際にmergeされているのでやばい

内容が素晴らしく、自分も努力せねばと思ったので紹介させていただきました。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む