gRPC客户端详解

作者:寒歌

RPC框架的选择

常见的RPC框架主要分为轻重两种。较轻的框架一般只负责通信,如rmi、webservice、restful、Thrift、gRPC等。较重的框架一般包括完整的服务发现、负载均衡策略等等如BAT三家的Dubbo、brpc、Tars之类。

框架选择时个人认为首先要考虑的是框架的历史和项目的活跃程度。一个历史悠久的活跃项目(大概至少可以保证每两到三个月有一次小版本的更新)可以保证各种bug早已暴露并修复,让我们可以更专注于我们自己的项目本身,而不是要担心究竟是我们自己的代码有问题还是框架本身就有问题。

重量级RPC框架有一个主要问题就是结构复杂,另外主语言之外的代码质量也不太容易保证。个人认为活跃的社区以及一个活跃的开源管理团队是这些重型RPC框架项目成功的必要前提条件。比如我们项目组试用过腾讯的Tars,C++同学表示没有任何问题,然后JAVA同学表示java版本有许多bug,修复bug的pull request需要两个多月才能得到merge,而官方jar包也将近两年没有更新过了。

轻量级rpc框架中,restful可以被视作标杆。由于restful基于http协议,天然被各种框架支持,而且非常灵活。restful的缺点有两方面,一是过于灵活,缺少根据协议生成服务端和客户端代码的工具,联调往往要花更多的时间;二是大部分序列化基于json或者xml,相对来讲效率不理想。和restful相比,其它很多轻量级框架都有这样或者那样的缺点,有的缺少跨语言支持(rmi),有的既繁琐又缺乏效率优势(webservice)。个人认为其中相对理想的是gRPC和Thrift。

gRPC简介

Protobuf是一种google推出的非常流行的跨语言序列化/反序列化框架。在Protobuf2中就已经出现了用rpc定义服务的概念,但是一直缺少一种流行的rpc框架支持。当Http2推出之后,google将Http2和protobuf3结合,推出了gRPC。gRPC继承了Protobuf和Http2的优点,包括:

  • 序列化反序列化性能好
  • 强类型支持
  • 向前/向后兼容
  • 有代码生成机制,而且可以支持多语言
  • 长连接、多路复用

同时gRPC还提供了简单地服务发现和负载均衡功能。虽然这并不是gRPC框架的重点,但是开发者可以非常容易的自己扩展gRPC这些功能,实现自己的策略或应用最新的相关方面技术,而不用像重型RPC框架一样受制于框架本身是否支持。

gRPC与Thrift对比

Thrift是Facebook推出的一种RPC框架,从性能上来讲远优于gRPC。但是在实际调研时发现有一个很麻烦的问题:Thrift的客户端是线程不安全的——这意味着在Spring中无法以单例形式注入到Bean中。解决方案有三种:

  1. 每次调用创建一个Thrift客户端。这不仅意味着额外的对象创建和垃圾回收开销,而且实际上相当于只使用了短链接,这是一个开发复杂度最低但是从性能上来讲最差的解决方案。
  2. 利用Pool,稍微复杂一点的解决方案,但是也非常成熟。但是问题在于一来缺少服务发现和负载均衡恐实现,需要很多额外开发;二来需要创建Pool数量*服务端数量个客户端,内存开销会比较大。
  3. 使用异步框架如Netty,可以成功避免创建过多的客户端,但是仍要自己实现服务发现和负载均衡,相对复杂。实际上Facebook有一个基于Netty的Thrift客户端,叫Nifty,但是快四年没更新了。。。

相比较而言gRPC就友好多了,本身有简单而且可扩展的服务发现和负载均衡功能,底层基于Netty所以线程安全,在不需要极限压榨性能的情况下是非常好的选择。当然如果需要极限压榨性能Thrift也未必够看。

gRPC入门

gRPC服务定义

gRPC中有一个特殊的关键字stream,表示可以以流式输入或输出多个protobuf对象。注意只有异步非阻塞的客户端支持以stream形式输入,同步阻塞客户端不支持以stream形式输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
syntax = "proto3";  //gRPC必须使用proto3

option java_multiple_files = true;
option java_package = "cn.lmh.examples.grpc.proto";

service RouteGuide {
// 输入一个坐标,返回坐标和时间(1:1)
rpc getPoint(Point) returns (LocationNote) {}
// 输入一个矩形,以stream形式返回一系列点(1:n)
rpc listPoints(Rectangle) returns (stream Point) {}
// 以stream形式输入一系列点,返回点的数量和总共花费的时间(m:1)
rpc recordRoute(stream Point) returns (RouteSummary) {}
// 以stream形式输入一系列点,以stream形式返回已输入点的数量和总共花费的时间(m:n)
rpc getPointStream(stream Point) returns (stream RouteSummary) {}
}

