gRPC
gRPC 是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。
本篇内容演示了如何在 Midway 体系下,提供 gRPC 服务,以及调用 gRPC 服务的方法。
Midway 当前采用了最新的 gRPC 官方推荐的 @grpc/grpc-js 进行开发,并提供了一些工具包,用于快速发布服务和调用服务。
我们使用的模块为 @midwayjs/grpc ,既是一个框架(可以独立发布服务),又是一个组件(可以接入其它框架调用 gRPC 服务)。
创建示例
$ npm -v
# 如果是 npm v6
$ npm init midway --type=grpc my_midway_app
# 如果是 npm v7
$ npm init midway -- --type=grpc my_midway_app
此示例包含一个 gRPC 服务。
目录结构
.
├── package.json
├── proto													## proto 定义文件
│   └── helloworld.proto
├── src
│   ├── configuration.ts					## 入口配置文件
│   ├── interface.ts
│   └── provider									## gRPC 提供服务的文件
│       └── greeter.ts
├── test
├── bootstrap.js									## 服务启动入口
└── tsconfig.json
定义服务接口
在微服务中,定义一个服务需要特定的接口定义语言(IDL)来完成,在 gRPC 中 默认使用 Protocol Buffers 作为序列化协议。
序列化协议独立于语言和平台,提供了多种语言的实现,Java,C++,Go 等等,每一种实现都包含了相应语言的编译器和库文件。所以 gRPC 是一个提供和调用都可以跨语言的服务框架。
一个 gRPC 服务的大体架构可以用官网上的一幅图表示。
Protocol Buffers 协议的文件,默认的后缀为 .proto 。.proto 后缀的 IDL 文件,并通过其编译器生成特定语言的数据结构、服务端接口和客户端 Stub 代码。
由于 proto 文件可以跨语言使用,为了方便共享,我们一般将 proto 文件放在 src 目录外侧,方便其他工具复制分发。
下面是一个基础的 proto/helloworld.proto  文件。
syntax = "proto3";
package helloworld;
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
  string name = 1;
}
message HelloReply {
  string message = 1;
}
proto3 表示的是第三版的 protobuf 协议,是 gRPC 目前推荐的版本,“语法简单,功能更全”。
我们可以用 service  格式,定义服务体,其中可以包含方法。同时,我们可以更加细致的通过 message  描述服务具体的请求参数和响应参数。
我们可以从 Google 的官网文档 中查看更多细节。
大家会看到,这和 Java 中的 Class 非常相像,每个结构就相当于 Java 中的一个类。
编写 proto 文件
现在我们再来看之前的服务,是不是就很好理解了。
syntax = "proto3";
package helloworld;
// 服务的定义
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 服务的请求参数
message HelloRequest {
  string name = 1;
}
// 服务的响应参数
message HelloReply {
  string message = 1;
}
我们定义了一个名为 Greeter  的服务,包含一个 HelloRequest  结构的请求体,以及返回 HelloReply  结构的响应体。
接下去,我们将对这个服务给大家做演示。
生成代码定义
传统的 gRPC 框架,需要用户手动编写 proto 文件,以及生成 js 服务,最后再根据 js 生成的服务再编写实现,在 Midway 体系下,我们提供了一个 grpc-helper 工具包来加速这个过程。
如果没有安装,可以先安装(脚手架示例中已带)。
$ npm i @midwayjs/grpc-helper --save-dev
grpc-helper 工具的作用,是将用户提供的 proto 文件,生成对应可读的 ts interface 文件。
我们可以添加一个脚本,方便这个过程。
{
  "scripts": {
    "generate": "tsproto --path proto --output src/domain"
  }
}
然后执行 npm run generate 。
上述命令执行后,会在代码的 src/domain  目录中生成 proto 文件对应的服务接口定义。
不管是提供 gRPC 服务还是调用 gRPC 服务,都要先生成定义。
生成的代码如下,包含有一个命名空间(namespace),以及命名空间下的两个 TypeScript Interface, Greeter  用于编写服务端实现, GreeterClient  用于编写客户端实现。
/**
 * This file is auto-generated by grpc-helper
 */
