gRPC之gRPC流
1、gRPC流
从其名称可以理解,流就是持续不断的传输。有一些业务场景请求或者响应的数据量比较大,不适合使用普通的
RPC 调用通过一次请求-响应处理,一方面是考虑数据量大对请求响应时间的影响,另一方面业务场景的设计不一
定需要一次性处理完所有数据,这时就可以使用流来分批次传输数据。
HTTP/2中有两个概念,流(stream)与帧(frame),其中帧作为HTTP/2中通信的最小传输单位,通常一个请
求或响应会被分为一个或多个帧传输,流则表示已建立连接的虚拟通道,可以传输多次请求或响应。每个帧中包含
Stream Identifier,标志所属流。HTTP/2通过流与帧实现多路复用,对于相同域名的请求,通过Stream
Identifier标识可在同一个流中进行,从而减少连接开销。 而gRPC基于HTTP/2协议传输,自然而然也实现了流式
传输,其中gRPC中共有以下三种类型的流:
1、服务端流式响应
2、客户端流式请求
3、两端双向流式
本篇主要讲讲如何实现gRPC三种流式处理。
gRPC的stream只需要在service的rpc
方法描述中通过 stream
关键字指定启用流特性就好了。
1.1 单向流
单向流是指客户端和服务端只有一端开启流特性,这里的单向特指发送数据的方向。
-
当服务端开启流时,客户端和普通 RPC 调用一样通过一次请求发送数据,服务端通过流分批次响应。
-
当客户端开启流时,客户端通过流分批次发送请求数据,服务端接完所有数据后统一响应一次。
1.1.1 服务端流
定义一个 MultiPong
方法,在服务端开启流,功能是接收到客户端的请求后响应10次 pong 消息。
ping.proto
文件的编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping发送ping请求,接收pong响应// 服务端流模式,在响应消息前添加stream关键字rpc MultiPong(PingRequest) returns (stream PongResponse);
}// PingRequest请求结构
message PingRequest {// value字段为string类型string value = 1;
}// PongResponse 响应结构
message PongResponse {// value字段为string类型string value = 1;
}
ping.pb.go
文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go
的编写,第二个参数为 stream 对象的引用,可以通过它的 Send
方法发送数据。
package mainimport (// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPong 服务端流模式
func (s *PingPongServer) MultiPong(req *pb.PingRequest, stream pb.PingPong_MultiPongServer) error {for i := 0; i < 10; i++ {data := &pb.PongResponse{Value: "pong"}// 发送消息err := stream.Send(data)if err != nil {return err}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 20:51:04 listen on 7009
客户端实现,client.go
的编写,请求方式和普通 RPC 没有区别,重点关注对响应数据流的处理,通过一个 for
循环接收数据直到结束。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对 stream 对象的引用stream, err := client.MultiPong(context.Background(), &pb.PingRequest{Value: "ping"})if err != nil {log.Fatalln(err)}// 循环接收响应数据流for {msg, err := stream.Recv()if err != nil {// 数据结束if err == io.EOF {break}log.Fatalln(err)}log.Println(msg.Value)}
}
# 客户端运行
$ go run client.go
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
# 目录结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.1.2 客户端流
定义一个 MultiPing
方法,在客户端开启流,功能是持续发送多个 ping 请求,服务端统一响应一次。
ping.proto
文件的编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 客户端流模式,在请求消息前添加 stream 关键字rpc MultiPing(stream PingRequest) returns (PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}
ping.pb.go
文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go
的编写,只有一个参数为 stream 对象的引用,可以通过它的 Recv
方法接收数据。使
用 SendAndClose
方法关闭流并响应,服务端可以根据需要提前关闭。
package mainimport ("fmt"// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPing 客户端流模式
func (s *PingPongServer) MultiPing(stream pb.PingPong_MultiPingServer) error {msgs := []string{}for {// 提前结束接收消息if len(msgs) > 5 {return stream.SendAndClose(&pb.PongResponse{Value: "ping enough, max 5"})}msg, err := stream.Recv()if err != nil {// 客户端消息结束,返回响应信息if err == io.EOF {return stream.SendAndClose(&pb.PongResponse{Value: fmt.Sprintf("got %d ping", len(msgs))})}return err}msgs = append(msgs, msg.Value)}
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009
客户端实现,client.go
的编写,调用 MultiPing
方法时不再指定请求参数,而是通过返回的 stream 对象的
Send
分批发送数据。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对stream对象的引用// 调用并得到stream对象stream, err := client.MultiPing(context.Background())if err != nil {log.Fatalln(err)}// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatalln(err)}}// 发送结束并获取服务端响应res, err := stream.CloseAndRecv()if err != nil {log.Fatalln(err)}log.Println(res.Value)
}
# 启动客户端
# 发送3个ping
$ go run client.go
2023/02/10 21:32:31 got 3 ping
# 发送6个ping
$ go run client.go
2023/02/10 21:32:31 ping enough, max 5
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.2 双向流
双向流是指客户端在发送数据和服务端响应数据的过程中都启用流特性,实际上单向流只是双向流的特例,有了上
面的基础,双向流就很好理解了。
定义一个 MultiPingPong
方法,在客户端和服务端都开启流,功能是服务端每接收到两个 ping 就响应一次
pong。
ping.proto
编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 双向流模式rpc MultiPingPong(stream PingRequest) returns (stream PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}
ping.pb.go
文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go
的编写,同样通过 stream
的 Recv
和 Send
方法接收和发送数据。
package mainimport (pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}func (s *PingPongServer) MultiPingPong(stream pb.PingPong_MultiPingPongServer) error {msgs := []string{}for {// 接收消息msg, err := stream.Recv()if err != nil {if err == io.EOF {break}return err}msgs = append(msgs, msg.Value)// 每收到两个消息响应一次if len(msgs)%2 == 0 {err = stream.Send(&pb.PongResponse{Value: "pong"})if err != nil {return err}}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatal(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009
客户端实现,client.go
的编写,这里在另外一个 goroutine 里处理接收数据的逻辑来演示同时发送和接收数
据。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""time"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatal(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)stream, err := client.MultiPingPong(context.Background())if err != nil {log.Fatal(err)}// 在另一个goroutine中处理接收数据c := make(chan struct{})go func(stream pb.PingPong_MultiPingPongClient, c chan struct{}) {defer func() {c <- struct{}{}}()for {msg, err := stream.Recv()if err != nil {if err == io.EOF {break}log.Fatal(err)}log.Printf("recv:%s\n", msg.Value)}}(stream, c)// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatal(err)}log.Printf("send:%s\n", data.Value)// 延时一段时间发送,等待响应结果time.Sleep(500 * time.Millisecond)}// 结束发送stream.CloseSend()// 等待接收完成<-c
}
# 启动客户端
$ go run client.go
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 recv:pong
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 recv:pong
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 recv:pong
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
相关文章:
gRPC之gRPC流
1、gRPC流 从其名称可以理解,流就是持续不断的传输。有一些业务场景请求或者响应的数据量比较大,不适合使用普通的 RPC 调用通过一次请求-响应处理,一方面是考虑数据量大对请求响应时间的影响,另一方面业务场景的设计不一 定需…...
Kafka Shell命令交互
Kafka提供了一个命令行工具,用于管理和与Kafka集群交互。这个命令行工具通常称为Kafka Shell,它允许您执行各种操作,如创建主题、发送和消费消息、查看主题列表等。 以下是一些常用的Kafka Shell命令: 创建主题(Topic): kafka-topics.sh --create --topic my-topic --pa…...
什么是回归测试?
什么是回归测试? 回归测试被定义为一种软件测试类型,以确认最近的程序或代码更改未对现有功能产生不利影响。 回归测试只不过是全部或部分选择已执行的测试用例,然后重新执行以确保现有功能正常运行。 进行此测试是为了确保新代码更改不会…...
ZTMap是如何在相关政策引导下让建筑更加智慧化的?
近几年随着智慧楼宇概念的深入,尤其是在“十四五规划”“新基建”“数字经济”等相关战略和政策的引导下,智慧楼宇也迎来了快速发展期,对推动智慧城市系统的建设越来越重要。那么究竟什么是智慧楼宇呢?智慧楼宇其实就是整合楼宇内…...
Python:函数和代码复用
嗨喽,大家好呀~这里是爱看美女的茜茜呐 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了,直接在文末名片自取就可 1、关于递归函…...
three.js——模型对象的使用材质和方法
模型对象的使用材质和方法 前言效果图1、旋转、缩放、平移,居中的使用1.1 旋转rotation(.rotateX()、.rotateY()、.rotateZ())1.2缩放.scale()1.3平移.translate()1.4居中.center() 2、材质属性.wireframe 前言 BufferGeometry通过.scale()、…...
sql explain
目录 1. sql explain每个字段对应的含义1.1. id1.2. select_type1.3. table1.4. partitions1.5. type1.6. possible_keys1.7. key1.8. key_len1.9. ref1.10. rows1.11. Extra 索引实践联合索引最左列原则全值匹配不建议在索引列上做任何操作, 否则索引会失效转而全表扫描尽量使…...
【LeetCode-简单题】剑指 Offer 05. 替换空格
文章目录 题目方法一:常规做法:方法二:双指针做法 题目 方法一:常规做法: class Solution {public String replaceSpace(String s) {int len s.length() ;StringBuffer str new StringBuffer();for(int i 0 ; i &l…...
数字虚拟人制作简明指南
如何在线创建虚拟人? 虚拟人,也称为数字化身、虚拟助理或虚拟代理,是一种可以通过各种在线平台与用户进行逼真交互的人工智能人。 在线创建虚拟人变得越来越流行,因为它为个人和企业带来了许多好处。 推荐:用 NSDT编辑…...
Nginx 文件解析漏洞复现
一、漏洞说明 Nginx文件解析漏洞算是一个比较经典的漏洞,接下来我们就通过如下步骤进行漏洞复现,以及进行漏洞的修复。 版本条件:IIS 7.0/IIS 7.5/ Nginx <8.03 二、搭建环境 cd /vulhub/nginx/nginx_parsing_vulnerability docker-compos…...
Lombok依赖
一.介绍 Project Lombok 是一个 Java 库,它会自动插入编辑器和构建工具,为您的 Java 增添趣味。永远不要再写另一个 getter 或 equals 方法,使用一个注释,您的类有一个功能齐全的构建器,自动化您的日志记录变量等等。…...
XML 和 JSON 学习笔记(基础)
XML Why XML 的出现背景:在实际开发中,不同语言(如Java、JavaScript等)的应用程序之间数据传递的格式不同,导致它们进行数据交换时很困难,XML就应运而生了!(XML 是一种通用的数据交…...
L1-005 考试座位号分数 15
每个 PAT 考生在参加考试时都会被分配两个座位号,一个是试机座位,一个是考试座位。正常情况下,考生在入场时先得到试机座位号码,入座进入试机状态后,系统会显示该考生的考试座位号码,考试时考生需要换到考试…...
无涯教程-JavaScript - CEILING.MATH函数
描述 CEILING.MATH函数将数字四舍五入到最接近的整数或最接近的有效倍数。 Excel CEILING.MATH函数是Excel中的十五个舍入函数之一。 语法 CEILING.MATH (number, [significance], [mode])争论 Argument描述Required/OptionalNumberNumber must be less than 9.99E307 and …...
ChatGPT提示词(prompt)资源汇总
文章目录 awesome-chatgpt-promptsLearn PromptingSnack PromptFlow GPTPrompt VineChatGPT 指令大全AI Toolbox HubAI Short ChatGPT是一种强大的生成式AI模型,而提示词(prompt)则是与ChatGPT一起使用的指导性文本,用于引导模型生…...
MySQL 几种导数据的方法与遇到的问题
零、说在前面 MySQL导数据通常使用第三方工具和MySQL自身的工具,本文分别就这两类方法分别介绍。 一、第三方工具之 Navicat 1.1、Navicat的“数据传输”工具 打开Navicat,点击“工具”标签,找到“数据传输”,即可看到操作界面。…...
(21)多线程实例应用:双色球(6红+1蓝)
一、需求 1.双色球: 投注号码由6个红色球号码和1个蓝色球号码组成。 2.红色球号码从01--33中选择,红色球不能重复。 3.蓝色球号码从01--16中选择。 4.最终结果7个号码:61;即33选6(红) 16选1(蓝) 5.产品: …...
升级OpenSSL并进行编译安装
Packaging (OpenSSL)组件存在安全漏洞的原因是由于当前爆出的Openssl漏洞。 这个漏洞可能会导致泄露隐私信息,并且涉及的机器和环境也有所不同,因此修复方案也会有所不同。 目前,一些服务器使用的Nginx是静态编译OpenSSL,直接将Op…...
Spring整合RabbitMQ
一、步骤 生产者 ① 创建生产者工程 ② 添加依赖 ③ 配置整合 ④ 编写代码发送消息 消费者 ① 创建消费者工程 ② 添加依赖 ③ 配置整合 ④ 编写消息监听器 二、代码 生产者工程 1.在生产者工程和消费者工程中都导入如下依赖 <dependencies><dependency&g…...
MySQL——事务和视图
2023.9.17 本章开始介绍TCL语言(Transaction Control Language 事务控制语言)。 事务 事务的概念:一个或一组sql语句组成一个执行单元,这个执行单元要么全部执行,要么全部不执行。 事务的特性:ÿ…...
做好制造项目管理的5个技巧
制造过程通常由不同的要素组成,如采购材料、与供应商合作、优化生产线效率等。制造商还需要处理库存、物流和分销。 为了确保制造项目在预算范围内按时完成,并且不遗漏任何环节,企业必须建立项目管理流程,以帮助改善组织流程和效…...
JavaScript中While循环
JavaScript中处理For循环,还有一种循环while循环; ● 例如我们之前写了一个模拟举重次数的For循环,如下所示 for (let rep 1; rep < 10; rep) {console.log(举重${rep}次); }● 我们也可以使用while循环去实现这种功能 let rep 1; whi…...
python经典百题之乒乓球比赛
题目: 两个乒乓球队进行比赛,各出三人。甲队为a,b,c三人,乙队为x,y,z三人。已抽签决定比赛名单。有人向队员打听比赛的名单。a说他不和x比,c说他不和x,z比,请编程序找出三队赛手的名单。第一种方式: 思路…...
【C++ Exceptions】Catch exceptions by reference!
catch exceptions 写一个catch子句时必须指明异常对象是如何传递到这个子句来的,三种方式: by pointerby valueby reference 接下来比较它们使用时会出现的问题,以说明最好的选择是by reference。 catch by pointer 无需复制对象&#x…...
高斯公式证明
高斯公式: 若空间闭区域 Ω \Omega Ω 由光滑的闭曲面 Σ \Sigma Σ 围成,则 ∫ ∫ ∫ Ω ( ∂ P ∂ x ∂ Q ∂ y ∂ R ∂ z ) d v ∮ ∮ Σ P d y d z Q d z d x R d x d y \int \int \int _{\Omega}(\frac{\partial P}{\partial x} \frac{\p…...
速卖通获得aliexpress商品详情 API 返回值说明
item_get-获得aliexpress商品详情 aliexpress.item_get 进入测试 公共参数 名称类型必须描述keyString是调用key(必须以GET方式拼接在URL中)secretString是调用密钥api_nameString是API接口名称(包括在请求地址中)[item_search…...
c++语法-模板
模板 模板是C中一种强大的特性,允许你编写通用的代码,以便在不同数据类型上重复使用。模板分为函数模板和类模板,它们都是在编译时生成具体代码的蓝图。 函数模板 函数模板是一种定义通用函数的方式,可以在不同数据类型上使用相…...
DMNet复现(一)之数据准备篇:Density map guided object detection in aerial image
一、生成密度图 密度图标签生成 采用以下代码,生成训练集密度图gt: import cv2 import glob import h5py import scipy import pickle import numpy as np from PIL import Image from itertools import islice from tqdm import tqdm from matplotli…...
k8s相关命令-命名空间
k8s相关命令目录 文章目录 前言一、创建命名空间二、删除命名空间三、查看命名空间列表四、查看命名空间列表五、查看特定命名空间下所有资源六、删除特定命名空间下所有资源 前言 记录k8s命名空间的相关操作命令 一、创建命名空间 kubectl create namespace <namespace&g…...
CG Magic分享同一场景里下,VR渲染器和CR渲染器哪个好?
渲染操作时,VR渲染器和CR渲染器的对比成为常见问题了。这个问题很多人都会问。 今天CG Magic小编通过一个真实的项目,就是同一场景下来比较一下VR渲染器和CR渲染器的区别。 以下图为例是用来测试的场景当年的最终图。采用了当年的一个伊丽莎白大街152号的…...
上饶市建设局培训网站/恶意点击软件哪个好
我是做前端的,接触html也就一年,一年的时间,其实也就是个小白,恰好巧了,我这个小白,一年的时间,半年时间用来接触html、css、html5、css3等,当然也包括js与jquery的学习,…...
网站托管是什么/医院营销策略的具体方法
再游洛带有感——代腾飞 2007年9月15日 于成都今游古镇到洛带恍然忽至回唐朝身穿古装成侠客浪迹江湖甚逍遥...
沈阳公司建站/武汉大学人民医院怎么样
一直困惑于vlan的link问题,access link还稍微可以理解,而trunk link和hybrid link 怎么让untagged frame通过,我却一直很迷惑,终于在老师的指点下,知道了部分原理(首先得谢谢老师)废话少说&…...
2023年免费进入b站/精准客户数据采集软件
学习web编程的方法:1、学习html和css;2、学习javascript;3、了解web服务器;4、学习一门服务器端脚本语言;5、学习数据库及SQL语法;6、学习web框架。如何学习web开发,需要掌握哪些方面࿱…...
我做网站了 圆通/关键字挖掘爱站网
日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 1.nn.BatchNorm1d(num_features)1.对小批量(mini-batch)的2d或3d输入进行批标准化(Batch Normalization)操作2.num_features:来自期望输…...
怎么卖wordpress主题/网站优化是做什么的
计算机技术与高中英语课程整合的探索…………………………………孙晓瑜灵活运用多媒体技术、增强英语课堂学习效果…………………………段秀珍充分利用多媒体优势,优化英语阅读课教学……………………………孙晓瑜多媒体在英语教学中的运用…………………………………...