当前位置: 首页 > news >正文

C++的MQTT开发:使用Paho的C++接口实现消息发布、订阅、连接RabbitMQ

C++ Paho实现MQTT消息发布功能

要使用paho的cpp接口实现发布MQTT消息的功能,需要进行以下步骤:

  1. 安装paho库:首先从paho官方网站下载并安装paho的C++库。可以从https://www.eclipse.org/paho/clients/cpp/ 下载适合操作系统的版本。

  2. 创建MQTT客户端:可以使用mqtt::client类来创建一个客户端,如下所示:

mqtt::client client("tcp://broker.example.com:1883", "clientId");

在上面的代码中,broker.example.com是您的MQTT代理服务器的地址,1883是MQTT代理服务器的默认端口。clientId是客户端的唯一标识符,可以自己选择一个适合的名字。

  1. 设置连接选项:创建客户端后可以设置一些连接选项,例如设置用户名和密码,设置遗嘱消息等。以下是示例代码:
mqtt::connect_options connOpts;
connOpts.set_user_name("username");
connOpts.set_password("password");
connOpts.set_will(mqtt::message("topic", "offline", 1, true));

在上面的代码中,usernamepassword是您的MQTT代理服务器的登录凭据。topic是遗嘱消息的主题,offline是遗嘱消息的内容,1是遗嘱消息的QoS级别(Quality of Service),true表示遗嘱消息是保留的。

  1. 连接到MQTT代理服务器:使用mqtt::client对象的connect方法连接到MQTT代理服务器,如下所示:
client.connect(connOpts);
  1. 发布消息:使用mqtt::client对象的publish方法发布消息。以下是一个示例代码:
std::string payload = "Hello, MQTT!";
client.publish("topic", payload.c_str(), payload.length());

在上面的代码中,topic是消息的主题,payload是消息的内容。您可以根据需要修改这些值。

  1. 断开连接:在完成消息发布后,您可以使用mqtt::client对象的disconnect方法断开与MQTT代理服务器的连接,如下所示:
client.disconnect();

这是使用paho的cpp接口发布MQTT消息的基本步骤,实际应用中可能需要处理更多的错误和异常情况。参考paho的官方文档和示例代码来进一步了解和掌握paho的cpp接口的使用。

完整的C++ Paho消息发布的代码演示