import * as grpc from '@midwayjs/grpc';
// 生成的命名空间
export namespace helloworld {
  // 服务端使用的定义
  export interface Greeter {
    // Sends a greeting
    sayHello(data: HelloRequest): Promise<HelloReply>;
  }
  // 客户端使用的定义
  export interface GreeterClient {
    // Sends a greeting
    sayHello(options?: grpc.IClientOptions): grpc.IClientUnaryService<HelloRequest, HelloReply>;
  }
  // 请求体结构
  export interface HelloRequest {
    name?: string;
  }
  // 响应体结构
  export interface HelloReply {
    message?: string;
  }
}
每当 proto 文件被修改时,就需要重新生成对应的服务定义,然后将对应的方法实现。
提供 gRPC 服务(Provider)
编写生产者(Provider)
在 src/provider  目录中,我们创建 greeter.ts ,内容如下。
import { MSProviderType, Provider, Provide, GrpcMethod } from '@midwayjs/decorator';
import { helloworld } from '../domain/helloworld';
/**
 * 实现 helloworld.Greeter 接口的服务
 */
@Provide()
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {
  @GrpcMethod()
  async sayHello(request: helloworld.HelloRequest) {
    return { message: 'Hello ' + request.name };
  }
}
注意,@Provider 装饰器和 @Provide 装饰器不同,前者用于提供服务,后者用于依赖注入容器扫描标识的类。
我们使用 @Provider  暴露出一个 RPC 服务, @Provider  的第一个参数为 RPC 服务类型,这个参数是个枚举,这里选择 GRPC 类型。
@Provider 的第二个参数为 RPC 服务的元数据,这里指代的是 gRPC 服务的元数据。这里需要写入 gRPC 的 package 字段,即 proto 文件中的 package 字段(这里的字段用于和 proto 文件加载后的字段做对应)。
对于普通的 gRPC 服务接口(UnaryCall),我们只需要使用 @GrpcMethod()  装饰器修饰即可。修饰的方法即为服务定义本身,入参为 proto 中定义好的入参,return 值即为定义好的响应体。
注意,生成的 Interface 是为了更好的编写服务代码,规范结构,请务必按照定义编写。
启动 gRPC 服务
这里启动需要用到项目根目录 bootstrap.js  独立文件。代码和其他框架初始化类似,只是这里的框架包是 @midwayjs/grpc 。
内容如下:
// 获取框架
const { Framework } = require('@midwayjs/grpc');
const { join } = require('path');
// 初始化框架
const grpcService = new Framework().configure({
  services: [
    {
      protoPath: join(__dirname, 'proto/helloworld.proto'),
      package: 'helloworld',
    },
  ],
});
// 使用 bootstrap 启动
const { Bootstrap } = require('@midwayjs/bootstrap');
Bootstrap.load(grpcService).run();
我们已经将启动命令写到了 start 脚本中,执行 npm run start  即可。
"scripts": {
  "start": "NODE_ENV=production node ./bootstrap.js",
},
在部署前,需要执行 npm run build 将 ts 代码编译为 js。
框架选项
@midwayjs/grpc  作为框架启动时,可以传递的参数如下:
| url | string | 可选,gRPC 服务连接字符串,默认为 localhost:6565 | 
|---|---|---|
| services | IGRPCServiceOptions[] | 必选,数组,需要暴露的 gRPC 服务信息,每个服务对应一个 proto 文件 | 
| loaderOptions | object | 可选,使用 @grpc/proto-loader 加载的选项,具体参考这里,默认为 | 
{   keepCase: true,  longs: String,  enums: String,  defaults: true,  oneofs: true, } |
| credentials | ServerCredentials | 可选,服务凭证,值参考这里,默认值为 ServerCredentials.createInsecure() |
services 字段是数组,意味着 Midway 项目可以同时发布多个 gRPC 服务。每个 service 的结构为:
| protoPath | string | 必选,proto 文件的绝对路径 | 
|---|---|---|
| package | string | 必选,服务对应的 package | 
编写单元测试
@midwayjs/grpc  库提供了一个 createGRPCConsumer  方法,用于实时调用客户端,一般我们用这个方法做测试。
这个方法每次调用会实时连接,不建议将该方法用在生产环境。
在测试中写法如下。
import { createApp, close } from '@midwayjs/mock';
import { Framework, createGRPCConsumer } from '@midwayjs/grpc';
import { join } from 'path';
import { helloworld } from '../src/domain/helloworld';
describe('test/index.test.ts', () => {
  it('should create multiple grpc service in one server', async () => {
    const baseDir = join(__dirname, '../');
    // 创建服务
    const app = await createApp<Framework>(baseDir, {
      services: [
        {
          protoPath: join(baseDir, 'proto', 'helloworld.proto'),
          package: 'helloworld',
        },
      ],
    });
    // 调用服务
    const service = await createGRPCConsumer<helloworld.GreeterClient>({
      package: 'helloworld',
      protoPath: join(baseDir, 'proto', 'helloworld.proto'),
      url: 'localhost:6565',
    });
    const result = await service.sayHello().sendMessage({
      name: 'harry',
    });
    expect(result.message).toEqual('Hello harry');
    await close(app);
  });
});
调用 gRPC 服务(Consumer)
我们编写一个 gRPC 服务来调用上面的暴露的服务。
事实上,你可以在 Web 的 Controller,或者 Service 等其他地方来调用,这里只是做一个示例。
增加组件
@midwayjs/grpc  库即是 Framework,又是组件,在作为组件引入时,需要在 src/configuration.ts  中配置。
// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';
import * as grpc from '@midwayjs/grpc';
import { join } from 'path';
@Configuration({
  imports: [grpc],
  importConfigs: [join(__dirname, './config')],
})
export class AutoConfiguration {}
提供调用配置
你需要在 src/config/config.default.ts  中增加你需要调用的目标服务以及它的 proto 文件信息。
比如,这里我们填写了上面暴露的服务本身,以及该服务的 proto,包名等信息。
// src/config/config.default.ts
import { DefaultConfig } from '@midwayjs/grpc';
import { join } from 'path';
export const grpc = {
  services: [
    {
      url: 'localhost:6565',
      protoPath: join(__dirname, '../../proto/helloworld.proto'),
      package: 'helloworld',
    },
  ],
} as DefaultConfig;
代码调用
配置完后,我们就可以在代码里调用了。
@midwayjs/grpc  提供了 clients ,可以方便的获取到已配置的服务。我们只需要在需要注入的地方,注入这个对象即可。
比如:
import { Provide, Inject } from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provide()
export class UserService {
  @Inject()
  grpcClients: Clients;
}
我们通过 clients  获取到对方服务的客户端实例,然后调用即可。
import { Provide, Inject } from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provide()
export class UserService {
  @Inject()
  grpcClients: Clients;
  async invoke() {
    // 获取服务
    const greeterService = this.grpcClients.getService<helloworld.GreeterClient>('helloworld.Greeter');
    // 调用服务
    const result = await greeterService.sayHello().sendMessage({
      name: 'harry',
    });
    // 返回结果
    return result;
  }
}
我们也可以利用 @Init  装饰器,将需要调用的服务缓存到属性上。这样可以在其他方法调用时复用。
示例如下。
import { GrpcMethod, MSProviderType, Provider, Provide, Inject, Init } from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';
@Provide()
@Provider(MSProviderType.GRPC, { package: 'hero' })
export class HeroService implements hero.HeroService {
  // 注入客户端
  @Inject()
  grpcClients: Clients;
  greeterService: helloworld.Greeter;
  @Init()
  async init() {
    // 赋值一个服务实例
    this.greeterService = this.grpcClients.getService<helloworld.GreeterClient>('helloworld.Greeter');
  }
  @GrpcMethod()
  async findOne(data) {
    // 调用服务
    const result = await greeterService.sayHello().sendMessage({
      name: 'harry',
    });
    // 返回结果
    return result;
  }
}
流式服务
gRPC 的流式服务用于减少连接,让服务端或者客户端不需要等待即可执行任务,从而提高执行效率。
gRPC 的流式服务分为三种,以服务端角度来说,为
- 服务端接收流(客户端推)
 - 服务端响应流(服务端 推)
 - 双向流
 
