当前位置: 首页 > news >正文

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>

首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key

package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:

package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}

接下来就是最重要的Redisson配置,内容如下:

/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}

到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java

package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}

现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:

package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}

到此完成。

相关文章:

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖&#xff0c;官方github <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version> </dependen…...

如何通过Lua语言请求接口拿到数据

文章目录 概要http客户端通过请求下载数据 概要 当某个需求是需要在模块内请求接口拿到数据&#xff0c;需要使用http客户端调用接口 http客户端 LuaSOC请求接口官方文档 调用&#xff1a;http.request(method,url,headers,body,opts,ca_file,client_ca, client_key, clien…...

Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口

frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。...

自由学习记录(14)

unity操作问题 位置&#xff1a;子物体的位置是相对于父物体的。如果你移动父物体&#xff0c;子物体会保持相对于父物体的相对位置&#xff0c;跟着一起移动。 旋转&#xff1a;子物体的旋转也是相对于父物体的。旋转父物体会导致子物体围绕父物体的原点旋转。 缩放&#xf…...

疯狂Spring Boot讲义[推荐1]

《疯狂Spring Boot讲义》是2021年电子工业出版社出版的图书&#xff0c;作者是李刚 《疯狂Spring Boot终极讲义》不是一本介绍类似于PathVariable、MatrixVariable、RequestBody、ResponseBody这些基础注解的图书&#xff0c;它是真正讲解Spring Boot的图书。Spring Boot的核心…...

vue中$nextTick的作用是什么,什么时候使用

$nextTick 是 Vue 提供的一个方法&#xff0c;用于在下一次 DOM 更新周期之后执行回调函数。它通常用于在 Vue 完成数据更新后&#xff0c;需要访问更新后的 DOM 状态时&#xff0c;保证操作的是更新后的 DOM。 工作原理&#xff1a; Vue 是异步更新 DOM 的&#xff0c;当数据…...

Redis实现全局ID生成器

全局ID生成器 为什么要用全局ID生成器 1.当我们使用数据库自增来实现id的生成时,规律过于明显,会给用户暴露很多信息 2.当我们订单量过大时无法用数据库的一张表来存放订单,如果两张表的id都是自增的话,id就会出现重复 什么是全局ID生成器 全局ID生成器,是一种在分布式系统…...

Xshell远程连接工具详解

Xshell是一款在Windows平台上运行的远程连接工具&#xff0c;它支持SSH1、SSH2以及Microsoft Windows平台的TELNET协议。Xshell通过互联网实现对远程主机的安全连接&#xff0c;帮助用户在复杂的网络环境中享受他们的工作。本文将详细介绍Xshell的溯源、最新版本以及它的优势。…...

如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?

以下是一种在Verilog设计的磁盘阵列控制器中实现不同RAID级别(以RAID 0和RAID 1为例)切换的方法: 添加控制信号 在磁盘阵列控制器模块中添加一个输入信号,例如raid_mode,用于选择RAID模式。假设raid_mode = 0表示RAID 0模式,raid_mode = 1表示RAID 1模式。module raid_co…...

基于元神操作系统实现NTFS文件操作(十)

1. 背景 本文补充介绍文件遍历操作的部分附加内容&#xff0c;譬如&#xff0c;过滤掉系统元文件、过滤掉重复的文件项、过滤掉隐藏文件等&#xff0c;并提供了基于元神操作系统的部分实现代码。 2. 方法 &#xff08;1&#xff09;过滤掉系统元文件 NTFS文件系统的前16个元…...

Qt的几个函数方法

