04.gRPC流式传输

前文的示例是 gRPC 的 Unary RPC , 即客户端发起一次 Request、服务端响应一次 Response ,这种方式适合传输结构化数据很小的场景,当数据量大或者需要不断传输数据时候,应用应该使用流式RPC( Stream RPC ) ,它允许应用边传输数据边处理。

流式RPC的实现可以分为 服务端流式RPC 客户端流式RPC 和 双向流式RPC ,可以跟据业务场景自行选择合适的方式。

服务端流式RPC

服务端流式 RPC 是指客户端发送一个请求,服务端返回一个流,客户端读取流中的消息,直到流结束。

示例

完整的代码参见: grpc-testing v0.0.2

修改上文示例中的 echo.proto,修改服务定义如下:

rpc Echo(EchoRequest) returns (stream EchoResponse) {}

生成gRPC代码

$ go generate ./...

修改服务端实现

func (e *EchoServer) Echo(par *echo.EchoRequest, echoserver echo.Echo_EchoServer) error {
	msg := par.GetMsg()
	for i := 0; i < 10; i++ {
		echoserver.Send(&echo.EchoResponse{Msg: msg})
	}
	return nil
}

修改客户端调用

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := echo.NewEchoClient(conn)
	ctx, cancle := context.WithCancel(context.Background())
	defer cancle()
	r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
	if err != nil {
		log.Fatalf("echo failed :%#v", err)
	}
	for {
		resp, err := r.Recv()
		if err == nil {
			fmt.Println(resp.Msg)
			continue
		}

		if errors.Is(err, io.EOF) {
			break
		}
		log.Fatal("error", err)
	}
}

客户端流式RPC

客户端流式 RPC 是指客户端的向服务端发送流数据,在发送结束后,由服务端返回一个响应。

示例

完整代码参见: grpc-testing v0.0.3

修改上文示例中的 echo.proto,修改服务定义如下:

rpc Echo(stream EchoRequest) returns (EchoResponse) {}

生成gRPC代码

$ go generate ./...

修改服务端实现

func (e *EchoServer) Echo(server echo.Echo_EchoServer) error {
	var s string
	for {
		r, err := server.Recv()
		if err == nil {
			s = s + " " + r.Msg
			continue
		}
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	server.SendAndClose(&echo.EchoResponse{Msg: s})
	return nil
}

修改客户端调用

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := echo.NewEchoClient(conn)
	ctx, cancle := context.WithCancel(context.Background())
	defer cancle()
	r, err := c.Echo(ctx)
	if err != nil {
		log.Fatalf("echo failed :%#v", err)
	}
	for i := 0; i < 10; i++ {
		err = r.Send(&echo.EchoRequest{Msg: fmt.Sprintf("hello - %d", i)})
		if err != nil {
			log.Fatal("send msg error", err)
		}
	}
	response, err := r.CloseAndRecv()
	if err != nil {
		log.Fatal("close and recv error", err)
	}
	fmt.Println(response.Msg)
}

双向流式RPC

双向流式 RPC (BIDI) 可以满足客户端和服务端使用读写流去发送和接收消息,两个流相互独立,客户端与服务端可以同时发送接收。

示例

完整代码参见: grpc-testing v0.0.4

修改上文示例中的 echo.proto,修改服务定义如下:

rpc Echo(stream EchoRequest) returns (stream EchoResponse) {}

修改服务端实现

func (e *EchoServer) Echo(server echo.Echo_EchoServer) error {
	for {
		r, err := server.Recv()
		if err == nil {
			server.Send(&echo.EchoResponse{Msg: "return " + r.Msg})
			continue
		}
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	return nil
}

修改客户端调用

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := echo.NewEchoClient(conn)
	ctx, cancle := context.WithCancel(context.Background())
	defer cancle()
	r, err := c.Echo(ctx)
	if err != nil {
		log.Fatalf("echo failed :%#v", err)
	}
	for i := 0; i < 10; i++ {
		err = r.Send(&echo.EchoRequest{Msg: fmt.Sprintf("hello - %d", i)})
		if err != nil {
			log.Fatal("send msg error", err)
		}

		resp, err := r.Recv()
		if err != nil {
			log.Fatal("recv msg error", err)
		}
		fmt.Println(resp.Msg)
	}
	err = r.CloseSend()
	if err != nil {
		log.Fatal("close send error", err)
	}
}