寒歌随笔


  • Home

  • About

  • Tags

  • Categories

  • Archives

如何写一个易扩展的模版引擎(1):设计

Posted on 2020-01-01 | In javascript

作者:寒歌

轻量级高可扩展静态模板引擎Tirpitz:https://github.com/LiuMenghan/Tirpitz

前言

这是一个四年前的项目,起因是当时跟风学习node.js,但是当了解到当时node.js单线程异步的奇怪设计之后,我就放弃了在生产环境服务端使用node.js的想法(就像现在我看到Rust不支持反射,就放弃了用Rust写业务逻辑的想法)。但是node.js不适合在大型互联网公司生产环境服务端使用,并不意味着node.js这门技术就没有合适的使用场景。node.js由于使用了javascript作为编程语言,非常适合作为前端转型全栈工程师的中间环节,一些重心在前端但是需要服务端技术支持的功能都非常适合用node.js开发,比如:

  • 以展示为主、轻后端重前端的小型网站;
  • 前端开发接口Mock;
  • 模版引擎。

几个里面相对来说看上去比较有挑战性的就是模版引擎,所以就打算自己写个模版引擎来练练手。在阅读了FreeMarker、Velocity、Thymeleaf、StringTemplate等一系列模版引擎的使用文档和源码之后,对模版引擎有了以下总结:

  1. 模板引擎是用来生成文档的工具;
  2. 大多数人对文档的理解都是一种类似目录的树形结构;
  3. 模板引擎最大的作用是代替程序员进行重复劳动(如banner、Copyright),让程序员可以更关注于整体结构和特殊的内容细节;
  4. 性能其实没那么重要,如果由模版引擎实时生成文档难以满足生产环境的需求,最终不可避免的会改为预先由模版引擎生成静态文档、生产环境直接使用静态文档的技术方案,这种情况下模版引擎的性能只需要满足开发调试的需求就可以了;
  5. 扩展性很重要,缺乏扩展性(无法扩展,或者开发扩展插件的代价较高)会促使程序员使用各种各样奇怪的hacker技术,反过来由于开源社区的反馈机制又会使模版引擎朝着奇怪的方向发展,最终使模版引擎难以使用。

所以最终定下的目标是基于node.js开发一个高度可扩展,但是性能(空间、时间)没那么好的模版引擎。

由于编程习惯,设计上是按照强类型设计的。由于javascript是弱类型,所以部分代码通过java来进行描述,但实际都是通过弱类型的javascript实现的。

总体设计

模版语言与普通程序语言不同,相对缺少时间上的先后顺序概念。大多数模版引擎以标签方式标记模版逻辑,每个模板文件的渲染过程都可以分为两步:

  1. 将模板文件解析成一棵标签树;
  2. 遍历每个标签节点并执行对应标签逻辑。

模版文件 -> 标签树 -> 遍历标签树处理标签逻辑 -> 生成文档

主要扩展点

上面四个节点,最好都具有一定的可扩展性。

  • 模版文件是用户自己编写的,自然是容易扩展的。
  • 标签树作为一种数据结构,应该允许存储用户自定义的一些参数。
  • 用户应该可以方便的自定义标签及标签处理逻辑,标签需要有一些和标签处理逻辑相关的属性,同时用户应该也可以自定义标签符号、转义符号、属性的赋值符号。
  • 由于文档用处不同(如输出到文件/控制台/http流),由处理后的标签树生成文档这步应该也是可以由用户根据需求自定义的。

AOP扩展点

模版引擎处理流程中的三个箭头都可以作为AOP扩展点。

模版文件到标签树的切面,可以用来自定义添加标签,应作为一个扩展点保留。

标签树到遍历标签树处理标签逻辑的切面,实现的流程上应该是:

遍历标签树 -> 找到满足切面条件的扩展点 -> 执行AOP逻辑

这个逻辑与自定义标签的处理逻辑几乎完全相同,所以就不需要保留了。

处理标签逻辑到生成文档的切面,可以用来做一些最终文本的处理工作(如敏感词过滤),应作为一个扩展点保留。

这样一共增加了两个扩展点,分别为前置拦截器和后置拦截器,处理流程变成下面这个样子:

模版文件 -> 前置拦截器 -> 标签树 -> 遍历标签树处理标签逻辑 -> 后置拦截器 -> 生成文档

详细设计

模版文件的标签设计

模板文件的内容包括两部分:普通文本和标签。标签的结构设计最好方便标签树的生成,同时标签需要存储一些和标签处理逻辑相关的属性。这种思路典型的实现就是XML格式。XML的标签有三组:

  • <和>组成开始标签;
  • </和>组成结束标签,和开始标签成对出现;
  • <和/>组成独立标签。

鉴于模版文件肯定会广泛应用于类XML的HTML文件生成,所以最好不要直接使用XML的标签定义,模仿一下,修改成:

1
2
3
exports.starterTag = ["{%", "%}"];	//开始标签
exports.endTag = ["{%/", "%}"]; //结束标签
exports.singleTag = ["{%", "/%}"]; //独立标签

虽然标签的属性通常是结构化的,但是非结构化的需求可能也潜在存在,同时考虑到方便node.js解析,所以标签属性定义成json形式。

最终设计出来的标签是这个样子的:

1
{%variable={"key":"date"} /%}

前置拦截器

前置拦截器的输入和输出应该一致,都是字符串。前置拦截方法定义为:

1
String before(String context);

前置拦截器的执行顺序应该由用户自己决定。

解析器

由于标签参考了XML设计,实现上可以参考XML的解析设计。具体比较复杂,下一章单独讨论。

模版标签树节点

节点是模版引擎最重要的数据结构。从标签定义可以看出,至少需要有两个属性:标签名称和属性。同时,作为一棵树,考虑到遍历需求,需要有子节点数组、父节点引用。最后,这棵树中最多的节点肯定是普通文本节点,所以最好添加一个text属性保存文本。

模板标签树节点应该定义成这样:

1
2
3
4
5
6
7
class TemplateNode{
String processorName;
String text;
TemplateNode parent;
TemplateNode[] children;
Map<String, Object> attribute;
}

标签处理器

最常见的标签毫无疑问是纯文本标签,通过将标签名称默认为空,不进行任何处理,就实现了纯文本标签的处理逻辑。其它标签处理器的抽象是其实是一件比较困难的事情,后面关于标签处理器的定义可能还要修改。开发的时候从常用标签处理器着手,实现了两个最常用的标签处理器:插值和继承。

