当前位置: 首页 > news >正文

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}

服务端写法

@Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}", "0");RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {MQListener mqListener=new MQListener(responseObserver);try {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}","0");RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserver<StringValue> responseObserver){_responseObserver=responseObserver;}private StreamObserver<StringValue> _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder = StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}

客户端写法

public static void receiveFacebookAndroidMsg() {try {log.info("facebook android msg");// 接收消息StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue msgProto) {try {log.info("facebook android msg 接收到消息: {}", msgProto.getValue());JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error("facebook ios msg 消费失败{}", e.getMessage());// 发给mq重新消费...}}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());log.info("facebook android Error occurred: {}", throwable.getMessage());}@Overridepublic void onCompleted() {System.out.println("Stream completed.");log.info("facebook android Stream completed.");}};log.info("接收fb android msg 开始");ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info("接收fb android msg 成功");} catch (Exception e) {log.info("出错了");}}

源码下载

相关文章:

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息) gRpc协议类定义 service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream g…...

20241001国庆学习

n60f/p 这个n是指旋转磁场的速度。 极数表示旋转转子的永磁体极数&#xff0c;具有一对N极&#xff0f;S极的电机称为双极电机。 极数可以是2、4、6、8等。 &#xff08;从电机控制的角度来看&#xff0c;当极数增加一倍时&#xff0c;转速将减半&#xff0c;当极数增加四倍时…...

基于SSM的农产品仓库管理系统【附源码】

基于SSM的农产品仓库管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概要设计 4.2 系统功能结构设计 4.3 数据库设计 4.3.1 数据库E-R图设计 4.3.2 数据库表结构设计 5 系统实现 5.1 管理员功能介绍 5.1.1 用户管…...

fmt:C++ 格式化库

fmt 是一个现代化、快速且安全的 C 格式化库&#xff0c;专注于高效地格式化文本。它提供了类似 Python 的 format 功能&#xff0c;但具有更高的性能和类型安全特性。fmt 库在处理字符串格式化、日志输出以及构建用户友好的输出时尤为强大。自从 C20 标准引入 std::format 后&…...

RabbitMQ MQ的可靠性及消费者的可靠性

1.MQ可靠性&#xff1a; 如何保证消息的可靠性&#xff1a; (1).通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘&#xff0c;MQ重起消息依然存在。 (2).3.6.0版本开始&#xff0c;RabbitMQ引入了惰性队列模式&#xff0c;这种模式下&am…...

使用 Nexus 代理 Docker Hub 的配置指南

在本篇文章中&#xff0c;我们将详细介绍如何配置 Nexus 以代理 Docker Hub&#xff0c;从而实现更高效的镜像管理。以下步骤涵盖了从 Nexus 的安装到 Docker 客户端的配置。 1. 配置 Nexus 1.1 登录 Nexus 打开浏览器&#xff0c;访问 Nexus 的 URL&#xff08;例如 http:/…...

笔记整理—linux进程部分(4)进程状态与守护进程

进程的几种重要状态&#xff0c;就绪态&#xff1b;运行态&#xff1b;僵尸态&#xff1b;等待态&#xff08;浅度睡眠、深度睡眠&#xff09;&#xff1b;停止态。 就单核CPU而言&#xff0c;在同一时间只能运行一个进程&#xff0c;但实际上要运行的进程不止一个&#xff0c;…...

# VirtualBox中安装的CentOS 6.5网络设置为NAT模式时,怎么使用SecureCRT连接CentOS6.5系统?

VirtualBox中安装的CentOS 6.5网络设置为NAT模式时&#xff0c;怎么使用SecureCRT连接CentOS6.5系统&#xff1f; 一、查询 【VirtualBox Host-Only Network】虚拟网卡的网络配置 IP。 1、按键盘上WIN R 组合键&#xff0c;打开【运行】&#xff0c;输入【 ncpa.cpl 】&…...

7-1.Android SQLite 之 SQLiteDatabase 简单编码模板(SQLiteDatabase 使用、SQL 语句编写)

一、SQLiteDatabase SQLite 是一种轻量级的数据库引擎&#xff0c;它非常适合在移动设备&#xff08;例如&#xff0c;Android&#xff09;上使用 SQLiteDatabase 允许应用程序与 SQLite 数据库进行交互&#xff0c;它提供了增删改查等一系列方法 二、SQLiteDatabase 简单编码…...

灰度图像重心(质心)求取算法

1、图像的重心坐标计算 假设我们有一个二维图像,其中 (x, y) 表示图像中每个像素的坐标。I(x, y) 表示图像在 (x, y) 处的亮度(或像素值),通常是灰度值。 图像的重心坐标 (X, Y) 可以通过以下公式计算: X = Σ [x * I(x, y)] / Σ I(x, y) Y = Σ [y * I(x, y)] / Σ I(…...

k8s 1.28.2 集群部署 ingress 1.11.1 包含 admission-webhook

文章目录 [toc]证书创建部署 ingress-controlleringress 验证创建测试 nginx pod创建错误的 ingress 配置创建正确的 ingress 配置 ingress 官方 yaml 文件&#xff1a;deploy.yaml基于官方 yaml 文件做了一些修改 官方的 svc 是 ClusterIP 和 LoadBalancer&#xff0c;我这边把…...

pom web 自动化测试框架分享

这是初版的 pom web 测试框架&#xff0c;目录如下同时部分代码也放在下面&#xff0c;详细代码可前往 github 查看&#xff0c;欢迎大家给出宝贵意见。 |--base | base_page.py&#xff08;封装方法&#xff09; | |--config | allure_config.py&#xff08;测试报告配…...

一些以前使用的linux及shell命令,gnuplot脚本

tar tar -cvzf xxx.tar.gz * -c&#xff0c;--create 创建新的tar文件 -v&#xff0c;--verbose 列出每一步处理涉及的文件的信息&#xff0c;只用一个“v”时&#xff0c;仅列出文件名 使用两个“v”时&#xff0c;列出权限、所有者、大小、时间、文件名等信息 -z&#xff0c…...

Django一分钟:DRF模型序列化器处理关联关系的示例与注意事项

DRF的ModelSerializer序列化器与Django的Model模型紧密映射&#xff0c;本文将通过简单的示例介绍几种处理关联关系的方法。 1. 创建模型和初始数据 创建模型 from django.db import modelsclass Product(models.Model):product_name models.CharField(max_length255)quant…...

Python爬虫selenium框架基本使用

一、安装导入 使用包管理器安装 pip3 install selenium 二、WebDriver工具 要使用这个工具我们需要保证安装了一个浏览器的驱动器。 Python的WebDriver是一个用于自动化Web浏览器操作的工具&#xff0c;它属于Selenium的一部分&#xff0c;特别是Selenium 2.0及以后版本中…...

sql 时间交集

任务&#xff08;取时间交集&#xff09; 前端输入开始时间和结束时间&#xff0c;通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法&#xff1a; 前后一段时间内遇到了类似取交集的&#xff0c;从网上找到了两种写法&#xff0c;再结合GPT等…...

【深度学习】05-Rnn循环神经网络-01- 自然语言处理概述/词嵌入层/循环网络/文本生成案例精讲

循环神经网络&#xff08;RNN&#xff09;主要用于自然语言处理的。 循环神经网络&#xff08;RNN&#xff09;、卷积神经网络&#xff08;CNN&#xff09;和全连接神经网络&#xff08;FCN&#xff09;是三种常见的神经网络类型&#xff0c;各自擅长处理不同类型的数据。下面…...

基于JAVA+SpringBoot+Vue的电商平台的设计与实现

基于JAVASpringBootVue的电商平台的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345…...

CSS盒模型-怪异盒模型笔记-思维导图-案例等

文章目录 一、盒模型&#xff08;重点&#xff09;二、怪异盒模型三、块级元素和行内元素区别汇总四、块级元素和行内元素的转换(显示方式)||元素的显示和隐藏五、思维导图六、笔记资料 一、盒模型&#xff08;重点&#xff09; 所有HTML元素可以看作盒子。 CSS盒模型本质上是…...

thinkphp6开发的通用网站系统源码

thinkphp6开发的通用网站系统源码。 基于ThinkPHP6框架开发的通用后台权限管理系统&#xff0c;底层采用国内最流行的ThinkPHP6框架&#xff0c; 支持内容管理、文章管理、用户管理、权限管理、角色管理等功能。 代码下载百度网盘...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...

消息队列系统设计与实践全解析

文章目录 &#x1f680; 消息队列系统设计与实践全解析&#x1f50d; 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡&#x1f4a1; 权衡决策框架 1.3 运维复杂度评估&#x1f527; 运维成本降低策略 &#x1f3d7;️ 二、典型架构设计2.1 分布式事务最终一致…...

CppCon 2015 学习:Time Programming Fundamentals

Civil Time 公历时间 特点&#xff1a; 共 6 个字段&#xff1a; Year&#xff08;年&#xff09;Month&#xff08;月&#xff09;Day&#xff08;日&#xff09;Hour&#xff08;小时&#xff09;Minute&#xff08;分钟&#xff09;Second&#xff08;秒&#xff09; 表示…...

Cursor AI 账号纯净度维护与高效注册指南

Cursor AI 账号纯净度维护与高效注册指南&#xff1a;解决限制问题的实战方案 风车无限免费邮箱系统网页端使用说明|快速获取邮箱|cursor|windsurf|augment 问题背景 在成功解决 Cursor 环境配置问题后&#xff0c;许多开发者仍面临账号纯净度不足导致的限制问题。无论使用 16…...