message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message Rectangle {
Point lo = 1;
Point hi = 2;
}
message LocationNote {
Point location = 1;
int64 timestamp = 2;
}
message RouteSummary {
int32 point_count = 1;
int64 elapsed_time = 2;
}

依赖和代码生成

由于protoc的gRPC插件需要自己编译,而且存在环境问题。推荐使用gradle或者maven的protobuf插件。入门示例项目使用了gradle,根目录build.gradle配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
plugins {
id 'java'
id 'idea'
id 'wrapper'
}

ext {
groupId = 'cn.lmh.leviathan'
proto = [
version : "3.9.0",
"grpc" :[
version : "1.23.0"
]
]
}

allprojects{
apply plugin: 'java'
apply plugin: 'idea'

sourceCompatibility=JavaVersion.VERSION_1_8
targetCompatibility=JavaVersion.VERSION_1_8

project.group = 'cn.lmh.examples'

compileJava.options.encoding = 'UTF-8'
}

subprojects{
repositories {
mavenCentral()
mavenLocal();
};
configurations {
compile
}

dependencies {
compile "io.grpc:grpc-netty-shaded:${proto.grpc.version}"
compile "io.grpc:grpc-protobuf:${proto.grpc.version}"
compile "io.grpc:grpc-stub:${proto.grpc.version}"

testCompile group: 'junit', name: 'junit', version: '4.12'
}
}

子项目build.gradle如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
plugins{
id 'com.google.protobuf' version '0.8.10' //引入protobuf插件
}

sourceSets{
main{
proto {
srcDir 'src/main/proto' //指定.proto文件所在的位置
}
}
}

protobuf {
generatedFilesBaseDir = "$projectDir/src" //生成文件的根目录

protoc {
artifact = "com.google.protobuf:protoc:${proto.version}" //protoc的版本
}

plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${proto.grpc.version}" //gRPC的版本
}
}

generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = "java" //grpc生成文件的子目录
}
}
}
}

我们的入门子项目名称叫做starter,配置好build.gradle之后,执行gradlew :starter:generateProto就可以在src/main/java下生成对应的文件:

gRPC生成的目录结构

服务端

无论客户端以异步非阻塞还是同步阻塞形式调用,gRPC服务端的Response都是异步形式。对于异步的Request或者Response,都需要实现gRPC的io.grpc.stub.StreamObserver接口。io.grpc.stub.StreamObserver接口有三个方法:

  • onNext:表示接收/发送一个对象
  • onError:处理异常
  • onCompleted:表示Request或Response结束

当Request发送到服务端端时,会异步调用requestObserver的onNext方法,直到结束时调用requestObserver的onCompleted方法;服务端调用responseObserver的onNext把Response返回给客户端,直到调用responseObserver的onCompleted方法通知客户端Response结束。服务端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class RouteGuideServer {
private final int port;
private final Server server;

public RouteGuideServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new RouteGuideService())
.build();
}

/**
* Start server.
*/
public void start() throws IOException {
server.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
RouteGuideServer.this.stop();
}
});
}