插值处理器

插值处理器的作用是将变量替换为外部配置好的值,那么输入参数至少有两个:当前节点和外部字典表。根据node.attribute.key的值从字典表中取出对应的值并替换text。

1
2
3
4
5
6
7
8
9
10
exports.processorName = "variable";
exports.process = function(node, tplPath, parser, variables){

var key = node.attribute.key;
if(key != undefined && null != key){
var value = variables[key];
node.text = value == undefined || null == value ? "" : value + "";
}
node.processorName = "";
}

处理结束后将标签名称赋值为空,标识着标签处理逻辑结束,标签转换为纯被文本标签。

继承处理器

继承处理器比较复杂,它允许两个模版文件按一定规则组合起来,比如父模版文件为:

1
2
3
4
5
Super head
{%replacable={"id":"replacer0"} /%}
Super body
{%replacable={"id":"replacer1"} /%}
Super bottom

子模版文件为:

1
2
3
4
5
6
Child text before super.
{%super={"parent":"./super.tirpitz"}%}
{%override={"id":"replacer0"}%}Text in child replacer0.{%/override%}
{%override={"id":"replacer1"}%}Text in child replacer1.{%/override%}
{%/super%}
Child text after super.

子模版文件中的super块会被替换成父模版文件的内容,而且父模版文件当中的replacable块会被替换成和子模版文件中override块id一致的内容。编译子模版的输出内容如下:

1
2
3
4
5
6
7
Child text before super.
Super head
Text in child replacer0
Super body
Text in child replacer1.
Super bottom
Child text after super.

super标签处理器的参数至少需要有三个:

  • 当前节点;
  • 当前解析模版文件路径(用于查找父模版);
  • 解析器(用于解析父模版)

处理过程如下(其中Util.traverseNodeTree是一个深度遍历树的工具方法):

  1. 获取可以替换的override子节点;
  2. 获取并解析父模版;
  3. 将父模版中的replacable子节点替换为override节点;
  4. 将当前节点的子节点替换为父模版节点的子节点;
  5. 将标签名称赋值为空,标识着标签处理逻辑结束,标签转换为纯被文本标签。。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
var Util = require('../util.js');
exports.processorName = "super";
exports.process = function(node, tplPath, parser, variables){
var children = node.children;
var replacer = {};
for(var i = 0; i < children.length ; i++){
var child = children[i];
Util.traverseNodeTree(child, tplPath, function(node, superPath){
if("override" == node.processorName){
replacer[node.attribute.id] = node;
}else if("super" == node.processorName){
node.attribute.parent = Path.resolve(Path.dirname(tplPath), node.attribute.parent);
}
});
}

//获取父模版
var superPath = Path.resolve(Path.dirname(tplPath), node.attribute.parent);
console.log(superPath);
var superContext = Fs.readFileSync(superPath, parser.encoding);
var superNode = parser.parse2node(superContext, superPath);

Util.traverseNodeTree(superNode, superPath, function(node, superPath){
if("super" == node.processorName){
var relativePath = node.attribute.parent;
//路径改为绝对路径,以解决多级嵌套的问题
node.attribute.parent = Path.resolve(Path.dirname(superPath), relativePath);
}else if(node.processorName == "replacable"){
//替换
if(undefined != replacer[node.attribute.id]){
node.processorName = "";
node.children = replacer[node.attribute.id].children;
}else{
node.processorName = "";
}
}
});
node.parentNode.children[node.parentIdx].children = superNode.children;
node.parentNode.children[node.parentIdx].processorName = "";
}

后置拦截器

后置拦截器的输入是标签树根节点,因为多个后置拦截器都是在同一棵树上进行连续操作,所以不需要返回值。后置拦截方法定义为:

1
void after(TemplateNode root);

后置拦截器的执行顺序应该由用户自己决定。

生成文档

生成文档的过程实际上通过深度遍历树,将各节点的text值输出到控制台/文件/http response的过程。生成文档的过程也应该可以由用户自己定义,以输出到控制台为例:

1
2
3
4
5
6
7
8
9
exports.handle = function(node, tplPath, logger, starttime){
var wholeText = "";
util.traverseNodeTree(node, logger, function(textNode, logger){
wholeText += textNode.text;
});
logger.log(wholeText);
var endtime = new Date().getTime();
console.log("Parse " + tplPath + " to console.(" + (endtime - starttime) + " milliseconds)");
}

封装及文档撰写

模板解析器里面可以由用户自定义的部分很多,加载用户自定义的内容应遵循“约定优于配置”的规则。用户使用大体步骤分两步:

  1. 初始化模板引擎;
  2. 通过引擎渲染模板文件。

常用配置可以通过在初始化模版引擎的时候配置,并在文档中注明,方便新手用户使用。

轻量级高可扩展静态模板引擎Tirpitz:https://github.com/LiuMenghan/Tirpitz

gRPC客户端详解

Posted on 2019-10-07 | In Java

作者:寒歌

RPC框架的选择

常见的RPC框架主要分为轻重两种。较轻的框架一般只负责通信,如rmi、webservice、restful、Thrift、gRPC等。较重的框架一般包括完整的服务发现、负载均衡策略等等如BAT三家的Dubbo、brpc、Tars之类。

框架选择时个人认为首先要考虑的是框架的历史和项目的活跃程度。一个历史悠久的活跃项目(大概至少可以保证每两到三个月有一次小版本的更新)可以保证各种bug早已暴露并修复,让我们可以更专注于我们自己的项目本身,而不是要担心究竟是我们自己的代码有问题还是框架本身就有问题。

重量级RPC框架有一个主要问题就是结构复杂,另外主语言之外的代码质量也不太容易保证。个人认为活跃的社区以及一个活跃的开源管理团队是这些重型RPC框架项目成功的必要前提条件。比如我们项目组试用过腾讯的Tars,C++同学表示没有任何问题,然后JAVA同学表示java版本有许多bug,修复bug的pull request需要两个多月才能得到merge,而官方jar包也将近两年没有更新过了。

