redis stream restTemplate消息监听队列框架搭建
整体思路
1. pom增加redis依赖;
2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
3. 将消息订阅bean及监听器注册到配置中;
1. pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2. 消息监听器实现代码
package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}
3. redis订阅bean及监听器注册
package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}
4. 测试生产消息 消息监听成功
4.1 生产消息
@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果


相关文章:
redis stream restTemplate消息监听队列框架搭建
整体思路 1. pom增加redis依赖; 2. 消息监听器,实现StreamListener接口,处理消息到达逻辑; 3. 将消息订阅bean及监听器注册到配置中; 1. pom <?xml version"1.0" encoding"UTF-8"?> <…...
【期末不挂科-C++考前速过系列P1】大二C++第1次过程考核(3道简述题&7道代码题)【解析,注释】
前言 大家好吖,欢迎来到 YY 滴C复习系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! YY的《C》专栏YY的《C11》专栏YY的《Lin…...
游戏开发中,你的游戏图片压缩格式使用ASTC了吗
文章目录 ASTC原理:使用要求 ASTC(Adaptive Scalable Texture Compression,自适应可伸缩纹理压缩)是一种高级的纹理压缩技术,由ARM公司开发并推广。它在图形处理领域中因其出色的压缩效率和灵活性而受到广泛关注。 AST…...
【PostgreSQL】数据查询-概述
PostgreSQL数据查询 概述 检索或从数据库中检索数据的命令的过程称为查询。在 SQL 中,SELECT 命令用于指定查询。该命令的一般语法是SELECT [WITH with_queries] SELECT select_list FROM table_expression [sort_specification]一种简单的查询形式为:…...
element input组件自动失去焦点问题解决
最近在 Vue3 ElementPlus 中,使用 el-input 组件时,如果设置了 v-model,那么在每次改变内容后后,input 会自动失去焦点,这样会导致用户无法输入多个字符。 一、问题原因 如上图所示,配置项的 Name 和 Cod…...
鸿蒙Harmony--状态管理器-@Observed装饰器和@ObjectLink装饰器详解
经历的越多,越喜欢简单的生活,干净的东西,清楚的感觉,有结果的事,和说到做到的人。把圈子变小,把语放缓,把心放宽,用心做好手边的事儿,该有的总会有的! 目录 一ÿ…...
pytorch安装
pytoch安装 1. 准备工作1.1 需要提前安装的软件 2. 安装pyTorch我遇到的问题 3. 显卡测试4. CPU与GPU切换方法4.1 创建张量4.2 第一种切换方法4.3 第二种切换方法 1. 准备工作 1.1 需要提前安装的软件 Anaconda 史上最全最详细的Anaconda安装教程CUDA CUDA安装教程࿰…...
GBASE南大通用系统目录表
系统目录由描述数据库结构的表和视图组成。这些表对象有时称为数据字典,它们包含 数据库本身的所有信息。每个系统目录表都包含有关数据库中特定元素的信息。每个数据 库都有它自己的系统目录。 这些主题提供了有关系统目录表的结构、内容和使用的信息。还包含了有关…...
RPCMS跨站脚本漏洞(xss)
CNVD-ID: CNVD-2024-01190 漏洞描述: RPCMS是一个应用软件,一个网站CMS系统。 RPCMS v3.5.5版本存在跨站脚本漏洞,该漏洞源于组件/logs/dopost.html中对用户提供的数据缺乏有效过滤与转义,攻击者可利用该漏洞通过注入精心设计的有效载荷执行…...
Linux进阶命令使用
在 Linux 中,除了常用的基础命令,有一系列进阶命令可以帮助用户更有效地管理系统和执行复杂的任务。以下是一些常见的 Linux 进阶命令及其用法: 文本处理 grep:搜索文本并打印匹配的行。 grep pattern filenameawk:用…...
重定位,进程的创建,线程相关
重定位 进程的重定位指将程序加载到内存中不同的位置执行,在进程换出换入过程中将会发生。通过更新程序中使用的相对地址。 进程的创建——fork() 进程树,在自己的节点下创建进程节点。 使用fork,创建的子进程是父进…...
Java填充Execl模板并返回前端下载
功能:后端使用Java POI填充Execl模板,并返回前端下载 Execl模板如下: 1. Java后端 功能:填充模板EXECL,并返回前端 controller层 package org.huan.controller;import org.huan.dto.ExcelData; import org.huan.util.ExcelT…...
ChatGPT本地部署,学习记录
一、GPT4ALL模型 官网地址: Github:https://github.com/nomic-ai/gpt4all GPT4ALL项目部署简易,但是在运行体验上一般,并且是只调用CPU来进行运算。 看官方文档介绍在嵌入式上有比较大的优势,但是目前个人对嵌入式…...
Find My游戏手柄|苹果Find My技术与手柄结合,智能防丢,全球定位
游戏手柄是一种常见电子游戏机的部件,通过操纵其按钮等,实现对游戏虚拟角色的控制。随着游戏设备硬件的升级换代,现代游戏手柄又增加了:类比摇杆(方向及视角),扳机键以及HOME菜单键等。现在的游…...
2024美赛数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
【Deep Dive: AI Webinar】开放 ChatGPT - 人工智能开放性运作的案例研究
【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容,大致上分成了 3 个大类: 1. 人工智能的开放、风险与挑战(4 篇) 2. 人工智能的治理(总共 12 篇),其中分成了几个子类&…...
Devops相关问题及答案(2024)
1、DevOps 的理念是什么? DevOps是一种组织文化、流程和工具的集合,旨在提高软件交付的速度和质量,通过自动化和持续改进的方法来促进开发(Dev)和运维(Ops)的协作。 DevOps的核心理念包括&…...
掌握Python设计模式,SQL Alchemy打破ORM与模型类的束缚
大家好,反转软件组件之间的依赖关系之所以重要,是因为它有助于降低耦合度和提高模块化程度,进而可以提高软件的可维护性、可扩展性和可测试性。 当组件之间紧密耦合时,对一个组件的更改可能会对其他组件产生意想不到的影响&#…...
性能分析与调优: Linux 磁盘I/O 观测工具
目录 一、实验 1.环境 2.iostat 3.sar 4.pidstat 5.perf 6. biolatency 7. biosnoop 8.iotop、biotop 9.blktrace 10.bpftrace 11.smartctl 二、问题 1.如何查看PSI数据 2.iotop如何安装 3.smartctl如何使用 一、实验 1.环境 (1)主机 …...
Could not erase files or folders:
IDEA删除 git 的 localChanges 内的文件时,提示Could not erase files or folders:。 确认下这个文件是否被打开,忘记关闭了;关闭后可以被删除。(文件被打开的情况下,用操作系统自带的删除,也无法删除成功…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
376. Wiggle Subsequence
376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
学习一下用鸿蒙DevEco Studio HarmonyOS5实现百度地图
在鸿蒙(HarmonyOS5)中集成百度地图,可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API,可以构建跨设备的定位、导航和地图展示功能。 1. 鸿蒙环境准备 开发工具:下载安装 De…...
【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统
Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...
大数据治理的常见方式
大数据治理的常见方式 大数据治理是确保数据质量、安全性和可用性的系统性方法,以下是几种常见的治理方式: 1. 数据质量管理 核心方法: 数据校验:建立数据校验规则(格式、范围、一致性等)数据清洗&…...
麒麟系统使用-进行.NET开发
文章目录 前言一、搭建dotnet环境1.获取相关资源2.配置dotnet 二、使用dotnet三、其他说明总结 前言 麒麟系统的内核是基于linux的,如果需要进行.NET开发,则需要安装特定的应用。由于NET Framework 是仅适用于 Windows 版本的 .NET,所以要进…...
【threejs】每天一个小案例讲解:创建基本的3D场景
代码仓 GitHub - TiffanyHoo/three_practices: Learning three.js together! 可自行clone,无需安装依赖,直接liver-server运行/直接打开chapter01中的html文件 运行效果图 知识要点 核心三要素 场景(Scene) 使用 THREE.Scene(…...
