在Android中使用 MQTT 服务实现消息通信
1.摘要
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的、基于发布/订阅(Publish/Subscribe)模式的通信协议,最初由 IBM 在1999年开发。它设计用于在低带宽、不稳定的网络环境下进行通信,适用于物联网(IoT)和机器对机器(M2M)通信。
2.准备工作
在项目的 build.gradle 文件中添加 MQTT 相关的依赖库:
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
添加权限:
<uses-permission android:name="android.permission.WAKE_LOCK" /><uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /><uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /><uses-permission android:name="android.permission.READ_PHONE_STATE" /><uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /><uses-permission android:name="android.permission.INTERNET" />
3.代码实现
class MqttService: Service() {private val TAG = "MqttService"private lateinit var mqttAndroidClient: MqttAndroidClientprivate lateinit var mqttConnectOptions: MqttConnectOptionsprivate val serverUri = "tcp://YOUR_BROKER_URI:PORT"// 修改为你的 MQTT 服务器地址private val clientId = "AndroidClient"// 修改为你的客户端 IDprivate val username = "YOUR_USERNAME"// 修改为你的用户名private val password = "YOUR_PASSWORD"// 修改为你的密码private val PUBLISH_TOPIC = "YOUR_PUBLISH_TOPIC"private lateinit var handler: Handlerprivate val retryInterval: Long = 5000 // 重试间隔,单位:毫秒(5秒)override fun onCreate() {super.onCreate()handler = Handler()initializeMqttClient()}// 初始化 MQTT 客户端private fun initializeMqttClient() {mqttAndroidClient = MqttAndroidClient(applicationContext, serverUri, clientId)mqttConnectOptions = MqttConnectOptions().apply {userName = username // 用户名password = this@MqttService.password.toCharArray() // 密码isAutomaticReconnect = true //是否自动尝试重新连接isCleanSession = false // 清除缓存connectionTimeout = 10 // 设置超时时间,单位:秒keepAliveInterval = 20 // 心跳包发送间隔,单位:秒//当客户端与 MQTT 代理建立连接时,客户端可以指定一个遗嘱消息。如果客户端在建立连接后因为某种原因(例如网络故障或客户端异常退出)非正常断开连接,MQTT 代理将会发布这条遗嘱消息给所有订阅了客户端所订阅主题的客户端。这样,其他订阅者就可以知道客户端已经离线了。//Topic:遗嘱消息要发布到的主题。//Payload:遗嘱消息的内容。//QoS(Quality of Service):遗嘱消息的传输质量要求。//Retained:指定是否将遗嘱消息保留在 MQTT 代理中,以便新订阅者在连接时立即接收到该消息。setWill(PUBLISH_TOPIC, "I am offline".toByteArray(), 1, false)//设置遗嘱消息}mqttAndroidClient.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String?) {Log.d(TAG, "连接到: $serverURI")}override fun connectionLost(cause: Throwable?) {Log.d(TAG, "连接丢失: ${cause?.message}")}override fun messageArrived(topic: String?, message: MqttMessage?) {Log.d(TAG, "消息到达: $topic - ${message.toString()}")}//消息传递完成时的回调函数override fun deliveryComplete(token: IMqttDeliveryToken?) {Log.d(TAG, "Delivery complete")}})connectToMqttBroker()}// 连接到 MQTT 代理private fun connectToMqttBroker() {try {if (isNetworkConnected()) {mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "连接到MQTT代理")//订阅主题subscribeToTopic(PUBLISH_TOPIC)}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "连接MQTT代理失败: ${exception?.message}")//因为设置了 isAutomaticReconnect 为 true 所以不需要手动重连操作,mqtt会自动重连}})} else {Log.d(TAG, "无网络连接")retryConnectToMqttBroker()}} catch (e: MqttException) {e.printStackTrace()}}// 重连机制private fun retryConnectToMqttBroker() {handler.postDelayed({Log.d(TAG, "重试连接到MQTT代理")connectToMqttBroker()}, retryInterval)}// 发布消息fun publishMessage(topic: String, message: String) {try {if (mqttAndroidClient.isConnected) {val mqttMessage = MqttMessage()mqttMessage.payload = message.toByteArray()mqttAndroidClient.publish(topic, mqttMessage)Log.d(TAG, "消息发布到主题: $topic")} else {Log.e(TAG, "客户端未连接,无法发布消息.")}} catch (e: MqttException) {e.printStackTrace()}}// 订阅主题fun subscribeToTopic(topic: String) {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.subscribe(topic, 1, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "订阅主题: $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "订阅主题失败: $topic")}})} else {Log.e(TAG, "客户端未连接,无法订阅主题.")}} catch (e: MqttException) {e.printStackTrace()}}// 取消订阅主题fun unsubscribeFromTopic(topic: String) {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.unsubscribe(topic, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "取消订阅主题: $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "未能取消订阅主题: $topic")}})} else {Log.e(TAG, "客户端未连接,无法取消订阅主题.")}} catch (e: MqttException) {e.printStackTrace()}}// 判断网络是否连接private fun isNetworkConnected(): Boolean {val connectivityManager = getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManagerval activeNetwork: NetworkInfo? = connectivityManager.activeNetworkInforeturn activeNetwork?.isConnected == true}override fun onBind(intent: Intent?): IBinder? {return null}override fun onDestroy() {super.onDestroy()disconnectClient()}// 断开 MQTT 客户端连接private fun disconnectClient() {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.disconnect()}} catch (e: MqttException) {e.printStackTrace()}}
}
4.开启服务
在AndroidManifest.xml中的application标签内注册服务
<service android:name=".MqttService" /><service android:name="org.eclipse.paho.android.service.MqttService" />
5.功能点介绍
MQTT 客户端初始化:
- 通过 MqttAndroidClient 和 MqttConnectOptions 初始化 MQTT 客户端。
- 配置 MQTT 客户端的连接选项,包括用户名、密码、自动重连、清除会话、超时设置和心跳包发送间隔等。
- 配置遗嘱消息,当客户端非正常断开时,MQTT 代理将发布该消息(setWill 方法)。
MQTT 客户端回调:
- 实现 MqttCallbackExtended 接口,用于处理连接完成、连接丢失、消息到达以及消息传递完成时的回调。.
连接到 MQTT 代理:
- 尝试连接到 MQTT 代理,如果网络连接正常则进行连接,并订阅指定的主题(PUBLISH_TOPIC)。
- 如果连接失败且设置了自动重连选项,客户端会自动尝试重连。
- 如果没有网络连接,则通过重试机制定时尝试重新连接。
消息发布:
- 实现 publishMessage 方法,用于向指定主题发布消息。
订阅和取消订阅主题:
- 实现 subscribeToTopic 方法,用于订阅指定的主题。
- 实现 unsubscribeFromTopic 方法,用于取消订阅指定的主题。
6.其他介绍
MQTT中 QoS 级别的意义:
QoS 0:最多一次传递(At most once)
- 也称为“至多一次”传递
- 发布消息后,不会收到任何确认或回执。
- MQTT 代理不会跟踪消息传递,也不会重新传递丢失的消息。
- 此级别适用于那些可以容忍偶尔丢失消息的应用场景,例如实时数据流,传感器数据等。
QoS 1:至少一次传递(At least once)
- 确保消息至少被传递一次。
- 在发布消息后,发布者会收到一个 PUBACK(发布确认)消息作为回复。
- 如果 PUBACK 丢失或未收到确认,则 MQTT 客户端会尝试重新发送消息,直到收到 PUBACK。
- 此级别适用于对消息到达的顺序不是特别关心,但确保消息至少被传递一次的应用场景。
QoS 2:只有一次传递(Exactly once)
- 提供最高的传递保证,确保每条消息只传递一次。
- 在发布消息后,发布者会收到两个确认消息:PUBREC(发布接收)和 PUBREL(发布释放)。
- MQTT 客户端会等待收到 PUBREL 消息后再发送 PUBCOMP(发布完成)消息作为最终确认。
- 此级别适用于对消息传递的顺序和确保每条消息只传递一次的高度敏感的应用场景,如金融交易、命令和控制等。
MQTT断开连接((32109) - java.io.EOFException)
可以看我的这篇文章:MQTT断开连接((32109) - java.io.EOFException)
7.最后
MQTT在物联网开发中必不可少,掌握相关知识非常重要,此篇文章用来温故一下MQTT使用流程,知识点不多,代码已经封装的差不多了,方便本人及各位拿来即用 更多功能自行拓展。
相关文章:
在Android中使用 MQTT 服务实现消息通信
1.摘要 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的、基于发布/订阅(Publish/Subscribe)模式的通信协议,最初由 IBM 在1999年开发。它设计用于在低带宽、不稳定的网络环境下…...
qsort函数
学习c语言的过程中少不了的就是排序,例如冒泡排序(不清楚的同学可以翻找一下之前的文章), 我们这里将冒泡排序作为一个自定义函数来呈现一下 #include<stdio.h>void bubble_sort(int arr[], int len) {for (int i 0; i &…...
你可以直接和数据库对话了!DB-GPT 用LLM定义数据库下一代交互方式,数据库领域的GPT、开启数据3.0 时代
✨点击这里✨:🚀原文链接:(更好排版、视频播放、社群交流、最新AI开源项目、AI工具分享都在这个公众号!) 你可以直接和数据库对话了!DB-GPT 用LLM定义数据库下一代交互方式,数据库领…...
数据结构笔记2 栈和队列
为什么在循环队列中,判断队满的条件是(Q.rear1)模maxqsize? 取模运算(%)在循环队列中起到关键作用,主要是因为它能确保索引值在数组的有效范围内循环。具体来说,取模运算有以下几个重要作用&am…...
Python | 刷题笔记
继承 class Father:__secret"you are your own kid"stroy"iam a handsome boy..."def tellstory(self):print("我的故事:",self.stroy)def __tellstory(self):print("我的秘密:",Father.__secret) class Son(Father):def tell(self…...
软件三班20240605
文章目录 1.创建工程和模块2.添加 web支持3.创建前端代码4.添加servlet 依赖5. 代码6.案例2 1.创建工程和模块 2.添加 web支持 方法1 方法2 3.创建前端代码 4.添加servlet 依赖 5. 代码 <!DOCTYPE html> <html lang"en"> <head><meta c…...
http和https数据传输与协议区分
目录 1. 数据传输安全性2. 端口号3. URL 前缀4. SSL/TLS 证书5. 性能6. SEO 和用户信任7. 应用场景总结 HTTP(HyperText Transfer Protocol)和 HTTPS(HyperText Transfer Protocol Secure)是用于在客户端(如浏览器&…...
天才程序员周弈帆 | Stable Diffusion 解读(一):回顾早期工作
本文来源公众号“天才程序员周弈帆”,仅用于学术分享,侵权删,干货满满。 原文链接:Stable Diffusion 解读(一):回顾早期工作 在2022年的这波AI绘画浪潮中,Stable Diffusion无疑是最…...
软件架构初探
MVC架构软件层次结构是面向实体的,他最底层是实体类,实体类中封装了对象的抽象数据类型(数据结构和对数据结构的基本操作)。然后向上一层数据处理层提供接口,数据处理层利用模型层提供的对象和基本操作进一步进行算法的…...
Python01 -分解整包数据到各个变量操作和生成器
Python 的星号表达式可以用来解决这个问题。比如,你在学习一门课程,在学期末的时候,你想统计下家庭作业的平均成绩,但是排除掉第一个和最后一个分数。如果只有四个分数,你可能就直接去简单的手动赋值,但如果…...
flutter image_picker 执行拍照的图片怎么保存到本地
在 Flutter 中,使用 image_picker 插件拍照的图片默认会被保存到设备的临时目录中。这个临时目录的具体位置取决于设备的操作系统。在 iOS 上,它通常是应用的沙盒目录;在 Android 上,它通常是应用的缓存目录。 这些图片不会被自动…...
基于Python的北京天气数据可视化分析
项目用到库 import numpy as np import pandas as pd import datetime from pyecharts.charts import Line from pyecharts.charts import Boxplot from pyecharts.charts import Pie,Grid from pyecharts import options as opts from pyecharts.charts import Calendar 1.2…...
Linux编译器-gcc或g++的使用
一.安装gcc/g 在linux中是不会自带gcc/g的,我们需要编译程序就自己需要安装gcc/g。 很简单我们使用简单的命令安装gcc:sudo yum install -y gcc。 g安装:sudo yum install -y gcc-c。 我们知道Windows上区分文件,都是使用文件…...
一条sql的执行流程
文章地址 https://blog.csdn.net/qq_43618881/article/details/118657040 连接器 请求先走到连接器,与客户端建立连接、获取权限、维持和管理连接 mysql缓存池 如果要查找的数据直接在mysql缓存池里面就直接返回数据 分析器 请求已经建立了连接,现在…...
Android音乐播放器的思路处理
** 1.android音乐播放播放列表中下一首上一首随机播放的思路 ** 实现 Android 音乐播放器的播放列表中的下一首、上一首和随机播放功能涉及到对音乐列表的管理以及对播放顺序的控制。以下是实现这些功能的思路: 下一首和上一首功能: 维护一个音乐列表…...
算法课程笔记——可撤销并查集
算法课程笔记——可撤销并查集 Gv...
【排序算法】快速排序
一、定义: 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法(也叫Hoare排序),是一种基于分治的排序方。其基本原理是将待排序的数组通过一趟排序分成两个独立的部分,其中一部分的所有数据比另一部分的所有数…...
OS复习笔记ch7-2
页式管理 学过计组的同学都了解一点页式管理,就是将内存划分成较小的、大小固定的、等大的块。现在OS引入了进程的概念,那么为了匹配内存的分块,同样把进程也划分成同样大小的块。 这里区分两个概念 The chunks of a process are called p…...
4.通用编程概念
目录 一、变量与常量1.1 变量1.2 常量 二、遮蔽三、数据类型3.1 标量类型1. 整型2. 浮点型3. 布尔类型4.字符类型 3.2 复合类型1. 元组2. 数组 四、函数五、语句和表达式六、函数的返回值 一、变量与常量 1.1 变量 在Rust中默认的变量是不可变的,如果修改其值会导致…...
iBeacon赋能AR导航:室内定位技术的原理与优势
室内定位导航对于大型商场、机场、医院等复杂室内环境至关重要,它帮助人们快速找到目的地,提高空间利用率。AR技术通过将虚拟信息叠加在现实世界,提供直观导航指引,正在成为室内导航的新趋势,增强用户互动体验…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
渗透实战PortSwigger靶场:lab13存储型DOM XSS详解
进来是需要留言的,先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码,输入的<>当成字符串处理回显到页面中,看来只是把用户输…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙
Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...
LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》
🧠 LangChain 中 TextSplitter 的使用详解:从基础到进阶(附代码) 一、前言 在处理大规模文本数据时,特别是在构建知识库或进行大模型训练与推理时,文本切分(Text Splitting) 是一个…...
一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...
