05.gRPC的metadata

gRPC 的 metadata (元信息) 类似于 HTTP 的 Header ,只是 gRPC 的 metadat 的 value 是一个数组。 HTTP 的 Header 生命周周期是一次 HTTP 请求;gRPC 的 metadata 的生命周期是一次 RPC 调用。

创建 metadata

metadata 的本质是一个 map ,其中 key 是 string 类型,而 value 是 []string。 同时要注意 metadata 中 key 是非大小写敏感的,也就是说 key 和 KEY , Key 是同一个 key。

google.golang.org/grpc/metadata包提供了metadata支持,创建 metadata的方法可以是以下两种

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
md.Append("key","value1","value2")


md := metadata.Pairs(
"key1", "value1",
"key1", "value1.2", // "key1" 的值为 []string{"value1", "value1.2"}
"key2", "value2",
)

Unary RPC

客户端发送 metadata

Unary RCP 发送 metadata 的流程如下:

  • 创建 context.Context
  • 创建 metadata
  • 将 metadata 附加到 context
  • 使用 context 调用 RPC

示例代码如下

// 创建 context.Context
ctx, cancle := context.WithCancel(context.Background())
defer cancle()

// 创建 metadata
md := metadata.New(map[string]string{"client-key-1": "client-value-1", "client-key-2": "client-value-2"})

// 将 metadata 附加到 context
// 创建新的 Metadata 并添加到 context 中,如果 context 已有 metadata 会被覆盖掉
ctx = metadata.NewOutgoingContext(ctx, md)
// 将 key-value 对添加到已有的 context 中。如果对应的 context 没有 metadata,那么就会创建一个;
// 如果已有 metadata 了,那么就将数据添加到原来的 metadata
ctx = metadata.AppendToOutgoingContext(ctx, "client-key-3", "client-value-3")

// 使用 context 调用 RPC
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})

服务端接收 metadata

Unary RPC 服务端接收 metadata 非常简单,metadata.FromIncomingContext(ctx) 返回一个 metadata 和一个 bool 类型的标识

clientMD, exists := metadata.FromIncomingContext(ctx)
if exists {
    fmt.Println(clientMD)
}

服务端发送 metadata

Unary RPC 服务端发送 metadata 可以直接使用 grpc.SetHeadergrpc.SetTrailer 来实现

header := metadata.New(map[string]string{"server-header-key-1": "server-header-value-1", "server-header-key-2": "server-header-value-2"})
grpc.SetHeader(ctx, header)
trailer := metadata.New(map[string]string{"server-trailer-key-1": "server-trailer-value-1", "server-trailer-key-2": "server-trailer-value-2"})
grpc.SetTrailer(ctx, trailer)

客户端接收 metadata

Unary RPC 客户端接收 metadata 可以在调用 RCP 时,使用 grpc.Headergrpc.Trailer 创建 option 接收服务端返回的 Header 和 Trailer

var header, trailer metadata.MD
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"}, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
    log.Fatalf("echo failed :%#v", err)
}
fmt.Println(header)
fmt.Println(trailer)

完整的示例

代码基于 grpc-testing v0.0.1 修改

服务端

func (e *EchoServer) Echo(ctx context.Context, par *echo.EchoRequest) (*echo.EchoResponse, error) {
	// 接收 metadata
	clientMD, exists := metadata.FromIncomingContext(ctx)
	if exists {
		fmt.Println(clientMD)
	}

	result := &echo.EchoResponse{
		Msg: par.GetMsg(),
	}

	// 发送 metadata
	header := metadata.New(map[string]string{"server-header-key-1": "server-header-value-1", "server-header-key-2": "server-header-value-2"})
	grpc.SetHeader(ctx, header)
	trailer := metadata.New(map[string]string{"server-trailer-key-1": "server-trailer-value-1", "server-trailer-key-2": "server-trailer-value-2"})
	grpc.SetTrailer(ctx, trailer)

	return result, nil
}

客户端

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := echo.NewEchoClient(conn)
	// 创建 context.Context
	ctx, cancle := context.WithCancel(context.Background())
	defer cancle()

	// 创建 metadata
	md := metadata.New(map[string]string{"client-key-1": "client-value-1", "client-key-2": "client-value-2"})

	// 将 metadata 附加到 context
	// 创建新的 Metadata 并添加到 context 中,如果 context 已有 metadata 会被覆盖掉
	ctx = metadata.NewOutgoingContext(ctx, md)
	// 将 key-value 对添加到已有的 context 中。如果对应的 context 没有 metadata,那么就会创建一个;
	// 如果已有 metadata 了,那么就将数据添加到原来的 metadata
	ctx = metadata.AppendToOutgoingContext(ctx, "client-key-3", "client-value-3")

	// 使用 context 调用 RPC 并 接收服务端返回的 header 和 trailer
	var header, trailer metadata.MD
	r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"}, grpc.Header(&header), grpc.Trailer(&trailer))
	if err != nil {
		log.Fatalf("echo failed :%#v", err)
	}
	fmt.Println(header)
	fmt.Println(trailer)

	log.Print(r.GetMsg())
}

streaming RPC

客户端发送 metadata

流式 RCP 客户端发送 metadata 与 Unary RPC 相同

