gRPC之gRPC流
1、gRPC流
从其名称可以理解,流就是持续不断的传输。有一些业务场景请求或者响应的数据量比较大,不适合使用普通的
RPC 调用通过一次请求-响应处理,一方面是考虑数据量大对请求响应时间的影响,另一方面业务场景的设计不一
定需要一次性处理完所有数据,这时就可以使用流来分批次传输数据。
HTTP/2中有两个概念,流(stream)与帧(frame),其中帧作为HTTP/2中通信的最小传输单位,通常一个请
求或响应会被分为一个或多个帧传输,流则表示已建立连接的虚拟通道,可以传输多次请求或响应。每个帧中包含
Stream Identifier,标志所属流。HTTP/2通过流与帧实现多路复用,对于相同域名的请求,通过Stream
Identifier标识可在同一个流中进行,从而减少连接开销。 而gRPC基于HTTP/2协议传输,自然而然也实现了流式
传输,其中gRPC中共有以下三种类型的流:
1、服务端流式响应
2、客户端流式请求
3、两端双向流式
本篇主要讲讲如何实现gRPC三种流式处理。
gRPC的stream只需要在service的rpc方法描述中通过 stream 关键字指定启用流特性就好了。
1.1 单向流
单向流是指客户端和服务端只有一端开启流特性,这里的单向特指发送数据的方向。
-
当服务端开启流时,客户端和普通 RPC 调用一样通过一次请求发送数据,服务端通过流分批次响应。
-
当客户端开启流时,客户端通过流分批次发送请求数据,服务端接完所有数据后统一响应一次。
1.1.1 服务端流
定义一个 MultiPong 方法,在服务端开启流,功能是接收到客户端的请求后响应10次 pong 消息。
ping.proto文件的编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping发送ping请求,接收pong响应// 服务端流模式,在响应消息前添加stream关键字rpc MultiPong(PingRequest) returns (stream PongResponse);
}// PingRequest请求结构
message PingRequest {// value字段为string类型string value = 1;
}// PongResponse 响应结构
message PongResponse {// value字段为string类型string value = 1;
}
ping.pb.go文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go的编写,第二个参数为 stream 对象的引用,可以通过它的 Send 方法发送数据。
package mainimport (// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPong 服务端流模式
func (s *PingPongServer) MultiPong(req *pb.PingRequest, stream pb.PingPong_MultiPongServer) error {for i := 0; i < 10; i++ {data := &pb.PongResponse{Value: "pong"}// 发送消息err := stream.Send(data)if err != nil {return err}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 20:51:04 listen on 7009
客户端实现,client.go的编写,请求方式和普通 RPC 没有区别,重点关注对响应数据流的处理,通过一个 for
循环接收数据直到结束。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对 stream 对象的引用stream, err := client.MultiPong(context.Background(), &pb.PingRequest{Value: "ping"})if err != nil {log.Fatalln(err)}// 循环接收响应数据流for {msg, err := stream.Recv()if err != nil {// 数据结束if err == io.EOF {break}log.Fatalln(err)}log.Println(msg.Value)}
}
# 客户端运行
$ go run client.go
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
2023/02/10 20:54:34 pong
# 目录结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.1.2 客户端流
定义一个 MultiPing 方法,在客户端开启流,功能是持续发送多个 ping 请求,服务端统一响应一次。
ping.proto文件的编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";
// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 客户端流模式,在请求消息前添加 stream 关键字rpc MultiPing(stream PingRequest) returns (PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}
ping.pb.go文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go的编写,只有一个参数为 stream 对象的引用,可以通过它的 Recv 方法接收数据。使
用 SendAndClose 方法关闭流并响应,服务端可以根据需要提前关闭。
package mainimport ("fmt"// 引入编译生成的包pb "demo/protos/ping""google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}// MultiPing 客户端流模式
func (s *PingPongServer) MultiPing(stream pb.PingPong_MultiPingServer) error {msgs := []string{}for {// 提前结束接收消息if len(msgs) > 5 {return stream.SendAndClose(&pb.PongResponse{Value: "ping enough, max 5"})}msg, err := stream.Recv()if err != nil {// 客户端消息结束,返回响应信息if err == io.EOF {return stream.SendAndClose(&pb.PongResponse{Value: fmt.Sprintf("got %d ping", len(msgs))})}return err}msgs = append(msgs, msg.Value)}
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatalln(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009
客户端实现,client.go的编写,调用 MultiPing 方法时不再指定请求参数,而是通过返回的 stream 对象的
Send 分批发送数据。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""log"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatalln(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)// 获得对stream对象的引用// 调用并得到stream对象stream, err := client.MultiPing(context.Background())if err != nil {log.Fatalln(err)}// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatalln(err)}}// 发送结束并获取服务端响应res, err := stream.CloseAndRecv()if err != nil {log.Fatalln(err)}log.Println(res.Value)
}
# 启动客户端
# 发送3个ping
$ go run client.go
2023/02/10 21:32:31 got 3 ping
# 发送6个ping
$ go run client.go
2023/02/10 21:32:31 ping enough, max 5
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
1.2 双向流
双向流是指客户端在发送数据和服务端响应数据的过程中都启用流特性,实际上单向流只是双向流的特例,有了上
面的基础,双向流就很好理解了。
定义一个 MultiPingPong 方法,在客户端和服务端都开启流,功能是服务端每接收到两个 ping 就响应一次
pong。
ping.proto编写:
// ping.proto
// 指定proto版本
syntax = "proto3";
// 指定包名
package protos;
// 指定go包路径
option go_package = "protos/ping";// 定义PingPong服务
service PingPong {// Ping 发送 ping 请求,接收 pong 响应// 双向流模式rpc MultiPingPong(stream PingRequest) returns (stream PongResponse);
}// PingRequest 请求结构
message PingRequest {string value = 1; // value字段为string类型
}// PongResponse 响应结构
message PongResponse {string value = 1; // value字段为string类型
}
ping.pb.go文件的生成:
$ protoc --go_out=plugins=grpc:. ping.proto
服务端实现,server.go的编写,同样通过 stream 的 Recv 和 Send 方法接收和发送数据。
package mainimport (pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""net"
)// PingPongServer 实现 pb.PingPongServer 接口
type PingPongServer struct {pb.UnimplementedPingPongServer
}func (s *PingPongServer) MultiPingPong(stream pb.PingPong_MultiPingPongServer) error {msgs := []string{}for {// 接收消息msg, err := stream.Recv()if err != nil {if err == io.EOF {break}return err}msgs = append(msgs, msg.Value)// 每收到两个消息响应一次if len(msgs)%2 == 0 {err = stream.Send(&pb.PongResponse{Value: "pong"})if err != nil {return err}}}return nil
}// 启动server
func main() {srv := grpc.NewServer()// 注册 PingPongServerpb.RegisterPingPongServer(srv, &PingPongServer{})lis, err := net.Listen("tcp", ":7009")if err != nil {log.Fatal(err)}log.Println("listen on 7009")srv.Serve(lis)
}
# 启动server
$ go run server.go
2023/02/10 21:26:42 listen on 7009
客户端实现,client.go的编写,这里在另外一个 goroutine 里处理接收数据的逻辑来演示同时发送和接收数
据。
package mainimport ("context"pb "demo/protos/ping" // 引入编译生成的包"google.golang.org/grpc""io""log""time"
)// Ping 单次请求-响应模式
func main() {conn, err := grpc.Dial("localhost:7009", grpc.WithInsecure())if err != nil {log.Fatal(err)}defer conn.Close()// 实例化客户端并调用client := pb.NewPingPongClient(conn)stream, err := client.MultiPingPong(context.Background())if err != nil {log.Fatal(err)}// 在另一个goroutine中处理接收数据c := make(chan struct{})go func(stream pb.PingPong_MultiPingPongClient, c chan struct{}) {defer func() {c <- struct{}{}}()for {msg, err := stream.Recv()if err != nil {if err == io.EOF {break}log.Fatal(err)}log.Printf("recv:%s\n", msg.Value)}}(stream, c)// 发送数据for i := 0; i < 6; i++ {data := &pb.PingRequest{Value: "ping"}err = stream.Send(data)if err != nil {log.Fatal(err)}log.Printf("send:%s\n", data.Value)// 延时一段时间发送,等待响应结果time.Sleep(500 * time.Millisecond)}// 结束发送stream.CloseSend()// 等待接收完成<-c
}
# 启动客户端
$ go run client.go
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 send:ping
2023/02/10 21:48:26 recv:pong
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 send:ping
2023/02/10 21:48:27 recv:pong
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 send:ping
2023/02/10 21:48:28 recv:pong
# 项目结构
$ tree demo
demo
├── client.go
├── go.mod
├── go.sum
├── ping.proto
├── protos
│ └── ping
│ └── ping.pb.go
└── server.go2 directories, 6 files
相关文章:
gRPC之gRPC流
1、gRPC流 从其名称可以理解,流就是持续不断的传输。有一些业务场景请求或者响应的数据量比较大,不适合使用普通的 RPC 调用通过一次请求-响应处理,一方面是考虑数据量大对请求响应时间的影响,另一方面业务场景的设计不一 定需…...
Kafka Shell命令交互
Kafka提供了一个命令行工具,用于管理和与Kafka集群交互。这个命令行工具通常称为Kafka Shell,它允许您执行各种操作,如创建主题、发送和消费消息、查看主题列表等。 以下是一些常用的Kafka Shell命令: 创建主题(Topic): kafka-topics.sh --create --topic my-topic --pa…...
什么是回归测试?
什么是回归测试? 回归测试被定义为一种软件测试类型,以确认最近的程序或代码更改未对现有功能产生不利影响。 回归测试只不过是全部或部分选择已执行的测试用例,然后重新执行以确保现有功能正常运行。 进行此测试是为了确保新代码更改不会…...
ZTMap是如何在相关政策引导下让建筑更加智慧化的?
近几年随着智慧楼宇概念的深入,尤其是在“十四五规划”“新基建”“数字经济”等相关战略和政策的引导下,智慧楼宇也迎来了快速发展期,对推动智慧城市系统的建设越来越重要。那么究竟什么是智慧楼宇呢?智慧楼宇其实就是整合楼宇内…...
Python:函数和代码复用
嗨喽,大家好呀~这里是爱看美女的茜茜呐 👇 👇 👇 更多精彩机密、教程,尽在下方,赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了,直接在文末名片自取就可 1、关于递归函…...
three.js——模型对象的使用材质和方法
模型对象的使用材质和方法 前言效果图1、旋转、缩放、平移,居中的使用1.1 旋转rotation(.rotateX()、.rotateY()、.rotateZ())1.2缩放.scale()1.3平移.translate()1.4居中.center() 2、材质属性.wireframe 前言 BufferGeometry通过.scale()、…...
sql explain
目录 1. sql explain每个字段对应的含义1.1. id1.2. select_type1.3. table1.4. partitions1.5. type1.6. possible_keys1.7. key1.8. key_len1.9. ref1.10. rows1.11. Extra 索引实践联合索引最左列原则全值匹配不建议在索引列上做任何操作, 否则索引会失效转而全表扫描尽量使…...
【LeetCode-简单题】剑指 Offer 05. 替换空格
文章目录 题目方法一:常规做法:方法二:双指针做法 题目 方法一:常规做法: class Solution {public String replaceSpace(String s) {int len s.length() ;StringBuffer str new StringBuffer();for(int i 0 ; i &l…...
数字虚拟人制作简明指南
如何在线创建虚拟人? 虚拟人,也称为数字化身、虚拟助理或虚拟代理,是一种可以通过各种在线平台与用户进行逼真交互的人工智能人。 在线创建虚拟人变得越来越流行,因为它为个人和企业带来了许多好处。 推荐:用 NSDT编辑…...
Nginx 文件解析漏洞复现
一、漏洞说明 Nginx文件解析漏洞算是一个比较经典的漏洞,接下来我们就通过如下步骤进行漏洞复现,以及进行漏洞的修复。 版本条件:IIS 7.0/IIS 7.5/ Nginx <8.03 二、搭建环境 cd /vulhub/nginx/nginx_parsing_vulnerability docker-compos…...
Lombok依赖
一.介绍 Project Lombok 是一个 Java 库,它会自动插入编辑器和构建工具,为您的 Java 增添趣味。永远不要再写另一个 getter 或 equals 方法,使用一个注释,您的类有一个功能齐全的构建器,自动化您的日志记录变量等等。…...
XML 和 JSON 学习笔记(基础)
XML Why XML 的出现背景:在实际开发中,不同语言(如Java、JavaScript等)的应用程序之间数据传递的格式不同,导致它们进行数据交换时很困难,XML就应运而生了!(XML 是一种通用的数据交…...
L1-005 考试座位号分数 15
每个 PAT 考生在参加考试时都会被分配两个座位号,一个是试机座位,一个是考试座位。正常情况下,考生在入场时先得到试机座位号码,入座进入试机状态后,系统会显示该考生的考试座位号码,考试时考生需要换到考试…...
无涯教程-JavaScript - CEILING.MATH函数
描述 CEILING.MATH函数将数字四舍五入到最接近的整数或最接近的有效倍数。 Excel CEILING.MATH函数是Excel中的十五个舍入函数之一。 语法 CEILING.MATH (number, [significance], [mode])争论 Argument描述Required/OptionalNumberNumber must be less than 9.99E307 and …...
ChatGPT提示词(prompt)资源汇总
文章目录 awesome-chatgpt-promptsLearn PromptingSnack PromptFlow GPTPrompt VineChatGPT 指令大全AI Toolbox HubAI Short ChatGPT是一种强大的生成式AI模型,而提示词(prompt)则是与ChatGPT一起使用的指导性文本,用于引导模型生…...
MySQL 几种导数据的方法与遇到的问题
零、说在前面 MySQL导数据通常使用第三方工具和MySQL自身的工具,本文分别就这两类方法分别介绍。 一、第三方工具之 Navicat 1.1、Navicat的“数据传输”工具 打开Navicat,点击“工具”标签,找到“数据传输”,即可看到操作界面。…...
(21)多线程实例应用:双色球(6红+1蓝)
一、需求 1.双色球: 投注号码由6个红色球号码和1个蓝色球号码组成。 2.红色球号码从01--33中选择,红色球不能重复。 3.蓝色球号码从01--16中选择。 4.最终结果7个号码:61;即33选6(红) 16选1(蓝) 5.产品: …...
升级OpenSSL并进行编译安装
Packaging (OpenSSL)组件存在安全漏洞的原因是由于当前爆出的Openssl漏洞。 这个漏洞可能会导致泄露隐私信息,并且涉及的机器和环境也有所不同,因此修复方案也会有所不同。 目前,一些服务器使用的Nginx是静态编译OpenSSL,直接将Op…...
Spring整合RabbitMQ
一、步骤 生产者 ① 创建生产者工程 ② 添加依赖 ③ 配置整合 ④ 编写代码发送消息 消费者 ① 创建消费者工程 ② 添加依赖 ③ 配置整合 ④ 编写消息监听器 二、代码 生产者工程 1.在生产者工程和消费者工程中都导入如下依赖 <dependencies><dependency&g…...
MySQL——事务和视图
2023.9.17 本章开始介绍TCL语言(Transaction Control Language 事务控制语言)。 事务 事务的概念:一个或一组sql语句组成一个执行单元,这个执行单元要么全部执行,要么全部不执行。 事务的特性:ÿ…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