轻量级rpc框架中,restful可以被视作标杆。由于restful基于http协议,天然被各种框架支持,而且非常灵活。restful的缺点有两方面,一是过于灵活,缺少根据协议生成服务端和客户端代码的工具,联调往往要花更多的时间;二是大部分序列化基于json或者xml,相对来讲效率不理想。和restful相比,其它很多轻量级框架都有这样或者那样的缺点,有的缺少跨语言支持(rmi),有的既繁琐又缺乏效率优势(webservice)。个人认为其中相对理想的是gRPC和Thrift。

gRPC简介

Protobuf是一种google推出的非常流行的跨语言序列化/反序列化框架。在Protobuf2中就已经出现了用rpc定义服务的概念,但是一直缺少一种流行的rpc框架支持。当Http2推出之后,google将Http2和protobuf3结合,推出了gRPC。gRPC继承了Protobuf和Http2的优点,包括:

  • 序列化反序列化性能好
  • 强类型支持
  • 向前/向后兼容
  • 有代码生成机制,而且可以支持多语言
  • 长连接、多路复用

同时gRPC还提供了简单地服务发现和负载均衡功能。虽然这并不是gRPC框架的重点,但是开发者可以非常容易的自己扩展gRPC这些功能,实现自己的策略或应用最新的相关方面技术,而不用像重型RPC框架一样受制于框架本身是否支持。

gRPC与Thrift对比

Thrift是Facebook推出的一种RPC框架,从性能上来讲远优于gRPC。但是在实际调研时发现有一个很麻烦的问题:Thrift的客户端是线程不安全的——这意味着在Spring中无法以单例形式注入到Bean中。解决方案有三种:

  1. 每次调用创建一个Thrift客户端。这不仅意味着额外的对象创建和垃圾回收开销,而且实际上相当于只使用了短链接,这是一个开发复杂度最低但是从性能上来讲最差的解决方案。
  2. 利用Pool,稍微复杂一点的解决方案,但是也非常成熟。但是问题在于一来缺少服务发现和负载均衡恐实现,需要很多额外开发;二来需要创建Pool数量*服务端数量个客户端,内存开销会比较大。
  3. 使用异步框架如Netty,可以成功避免创建过多的客户端,但是仍要自己实现服务发现和负载均衡,相对复杂。实际上Facebook有一个基于Netty的Thrift客户端,叫Nifty,但是快四年没更新了。。。

相比较而言gRPC就友好多了,本身有简单而且可扩展的服务发现和负载均衡功能,底层基于Netty所以线程安全,在不需要极限压榨性能的情况下是非常好的选择。当然如果需要极限压榨性能Thrift也未必够看。

gRPC入门

gRPC服务定义

gRPC中有一个特殊的关键字stream,表示可以以流式输入或输出多个protobuf对象。注意只有异步非阻塞的客户端支持以stream形式输入,同步阻塞客户端不支持以stream形式输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
syntax = "proto3";  //gRPC必须使用proto3

option java_multiple_files = true;
option java_package = "cn.lmh.examples.grpc.proto";

service RouteGuide {
// 输入一个坐标,返回坐标和时间(1:1)
rpc getPoint(Point) returns (LocationNote) {}
// 输入一个矩形,以stream形式返回一系列点(1:n)
rpc listPoints(Rectangle) returns (stream Point) {}
// 以stream形式输入一系列点,返回点的数量和总共花费的时间(m:1)
rpc recordRoute(stream Point) returns (RouteSummary) {}
// 以stream形式输入一系列点,以stream形式返回已输入点的数量和总共花费的时间(m:n)
rpc getPointStream(stream Point) returns (stream RouteSummary) {}
}

message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message Rectangle {
Point lo = 1;
Point hi = 2;
}
message LocationNote {
Point location = 1;
int64 timestamp = 2;
}
message RouteSummary {
int32 point_count = 1;
int64 elapsed_time = 2;
}

依赖和代码生成

由于protoc的gRPC插件需要自己编译,而且存在环境问题。推荐使用gradle或者maven的protobuf插件。入门示例项目使用了gradle,根目录build.gradle配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
plugins {
id 'java'
id 'idea'
id 'wrapper'
}

ext {
groupId = 'cn.lmh.leviathan'
proto = [
version : "3.9.0",
"grpc" :[
version : "1.23.0"
]
]
}

allprojects{
apply plugin: 'java'
apply plugin: 'idea'

sourceCompatibility=JavaVersion.VERSION_1_8
targetCompatibility=JavaVersion.VERSION_1_8

project.group = 'cn.lmh.examples'

compileJava.options.encoding = 'UTF-8'
}

subprojects{
repositories {
mavenCentral()
mavenLocal();
};
configurations {
compile
}

dependencies {
compile "io.grpc:grpc-netty-shaded:${proto.grpc.version}"
compile "io.grpc:grpc-protobuf:${proto.grpc.version}"
compile "io.grpc:grpc-stub:${proto.grpc.version}"

testCompile group: 'junit', name: 'junit', version: '4.12'
}
}

子项目build.gradle如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
plugins{
id 'com.google.protobuf' version '0.8.10' //引入protobuf插件
}

sourceSets{
main{
proto {
srcDir 'src/main/proto' //指定.proto文件所在的位置
}
}
}

protobuf {
generatedFilesBaseDir = "$projectDir/src" //生成文件的根目录

protoc {
artifact = "com.google.protobuf:protoc:${proto.version}" //protoc的版本
}

plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${proto.grpc.version}" //gRPC的版本
}
}

generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = "java" //grpc生成文件的子目录
}
}
}
}

我们的入门子项目名称叫做starter,配置好build.gradle之后,执行gradlew :starter:generateProto就可以在src/main/java下生成对应的文件:

gRPC生成的目录结构

服务端

无论客户端以异步非阻塞还是同步阻塞形式调用,gRPC服务端的Response都是异步形式。对于异步的Request或者Response,都需要实现gRPC的io.grpc.stub.StreamObserver接口。io.grpc.stub.StreamObserver接口有三个方法:

  • onNext:表示接收/发送一个对象
  • onError:处理异常
  • onCompleted:表示Request或Response结束

当Request发送到服务端端时,会异步调用requestObserver的onNext方法,直到结束时调用requestObserver的onCompleted方法;服务端调用responseObserver的onNext把Response返回给客户端,直到调用responseObserver的onCompleted方法通知客户端Response结束。服务端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class RouteGuideServer {
private final int port;
private final Server server;

public RouteGuideServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new RouteGuideService())
.build();
}

