上次文章简单了解了下 gRPC,并实现了一个简单模式,今天说下其他的几种模式。
gRPC有四种通信方式,分别是:简单 RPC(Unary RPC)一元RPC、服务端数据流模式 (Server-side streaming RPC)、客户端数据流模式 (Client-side streaming RPC)、双向数据流模式(Bidirectional streaming RPC)。它们主要有以下特点:
gRPC 支持定义 4 种类型的服务方法,分别是简单模式、服务端数据流模式、客户端数据流模式和双向数据流模式。
- 简单模式、一元RPC(Simple RPC、Unary RPC)是最简单的 gRPC 模式。客户端发起一次请求,服务端响应一个数据。定义格式为 rpc SayHello (HelloRequest) returns (HelloReply) {}。
- 服务端数据流模式(Server-side streaming RPC):客户端发送一个请求,服务器返回数据流响应,客户端从流中读取数据直到为空。定义格式为 rpc SayHello (HelloRequest) returns (stream HelloReply) {}。
- 客户端数据流模式(Client-side streaming RPC):客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {}。
- 双向数据流模式(Bidirectional streaming RPC):客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互 RPC 框架原理。定义格式为 rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}。
大概四种模式我们了解了,接下来使用 Go 语言简单实现一下,这里我们采用上次的代码。保持其目录结构即可。
.
├── README.md
├── api
│ └── user
│ ├── user.pb.go
│ └── user.proto
├── client
│ └── client.go
├── go.mod
├── go.sum
└── server
└── server.go
其实上节已经实现了一个简单模式,但是为了保持四种模式都有,所以还是简单演示一下吧
简单模式
重新创建一个 proto 文件,我们放在 api/simple/simple.proto
中
syntax = "proto3";
option go_package = "api/simple";
package simple;
message SimpleRequest {
string name = 1;
}
message SimpleResponse {
string message = 1;
}
service SimpleService {
rpc Get(SimpleRequest) returns (SimpleResponse) {}
}
生成 go 文件
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/simple/simple.proto
接下来去编写 Server 端代码,
还是如下的步骤:
- 通过
net.Listen(...)
监听客户端的请求 - 通过
grpc.NewServer()
创建一个 gRPC Server 实例 - 通过
pb.RegisterUserServiceServer(s, &User{})
将该服务注册到 gRPC 框架中 - 通过
s.Serve(listen)
启动 gRPC 服务
创建 server/simple.go
package main
import (
"context"
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/simple"
"google.golang.org/grpc"
"log"
"net"
)
type Simple struct {
Name string
pb.UnimplementedSimpleServiceServer
}
func (s *Simple) Get(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
name := req.GetName()
return &pb.SimpleResponse{
Message: "Hello " + name,
}, nil
}
func main() {
fmt.Println("start server")
listen, err := net.Listen("tcp", ":8989")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterSimpleServiceServer(s, &Simple{})
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
接下来就是创建 Client 了,client 就相对简单了,通过 grpc.Dial(xxx)
来创建连接,通过 pb.NewUserServiceClient(conn)
来创建客户端对象
package main
import (
"context"
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/simple"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
)
func main() {
fmt.Println("client start...")
conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewSimpleServiceClient(conn)
response, err := client.Get(context.Background(), &pb.SimpleRequest{Name: "hedeqiang"})
if err != nil {
log.Fatalf("fail to call: %v", err)
}
fmt.Println(response.GetMessage())
}
服务端数据流模式(Server-side streaming RPC)
简单来讲服务端数据流模式发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据。
好了,我们继续来编写代码,首先是服务端。创建 proto 文件api/server_side/server_side.proto
syntax = "proto3";
option go_package = "api/server-side;server_side";
package server_side;
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
string user_id = 1;
string name = 2;
string email = 3;
}
service ServerSide {
rpc GetUser(GetUserRequest) returns (stream GetUserResponse) {}
}
可以看到 服务端数据流模式 是返回的是一个 stream
生成Go文件
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/server_side/server_side.proto
编写服务端代码,创建文件server/server_side.go
package main
import (
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/server_side"
"google.golang.org/grpc"
"log"
"net"
"time"
)
type ServerSide struct {
pb.UnimplementedServerSideServer
}
func (s *ServerSide) GetUser(req *pb.GetUserRequest, stream pb.ServerSide_GetUserServer) error {
fmt.Println("GetUser")
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
err := stream.Send(&pb.GetUserResponse{
UserId: fmt.Sprintf("%d", i),
Name: "name" + fmt.Sprintf("%d", i),
Email: fmt.Sprintf("laravel_code@163.com"),
})
if err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, req, err)
}
}
return nil
}
func main() {
fmt.Println("start server")
listen, err := net.Listen("tcp", ":8989")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterServerSideServer(s, &ServerSide{})
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
关键代码讲解 GetUser
方法格式其实是固定的,我们在 定义 proto 文件的时候就已经固定了
func (s ServerSide) GetUser(req *pb.GetUserRequest, stream pb.ServerSide_GetUserServer) error {}
其次因为是流传输,那么其实就是不断调用 Send
方法去发送数据
stream.Send(xxx)
为了演示效果 我使用
time.Sleep(time.Second)
稍微休息一下。方便客户端看到具体效果,否则太快了。。
明白了服务端代码主要区别在于 GetUser
是一个流式传输,需要不断的去 Send
。因此客户端同理,需要不断的接收 Recv
即可。
客户端代码 ,创建 client/server_side.go
:
package main
import (
"context"
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/server_side"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
)
func main() {
fmt.Println("client start...")
conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewServerSideClient(conn)
user, err := client.GetUser(context.Background(), &pb.GetUserRequest{UserId: "1"})
if err != nil {
log.Fatalf("fail to get user: %v", err)
}
for {
resp, err := user.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("fail to recv: %v", err)
}
fmt.Println(resp)
}
}
需要注意的是,我们需要判断什么时候结束,也就是 err == io.EOF 那么我们就需要 break 掉.
客户端关键就是调用一个 Recv()
方法
接下来分别启动这两个客户端
go run server/server_side.go
go run client/server_side.go
不出意外的话,我们在客户端会间隔一秒输出信息
客户端数据流模式(Client-side streaming RPC)
客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {}。
同样继续创建一个 proto
文件,放在 api/client_side/client_side.proto
中
syntax = "proto3";
option go_package = "api/client;client_side";
package client_side;
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
string user_id = 1;
string name = 2;
string email = 3;
}
service ServerSide {
rpc GetUser(stream GetUserRequest) returns (GetUserResponse) {}
}
生成 Go 文件
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/client_side/client_side.proto
编写服务端代码
package main
import (
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/client_side"
"google.golang.org/grpc"
"io"
"log"
"net"
)
type ClientSide struct {
Name string
pb.UnimplementedClientSideServer
}
func (c *ClientSide) GetUser(stream pb.ClientSide_GetUserServer) error {
for {
var res pb.GetUserResponse
res.UserId = "123"
res.Name = "hedeqiang"
res.Email = "laravel_code@163.com"
recv, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&res)
}
if err != nil {
log.Fatalf("%v.GetUser(_) = _, %v", c, err)
}
fmt.Println(recv.GetUserId())
}
}
func main() {
fmt.Println("start server")
listen, err := net.Listen("tcp", ":8989")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterClientSideServer(s, &ClientSide{})
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
通过调用 stream.Recv()
进行接收客户端的数据,当碰到 io.EOF
则表明读取完毕,调用 stream.SendAndClose(xx)
方法将最终的结果发送给客户端。
客户端代码:
package main
import (
"context"
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/client_side"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"strconv"
"time"
)
func main() {
fmt.Println("client start...")
conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewClientSideClient(conn)
user, err := client.GetUser(context.Background())
if err != nil {
return
}
if err != nil {
log.Fatalf("fail to get user: %v", err)
}
i := 0
for {
if i > 10 {
break
}
i++
time.Sleep(time.Second)
err := user.Send(&pb.GetUserRequest{
UserId: strconv.Itoa(i),
})
fmt.Println("send:", i)
if err != nil {
log.Fatalf("fail to send: %v", err)
}
}
resp, err := user.CloseAndRecv()
if err != nil {
log.Fatalf("fail to close: %v", err)
}
fmt.Println(resp)
}
调用 Send() 方法,不断向和后端发送数据,当发送完毕以后调用 CloseAndRecv()
方法 关闭请求拿到服务端返回的数据。
双向数据流模式(Bidirectional streaming RPC)
双向数据流模式,顾名思义是双向流,客户端以流式的方式发起请求,服务端同样以流式的方式响应请求
新建api/bidirectional/bidirectional.proto
文件
syntax = "proto3";
option go_package = "api/bidirectional;bidirectional";
package bidirectional;
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
string user_id = 1;
string name = 2;
string email = 3;
}
service BidirectionalService {
rpc GetUser(stream GetUserRequest) returns (stream GetUserResponse) {}
}
生成 Go 代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/bidirectional/bidirectional.proto
编写服务端代码 server/bidirectional.go
package main
import (
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/bidirectional"
"google.golang.org/grpc"
"log"
"net"
)
type BidirectionalService struct {
pb.UnimplementedBidirectionalServiceServer
}
func (b *BidirectionalService) GetUser(stream pb.BidirectionalService_GetUserServer) (err error) {
for {
req, err := stream.Recv()
if err != nil {
log.Printf("stream.Recv error: %v", err)
return err
}
log.Printf("stream.Recv req: %v", req)
err = stream.Send(&pb.GetUserResponse{
UserId: "123",
Name: "hedeqiang",
Email: "laravel_code@163.com",
})
if err != nil {
log.Printf("stream.Send error: %v", err)
return err
}
}
}
func main() {
fmt.Println("start server")
listen, err := net.Listen("tcp", ":8989")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterBidirectionalServiceServer(s, &BidirectionalService{})
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端 client/bidirectional.go
package main
import (
"context"
"fmt"
pb "github.com/hedeqiang/grpc-demo/api/bidirectional"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"strconv"
"time"
)
func main() {
fmt.Println("client start...")
conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewBidirectionalServiceClient(conn)
user, err := client.GetUser(context.Background())
if err != nil {
log.Fatalf("fail to get user: %v", err)
}
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
err := user.Send(&pb.GetUserRequest{UserId: strconv.Itoa(i)})
if err != nil {
log.Fatalf("fail to send: %v", err)
}
resp, err := user.Recv()
if err != nil {
log.Fatalf("fail to recv: %v", err)
}
fmt.Printf("%v\n", resp)
}
err = user.CloseSend()
if err != nil {
log.Fatalf("fail to close send: %v", err)
}
}
其实可以配合 go 、channel 进行处理 Recv
和 Send
以上代码上传到 https://github.com/hedeqiang/learn-grpc 。可能写得不对,还望大佬们纠正。
关于极客返利
极客返利 是由我个人开发的一款网课返利、返现平台。包含 极客时间返现、拉勾教育返现、掘金小册返现、GitChat返现。目前仅包含这几个平台。后续如果有需要可以考虑其他平台。 简而言之就是:你买课,我返现。让你花更少的钱,就可以买到课程。
版权许可
本作品采用 知识共享署名 4.0 国际许可协议 进行许可。转载无需与我联系,但须注明出处,注明文章来源 Go 语言gRPC 四种通信方式的简单使用