当前位置: 首页 > news >正文

MQTT+Springboot整合

1.mqttconfig配置(配置参数是从数据库查出来的)

package com.terminal.dc3.api.center.manager.config;import com.collection.common.utils.StringUtils;
import com.collection.system.mapper.MqttConfigMapper;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ConcurrentHashMap;@Component
@Data
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;@Autowiredprivate MqttConfigMapper mqttConfigMapper;public static ConcurrentHashMap<String, List<MqttPushClient>> mapHashMap = new ConcurrentHashMap<>();/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 端口*/private String port;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;private boolean retained;/*** qos*/private int qos;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}public int getQos() {return qos;}public void setQos(int qos) {this.qos = qos;}//    @Beanpublic MqttPushClient getMqttPushClient() {List<com.collection.system.domain.MqttConfig> mqttConfigs = mqttConfigMapper.selectMqttConfigList(new com.collection.system.domain.MqttConfig());for (com.collection.system.domain.MqttConfig mq : mqttConfigs) {mq.setHost("tcp://" + mq.getHost() + ":" + mq.getPort());if (mq.getEnabled()) {String mqtt_topic[] = StringUtils.split(mq.getTopic(), ",");mqttPushClient.connect(mq.getHost(), mq.getClientId(), mq.getUsername(),mq.getPassword(), mq.getTimeout(), mq.getKeepalive());//连接if (mqtt_topic != null) {for (int i = 0; i < mqtt_topic.length; i++) {mqttPushClient.subscribe(mqtt_topic[i], mq.getQos().intValue());//订阅主题}}}}return mqttPushClient;}
}

2.MqttPushClient客户端配置

