- 投稿日:2022-02-01T22:57:36+09:00
【gRPC】GoでStreaming APIを実装する
はじめに 前回の記事で、gRPCのAPIタイプの内、1リクエストに対して1レスポンスを返すUnaryを実装しました。 今回は1回のコネクションで複数のリクエストやレスポンスを送るタイプである、Streaming APIを実装してみました。 Streaming API Streaming APIはHTTP/2の恩恵を受けており、リクエスト/レスポンスのたびにコネクションを確立しないというメリットをもちます。 3種類のタイプがあり、それぞれが以下のような特長をもっています。 Server Streaming: 1リクエストに対して複数レスポンスを返す。サーバが大量のデータを送りたいとき(ライブ配信やチャット)に使い、リクエストを受け取らずとも非同期でレスポンスを返す。 Client Streaming: 複数リクエストに対して1レスポンスを返す。クライアントが大量のデータを送りたいとき(データアップロードなど)に使い、サーバは非同期でレスポンスを返すことができる。 Bi Directional Streaming: 複数リクエストに対して複数レスポンスを返す。リクエストとレスポンスの数は合っている必要がなく、サーバは返すレスポンス数を選択することができる。非同期でデータを送りあうようなチャットや長時間の接続を行うゲームのオンライン対戦などの用途で使用する。 実装 スキーマ作成、サーバ実装、クライアント実装の順で行います。 スキーマ作成(.protoファイルの作成) Unaryのときと同様にスキーマを作成します。 rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManytimesResponse) {};のように、リクエストやレスポンスの前にstreamをつけることで、各Streaming APIを実現します。 message Greeting { string first_name = 1; string last_name = 2; } message GreetManyTimesRequest { Greeting greeting = 1; } message GreetManytimesResponse { string result = 1; } message LongGreetRequest { Greeting greeting = 1; } message LongGreetResponse { string result = 1; } message GreetEveryoneRequest { Greeting greeting = 1; } message GreetEveryoneResponse { string result = 1; } service GreetService{ // Server Streaming rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManytimesResponse) {}; // Client Streaming rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {}; // Bi Directional Streaming rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {}; } サーバ実装 protoc greet/greetpb/greet.proto --go_out=plugins=grpc:.でgreet.pb.goファイルを生成後、サーバ実装を行います。 GreetManyTimes()はServer Streamingのため、実装では1000 msごとに10回レスポンスを返すようにしています。 LongGreet()はClient Streamingのため、リクエストが終わるまでresultを連結して、最終的なレスポンスを一つのResultとして返します。 GreetEveryone()はBi Directional Streamingで、今回はすべてのリクエストに対してレスポンスを返しています(レスポンスの数は自由に選択するような実装も可能です)。 server.go // Server Streaming func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error { fmt.Printf("GreetManyTimes function was invoked with %v\n", req) firstName := req.GetGreeting().GetFirstName() for i := 0; i < 10; i++ { result := "Hello " + firstName + " number " + strconv.Itoa(i) res := &greetpb.GreetManytimesResponse{ Result: result, } stream.Send(res) time.Sleep(1000 * time.Millisecond) } return nil } // Client Streaming func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error { fmt.Printf("LongGreet function was invoked with a streaming request\n") result := "" for { req, err := stream.Recv() if err == io.EOF { // we have finished reading the client stream return stream.SendAndClose(&greetpb.LongGreetResponse{ Result: result, }) } if err != nil { log.Fatalf("Error while reading client stream: %v", err) } firstName := req.GetGreeting().GetFirstName() result += "Hello " + firstName + "! " } } // Bi Directional Streaming func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error { fmt.Printf("GreetEveryone function was invoked with a streaming request\n") for { req, err := stream.Recv() if err == io.EOF { return nil } if err != nil { log.Fatalf("Error while reading client stream: %v", err) return err } firstName := req.GetGreeting().GetFirstName() result := "Hello " + firstName + "! " sendErr := stream.Send(&greetpb.GreetEveryoneResponse{ Result: result, }) if sendErr != nil { log.Fatalf("Error while sending data to client: %v", sendErr) return sendErr } } } クライアント実装 最後にクライアント側を実装します。 doServerStreaming()では、resStream, err := c.GreetManyTimes(context.Background(), req)でレスポンスを取得して、for内でメッセージを一つずつ出力しています。 doClientStreaming()では、Streamingで送りたいリクエストrequestsをつくり、1000 msごとにstream.Send(req)としてサーバに送ります。 そして、res, err := stream.CloseAndRecv()で1つのレスポンスを取得し、最後に出力します。 doBiDiStreaming()では、doClientStreaming()と同様にstream.Send(req)でサーバにリクエストを送ります。 Bi Directional Streamingは非同期でリクエストとレスポンスを送りあうので、この辺りの処理をgoroutineで実装します。 すべてのリクエストとレスポンスが修了したら、<-waitcで修了します。 client.go // Server Streaming func doServerStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Server Streaming RPC...") req := &greetpb.GreetManyTimesRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", LastName: "Maarek", }, } resStream, err := c.GreetManyTimes(context.Background(), req) if err != nil { log.Fatalf("error while calling GreetManyTimes RPC: %v", err) } for { msg, err := resStream.Recv() if err == io.EOF { // we've reached the end of the stream break } if err != nil { log.Fatalf("error while reading stream: %v", err) } log.Printf("Response from GreetManyTimes: %v", msg.GetResult()) } } // Client Streaming func doClientStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a Client Streaming RPC...") requests := []*greetpb.LongGreetRequest{ &greetpb.LongGreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", }, }, &greetpb.LongGreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "John", }, }, &greetpb.LongGreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Lucy", }, }, &greetpb.LongGreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Mark", }, }, &greetpb.LongGreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "Piper", }, }, } stream, err := c.LongGreet(context.Background()) if err != nil { log.Fatalf("error while calling LongGreet: %v", err) } // we iterate over our slice and send each message individually for _, req := range requests { fmt.Printf("Sending req: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } res, err := stream.CloseAndRecv() if err != nil { log.Fatalf("error while receiving response from LongGreet: %v", err) } fmt.Printf("LongGreet Response: %v\n", res) } // Bi Directional Streaming func doBiDiStreaming(c greetpb.GreetServiceClient) { fmt.Println("Starting to do a BiDi Streaming RPC...") // we create a stream by invoking the client stream, err := c.GreetEveryone(context.Background()) if err != nil { log.Fatalf("Error while creating stream: %v", err) return } requests := []*greetpb.GreetEveryoneRequest{ &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Stephane", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "John", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Lucy", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Mark", }, }, &greetpb.GreetEveryoneRequest{ Greeting: &greetpb.Greeting{ FirstName: "Piper", }, }, } waitc := make(chan struct{}) // we send a bunch of messages to the client (go routine) go func() { // function to send a bunch of messages for _, req := range requests { fmt.Printf("Sending message: %v\n", req) stream.Send(req) time.Sleep(1000 * time.Millisecond) } stream.CloseSend() }() // we receive a bunch of messages from the client (go routine) go func() { // function to receive a bunch of messages for { res, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("Error while receiving: %v", err) break } fmt.Printf("Received: %v\n", res.GetResult()) } close(waitc) }() // block until everything is done <-waitc } 参考資料
- 投稿日:2022-02-01T22:22:55+09:00
M1 Mac も速くないことがある
これは何? 先日まで Mid 2015 の 15 inch MacBook Pro (Core i7 クアッド / 2.2 GHz) を使っていた。 先日 MacBook Pro 14 inch (M1 非Max) を手に入れたんだけど、あんまり速くないなと思うことがあったので、今日も楽しいマイクロベンチマーク。 計算内容 ruby で書くと短くていいね。 ruby N=10000 r=(1..N).max_by{ |x| ((N-x)**x/7) % 6074001001 } p r こういう内容。なんの意味もない。 出力は 8663 となれば正解。 これを、go, java, c++ with boost (clang, gcc), ruby, python3, node で試した。 以降、グラフで出てくる "m1", "rosetta", "amd64" の意味は下表のとおり。 記号 実行ハードウェア バイナリ m1 MacBook Pro 14 inch (M1 非Max) arm64 rosetta MacBook Pro 14 inch (M1 非Max) x86_64 amd64 MacBook Pro (Core i7, Mid 2015) x86_64 コンパイルするチーム go, java, c++ with boost (clang, g++-11)。 各コンパイラは下記の通り go version go1.17.5 darwin/arm64 openjdk 17.0.1 2021-10-19 LTS Apple clang version 13.0.0 (clang-1300.0.29.30) g++-11 (Homebrew GCC 11.2.0_3) 11.2.0 java と clang の rosetta はサボった。 結果は下記の通り。 time コマンドの real の値を出しているので棒が短いほど速い。 ちなみに、 real なのは並列実行を優遇するため。実際、 Java は user が real の 1.5倍ぐらいある。 結果は下図。 目盛りを見ると分かる通り、 go が速い。意外と clang が M1 を使いこなせてない感じ。 全体的にはまあそうだよねという結果だと思う。 コンパイルしないチーム 続いて、 ruby, python3, node。 各環境は下記の通り ruby 3.1.0p0 (2021-12-25 revision fb4df44d16) [arm64-darwin21] / for m1 ruby 3.0.3p157 (2021-11-24 revision 3fb7d2cadc) [x86_64-darwin21] / for rosetta ruby 3.1.0p0 (2021-12-25 revision fb4df44d16) [x86_64-darwin21] / for amd64 Python 3.9.10 (main, Jan 15 2022, 11:40:53) / for m1 Python 3.9.10 (main, Jan 15 2022, 11:48:04) / for rosetta Python 3.9.10 (main, Jan 15 2022, 11:48:04) / for amd64 node v17.3.0 / for m1 and rosetta node v17.3.1 / for amd64 なんか ruby と node のバージョンが合ってないけど気にしない。 結果は下図。 こちらはわりと思いがけなかった。 node はまあまあそうだよねという内容。m1 と amd64 の差はもうちょっとあってもいいかなと思うけど。 python3 は、三者ほぼ同タイム。 そして ruby は m1 が一番遅いという意外な展開。よく見てみると、m1 が遅いというより、amd64 が速すぎる。amd64 の中では go と並んでほぼ最速。m1 が遅いと書いたもののそれは ruby 内の比較の話。m1 内での比較だと Java と同等、clang より速い。node には負けるけど。 ruby や python で m1 がふるわないのは、おそらく、x86_64 バイナリは SSEとかをたっぷり使っていて、ARM64 バイナリは NEONとかを使いこなせてないんだろうと想像する。調べてないので想像するだけ。 誤解なきよう ここでやっているのはマイクロベンチマーク。多倍長整数の特定の計算だけしかしていない。 「ruby は M1 でも遅いのか」という感想を持つべきではなくて「ruby は多倍長整数計算では M1 でも遅いことがあるのか」という感想が正解。 実際。 多倍長整数ではない計算をすると、私が試した範囲では全部、M1 は rosetta に圧勝する。 時間測定に使ったコード ruby # hash require "json" $ix=0 def foo(x) return { ($ix+=1).to_i=>($ix+=1).to_s } if x==0 foo(x-1).merge(foo(x-1)) end p foo((ARGV[0]||21).to_i).size ruby # json require "json" def foo(x) x.times.with_object({}){ |e,o| o[e] = JSON.parse(foo(x-1)) }.to_json end p foo((ARGV[0]||9).to_i).size ruby # eval long text def foo(s, n) return eval(s) if n==0 foo("(#{s})*2+(#{s})", n-1) end p foo("1", (ARGV[0]||21).to_i) ruby # many delete_at def foo(n) a=[*1..n] (1..).each do |ix| return a[0] if a.size==1 a.delete_at(ix % a.size) end end p foo((ARGV[0]||200000).to_i) ruby # float calc def foo(n) n.downto(1).sum{ |e| x=e.to_f (x+1)/(x**(x**0.1)) } end p foo((ARGV[0]||10000000).to_i) ruby # regexp def foo(n) s = "___"+(1..n).map{ |x| "o"*x }.join("___")+"___" s.scan(/(_(o(o+))(\2+)_)/).sum{ |e| e[0].size-2} end p foo((ARGV[0]||4096).to_i) ruby # deep flatten def foo(n) return [1] if n<1 [foo(n-1)*n]*n end p foo(ARGV[0] || 7).flatten.sum ruby # recursive fibo def foo(n) return n if n<2 foo(n-1) + foo(n-2) end p foo(ARGV[0] || 37) 上記の折りたたんであるコードを実行すると、下図のとおり、 M1 が勝つ。 まあそりゃそうだ、という話。 result.png まとめ M1 ネイティブでも rosetta 2 より遅いこともあるよ。 とはいえ。ほとんどの場合は M1 ネイティブは速いし、今遅いものもそのうち早くなるんじゃないかと思うよ(思うだけ)。