10.gRPC的客户端负载均衡

gRPC 提供了关于 gRPC 负载均衡方案 Load Balancing in gRPC 的定义,此方案是为 gRPC 设计实现的。本文参考 gRPC 文档,简单介绍 gRPC 实现自定义负载均衡。

gRPC 中的负载均衡基于每次 gRPC 调用,而不是基于每个客户端连接,即即使请求仅来自一个客户端,系统仍然希望在所有服务器之间进行负载平衡。

gRPC-Go 内置了 pick_first , round_robin 。

  • pick_first:尝试逐个连接客户端地址,如果某一个地址连接成功,则将其用于所有RPC,如果所有的都失败,则报告错误。
  • round_robin:连接所有的地址,并依次向每个可用的后端发送 RPC 请求。

在上一节的示例客户端代码中,使用了 grpc.WithDefaultServiceConfig("{\"loadBalancingPolicy\":\"round_robin\"}"), 配置使用 round_robin 的负载均衡策略。本节将自定义一个负载均衡策略。

gRPC 自定义负载均衡策略

gRPC 在 balancer 包中提供了 func Register(b Builder) 函数用于注册自定义负载均衡器,同时,balancer/base 包提供了 func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder 函数用于快速构建 balancer.Builder,在实际的开发过程中,只需要实现 base.PickerBuilderbalancer.Picker 接口,即可实现自定义的负载均衡器

// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build returns a picker that will be used by gRPC to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}


// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
	// Pick returns the connection to use for this RPC and related information.
	//
	// Pick should not block.  If the balancer needs to do I/O or any blocking
	// or time-consuming work to service this call, it should return
	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
	// the Picker is updated (using ClientConn.UpdateState).
	//
	// If an error is returned:
	//
	// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
	//   Picker is provided by the balancer (using ClientConn.UpdateState).
	//
	// - If the error is a status error (implemented by the grpc/status
	//   package), gRPC will terminate the RPC with the code and message
	//   provided.
	//
	// - For all other errors, wait for ready RPCs will wait, but non-wait for
	//   ready RPCs will be terminated with this error's Error() string and
	//   status code Unavailable.
	Pick(info PickInfo) (PickResult, error)
}

示例

本节的示例 代码基于 grpc-testing v0.0.1 实现。

在服务端启动时,向 etcd 注册本服务,并附带一个 color 标签用于标示颜色,在客户端实现了 etcd 的 naming resolver 和 一个 color 负载均衡器,把 context 包中带有 color标签的流量,分发到指定颜色的节点。同颜色的节点没有实现负载均衡,而是使用 pick_first 的策略。

服务端

package main

import (
	"app/grpc-testing-server/api/echo"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net"
	"time"

	"github.com/google/uuid"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
)

type EchoServer struct {
	echo.UnimplementedEchoServer
}

func (e *EchoServer) Echo(ctx context.Context, par *echo.EchoRequest) (*echo.EchoResponse, error) {
	fmt.Println(par.GetMsg())
	result := &echo.EchoResponse{
		Msg: par.GetMsg(),
	}
	return result, nil
}

var (
	// 服务地址,端口,染色
	ip    = "127.0.0.1"
	port  = flag.Int("p", 8080, "port")
	color = flag.String("c", "", "color")

	etcd        = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}
	lease int64 = 10
)

// 服务信息
type ServerInfo struct {
	Address string `json:"address"`
	Port    int    `json:"port"`
	Color   string `json:"color"`
}

func main() {
	log.Println("Go")
	flag.Parse()

	// 创建 etcd client
	cfg := clientv3.Config{
		Endpoints:   etcd,
		DialTimeout: time.Minute,
	}
	var etcdClient *clientv3.Client
	var err error
	if etcdClient, err = clientv3.New(cfg); err != nil {
		log.Fatal("create etcd client failed")
	}
	defer etcdClient.Close()

	//设置租约时间
	grant, err := etcdClient.Grant(context.Background(), lease)
	if err != nil {
		log.Fatal("craete grant failed", err)
	}
	// put key
	key := "/service/echo/" + uuid.New().String()

	serverInfo := ServerInfo{
		Address: ip,
		Port:    *port,
		Color:   *color,
	}

	value, err := json.Marshal(serverInfo)
	if err != nil {
		log.Fatal(err)
	}

	_, err = etcdClient.Put(context.Background(), key, string(value), clientv3.WithLease(grant.ID))
	if err != nil {
		log.Fatal("put k-v failed", err)
	}
	// 定时续约
	leaseRespChan, err := etcdClient.KeepAlive(context.Background(), grant.ID)
	if err != nil {
		log.Fatal("keep alive failed", err)
	}
	go func(c <-chan *clientv3.LeaseKeepAliveResponse) {
		for v := range c {
			fmt.Println("续约成功", v.ResponseHeader)
		}
		log.Println("停止续约")
	}(leaseRespChan)

	// 程序退出时,停止续约
	defer func() {
		etcdClient.Revoke(context.Background(), grant.ID)
		log.Println("停止续约")
	}()

	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	echo.RegisterEchoServer(s, &EchoServer{})
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端

package main

import (
	"app/grpc-testing-client/api/echo"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/resolver"
)

// 实现 resolver.Builder
type EtcdBuilder struct {
	EtcdEndpoints []string
}

// -----------------------------------------------------------------------------

func (b *EtcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fmt.Printf("build Scheme : %s Authority : %s Endpoint : %s \n", target.Scheme, target.Authority, target.Endpoint)
	// 创建 Etcd 客户端连接
	cfg := clientv3.Config{
		Endpoints:   b.EtcdEndpoints,
		DialTimeout: time.Minute,
	}
	var etcdClient *clientv3.Client
	var err error
	if etcdClient, err = clientv3.New(cfg); err != nil {
		log.Println("create etcd client failed")
		return nil, err
	}
	// 创建 Resolver
	r := ServiceResolver{
		target:     target,
		cc:         cc,
		EtcdClient: etcdClient,
	}
	r.ResolveNow(resolver.ResolveNowOptions{})
	return &r, nil
}

func (b *EtcdBuilder) Scheme() string {
	return "grpclb"
}

type ServerInfo struct {
	Address string `json:"address"`
	Port    int    `json:"port"`
	Color   string `json:"color"`
}

// 实现 resolver.Resolver 接口
type ServiceResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	EtcdClient *clientv3.Client
}