下面我们将一一介绍。
流式 proto 文件
流式的 proto 文件写法不同,需要在希望使用流式的地方将参数标记为 stream 。
syntax = "proto3";
package math;
message AddArgs {
  int32 id = 1;
  int32 num = 2;
}
message Num {
  int32 id = 1;
  int32 num = 2;
}
service Math {
  rpc Add (AddArgs) returns (Num) {
  }
	// 双向流
  rpc AddMore (stream AddArgs) returns (stream Num) {
  }
  // 服务端往客户端推
  rpc SumMany (AddArgs) returns (stream Num) {
  }
  // 客户端往服务端推
  rpc AddMany (stream AddArgs) returns (Num) {
  }
}
该服务生成的接口定义为:
import {
  IClientDuplexStreamService,
  IClientReadableStreamService,
  IClientUnaryService,
  IClientWritableStreamService,
  IClientOptions,
} from '@midwayjs/grpc';
export namespace math {
  export interface AddArgs {
    id?: number;
    num?: number;
  }
  export interface Num {
    id?: number;
    num?: number;
  }
  /**
   * server interface
   */
  export interface Math {
    add(data: AddArgs): Promise<Num>;
    addMore(data: AddArgs): Promise<void>;
    // 服务端推,客户端读
    sumMany(data: AddArgs): Promise<void>;
    // 客户端端推,服务端读
    addMany(num: AddArgs): Promise<void>;
  }
  /**
   * client interface
   */
  export interface MathClient {
    add(options?: IClientOptions): IClientUnaryService<AddArgs, Num>;
    addMore(options?: IClientOptions): IClientDuplexStreamService<AddArgs, Num>;
    // 服务端推,客户端读
    sumMany(options?: IClientOptions): IClientReadableStreamService<AddArgs, Num>;
    // 客户端端推,服务端读
    addMany(options?: IClientOptions): IClientWritableStreamService<AddArgs, Num>;
  }
}
服务端推送
客户端调用一次,服务端可以多次返回。通过 @GrpcMethod()  的参数来标识流式类型。
可用的类型为:
GrpcStreamTypeEnum.WRITEABLE服务端输出流(单工)GrpcStreamTypeEnum.READABLE客户端输出流(单工),服务端接受多次GrpcStreamTypeEnum.DUPLEX双工流
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';
import { Context } from '@midwayjs/grpc';
import { math } from '../interface';
import { Metadata } from '@grpc/grpc-js';
/**
 */