/**
* Start server.
*/
public void start() throws IOException {
server.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
RouteGuideServer.this.stop();
}
});
}

/**
* Stop server
*/
public void stop() {
if (server != null) {
server.shutdown();
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
@Override
public void getPoint(Point request, StreamObserver<LocationNote> responseObserver) {
LocationNote value = LocationNote
.newBuilder()
.setLocation(request)
.setTimestamp(System.nanoTime())
.build();
responseObserver.onNext(value);
responseObserver.onCompleted();
}

@Override
public void listPoints(Rectangle request, StreamObserver<Point> responseObserver) {
int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
for (int x = left; x <= right; x++) {
for (int y = top; y >= bottom; y--) {
Point point = Point.newBuilder().setLongitude(x).setLatitude(y).build();
responseObserver.onNext(point);
}
}
responseObserver.onCompleted();
}

@Override
public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();

@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
RouteSummary result = RouteSummary.newBuilder().setElapsedTime(System.nanoTime() - startTime).setPointCount(pointCount.get()).build();
responseObserver.onNext(result);
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<Point> getPointStream(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();

@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
RouteSummary result = RouteSummary.newBuilder().setElapsedTime(System.nanoTime() - startTime).setPointCount(count).build();
responseObserver.onNext(result);
}

@Override
public void onError(Throwable t) {
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}

客户端

gRPC的客户端有同步阻塞客户端(blockingStub)和异步非阻塞客户端(Stub)两种。同步客户端使用比较方便,但是性能较低,而且不支持stream形式的Request;异步客户端性能较高,支持stream形式的Request,但是如果想要以同步方式调用需要额外封装。本文将主要以异步为例。

异步转同步

由于gRPC的异步客户端性能较高且功能更完整,所以一般都会采用异步客户端。异步客户端接收到的Response也是以io.grpc.stub.StreamObserver形式。由于客户端的调用可能是在异步进程中但更可能是在同步进程中,所以就存在一个如何把gRPC异步Response转为同步Response的问题。

一个比较常见的思路是写一个io.grpc.stub.StreamObserver实现,里面有一个内置变量保存异步Response的结果,再添加一个阻塞式的get()方法,直到Response结束才把所有结果返回。要知道Response是否结束,需要添加一个Boolean或者AtomicBoolean变量,初始化为false,调用responseObserver.onCompleted()方法时设置为true,这样就可以通过这个变量判断Response是否结束。

阻塞get()方法最常见的思路是get()写一个while循环,直到变量值改为true才退出循环并返回结果。这种方式的优点是简单直接,任何语言都可以简单实现,缺点是由于使用循环可能CPU占用较高。而对于java这种多线程比较完善的语言,另一个比较好思路是Response结束前将线程挂起,当调用responseObserver.onCompleted()方法再唤醒线程。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CallableStreamObserver<T> implements StreamObserver<T> {
List<T> values = new ArrayList<T>();
boolean isCompleted = false;
Throwable t = null;

@Override
public void onNext(T value) {
this.values.add(value);
}

@Override
public void onError(Throwable t) {
this.isCompleted = true;
notifyAll();
}

@Override
public synchronized void onCompleted() {
this.isCompleted = true;
notifyAll();
}

public List<T> get() throws Throwable {
if (!this.isCompleted) {
synchronized (this) {
this.wait(60 * 1000);
}
}
if (null != t) {
throw this.t;
} else {
return this.values;
}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class RouteGuideClient {

private final ManagedChannel channel;
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
private final RouteGuideGrpc.RouteGuideStub asyncStub;

public RouteGuideClient(String host, int port) {
String target = "dns:///" + host + ":" + port;
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.usePlaintext();
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

public LocationNote getPoint(int lo, int lt, boolean blocking) throws Throwable {
Point point = Point.newBuilder().setLongitude(lo).setLatitude(lt).build();
if(blocking) {
return blockingStub.getPoint(point);
}else{
CallableStreamObserver<LocationNote> responseObserver = new CallableStreamObserver<LocationNote>();
asyncStub.getPoint(point, responseObserver);
return responseObserver.get().get(0);
}
}

public Iterator<Point> listPoints(int left, int top, int right, int bottom, boolean blocking) throws Throwable {
Point hi = Point.newBuilder().setLongitude(left).setLatitude(top).build();
Point lo = Point.newBuilder().setLongitude(right).setLatitude(bottom).build();
Rectangle rec = Rectangle.newBuilder().setHi(hi).setLo(lo).build();
if(blocking){
return blockingStub.listPoints(rec);
}else{
CallableStreamObserver<Point> responseObserver = new CallableStreamObserver<Point>();
asyncStub.listPoints(rec, responseObserver);
return responseObserver.get().iterator();
}
}

public RouteSummary recordRoute(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get().get(0);

}

public List<RouteSummary> getPointStream(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.getPointStream(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get();
}
}

gRPC客户端代码详解

gRPC官方将自己分为三层组件:Stub、Channel和Transport。

  • Stub层是最上层的代码,gRPC附带的插件可以从.proto文件直接生成Stub层代码,开发人员通过直接调用Stub层的代码调用RPC服务
  • Channel层是对Transport层功能的抽象,同时提供了很多有用的功能,比如服务发现和负载均衡。。
  • Transport层承担了将字节从网络中取出和放入数据的工作,有三种实现Netty、okHttp、inProgress。Transport层是最底层的代码。

整个grpc-java项目的代码比较多。从风格上来讲,封装比较多,相对于interface更喜欢使用abstract class,相对于反射更喜欢使用硬编码,而且大量使用了单线程异步调用造成调用栈断裂,与常见的java项目的编码风格有很大差别,阅读起来可能容易不习惯。

在源码层面本文将关注下面这些方面:

  • Channel的初始化过程;
  • gRPC中的服务发现;
  • gRPC中的负载均衡
  • Client与Server之间的数据传输

Channel的初始化过程

通过入门示例可以看到,Channel的初始化过程分三步:

  1. 调用forTarget方法创建io.grpc.ManagedChannelBuilder;
  2. 配置各种选项,不论如何配置,返回的总是io.grpc.ManagedChannelBuilder对象;
  3. 调用build方法创建io.grpc.ManagedChannel。

forTarget方法

gRPC这里设计比较繁琐,过程比较绕。forTarget方法的实际功能就是把参数target赋值给io.grpc.ManagedChannelBuilder的内部变量target。

1
2
3
public static ManagedChannelBuilder<?> forTarget(String target) {
return ManagedChannelProvider.provider().builderForTarget(target);
}

io.grpc.ManagedChannelProvider.provider()会返回一个io.grpc.ManagedChannelProvider实现。有哪些io.grpc.ManagedChannelProvider实现是在io.grpc.ManagedChannelProvider中以硬编码形式确定的,这里其实存在利用反射改进的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final class HardcodedClasses implements Iterable<Class<?>> {
@Override
public Iterator<Class<?>> iterator() {
List<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.okhttp.OkHttpChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
try {
list.add(Class.forName("io.grpc.netty.NettyChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
return list.iterator();
}
}

实际上就根据依赖的jar包不同就只有两个实现,一个netty的,一个okhttp的。如果像入门示例项目一样只配置了netty实现,那就只有netty的。io.grpc.netty.NettyChannelProvider的buildForTarget方法调用的是io.grpc.netty.NettyChannelBuilder的forTarget方法。

1
2
3
public NettyChannelBuilder builderForTarget(String target) {
return NettyChannelBuilder.forTarget(target);
}

而io.grpc.netty.NettyChannelBuilder继承自io.grpc.internal.AbstractManagedChannelImplBuilder,forTarget方法实际上调用了父类的构造函数。

1
2
3
4
5
6
7
NettyChannelBuilder(String target) {
super(target);
}

public static NettyChannelBuilder forTarget(String target) {
return new NettyChannelBuilder(target);
}

io.grpc.internal.AbstractManagedChannelImplBuilder的构造函数最终会是把参数赋值给target变量。

1
2
3
4
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target, "target");
this.directServerAddress = null;
}

build方法

从前文可以看到,实际初始化的io.grpc.ManagedChannelBuilder实际上是io.grpc.netty.NettyChannelBuilder,其build方法实现在其父类io.grpc.internal.AbstractManagedChannelImplBuilder中。

1
2
3
4
5
6
7
8
9
10
11
public ManagedChannel build() {
return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
this,
buildTransportFactory(),
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(),
TimeProvider.SYSTEM_TIME_PROVIDER));
}

这里的io.grpc.internal.ManagedChannelOrphanWrapper和io.grpc.internal.ManagedChannelImpl其实都是io.grpc.ManagedChannel的实现。io.grpc.internal.ManagedChannelOrphanWrapper从功能上分析没有任何作用,io.grpc.internal.ManagedChannelOrphanWrapper会为io.grpc.ManagedChannel创建弱引用,并被放置到ReferenceQueue中。如果Channel是单例的,那么意义不大;如果客户端被重复创建却没有被关闭,那么ReferenceQueue中会留下相应的引用记录,可能有助于排查问题。

io.grpc.internal.ManagedChannelImpl构造方法的几个参数中,除了第一个参数是builder本身,第二个参数是用来创建Transport的Factory,第三个参数是后台连接重试策略,第四个参数是gRPC的全局线程池,第五个和第七个都是和时间相关的对象,主要用于日志中,第六个是客户端调用时的interceptor。在io.grpc.netty.NettyChannelBuilder中,buildTransportFactory方法会创建一个io.grpc.netty.NettyChannelBuilder.NettyTransportFactory。

服务发现

前文的入门示例中直接写了target,只能连接单个Server。如果有多个可以提供服务的Server,那么就需要有一种方式通过单个target发现这些Server。在io.grpc.ManagedChannelBuilder中有一个nameResolverFactory方法,可以用来指定如何解析target地址,发现多个服务端。

nameResolverFactory方法

这个方法的实现也在io.grpc.internal.AbstractManagedChannelImplBuilder中,如果用户有自己的io.grpc.NameResolver.Factory实现的话可以通过nameResolverFactory方法指定,gRPC就会使用用户自己的io.grpc.NameResolver.Factroy实现代替gRPC自己的默认实现,否则会使用io.grpc.NameResolverRegistry中的默认实现。

io.grpc.NameResolverRegistry会通过硬编码加载io.grpc.NameResolverProvider实现,并创建一个与之有关的io.grpc.NameResolver.Factory的实现。目前硬编码加载的io.grpc.NameResolverProvider实现只有io.grpc.internal.DnsNameResolverProvider一种。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private final NameResolver.Factory factory = new NameResolverFactory();
@GuardedBy("this")
private final LinkedHashSet<NameResolverProvider> allProviders = new LinkedHashSet<>();

private synchronized void addProvider(NameResolverProvider provider) {
checkArgument(provider.isAvailable(), "isAvailable() returned false");
allProviders.add(provider);
}

public static synchronized NameResolverRegistry getDefaultRegistry() {
if (instance == null) {
List<NameResolverProvider> providerList = ServiceProviders.loadAll(
NameResolverProvider.class,
getHardCodedClasses(),
NameResolverProvider.class.getClassLoader(),
new NameResolverPriorityAccessor());
if (providerList.isEmpty()) {
logger.warning("No NameResolverProviders found via ServiceLoader, including for DNS. This "
+ "is probably due to a broken build. If using ProGuard, check your configuration");
}
instance = new NameResolverRegistry();
for (NameResolverProvider provider : providerList) {
logger.fine("Service loader found " + provider);
if (provider.isAvailable()) {
instance.addProvider(provider);
}
}
instance.refreshProviders();
}
return instance;
}

public NameResolver.Factory asFactory() {
return factory;
}

@VisibleForTesting
static List<Class<?>> getHardCodedClasses() {
ArrayList<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.internal.DnsNameResolverProvider"));
} catch (ClassNotFoundException e) {
logger.log(Level.FINE, "Unable to find DNS NameResolver", e);
}
return Collections.unmodifiableList(list);
}

private final class NameResolverFactory extends NameResolver.Factory {
@Override
@Nullable
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
List<NameResolverProvider> providers = providers();
for (NameResolverProvider provider : providers) {
NameResolver resolver = provider.newNameResolver(targetUri, args);
if (resolver != null) {
return resolver;
}
}
return null;
}

@Override
public String getDefaultScheme() {
List<NameResolverProvider> providers = providers();
if (providers.isEmpty()) {
return "unknown";
}
return providers.get(0).getDefaultScheme();
}
}

getDefaultSchema会匹配target中的schema(如dns),如果匹配的上,就使用相应的NameResolver.Factory,返回NameResolver决定真正的服务访问地址。

io.grpc.NameResolver

我们来看io.grpc.NameResolver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public abstract class NameResolver {

public abstract String getServiceAuthority();

public void start(final Listener listener) {
if (listener instanceof Listener2) {
start((Listener2) listener);
} else {
start(new Listener2() {
@Override
public void onError(Status error) {
listener.onError(error);
}

@Override
public void onResult(ResolutionResult resolutionResult) {
listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
}
});
}
}

public void start(Listener2 listener) {
start((Listener) listener);
}

public abstract void shutdown();

public void refresh() {}

@ThreadSafe
public interface Listener {

void onAddresses(List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);

void onError(Status error);
}

public abstract static class Listener2 implements Listener {

@Override
public final void onAddresses(
List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
onResult(
ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
}

public abstract void onResult(ResolutionResult resolutionResult);

@Override
public abstract void onError(Status error);
}

public static final class ResolutionResult {
private final List<EquivalentAddressGroup> addresses;
@ResolutionResultAttr
private final Attributes attributes;
@Nullable
private final ConfigOrError serviceConfig;

ResolutionResult(
List<EquivalentAddressGroup> addresses,
@ResolutionResultAttr Attributes attributes,
ConfigOrError serviceConfig) {
this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
this.attributes = checkNotNull(attributes, "attributes");
this.serviceConfig = serviceConfig;
}

public static Builder newBuilder() {
return new Builder();
}

public Builder toBuilder() {
return newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setServiceConfig(serviceConfig);
}

public List<EquivalentAddressGroup> getAddresses() {
return addresses;
}

@ResolutionResultAttr
public Attributes getAttributes() {
return attributes;
}

@Nullable
public ConfigOrError getServiceConfig() {
return serviceConfig;
}

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
public static final class Builder {
private List<EquivalentAddressGroup> addresses = Collections.emptyList();
private Attributes attributes = Attributes.EMPTY;
@Nullable
private ConfigOrError serviceConfig;
Builder() {}

public Builder setAddresses(List<EquivalentAddressGroup> addresses) {
this.addresses = addresses;
return this;
}

public Builder setAttributes(Attributes attributes) {
this.attributes = attributes;
return this;
}

public Builder setServiceConfig(@Nullable ConfigOrError serviceConfig) {
this.serviceConfig = serviceConfig;
return this;
}

public ResolutionResult build() {
return new ResolutionResult(addresses, attributes, serviceConfig);
}
}
}
}

在客户端首次连接服务端的时候会调用Listener2的start方法,需要更新的时候会调用refresh方法。当Listener2接收到服务端地址时,会调用onResult方法。

io.grpc.internal.DnsNameResolver

由于gRPC支持长连接,所以如果直连的话只会访问一个域名下的一台服务器,即首次连接时通过DNS返回IP地址。io.grpc.internal.DnsNameResolverProvider是对io.grpc.internal.DnsNameResolver的简单封装,只支持以dns:///开头的地址。io.grpc.internal.DnsNameResolver会根据target获取该host下所有关联的IP,即通过DNS解析出所有的服务端IP地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final class DnsNameResolverProvider extends NameResolverProvider {

private static final String SCHEME = "dns";

@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
} else {
return null;
}
}

@Override
public String getDefaultScheme() {
return SCHEME;
}

@Override
protected boolean isAvailable() {
return true;
}

@Override
protected int priority() {
return 5;
}
}

可以看到io.grpc.internal.DnsNameResolver中的start和refresh方法都调用的是resolve方法,而resolve方法是执行了一个继承自Runnable的Resolve接口。

DnsNameResolver

在有代理的情况下,Resolve的resolveInternal会根据代理返回的ProxiedSocketAddress创建EquivalentAddressGroup作为服务端列表返回,并设置空config;否则会调用resolveAll方法获取服务端列表,并调用parseServiceConfig方法设置config。resolveAll方法返回的ResolutionResults有三个变量addresses、txtRecords和balancerAddresses。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@VisibleForTesting
static ResolutionResults resolveAll(
AddressResolver addressResolver,
@Nullable ResourceResolver resourceResolver,
boolean requestSrvRecords,
boolean requestTxtRecords,
String name) {
List<? extends InetAddress> addresses = Collections.emptyList();
Exception addressesException = null;
List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
Exception balancerAddressesException = null;
List<String> txtRecords = Collections.emptyList();
Exception txtRecordsException = null;

try {
addresses = addressResolver.resolveAddress(name);
} catch (Exception e) {
addressesException = e;
}
if (resourceResolver != null) {
if (requestSrvRecords) {
try {
balancerAddresses =
resourceResolver.resolveSrv(addressResolver, GRPCLB_NAME_PREFIX + name);
} catch (Exception e) {
balancerAddressesException = e;
}
}
if (requestTxtRecords) {
boolean balancerLookupFailedOrNotAttempted =
!requestSrvRecords || balancerAddressesException != null;
boolean dontResolveTxt =
(addressesException != null) && balancerLookupFailedOrNotAttempted;
if (!dontResolveTxt) {
try {
txtRecords = resourceResolver.resolveTxt(SERVICE_CONFIG_NAME_PREFIX + name);
} catch (Exception e) {
txtRecordsException = e;
}
}
}
}
try {
if (addressesException != null
&& (balancerAddressesException != null || balancerAddresses.isEmpty())) {
Throwables.throwIfUnchecked(addressesException);
throw new RuntimeException(addressesException);
}
} finally {
if (addressesException != null) {
logger.log(Level.FINE, "Address resolution failure", addressesException);
}
if (balancerAddressesException != null) {
logger.log(Level.FINE, "Balancer resolution failure", balancerAddressesException);
}
if (txtRecordsException != null) {
logger.log(Level.FINE, "ServiceConfig resolution failure", txtRecordsException);
}
}
return new ResolutionResults(addresses, txtRecords, balancerAddresses);
}

addressResolver的resolveAddress方法实际是调用JDK的java.net.InetAddress的getAllByName方法,即根据host通过DNS返回一系列服务端列表。resourceResolver根据LDAP协议获取指定命名空间下的服务端列表地址。txtRecords和balancerAddresses是和LDAP相关的参数,方法入参requestSrvRecords和requestTxtRecords的默认值都是false。由于LDAP不是特别常用,这里就不深入展开了。

NameResolverListener的onResult

当NameResolverListener获取解析结果后会调用onResult方法,进而会调用io.grpc.LoadBalancer的handleResolvedAddresses方法。

获取解析结果后调用handleResolvedAddresses方法

负载均衡

io.grpc.ManagedChannel初始化的时候可以通过defaultLoadBalancingPolicy方法指定负载均衡策略,实际是根据defaultLoadBalancingPolicy创建了一个io.grpc.internal.AutoConfiguredLoadBalancerFactory对象。io.grpc.internal.AutoConfiguredLoadBalancerFactory则通过io.grpc.LoadBalancerRegistry获取对应名称的负载均衡策略。io.grpc.LoadBalancerProvider的getPolicyName方法指定负载均衡策略名称,newLoadBalancer返回负载均衡io.grpc.LoadBalancer的具体实现。如果想要添加自定义负载均衡策略,需要调用io.grpc.LoadBalancerRegistry的registry方法,并自己实现io.grpc.LoadBalancerProvider和io.grpc.LoadBalancer,并指定负载均衡策略名称即可。
defaultLoadBalancingPolicy方法

io.grpc.LoadBalancer.SubchannelPicker

io.grpc.LoadBalancer的核心逻辑实际在SubchannelPicker中。pickSubchannel方法会返回的PickResult中包含真正可用的subchannel,用来进行后续的数据传输。

1
2
3
4
5
6
7
8
9
public abstract static class SubchannelPicker {
/**
* Make a balancing decision for a new RPC.
*
* @param args the pick arguments
* @since 1.3.0
*/
public abstract PickResult pickSubchannel(PickSubchannelArgs args);
}

gRPC默认提供了两种负载均衡实现策略:prick_first和round_robin。前者总会使用第一个可用的服务端,后者则是简单轮询。

handleResolvedAddresses

当服务端列表更新时,会调用io.grpc.LoadBalancer的handleResolvedAddresses方法更新可用的subchannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
if (subchannel == null) {
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(servers)
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo stateInfo) {
processSubchannelState(subchannel, stateInfo);
}
});
this.subchannel = subchannel;
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
}
}

