当前位置: 首页 > news >正文

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>

首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key

package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:

package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}

接下来就是最重要的Redisson配置,内容如下:

/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}

到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java

package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}

现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:

package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}

到此完成。

相关文章:

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖&#xff0c;官方github <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version> </dependen…...

如何通过Lua语言请求接口拿到数据

文章目录 概要http客户端通过请求下载数据 概要 当某个需求是需要在模块内请求接口拿到数据&#xff0c;需要使用http客户端调用接口 http客户端 LuaSOC请求接口官方文档 调用&#xff1a;http.request(method,url,headers,body,opts,ca_file,client_ca, client_key, clien…...

Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口

frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。...

自由学习记录(14)

unity操作问题 位置&#xff1a;子物体的位置是相对于父物体的。如果你移动父物体&#xff0c;子物体会保持相对于父物体的相对位置&#xff0c;跟着一起移动。 旋转&#xff1a;子物体的旋转也是相对于父物体的。旋转父物体会导致子物体围绕父物体的原点旋转。 缩放&#xf…...

疯狂Spring Boot讲义[推荐1]

《疯狂Spring Boot讲义》是2021年电子工业出版社出版的图书&#xff0c;作者是李刚 《疯狂Spring Boot终极讲义》不是一本介绍类似于PathVariable、MatrixVariable、RequestBody、ResponseBody这些基础注解的图书&#xff0c;它是真正讲解Spring Boot的图书。Spring Boot的核心…...

vue中$nextTick的作用是什么,什么时候使用

$nextTick 是 Vue 提供的一个方法&#xff0c;用于在下一次 DOM 更新周期之后执行回调函数。它通常用于在 Vue 完成数据更新后&#xff0c;需要访问更新后的 DOM 状态时&#xff0c;保证操作的是更新后的 DOM。 工作原理&#xff1a; Vue 是异步更新 DOM 的&#xff0c;当数据…...

Redis实现全局ID生成器

全局ID生成器 为什么要用全局ID生成器 1.当我们使用数据库自增来实现id的生成时,规律过于明显,会给用户暴露很多信息 2.当我们订单量过大时无法用数据库的一张表来存放订单,如果两张表的id都是自增的话,id就会出现重复 什么是全局ID生成器 全局ID生成器,是一种在分布式系统…...

Xshell远程连接工具详解

Xshell是一款在Windows平台上运行的远程连接工具&#xff0c;它支持SSH1、SSH2以及Microsoft Windows平台的TELNET协议。Xshell通过互联网实现对远程主机的安全连接&#xff0c;帮助用户在复杂的网络环境中享受他们的工作。本文将详细介绍Xshell的溯源、最新版本以及它的优势。…...

如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?

以下是一种在Verilog设计的磁盘阵列控制器中实现不同RAID级别(以RAID 0和RAID 1为例)切换的方法: 添加控制信号 在磁盘阵列控制器模块中添加一个输入信号,例如raid_mode,用于选择RAID模式。假设raid_mode = 0表示RAID 0模式,raid_mode = 1表示RAID 1模式。module raid_co…...

基于元神操作系统实现NTFS文件操作(十)

1. 背景 本文补充介绍文件遍历操作的部分附加内容&#xff0c;譬如&#xff0c;过滤掉系统元文件、过滤掉重复的文件项、过滤掉隐藏文件等&#xff0c;并提供了基于元神操作系统的部分实现代码。 2. 方法 &#xff08;1&#xff09;过滤掉系统元文件 NTFS文件系统的前16个元…...

Qt的几个函数方法

void receiveInfo1() {// 假设这是从串口接收到的字符串QString receivedString "23.5C,45%,1012hPa";// 使用逗号分隔符分割字符串QStringList parts receivedString.split(,);// 检查分割后的列表是否有足够的部分if (parts.size() > 3) {QString part1 part…...

openpnp - bug - 散料飞达至少定义2个物料

文章目录 openpnp - bug - 散料飞达至少定义2个物料笔记END openpnp - bug - 散料飞达至少定义2个物料 笔记 散料飞达上定义的物料个数用完了&#xff0c;现在只需要一个料就可以。 用顶部相机去找编带上是否还有一个单独的料&#xff0c;找到了。 定义散料飞达的料为1个&…...

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException 异常信息&#xff1a; Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /xxxdir is exceeded: quota10000 file count15001N…...

数据库的构成与手写简单数据库的探索

一、引言 在当今数字化的时代&#xff0c;数据库扮演着至关重要的角色。无论是企业管理系统、电子商务平台还是各种移动应用&#xff0c;都离不开数据库的支持。数据库是存储和管理数据的核心工具&#xff0c;它的高效性、可靠性和安全性对于数据的处理和应用至关重要。本文将…...

基于STM32的智能晾衣架设计

引言 随着智能家居的普及&#xff0c;智能晾衣架成为了提升生活便利性的重要设备。智能晾衣架通过集成多个传感器&#xff0c;能够自动感知天气变化、湿度、光照等环境因素&#xff0c;实现自动升降、风干和报警功能&#xff0c;帮助用户更加高效地晾晒衣物。本项目基于STM32设…...

【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)