@Provide()
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
  @Inject()
  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.WRITEABLE })
  async sumMany(args: math.AddArgs) {
    this.ctx.write({
      num: 1 + args.num,
    });
    this.ctx.write({
      num: 2 + args.num,
    });
    this.ctx.write({
      num: 3 + args.num,
    });
    this.ctx.end();
  }
  // ...
}
服务端使用 ctx.write  方法来返回数据,由于是服务端流,可以返回多次。
返回结束后,请使用 ctx.end()  方法关闭流。
客户端,调用一次,接受多次数据。
比如下面的累加逻辑。
Promise 写法,会等待服务端数据都返回再做处理。
// 服务端推送
let total = 0;
let result = await service.sumMany().sendMessage({
  num: 1,
});
result.forEach((data) => {
  total += data.num;
});
// total = 9;
事件写法,实时处理。
// 服务端推送
let call = service.sumMany().getCall();
call.on('data', (data) => {
  // do something
});
call.sendMessage({
  num: 1,
});
客户端推送
客户端调用多次,服务端接收多次数据,返回一个结果。通过 @GrpcMethod({type: GrpcStreamTypeEnum.READABLE}) 的参数来标识流式类型。
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';
import { Context } from '@midwayjs/grpc';
import { math } from '../interface';
import { Metadata } from '@grpc/grpc-js';
/**
 */
@Provide()
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
  sumDataList: number[] = [];
  @Inject()
  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' })
  async addMany(data: math.Num) {
    this.sumDataList.push(data);
  }
  async sumEnd(): Promise<math.Num> {
    const total = this.sumDataList.reduce((pre, cur) => {
      return {
        num: pre.num + cur.num,
      };
    });
    return total;
  }
  // ...
}
客户端每次调用,都会触发一次 addMany 方法。
在客户端发送 end  事件之后,会调用 @GrpcMethod  装饰器上的 onEnd  参数指定的方法,该方法的返回值即为最后客户端拿到的值。
客户端示例如下:
// 客户端推送
const data = await service.addMany().sendMessage({ num: 1 }).sendMessage({ num: 2 }).sendMessage({ num: 3 }).end();
// data.num = 6
双向流
客户端可以调用多次,服务端也可以接收多次数据,返回多个结果,类似于传统的 TCP 通信。通过 @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX}) 的参数来标识双工流式类型。
服务端示例如下:
import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';
import { Context } from '@midwayjs/grpc';
import { math } from '../interface';
import { Metadata } from '@grpc/grpc-js';
/**
 */