如果是首次调用(subchannel == null) 会创建subchannel,其实现是io.grpc.internal.ManagedChannelImpl.SubchannelImpl,创建的过程中会创建io.grpc.internal.InternalSubchannel。然后调用io.grpc.internal.ManagedChannelImpl的updateBalancingState方法,把subchannelPicker更新为实现Picker,然后开启subchannel的连接。

开启subchannel链接

在开启subchannel的连接过程中,会调用io.grpc.internal.InternalSubchannel的obtainActiveTransport方法。

这里的transportFactory就是上面提到io.grpc.ManagedChannelBuilder调用build初始化时调用buildTransportFactory方法返回的,依赖于Transport层的具体实现。在netty实现中,返回的是io.grpc.netty.NettyClientTransport。

传输

gRPC客户端发起Request时,stub会调用ClientCalls的startCall方法,最终会调用io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider的get方法获取io.grc.internal.ClientTransport。

gRPC客户端发起Request时调用ChannelTransportProvider的get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) {
return delayedTransport;
}
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
exitIdleMode();
}
}
syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, args.getCallOptions().isWaitForReady());
if (transport != null) {
return transport;
}
return delayedTransport;
}

如果subchannelPicker存在,会使用subchannelPicker进行选择;如果是首次访问服务端时subchannel肯定不存在,会使用syncContext异步执行exitIdleMode方法初始化。syncContext是一个单线程执行队列,可以保证先提交的任务先执行。delayedTransport的执行也依赖于syncContext,这就保证了delayedTransport中的方法执行一定会在exitIdleMode方法之后。

