当前位置: 首页 > news >正文

springboot集成mqtt

引入jar包

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

配置

mqtt:username: admin  #用户名password: admin  #密码address: tcp://192.168.236.136:1883   #地址通过TCP访问,realtime:topicprefix: /realtime/    #实时数据消息topic前缀

mqtt生产者配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}/*** @return 创建推送通道。*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @return 默认输出。*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler outbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publishClient", mqttClientFactory());messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。messageHandler.setDefaultTopic("mqttTopic");return messageHandler;}/*** 推送MQTT的网关。*/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttSender {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic   主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*** 发送信息到MQTT服务器** @param topic   主题* @param qos     对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*                2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);}
}

mqtt生产者测试

@RestController
@RequestMapping("/v1/mqtt")
public class MqttController {@Autowiredprivate MqttConfig.MqttSender mqttSender;@PostMapping("/v1/create")public HttpResult<Boolean> create(@RequestBody final RemoteControlRequestBean requestBean) {mqttSender.sendToMqtt(requestBean.getTopic(), 0, requestBean.getMsg());return DefaultHttpResultFactory.success("发送成功",Boolean.TRUE);}
}

mqtt消费者配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}@Beanpublic MessageProducer inbound() {String[] topics = new String[]{"test","topicTest","123"};MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter( "123_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(30000);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);Object payLoad = message.getPayload();log.info("发送的Packet数据{}", JSON.toJSONString(payLoad));}};}
}

topic定义的核心是区分业务场景。

mqtt中topic定义和规范

所有的主题名和主题过滤器必须至少包含一个字符。
主题名和主题过滤器是大小写敏感的。如:ACCOUNTS 和 Accounts 是不同的主题名。
主题名和主题过滤器可以包含空格字符。如:Accounts payable 是合法的主题名
主题名或主题过滤器以前置或后置斜杠 / 区分。如:/finance 和 finance 是不同的。
只包含斜杠 / 的主题名或主题过滤器是合法的。
主题名和主题过滤器不能包含 null 字符(Unicode U+0000)。
主题名和主题过滤器是 UTF-8 编码字符串,除了不能超过 UTF-8 编码字符串的长度限制之外,主题名或主题过滤器的层级数量没有其它限制。

mqtt中topic层级

MQTT 协议主题可以通过斜杠(“/” U+002F)将主题分割成多个层级;作为消息通道,客户端可以通过定义主题层级来实现对消息类型的细分;
例如:一个主机厂有多个车型,每个车型下面有多个车联网业务,我们在定义车机向对某个车型业务系统发消息时可以向<车型A>/ <车辆唯一标识>/<业务X>主题发消息;
当然在MQTT世界中主题可以有很多层(MQTT 协议中没有限制层级数量),比如:<车型A>/<车辆唯一标识(车架号)>/<业务X>/<子业务1>
这样,我们在定义车联网分层级的业务通道的时候可以按主题层级来设计。

topic中通配符

多层通配符#

#字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。如:订阅者可以通过订阅<车型A>/# 接收到:
<车型A>
<车型A>/<车架号1>
<车型A>/<车架号1>/<业务X>
这几类主题的消息。

单层通配符+

加号 (“+” U+002B) 用于单个主题层级匹配的通配符。如:订阅者可以通过订阅<车型A>/+ 来接收
<车型A>/<车架号1>
<车型A>/<车架号2>
不同于多层通配符,使用单层通配符的时候无法匹配子层级的主题,比如:<车型A>/<车架号1>/<业务X>的主题消息就无法接收到。

相关文章:

springboot集成mqtt

引入jar包 <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjs…...

Lecture3 梯度下降(Gradient Descent)

目录 1 问题背景 2 批量梯度下降 (Batch Gradient Descent) 3 鞍点(Saddle Point) 3 随机梯度下降 (Stochastic Gradient Descent) 4 小批量梯度下降 (Mini-batch Gradient Descent) 1 问题背景 图1 上节课讲述的穷举法求最优权重值在Lecture2中&#xff0c;介绍了使用穷举…...

深入了解DSP

一、时钟和电源 问&#xff1a;DSP的电源设计和时钟设计应该特别注意哪些方面&#xff1f;外接晶振选用有源的好还是无源的好&#xff1f; 答&#xff1a;时钟一般使用晶体&#xff0c;电源可用TI的配套电源。外接晶振用无源的好。 问&#xff1a;TMS320LF2407的A/D转换精度保证…...

Flink反压如何排查

Flink反压利用了网络传输和动态限流。Flink的任务的组成由流和算子组成&#xff0c;那么流中的数据在算子之间转换的时候&#xff0c;会放入分布式的阻塞队列中。当消费者的阻塞队列满的时候&#xff0c;则会降低生产者的处理速度。 如上图所示&#xff0c;当Task C 的数据处…...

windows无法访问指定设备路径或文件怎么办?2个解决方案

有时候Win10电脑打不开程序或文件&#xff0c;windows无法访问指定设备路径或文件该怎么办&#xff1f;原因是什么呢&#xff1f;一般导致这种情况的出现&#xff0c;大多是因为我们的电脑缺乏相应的查看权限&#xff0c;我们只需要通过赋予权限就可以解决这个难题了。 操作环境…...

冷知识|鹤顶红还能用来修长城?

大家好&#xff0c;我是建模助手。 在上篇浅浅地蹭了波热点之后&#xff0c;我灵机一动&#xff0c;倒不如也搞一搞建筑方面的冷知识&#xff1f;冷热搭配&#xff0c;事半功倍... 问问大家&#xff0c;如果谈起古建筑&#xff0c;关键词都有什么&#xff1f;是庄严、震撼、壮…...

【GD32F427开发板试用】在IAR环境中移植RTX5

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;吴金刚 0.前言 首先感谢极术社区和兆易创新给了这次试用GD32F427开发板的机会。 板子做的虽然简约&#xff0c;但是自带了GD-link所以一根USB…...

MySQl学习(从入门到精通15)

MySQl学习&#xff08;从入门到精通15&#xff09;第 18 章_MySQL 8 其它新特性1. MySQL 8 新特性概述1. 1 MySQL 8. 0 新增特性1. 2 MySQL 8. 0 移除的旧特性2. 新特性 1 &#xff1a;窗口函数2. 1 使用窗口函数前后对比2. 2 窗口函数分类2. 3 语法结构2. 4 分类讲解1. 序号函…...

前端构建工具 Vite

文章目录参考环境构建工具构建工具的主要功能目前主流的前端构建工具Vite为什么使用 Vite冷启动WebpackVite热更新优化热更新优化预构建依赖Webpack VS ViteVite 的缺点首屏性能懒加载与 Vite 相关的基本操作获取create-vite创建项目Project nameSelect a frameworkSelect a va…...

若依框架---PageHelper分页(十)

在前几天的文章中&#xff0c;我们介绍了PageHelper的分页方法&#xff0c;研读代码定位到了ExecutorUtil.pageQuery(...)方法&#xff0c;并阅读到了其中的部分代码。 今天我们将看到重要的SQL修改代码。 getPageSql 我们接着看代码&#xff1a; if (!dialect.beforePage(…...

苹果手机专用蓝牙耳机有哪些?与iphone兼容性好的蓝牙耳机

蓝牙耳机摆脱了线缆的束缚&#xff0c;在地以各种方式轻松通话。自从蓝牙耳机问世以来&#xff0c;一直是行动商务族提升效率的好工具&#xff0c;苹果产品一直都是受欢迎的数码产品&#xff0c;下面推荐几款与iphone兼容性好的蓝牙耳机。 第一款&#xff1a;南卡小音舱蓝牙耳…...

CS-TPGS;壳聚糖修饰维生素E;Chitosan-g-TPGS

Chitosan-g-TPGS,CS-TPGS壳聚糖修饰维生素E聚乙二醇1000琥珀酸酯外观呈现白色固体或者粘稠液体。长期保存需要在-20℃,避光,干燥条件下存放&#xff0c;注意取用一定要干燥,避免频繁溶冻。 维生素E聚乙二醇琥珀酸酯(简称TPGS)是维生素E的水溶性衍生物,由维生素E琥珀酸酯的羧基与…...

easyx的基本使用(万字解析)

easyx的基本使用一.基本框架1.创建文件2.创建窗体-initgraph,closegraph,getchar二.简单的绘制1.圆形-circle2.坐标系统-setorigin,setaspectratio三.简单图形1.绘制点-putpixel2.简单的直线-line3.矩形-rectangle4.椭圆-ellipse5.圆角矩形-roundrect6.扇形-pie7.圆弧-arc四.多…...

基于OpenCV 的车牌识别

基于OpenCV 的车牌识别 车牌识别是一种图像处理技术&#xff0c;用于识别不同车辆。这项技术被广泛用于各种安全检测中。现在让我一起基于 OpenCV 编写 Python 代码来完成这一任务。 车牌识别的相关步骤 1. 车牌检测&#xff1a;第一步是从汽车上检测车牌所在位置。我们将使用…...

C#【必备技能篇】Winform跨线程更新进度条的实例

文章目录实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】源码&#xff1a;运行效果&#xff1a;实例二&#xff1a;【重在理解代码本身】源码&#xff1a;运行效果&#xff1a;参考&#xff1a;实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】 跨线…...

(1分钟速通面试) 矩阵分解相关内容

矩阵分解算法--总结QR分解 LU分解本篇博客总结一下QR分解和LU分解&#xff0c;这些都是矩阵加速的操作&#xff0c;在slam里面还算是比较常用的内容&#xff0c;这个地方在isam的部分出现过。(当然isam也是一个坑&#xff0c;想要出点创新成果的话 可能是不太现实的 短期来讲 哈…...

this指向

&#xff08;1&#xff09;在全局环境中的this——window 无论是否在严格模式下&#xff0c;在全局执行环境中&#xff08;在任何函数体外部&#xff09;this 都指向全局对象。 "use strict"console.log(this); //windowconsole.log(thiswindow);//true &#xff08…...

安卓小游戏:小板弹球

安卓小游戏&#xff1a;小板弹球 前言 这个是通过自定义View实现小游戏的第三篇&#xff0c;是小时候玩的那种五块钱的游戏机上的&#xff0c;和俄罗斯方块很像&#xff0c;小时候觉得很有意思&#xff0c;就模仿了一下。 需求 这里的逻辑就是板能把球弹起来&#xff0c;球…...

7、单行函数

文章目录1 函数的理解1.1 什么是函数1.2 不同DBMS函数的差异1.3 MySQL的内置函数及分类2 数值函数2.1 基本函数2.2 角度与弧度互换函数2.3 三角函数2.4 指数与对数2.5 进制间的转换3 字符串函数4 日期和时间函数4.1 获取日期、时间4.2 日期与时间戳的转换4.3 获取月份、星期、星…...

华为机试题:HJ56 完全数计算(python)

文章目录博主精品专栏导航知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方法2、print() &#xff1a;打印输出。3、整型int() &#xff1a;将指定进制&#xf…...

opencv——傅里叶变换、低通与高通滤波及直方图等操作

1、傅里叶变换a、傅里叶变换原理时域分析&#xff1a;以时间为参照进行分析。频域分析&#xff1a;相当于上帝视角一样&#xff0c;看事物层次更高&#xff0c;时域的运动在频域来看就是静止的。eg&#xff1a;投球——时域分析&#xff1a;第1分钟投了3分&#xff0c;第2分钟投…...

【NGINX入门指北】 进阶篇

nginx 进阶篇 文章目录nginx 进阶篇一、Nginx Proxy 服务器1、代理原理2、proxy代理3、proxy缓存一、Nginx Proxy 服务器 1、代理原理 正向代理 内网客户机通过代理访问互联网&#xff0c;通常要设置代理服务器地址和端口。 反向代理 外网用户通过代理访问内网服务器&…...

Python中关于@修饰符、yeild关键词、next()函数的基本功能简述

关于修饰符&#xff1a;其实就是将修饰符下面的函数当成参数传给它上面的函数。 def a(x):print(a)adef b():print(b) 其效果等价为&#xff1a; def a(x):print(a)def b():print(b)a(b())有个记忆诀窍&#xff0c;的下面哪个函数最近&#xff0c;谁就是儿子&#xff0c;谁就…...

结合Coverity扫描Spring Boot项目进行Path Manipulation漏洞修复

本篇介绍使用Coverity 扫描基于Spring Boot 项目中的Path Manipulation 漏洞, 进而解决风险,并且可以通过扫描。 什么样的代码会被扫描有路径操纵风险? 在Spring Boot 项目中, 实验了如下的场景: 1. Control 中 file path 作为参数传递的会被扫描,单纯服务方法不会 场…...

【FFMPEG源码分析】从ffplay源码摸清ffmpeg框架(一)

ffplay入口 ffmpeg\fftools\ffplay.c int main(int argc, char **argv) {/*******************start 动态库加载/网络初始化等**************/int flags;VideoState *is;init_dynload();av_log_set_flags(AV_LOG_SKIP_REPEATED);parse_loglevel(argc, argv, options);/* regis…...

C++蓝桥杯 基础练习,高精度加法,输入两个整数a和b,输出这两个整数的和。a和b都不超过100位。

C蓝桥杯 基础练习&#xff0c;高精度加法 问题描述 输入两个整数a和b&#xff0c;输出这两个整数的和。a和b都不超过100位。 算法描述 由于a和b都比较大&#xff0c;所以不能直接使用语言中的标准数据类型来存储。对于这种问题&#xff0c;一般使用数组来处理。   定义一…...

MySQL面试题:SQL语句的基本语法

MySQL目录一、数据库入门1. 数据管理技术的三个阶段2. 关系型数据库与非关系型数据库3. 四大非关系型数据库a. 基于列的数据库&#xff08;column-oriented&#xff09;b. 键值对存储&#xff08;Key-Value Stores&#xff09;c. 文档存储&#xff08;Document Stores&#xff…...

Fluid-数据编排能力原理解析

前言本文对Fluid基础功能-数据编排能力进行原理解析。其中涉及到Fluid架构和k8s csi driver相关知识。建议先了解相关概念&#xff0c;为了便于理解&#xff0c;本文使用JuiceFS作为后端runtime引擎。原理概述Fuild数据编排能力&#xff0c;主要是在云原生环境中&#xff0c;能…...

并发线程、锁、ThreadLocal

并发编程并发编程Java内存模型&#xff08;JMM&#xff09;并发编程核心问题—可见性、原子性、有序性volatile关键字原子性原子类CAS(Compare-And-Swap 比较并交换)ABA问题Java中的锁乐观锁和悲观锁可重入锁读写锁分段锁自旋锁共享锁/独占锁公平锁/非公平锁偏向锁/轻量级锁/重…...

CMMI-结项管理

结项管理&#xff08;ProjectClosing Management, PCM&#xff09;是指在项目开发工作结束后&#xff0c;对项目的有形资产和无形资产进行清算&#xff1b;对项目进行综合评估&#xff1b;总结经验教训等。结项管理过程域是SPP模型的重要组成部分。本规范阐述了结项管理的规程&…...

南京设计公司前十名/深圳优化怎么做搜索

大家好&#xff0c;9月14日&#xff08;周五&#xff09;【免费送书】说说哪本书曾经让你爱不释手活动中获奖名单如下&#xff0c;排名1—10的留言读者免费赠送图书一本&#xff0c;图书从活动里的书单中任选&#xff0c;由电子工业出版社博文视点提供&#xff1b;排名11—20的…...

wordpress块引用美化/百度关键词排名怎么查

在阅读本文之前,你应该阅读过的系列: 《Flink重点难点:时间、窗口和流Join》 《Flink重点难点:网络流控和反压》 《Flink重点难点:维表关联理论和Join实战》 《Flink重点难点:内存模型与内存结构》 《Flink重点难点:Flink Table&SQL必知必会(一)》 Flink重点难点:F…...

如何创建微网站/网站推广工作

2019独角兽企业重金招聘Python工程师标准>>> &#xfeff;Java8 BASE64编解码 Base64是一种用64个字符来表示任意二进制数据的方法。 Base64是一种最常见的二进制编码方法。 Java一直缺少BASE64编码 API&#xff0c;以至于通常在项目开发中会选用第三方的API实现。但…...

seo 网站换程序/北京网站制作设计

2019独角兽企业重金招聘Python工程师标准>>> 0.AFN框架基本使用 0.1 AFN内部结构AFN结构体- NSURLConnection AFURLConnectionOperation(已经被废弃) AFHTTPRequestOperation(已经被废弃) AFHTTPRequestOperationManager(封装了常用的 HTTP 方法)(已经被废弃)* 属性…...

域名购买查询/seo权重优化

Algorithm 直接插入排序 算法概述&#xff1a; 把前面的数组排好序&#xff0c;后面的元素插入到排好序的数组中。 实现思路&#xff1a; 把数组分为有序和无序两部分&#xff0c;分别用下标去指向&#xff08;有序&#xff08;j&#xff09;&#xff0c; 无序&#xff08…...

网站维护专业/网站快速排名优化价格

实验三(2009-9-24)一、 实验名称:运算符与表达式。二、 实验目的&#xff1a;(1) 掌握C语言中常用运算符的基本功能&#xff0c;以及优先级与结合性&#xff1b;(2) 正确使用运算符和数据实体构建表达式&#xff0c;并表达式的计算过程&#xff1b;(3) 进一步熟悉Visual C6.0开…...