/**
* Stop server
*/
public void stop() {
if (server != null) {
server.shutdown();
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
@Override
public void getPoint(Point request, StreamObserver<LocationNote> responseObserver) {
LocationNote value = LocationNote
.newBuilder()
.setLocation(request)
.setTimestamp(System.nanoTime())
.build();
responseObserver.onNext(value);
responseObserver.onCompleted();
}

@Override
public void listPoints(Rectangle request, StreamObserver<Point> responseObserver) {
int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
for (int x = left; x <= right; x++) {
for (int y = top; y >= bottom; y--) {
Point point = Point.newBuilder().setLongitude(x).setLatitude(y).build();
responseObserver.onNext(point);
}
}
responseObserver.onCompleted();
}

@Override
public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();

@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
RouteSummary result = RouteSummary.newBuilder().setElapsedTime(System.nanoTime() - startTime).setPointCount(pointCount.get()).build();
responseObserver.onNext(result);
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<Point> getPointStream(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();

@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
RouteSummary result = RouteSummary.newBuilder().setElapsedTime(System.nanoTime() - startTime).setPointCount(count).build();
responseObserver.onNext(result);
}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}

客户端

gRPC的客户端有同步阻塞客户端(blockingStub)和异步非阻塞客户端(Stub)两种。同步客户端使用比较方便,但是性能较低,而且不支持stream形式的Request;异步客户端性能较高,支持stream形式的Request,但是如果想要以同步方式调用需要额外封装。本文将主要以异步为例。

异步转同步

由于gRPC的异步客户端性能较高且功能更完整,所以一般都会采用异步客户端。异步客户端接收到的Response也是以io.grpc.stub.StreamObserver形式。由于客户端的调用可能是在异步进程中但更可能是在同步进程中,所以就存在一个如何把gRPC异步Response转为同步Response的问题。

一个比较常见的思路是写一个io.grpc.stub.StreamObserver实现,里面有一个内置变量保存异步Response的结果,再添加一个阻塞式的get()方法,直到Response结束才把所有结果返回。要知道Response是否结束,需要添加一个Boolean或者AtomicBoolean变量,初始化为false,调用responseObserver.onCompleted()方法时设置为true,这样就可以通过这个变量判断Response是否结束。

阻塞get()方法最常见的思路是get()写一个while循环,直到变量值改为true才退出循环并返回结果。这种方式的优点是简单直接,任何语言都可以简单实现,缺点是由于使用循环可能CPU占用较高。而对于java这种多线程比较完善的语言,另一个比较好思路是Response结束前将线程挂起,当调用responseObserver.onCompleted()方法再唤醒线程。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CallableStreamObserver<T> implements StreamObserver<T> {
List<T> values = new ArrayList<T>();
boolean isCompleted = false;
Throwable t = null;

@Override
public void onNext(T value) {
this.values.add(value);
}

@Override
public void onError(Throwable t) {
this.isCompleted = true;
notifyAll();
}

@Override
public synchronized void onCompleted() {
this.isCompleted = true;
notifyAll();
}

public List<T> get() throws Throwable {
if (!this.isCompleted) {
synchronized (this) {
this.wait(60 * 1000);
}
}
if (null != t) {
throw this.t;
} else {
return this.values;
}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class RouteGuideClient {

private final ManagedChannel channel;
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
private final RouteGuideGrpc.RouteGuideStub asyncStub;

public RouteGuideClient(String host, int port) {
String target = "dns:///" + host + ":" + port;
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.usePlaintext();
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

public LocationNote getPoint(int lo, int lt, boolean blocking) throws Throwable {
Point point = Point.newBuilder().setLongitude(lo).setLatitude(lt).build();
if(blocking) {
return blockingStub.getPoint(point);
}else{
CallableStreamObserver<LocationNote> responseObserver = new CallableStreamObserver<LocationNote>();
asyncStub.getPoint(point, responseObserver);
return responseObserver.get().get(0);
}
}

public Iterator<Point> listPoints(int left, int top, int right, int bottom, boolean blocking) throws Throwable {
Point hi = Point.newBuilder().setLongitude(left).setLatitude(top).build();
Point lo = Point.newBuilder().setLongitude(right).setLatitude(bottom).build();
Rectangle rec = Rectangle.newBuilder().setHi(hi).setLo(lo).build();
if(blocking){
return blockingStub.listPoints(rec);
}else{
CallableStreamObserver<Point> responseObserver = new CallableStreamObserver<Point>();
asyncStub.listPoints(rec, responseObserver);
return responseObserver.get().iterator();
}
}

public RouteSummary recordRoute(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get().get(0);

}

public List<RouteSummary> getPointStream(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.getPointStream(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get();
}
}

gRPC客户端代码详解

gRPC官方将自己分为三层组件:Stub、Channel和Transport。

  • Stub层是最上层的代码,gRPC附带的插件可以从.proto文件直接生成Stub层代码,开发人员通过直接调用Stub层的代码调用RPC服务
  • Channel层是对Transport层功能的抽象,同时提供了很多有用的功能,比如服务发现和负载均衡。。
  • Transport层承担了将字节从网络中取出和放入数据的工作,有三种实现Netty、okHttp、inProgress。Transport层是最底层的代码。

整个grpc-java项目的代码比较多。从风格上来讲,封装比较多,相对于interface更喜欢使用abstract class,相对于反射更喜欢使用硬编码,而且大量使用了单线程异步调用造成调用栈断裂,与常见的java项目的编码风格有很大差别,阅读起来可能容易不习惯。

在源码层面本文将关注下面这些方面:

  • Channel的初始化过程;
  • gRPC中的服务发现;
  • gRPC中的负载均衡
  • Client与Server之间的数据传输

Channel的初始化过程

通过入门示例可以看到,Channel的初始化过程分三步:

  1. 调用forTarget方法创建io.grpc.ManagedChannelBuilder;
  2. 配置各种选项,不论如何配置,返回的总是io.grpc.ManagedChannelBuilder对象;
  3. 调用build方法创建io.grpc.ManagedChannel

forTarget方法

gRPC这里设计比较繁琐,过程比较绕。forTarget方法的实际功能就是把参数target赋值给io.grpc.ManagedChannelBuilder的内部变量target

1
2
3
public static ManagedChannelBuilder<?> forTarget(String target) {
return ManagedChannelProvider.provider().builderForTarget(target);
}

io.grpc.ManagedChannelProvider.provider()会返回一个io.grpc.ManagedChannelProvider实现。有哪些io.grpc.ManagedChannelProvider实现是在io.grpc.ManagedChannelProvider中以硬编码形式确定的,这里其实存在利用反射改进的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final class HardcodedClasses implements Iterable<Class<?>> {
@Override
public Iterator<Class<?>> iterator() {
List<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.okhttp.OkHttpChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
try {
list.add(Class.forName("io.grpc.netty.NettyChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
return list.iterator();
}
}

实际上就根据依赖的jar包不同就只有两个实现,一个netty的,一个okhttp的。如果像入门示例项目一样只配置了netty实现,那就只有netty的。io.grpc.netty.NettyChannelProvider的buildForTarget方法调用的是io.grpc.netty.NettyChannelBuilderforTarget方法。

1
2
3
public NettyChannelBuilder builderForTarget(String target) {
return NettyChannelBuilder.forTarget(target);
}

io.grpc.netty.NettyChannelBuilder继承自io.grpc.internal.AbstractManagedChannelImplBuilderforTarget方法实际上调用了父类的构造函数。

1
2
3
4
5
6
7
NettyChannelBuilder(String target) {
super(target);
}

public static NettyChannelBuilder forTarget(String target) {
return new NettyChannelBuilder(target);
}

io.grpc.internal.AbstractManagedChannelImplBuilder的构造函数最终会是把参数赋值给target变量。

1
2
3
4
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target, "target");
this.directServerAddress = null;
}

build方法

从前文可以看到,实际初始化的io.grpc.ManagedChannelBuilder实际上是io.grpc.netty.NettyChannelBuilder,其build方法实现在其父类io.grpc.internal.AbstractManagedChannelImplBuilder中。

1
2
3
4
5
6
7
8
9
10
11
public ManagedChannel build() {
return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
this,
buildTransportFactory(),
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(),
TimeProvider.SYSTEM_TIME_PROVIDER));
}

这里的io.grpc.internal.ManagedChannelOrphanWrapperio.grpc.internal.ManagedChannelImpl其实都是io.grpc.ManagedChannel的实现。io.grpc.internal.ManagedChannelOrphanWrapper从功能上分析没有任何作用,io.grpc.internal.ManagedChannelOrphanWrapper会为io.grpc.ManagedChannel创建弱引用,并被放置到ReferenceQueue中。如果Channel是单例的,那么意义不大;如果客户端被重复创建却没有被关闭,那么ReferenceQueue中会留下相应的引用记录,可能有助于排查问题。

io.grpc.internal.ManagedChannelImpl构造方法的几个参数中,除了第一个参数是builder本身,第二个参数是用来创建Transport的Factory,第三个参数是后台连接重试策略,第四个参数是gRPC的全局线程池,第五个和第七个都是和时间相关的对象,主要用于日志中,第六个是客户端调用时的interceptor。在io.grpc.netty.NettyChannelBuilder中,buildTransportFactory方法会创建一个io.grpc.netty.NettyChannelBuilder.NettyTransportFactory

服务发现

前文的入门示例中直接写了target,只能连接单个Server。如果有多个可以提供服务的Server,那么就需要有一种方式通过单个target发现这些Server。在io.grpc.ManagedChannelBuilder中有一个nameResolverFactory方法,可以用来指定如何解析target地址,发现多个服务端。

nameResolverFactory方法

这个方法的实现也在io.grpc.internal.AbstractManagedChannelImplBuilder中,如果用户有自己的io.grpc.NameResolver.Factory实现的话可以通过nameResolverFactory方法指定,gRPC就会使用用户自己的io.grpc.NameResolver.Factroy实现代替gRPC自己的默认实现,否则会使用io.grpc.NameResolverRegistry中的默认实现。

io.grpc.NameResolverRegistry会通过硬编码加载io.grpc.NameResolverProvider实现,并创建一个与之有关的io.grpc.NameResolver.Factory的实现。目前硬编码加载的io.grpc.NameResolverProvider实现只有io.grpc.internal.DnsNameResolverProvider一种。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private final NameResolver.Factory factory = new NameResolverFactory();
@GuardedBy("this")
private final LinkedHashSet<NameResolverProvider> allProviders = new LinkedHashSet<>();

private synchronized void addProvider(NameResolverProvider provider) {
checkArgument(provider.isAvailable(), "isAvailable() returned false");
allProviders.add(provider);
}

public static synchronized NameResolverRegistry getDefaultRegistry() {
if (instance == null) {
List<NameResolverProvider> providerList = ServiceProviders.loadAll(
NameResolverProvider.class,
getHardCodedClasses(),
NameResolverProvider.class.getClassLoader(),
new NameResolverPriorityAccessor());
if (providerList.isEmpty()) {
logger.warning("No NameResolverProviders found via ServiceLoader, including for DNS. This "
+ "is probably due to a broken build. If using ProGuard, check your configuration");
}
instance = new NameResolverRegistry();
for (NameResolverProvider provider : providerList) {
logger.fine("Service loader found " + provider);
if (provider.isAvailable()) {
instance.addProvider(provider);
}
}
instance.refreshProviders();
}
return instance;
}

public NameResolver.Factory asFactory() {
return factory;
}

@VisibleForTesting
static List<Class<?>> getHardCodedClasses() {
ArrayList<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.internal.DnsNameResolverProvider"));
} catch (ClassNotFoundException e) {
logger.log(Level.FINE, "Unable to find DNS NameResolver", e);
}
return Collections.unmodifiableList(list);
}

private final class NameResolverFactory extends NameResolver.Factory {
@Override
@Nullable
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
List<NameResolverProvider> providers = providers();
for (NameResolverProvider provider : providers) {
NameResolver resolver = provider.newNameResolver(targetUri, args);
if (resolver != null) {
return resolver;
}
}
return null;
}

@Override
public String getDefaultScheme() {
List<NameResolverProvider> providers = providers();
if (providers.isEmpty()) {
return "unknown";
}
return providers.get(0).getDefaultScheme();
}
}

getDefaultSchema会匹配target中的schema(如dns),如果匹配的上,就使用相应的NameResolver.Factory,返回NameResolver决定真正的服务访问地址。

io.grpc.NameResolver

我们来看io.grpc.NameResolver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public abstract class NameResolver {

public abstract String getServiceAuthority();

public void start(final Listener listener) {
if (listener instanceof Listener2) {
start((Listener2) listener);
} else {
start(new Listener2() {
@Override
public void onError(Status error) {
listener.onError(error);
}

@Override
public void onResult(ResolutionResult resolutionResult) {
listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
}
});
}
}

public void start(Listener2 listener) {
start((Listener) listener);
}

public abstract void shutdown();

public void refresh() {}

@ThreadSafe
public interface Listener {

void onAddresses(List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);

void onError(Status error);
}

public abstract static class Listener2 implements Listener {

@Override
public final void onAddresses(
List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
onResult(
ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
}

public abstract void onResult(ResolutionResult resolutionResult);

@Override
public abstract void onError(Status error);
}

public static final class ResolutionResult {
private final List<EquivalentAddressGroup> addresses;
@ResolutionResultAttr
private final Attributes attributes;
@Nullable
private final ConfigOrError serviceConfig;

ResolutionResult(
List<EquivalentAddressGroup> addresses,
@ResolutionResultAttr Attributes attributes,
ConfigOrError serviceConfig) {
this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
this.attributes = checkNotNull(attributes, "attributes");
this.serviceConfig = serviceConfig;
}

public static Builder newBuilder() {
return new Builder();
}

public Builder toBuilder() {
return newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setServiceConfig(serviceConfig);
}

public List<EquivalentAddressGroup> getAddresses() {
return addresses;
}

@ResolutionResultAttr
public Attributes getAttributes() {
return attributes;
}

@Nullable
public ConfigOrError getServiceConfig() {
return serviceConfig;
}

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
public static final class Builder {
private List<EquivalentAddressGroup> addresses = Collections.emptyList();
private Attributes attributes = Attributes.EMPTY;
@Nullable
private ConfigOrError serviceConfig;
Builder() {}

public Builder setAddresses(List<EquivalentAddressGroup> addresses) {
this.addresses = addresses;
return this;
}

public Builder setAttributes(Attributes attributes) {
this.attributes = attributes;
return this;
}

public Builder setServiceConfig(@Nullable ConfigOrError serviceConfig) {
this.serviceConfig = serviceConfig;
return this;
}

public ResolutionResult build() {
return new ResolutionResult(addresses, attributes, serviceConfig);
}
}
}
}

在客户端首次连接服务端的时候会调用Listener2start方法,需要更新的时候会调用refresh方法。当Listener2接收到服务端地址时,会调用onResult方法。

io.grpc.internal.DnsNameResolver

由于gRPC支持长连接,所以如果直连的话只会访问一个域名下的一台服务器,即首次连接时通过DNS返回IP地址。io.grpc.internal.DnsNameResolverProvider是对io.grpc.internal.DnsNameResolver的简单封装,只支持以dns:///开头的地址。io.grpc.internal.DnsNameResolver会根据target获取该host下所有关联的IP,即通过DNS解析出所有的服务端IP地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final class DnsNameResolverProvider extends NameResolverProvider {

private static final String SCHEME = "dns";

@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
} else {
return null;
}
}

@Override
public String getDefaultScheme() {
return SCHEME;
}

@Override
protected boolean isAvailable() {
return true;
}

@Override
protected int priority() {
return 5;
}
}

