python实现MQTT协议(发布者,订阅者,topic)
python实现MQTT协议
一、简介
1.1 概述
本文章针对物联网MQTT协议完成python实现
1.2 环境
- Apache-apollo创建broker
- Python实现发布者和订阅者
1.3 内容
-
MQTT协议架构说明 :
-
利用仿真服务体会 MQTT协议
-
针对MQTT协议进行测试
任务1:MQTT协议应用场景
说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:
1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。
物联网应用场景:

协议角色分工:
客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。
3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解
生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

任务2:搭建Mqtt协议服务 (broker)
前提:安装JDK和JAVA_HOME环境变量
1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/
2 进入bin目录命令行 输入:
D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker
3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;
4:启动命令行: (以一个实例为单位进行创建的)
进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/
出现如下图表示启动成功

安装mqtt需要的包:
pip install paho-mqtt
发布者publish创建:
import time
from paho.mqtt import publish
#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))client.subscribe("jw-temperature") # 发布主题def on_message(client,userdata,msg):print(msg.topc + "消息发送!" + msg.payload.decode("utf-8"))if __name__ == '__main__':print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))for i in range(20):time.sleep(2)publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,auth = {'username': "admin", 'password': "password"})print("已发送"+str(i+1)+"条消息")
订阅者light_subcribe创建:
import paho.mqtt.client as mqtt
import time#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic+ msg.payload.decode("utf-8") + ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手电筒打开----我是一个订阅者:需要消费主题-----")client_loop()
订阅者phone_subcribe创建:
import paho.mqtt.client as mqtt
import time#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic + msg.payload.decode("utf-8")+ ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手机启动----我是一个订阅者:需要消费主题-----")client_loop()
演示:分别运行publish和light_subcribe和phone_subcribe
publish:
light_subcribe:
phone_subcribe:

