06.gRPC的拦截器

gRPC 提供了 interceptor (拦截器) 功能,功能类似 middleware 。 拦截器可以用于处理一些公共的或者需要在业务开始前处理的工作。

gRPC 的拦截器

gRPC的拦截器分为:服务端 Unary 拦截器 ( UnaryServerInterceptor ) ,客户端 Unary 拦截器 ( UnaryClientInterceptor ) ,服务端 Streaming 拦截器 ( StreamServerInterceptor ) ,客户端 Streaming 拦截器 ( StreamClientInterceptor ) 。分别定义如下:


type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC 添加拦截器

服务端

对于 Unary RPC 服务端,添加拦截器可以在启动 gRPC 服务时,使用 grpc.UnaryInterceptorgrpc.ChainUnaryInterceptor 创建 ServerOption,做为参数传递给 grpc.NewServer 方法。

示例代码:


// 定义两个 server Interceptor
func AuthInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	fmt.Println("auth intercept")
	md, ok := metadata.FromIncomingContext(ctx)
	if ok {
		// 假装有鉴权
		fmt.Println(md)
	} else {
		return nil, errors.New("no metadata")
	}
	return handler(ctx, req)
}

func LogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	fmt.Println("log intercept")
	fmt.Println(info.FullMethod)
	return handler(ctx, req)
}

// main 函数中启动 Server 时添加 Option
// UnaryInterceptor 创建单个的 拦截器
// ChainUnaryInterceptor 创建一个链式的拦截器链
// 如果两个 Option 都添加,则 UnaryInterceptor 创建的拦截器先被调用, ChainUnaryInterceptor 创建的拦截器后被调用 
s := grpc.NewServer(
    grpc.UnaryInterceptor(AuthInterceptor),
    grpc.ChainUnaryInterceptor(
        AuthInterceptor,
        LogInterceptor,
    ),
)

客户端

对于 Unary RPC 客户端,添加拦截器的方法是在创建 ClientConn 的时候 ,使用 WithUnaryInterceptorWithChainUnaryInterceptor 创建Option,做为参数传递给 grpc.Dial 方法。

// 定义两个 Client Interceptor
func LogInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	fmt.Println("call " + method)
	return invoker(ctx, method, req, reply, cc, opts...)
}

func AppendInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	fmt.Println("append interceptor")
	ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"app_id": "123456"}))
	return invoker(ctx, method, req, reply, cc, opts...)
}

// main 函数中创建 ClientConn 时添加 Option
// UnaryInterceptor 创建单个的 拦截器
// ChainUnaryInterceptor 创建一个链式的拦截器链
// 如果两个 Option 都添加,则 WithUnaryInterceptor 创建的拦截器先被调用, WithChainUnaryInterceptor 创建的拦截器后被调用 
conn, err := grpc.Dial(
    "127.0.0.1:8080",
    grpc.WithInsecure(),
    grpc.WithBlock(),
    grpc.WithUnaryInterceptor(LogInterceptor),
    grpc.WithChainUnaryInterceptor(
        LogInterceptor,
        AppendInterceptor,
    ),
)

Streaming RCP 添加拦截器

Steaming RPC 添加拦截器与 Unary RPC 差不多,区别在于拦截器方法的定义以及添加拦截器的函数变更为 grpc.StreamInterceptorgrpc.ChainStreamInterceptorgrpc.WithStreamInterceptorgrpc.WithChainStreamInterceptor。这里仅提供一个示例。

服务端

// 定义 Interceptor
func AuthInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	fmt.Println("Auth Interceptor")
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		fmt.Println(md)
	}
	return handler(srv, ss)
}

func LogInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	fmt.Println("Log Interceptor")
	fmt.Printf("%s %t %t \n", info.FullMethod, info.IsClientStream, info.IsServerStream)
	return handler(srv, ss)
}

// 创建服务
s := grpc.NewServer(
    grpc.StreamInterceptor(AuthInterceptor),
    grpc.ChainStreamInterceptor(AuthInterceptor, LogInterceptor),
)

客户端

// 定义 Interceptor
func LogInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	fmt.Println("Log Interceptor")
	fmt.Printf("%s %s %t %t\n", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams)
	return streamer(ctx, desc, cc, method, opts...)
}

func AppendInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	fmt.Println("Append Interceptor")
	metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"app_id": "123456"}))
	return streamer(ctx, desc, cc, method, opts...)
}

// 创建客户端连接
conn, err := grpc.Dial(
    "127.0.0.1:8080",
    grpc.WithInsecure(),
    grpc.WithBlock(),
    grpc.WithStreamInterceptor(LogInterceptor),
    grpc.WithChainStreamInterceptor(
        LogInterceptor,
        AppendInterceptor,
    ),
)

go-grpc-middleware

https://github.com/grpc-ecosystem/go-grpc-middleware 项目中,实现了很多 gRPC 中间件,都是使用 interceptor 来实现的,常用的 logging auth ratelimit retry opentracing recovery 已经实现好了,可以直接在开发过程中使用。