可以看到io.grpc.internal.DnsNameResolver中的startrefresh方法都调用的是resolve方法,而resolve方法是执行了一个继承自RunnableResolve接口。

DnsNameResolver

在有代理的情况下,ResolveresolveInternal会根据代理返回的ProxiedSocketAddress创建EquivalentAddressGroup作为服务端列表返回,并设置空config;否则会调用resolveAll方法获取服务端列表,并调用parseServiceConfig方法设置config。resolveAll方法返回的ResolutionResults有三个变量addressestxtRecordsbalancerAddresses

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@VisibleForTesting
static ResolutionResults resolveAll(
AddressResolver addressResolver,
@Nullable ResourceResolver resourceResolver,
boolean requestSrvRecords,
boolean requestTxtRecords,
String name) {
List<? extends InetAddress> addresses = Collections.emptyList();
Exception addressesException = null;
List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
Exception balancerAddressesException = null;
List<String> txtRecords = Collections.emptyList();
Exception txtRecordsException = null;

try {
addresses = addressResolver.resolveAddress(name);
} catch (Exception e) {
addressesException = e;
}
if (resourceResolver != null) {
if (requestSrvRecords) {
try {
balancerAddresses =
resourceResolver.resolveSrv(addressResolver, GRPCLB_NAME_PREFIX + name);
} catch (Exception e) {
balancerAddressesException = e;
}
}
if (requestTxtRecords) {
boolean balancerLookupFailedOrNotAttempted =
!requestSrvRecords || balancerAddressesException != null;
boolean dontResolveTxt =
(addressesException != null) && balancerLookupFailedOrNotAttempted;
if (!dontResolveTxt) {
try {
txtRecords = resourceResolver.resolveTxt(SERVICE_CONFIG_NAME_PREFIX + name);
} catch (Exception e) {
txtRecordsException = e;
}
}
}
}
try {
if (addressesException != null
&& (balancerAddressesException != null || balancerAddresses.isEmpty())) {
Throwables.throwIfUnchecked(addressesException);
throw new RuntimeException(addressesException);
}
} finally {
if (addressesException != null) {
logger.log(Level.FINE, "Address resolution failure", addressesException);
}
if (balancerAddressesException != null) {
logger.log(Level.FINE, "Balancer resolution failure", balancerAddressesException);
}
if (txtRecordsException != null) {
logger.log(Level.FINE, "ServiceConfig resolution failure", txtRecordsException);
}
}
return new ResolutionResults(addresses, txtRecords, balancerAddresses);
}

