一文掌握 gRPC
概述
RPC(Remote Procedure Call,远程过程调用)一种 进程间通信 的模式, 客户端应用程序可以直接调用不同计算机上的服务器应用程序上的方法,就像调用 本地 方法一样。
与许多 RPC 系统一样,gRPC 基于 定义一个服务(defining a service) 的理念,指定服务的方法名,参数和返回类型。 服务端应用实现此接口并运行 gRPC 服务器来处理客户端调用。 而在客户端有一个存根(stub)(在某些语言中称为 client),它提供与服务端相同的方法调用。
默认情况下,gRPC 使用 Protocol Buffers 作为消息的序列化机制。 关于 Protocol Buffers 可以参考之前的文章:一文掌握 Protobuf 。
HTTP/2
gRPC 默认使用 HTTP/2 作为传输协议,与 HTTP/1 相比,HTTP/2 具有如下特点:
- 二进制格式报文:HTTP/1 使用文本格式,而 HTTP/2 使用 二进制 格式。
- 二进制分帧层:处于应用层和传输层之间,HTTP 报文中的 Header 与 Data 将在这里分成不同的帧。
- 多路复用:多个请求-响应复用同一个 TCP 连接,每个请求-响应对应一个 流 ,可以把它当成一个虚拟通道,每个流都有一个唯一的整数ID。
- 服务器推送:允许服务端向客户端主动发送消息。服务器 可以 对一个客户端请求发送 多个 响应,服务器向客户端推送资源 无需 客户端明确的请求。
- 首部压缩:使用了 HPACK 算法来压缩头字段,减少了头字段的大小。
更多详细信息可以参考 RFC-7540。
核心概念
RPC 服务定义
下面是一个使用 protobuf 定义的 RPC 服务的例子:
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
gRPC 允许以下 4 种类型的服务:
- Unary RPCs:客户端发送 单个 请求并得到 单个 响应。proto
rpc SayHello(HelloRequest) returns (HelloResponse);
- Server streaming RPCs:客户端发送 单个 请求,但得到一个 消息流, 客户端将持续从流中读取消息直到没有更多消息。proto
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
- Client streaming RPCs:与上面相反,客户端发送一个消息流,服务短返回单一消息。proto
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
- Bidirectional streaming RPCs:双向流式 RPC。proto
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
在下面的 RPC 生命周期 部分会更详细的讲解各类 RPC 服务的区别。
RPC 生命周期
本节将详细介绍各类型 RPC 服务的调用过程。
Unary RPC
- 客户端发起 RPC 调用后,服务器会收到通知,包含客户端的元数据,方法名和超时时间(可选)。
- 服务器可以立刻返回自己的元数据,也可等到客户端的请求到达之后跟响应一起返回。
- 服务端处理请求并返回响应,响应中包含 状态码 和 提示信息(可选)。
- 客户端收到响应后,如果状态码为 OK,则获取响应中的结果,并结束这次调用。
Server streaming RPC
与 Unary RPC RPC 类似,不同之处在于服务器会返回 一串 消息来响应客户端的请求。 发送完所有消息后,服务器的状态信息和元数据(可选)将发送到客户端。 客户端收到所有服务器消息后即完成调用。
Client streaming RPC
与 Unary RPC RPC 类似,不同之处在于客户端向服务器发送的是 一串 消息,而不是单条消息。 服务器通常会(但不一定)在收到 所有 客户端消息后,以单条消息进行响应。
Bidirectional streaming RPC
该模式下,第一步与 Unary RPC 相同,后续两端将会 持续 向对端发送数据。 由于这两个数据流是独立的,因此客户端和服务器可以按 任何顺序 读取和写入消息。
RPC 超时时间
gRPC 允许客户端指定 Deadlines(确定的时间点)或 Timeouts(请求的持续时间) 来声明 RPC 的超时时间。
请求超时后,客户端会以 DEADLINE_EXCEEDED
error 终止;服务器则会根据收到的 Deadlines / Timeouts 确定 RPC 是否超时。
使用 Deadlines 还是 Timeouts,取决于特定语言的具体实现。
RPC 终止
客户端和服务器都会在 本地 确定 RPC 是否成功,因此两端的执行结果可能会不一致。 比如,服务器成功发送了响应,但客户端收到响应时已经超时。
取消 RPC
客户端和服务器都可以随时取消 RPC,此时 RPC 会立刻终止。
WARNING
取消前所做的更改 不会 回滚。
元数据
元数据是有关特定 RPC 调用(例如身份验证)的信息(类似于 HTTP 中的 Header),以键值对列表的形式呈现,其中键是字符串,值通常是字符串,但可以是二进制数据。
键不区分大小写,由 ASCII 字母、数字和特殊字符 -
、_
、.
组成,并且不能以 grpc-
开头(这是为 gRPC 保留字)。 二进制值键以 -bin
结尾,而 ASCII 值键则不以 -bin
结尾。
Channels
gRPC 提供一个 Channel 与特定服务器和端口上的 gRPC 服务通信。客户端可以指定 Channel 的参数修改 gRPC 的默认行为, 例如打开/关闭消息压缩。Channel 有自己的状态,包括已连接(connected)和空闲(idle)。
Channels 的具体实现取决于语言。
使用指南
gRPC 提供了丰富的特性以支持各类应用场景,例如:
- 认证
- 负载均衡
- 数据压缩
- 健康检查
- ...
更多详细信息请参考 gRPC Guides。
下面以 Rust 的 tonic 库为例展示 gRPC 的使用方法。
本示例的完整代码请参考 https://github.com/zou-can/rust-demos/tree/master/examples/grpc 。
创建一个 Unary RPC 服务
在创建项目之前,需要安装 protobuf 编译器,可以参考 安装 proto 编译器。
初始化项目
项目的目录结构如下:
grpc
├─ proto
│ └─ chat.proto # proto 消息定义文件
├─ src
│ ├─ server.rs # 服务器代码
│ ├─ client.rs # 客户端代码
│ ├─ chat.rs # 用于导入 protobuf 生成的 rust 代码
│ └─ lib.rs
├─ build.rs # rust 构建脚本,用于将 .proto 文件编译成 rust 代码
└─ Cargo.toml
Cargo.toml
[package]
name = "grpc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "chat-server"
path = "src/server.rs"
[[bin]]
name = "chat-client"
path = "src/client.rs"
[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
[build-dependencies]
tonic-build = "0.11"
编写 .proto 文件,并生成对应的 rust 代码
proto/chat.proto
syntax = "proto3";
// 指明生成的 rust 代码的文件名
// 编译过后可以在形如 target/debug/build/grpc-xxx/out/chat.rs 的路径下发现生成的 rust 代码文件
package chat;
message ChatRequest {
string message = 1;
}
message ChatResponse {
string message = 1;
}
service Chat {
// Unary
rpc UnaryChat(ChatRequest) returns (ChatResponse) {}
}
build.rs
fn main() {
tonic_build::compile_protos("proto/chat.proto").unwrap();
}
导入生成的 rust 代码
src/chat.rs
// 导入生成的 rust 代码
tonic::include_proto!("chat");
src/lib.rs
pub mod chat;
服务端代码
src/server.rs
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use grpc::chat::{ChatRequest, ChatResponse};
use grpc::chat::chat_server::{Chat, ChatServer};
/// 定义一个 ChatService
#[derive(Debug, Default)]
struct ChatService {}
/// 为 ChatService 实现 RPC 接口
#[tonic::async_trait]
impl Chat for ChatService {
async fn unary_chat(
&self,
request: Request<ChatRequest>,
) -> Result<Response<ChatResponse>, Status> {
println!("Received a request: {:?}", request);
let resp = ChatResponse {
message: format!("Received: {}", request.into_inner().message)
};
Ok(Response::new(resp))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:16443".parse()?;
// ChatServer 是 tonic 生成的样板代码
let svc = ChatServer::new(ChatService::default());
Server::builder()
.add_service(svc)
.serve(addr)
.await?;
Ok(())
}
客户端代码
src/client.rs
use tonic::Request;
use grpc::chat::chat_client::ChatClient;
use grpc::chat::ChatRequest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client =
// ChatClient 是 tonic 生成的样板代码
ChatClient::connect("http://127.0.0.1:16443").await?;
let request = Request::new(ChatRequest {
message: "Hello?".into(),
});
let response = client.unary_chat(request).await?;
println!("{:?}", response);
Ok(())
}
验证
分别执行以下命令启动服务器和客户端:
cargo run --bin chat-server
cargo run --bin chat-client
可以得到客户端输出:
Response {
metadata: MetadataMap {
headers: {
"content-type": "application/grpc",
"date": "Mon, 24 Jun 2024 10:05:23 GMT",
"grpc-status": "0",
},
},
message: ChatResponse {
message: "Received: Hello?",
},
extensions: Extensions,
}
请求拦截器
可以为服务器和客户端添加请求拦截器,tonic 提供了两种定义拦截器的方法, 一种是实现 Interceptor
trait,一种是实现 FnMut
trait。代码示例如下:
src/server.rs
/// 使用 FnMut trait 定义拦截器
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
// 得到 metadata 中的 trace-id
if let Some(v) = req.metadata().get("trace-id") {
if let Ok(s) = v.to_str() {
println!("{s} - {:?}", req);
}
}
Ok(req)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:16443".parse()?;
// ChatServer 是 tonic 生成的样板代码
// 配置拦截器
let svc = ChatServer::with_interceptor(
ChatService::default(),
intercept,
);
Server::builder()
.add_service(svc)
.serve(addr)
.await?;
Ok(())
}
src/client.rs
#[derive(Default)]
struct TraceInterceptor {}
/// 使用 Interceptor trait 定义拦截器
impl Interceptor for TraceInterceptor {
fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
let trace_id = Uuid::new_v4().to_string();
let value = MetadataValue::try_from(trace_id).unwrap();
req.metadata_mut().insert("trace-id", value);
Ok(req)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let channel = Endpoint::from_static("http://127.0.0.1:16443")
.connect()
.await?;
let mut client: ChatClient<InterceptedService<Channel, TraceInterceptor>> =
// ChatClient 是 tonic 生成的样板代码
// 配置拦截器
ChatClient::with_interceptor(channel, TraceInterceptor::default());
let request = Request::new(ChatRequest {
message: "Hello?".into(),
});
let response = client.unary_chat(request).await?;
println!("{:#?}", response);
Ok(())
}
分别运行服务器和客户端,并查看服务端控制台输出
87848e6b-a2e9-4f7e-a44e-7f0cc45ce244 - Request { metadata: MetadataMap { headers: {"te": "trailers", "content-type": "application/grpc", "trace-id": "87848e6b-a2e9-4f7e-a44e-7f0cc45ce244", "user-agent": "tonic/0.11.0"} }, message: (), extensions: Extensions }
负载均衡
gRPC 提供了对负载均衡的支持,并允许用户扩展负载均衡的具体实现。更多信息可以参考 Load Balancing in gRPC。
下面是使用 客户端负载均衡 的一个例子:
src/server.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addrs = ["127.0.0.1:16443", "127.0.0.1:16444"];
let handles = addrs.into_iter()
.map(|addr| {
let addr = addr.parse().unwrap();
// ChatServer 是 tonic 生成的样板代码
// 配置拦截器
let svc = ChatServer::with_interceptor(
ChatService::default(),
intercept,
);
let server = Server::builder()
.add_service(svc)
.serve(addr);
// 分别启动两个 server
tokio::spawn(async move {
if let Err(e) = server.await {
eprintln!("Error = {:?}", e);
}
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.await?;
}
Ok(())
}
src/client.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let endpoints = ["http://127.0.0.1:16444", "http://127.0.0.1:16443", ]
.into_iter()
.map(|addr| Endpoint::from_static(addr));
// 客户端负载均衡
let channel = Channel::balance_list(endpoints);
let mut client: ChatClient<InterceptedService<Channel, TraceInterceptor>> =
// ChatClient 是 tonic 生成的样板代码
// 配置拦截器
ChatClient::with_interceptor(channel, TraceInterceptor::default());
// ...
Ok(())
}
总结
gRPC 工作在 HTTP/2 之上,内置流式通信、服务发现、负载均衡、健康检查等丰富的功能。 它默认的消息编码基于 Protocol Buffers,因此与其 局限性 类似。
特别的,不同语言的支持程度差异比较大,支持不好的情况下容易增加代码复杂性,降低产品质量。