grpc
原创大约 4 分钟
四种流模式
名称 | 说明 |
---|---|
简单模式(Simple RPC) | 客户端发起一次请求,服务端响应一条数据,典型例子:电商平台组件间的调用 |
服务端流模式(Server-side streaming RPC) | 客户端发起一次请求,服务端响应一段连续的数据流,典型例子:实时股票数据查询 |
客户端流模式(Client-side streaming RPC) | 客户端源源不断地向服务端发送数据,而后由服务端返回一个响应,典型例子:物联网终端数据发送 |
双向流模式(Bidirectional streaming RPC) | 将客户端流模式和服务端流模式结合起来,典型例子:聊天机器人 |
注意
在每一个用protobuf生成的*_grpc.pb.go
源码文件中,可能都会有这么一行。
......
type GreeterServer interface {
......
// 下面这一行记得手动注释掉(如果没有则可以略过)
// mustEmbedUnimplementedXXXXXX()
}
......
简单模式
hello.proto
文件。
syntax = "proto3";
// 指定在go语言中的包名(必须有,否则无法生成go文件)
option go_package=".;hello";
// grpc通过protobuf定义通信接口
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 消息序列化
message HelloRequest {
// 1 是编号,而不是值
string name = 1;
int32 age = 2;
}
// 消息序列化
message HelloReply {
string reply = 1;
}
执行下面的命令生成源码。
> cd ~/workspace-go/go-project/grpc/proto
> protoc --go_out=. --go-grpc_out=. hello.proto
服务端源码。
package main
import (
"context"
"go-project/grpc/proto"
"google.golang.org/grpc"
"net"
)
type Server struct{}
func (s *Server) SayHello(ctx context.Context, in *hello.HelloRequest) (*hello.HelloReply, error) {
return &hello.HelloReply{
Reply: "hello " + in.Name,
}, nil
}
func main() {
// 1. 实例化服务
g := grpc.NewServer()
// 2. 注册服务
hello.RegisterGreeterServer(g, &Server{})
// 3. 启动服务
var listener, _ = net.Listen("tcp", ":1234")
_ = g.Serve(listener)
}
客户端源码。
package main
import (
"context"
"fmt"
"go-project/grpc/proto"
"google.golang.org/grpc"
)
func main() {
// 建立连接
conn, _ := grpc.Dial("localhost:1234", grpc.WithInsecure())
defer conn.Close()
client := hello.NewGreeterClient(conn)
r, _ := client.SayHello(context.Background(), &hello.HelloRequest{
Name: "xiangwang",
Age: 18,
})
fmt.Println(r.Reply)
}
定义流模式
hello.proto
文件。
syntax = "proto3";
option go_package="./;stream";
// grpc通过protobuf定义通信接口
service Greeter {
// 客户端流模式
rpc PutStream (stream StreamRequestData) returns (StreamResponseData) {}
// 服务端流模式
rpc GetStream (StreamRequestData) returns (stream StreamResponseData) {}
// 双向流模式
rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData) {}
}
message StreamRequestData {
string data = 1;
}
message StreamResponseData {
string data = 1;
}
执行下面的命令生成源码。
> cd ~/workspace-go/go-project/grpc_stream/proto
> protoc --go_out=. --go-grpc_out=. hello.proto
服务端源码
package main
import (
"fmt"
"go-project/grpc_stream/proto"
"google.golang.org/grpc"
"net"
"sync"
"time"
)
const PORT = ":1234"
var waitGroup = sync.WaitGroup{}
type Server struct{}
// PutStream 客户端流模式
func (s *Server) PutStream(response stream.Greeter_PutStreamServer) error {
for {
// 1. 接收客户端数据
if data, err := response.Recv(); err != nil {
fmt.Println("接收数据错误:", err)
break
} else {
fmt.Println("接收数据:", data.Data)
}
// 2. 发送数据给客户端
if err := response.SendAndClose(&stream.StreamResponseData{Data: "hello from server, " + fmt.Sprintf("%v", time.Now().Unix())}); err != nil {
fmt.Println("发送数据错误", err)
}
}
return nil
}
// GetStream 服务端流模式
func (s *Server) GetStream(request *stream.StreamRequestData, response stream.Greeter_GetStreamServer) error {
i := 0
for {
i++
if err := response.Send(&stream.StreamResponseData{Data: "hello from server, " + fmt.Sprintf("%v", time.Now().Unix())}); err != nil {
fmt.Println("服务端发送数据错误", err)
break
} else {
time.Sleep(time.Second)
if i > 5 {
break
}
}
}
return nil
}
// AllStream 双向流模式
func (s *Server) AllStream(response stream.Greeter_AllStreamServer) error {
// 这里需要让服务端的发送和接收并行处理
waitGroup.Add(2)
// 1. 接收客户端数据
go func() {
defer waitGroup.Done()
for {
if data, err := response.Recv(); err != nil {
fmt.Println("接收客户端数据错误:", err)
break
} else {
fmt.Println("接收客户端数据:", data.Data)
}
}
}()
// 2. 发送数据给客户端
go func() {
defer waitGroup.Done()
for {
if err := response.Send(&stream.StreamResponseData{Data: "hello from server, " + fmt.Sprintf("%v", time.Now().Unix())}); err != nil {
fmt.Println("服务端发送数据错误", err)
break
} else {
time.Sleep(time.Second)
}
}
}()
waitGroup.Wait()
return nil
}
func main() {
// 1. 实例化服务
var listener, _ = net.Listen("tcp", PORT)
g := grpc.NewServer()
// 2. 注册服务
stream.RegisterGreeterServer(g, &Server{})
// 3. 启动服务
_ = g.Serve(listener)
}
客户端源码
package main
import (
"context"
"fmt"
"go-project/grpc_stream/proto"
"google.golang.org/grpc"
"sync"
"time"
)
var waitGroup = sync.WaitGroup{}
func main() {
// 建立连接
conn, _ := grpc.Dial("localhost:1234", grpc.WithInsecure())
defer conn.Close()
client := stream.NewGreeterClient(conn)
waitGroup.Add(4)
// 服务端流模式
go func() {
defer waitGroup.Done()
result1, _ := client.GetStream(context.Background(), &stream.StreamRequestData{
Data: "are you xiangwang?",
})
for {
res, err := result1.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(res.Data)
}
}()
// 客户端流模式
result2, _ := client.PutStream(context.Background())
go func() {
defer waitGroup.Done()
i := 0
for {
i++
if err := result2.Send(&stream.StreamRequestData{Data: fmt.Sprintf("hello, i am %d", i)}); err != nil {
fmt.Println("客户端发送数据错误", err)
break
}
time.Sleep(time.Second)
if i > 10 {
break
}
}
}()
// 双向流模式
result3, _ := client.AllStream(context.Background())
// 1. 接收服务端数据
go func() {
defer waitGroup.Done()
for {
if data, err := result3.Recv(); err != nil {
fmt.Println("接收服务端数据错误:", err)
break
} else {
fmt.Println("接收服务端数据:", data.Data)
}
}
}()
// 2. 发送数据给服务端
go func() {
defer waitGroup.Done()
for {
if err := result3.Send(&stream.StreamRequestData{Data: "hello from client, " + fmt.Sprintf("%v", time.Now().Unix())}); err != nil {
fmt.Println("客户端发送数据错误", err)
break
} else {
time.Sleep(time.Second)
}
}
}()
waitGroup.Wait()
}
感谢支持
更多内容,请移步《超级个体》。