redis stream restTemplate消息监听队列框架搭建
整体思路
1. pom增加redis依赖;
2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
3. 将消息订阅bean及监听器注册到配置中;
1. pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2. 消息监听器实现代码
package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}
3. redis订阅bean及监听器注册
package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}
4. 测试生产消息 消息监听成功
4.1 生产消息
@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果


相关文章:
redis stream restTemplate消息监听队列框架搭建
整体思路 1. pom增加redis依赖; 2. 消息监听器,实现StreamListener接口,处理消息到达逻辑; 3. 将消息订阅bean及监听器注册到配置中; 1. pom <?xml version"1.0" encoding"UTF-8"?> <…...
【期末不挂科-C++考前速过系列P1】大二C++第1次过程考核(3道简述题&7道代码题)【解析,注释】
前言 大家好吖,欢迎来到 YY 滴C复习系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! YY的《C》专栏YY的《C11》专栏YY的《Lin…...
游戏开发中,你的游戏图片压缩格式使用ASTC了吗
文章目录 ASTC原理:使用要求 ASTC(Adaptive Scalable Texture Compression,自适应可伸缩纹理压缩)是一种高级的纹理压缩技术,由ARM公司开发并推广。它在图形处理领域中因其出色的压缩效率和灵活性而受到广泛关注。 AST…...
【PostgreSQL】数据查询-概述
PostgreSQL数据查询 概述 检索或从数据库中检索数据的命令的过程称为查询。在 SQL 中,SELECT 命令用于指定查询。该命令的一般语法是SELECT [WITH with_queries] SELECT select_list FROM table_expression [sort_specification]一种简单的查询形式为:…...
element input组件自动失去焦点问题解决
最近在 Vue3 ElementPlus 中,使用 el-input 组件时,如果设置了 v-model,那么在每次改变内容后后,input 会自动失去焦点,这样会导致用户无法输入多个字符。 一、问题原因 如上图所示,配置项的 Name 和 Cod…...
鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解
经历的越多,越喜欢简单的生活,干净的东西,清楚的感觉,有结果的事,和说到做到的人。把圈子变小,把语放缓,把心放宽,用心做好手边的事儿,该有的总会有的! 目录 一ÿ…...
pytorch安装
pytoch安装 1. 准备工作1.1 需要提前安装的软件 2. 安装pyTorch我遇到的问题 3. 显卡测试4. CPU与GPU切换方法4.1 创建张量4.2 第一种切换方法4.3 第二种切换方法 1. 准备工作 1.1 需要提前安装的软件 Anaconda 史上最全最详细的Anaconda安装教程CUDA CUDA安装教程࿰…...
GBASE南大通用系统目录表
系统目录由描述数据库结构的表和视图组成。这些表对象有时称为数据字典,它们包含 数据库本身的所有信息。每个系统目录表都包含有关数据库中特定元素的信息。每个数据 库都有它自己的系统目录。 这些主题提供了有关系统目录表的结构、内容和使用的信息。还包含了有关…...
RPCMS跨站脚本漏洞(xss)
CNVD-ID: CNVD-2024-01190 漏洞描述: RPCMS是一个应用软件,一个网站CMS系统。 RPCMS v3.5.5版本存在跨站脚本漏洞,该漏洞源于组件/logs/dopost.html中对用户提供的数据缺乏有效过滤与转义,攻击者可利用该漏洞通过注入精心设计的有效载荷执行…...
Linux进阶命令使用
在 Linux 中,除了常用的基础命令,有一系列进阶命令可以帮助用户更有效地管理系统和执行复杂的任务。以下是一些常见的 Linux 进阶命令及其用法: 文本处理 grep:搜索文本并打印匹配的行。 grep pattern filenameawk:用…...
重定位,进程的创建,线程相关
重定位 进程的重定位指将程序加载到内存中不同的位置执行,在进程换出换入过程中将会发生。通过更新程序中使用的相对地址。 进程的创建——fork() 进程树,在自己的节点下创建进程节点。 使用fork,创建的子进程是父进…...
Java填充Execl模板并返回前端下载
功能:后端使用Java POI填充Execl模板,并返回前端下载 Execl模板如下: 1. Java后端 功能:填充模板EXECL,并返回前端 controller层 package org.huan.controller;import org.huan.dto.ExcelData; import org.huan.util.ExcelT…...
ChatGPT本地部署,学习记录
一、GPT4ALL模型 官网地址: Github:https://github.com/nomic-ai/gpt4all GPT4ALL项目部署简易,但是在运行体验上一般,并且是只调用CPU来进行运算。 看官方文档介绍在嵌入式上有比较大的优势,但是目前个人对嵌入式…...
Find My游戏手柄|苹果Find My技术与手柄结合,智能防丢,全球定位
游戏手柄是一种常见电子游戏机的部件,通过操纵其按钮等,实现对游戏虚拟角色的控制。随着游戏设备硬件的升级换代,现代游戏手柄又增加了:类比摇杆(方向及视角),扳机键以及HOME菜单键等。现在的游…...
2024美赛数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
【Deep Dive: AI Webinar】开放 ChatGPT - 人工智能开放性运作的案例研究
【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容,大致上分成了 3 个大类: 1. 人工智能的开放、风险与挑战(4 篇) 2. 人工智能的治理(总共 12 篇),其中分成了几个子类&…...
Devops相关问题及答案(2024)
1、DevOps 的理念是什么? DevOps是一种组织文化、流程和工具的集合,旨在提高软件交付的速度和质量,通过自动化和持续改进的方法来促进开发(Dev)和运维(Ops)的协作。 DevOps的核心理念包括&…...
掌握Python设计模式,SQL Alchemy打破ORM与模型类的束缚
大家好,反转软件组件之间的依赖关系之所以重要,是因为它有助于降低耦合度和提高模块化程度,进而可以提高软件的可维护性、可扩展性和可测试性。 当组件之间紧密耦合时,对一个组件的更改可能会对其他组件产生意想不到的影响&#…...
性能分析与调优: Linux 磁盘I/O 观测工具
目录 一、实验 1.环境 2.iostat 3.sar 4.pidstat 5.perf 6. biolatency 7. biosnoop 8.iotop、biotop 9.blktrace 10.bpftrace 11.smartctl 二、问题 1.如何查看PSI数据 2.iotop如何安装 3.smartctl如何使用 一、实验 1.环境 (1)主机 …...
Could not erase files or folders:
IDEA删除 git 的 localChanges 内的文件时,提示Could not erase files or folders:。 确认下这个文件是否被打开,忘记关闭了;关闭后可以被删除。(文件被打开的情况下,用操作系统自带的删除,也无法删除成功…...
Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
云原生周刊:k0s 成为 CNCF 沙箱项目
开源项目推荐 HAMi HAMi(原名 k8s‑vGPU‑scheduler)是一款 CNCF Sandbox 级别的开源 K8s 中间件,通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度,为容器提供统一接口,实现细粒度资源配额…...
02.运算符
目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&:逻辑与 ||:逻辑或 !:逻辑非 短路求值 位运算符 按位与&: 按位或 | 按位取反~ …...
6.9-QT模拟计算器
源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...
