10.gRPC的客户端负载均衡
gRPC 官方文档《Load Balancing in gRPC》定义了一套专为 gRPC 设计的负载均衡方案。本文参考该文档,简单介绍如何在 gRPC 中实现自定义负载均衡。
gRPC 中的负载均衡基于每次 gRPC 调用,而不是基于每个客户端连接。也就是说,即使所有请求都来自同一个客户端,系统仍然希望在所有服务器之间进行负载均衡。
gRPC-Go 内置了 pick_first 和 round_robin 两种负载均衡策略。
- pick_first:逐个尝试连接解析得到的后端地址,如果某一个地址连接成功,则将其用于所有 RPC;如果所有地址都连接失败,则报告错误。
- round_robin:连接所有的地址,并依次向每个可用的后端发送 RPC 请求。
在上一节的示例客户端代码中,使用了 grpc.WithDefaultServiceConfig("{\"loadBalancingPolicy\":\"round_robin\"}") 配置使用 round_robin 的负载均衡策略。本节将自定义一个负载均衡策略。
gRPC 自定义负载均衡策略
gRPC 在 balancer 包中提供了 func Register(b Builder) 函数用于注册自定义负载均衡器;同时,balancer/base 包提供了 func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder 函数用于快速构建 balancer.Builder。在实际的开发过程中,只需要实现 base.PickerBuilder 和 balancer.Picker 接口,即可实现自定义的负载均衡器。
// PickerBuilder creates balancer.Picker.
//
// It is the type of the pb argument accepted by NewBalancerBuilder, so a
// custom load balancer supplies its own implementation of this interface.
type PickerBuilder interface {
	// Build returns a picker that will be used by gRPC to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
	// Pick returns the connection to use for this RPC and related information.
	//
	// Pick should not block. If the balancer needs to do I/O or any blocking
	// or time-consuming work to service this call, it should return
	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
	// the Picker is updated (using ClientConn.UpdateState).
	//
	// If an error is returned:
	//
	//   - If the error is ErrNoSubConnAvailable, gRPC will block until a new
	//     Picker is provided by the balancer (using ClientConn.UpdateState).
	//
	//   - If the error is a status error (implemented by the grpc/status
	//     package), gRPC will terminate the RPC with the code and message
	//     provided.
	//
	//   - For all other errors, wait-for-ready RPCs will wait, but
	//     non-wait-for-ready RPCs will be terminated with this error's
	//     Error() string and status code Unavailable.
	Pick(info PickInfo) (PickResult, error)
}
示例
本节的示例代码基于 grpc-testing v0.0.1 实现。
在服务端启动时,向 etcd 注册本服务,并附带一个 color 标签用于标识颜色;在客户端实现了基于 etcd 的 naming resolver 和一个 color 负载均衡器,把 context 的 metadata 中带有 color 标签的流量,分发到指定颜色的节点。同颜色的节点之间没有实现负载均衡,而是使用 pick_first 的策略。
服务端
package main
import (
"app/grpc-testing-server/api/echo"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"time"
"github.com/google/uuid"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
// EchoServer is the handler registered for the Echo gRPC service. It embeds
// echo.UnimplementedEchoServer and overrides the Echo method.
type EchoServer struct {
	echo.UnimplementedEchoServer
}
// Echo prints the request message to stdout and sends the same message back
// in the response. It always returns a nil error.
func (e *EchoServer) Echo(ctx context.Context, par *echo.EchoRequest) (*echo.EchoResponse, error) {
	msg := par.GetMsg()
	fmt.Println(msg)
	return &echo.EchoResponse{Msg: msg}, nil
}
// Service address, port and color tag, plus the etcd settings used to
// register this instance.
var (
	// ip is the address written into the etcd registration record.
	ip = "127.0.0.1"
	// port is the -p flag: the TCP port the server listens on and registers.
	port = flag.Int("p", 8080, "port")
	// color is the -c flag: the color tag stored with the registration
	// (empty by default).
	color = flag.String("c", "", "color")
	// etcd lists the etcd endpoints the client connects to.
	etcd = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}
	// lease is the TTL handed to etcd's Grant call
	// (presumably seconds, per the etcd lease API — confirm).
	lease int64 = 10
)
// ServerInfo is the service registration record. It is JSON-encoded and
// stored as the value of this instance's key in etcd.
type ServerInfo struct {
	// Address is the host part of the service address.
	Address string `json:"address"`
	// Port is the TCP port of the service.
	Port int `json:"port"`
	// Color is the color tag of this instance; it may be empty.
	Color string `json:"color"`
}
// main parses the command-line flags and runs the server. It exits with a
// non-zero status if run reports an error.
func main() {
	log.Println("Go")
	flag.Parse()
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run registers this instance in etcd under a lease, keeps the lease alive
// in the background, and serves the Echo gRPC service until Serve returns.
//
// Failures are returned instead of being passed to log.Fatal: log.Fatal calls
// os.Exit, which skips deferred functions, so the lease would never be
// revoked and the etcd client never closed. Returning lets the deferred
// cleanup run on every error path (for example when the port is already in
// use), so a dead instance does not stay registered until the lease expires.
func run() error {
	// Create the etcd client.
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   etcd,
		DialTimeout: time.Minute,
	})
	if err != nil {
		return fmt.Errorf("create etcd client failed: %w", err)
	}
	defer etcdClient.Close()

	// Grant the lease the registration key is attached to.
	grant, err := etcdClient.Grant(context.Background(), lease)
	if err != nil {
		return fmt.Errorf("create grant failed: %w", err)
	}
	// Revoke the lease when run returns so the registration key is removed
	// immediately. Deferred functions run last-in-first-out, so this runs
	// before the client is closed.
	defer func() {
		if _, err := etcdClient.Revoke(context.Background(), grant.ID); err != nil {
			log.Println("revoke lease failed:", err)
		}
		log.Println("停止续约")
	}()

	// Register this instance under a unique key bound to the lease.
	key := "/service/echo/" + uuid.New().String()
	value, err := json.Marshal(ServerInfo{
		Address: ip,
		Port:    *port,
		Color:   *color,
	})
	if err != nil {
		return fmt.Errorf("encode server info: %w", err)
	}
	if _, err = etcdClient.Put(context.Background(), key, string(value), clientv3.WithLease(grant.ID)); err != nil {
		return fmt.Errorf("put k-v failed: %w", err)
	}

	// Renew the lease periodically and log each keep-alive response until the
	// response channel is closed.
	leaseRespChan, err := etcdClient.KeepAlive(context.Background(), grant.ID)
	if err != nil {
		return fmt.Errorf("keep alive failed: %w", err)
	}
	go func(c <-chan *clientv3.LeaseKeepAliveResponse) {
		for v := range c {
			fmt.Println("续约成功", v.ResponseHeader)
		}
		log.Println("停止续约")
	}(leaseRespChan)

	// Serve the Echo service on the configured port.
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}
	s := grpc.NewServer()
	echo.RegisterEchoServer(s, &EchoServer{})
	if err := s.Serve(listen); err != nil {
		return fmt.Errorf("failed to serve: %w", err)
	}
	return nil
}
客户端
package main
import (
"app/grpc-testing-client/api/echo"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// EtcdBuilder implements resolver.Builder. It creates resolvers that look up
// service addresses in etcd.
type EtcdBuilder struct {
	// EtcdEndpoints lists the etcd endpoints the resolver's client connects to.
	EtcdEndpoints []string
}
// -----------------------------------------------------------------------------
// Build creates a ServiceResolver for target, backed by a new etcd client
// connected to b.EtcdEndpoints, and triggers an initial resolution before
// returning it. If the etcd client cannot be created, the failure is logged
// and the error is returned.
func (b *EtcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fmt.Printf("build Scheme : %s Authority : %s Endpoint : %s \n", target.Scheme, target.Authority, target.Endpoint)

	// Connect to etcd.
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   b.EtcdEndpoints,
		DialTimeout: time.Minute,
	})
	if err != nil {
		log.Println("create etcd client failed")
		return nil, err
	}

	// Build the resolver and publish the first address list right away.
	sr := &ServiceResolver{target: target, cc: cc, EtcdClient: etcdClient}
	sr.ResolveNow(resolver.ResolveNowOptions{})
	return sr, nil
}
// Scheme returns the URL scheme this builder handles. The client dials
// "<scheme>:///<service>" to select this resolver.
func (b *EtcdBuilder) Scheme() string {
	const scheme = "grpclb"
	return scheme
}
// ServerInfo is the registration record decoded from the JSON value of each
// service key read from etcd.
type ServerInfo struct {
	// Address is the host part of the service address.
	Address string `json:"address"`
	// Port is the TCP port of the service.
	Port int `json:"port"`
	// Color is the color tag of the instance; it may be empty.
	Color string `json:"color"`
}
// ServiceResolver implements the resolver.Resolver interface on top of etcd.
type ServiceResolver struct {
	// target is the dial target; its Endpoint selects the etcd key prefix.
	target resolver.Target
	// cc receives the resolved address list.
	cc resolver.ClientConn
	// EtcdClient is the etcd connection used for lookups; Close closes it.
	EtcdClient *clientv3.Client
}
// ResolveNow reads every key under "/service/<endpoint>" from etcd, decodes
// each value as a JSON ServerInfo, and pushes the resulting address list to
// the gRPC client connection. Entries that fail to decode are logged and
// skipped.
//
// If the etcd lookup fails, the error is logged and reported through
// cc.ReportError, and the current address list is left untouched. The
// response is nil in that case, so continuing would dereference a nil pointer.
func (r *ServiceResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	// Fetch the node list.
	resp, err := r.EtcdClient.Get(context.Background(), fmt.Sprintf("/service/%s", r.target.Endpoint), clientv3.WithPrefix())
	if err != nil {
		log.Println("get service error", err)
		r.cc.ReportError(err)
		return
	}

	addrs := make([]resolver.Address, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		fmt.Println(string(kv.Key), string(kv.Value))
		var info ServerInfo
		if err := json.Unmarshal(kv.Value, &info); err != nil {
			log.Println(err)
			continue
		}
		// Attach the color as an address attribute; the load balancer uses
		// it to route traffic by color.
		addrs = append(addrs, resolver.Address{
			Addr:       fmt.Sprintf("%s:%d", info.Address, info.Port),
			Attributes: attributes.New("color", info.Color),
		})
	}
	r.cc.UpdateState(resolver.State{
		Addresses: addrs,
	})
}
// Close shuts down the resolver's etcd client. Any error from closing the
// client is discarded.
func (r *ServiceResolver) Close() {
	_ = r.EtcdClient.Close()
}
// -----------------------------------------------------------------------------
// ColorBalancerBuilder is the picker builder of the custom load balancer. It
// is passed to base.NewBalancerBuilder to register the "color" policy.
type ColorBalancerBuilder struct{}
// Build groups the ready sub-connections by the "color" attribute of their
// address and returns a ColorBalancer over those groups. When there are no
// ready sub-connections it returns an error picker carrying
// balancer.ErrNoSubConnAvailable.
func (cpb ColorBalancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}

	byColor := make(map[string][]balancer.SubConn)
	for sc, scInfo := range info.ReadySCs {
		// A missing or non-string "color" attribute leaves tag as "", which
		// places the connection in the uncolored group.
		tag, _ := scInfo.Address.Attributes.Value("color").(string)
		byColor[tag] = append(byColor[tag], sc)
	}
	return &ColorBalancer{scs: byColor}
}
// ColorBalancer is the picker of the custom load balancer. It selects a
// sub-connection according to the color carried by the request.
type ColorBalancer struct {
	// scs maps a color to the sub-connections of that color; the key ""
	// holds sub-connections without a color.
	scs map[string][]balancer.SubConn
}
// Pick reads the first "color" value from the RPC's outgoing metadata (an
// empty color when there is none) and returns the first sub-connection
// registered for that color. It returns a "pick failed" error when no
// sub-connection exists for the color.
func (cb *ColorBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// Determine the color of this request.
	var color string
	if md, ok := metadata.FromOutgoingContext(info.Ctx); ok {
		if vals := md.Get("color"); len(vals) > 0 {
			color = vals[0]
		}
	}

	// Select a node. Fallback logic could be added here, for example using
	// uncolored nodes when no node of the requested color exists.
	candidates := cb.scs[color]
	if len(candidates) == 0 {
		return balancer.PickResult{}, errors.New("pick failed")
	}
	return balancer.PickResult{SubConn: candidates[0]}, nil
}
// -----------------------------------------------------------------------------
func main() {
fmt.Println("go")
// 创建 resolver.Builder
b := &EtcdBuilder{
EtcdEndpoints: []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"},
}
// 注册 naming resolver
resolver.Register(b)
// 注册 负载均衡器
balancer.Register(base.NewBalancerBuilder("color", ColorBalancerBuilder{}, base.Config{HealthCheck: false}))
// 修改连接的地址,使用自定义的 name resolver
conn, err := grpc.Dial(fmt.Sprintf("%s:///%s", b.Scheme(), "echo"),
grpc.WithInsecure(),
// 配置 loadBalancing 策略
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"color"}`),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
for i := 0; i < 10; i++ {
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
// 为流量添加颜色
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"color": "green"}))
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
log.Print(r.GetMsg())
time.Sleep(time.Second)
}
for i := 0; i < 10; i++ {
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"color": "red"}))
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
log.Print(r.GetMsg())
time.Sleep(time.Second)
}
}