addressResolverresolveAddress方法实际是调用JDK的java.net.InetAddressgetAllByName方法,即根据host通过DNS返回一系列服务端列表。resourceResolver根据LDAP协议获取指定命名空间下的服务端列表地址。txtRecordsbalancerAddresses是和LDAP相关的参数,方法入参requestSrvRecordsrequestTxtRecords的默认值都是false。由于LDAP不是特别常用,这里就不深入展开了。

NameResolverListeneronResult

NameResolverListener获取解析结果后会调用onResult方法,进而会调用io.grpc.LoadBalancerhandleResolvedAddresses方法。

获取解析结果后调用handleResolvedAddresses方法

负载均衡

io.grpc.ManagedChannel初始化的时候可以通过defaultLoadBalancingPolicy方法指定负载均衡策略,实际是根据defaultLoadBalancingPolicy创建了一个io.grpc.internal.AutoConfiguredLoadBalancerFactory对象。io.grpc.internal.AutoConfiguredLoadBalancerFactory则通过io.grpc.LoadBalancerRegistry获取对应名称的负载均衡策略。io.grpc.LoadBalancerProvidergetPolicyName方法指定负载均衡策略名称,newLoadBalancer返回负载均衡io.grpc.LoadBalancer的具体实现。如果想要添加自定义负载均衡策略,需要调用io.grpc.LoadBalancerRegistryregistry方法,并自己实现io.grpc.LoadBalancerProviderio.grpc.LoadBalancer,并指定负载均衡策略名称即可。
defaultLoadBalancingPolicy方法

