gRPC 是 Google 开源的一款高性能 RPC 框架,前两天发布了 1.0 版本。RPC (Remote Procedure Call) 即远程过程调用,通过 RPC ,客户端的应用程序可以方便地调用另外一台机器上的服务端程序,因而常被应用于分布式系统中。

RPC 框架通常使用 IDL (Interface Description Language) 定义客户端和服务端进行通信的数据结构,服务端提供的服务等,然后编译生成相应的代码供客户端和服务端使用。RPC 框架一般都具备跨语言的特性,这样客户端和服务端可以分别基于不同的语言进行实现。

本文将简单地探索下 gRPC 的使用方法。文中会简单地构建一个 RPC 服务,使用 Java 实现服务端和客户端;为了验证跨语言特性,还将基于 Python 实现一个简单的客户端。

定义服务接口

gRPC 使用 Protocol Buffers 作为 IDL 和底层的序列化工具。 Protocol Buffers 也是非常有名的开源项目,主要用于结构化数据的序列化和反序列化。

在 .proto 文件中定义通信的数据结构和服务接口。关于 Protocol Buffers 的 IDL 的具体细节参考Language Guide (proto3)。本例子中定义的服务接口如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.jr.JRService";
option java_outer_classname = "JRProto";

package JRService;

//service definition
service JRService {
  rpc ListSongs (SingerId) returns (SongList) {}
  //using stream
  rpc GetSongs (SingerId) returns (stream Song) {}
}

message SingerId {
  int32 id = 1;
}

message Singer {
  int32 id = 1;
  string name = 2;
}

message Song {
  int32 id = 1;
  string name = 2;
  Singer singer = 3;
}

message SongList {
  repeated Song songs= 1;
}

这里要注意的是,在 Protocol Buffers 服务接口的方法定义中是不能使用基本类型的,方法参数和返回值都必须是自定义的 message 类型。

代码生成

在定义了接口描述文件后,就可以使用 Protocol Buffers 编译器生成相应编程语言的代码了。

下载 Protoc Buffer 后,编译定义的 proto 文件生成相应的代码。以 Java 为例:

1
$ protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/jr.proto

查看生成的代码可以发现,Protoco Buffers 为每一个 message 都生成了相应的接口和类,可供客户端和服务端代码直接使用。

目前还只是生成了消息对象和序列化及反序列相关的代码。为了使用 gRPC 构建 RPC 服务,还要使用 protoc-gen-grpc-java 插件来生成通信部分的代码。protoco-gen-grpc-java插件可以自行编译,或者从这里下载。使用 protoc-gen-grpc-java 插件生成通信服务相关的接口类及接口。

1
$protoc --plugin=protoc-gen-grpc-java=/path/to/protoc-gen-grpc-java --grpc-java_out=$DST_DIR --proto_path=$SRC_DIR $SRC_DIR/jr.proto

运行上述命令后会生成 JRServiceGrpc.java,后面 RPC 的服务端和客户端就依赖该类进行构建。

上述手动编译的方式有点麻烦,如果使用 Maven 或者 Gradle 的话,可以选择使用相关的插件。我这里选择使用 Gradle 构建项目,在 build.gradle 中依赖和插件可以这样配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
apply plugin: 'java'
apply plugin: 'com.google.protobuf'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier
        // gradle versions
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
    }
}


def grpcVersion = '1.0.0' // CURRENT_GRPC_VERSION

dependencies {
    compile "io.grpc:grpc-netty:${grpcVersion}"
    compile "io.grpc:grpc-protobuf:${grpcVersion}"
    compile "io.grpc:grpc-stub:${grpcVersion}"
}

protobuf {
    protoc {
        // The version of protoc must match protobuf-java. If you don't depend on
        // protobuf-java directly, you will be transitively depending on the
        // protobuf-java version that grpc depends on.
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                // To generate deprecated interfaces and static bindService method,
                // turn the enable_deprecated option to true below:
                option 'enable_deprecated=false'
            }
        }
    }
}

使用命令 gradle generateProto 即可生成代码。

生成 Python 代码