func (r *ServiceResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	// 获取节点列表
	resp, err := r.EtcdClient.Get(context.Background(), fmt.Sprintf("/service/%s", r.target.Endpoint), clientv3.WithPrefix())
	if err != nil {
		log.Println("get service error", err)
	}
	var Address = make([]resolver.Address, 0, resp.Count)
	for _, v := range resp.Kvs {
		fmt.Println(string(v.Key), string(v.Value))
		var serverInfo ServerInfo
		if err := json.Unmarshal(v.Value, &serverInfo); err != nil {
			log.Println(err)
			continue
		}
		// 这里为 Addr 实体添加了 Attributes , 在负载均衡器器使用颜色来分发流量
        attr := attributes.New("color", serverInfo.Color)
		Address = append(Address, resolver.Address{Addr: fmt.Sprintf("%s:%d", serverInfo.Address, serverInfo.Port), Attributes: attr})
	}
	r.cc.UpdateState(resolver.State{
		Addresses: Address,
	})
}

func (r *ServiceResolver) Close() {
	r.EtcdClient.Close()
}

// -----------------------------------------------------------------------------

// 自定义负载均衡器Builder

type ColorBalancerBuilder struct{}

func (cpb ColorBalancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	var scs = map[string][]balancer.SubConn{}
	for k, v := range info.ReadySCs {
		color, ok := v.Address.Attributes.Value("color").(string)
		if ok {
			scs[color] = append(scs[color], k)
		} else {
			scs[""] = append(scs[""], k)
		}
	}
	return &ColorBalancer{scs: scs}

}

// 负载均衡器
type ColorBalancer struct {
	scs map[string][]balancer.SubConn
}

func (cb *ColorBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    // 获取请求流量颜色
	md, ok := metadata.FromOutgoingContext(info.Ctx)
	color := ""
	if ok {
		colors := md.Get("color")
		if len(colors) != 0 {
			color = colors[0]
		}
	}
    // 选择节点  这里可以添加容错逻辑,带颜色的节点不存在时,使用没有颜色的节点来处理流量
	sclist := cb.scs[color]
	if len(sclist) == 0 {
		return balancer.PickResult{}, errors.New("pick failed")
	}
	return balancer.PickResult{
		SubConn: sclist[0],
	}, nil
}

// -----------------------------------------------------------------------------

func main() {
	fmt.Println("go")

	// 创建 resolver.Builder
	b := &EtcdBuilder{
		EtcdEndpoints: []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"},
	}
	// 注册 naming resolver
	resolver.Register(b)

	// 注册 负载均衡器
	balancer.Register(base.NewBalancerBuilder("color", ColorBalancerBuilder{}, base.Config{HealthCheck: false}))

	// 修改连接的地址,使用自定义的 name resolver
	conn, err := grpc.Dial(fmt.Sprintf("%s:///%s", b.Scheme(), "echo"),
		grpc.WithInsecure(),
		// 配置 loadBalancing 策略
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"color"}`),
	)

	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	for i := 0; i < 10; i++ {
		c := echo.NewEchoClient(conn)
		ctx, cancle := context.WithCancel(context.Background())
		defer cancle()
        // 为流量添加颜色
		ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"color": "green"}))
		r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
		if err != nil {
			log.Fatalf("echo failed :%#v", err)
		}
		log.Print(r.GetMsg())
		time.Sleep(time.Second)
	}

	for i := 0; i < 10; i++ {
		c := echo.NewEchoClient(conn)
		ctx, cancle := context.WithCancel(context.Background())
		defer cancle()

		ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"color": "red"}))
		r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
		if err != nil {
			log.Fatalf("echo failed :%#v", err)
		}
		log.Print(r.GetMsg())
		time.Sleep(time.Second)
	}
}