io.grpc.LoadBalancer.SubchannelPicker

io.grpc.LoadBalancer的核心逻辑实际在SubchannelPicker中。pickSubchannel方法会返回的PickResult中包含真正可用的subchannel,用来进行后续的数据传输。

1
2
3
4
5
6
7
8
9
public abstract static class SubchannelPicker {
/**
* Make a balancing decision for a new RPC.
*
* @param args the pick arguments
* @since 1.3.0
*/
public abstract PickResult pickSubchannel(PickSubchannelArgs args);
}

gRPC默认提供了两种负载均衡实现策略:prick_firstround_robin。前者总会使用第一个可用的服务端,后者则是简单轮询。

handleResolvedAddresses

当服务端列表更新时,会调用io.grpc.LoadBalancerhandleResolvedAddresses方法更新可用的subchannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
if (subchannel == null) {
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(servers)
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo stateInfo) {
processSubchannelState(subchannel, stateInfo);
}
});
this.subchannel = subchannel;
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
}
}

如果是首次调用(subchannel == null) 会创建subchannel,其实现是io.grpc.internal.ManagedChannelImpl.SubchannelImpl,创建的过程中会创建io.grpc.internal.InternalSubchannel。然后调用io.grpc.internal.ManagedChannelImplupdateBalancingState方法,把subchannelPicker更新为实现Picker,然后开启subchannel的连接。

开启subchannel链接

在开启subchannel的连接过程中,会调用io.grpc.internal.InternalSubchannelobtainActiveTransport方法。