如果想要生成 Python 相关的代码,可以通过如下方法来进行:

首先安装 gRPC tools:

1
2
$ sudo pip install grpcio
$ sudo pip install grpcio-tools

接着使用 gRPC tools 生成 Python 代码:

1
$ python -m grpc.tools.protoc -I$SRC_DIR --python_out=. --grpc_python_out=. ./proto/jr.proto

生成的 jr_pb2.py 中包含了序列化和反序列化,以及 RPC 通信相关的代码。

Java 服务端实现

服务端代码的实现主要分为两部分: - 实现服务接口需要完成的实际工作:主要通过继承生成的基本服务类,并重写相应的 RPC 方法来完成具体的工作。 - 运行一个 gRPC 服务,监听客户端的请求并返回响应。

实现服务类

自定义一个内部类,继承自生成的 JRServiceGrpc.JRServiceImplBase 抽象类。在 JRServiceImpl 中重写服务方法来完成具体的工作。

1
2
3
private static class JRServiceImpl extends JRServiceGrpc.JRServiceImplBase {
  //...
}

先来看一下 ListSongs 的实现。该方法接受一个 SingerId 请求,并返回一个 SongList。注意 SongList 的定义,SongList 由一个或多个 Song 构成。

1
2
3
4
5
    public void listSongs(SingerId request, StreamObserver<SongList> responseObserver) {
      SongList list = SongList.newBuilder().addAllSongs(genFakeSongs(request)).build();
      responseObserver.onNext(list);
      responseObserver.onCompleted();
    }

可以看到,listSongs() 方法接受两个参数:

  • SingerId, 这个是请求
  • StreamObserver<SongList>, 用于处理响应和关闭通道

这个方法中首先构建了 SongList 对象,然后使用 responseObserveronNext() 方法返回响应,并调用 onCompleted() 方法表明已经处理完毕。

至于 GetSongs 的实现,基本和 ListSongs 一致。不同点在于,由于定义 RPC 方法时指定了响应是 stream Song,因而可以多次返回响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    public void getSongs(SingerId request, StreamObserver<Song> responseObserver) {
      List<Song> songs = genFakeSongs(request);
      for (Song song: songs) {
        responseObserver.onNext(song);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          responseObserver.onError(e);
        }
      }
      responseObserver.onCompleted();
    }

这里多次调用 responseObserveronNext() 方法返回相应,每次间隔 1s ,调用onCompleted() 方法表明经处理完毕。

启动服务端监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  private int port = 50051;
  private Server server;

  private void start() throws IOException{
    server = ServerBuilder.forPort(port).addService(new JRServiceImpl()).build();
    server.start();

    Runtime.getRuntime().addShutdownHook(new Thread(){
      @Override
      public void run() {
        JRServiceServer.this.stop();
      }
    });
  }

  private void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

使用 ServerBuilder 来创建一个 Server ,主要分为三步: - 指定服务监听的端口 - 创建具体的服务对象,并注册给 ServerBuilder - 创建 Server 并启动。

Java 客户端实现

为了调用服务端的方法,需要创建 stub 。有两种类型的 stub :

  • blocking/synchronous stub : 阻塞,客户端发起 RPC 调用后一直等待服务端的响应
  • non-blocking/asynchronous stub : 非阻塞,异步响应,通过 StreamObserver 在响应时进行回调

为了创建 stub , 首先要创建 channel , 需要指定服务端的主机和监听的端口。然后按序创建阻塞或者非阻塞的 stub 。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  private final ManagedChannel channel;
  private final JRServiceGrpc.JRServiceBlockingStub blockingStub;
  private final JRServiceGrpc.JRServiceStub asyncStub;

  public JRServiceClient(String hots, int port) {
    channel = ManagedChannelBuilder.forAddress(hots, port)
        // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
        // needing certificates.
        .usePlaintext(true)
        .build();
    blockingStub = JRServiceGrpc.newBlockingStub(channel);
    asyncStub = JRServiceGrpc.newStub(channel);
  }

通过 stub 来调用发起 RPC 调用,直接在 stub 上调用同名方法。

