springboot集成Redisson做分布式消息队列
这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>
首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key
package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:
package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}
接下来就是最重要的Redisson配置,内容如下:
/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}
到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java
package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}
现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:
package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}
到此完成。
相关文章:
springboot集成Redisson做分布式消息队列
这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version> </dependen…...
如何通过Lua语言请求接口拿到数据
文章目录 概要http客户端通过请求下载数据 概要 当某个需求是需要在模块内请求接口拿到数据,需要使用http客户端调用接口 http客户端 LuaSOC请求接口官方文档 调用:http.request(method,url,headers,body,opts,ca_file,client_ca, client_key, clien…...
Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口
frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。...
自由学习记录(14)
unity操作问题 位置:子物体的位置是相对于父物体的。如果你移动父物体,子物体会保持相对于父物体的相对位置,跟着一起移动。 旋转:子物体的旋转也是相对于父物体的。旋转父物体会导致子物体围绕父物体的原点旋转。 缩放…...
疯狂Spring Boot讲义[推荐1]
《疯狂Spring Boot讲义》是2021年电子工业出版社出版的图书,作者是李刚 《疯狂Spring Boot终极讲义》不是一本介绍类似于PathVariable、MatrixVariable、RequestBody、ResponseBody这些基础注解的图书,它是真正讲解Spring Boot的图书。Spring Boot的核心…...
vue中$nextTick的作用是什么,什么时候使用
$nextTick 是 Vue 提供的一个方法,用于在下一次 DOM 更新周期之后执行回调函数。它通常用于在 Vue 完成数据更新后,需要访问更新后的 DOM 状态时,保证操作的是更新后的 DOM。 工作原理: Vue 是异步更新 DOM 的,当数据…...
Redis实现全局ID生成器
全局ID生成器 为什么要用全局ID生成器 1.当我们使用数据库自增来实现id的生成时,规律过于明显,会给用户暴露很多信息 2.当我们订单量过大时无法用数据库的一张表来存放订单,如果两张表的id都是自增的话,id就会出现重复 什么是全局ID生成器 全局ID生成器,是一种在分布式系统…...
Xshell远程连接工具详解
Xshell是一款在Windows平台上运行的远程连接工具,它支持SSH1、SSH2以及Microsoft Windows平台的TELNET协议。Xshell通过互联网实现对远程主机的安全连接,帮助用户在复杂的网络环境中享受他们的工作。本文将详细介绍Xshell的溯源、最新版本以及它的优势。…...
如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?
以下是一种在Verilog设计的磁盘阵列控制器中实现不同RAID级别(以RAID 0和RAID 1为例)切换的方法: 添加控制信号 在磁盘阵列控制器模块中添加一个输入信号,例如raid_mode,用于选择RAID模式。假设raid_mode = 0表示RAID 0模式,raid_mode = 1表示RAID 1模式。module raid_co…...
基于元神操作系统实现NTFS文件操作(十)
1. 背景 本文补充介绍文件遍历操作的部分附加内容,譬如,过滤掉系统元文件、过滤掉重复的文件项、过滤掉隐藏文件等,并提供了基于元神操作系统的部分实现代码。 2. 方法 (1)过滤掉系统元文件 NTFS文件系统的前16个元…...
Qt的几个函数方法
void receiveInfo1() {// 假设这是从串口接收到的字符串QString receivedString "23.5C,45%,1012hPa";// 使用逗号分隔符分割字符串QStringList parts receivedString.split(,);// 检查分割后的列表是否有足够的部分if (parts.size() > 3) {QString part1 part…...
openpnp - bug - 散料飞达至少定义2个物料
文章目录 openpnp - bug - 散料飞达至少定义2个物料笔记END openpnp - bug - 散料飞达至少定义2个物料 笔记 散料飞达上定义的物料个数用完了,现在只需要一个料就可以。 用顶部相机去找编带上是否还有一个单独的料,找到了。 定义散料飞达的料为1个&…...
HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException
HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException 异常信息: Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /xxxdir is exceeded: quota10000 file count15001N…...
数据库的构成与手写简单数据库的探索
一、引言 在当今数字化的时代,数据库扮演着至关重要的角色。无论是企业管理系统、电子商务平台还是各种移动应用,都离不开数据库的支持。数据库是存储和管理数据的核心工具,它的高效性、可靠性和安全性对于数据的处理和应用至关重要。本文将…...
基于STM32的智能晾衣架设计
引言 随着智能家居的普及,智能晾衣架成为了提升生活便利性的重要设备。智能晾衣架通过集成多个传感器,能够自动感知天气变化、湿度、光照等环境因素,实现自动升降、风干和报警功能,帮助用户更加高效地晾晒衣物。本项目基于STM32设…...
【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)
文章目录 XAML.CSToBytes方法使用效果 常试过AcrylicView.MAUI和Sharpnado.MaterialFrame,对于二者教程很少,使用直接写控件然后调属性,没有报错但也并没有效果所幸就自己写一个 XAML <?xml version"1.0" encoding"utf-…...
深度学习:pandas篇
1. Pandas 基础 Pandas 是一个帮助你处理和分析数据的工具 安装 Pandas pip install pandas 导入 Pandas,我们用 pd 来代替 Pandas 的全称,这样以后写代码的时候更简洁 import pandas as pd 建 Series 和 DataFrame Pandas 最基本的两个数据结构是…...
Redis学习文档(Redis基本数据类型【Hash、Set】)
Hash(哈希) 介绍 Redis 中的 Hash 是一个 String 类型的 field-value(键值对) 的映射表,特别适合用于存储对象,后续操作的时候,你可以直接修改这个对象中的某些字段的值。 Hash 类似于 JDK1.…...
15分钟学Go 第9天:函数的定义与调用
第9天:函数的定义与调用 欢迎来到第9天的Go语言学习模块!今天我们将深入探讨函数的定义与调用,帮助你掌握如何编写和使用函数。学习函数不仅是Go语言的基础,也是程序设计的核心概念之一。这一节将详细介绍函数的结构、参数传递、…...
Java虚拟机:JVM介绍
1024 程序员节日快乐!愿您我的代码永远没有 bug ,人生永远没有 bug ! JVM 概述JVM 架构 概述 JVM( Java Virtual Machine ,Java 虚拟机),是 Java 语言的运行环境,是运行所有 Java 程…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...
HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散
前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说,在叠衣服的过程中,我会带着团队对比各种模型、方法、策略,毕竟针对各个场景始终寻找更优的解决方案,是我个人和我司「七月在线」的职责之一 且个人认为,…...