这里的transportFactory就是上面提到io.grpc.ManagedChannelBuilder调用build初始化时调用buildTransportFactory方法返回的,依赖于Transport层的具体实现。在netty实现中,返回的是io.grpc.netty.NettyClientTransport

传输

gRPC客户端发起Request时,stub会调用ClientCallsstartCall方法,最终会调用io.grpc.internal.ManagedChannelImpl.ChannelTransportProviderget方法获取io.grc.internal.ClientTransport

gRPC客户端发起Request时调用ChannelTransportProvider的get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) {
return delayedTransport;
}
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
exitIdleMode();
}
}
syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, args.getCallOptions().isWaitForReady());
if (transport != null) {
return transport;
}
return delayedTransport;
}

如果subchannelPicker存在,会使用subchannelPicker进行选择;如果是首次访问服务端时subchannel肯定不存在,会使用syncContext异步执行exitIdleMode方法初始化。syncContext是一个单线程执行队列,可以保证先提交的任务先执行。delayedTransport的执行也依赖于syncContext,这就保证了delayedTransport中的方法执行一定会在exitIdleMode方法之后。

首次访问服务端时执行exidIdleMode方法

exitIdleMode方法会初始化NameResolverLoadBalancer,并会启动NameResolverListener。当解析完成后会调用NameResolverListeneronResult方法,进而调用LoadBalancerhandleResolvedAddresses方法创建subchannelPicker、创建并连接subchannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@VisibleForTesting
void exitIdleMode() {
syncContext.throwIfNotInThisSynchronizationContext();
if (shutdown.get() || panicMode) {
return;
}
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer(false);
} else {
rescheduleIdleTimer();
}
if (lbHelper != null) {
return;
}
channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
this.lbHelper = lbHelper;

NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
nameResolver.start(listener);
nameResolverStarted = true;
}

Request

发送Request时会调用ConnectionClientTransportnewStream方法返回一个io.grpc.internal.ClientStream对象,而首次调用会通过delayedTransport延迟调用newStream方法。

调用newStream的调用栈

netty实现会返回一个io.grpc.netty.shaded.io.grpc.netty.NettyClientStream对象。io.grpc.internal.ClientStream下有两个子类,TransportState负责处理传输状态,Sink负责写入数据。

在进行一系列http2相关设置后,会调用io.grpc.internal.ClientStreamstart方法,为TransportState设置监听并通过Sink写入Header。

1
2
3
4
5
6
7
8
@Override
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
if (!useGet) {
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
}

初始化结束后,调用requestObserver的onNext方法会调用io.grpc.internal.ClientCallImplsendMessage方法,将protobuf对象转换成InputStream,并作为参数调用io.grpc.internal.ClientStreamwriteMessage方法,进而调用io.grpc.internal.MessageFramerwritePayload方法,最终调用writeToOutputStream方法将内容写入Http的OutputStream。如果是参数是stream形式会继续调用flush。

onNext

调用requestObserver的onCompleted方法会调用io.grpc.internal.ClientCallImplhalfClose方法,进而会调用io.grpc.internal.MessageFramerendOfMessages,flush并结束发送消息。

onComplete

Response

onNext

客户端接受到Response会调用ClientStreamListener的messagesAvailable方法,并通过同步线程池最终调用StreamObserver的onNext方法接收数据。

onComplete

当返回结束时会调用TransportState的transportReportStatus方法关闭请求,进而调用ClientStreamListener的closed方法关闭监听,进而调用StreamObserver的onClose方法。

gRPC通信格式

gRPC发送的请求发送方法是POST,路径是/${serviceName}/${methodName},content-type为content-type = application/grpc+proto。

Request

1
2
3
4
5
6
7
8
9
10
HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
:path = /RouteGuide/getPoint
grpc-timeout = 1S
content-type = application/grpc+proto
grpc-encoding = gzip

DATA (flags = END_STREAM)
<Length-Prefixed Message>

Response

1
2
3
4
5
6
7
8
9
10
11
HEADERS (flags = END_HEADERS)
:status = 200
grpc-encoding = gzip
content-type = application/grpc+proto

DATA
<Length-Prefixed Message>

HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
trace-proto-bin = jher831yy13JHy3hc

扩展gRPC