首次访问服务端时执行exidIdleMode方法

exitIdleMode方法会初始化NameResolver和LoadBalancer,并会启动NameResolverListener。当解析完成后会调用NameResolverListener的onResult方法,进而调用LoadBalancer的handleResolvedAddresses方法创建subchannelPicker、创建并连接subchannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@VisibleForTesting
void exitIdleMode() {
syncContext.throwIfNotInThisSynchronizationContext();
if (shutdown.get() || panicMode) {
return;
}
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer(false);
} else {
rescheduleIdleTimer();
}
if (lbHelper != null) {
return;
}
channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
this.lbHelper = lbHelper;

NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
nameResolver.start(listener);
nameResolverStarted = true;
}

Request

发送Request时会调用ConnectionClientTransport的newStream方法返回一个io.grpc.internal.ClientStream对象,而首次调用会通过delayedTransport延迟调用newStream方法。

调用newStream的调用栈

netty实现会返回一个io.grpc.netty.shaded.io.grpc.netty.NettyClientStream对象。io.grpc.internal.ClientStream下有两个子类,TransportState负责处理传输状态,Sink负责写入数据。

在进行一系列http2相关设置后,会调用io.grpc.internal.ClientStream的start方法,为TransportState设置监听并通过Sink写入Header。

1
2
3
4
5
6
7
8
@Override
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
if (!useGet) {
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
}