相关文章:
python实现MQTT协议(发布者,订阅者,topic)
python实现MQTT协议 一、简介 1.1 概述 本文章针对物联网MQTT协议完成python实现 1.2 环境 Apache-apollo创建brokerPython实现发布者和订阅者 1.3 内容 MQTT协议架构说明 : 利用仿真服务体会 MQTT协议 针对MQTT协议进行测试 任务1:MQTT协议应…...
2023年09月03日-----16:58
协同过滤推荐和矩阵分解本质上有什么不同?协同过滤推荐和矩阵分解是两种推荐系统方法,它们在某些方面有相似之处,但也有一些本质不同之处。 基本原理: 协同过滤推荐:协同过滤是一种基于用户行为数据的推荐方法,它依赖于用户-物品交互数据,如用户的评分或点击历史。协同过…...
HTTP状态码504(Gateway Timeout)报错原因分析和解决办法
文章目录 504报错原因分析一、用户角度1. 代理服务器问题2. 网络问题 二、网站管理员角度1. 服务器负载过重2. 网关配置问题3. 目标服务器响应慢4. IIS/nginx/apache服务关闭5. 维护或故障6. 数据库的慢处理也会导致504 用户角度可以采取哪些措施解决504错误1. 刷新页面2. 检查…...
《凤凰架构》第三章——事务处理
前言 由于一些地方原文感觉不太清楚,有些地方用小林coding的文章代替。 总结 事务处理主要的目的就是要让数据在各种条件下,最终的运行结果都能符合你的期望。要达成这个目标有三点需要满足:原子性(业务要么同时成功࿰…...
音视频添 加水印
一、文字水印 在视频中增加文字水印需要准备的条件比较多,需要有文字字库处理的相关文件,在编译FFmpeg时需要支持FreeType、FontConfig、iconv,系统中需要有相关的字库,在FFmpeg中增加纯字母水印可以使用drawtext滤镜进行支持&am…...
使用Python的requests库与chatGPT进行通信
前言 在人工智能领域,自然语言处理模型如OpenAI GPT-3.5 Turbo具有广泛的应用。虽然官方提供了Python库来与这些模型进行交互,但也有一些人更喜欢使用requests库来自定义请求和处理响应,比如现在很多第三方LLM都提供了与chatGPT类似的http请…...
SASS常用内置函数
1,math 引入:use "sass:math"; 常用函数: ceil($number) - 向上取整floor($number) - 向下取整round($number) - 四舍五入max($number...) - 比较若干数值并取最大值min($number...) - 比较若干数值并取最小值random() - 取0~1之…...
2023年05月 C/C++(四级)真题解析#中国电子学会#全国青少年软件编程等级考试
C/C++编程(1~8级)全部真题・点这里 第1题:怪盗基德的滑翔翼 怪盗基德是一个充满传奇色彩的怪盗,专门以珠宝为目标的超级盗窃犯。而他最为突出的地方,就是他每次都能逃脱中村警部的重重围堵,而这也很大程度上是多亏了他随身携带的便于操作的滑翔翼。 有一天,怪盗基德像往…...
Emmet 使用笔记小结
Emmet 使用笔记小结 最近在跟视频走 CSS 的教程,然后要写很多的 HTML 结构,就想着总结一下 Emmet 的语法。 Emmet 是一个工具可以用来加速 HTML 和 CSS 的开发过程,不过 emmet 只支持 HTML & XML 文件结构,所以我个人觉得对…...
如何使用Puppeteer进行新闻网站数据抓取和聚合
导语 Puppeteer是一个基于Node.js的库,它提供了一个高级的API来控制Chrome或Chromium浏览器。通过Puppeteer,我们可以实现各种自动化任务,如网页截图、PDF生成、表单填写、网络监控等。本文将介绍如何使用Puppeteer进行新闻网站数据抓取和聚…...
【LeetCode每日一题合集】2023.8.7-2023.8.13(动态规划分治)
文章目录 344. 反转字符串1749. 任意子数组和的绝对值的最大值(最大子数组和)1281. 整数的各位积和之差1289. 下降路径最小和 II解法1——动态规划 O ( n 3 ) O(n^3) O(n3)解法2——转移过程优化 O ( n 2 ) O(n^2) O(n2) ⭐ 1572. 矩阵对角线元素的和解法…...
微信小程序修改vant组件样式
1 背景 在使用vant组件开发微信小程序的时候,想更改vant组件内部样式,达到自己想要的目的(van-grid组件改成宫格背景色为透明,默认为白色),官网没有示例,通过以下几步修改成功。 2 步骤 2.1 …...
yum 、rpm、yumdownloader、repotrack 学习笔记
1 Linux 包管理器概述 rpm的使用: rpm -ivh filename.rpm#这列出该packageName(包名)安装的所有文件列表。 rpm -ql packageName #查询已安装的该packageName的详细信息,包括版本、发布日期等。 rpm -qi packageName #列出该pac…...
python检测CPU占用、内存和磁盘剩余空间 脚本
python检测CPU占用、内存和磁盘剩余空间 脚本。后续将其加入到计划列表中即可 # codingutf-8 import time import psutil import osimport smtplibfrom email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText # email 用于构建邮件内容 from email…...
量化策略:CTA,市场中性,指数增强
CTA 策略 commodity Trading Advisor Strategy,即“商品交易顾问策略”,也被称作管理期货策略。 期货T0,股票T1双向交易:就单向交易而言的,不仅能先买入再卖出(做多),而且可以先卖…...
L1-051 打折(Python实现) 测试点全过
前言: {\color{Blue}前言:} 前言: 本系列题使用的是,“PTA中的团体程序设计天梯赛——练习集”的题库,难度有L1、L2、L3三个等级,分别对应团体程序设计天梯赛的三个难度。更新取决于题目的难度,…...
任意文件读取和漏洞复现
任意文件读取 1. 概述 一些网站的需求,可能会提供文件查看与下载的功能。如果对用户查看或下载的文件没有限制或者限制绕过,就可以查看或下载任意文件。这些文件可以是漂代码文件,配置文件,敏感文件等等。 任意文件读取会造成&…...
编译KArchive在windows10下
使用QT6和VS2019编译KArchive的简要步骤: 安装 Qt ,我是用源码自己编译的 "F:\qtbuild"安装CMakefile并配置环境变量安装Git下载ECM源码 https://github.com/KDE/extra-cmake-modules.git-------------------------------------------------…...
【Python】批量下载页面资源
【背景】 有一些非常不错的资源网站,比如一些MP3资源网站。资源很丰富,但是每一个资源都不大,一个一个下载费时费力,想用Python快速实现可复用的批量下载程序。 【思路】 获得包含资源链接的静态页面,用beautifulsoup分析页面,获得所有MP3资源的实际地址,然后下载。…...
Windows NUMA编程实践 – 处理器组、组亲和性、处理器亲和性及版本变化
Windows在设计之初没有考虑过对大数量的多CPU和NUMA架构的设备的支持,大部分关于CPU的设计按照64个为上限来设计。核心数越来越多的多核处理器的进入市场使得微软不得不做较大的改动来进行支持,因此Windows 的进程、线程和NUMA API在各个版本中行为不一样…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
push [特殊字符] present
push 🆚 present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中,push 和 present 是两种不同的视图控制器切换方式,它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...
从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7
在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...