@Provide()
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {
  @Inject()
  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })
  async addMore(message: math.AddArgs) {
    this.ctx.write({
      id: message.id,
      num: message.num + 10,
    });
  }
  async duplexEnd() {
    console.log('got client end message');
  }
  // ...
}
服务端可以随时使用 ctx.write  返回数据,也可以使用 ctx.end  来关闭流。
客户端示例:
对于双工通信的客户端,由于无法保证调用、返回的顺序,我们需要使用监听的模式来消费结果。
const clientStream = service.addMore().getCall();
let total = 0;
let idx = 0;
duplexCall.on('data', (data: math.Num) => {
  total += data.num;
  idx++;
  if (idx === 2) {
    duplexCall.end();
    // total => 29
  }
});
duplexCall.write({
  num: 3,
});
duplexCall.write({
  num: 6,
});
如果希望保证调用顺序,我们也提供了保证顺序的双向流调用方法,但是需要在 proto 中定义一个固定的 id,来确保顺序。
比如我们的 Math.proto,对每个入参和出参,都增加了一个固定的 id,所以可以固定顺序。
syntax = "proto3";
package math;
message AddArgs {
  int32 id = 1;							//  这里的 id 名字是固定的
  int32 num = 2;
}
message Num {
  int32 id = 1;							//  这里的 id 名字是固定的
  int32 num = 2;
}
service Math {
  rpc Add (AddArgs) returns (Num) {
  }
  rpc AddMore (stream AddArgs) returns (stream Num) {
  }
  // 服务端往客户端推
  rpc SumMany (AddArgs) returns (stream Num) {
  }
  // 客户端往服务端推
  rpc AddMany (stream AddArgs) returns (Num) {
  }
}
固定顺序的客户端调用方式如下:
// 保证顺序的双向流
const t = service.addMore();
const result4 = await new Promise<number>((resolve, reject) => {
  let total = 0;
  // 第一次调用和返回
  t.sendMessage({
    num: 2,
  })
    .then((res) => {
      expect(res.num).toEqual(12);
      total += res.num;
    })
    .catch((err) => console.error(err));
  // 第二次调用和返回
  t.sendMessage({
    num: 5,
  })
    .then((res) => {
      expect(res.num).toEqual(15);
      total += res.num;
      resolve(total);
    })
    .catch((err) => console.error(err));
  t.end();
});
// result4 => 27
默认的 id 为 id ,如果服务端定义不同,需要修改,可以在客户端调用时传递。
// 保证顺序的双向流
const t = service.addMore({
  messageKey: 'uid',
});
元数据(Metadata)
gRPC 的元数据等价于 HTTP 的上下文。
服务端通过 ctx.sendMetadata  方法返回元数据,也可以通过 ctx.metadata 获取客户端传递的元数据。
import { MSProviderType, Provider, Provide, GrpcMethod } from '@midwayjs/decorator';
import { helloworld } from '../domain/helloworld';
import { Metadata } from '@grpc/grpc-js';
import { Context } from '@midwayjs/grpc';
/**
 * 实现 helloworld.Greeter 接口的服务
 */
@Provide()
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {
  @Inject()
  ctx: Context;
  @GrpcMethod()
  async sayHello(request: helloworld.HelloRequest) {
    // 客户端传递的元数据
    console.log(this.ctx.metadata);
    // 创建元数据
    const meta = new Metadata();
    this.ctx.metadata.add('xxx', 'bbb');
    this.ctx.sendMetadata(meta);
    return { message: 'Hello ' + request.name };
  }
}
客户端通过方法的 options 参数传递元数据。
import { Metadata } from '@grpc/grpc-js';
const meta = new Metadata();
meta.add('key', 'value');
const result = await service
  .sayHello({
    metadata: meta,
  })
  .sendMessage({
    name: 'harry',
  });
获取元数据相对麻烦一些。
普通一元调用(UnaryCall)获取元数据需要使用 sendMessageWithCallback  方法。
const call = service.sayHello().sendMessageWithCallback(
  {
    name: 'zhangting',
  },
  (err) => {
    if (err) {
      reject(err);
    }
  }
);
call.on('metadata', (meta) => {
  // output meta
});
其他流式服务,可以通过 getCall()  方法获取原始客户端流对象,从而直接订阅。
// 获取服务,注意,这里没有 await
const call = service.addMany().getCall();
call.on('metadata', (meta) => {
  // output meta
});
超时处理
我们可以在调用服务时传递参数,单位毫秒。
const result = await service
  .sayHello({
    timeout: 5000,
  })
  .sendMessage({
    name: 'harry',
  });