当前位置: 首页 > news >正文

springboot集成mqtt

引入jar包

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

配置

mqtt:username: admin  #用户名password: admin  #密码address: tcp://192.168.236.136:1883   #地址通过TCP访问,realtime:topicprefix: /realtime/    #实时数据消息topic前缀

mqtt生产者配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}/*** @return 创建推送通道。*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @return 默认输出。*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler outbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publishClient", mqttClientFactory());messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。messageHandler.setDefaultTopic("mqttTopic");return messageHandler;}/*** 推送MQTT的网关。*/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttSender {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic   主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*** 发送信息到MQTT服务器** @param topic   主题* @param qos     对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*                2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);}
}

mqtt生产者测试

@RestController
@RequestMapping("/v1/mqtt")
public class MqttController {@Autowiredprivate MqttConfig.MqttSender mqttSender;@PostMapping("/v1/create")public HttpResult<Boolean> create(@RequestBody final RemoteControlRequestBean requestBean) {mqttSender.sendToMqtt(requestBean.getTopic(), 0, requestBean.getMsg());return DefaultHttpResultFactory.success("发送成功",Boolean.TRUE);}
}

mqtt消费者配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}@Beanpublic MessageProducer inbound() {String[] topics = new String[]{"test","topicTest","123"};MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter( "123_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(30000);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);Object payLoad = message.getPayload();log.info("发送的Packet数据{}", JSON.toJSONString(payLoad));}};}
}

topic定义的核心是区分业务场景。

mqtt中topic定义和规范

所有的主题名和主题过滤器必须至少包含一个字符。
主题名和主题过滤器是大小写敏感的。如:ACCOUNTS 和 Accounts 是不同的主题名。
主题名和主题过滤器可以包含空格字符。如:Accounts payable 是合法的主题名
主题名或主题过滤器以前置或后置斜杠 / 区分。如:/finance 和 finance 是不同的。
只包含斜杠 / 的主题名或主题过滤器是合法的。
主题名和主题过滤器不能包含 null 字符(Unicode U+0000)。
主题名和主题过滤器是 UTF-8 编码字符串,除了不能超过 UTF-8 编码字符串的长度限制之外,主题名或主题过滤器的层级数量没有其它限制。

mqtt中topic层级

MQTT 协议主题可以通过斜杠(“/” U+002F)将主题分割成多个层级;作为消息通道,客户端可以通过定义主题层级来实现对消息类型的细分;
例如:一个主机厂有多个车型,每个车型下面有多个车联网业务,我们在定义车机向对某个车型业务系统发消息时可以向<车型A>/ <车辆唯一标识>/<业务X>主题发消息;
当然在MQTT世界中主题可以有很多层(MQTT 协议中没有限制层级数量),比如:<车型A>/<车辆唯一标识(车架号)>/<业务X>/<子业务1>
这样,我们在定义车联网分层级的业务通道的时候可以按主题层级来设计。

topic中通配符

多层通配符#

#字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。如:订阅者可以通过订阅<车型A>/# 接收到:
<车型A>
<车型A>/<车架号1>
<车型A>/<车架号1>/<业务X>
这几类主题的消息。

单层通配符+

加号 (“+” U+002B) 用于单个主题层级匹配的通配符。如:订阅者可以通过订阅<车型A>/+ 来接收
<车型A>/<车架号1>
<车型A>/<车架号2>
不同于多层通配符,使用单层通配符的时候无法匹配子层级的主题,比如:<车型A>/<车架号1>/<业务X>的主题消息就无法接收到。

相关文章:

springboot集成mqtt

引入jar包 <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjs…...

Lecture3 梯度下降(Gradient Descent)

目录 1 问题背景 2 批量梯度下降 (Batch Gradient Descent) 3 鞍点(Saddle Point) 3 随机梯度下降 (Stochastic Gradient Descent) 4 小批量梯度下降 (Mini-batch Gradient Descent) 1 问题背景 图1 上节课讲述的穷举法求最优权重值在Lecture2中&#xff0c;介绍了使用穷举…...

深入了解DSP

一、时钟和电源 问&#xff1a;DSP的电源设计和时钟设计应该特别注意哪些方面&#xff1f;外接晶振选用有源的好还是无源的好&#xff1f; 答&#xff1a;时钟一般使用晶体&#xff0c;电源可用TI的配套电源。外接晶振用无源的好。 问&#xff1a;TMS320LF2407的A/D转换精度保证…...

Flink反压如何排查

Flink反压利用了网络传输和动态限流。Flink的任务的组成由流和算子组成&#xff0c;那么流中的数据在算子之间转换的时候&#xff0c;会放入分布式的阻塞队列中。当消费者的阻塞队列满的时候&#xff0c;则会降低生产者的处理速度。 如上图所示&#xff0c;当Task C 的数据处…...

windows无法访问指定设备路径或文件怎么办?2个解决方案

有时候Win10电脑打不开程序或文件&#xff0c;windows无法访问指定设备路径或文件该怎么办&#xff1f;原因是什么呢&#xff1f;一般导致这种情况的出现&#xff0c;大多是因为我们的电脑缺乏相应的查看权限&#xff0c;我们只需要通过赋予权限就可以解决这个难题了。 操作环境…...

冷知识|鹤顶红还能用来修长城?

大家好&#xff0c;我是建模助手。 在上篇浅浅地蹭了波热点之后&#xff0c;我灵机一动&#xff0c;倒不如也搞一搞建筑方面的冷知识&#xff1f;冷热搭配&#xff0c;事半功倍... 问问大家&#xff0c;如果谈起古建筑&#xff0c;关键词都有什么&#xff1f;是庄严、震撼、壮…...

【GD32F427开发板试用】在IAR环境中移植RTX5

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;吴金刚 0.前言 首先感谢极术社区和兆易创新给了这次试用GD32F427开发板的机会。 板子做的虽然简约&#xff0c;但是自带了GD-link所以一根USB…...

MySQl学习(从入门到精通15)

MySQl学习&#xff08;从入门到精通15&#xff09;第 18 章_MySQL 8 其它新特性1. MySQL 8 新特性概述1. 1 MySQL 8. 0 新增特性1. 2 MySQL 8. 0 移除的旧特性2. 新特性 1 &#xff1a;窗口函数2. 1 使用窗口函数前后对比2. 2 窗口函数分类2. 3 语法结构2. 4 分类讲解1. 序号函…...

前端构建工具 Vite

文章目录参考环境构建工具构建工具的主要功能目前主流的前端构建工具Vite为什么使用 Vite冷启动WebpackVite热更新优化热更新优化预构建依赖Webpack VS ViteVite 的缺点首屏性能懒加载与 Vite 相关的基本操作获取create-vite创建项目Project nameSelect a frameworkSelect a va…...

若依框架---PageHelper分页(十)

在前几天的文章中&#xff0c;我们介绍了PageHelper的分页方法&#xff0c;研读代码定位到了ExecutorUtil.pageQuery(...)方法&#xff0c;并阅读到了其中的部分代码。 今天我们将看到重要的SQL修改代码。 getPageSql 我们接着看代码&#xff1a; if (!dialect.beforePage(…...

苹果手机专用蓝牙耳机有哪些?与iphone兼容性好的蓝牙耳机

蓝牙耳机摆脱了线缆的束缚&#xff0c;在地以各种方式轻松通话。自从蓝牙耳机问世以来&#xff0c;一直是行动商务族提升效率的好工具&#xff0c;苹果产品一直都是受欢迎的数码产品&#xff0c;下面推荐几款与iphone兼容性好的蓝牙耳机。 第一款&#xff1a;南卡小音舱蓝牙耳…...

CS-TPGS;壳聚糖修饰维生素E;Chitosan-g-TPGS

Chitosan-g-TPGS,CS-TPGS壳聚糖修饰维生素E聚乙二醇1000琥珀酸酯外观呈现白色固体或者粘稠液体。长期保存需要在-20℃,避光,干燥条件下存放&#xff0c;注意取用一定要干燥,避免频繁溶冻。 维生素E聚乙二醇琥珀酸酯(简称TPGS)是维生素E的水溶性衍生物,由维生素E琥珀酸酯的羧基与…...

easyx的基本使用(万字解析)

easyx的基本使用一.基本框架1.创建文件2.创建窗体-initgraph,closegraph,getchar二.简单的绘制1.圆形-circle2.坐标系统-setorigin,setaspectratio三.简单图形1.绘制点-putpixel2.简单的直线-line3.矩形-rectangle4.椭圆-ellipse5.圆角矩形-roundrect6.扇形-pie7.圆弧-arc四.多…...

基于OpenCV 的车牌识别

基于OpenCV 的车牌识别 车牌识别是一种图像处理技术&#xff0c;用于识别不同车辆。这项技术被广泛用于各种安全检测中。现在让我一起基于 OpenCV 编写 Python 代码来完成这一任务。 车牌识别的相关步骤 1. 车牌检测&#xff1a;第一步是从汽车上检测车牌所在位置。我们将使用…...

C#【必备技能篇】Winform跨线程更新进度条的实例

文章目录实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】源码&#xff1a;运行效果&#xff1a;实例二&#xff1a;【重在理解代码本身】源码&#xff1a;运行效果&#xff1a;参考&#xff1a;实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】 跨线…...

(1分钟速通面试) 矩阵分解相关内容

矩阵分解算法--总结QR分解 LU分解本篇博客总结一下QR分解和LU分解&#xff0c;这些都是矩阵加速的操作&#xff0c;在slam里面还算是比较常用的内容&#xff0c;这个地方在isam的部分出现过。(当然isam也是一个坑&#xff0c;想要出点创新成果的话 可能是不太现实的 短期来讲 哈…...

this指向

&#xff08;1&#xff09;在全局环境中的this——window 无论是否在严格模式下&#xff0c;在全局执行环境中&#xff08;在任何函数体外部&#xff09;this 都指向全局对象。 "use strict"console.log(this); //windowconsole.log(thiswindow);//true &#xff08…...

安卓小游戏:小板弹球

安卓小游戏&#xff1a;小板弹球 前言 这个是通过自定义View实现小游戏的第三篇&#xff0c;是小时候玩的那种五块钱的游戏机上的&#xff0c;和俄罗斯方块很像&#xff0c;小时候觉得很有意思&#xff0c;就模仿了一下。 需求 这里的逻辑就是板能把球弹起来&#xff0c;球…...

7、单行函数

文章目录1 函数的理解1.1 什么是函数1.2 不同DBMS函数的差异1.3 MySQL的内置函数及分类2 数值函数2.1 基本函数2.2 角度与弧度互换函数2.3 三角函数2.4 指数与对数2.5 进制间的转换3 字符串函数4 日期和时间函数4.1 获取日期、时间4.2 日期与时间戳的转换4.3 获取月份、星期、星…...

华为机试题:HJ56 完全数计算(python)

文章目录博主精品专栏导航知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方法2、print() &#xff1a;打印输出。3、整型int() &#xff1a;将指定进制&#xf…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

10-Oracle 23 ai Vector Search 概述和参数

一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI&#xff0c;使用客户端或是内部自己搭建集成大模型的终端&#xff0c;加速与大型语言模型&#xff08;LLM&#xff09;的结合&#xff0c;同时使用检索增强生成&#xff08;Retrieval Augmented Generation &#…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...