服务端接收 metadata

流式 RCP 服务端接收 metadata 与 Unary RPC 相似,区别在于 Unary RPC 直接使用参数中的 context ,流式 RPC 使用 ServerStreamContext

// 流式 RCP 服务端接收 metadata 
md, exists := metadata.FromIncomingContext(server.Context())
if exists {
    fmt.Println(md)
}

服务端发送 metadata

流式 RPC 服务端发送 metadata 也与 Unary RPC 相似,区别在于 Unary RPC 服务通过 grpc.setHeader()grpc.SetTrailer() 向 client 发送 header 和 trailer,而流式 RPC 使用 ServerStream 接口的 SendHeader()SetTrailer() 方法向 client 发送 header 和 trailer。

注意这里的 header 方法是 SendHeader ,不是 setHeader

// 发送 Header
header := metadata.New(map[string]string{"server-header-key-1": "server-header-value-1", "server-header-key-2": "server-header-value-2"})
server.SendHeader(header)

// 发送 trailer
trailer := metadata.New(map[string]string{"server-trailer-key-1": "server-trailer-value-1", "server-trailer-key-2": "server-trailer-value-2"})
server.SetTrailer(trailer)

客户端接收 metadata

流式 RPC 客户端接收 metadata 可以通过调用返回的 ClientStream 接口的 Header()Trailer() 方法接收。

// 接收 Header
header, err := stream.Header()

// 接收 Trailer
trailer := stream.Trailer()

完整的示例基于 grpc-testing v0.0.4 修改

服务端

func (e *EchoServer) Echo(server echo.Echo_EchoServer) error {
	// 接收 metadata
	md, exists := metadata.FromIncomingContext(server.Context())
	if exists {
		fmt.Println(md)
	}
	// 发送 Header
	header := metadata.New(map[string]string{"server-header-key-1": "server-header-value-1", "server-header-key-2": "server-header-value-2"})
	server.SendHeader(header)

	for {
		r, err := server.Recv()
		if err == nil {
			server.Send(&echo.EchoResponse{Msg: "return " + r.Msg})
			continue
		}
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	// 发送 trailer
	trailer := metadata.New(map[string]string{"server-trailer-key-1": "server-trailer-value-1", "server-trailer-key-2": "server-trailer-value-2"})
	server.SetTrailer(trailer)

	return nil
}

客户端

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := echo.NewEchoClient(conn)

	// 创建 context.Context
	ctx, cancle := context.WithCancel(context.Background())
	defer cancle()

	// 创建 metadata
	md := metadata.New(map[string]string{"client-key-1": "client-value-1", "client-key-2": "client-value-2"})

	// 将 metadata 附加到 context
	// 创建新的 Metadata 并添加到 context 中,如果 context 已有 metadata 会被覆盖掉
	ctx = metadata.NewOutgoingContext(ctx, md)
	// 将 key-value 对添加到已有的 context 中。如果对应的 context 没有 metadata,那么就会创建一个;
	// 如果已有 metadata 了,那么就将数据添加到原来的 metadata
	ctx = metadata.AppendToOutgoingContext(ctx, "client-key-3", "client-value-3")

	r, err := c.Echo(ctx)
	if err != nil {
		log.Fatalf("echo failed :%#v", err)
	}

	// 接收服务端的 header
	fmt.Println(r.Header())
	for i := 0; i < 10; i++ {
		err = r.Send(&echo.EchoRequest{Msg: fmt.Sprintf("hello - %d", i)})
		if err != nil {
			log.Fatal("send msg error", err)
		}

		resp, err := r.Recv()
		if err != nil {
			log.Fatal("recv msg error", err)
		}
		fmt.Println(resp.Msg)
	}
	err = r.CloseSend()
	if err != nil {
		log.Fatal("close send error", err)
	}

	// 注意: 这里等待一秒是等服务端发送 trailer ,如果不等待,很可能接收不到trailer
	time.Sleep(1 * time.Second)
	fmt.Println(r.Trailer())
}

关于 Header 和 Trailer 的区别

在 Unary gRPC 调用中,Header 和 Trailer 几乎没有区别,都是一起从服务端发到客户端的

但是在 Stream gRPC 中,就有区别的了: Header会先发送,然后发送流数据,流数据结束后,才会发送 Trailer。如果在发送流数据之后,再调用 server.SendHeader,客户端是接收不到设置的metadata的。而在发送流数据之前调用 SetTrailer 并不会使 Trailer 先发送。

metadata发送二进制值

在 metadata 中,key 永远是 string 类型,但是 value 可以是 string 也可以是二进制数据。

为了在 metadata 中存储二进制数据,需要在 key 的后面加一个 -bin 后缀。具有 -bin 后缀的 key 所对应的 value 在创建 metadata 时会被编码(base64),收到的时候会被解码

// 客户端
md := metadata.New(map[string]string{"client-key-1-bin": string([]byte{0x0F, 0xFF})})

// 服务端
md, exists := metadata.FromIncomingContext(server.Context())
if exists {
	for k, v := range md {
		fmt.Printf("%s : %#v : %T\n", k, v, v)
	}
}

编码处理详见 : https://github.com/grpc/grpc-go/blob/master/internal/transport/http_util.go#L149