04.gRPC流式传输
前文的示例是 gRPC 的 Unary RPC , 即客户端发起一次 Request、服务端响应一次 Response ,这种方式适合传输结构化数据很小的场景,当数据量大或者需要不断传输数据时候,应用应该使用流式RPC( Stream RPC ) ,它允许应用边传输数据边处理。
流式RPC的实现可以分为 服务端流式RPC 客户端流式RPC 和 双向流式RPC ,可以跟据业务场景自行选择合适的方式。
服务端流式RPC
服务端流式 RPC 是指客户端发送一个请求,服务端返回一个流,客户端读取流中的消息,直到流结束。
示例
完整的代码参见: grpc-testing v0.0.2
修改上文示例中的 echo.proto
,修改服务定义如下:
rpc Echo(EchoRequest) returns (stream EchoResponse) {}
生成gRPC代码
$ go generate ./...
修改服务端实现
func (e *EchoServer) Echo(par *echo.EchoRequest, echoserver echo.Echo_EchoServer) error {
msg := par.GetMsg()
for i := 0; i < 10; i++ {
echoserver.Send(&echo.EchoResponse{Msg: msg})
}
return nil
}
修改客户端调用
func main() {
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
for {
resp, err := r.Recv()
if err == nil {
fmt.Println(resp.Msg)
continue
}
if errors.Is(err, io.EOF) {
break
}
log.Fatal("error", err)
}
}
客户端流式RPC
客户端流式 RPC 是指客户端的向服务端发送流数据,在发送结束后,由服务端返回一个响应。
示例
完整代码参见: grpc-testing v0.0.3
修改上文示例中的 echo.proto
,修改服务定义如下:
rpc Echo(stream EchoRequest) returns (EchoResponse) {}
生成gRPC代码
$ go generate ./...
修改服务端实现
func (e *EchoServer) Echo(server echo.Echo_EchoServer) error {
var s string
for {
r, err := server.Recv()
if err == nil {
s = s + " " + r.Msg
continue
}
if errors.Is(err, io.EOF) {
break
}
return err
}
server.SendAndClose(&echo.EchoResponse{Msg: s})
return nil
}
修改客户端调用
func main() {
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
r, err := c.Echo(ctx)
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
for i := 0; i < 10; i++ {
err = r.Send(&echo.EchoRequest{Msg: fmt.Sprintf("hello - %d", i)})
if err != nil {
log.Fatal("send msg error", err)
}
}
response, err := r.CloseAndRecv()
if err != nil {
log.Fatal("close and recv error", err)
}
fmt.Println(response.Msg)
}
双向流式RPC
双向流式 RPC (BIDI) 可以满足客户端和服务端使用读写流去发送和接收消息,两个流相互独立,客户端与服务端可以同时发送接收。
示例
完整代码参见: grpc-testing v0.0.4
修改上文示例中的 echo.proto
,修改服务定义如下:
rpc Echo(stream EchoRequest) returns (stream EchoResponse) {}
修改服务端实现
func (e *EchoServer) Echo(server echo.Echo_EchoServer) error {
for {
r, err := server.Recv()
if err == nil {
server.Send(&echo.EchoResponse{Msg: "return " + r.Msg})
continue
}
if errors.Is(err, io.EOF) {
break
}
return err
}
return nil
}
修改客户端调用
func main() {
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
r, err := c.Echo(ctx)
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
for i := 0; i < 10; i++ {
err = r.Send(&echo.EchoRequest{Msg: fmt.Sprintf("hello - %d", i)})
if err != nil {
log.Fatal("send msg error", err)
}
resp, err := r.Recv()
if err != nil {
log.Fatal("recv msg error", err)
}
fmt.Println(resp.Msg)
}
err = r.CloseSend()
if err != nil {
log.Fatal("close send error", err)
}
}