09.gRPC的NameResolver

gRPC NameResolver 根据 name-system 选择对应的解析器,用以解析调用时提供的服务器名,最后返回具体地址列表(IP+端口号)。

gRPC 中的默认 name-system 是 DNS,同时在客户端以插件形式提供了自定义 name-system 的机制。

gRPC 自定义 NameResolver

本节的示例,使用 etcd 存储服务元数据(其实就是地址和端口,如果需要更多信息,可以定义结构体)。代码基于 grpc-testing v0.0.1 实现。

使用 docker-compose 启动 etcd

参见: 使用docker-compose构建etcd集群

服务端

在服务端启动时,向 etcd 注册自己,并定时续约。

// 服务的地址和端口
var ip = "127.0.0.1"
var port = flag.Int("p", 8080, "port")

// etcd endpoint
var etcd = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}

// 租期
var lease int64 = 10

func main() {
	log.Println("Go")
	flag.Parse()

	// 创建 etcd client
	cfg := clientv3.Config{
		Endpoints:   etcd,
		DialTimeout: time.Minute,
	}
	var etcdClient *clientv3.Client
	var err error
	if etcdClient, err = clientv3.New(cfg); err != nil {
		log.Fatal("create etcd client failed")
	}
	defer etcdClient.Close()

	//设置租约时间
	grant, err := etcdClient.Grant(context.Background(), lease)
	if err != nil {
		log.Fatal("craete grant failed", err)
	}
	// put key
	key := "/service/echo/" + uuid.New().String()
	value := fmt.Sprintf("%s:%d", ip, *port)
	_, err = etcdClient.Put(context.Background(), key, value, clientv3.WithLease(grant.ID))
	if err != nil {
		log.Fatal("put k-v failed", err)
	}
	// 定时续约
	leaseRespChan, err := etcdClient.KeepAlive(context.Background(), grant.ID)
	if err != nil {
		log.Fatal("keep alive failed", err)
	}
	go func(c <-chan *clientv3.LeaseKeepAliveResponse) {
		for v := range c {
			fmt.Println("续约成功", v.ResponseHeader)
		}
		log.Println("停止续约")
	}(leaseRespChan)

	// 程序退出时,停止续约
	defer func() {
		etcdClient.Revoke(context.Background(), grant.ID)
		log.Println("停止续约")
	}()

	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	echo.RegisterEchoServer(s, &EchoServer{})
	if err := s.Serve(listen); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端

gRPC 提供了简单的 name resolver 方案,即实现需要的两个接口: resolver.Builder , resolver.Resolver 定义在 resolver/resolver.go 文件中。 在 Dial 前,可以使用 resolver.Register 函数注册自定义的 Buildresolver.Builder , resolver.Resolver 定义如下:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}

客户端实现

// -----------------------------------------------------------------------------

// 实现 resolver.Builder
type EtcdBuilder struct {
	EtcdEndpoints []string
}

func (b *EtcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fmt.Printf("build Scheme : %s Authority : %s Endpoint : %s \n", target.Scheme, target.Authority, target.Endpoint)
	// 创建 Etcd 客户端连接
	cfg := clientv3.Config{
		Endpoints:   b.EtcdEndpoints,
		DialTimeout: time.Minute,
	}
	var etcdClient *clientv3.Client
	var err error
	if etcdClient, err = clientv3.New(cfg); err != nil {
		log.Println("create etcd client failed")
		return nil, err
	}
	// 创建 Resolver
	r := ServiceResolver{
		target:     target,
		cc:         cc,
		EtcdClient: etcdClient,
	}
	r.ResolveNow(resolver.ResolveNowOptions{})
	return &r, nil
}

func (b *EtcdBuilder) Scheme() string {
	return "grpclb"
}
// -----------------------------------------------------------------------------

// 实现 resolver.Resolver 接口
type ServiceResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	EtcdClient *clientv3.Client
}

func (r *ServiceResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	// 获取节点列表
	resp, err := r.EtcdClient.Get(context.Background(), fmt.Sprintf("/service/%s", r.target.Endpoint), clientv3.WithPrefix())
	if err != nil {
		log.Println("get service error", err)
	}
	var Address = make([]resolver.Address, 0, resp.Count)
	for _, v := range resp.Kvs {
		fmt.Println(string(v.Key), string(v.Value))
		Address = append(Address, resolver.Address{Addr: string(v.Value)})
	}
	r.cc.UpdateState(resolver.State{
		Addresses: Address,
	})
}

func (r *ServiceResolver) Close() {
	r.EtcdClient.Close()
}
// -----------------------------------------------------------------------------

func main() {
	fmt.Println("go")

	// 创建 resolver.Builder
	b := &EtcdBuilder{
		EtcdEndpoints: []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"},
	}
	// 注册 resolver.Builder
	resolver.Register(b)

	// 修改连接的地址,使用自定义的 name resolver
	conn, err := grpc.Dial(fmt.Sprintf("%s:///%s", b.Scheme(), "echo"),
		grpc.WithInsecure(),
		// 配置 loadBalancing 策略
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)

	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	for i := 0; i < 100; i++ {
		c := echo.NewEchoClient(conn)
		ctx, cancle := context.WithCancel(context.Background())
		defer cancle()
		r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
		if err != nil {
			log.Fatalf("echo failed :%#v", err)
		}
		log.Print(r.GetMsg())
		time.Sleep(time.Second)
	}
}