初始化结束后,调用requestObserver的onNext方法会调用io.grpc.internal.ClientCallImpl的sendMessage方法,将protobuf对象转换成InputStream,并作为参数调用io.grpc.internal.ClientStream的writeMessage方法,进而调用io.grpc.internal.MessageFramer的writePayload方法,最终调用writeToOutputStream方法将内容写入Http的OutputStream。如果是参数是stream形式会继续调用flush。

onNext

调用requestObserver的onCompleted方法会调用io.grpc.internal.ClientCallImpl的halfClose方法,进而会调用io.grpc.internal.MessageFramer的endOfMessages,flush并结束发送消息。

onComplete

Response

onNext

客户端接受到Response会调用ClientStreamListener的messagesAvailable方法,并通过同步线程池最终调用StreamObserver的onNext方法接收数据。

onComplete

当返回结束时会调用TransportState的transportReportStatus方法关闭请求,进而调用ClientStreamListener的closed方法关闭监听,进而调用StreamObserver的onClose方法。

gRPC通信格式

gRPC发送的请求发送方法是POST,路径是/${serviceName}/${methodName},content-type为content-type = application/grpc+proto。

Request

1
2
3
4
5
6
7
8
9
10
HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
:path = /RouteGuide/getPoint
grpc-timeout = 1S
content-type = application/grpc+proto
grpc-encoding = gzip