1
2
3
4
5
6
7
  public void getSongList() {
    SingerId request = SingerId.newBuilder().setId(1).build();
    SongList songList = blockingStub.listSongs(request);
    for (Song song : songList.getSongsList()) {
      logger.info(song.toString());
    }
  }

构造请求对象并传递给 listSongs(request) 方法。看上去是调用本地方法进行处理,实际上中间涉及到网络的通信。

对于 stream Song 的响应,返回的是一个迭代器 Iterator<Song>

1
2
3
4
5
6
7
  public void getSongsUsingStream() {
    SingerId request = SingerId.newBuilder().setId(1).build();
    Iterator<Song> iterator = blockingStub.getSongs(request);
    while (iterator.hasNext()) {
      logger.info(iterator.next().toString());
    }
  }

对于异步的 stub,则需要一个 StreamObserver 对象来完成回调处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  public void getSongsUsingAsyncStub() throws InterruptedException {

    SingerId request = SingerId.newBuilder().setId(1).build();
    final CountDownLatch latch = new CountDownLatch(1); // using CountDownLatch

    StreamObserver<Song> responseObserver = new StreamObserver<Song>() {
      @Override
      public void onNext(Song value) {
        logger.info("get song :" + value.toString());
      }

      @Override
      public void onError(Throwable t) {
        Status status = Status.fromThrowable(t);
        logger.info("failed with status : " + status );
        latch.countDown();
      }

      @Override
      public void onCompleted() {
        logger.info("finished!");
        latch.countDown();
      }
    };

    asyncStub.getSongs(request, responseObserver);

    latch.await();
  }

创建了一个实现了 StreamObserver 接口的匿名内部类对象 responseObserver 用于回调处理,每一次在收到一个响应的 Song 对象时会触发 onNext() 方法,RPC 调用完成或出错时则会调用 onCompleted()onError() 。这里还用到了一个 CountDownLatch ,等待响应全部接受完毕后才从方法返回。

Python 客户端的实现

Python 客户端的实现也分为三步:1)创建 channel ;2)创建 stub ;3)在 stub 上调用服务方法发起 RPC 调用。相关代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def get_songlist(stub):
  request = jr_pb2.SingerId(id=1)
  song_list = stub.ListSongs(request)
  for song in song_list.songs:
    print '%d, %s, %s, %s' % (time.time()*1000, song.id, song.name, song.singer.name)

def get_songs_using_stream(stub):
  request = jr_pb2.SingerId(id=1)
  songs = stub.GetSongs(request)
  for song in songs:
    print '%d, %s, %s, %s' % (time.time()*1000, song.id, song.name, song.singer.name)

def run():
  channel = grpc.insecure_channel('localhost:50051')
  stub = jr_pb2.JRServiceStub(channel)
  get_songlist(stub)
  get_songs_using_stream(stub)

小结

总的来说,使用 gRPC 构建 RPC 分为三步:1)使用 IDL 定义服务接口及通信消息对象;2)使用 Protocol Buffers 和 gRPC 工具生成序列化/反序列化和 RPC 通信的代码;3)基于生成的代码创建服务端和客户端应用。gRPC 在数据交换格式上使用了自家的 Protocol Buffers,已经被证明是非常高效序列化框架;在传输协议上 gRPC 支持 HTTP 2.0 标准化协议,比 HTTP 1.1 有更好的性能。

RPC 的实现原理其实是基于 C/S 架构的,通过网络将客户端的请求传输给服务端,服务端对请求进行处理后将结果返回给客户端。在很多情况下使用 JSON 进行数据传输的 REST 服务和 RPC 实现的效果差不多,都是跨网络进行数据的交换,但是 RPC 中客户端在进行方法调用的时候更加便捷,底层是完全透明的,看上去就像是调用本地方法一样。

之前也简单地用过一点 FaceBook 开源 RPC 框架的 Thrift,感觉 gRPC 和 Thrift 在使用上还是比较接近的,不知道两者的性能对比如何^_^

本文项目地址

GitHub: https://github.com/jrthe42/grpc-service-demo

-EOF-