文章目录 XAML.CSToBytes方法使用效果 常试过AcrylicView.MAUI和Sharpnado.MaterialFrame&#xff0c;对于二者教程很少&#xff0c;使用直接写控件然后调属性&#xff0c;没有报错但也并没有效果所幸就自己写一个 XAML <?xml version"1.0" encoding"utf-…...

深度学习:pandas篇

1. Pandas 基础 Pandas 是一个帮助你处理和分析数据的工具 安装 Pandas pip install pandas 导入 Pandas&#xff0c;我们用 pd 来代替 Pandas 的全称&#xff0c;这样以后写代码的时候更简洁 import pandas as pd 建 Series 和 DataFrame Pandas 最基本的两个数据结构是…...

Redis学习文档(Redis基本数据类型【Hash、Set】)

Hash&#xff08;哈希&#xff09; 介绍 Redis 中的 Hash 是一个 String 类型的 field-value&#xff08;键值对&#xff09; 的映射表&#xff0c;特别适合用于存储对象&#xff0c;后续操作的时候&#xff0c;你可以直接修改这个对象中的某些字段的值。 Hash 类似于 JDK1.…...

15分钟学Go 第9天:函数的定义与调用

第9天&#xff1a;函数的定义与调用 欢迎来到第9天的Go语言学习模块&#xff01;今天我们将深入探讨函数的定义与调用&#xff0c;帮助你掌握如何编写和使用函数。学习函数不仅是Go语言的基础&#xff0c;也是程序设计的核心概念之一。这一节将详细介绍函数的结构、参数传递、…...

Java虚拟机:JVM介绍

1024 程序员节日快乐&#xff01;愿您我的代码永远没有 bug &#xff0c;人生永远没有 bug &#xff01; JVM 概述JVM 架构 概述 JVM&#xff08; Java Virtual Machine &#xff0c;Java 虚拟机&#xff09;&#xff0c;是 Java 语言的运行环境&#xff0c;是运行所有 Java 程…...

R数据科学 16.5.3练习题

(1) 编写代码以使用一种映射函数完成以下任务。 a. 计算 mtcars 数据集中每列的均值。 b. 确定 nycflights13::flights 数据集中每列的类型。 c. 计算 iris 数据集中每列唯一值的数量。 d. 分别使用 μ -10、0、10 和 100 的正态分布生成 10 个随机数。 library(purrr) # 计算…...

通过conda install -c nvidia cuda=“11.3.0“ 安装低版本的cuda,但是却安装了高版本的12.4.0

问题 直接通过 conda install -c nvidia cuda"11.3.0"安装得到的却是高版本的 不清楚原理 解决方法 不过我们可以分个安装 runtime toolkit 和 nvcc 安装指定版本的 cudatoolkit 和 nvcc conda install -c nvidia cuda-cudart"11.3.58" conda instal…...

简易CPU设计入门:验证取指令模块

项目代码下载 还是请大家首先准备好本项目所用的源代码。如果已经下载了&#xff0c;那就不用重复下载了。如果还没有下载&#xff0c;那么&#xff0c;请大家点击下方链接&#xff0c;来了解下载本项目的CPU源代码的方法。 下载本项目代码 准备好了项目源代码以后&#xff…...

【MySQL数据库】MySQL主从复制

文章目录 MySQL主从复制MySQL主从复制的分类MySQL主从复制原理MySQL主从复制的配置步骤MySQL主从复制的同步模式 MySQL主从复制实验环境准备关闭防火墙和 SELinux时间同步主服务器设置从服务器设置 MySQL 主从复制配置主服务器配置从服务器配置&#xff08;以 Slave1 为例&…...

CDC变更数据捕捉技术是什么?和ETL有什么不同?

一、什么是CDC技术? 变更数据捕获&#xff08;Change Data Capture&#xff0c;简称 CDC&#xff09;是一种用于识别和跟踪数据源中发生变化的数据的技术。 工作原理&#xff1a; 1.监测数据源&#xff1a;CDC 工具会持续监测指定的数据源&#xff0c;如数据库表、文件系统…...

一种用于推进欧洲临床中心中风管理的联邦学习平台即服务