package com.terminal.dc3.api.center.manager.config;import com.collection.common.core.redis.RedisCache;
import com.collection.common.utils.QueueUtils;
import com.collection.common.utils.SecurityUtils;
import com.collection.common.utils.StringUtils;
import com.collection.common.utils.ip.IpUtils;
import com.collection.common.utils.uuid.IdUtils;
import com.collection.ems.domain.MqttDatasLog;
import com.collection.ems.mapper.MqttDatasLogMapper;
import com.terminal.dc3.driver.service.mqtt.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttConfig mqttConfig;@Autowiredpublic void setMqttConfig(MqttConfig mqttConfig) {this.mqttConfig = mqttConfig;}private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}private static MqttDatasLogMapper mqttDatasLogMapper;@Autowiredpublic void setMqttDatasLogMapper(MqttDatasLogMapper mqttDatasLogMapper) {this.mqttDatasLogMapper = mqttDatasLogMapper;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {if (host != null && clientID != null && username != null && password != null) {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);options.setAutomaticReconnect(true);MqttPushClient.setClient(client);client.setCallback(pushCallback);client.connect(options);}} catch (Exception e) {logger.error("connect error", e);}}/*** 发布消息** @param pubTopic 主题* @param message  内容* @param qos      连接方式*/public void publishMessage(String pubTopic, String message, int qos) {//重新进行连接if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();logger.info("重新获取连接  {}" + client);}MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setPayload(message.getBytes());if (client == null) {return;}MqttTopic topic = client.getTopic(pubTopic);//记录发送日志MqttDatasLog mqttDatasLog = new MqttDatasLog();mqttDatasLog.setDataLog(IdUtils.randomUUID());mqttDatasLog.setLogType(pubTopic);mqttDatasLog.setContent(message);mqttDatasLog.setOperIp(IpUtils.getHostIp());mqttDatasLog.setOperTime(new Date());mqttDatasLog.setBusinessType(0L);if (null != topic) {try {MqttDeliveryToken publish = topic.publish(mqttMessage);if (!publish.isComplete()) {logger.info("发布消息成功");mqttDatasLog.setStatus(0l);//在队列中删除推送成功的数据QueueUtils.removeQueueObject(pubTopic, new String(mqttMessage.getPayload()));}} catch (Exception e) {logger.error("发布消息错误 error");mqttDatasLog.setStatus(1l);}}mqttDatasLogMapper.insertMqttDatasLog(mqttDatasLog);}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public static void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic + "连接方式" + qos);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (Exception e) {logger.error("开始订阅主题错误 error");}}}

3.MQTTMessage数据类

package com.terminal.dc3.api.center.manager.config;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** 消息实体对象**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MQTTMessage implements Serializable {/*** MQTT主题*/private String topic;/*** qos*/private Integer qos = 1;/*** MQTT内容*/private String content;}

4.MQTTLicenseTask(这是我的发送数据定时器,仅供参考)

package com.collection.web.controller.ems.task;import com.collection.common.queue.QueueCodeConstant;
import com.collection.common.utils.QueueUtils;
import com.collection.common.utils.StringUtils;
import com.collection.system.domain.MqttConfig;
import com.collection.system.mapper.MqttConfigMapper;
import com.terminal.dc3.api.center.manager.config.MqttPushClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;@Component
@Slf4j
public class MQTTLicenseTask {@Resourceprivate MqttConfigMapper mqttConfigMapper;@Resourceprivate MqttPushClient mqttPushClient;//    @Scheduled(fixedRate = 5000) // 每1000毫秒(1秒)执行一次@RabbitListener(queues = QueueCodeConstant.mqtt_task_config, containerFactory = "rabbitListenerContainerFactory")public void executeTask() {List<MqttConfig> mqttConfigs = mqttConfigMapper.selectMqttConfigList(new com.collection.system.domain.MqttConfig());for (com.collection.system.domain.MqttConfig mq : mqttConfigs) {String mqtt_topic[] = StringUtils.split(mq.getTopic(), ",");if (mqtt_topic != null) {for (int i = 0; i < mqtt_topic.length; i++) {Object queueObject = QueueUtils.getQueueObject(mqtt_topic[i]);if (queueObject != null) {log.info("定时推送");mqttPushClient.publishMessage(mqtt_topic[i], String.valueOf(queueObject), mq.getQos().intValue());}}}}}}

相关文章:

MQTT+Springboot整合

1.mqttconfig配置(配置参数是从数据库查出来的) package com.terminal.dc3.api.center.manager.config;import com.collection.common.utils.StringUtils; import com.collection.system.mapper.MqttConfigMapper; import lombok.Data; import org.springframework.beans.fact…...

ERROR TypeError: AutoImport is not a function

TypeError: AutoImport is not a function 原因&#xff1a;unplugin-auto-import 插件版本问题 Vue3基于Webpack&#xff0c;在vue.config.js中配置 当unplugin-vue-components版本小于0.26.0时&#xff0c;使用以下写法 const { defineConfig } require("vue/cli-se…...

软考教材重点内容 信息安全工程师 第 3 章 密码学基本理论

&#xff08;本章相对老版本极大的简化&#xff0c;所有与算法相关的计算全部删除&#xff0c;因此考试需要了解各个常 用算法的基本参数以及考试中可能存在的古典密码算法的计算&#xff0c;典型的例子是 2021 和 2022 年分别考了 DES 算法中的 S 盒计算&#xff0c;RSA 中的已…...

微信小程序 https://thirdwx.qlogo.cn 不在以下 downloadFile 合法域名列表中

授权登录后&#xff0c;拿到用户头像进行加载&#xff0c;但报错提示&#xff1a; https://thirdwx.qlogo.cn 不在以下 downloadFile 合法域名列表中 解决方法一&#xff08;未完全解决&#xff0c;临时处理&#xff09;&#xff1a;在微信开发者工具将不校验...勾上就可以访问…...

Linux性能优化之火焰图的起源

Linux火焰图的起源与性能优化专家 Brendan Gregg 密切相关&#xff0c;他在 2011 年首次提出这一工具&#xff0c;用于解决性能分析过程中可视化和数据解读的难题。 1. 背景&#xff1a;性能优化的需求 在现代计算中&#xff0c;性能优化往往需要对程序执行中的热点和瓶颈进行…...

《Markdown语法入门》

文章目录 《Markdown语法入门》1.标题2.段落2.1 换行2.2分割线 3.文字显示3.1 字体3.2 上下标 4. 列表4.1无序列表4.2 有序列表4.3 任务列表 5. 区块显示6. 代码显示6.1 行内代码6.2 代码块 7.插入超链接8.插入图片9. 插入表格 《Markdown语法入门》 【Typora 教程】手把手教你…...

Controller Baseband commands速览

目录 一、设备连接与通信控制类&#xff08;34条&#xff09; 1.1. 连接参数相关 1.1.1. 连接建立超时设置 1.1.2. 链路监督超时设置 1.1.3. Page操作超时设置 1.1.4. 扩展Page操作超时设置 1.1.5. 安全连接主机支持 1.2. 扫描操作相关 1.2.1. 扫描启用与禁用 1.2.2.…...

Redisson 3.39.0 发布

Redisson 3.39.0 发布&#xff0c;官方推荐的 Redis 客户端 Redisson 3.38.0 &#xff0c;一个 Java 编写的 Redis 客户端。 此版本更新内容如下&#xff1a; RTopic 对象的 partitioning 实现 RShardedTopic对象的 partitioning 实现 RReliableTopic 对象的 partitioning 实…...

高阶C语言补充:柔性数组

C99中&#xff0c;结构体中最后一个元素允许时未知大小的数组&#xff0c;这就叫做柔性数组成员。 vs编译器也支持柔性数组。 之所以把柔性数组单独列出&#xff0c;是因为&#xff1a; 1、柔性数组是建立在结构体的基础上的。 2、柔性数组的使用用到了动态内存分配。 这使得柔…...

S32K324信息安全-使用IC5000/IC5700进行debug口解锁

文章目录 前言winIDEA配置参考 前言 由于信息安全要求&#xff0c;需要对debug口&#xff08;JTAG&#xff09;进行加密&#xff0c;本文介绍基于固定密码的方式&#xff0c;使用IC5000/IC5700进行debug口解锁的方法 winIDEA配置 点击 Hardware | CPU Options | Reset | Ini…...

简单实现QT对象的[json]序列化与反序列化

简单实现QT对象的[json]序列化与反序列化 简介应用场景qt元对象系统思路实现使用方式题外话 简介 众所周知json作为一种轻量级的数据交换格式&#xff0c;在开发中被广泛应用。因此如何方便的将对象数据转为json格式和从json格式中加载数据到对象中就变得尤为重要。 在python类…...

Unity肢体控制(关节控制)

前面的基础搭建网上自己搜&#xff0c;我这个任务模型网上也有&#xff0c;可以去官网看看更多模型&#xff0c;这里只讲述有模型如何驱动肢体的操作方式 第一步&#xff1a;创建脚本 第二步&#xff1a;创建Rig Builder 建空容器 加部件&#xff08;Rig&#xff09;,加了之后…...

Node.js | Yarn下载安装与环境配置

一、安装Node.js Yarn 是 Node.js 下的包管理工具&#xff0c;因此想要使用 Yarn 就必须先下载 Node.js。 推荐参考&#xff1a;Node.js | npm下载安装及环境配置教程 二、Yarn安装 打开cmd&#xff0c;输入以下命令&#xff1a; npm install -g yarn检查是否安装成功&…...

WPF如何全局应用黑白主题效果

灰白色很多时候用于纪念&#xff0c;哀悼等。那么使用 WPF如何来做到这种效果呢&#xff1f;要实现的这种效果&#xff0c;我们会发现&#xff0c;它其实不仅仅是要针对图片&#xff0c;而是要针对整个窗口来实现灰白色。 如果只是针对图片的话&#xff0c;我可以可以对图片进…...

[Qt] Qt删除文本文件中的某一行

需求 我们经常读一个文件或者直接往一个空白文件中写文本&#xff0c;那么该如何使用Qt在一个文本文件中删除某一行 代码 #include <QCoreApplication> #include <QIODevice> #include <QFile> #include <QTextStream> #include <QString> #i…...

【HarmonyOS学习日志(9)】一次开发,多端部署之界面级一多开发

关于一次开发&#xff0c;多端部署 一次开发多端部署就是指一套代码工程&#xff0c;一次开发上架&#xff0c;多端按需部署&#xff08;一多&#xff09;&#xff0c;用于支撑开发者快速高效地开发多终端设备上的应用&#xff0c;以节省开发成本。 HarmonyOS系统面向多终端&…...

基于Java+SSM+JSP+MYSQL实现的宠物领养收养管理系统功能设计与实现六

一、前言介绍&#xff1a; 免费学习&#xff1a;猿来入此 1.1 项目摘要 随着人们生活水平的提高&#xff0c;宠物已经成为越来越多家庭的重要成员。然而&#xff0c;宠物的数量增长也带来了一系列问题&#xff0c;如流浪宠物数量的增加、宠物健康管理的缺失以及宠物领养收养…...

Java项目实战II基于微信小程序的课堂助手(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在数字化教…...

解析 Android WebChromeClient:提升 WebView 用户体验的关键组件

文章目录 一、总览二、详细说明三、一些实际和有趣的应用四、最佳实践五、与其他组件的比较六、安全性考虑&#xff1a;防止 XSS 攻击与数据泄露6.1 介绍6.2 代码案例6.2.1 输入过滤6.2.2 Content Security Policy (CSP) 案例 六、总结 在 Android 开发中&#xff0c;WebChrome…...

【LeetCode热题100】字符串

本篇博客记录了关于字符串相关的几道题目&#xff0c;包括最长公共前缀、最长回文子串、二进制求和、字符串相乘。 //解法1 class Solution { public:string longestCommonPrefix(vector<string>& strs) {string ret strs[0];for(int i 1 ; i < strs.size() ; i…...

OceanBase 闪回查询

前言 在OB中&#xff0c;drop表可以通过 回收站 或者 以往的备份恢复来还原单表。当delete数据时&#xff0c;由于delete操作的对象不会进入回收站&#xff0c;此时需要通过闪回查询功能查看delete的数据&#xff0c;以便后续恢复 本次实验版本为 OceanBase 4.2.1.8&#xff0…...

C++析构函数详解

C析构函数详解&#xff1a;对象销毁与资源清理 在 C 中&#xff0c;析构函数是与构造函数相对应的特殊成员函数&#xff0c;它在对象生命周期结束时被自动调用&#xff0c;用于执行对象销毁之前的清理操作。析构函数主要用于释放对象占用的资源&#xff0c;如动态分配的内存、打…...

【网络安全 | 漏洞挖掘】未授权获取AI聊天内容

未经许可,不得转载。 文章目录 两天前,我收到了一项私人项目的邀请,内容看起来像是一个聊天机器人,类似于 Gemini 或 ChatGPT。于是我开始测试该项目的一些业务逻辑漏洞和 IDOR(不当访问控制)漏洞。尽管这个产品拥有一个强大的安全团队,网站上也部署了 WAF(Web 应用防火…...

时间序列分析——移动平均法、指数平滑法、逐步回归法、趋势外推法等(基于Python实现)

第 11章——时间序列分析和预测 【例11-1】 绘制时间序列折线图—观察成分 【代码框11-1】——绘制时间序列折线图 # 图11-2的绘制代码 import pandas as pd import matplotlib.pyplot as plt plt.rcParams[font.sans-serif]=[SimHei...

opencv(c++)----图像的读取以及显示

opencv(c)----图像的读取以及显示 imread: 作用&#xff1a;读取图像文件并将其加载到 Mat 对象中。参数&#xff1a; 第一个参数是文件路径&#xff0c;可以是相对路径或绝对路径。第二个参数是读取标志&#xff0c;比如 IMREAD_COLOR 表示以彩色模式读取图像。 返回值&#x…...

PyTorch——从入门到精通:PyTorch基础知识(张量)【PyTorch系统学习】

什么是张量&#xff08;Tensor&#xff09; ​ 张量在数学中是一个代数对象&#xff0c;描述了与矢量空间相关的代数对象集之间的多重线性映射。张量是向量和矩阵概念的推广&#xff0c;可以理解为多维数组。作为数学中的一个基本概念&#xff0c;张量有着多种类型&#xff0c;…...

(笔记)ubuntu20安装jdk7,多版本管理

前往 Oracle JDK 7 下载页面&#xff08;需要 Oracle 账户&#xff09;&#xff0c;下载 JDK 7 的压缩包文件&#xff08;.tar.gz&#xff09;。 下载完成后&#xff0c;将文件解压到 /opt 目录&#xff1a; sudo tar -xzf jdk-7u<version>-linux-x64.tar.gz -C /opt 重…...

Python系列教程

文章目录 1. Python基础2. Python基础库3. Python数据分析 1. Python基础 语句数据类型表达式输入、输出与文件读写函数模块与包类与面向对象作用域与命名空间常用技巧与操作 2. Python基础库 Typing库 3. Python数据分析...

如何恢復電腦IP地址的手動設置?

手動設置IP地址後&#xff0c;可能會遇到一些網路連接問題&#xff0c;或者需要恢復到之前的自動獲取狀態。這篇文章將詳細介紹如何恢復電腦的IP地址設置。 為什麼需要恢復IP地址設置&#xff1f; 網路連接問題&#xff1a;手動設置IP地址後&#xff0c;可能會導致與路由器或…...

Linux 下敏感文件路径总结

Linux 下敏感文件路径总结 在服务器运维和安全测试过程中&#xff0c;掌握各类服务的关键配置文件路径、日志文件位置以及重要目录的存放位置至关重要。本文整理了 Linux 系统下常见服务&#xff08;如 Apache、Nginx、MySQL 等&#xff09;的路径结构&#xff0c;以及一些敏感…...