spring boot集成mqtt协议发送和订阅数据
maven的pom.xml引入包
<!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.3.4.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.4.RELEASE</version></dependency>
mqtt.yml配置文件
spring:mqtt:username: adminpassword: beyond_2021url: tcp://192.168.3.100:1883client-id: data-clientIdserver-id: data-serverIddata-topic: data/#will-topic: data-willwill-content: data server offlinecompletion-timeout: 10000
初始化MQTT配置bean
package com.beyond.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import java.security.SecureRandom;
import java.util.Date;@Configuration
@IntegrationComponentScan
public class MqttConfig {private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client-id}")private String clientId;@Value("${spring.mqtt.server-id}")private String serverId;@Value("${spring.mqtt.data-topic:data/#}")private String dataTopic;@Value("${spring.mqtt.will-topic}")private String willTopic;@Value("${spring.mqtt.will-content}")private String willContent;/*** @desc 连接超时*/@Value("${spring.mqtt.completion-timeout}")private int completionTimeout ;@Beanpublic MqttConnectOptions getMqttConnectOptions(){// MQTT的连接设置MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置连接的用户名mqttConnectOptions.setUserName(username);// 设置连接的密码mqttConnectOptions.setPassword(password.toCharArray());// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息mqttConnectOptions.setCleanSession(true);// 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883// 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222mqttConnectOptions.setServerURIs(hostUrl.split(","));// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(20);mqttConnectOptions.setAutomaticReconnect(true);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);mqttConnectOptions.setMaxInflight(1000000);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*** @desc 发送通道配置 默认主题* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String clientIdStr = clientId + new SecureRandom().nextInt(10);MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());//async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)messageHandler.setAsync(true);messageHandler.setAsyncEvents(true);messageHandler.setDefaultTopic(dataTopic);messageHandler.setDefaultQos(1);return messageHandler;}/*** @desc 发送通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @desc 接收通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** @desc 配置监听的 topic 支持通配符* @date 2021/3/16*/@Beanpublic MessageProducer inbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String serverIdStr = serverId + new SecureRandom().nextInt(10);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), dataTopic);adapter.setCompletionTimeout(completionTimeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** @desc 通过通道获取数据 订阅的数据* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String payload = message.getPayload().toString();String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();//处理订阅topic:(data/#)到的所有的数据}};}/*** @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件* @date 2021/7/22*@param event* @return void*/@EventListener(MqttConnectionFailedEvent.class)public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {log.error("mqttConnectionFailedEvent连接mqtt失败: " +"date={}, hostUrl={}, username={}, error={}",new Date(), hostUrl, username, event.getCause().getMessage());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent* 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageSentEvent.class)public void mqttMessageSentEvent(MqttMessageSentEvent event) {log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent* 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageDeliveredEvent.class)public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());}/*** @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttSubscribedEvent.class)public void mqttSubscribedEvent(MqttSubscribedEvent event) {log.info("mqttSubscribedEvent订阅成功信息: date={}, info={}", new Date(), event.toString());}}
mqtt发送数据网关配置
package com.beyond.data.component;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @desc MQTT发送网关* @date 2021/3/12*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayComponent {void sendToMqtt(String data);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送数据到mqtt伪代码
@Autowired
private MqttGatewayComponent mqttGatewayComponent;//发送字符串或json字符串,到指定的topic
mqttGatewayComponent.sendToMqtt("json string", "data/abcd");
参考链接:
https://blog.csdn.net/sinat_21184471/article/details/87186186
https://blog.csdn.net/qq_29467891/article/details/107043225?utm_source=app
https://blog.csdn.net/myinsert/article/details/107715538
相关文章:
spring boot集成mqtt协议发送和订阅数据
maven的pom.xml引入包 <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency…...

【数据库】详解数据库架构优化思路(两主架构、主从复制、冷热分离)
文章目录 1、为什么对数据库做优化2、双主架构双主架构的工作方式如下:双主架构的优势包括:但是一般不用这种架构,原因是: 3、主从复制主从复制的工作方式如下:主从复制的优势包括:主从复制的缺点 4、冷热分…...

el-table 实现动态表头 静态内容 根据数据显示动态输入框
直接放代码了 <el-table:data"form.tableDataA"borderstripestyle"width: 100%; margin-top: 20px"><el-table-columnv-for"(category, categoryIndex) in form.tableDataA":key"categoryIndex":label"category.name&qu…...

Reids 的整合 Spring Data Redis使用
大家好 , 我是苏麟 , 今天带来强大的Redis . REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统,是跨平台的非关系型数据库。 Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选…...

3D数据转换工具HOOPS Exchange概览
HOOPS Exchange SDK是一组C软件库,使开发团队能够快速为其应用程序添加可靠的2D和3D CAD导入和导出功能。这允许访问广泛的数据,包括边界表示(BREP)、产品制造信息(PMI)、模型树、视图、持久ID、样式、构造…...
【从零开始的rust web开发之路 一】axum学习使用
系列文章目录 第一章 axum学习使用 文章目录 系列文章目录前言老规矩先看官方文档介绍高级功能兼容性 二、hello world三、路由四,handler和提取器五,响应 前言 本职java开发,兼架构设计。空闲时间学习了rust,目前还不熟练掌握。…...

oracle警告日志\跟踪日志磁盘空间清理
oracle警告日志\跟踪日志磁盘空间清理 问题现象: 通过查看排查到alert和tarce占用大量磁盘空间 警告日志 /u01/app/oracle/diag/rdbms/orcl/orcl/alert 跟踪日志 /u01/app/oracle/diag/rdbms/orcl/orcl/trace 解决方案: 用adrci清除日志 确定目…...
【vue】el-table 数据更新后,刷新表格数据
表格里面的数据更新后,可以通过以下方法来刷新表格 方法1 用更新后的数据,覆盖之前的数据 var newTableData[];for(var i0;i<that.tableData.length;i){ if(aIdthat.selectStationId&&bIdthat.selectDeviceId){that.tableData[i].physica…...

AVL——平衡搜索树
✅<1>主页:我的代码爱吃辣📃<2>知识讲解:数据结构——AVL树☂️<3>开发环境:Visual Studio 2022💬<4>前言:AVL树是对二叉搜索树的严格高度控制,所以AVL树的搜索效率很高…...
TCP通信流程以及一些TCP的相关概念
1.TCP和UDP区别 都为传输层协议 UDP:用户数据报协议,面向无连接,可以单播,多播,广播,面向数据报,不可靠 TCP:传输控制协议,面向连接的,可靠的,基…...

PyTorch学习笔记(十七)——完整的模型验证(测试,demo)套路
完整代码: import torch import torchvision from PIL import Image from torch import nnimage_path "../imgs/dog.png" image Image.open(image_path) print(image)# 因为png格式是四个通道,除了RGB三通道外,还有一个透明度通…...
WPF开篇
一、为什么要学习WPF 大环境不好,公司要求逐年提高,既要会后端又要会客户端WPF相对于WinForm来说用户界面效果更好,图像更加立体化也是给自己增加一项技能,谨记一句话,技多不压身;多一份技能就多一份竞争力…...
linux 压缩解压缩
压缩解压缩 linux中压缩和解压文件也是很常见的 zip格式 zip格式的压缩包在windows很常见,linux中也有zip格式的压缩包 #压缩#zip [选项] 压缩包名 文件(多个文件空格隔开)zip 1.zip 123.txt 456.txt zip -r 2.zip /home/user1 ---------------------- -r 压缩目录 …...

centos9 mysql8修改数据库的存储路径
一、环境 系统:CentOS Stream release 9 mysql版本:mysql Ver 8.0.34 for Linux on x86_64 (MySQL Community Server - GPL) 二、修改mysql的数据库,存储路径 查看目录数据存储的位置 cat /etc/my.cnf操作 1、新建存放的目录,…...
【C++】<Windows编程中消息即事件的处理>
目录 一、注册窗口类,指定消息处理函数,捕获消息并发给处理函数 二、消息处理函数 三、通用窗口消息 四、其他消息 1.滚动条消息 2.按钮控件消息 3.按钮控件通知消息 4.按键消息 5.系统菜单等消息 6.组合框控件消息 7.组合框控件通知消息 8.列…...
数据库SQL语句使用
-- 查询所有数据库 show databases; -- 创建数据库,数据库名为mydatabase create database mydatabase; -- 如果没有名为 mydatabase的数据库则创建,有就不创建 create database if not exists mydatabase; -- 如果没有名为 mydatabase的数据库则创建…...

从零开始 Spring Cloud 12:Sentinel
从零开始 Spring Cloud 12:Sentinel 1.初识 Sentinel 1.1雪崩问题 1.1.1什么是雪崩问题 微服务中,服务间调用关系错综复杂,一个微服务往往依赖于多个其它微服务。 如图,如果服务提供者I发生了故障,当前的应用的部分…...
@Resurce和@Autowired的区别
Resource 和 Autowired 是 Java 中常用的两个注解,用于自动装配依赖对象。它们的主要区别如下: 来源不同: Resource 是 Java EE 提供的注解,属于 J2EE 的一部分,它由 JSR-250 规范定义。 Autowired 是 Spring 框架提供…...

ResNet简介
ResNet (Residual Network) 此网络于2015年,国人何先生提出,用于解决随着深度学习的层数加深造成的网络退化现象和梯度消失、梯度爆炸。 问题1 退化现象 当深度学习的各项指标能够随着训练轮数收敛的情况下,网络的层数增强未能像理论一样&…...
了解单例模式,工厂模式(简单易懂)
文章目录 单例模式饿汉模式懒汉模式对比 工厂模式简单工厂模式(Simple Factory Pattern)工厂方法模式(Factory Method Pattern)抽象工厂模式(Abstract Factory Pattern)对比 单例模式 什么是单例ÿ…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...

Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
WEB3全栈开发——面试专业技能点P7前端与链上集成
一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染(SSR)与静态网站生成(SSG) 框架,由 Vercel 开发。它简化了构建生产级 React 应用的过程,并内置了很多特性: ✅ 文件系…...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...

高端性能封装正在突破性能壁垒,其芯片集成技术助力人工智能革命。
2024 年,高端封装市场规模为 80 亿美元,预计到 2030 年将超过 280 亿美元,2024-2030 年复合年增长率为 23%。 细分到各个终端市场,最大的高端性能封装市场是“电信和基础设施”,2024 年该市场创造了超过 67% 的收入。…...

Qt的学习(二)
1. 创建Hello Word 两种方式,实现helloworld: 1.通过图形化的方式,在界面上创建出一个控件,显示helloworld 2.通过纯代码的方式,通过编写代码,在界面上创建控件, 显示hello world; …...

【Vue】scoped+组件通信+props校验
【scoped作用及原理】 【作用】 默认写在组件中style的样式会全局生效, 因此很容易造成多个组件之间的样式冲突问题 故而可以给组件加上scoped 属性, 令样式只作用于当前组件的标签 作用:防止不同vue组件样式污染 【原理】 给组件加上scoped 属性后…...