论文标题&#xff1a;A Federated Learning Platform as a Service for Advancing Stroke Management in European Clinical Centers 作者信息&#xff1a; Diogo Reis Santos, Albert Sund Aillet, Antonio Boiano, Usevalad Milasheuski, Lorenzo Giusti, Marco Di Gennaro…...

给哔哩哔哩bilibili电脑版做个手机遥控器

前言 bilibili电脑版可以在电脑屏幕上观看bilibili视频。然而&#xff0c;电脑版的bilibili不能通过手机控制视频翻页和调节音量&#xff0c;这意味着观看视频时需要一直坐在电脑旁边。那么&#xff0c;有没有办法制作一个手机遥控器来控制bilibili电脑版呢&#xff1f; 首先…...

opencv dnn模块 示例(27) 目标检测 object_detection 之 yolov11

文章目录 1、YOLO v11 介绍1.1、改进点特性1.2、性能对比1.3、多任务支持 2、测试2.1、官方Python测试2.2、Opencv dnn测试2.3、测试统计 3、训练 1、YOLO v11 介绍 YOLO11是Ultralytics实时目标探测器系列中最新的迭代版本&#xff0c;重新定义尖端的精度、速度和效率。在以往…...

鸿蒙开发融云demo初始化和登录

鸿蒙开发融云IMKit初始化和登录 融云鸿蒙版是不带UI的&#xff0c;得自己一步步搭建。 下面说如何初始化和登录&#xff1a; 一、初始化&#xff1a; /*** desc : 初始化融云* author : congge on 2024-07-12 15:47**/public static initRongIm() {IMEngine.getInstance()…...

手机防窥膜的工作原理是怎样的?有必要使用防窥膜吗?

在信息高度发达的社会中&#xff0c;我们通过手机可以实现非常多的操作&#xff0c;同时手机中有存在许多我们的隐私信息&#xff0c;伴随使用手机的时间增多&#xff0c;手机中的信息也有可能被暴露&#xff0c;尤其是在公共场所旁人很容易通过瞥视你的手机屏幕获取到一些信息…...

做网站需准备些什么软件/传统营销和网络营销的区别

在设计中&#xff0c;为了减少管脚&#xff0c;在有些工业标准中的数据总线设计为复用的方式&#xff0c;既输入输出在物理上是同一个管脚&#xff0c;为了避免输入输出信号的冲突&#xff0c;双向端口采用了使能信号对输出进行控制。 与三态端口相类似的&#xff0c;FPGA内部没…...

图片网站如何做百度排名/seo教程排名第一

2021新的一年&#xff0c;开启新的征程&#xff0c;回顾2020&#xff0c;真是太“南”了。 从年初各大厂裁员&#xff0c;竟然成为一件理所应当的事情&#xff0c;到四月份 GitHub 上“996.ICU” 引起了大家的共鸣。即使我们兢兢业业“996”&#xff0c;但依旧难以抵御 35 岁时…...

中国新闻社是国企还是私企/网站seo优化是什么

前言 使用hive&#xff0c;我们很多情况下会并发调用hive程序&#xff0c;将sql任务转换成mapreuce提交到hadoop集群中&#xff0c;而在本人使用hive的过程中&#xff0c;发现并发调用hive有几个问题,在这个和大家分享下. 正文 默认安装hive&#xff0c;hive是使用derby内存数据…...

医院网站建设解决方案/aso优化工具

在OS X的系统中&#xff0c;不再有Windows用户熟悉的C盘、D盘&#xff0c;这是因为OS X底层是Unix系统&#xff0c;其目录机构符合Unix系统的规范。MAC机器主板使用了Intel主导的EFI标准&#xff0c;硬盘分区格式采用GPT。这种EFIGPT的方式相比传统的BIOS&#xff0b;MBR的方式…...

学网站建设培训班/做网上推广

最近在看effective-python&#xff0c;第二章函数中提到了优先排序的概念&#xff0c;具体代码如下&#xff1a;values [1, 5, 3, 9, 7, 4, 2, 8, 6]group [7, 9]def sort_priority(values, group):def helper(x):if x in group:return (0, x)return (1, x)values.sort(keyhe…...

手机网站模板 怎样做/东莞seo报价

描述SWRITE具有与CWRITE类似的功能和语法。但是&#xff0c;与CWRITE不同&#xff0c;SWRITE不会将数据写入通道&#xff0c;而是写入CHAR数组。1. 可以将CWRITE限制为将数据写入通道。 SWRITE可以执行更复杂的格式化任务。这使程序更加灵活。2. CWRITE最多可以处理10个变量。…...