06.gRPC的拦截器
gRPC 提供了 interceptor (拦截器) 功能,功能类似 middleware 。 拦截器可以用于处理一些公共的或者需要在业务开始前处理的工作。
gRPC 的拦截器
gRPC的拦截器分为:服务端 Unary 拦截器 ( UnaryServerInterceptor
) ,客户端 Unary 拦截器 ( UnaryClientInterceptor
) ,服务端 Streaming 拦截器 ( StreamServerInterceptor
) ,客户端 Streaming 拦截器 ( StreamClientInterceptor
) 。分别定义如下:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
Unary RPC 添加拦截器
服务端
对于 Unary RPC 服务端,添加拦截器可以在启动 gRPC 服务时,使用 grpc.UnaryInterceptor
和 grpc.ChainUnaryInterceptor
创建 ServerOption,做为参数传递给 grpc.NewServer
方法。
示例代码:
// 定义两个 server Interceptor
func AuthInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Println("auth intercept")
md, ok := metadata.FromIncomingContext(ctx)
if ok {
// 假装有鉴权
fmt.Println(md)
} else {
return nil, errors.New("no metadata")
}
return handler(ctx, req)
}
func LogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Println("log intercept")
fmt.Println(info.FullMethod)
return handler(ctx, req)
}
// main 函数中启动 Server 时添加 Option
// UnaryInterceptor 创建单个的 拦截器
// ChainUnaryInterceptor 创建一个链式的拦截器链
// 如果两个 Option 都添加,则 UnaryInterceptor 创建的拦截器先被调用, ChainUnaryInterceptor 创建的拦截器后被调用
s := grpc.NewServer(
grpc.UnaryInterceptor(AuthInterceptor),
grpc.ChainUnaryInterceptor(
AuthInterceptor,
LogInterceptor,
),
)
客户端
对于 Unary RPC 客户端,添加拦截器的方法是在创建 ClientConn 的时候 ,使用 WithUnaryInterceptor
和 WithChainUnaryInterceptor
创建Option,做为参数传递给 grpc.Dial
方法。
// 定义两个 Client Interceptor
func LogInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("call " + method)
return invoker(ctx, method, req, reply, cc, opts...)
}
func AppendInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("append interceptor")
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"app_id": "123456"}))
return invoker(ctx, method, req, reply, cc, opts...)
}
// main 函数中创建 ClientConn 时添加 Option
// UnaryInterceptor 创建单个的 拦截器
// ChainUnaryInterceptor 创建一个链式的拦截器链
// 如果两个 Option 都添加,则 WithUnaryInterceptor 创建的拦截器先被调用, WithChainUnaryInterceptor 创建的拦截器后被调用
conn, err := grpc.Dial(
"127.0.0.1:8080",
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithUnaryInterceptor(LogInterceptor),
grpc.WithChainUnaryInterceptor(
LogInterceptor,
AppendInterceptor,
),
)
Streaming RCP 添加拦截器
Steaming RPC 添加拦截器与 Unary RPC 差不多,区别在于拦截器方法的定义以及添加拦截器的函数变更为 grpc.StreamInterceptor
, grpc.ChainStreamInterceptor
, grpc.WithStreamInterceptor
, grpc.WithChainStreamInterceptor
。这里仅提供一个示例。
服务端
// 定义 Interceptor
func AuthInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fmt.Println("Auth Interceptor")
md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
fmt.Println(md)
}
return handler(srv, ss)
}
func LogInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fmt.Println("Log Interceptor")
fmt.Printf("%s %t %t \n", info.FullMethod, info.IsClientStream, info.IsServerStream)
return handler(srv, ss)
}
// 创建服务
s := grpc.NewServer(
grpc.StreamInterceptor(AuthInterceptor),
grpc.ChainStreamInterceptor(AuthInterceptor, LogInterceptor),
)
客户端
// 定义 Interceptor
func LogInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
fmt.Println("Log Interceptor")
fmt.Printf("%s %s %t %t\n", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams)
return streamer(ctx, desc, cc, method, opts...)
}
func AppendInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
fmt.Println("Append Interceptor")
metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"app_id": "123456"}))
return streamer(ctx, desc, cc, method, opts...)
}
// 创建客户端连接
conn, err := grpc.Dial(
"127.0.0.1:8080",
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithStreamInterceptor(LogInterceptor),
grpc.WithChainStreamInterceptor(
LogInterceptor,
AppendInterceptor,
),
)
go-grpc-middleware
在 https://github.com/grpc-ecosystem/go-grpc-middleware 项目中,实现了很多 gRPC 中间件,都是使用 interceptor 来实现的,常用的 logging auth ratelimit retry opentracing recovery 已经实现好了,可以直接在开发过程中使用。