DATA (flags = END_STREAM)
<Length-Prefixed Message>

Response

1
2
3
4
5
6
7
8
9
10
11
HEADERS (flags = END_HEADERS)
:status = 200
grpc-encoding = gzip
content-type = application/grpc+proto

DATA
<Length-Prefixed Message>

HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
trace-proto-bin = jher831yy13JHy3hc

扩展gRPC

自定义基于zookeeper的NameResolver.Factory实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class CuratorNameResolver extends NameResolver {
CuratorFramework curatorFramework;
String basePath;
String serviceAuthority;
private Listener2 listener;

public CuratorNameResolver(CuratorFramework curatorFramework, String basePath, String serviceAuthority) {
this.curatorFramework = curatorFramework;
this.basePath = basePath;
this.serviceAuthority = serviceAuthority;
}

@Override
public void start(Listener2 listener) {
this.curatorFramework.start();
this.listener = listener;
refresh();
}

@Override
public void refresh() {
List<EquivalentAddressGroup> servers = new ArrayList<>();
try {
List<EquivalentAddressGroup> addresses = curatorFramework.getChildren()
.forPath(basePath)
.stream().map(address ->{
try {
URI uri = new URI("http://" + address);
return new EquivalentAddressGroup(
new InetSocketAddress(uri.getHost(), uri.getPort()));
}catch (Exception e){
listener.onError(Status.INTERNAL);
return null;
}
}).collect(Collectors.toList());
listener.onResult(ResolutionResult.newBuilder().setAddresses(addresses).build());

} catch (Exception e) {
listener.onError(Status.INTERNAL);
}
}

@Override
public String getServiceAuthority() {
return this.serviceAuthority;
}

@Override
public void shutdown() {
this.curatorFramework.close();
}

public static class Factory extends NameResolver.Factory{
@Override
public NameResolver newNameResolver(URI targetUri, Args args) {
String address = targetUri.getHost() + ":" + targetUri.getPort();
String authority = null == targetUri.getAuthority() ? address : targetUri.getAuthority();
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(address)
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();
return new CuratorNameResolver(curator, targetUri.getPath(), authority);
}

@Override
public String getDefaultScheme() {
return "zookeeper";
}
}
}

自定义随机负载均衡实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public class RandomLoadBalancer extends LoadBalancer{
LoadBalancer.Helper helper;

private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");

public RandomLoadBalancer(LoadBalancer.Helper helper) {
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
for(EquivalentAddressGroup server : servers){
List<EquivalentAddressGroup> serverSingletonListt = Collections.singletonList(server);
Subchannel exists = subchannels.getOrDefault(server, null);
if(null != exists){
exists.updateAddresses(serverSingletonListt);
continue;
}
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));
final Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(serverSingletonListt)
.setAttributes(subchannelAttrs.build())
.build());
subchannels.put(server, subchannel);
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo state) {
for(Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()){
if(subchannel == entry.getValue()){
if (state.getState() == SHUTDOWN) {
subchannels.remove(entry.getKey());
}
if (state.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).value = state;
updateBalancingState();
return;
}
}
}
});
subchannel.requestConnection();
}
updateBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
shutdown();
helper.updateBalancingState(TRANSIENT_FAILURE, new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
});
}

private void updateBalancingState(){
boolean ready = true;
for(Subchannel subchannel : this.subchannels.values()){
if(subchannel.getAttributes().get(STATE_INFO).value.getState() != READY){
helper.updateBalancingState(CONNECTING, new RandomSubchannelPick(subchannels.values()));
return;
}
}
helper.updateBalancingState(ConnectivityState.READY, new RandomSubchannelPick(subchannels.values()));
}

@Override
public void shutdown() {
for(Iterator<Map.Entry<EquivalentAddressGroup, Subchannel>> itr = subchannels.entrySet().iterator(); itr.hasNext();){
Map.Entry<EquivalentAddressGroup, Subchannel> e = itr.next();
e.getValue().shutdown();
itr.remove();
}

}

class RandomSubchannelPick extends SubchannelPicker{
Subchannel[] subchannels;
Random random = new Random(System.currentTimeMillis());

public RandomSubchannelPick(Collection<Subchannel> subchannels) {
this.subchannels = subchannels.stream().toArray(Subchannel[]::new);
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
int idx = random.nextInt(subchannels.length);
return PickResult.withSubchannel(subchannels[idx]);
}
}

public static class Provider extends LoadBalancerProvider{

@Override
public boolean isAvailable() {
return true;
}

@Override
public int getPriority() {
return 100;
}

@Override
public String getPolicyName() {
return "random";
}

@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new RandomLoadBalancer(helper);
}
}

static final class Ref<T> {
T value;

Ref(T value) {
this.value = value;
}
}
}

服务端初始化

服务端需要把自己的服务地址注册到zookeeper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final int port;
private final Server server;
private String registryPath;
private String address;
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();;

public GreetingServer(int port, String registryPath) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new GreetingService())
.build();
this.registryPath = registryPath;
this.address = "localhost:" + port; //本机网卡不能正确显示地址,直接写死localhost
}

/**
* Start server.
*/
public void start() throws Exception {
this.curator.start();
server.start();;
this.curator.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(registryPath + "/" + address, ("http://" + address).getBytes());

System.out.println("Server started, listening on " + address);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
GreetingServer.this.stop();
}
});
}

客户端初始化

客户端需要注册自定义的NameResolverFactory和LoadBalancer。

1
2
3
4
5
6
7
8
9
10
11
12
13
public GreetingClient(String host, int port, String path) {
String target = "zookeeper://" + host + ":" + port + path;
CuratorNameResolver.Factory factory = new CuratorNameResolver.Factory();

LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancer.Provider());
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.nameResolverFactory(factory)
.defaultLoadBalancingPolicy("random")
.usePlaintext();
channel = channelBuilder.build();
blockingStub = GreetingGrpc.newBlockingStub(channel);
}

参考资料

  • 示例代码
  • grpc-java源码地址
  • gRPC传输格式说明

Liu Menghan

2 posts
3 categories
5 tags
© 2020 Liu Menghan
Powered by Hexo
|
Theme — NexT.Mist v5.1.4