#include <iostream>
#include <cstring>
#include "mqtt/async_client.h"const std::string SERVER_ADDRESS("tcp://broker.example.com:1883");
const std::string CLIENT_ID("clientId");
const std::string TOPIC("topic");class mqtt_callback : public virtual mqtt::callback
{void connection_lost(const std::string& cause) override{std::cout << "\nConnection lost! Cause: " << cause << std::endl;}void delivery_complete(mqtt::delivery_token_ptr token) override{std::cout << "Message delivery complete!" << std::endl;}
};int main(int argc, char* argv[])
{try{mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);mqtt::connect_options conn_opts;mqtt_callback cb;client.set_callback(cb);client.connect(conn_opts);std::string payload = "Hello, MQTT!";mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, payload);client.publish(pubmsg);client.disconnect();}catch (const mqtt::exception& exc){std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

在这段代码中,我们包含了必要的头文件,并定义了MQTT服务器地址、客户端ID和主题。我们还定义了一个自定义的回调类mqtt_callback,它继承自mqtt::callback,用于处理连接丢失和消息传递完成事件。

main()函数内部创建了一个带有服务器地址和客户端ID的mqtt::async_client对象。我们还创建了一个mqtt::connect_options对象来指定连接选项。使用client.set_callback(cb)为客户端设置回调函数,其中cbmqtt_callback类的一个实例。

我们使用client.connect(conn_opts)建立与MQTT服务器的连接。如果连接成功,我们使用mqtt::make_message()创建一个带有所需负载和主题的MQTT消息。

最后,使用client.publish(pubmsg)发布消息,并使用client.disconnect()断开与MQTT服务器的连接。

客户端:C++ Paho实现 MQTT的客户端

同步客户端:mqtt::client

在paho库中,mqtt::clientmqtt::async_client是MQTT客户端的两种不同实现方式。

mqtt::client是同步的客户端实现,它在发送和接收消息时会阻塞当前线程,直到操作完成或发生超时。这意味着您需要手动管理线程和处理并发操作。使用mqtt::client可以更容易地编写简单的同步代码,特别适用于简单的MQTT应用程序或在没有并发需求的情况下。

以下是使用mqtt::client的示例代码:

mqtt::client client("tcp://broker.example.com:1883", "clientId");// Connect to the MQTT broker
client.connect();// Publish a message
client.publish("topic", "Hello, MQTT!");// Subscribe to a topic
client.subscribe("topic");// Wait for incoming messages
mqtt::message_ptr msg = client.consume_message();// Disconnect from the MQTT broker
client.disconnect();

异步客户端:mqtt::async_client

另一方面,mqtt::async_client是异步的客户端实现,它使用了异步操作和回调函数来处理发送和接收消息,不会阻塞当前线程。这意味着您可以在一个线程中同时处理多个操作,包括异步地发送和接收消息。使用mqtt::async_client可以更好地处理并发操作和复杂的MQTT应用程序。

以下是使用mqtt::async_client的示例代码:

mqtt::async_client client("tcp://broker.example.com:1883", "clientId");// Connect to the MQTT broker
mqtt::token_ptr conntok = client.connect();
conntok->wait();// Publish a message
mqtt::token_ptr pubtok = client.publish("topic", "Hello, MQTT!");
pubtok->wait();// Subscribe to a topic
mqtt::token_ptr subtok = client.subscribe("topic");
subtok->wait();// Wait for incoming messages
client.start_consuming();// Disconnect from the MQTT broker
mqtt::token_ptr disctok = client.disconnect();
disctok->wait();

虽然mqtt::async_client提供了更强大的功能和更好的并发性能,但它需要更多的代码和处理异步回调函数。如果应用程序需要处理大量的并发操作或实现复杂的MQTT逻辑,推荐使用mqtt::async_client。但如果只需要简单的同步操作或在单线程环境中运行较简单的MQTT应用程序,那么使用mqtt::client可能更合适。应该根据您的具体需求和应用程序的复杂性选择适合的客户端实现方式。

使用C++ Paho连接rabbitmq

下面是一个简单的示例代码,演示了如何使用Paho C++库连接到RabbitMQ并发布和订阅消息:

#include <iostream>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <mqtt/async_client.h>const std::string SERVER_ADDRESS("tcp://localhost:1883");
const std::string CLIENT_ID("paho_cpp_async");
const std::string TOPIC("test_topic");class callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener
{
private:mqtt::async_client& client_;public:callback(mqtt::async_client& client) : client_(client) {}void connection_lost(const std::string& cause) override {std::cout << "\nConnection lost" << std::endl;if (!cause.empty())std::cout << "\tcause: " << cause << std::endl;}void delivery_complete(mqtt::delivery_token_ptr token) override {}void message_arrived(const mqtt::const_message_ptr msg) override {std::cout << "Message arrived: " << msg->get_payload_str() << std::endl;}void on_failure(const mqtt::token& tok) override {std::cout << "Connection attempt failed" << std::endl;}void on_success(const mqtt::token& tok) override {std::cout << "Connection attempt successful" << std::endl;// 发布消息mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, "Hello from Paho C++");pubmsg->set_qos(1);client_.publish(pubmsg);}
};int main(int argc, char* argv[])
{mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);callback cb(client);client.set_callback(cb);mqtt::connect_options connOpts;connOpts.set_clean_session(true);try {std::cout << "Connecting to the MQTT server..." << std::endl;mqtt::token_ptr conntok = client.connect(connOpts);conntok->wait();std::cout << "Connected!" << std::endl;// 订阅消息client.subscribe(TOPIC, 1);while (true) {// 持续监听消息usleep(1000000);}} catch (const mqtt::exception& exc) {std::cerr << "Error: " << exc.what() << std::endl;return 1;}return 0;
}

这个示例代码创建了一个异步的MQTT客户端,并连接到RabbitMQ服务器。它使用一个回调类来处理与连接、消息到达等事件相关的回调。在on_success回调中,它发布一条消息。在message_arrived回调中,它处理接收到的消息。

  1. 编译和运行代码:使用适当的编译器和选项,将代码编译为可执行文件。确保已经安装了Paho C++库,并在编译时链接到该库。然后运行可执行文件。

这样就可以使用Paho C++库连接到RabbitMQ并进行消息发布和订阅操作了。

本示例代码仅提供了一个基本的框架,需要根据自己的需求进行更多的自定义和错误处理。此外,确保已正确配置RabbitMQ服务器的连接参数,如服务器地址、端口和认证信息等。

补充说明

连续发送消息的注意事项

在连续发送多条消息时避免频繁断开和重连MQTT客户端。频繁的断开和重连会增加网络开销和延迟,并且可能会对MQTT服务器造成不必要的负担。保持MQTT客户端的持久连接,并在需要时使用同一连接发送多条消息。

以下示例展示如何保持MQTT客户端的持久连接,并连续发送多条消息:

mqtt::async_client client("tcp://broker.example.com:1883", "clientId");// Connect to the MQTT broker
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(60);  // 设置心跳间隔为60秒
connOpts.set_clean_session(false);     // 设置为非清理会话模式
mqtt::token_ptr conntok = client.connect(connOpts);
conntok->wait();// Publish multiple messages
for (int i = 0; i < 10; ++i) {std::string topic = "topic";std::string payload = "Message " + std::to_string(i);mqtt::message_ptr pubmsg = mqtt::make_message(topic, payload);mqtt::token_ptr pubtok = client.publish(pubmsg);pubtok->wait();
}// Disconnect from the MQTT broker
mqtt::token_ptr disctok = client.disconnect();
disctok->wait();

在上述示例中,使用set_keep_alive_interval()方法设置了心跳间隔为60秒,这样可以确保MQTT客户端与服务器保持持久连接。我们还使用set_clean_session(false)将客户端设置为非清理会话模式,这样即使客户端断开连接,会话状态也会保留。

然后,我们使用一个循环来连续发布多条消息。每次循环中,我们创建一个新的消息并使用client.publish()发送。通过保持持久连接,我们可以在同一连接上连续发送多条消息,而不需要频繁断开和重连。

最后,使用client.disconnect()来断开与MQTT服务器的连接。

具体的MQTT应用程序可能会有不同的需求和限制。如果应用程序需要在发送消息之间有较长的时间间隔,或者需要处理长时间的非活动状态,那么可以考虑在一段时间后断开连接,并在需要时重新连接。为了连续发送多条消息,建议保持MQTT客户端的持久连接,避免频繁断开和重连。这样可以减少网络开销并提高性能。

如果无需同步,是否需要wait

如果不需要等待异步操作完成,那么在使用mqtt::token对象的时候可以选择不调用wait方法。wait方法会阻塞当前线程,直到操作完成或发生超时。如果您不调用wait方法,程序将继续执行下一行代码,而不会等待操作完成。

以下是一个不调用wait方法的情况:

mqtt::async_client client("tcp://broker.example.com:1883", "clientId");// Connect to the MQTT broker
mqtt::token_ptr conntok = client.connect();
// 不调用wait方法,程序继续执行下一行代码// Publish a message
mqtt::token_ptr pubtok = client.publish("topic", "Hello, MQTT!");
// 不调用wait方法,程序继续执行下一行代码// Subscribe to a topic
mqtt::token_ptr subtok = client.subscribe("topic");
// 不调用wait方法,程序继续执行下一行代码// Wait for incoming messages
client.start_consuming();// Disconnect from the MQTT broker
mqtt::token_ptr disctok = client.disconnect();
// 不调用wait方法,程序继续执行下一行代码

上述示例创建了一个mqtt::async_client对象,并进行了连接、发布消息、订阅主题和断开连接的操作。但是我们没有调用wait方法等待操作完成。这意味着程序将继续执行下一行代码,而不会等待操作完成。

请注意,如果不调用wait方法,将无法确定操作是否成功完成。如果需要获取操作的结果或处理操作的回调函数,可能需要调用wait_for_completion方法或使用其他适当的方式来处理异步操作的结果。

相关文章:

C++的MQTT开发:使用Paho的C++接口实现消息发布、订阅、连接RabbitMQ

C Paho实现MQTT消息发布功能 要使用paho的cpp接口实现发布MQTT消息的功能&#xff0c;需要进行以下步骤&#xff1a; 安装paho库&#xff1a;首先从paho官方网站下载并安装paho的C库。可以从https://www.eclipse.org/paho/clients/cpp/ 下载适合操作系统的版本。 创建MQTT客户…...

深度网络学习笔记(二)——Transformer架构详解(包括多头自注意力机制)

Transformer架构详解 前言Transformer的整体架构多头注意力机制&#xff08;Multi-Head Attention&#xff09;具体步骤1. 步骤12. 步骤23. 步骤34. 步骤4 Self-Attention应用与比较Self-Attention用于图像处理Self-Attention vs. CNNSelf-Attention vs. RNN Transformer架构详…...

Python 快速查找并替换Excel中的数据

Excel中的查找替换是一个非常实用的功能&#xff0c;能够帮助用户快速完成大量数据的整理和处理工作&#xff0c;避免手动逐一修改数据的麻烦&#xff0c;提高工作效率。要使用Python实现这一功能&#xff0c; 我们可以借助Spire.XLS for Python 库&#xff0c;具体操作如下&am…...

KafkaStream Local Store和Global Store区别和用法

前言 使用kafkaStream进行流式计算时&#xff0c;如果需要对数据进行状态处理&#xff0c;那么常用的会遇到kafkaStream的store&#xff0c;而store也有Local Store以及Global Store&#xff0c;当然也可以使用其他方案的来进行状态保存&#xff0c;文本主要理清楚kafkaStream…...

PowerDesigner导入Excel模板生成数据表

PowerDesigner导入Excel模板生成数据表 1.准备好需要导入的Excel表结构数据,模板内容如下图所示 2.打开PowerDesigner,新建一个physical data model文件,填入文件名称,选择数据库类型 3.点击Tools|Execute Commands|Edit/Run Script菜单或按下快捷键Ctrl Shift X打开脚本窗口…...

STM32 HAL库开发——入门篇(3):OLED、LCD

源自正点原子视频教程&#xff1a; 【正点原子】手把手教你学STM32 HAL库开发全集【真人出镜】STM32入门教学视频教程 单片机 嵌入式_哔哩哔哩_bilibili 一、OLED 二、内存保护&#xff08;MPU&#xff09;实验 2.1 内存保护单元 三、LCD 3.1 显示屏分类 3.2 LCD简介 3.3 LCD…...

在Linux中查找文件命令的几种方法

要在Linux中查找文件&#xff0c;可以使用以下几种不同的实现方法&#xff1a; 1. 使用find命令&#xff1a; find <搜索路径> <搜索选项> <搜索条件><搜索路径>&#xff1a;表示要搜索的起始路径&#xff0c;可以是一个具体的目录路径&#xff0c;也…...

【TB作品】MSP430F5529 单片机,温度控制系统,DS18B20,使用MSP430实现的智能温度控制系统

作品功能 这个智能温度控制系统基于MSP430单片机设计&#xff0c;能够实时监测环境温度并根据预设的温度报警值自动调节风扇和加热片的工作状态。主要功能包括&#xff1a; 实时显示当前温度。通过OLED屏幕显示温度报警值。通过按键设置温度报警值。实际温度超过报警值时&…...

立创小tips

立创小tips 原理图中 1-修改图纸属性 保存完&#xff0c;绘制原理图的界面就出现了&#xff0c;然后我们鼠标点击原理图的边缘变成红色就可以高边表格的属性了。 2-鼠标右键可以移动整个原理图 3-查看封装 点击任意一个元器件&#xff0c;在右侧就会显示封装属性&#xff…...

Html/HTML5常用标签的学习

课程目标 项目实战&#xff0c;肯定就需要静态网页。朝着做项目方式去学习静态网页。 01、编写第一个html工程结构化 cssjsimages/imgindex.html 归档存储和结构清晰就可以。 02、HTML标签分类 认知&#xff1a;标签为什么要分类&#xff0c;原因因为&#xff1a;分门别类…...

Tomcat 配置:一文掌握所有要点

引言 Apache Tomcat 是一个流行的开源 Java Servlet 容器和 Web 服务器&#xff0c;广泛用于开发和部署 Java Web 应用程序。正确配置 Tomcat 是确保其性能、安全性和稳定性的关键。本文将详细介绍 Tomcat 的各项配置&#xff0c;帮助您优化和管理 Tomcat 服务器。 一、Tomca…...

git 大文件上传失败 Please remove the file from history and try again.

根据提示执行命令 --- 查找到当前文件 git rev-list --objects --all | grep b24e74b34e7d482e2bc687e017c8ab28cd1d24b6git filter-branch --tree-filter rm -f 文件名 --tag-name-filter cat -- --all git push origin --tags --force git push origin --all --force...

骑砍2霸主MOD开发(14)-进击的巨人

一.巨人 sbyte boneIndex Skeleton.GetBoneIndexFromName(Mission.MainAgent.AgentVisuals.GetSkeleton().GetName(), "r_hand"); cp Mission.MainAgent.AgentVisuals.AddPrefabToAgentVisualBoneByRealBoneIndex("p_sword_a", boneIndex); float agent…...

Android 可拖拽的View,限制在父布局中随意拖拽;拖拽结束后可左右吸边;

实现方法一&#xff1a;自定义View 可随意拖动拖拽的View&#xff0c;限制拖动范围是父布局中&#xff1b; import android.content.Context; import android.util.AttributeSet; import android.util.Log; import android.view.MotionEvent; import android.view.ViewGroup; …...

逐步更新动画混合参数(Blend)使其平滑地过渡到目标值

1.具体实现 逐步更新一个动画混合参数&#xff08;Blend&#xff09;&#xff0c;使其平滑地过渡到目标值&#xff0c;可以实现角色动作的平滑过渡&#xff0c;比如从走路过渡到跑步。 private float currentBleng;private float targetBlend;public float accelerSpeed 5;//…...

【多模态/CV】图像数据增强数据分析和处理

note 多模态大模型训练前&#xff0c;图片数据处理的常见操作&#xff1a;分辨率调整、网格畸变、水平翻转、分辨率调整、随机crop、换颜色、多张图片拼接、相似图片检测并去重等 一、分辨率调整 from PIL import Image def resize_image(original_image_path, save_image_p…...

代码随想录——修建二叉搜素树(Leetcode669)

题目链接 递归 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) {* …...

EasyExcel导出多个sheet封装

导出多个sheet 在需求中&#xff0c;会有需要导出多种sheet的情况&#xff0c;那么这里使用easyexcel进行整合 步骤 1、导入依赖 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><d…...

【Python错误】:AttributeError: ‘generator‘ object has no attribute ‘next‘解决办法

【Python错误】&#xff1a;AttributeError: ‘generator’ object has no attribute next’解决办法 在Python中&#xff0c;生成器是一种使用yield语句的特殊迭代器&#xff0c;它允许你在函数中产生一个值序列&#xff0c;而无需一次性创建并返回整个列表。然而&#xff0c;…...

如何配置Feign以实现服务调试

1、引入依赖 在项目中&#xff0c;需要引入Spring Cloud OpenFeign的依赖。这通常是通过在pom.xml文件中添加相应的Maven依赖来完成的。例如&#xff1a; <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starte…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

ubuntu22.04有线网络无法连接,图标也没了

今天突然无法有线网络无法连接任何设备&#xff0c;并且图标都没了 错误案例 往上一顿搜索&#xff0c;试了很多博客都不行&#xff0c;比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动&#xff0c;重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...

k8s从入门到放弃之HPA控制器

k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率&#xff08;或其他自定义指标&#xff09;来调整这些对象的规模&#xff0c;从而帮助应用程序在负…...