自定义基于zookeeper的NameResolver.Factory实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class CuratorNameResolver extends NameResolver {
CuratorFramework curatorFramework;
String basePath;
String serviceAuthority;
private Listener2 listener;

public CuratorNameResolver(CuratorFramework curatorFramework, String basePath, String serviceAuthority) {
this.curatorFramework = curatorFramework;
this.basePath = basePath;
this.serviceAuthority = serviceAuthority;
}

@Override
public void start(Listener2 listener) {
this.curatorFramework.start();
this.listener = listener;
refresh();
}

@Override
public void refresh() {
List<EquivalentAddressGroup> servers = new ArrayList<>();
try {
List<EquivalentAddressGroup> addresses = curatorFramework.getChildren()
.forPath(basePath)
.stream().map(address ->{
try {
URI uri = new URI("http://" + address);
return new EquivalentAddressGroup(
new InetSocketAddress(uri.getHost(), uri.getPort()));
}catch (Exception e){
listener.onError(Status.INTERNAL);
return null;
}
}).collect(Collectors.toList());
listener.onResult(ResolutionResult.newBuilder().setAddresses(addresses).build());

} catch (Exception e) {
listener.onError(Status.INTERNAL);
}
}

@Override
public String getServiceAuthority() {
return this.serviceAuthority;
}

@Override
public void shutdown() {
this.curatorFramework.close();
}

public static class Factory extends NameResolver.Factory{
@Override
public NameResolver newNameResolver(URI targetUri, Args args) {
String address = targetUri.getHost() + ":" + targetUri.getPort();
String authority = null == targetUri.getAuthority() ? address : targetUri.getAuthority();
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(address)
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();
return new CuratorNameResolver(curator, targetUri.getPath(), authority);
}

@Override
public String getDefaultScheme() {
return "zookeeper";
}
}
}

自定义随机负载均衡实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public class RandomLoadBalancer extends LoadBalancer{
LoadBalancer.Helper helper;

private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");

public RandomLoadBalancer(LoadBalancer.Helper helper) {
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
for(EquivalentAddressGroup server : servers){
List<EquivalentAddressGroup> serverSingletonListt = Collections.singletonList(server);
Subchannel exists = subchannels.getOrDefault(server, null);
if(null != exists){
exists.updateAddresses(serverSingletonListt);
continue;
}
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));
final Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(serverSingletonListt)
.setAttributes(subchannelAttrs.build())
.build());
subchannels.put(server, subchannel);
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo state) {
for(Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()){
if(subchannel == entry.getValue()){
if (state.getState() == SHUTDOWN) {
subchannels.remove(entry.getKey());
}
if (state.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).value = state;
updateBalancingState();
return;
}
}
}
});
subchannel.requestConnection();
}
updateBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
shutdown();
helper.updateBalancingState(TRANSIENT_FAILURE, new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
});
}

private void updateBalancingState(){
boolean ready = true;
for(Subchannel subchannel : this.subchannels.values()){
if(subchannel.getAttributes().get(STATE_INFO).value.getState() != READY){
helper.updateBalancingState(CONNECTING, new RandomSubchannelPick(subchannels.values()));
return;
}
}
helper.updateBalancingState(ConnectivityState.READY, new RandomSubchannelPick(subchannels.values()));
}

@Override
public void shutdown() {
for(Iterator<Map.Entry<EquivalentAddressGroup, Subchannel>> itr = subchannels.entrySet().iterator(); itr.hasNext();){
Map.Entry<EquivalentAddressGroup, Subchannel> e = itr.next();
e.getValue().shutdown();
itr.remove();
}

}

class RandomSubchannelPick extends SubchannelPicker{
Subchannel[] subchannels;
Random random = new Random(System.currentTimeMillis());

public RandomSubchannelPick(Collection<Subchannel> subchannels) {
this.subchannels = subchannels.stream().toArray(Subchannel[]::new);
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
int idx = random.nextInt(subchannels.length);
return PickResult.withSubchannel(subchannels[idx]);
}
}

public static class Provider extends LoadBalancerProvider{

@Override
public boolean isAvailable() {
return true;
}

@Override
public int getPriority() {
return 100;
}

@Override
public String getPolicyName() {
return "random";
}

@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new RandomLoadBalancer(helper);
}
}

static final class Ref<T> {
T value;

Ref(T value) {
this.value = value;
}
}
}

服务端初始化

服务端需要把自己的服务地址注册到zookeeper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final int port;
private final Server server;
private String registryPath;
private String address;
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();;

public GreetingServer(int port, String registryPath) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new GreetingService())
.build();
this.registryPath = registryPath;
this.address = "localhost:" + port; //本机网卡不能正确显示地址,直接写死localhost
}

/**
* Start server.
*/
public void start() throws Exception {
this.curator.start();
server.start();;
this.curator.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(registryPath + "/" + address, ("http://" + address).getBytes());

System.out.println("Server started, listening on " + address);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
GreetingServer.this.stop();
}
});
}

客户端初始化

客户端需要注册自定义的NameResolverFactory和LoadBalancer。

1
2
3
4
5
6
7
8
9
10
11
12
13
public GreetingClient(String host, int port, String path) {
String target = "zookeeper://" + host + ":" + port + path;
CuratorNameResolver.Factory factory = new CuratorNameResolver.Factory();

LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancer.Provider());
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.nameResolverFactory(factory)
.defaultLoadBalancingPolicy("random")
.usePlaintext();
channel = channelBuilder.build();
blockingStub = GreetingGrpc.newBlockingStub(channel);
}

参考资料