void receiveInfo1() {// 假设这是从串口接收到的字符串QString receivedString "23.5C,45%,1012hPa";// 使用逗号分隔符分割字符串QStringList parts receivedString.split(,);// 检查分割后的列表是否有足够的部分if (parts.size() > 3) {QString part1 part…...

openpnp - bug - 散料飞达至少定义2个物料

文章目录 openpnp - bug - 散料飞达至少定义2个物料笔记END openpnp - bug - 散料飞达至少定义2个物料 笔记 散料飞达上定义的物料个数用完了&#xff0c;现在只需要一个料就可以。 用顶部相机去找编带上是否还有一个单独的料&#xff0c;找到了。 定义散料飞达的料为1个&…...

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException 异常信息&#xff1a; Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /xxxdir is exceeded: quota10000 file count15001N…...

数据库的构成与手写简单数据库的探索

一、引言 在当今数字化的时代&#xff0c;数据库扮演着至关重要的角色。无论是企业管理系统、电子商务平台还是各种移动应用&#xff0c;都离不开数据库的支持。数据库是存储和管理数据的核心工具&#xff0c;它的高效性、可靠性和安全性对于数据的处理和应用至关重要。本文将…...

基于STM32的智能晾衣架设计

引言 随着智能家居的普及&#xff0c;智能晾衣架成为了提升生活便利性的重要设备。智能晾衣架通过集成多个传感器&#xff0c;能够自动感知天气变化、湿度、光照等环境因素&#xff0c;实现自动升降、风干和报警功能&#xff0c;帮助用户更加高效地晾晒衣物。本项目基于STM32设…...

【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)

文章目录 XAML.CSToBytes方法使用效果 常试过AcrylicView.MAUI和Sharpnado.MaterialFrame&#xff0c;对于二者教程很少&#xff0c;使用直接写控件然后调属性&#xff0c;没有报错但也并没有效果所幸就自己写一个 XAML <?xml version"1.0" encoding"utf-…...

深度学习:pandas篇

1. Pandas 基础 Pandas 是一个帮助你处理和分析数据的工具 安装 Pandas pip install pandas 导入 Pandas&#xff0c;我们用 pd 来代替 Pandas 的全称&#xff0c;这样以后写代码的时候更简洁 import pandas as pd 建 Series 和 DataFrame Pandas 最基本的两个数据结构是…...

Redis学习文档(Redis基本数据类型【Hash、Set】)

Hash&#xff08;哈希&#xff09; 介绍 Redis 中的 Hash 是一个 String 类型的 field-value&#xff08;键值对&#xff09; 的映射表&#xff0c;特别适合用于存储对象&#xff0c;后续操作的时候&#xff0c;你可以直接修改这个对象中的某些字段的值。 Hash 类似于 JDK1.…...

15分钟学Go 第9天:函数的定义与调用

第9天&#xff1a;函数的定义与调用 欢迎来到第9天的Go语言学习模块&#xff01;今天我们将深入探讨函数的定义与调用&#xff0c;帮助你掌握如何编写和使用函数。学习函数不仅是Go语言的基础&#xff0c;也是程序设计的核心概念之一。这一节将详细介绍函数的结构、参数传递、…...

Java虚拟机:JVM介绍

1024 程序员节日快乐&#xff01;愿您我的代码永远没有 bug &#xff0c;人生永远没有 bug &#xff01; JVM 概述JVM 架构 概述 JVM&#xff08; Java Virtual Machine &#xff0c;Java 虚拟机&#xff09;&#xff0c;是 Java 语言的运行环境&#xff0c;是运行所有 Java 程…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

MinIO Docker 部署:仅开放一个端口

MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...

面试高频问题

文章目录 &#x1f680; 消息队列核心技术揭秘&#xff1a;从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"&#xff1f;性能背后的秘密1.1 顺序写入与零拷贝&#xff1a;性能的双引擎1.2 分区并行&#xff1a;数据的"八车道高速公路"1.3 页缓存与批量处理…...

Git 命令全流程总结

以下是从初始化到版本控制、查看记录、撤回操作的 Git 命令全流程总结&#xff0c;按操作场景分类整理&#xff1a; 一、初始化与基础操作 操作命令初始化仓库git init添加所有文件到暂存区git add .提交到本地仓库git commit -m "提交描述"首次提交需配置身份git c…...

[C++错误经验]case语句跳过变量初始化

标题&#xff1a;[C错误经验]case语句跳过变量初始化 水墨不写bug 文章目录 一、错误信息复现二、错误分析三、解决方法 一、错误信息复现 write.cc:80:14: error: jump to case label80 | case 2:| ^ write.cc:76:20: note: crosses initialization…...