unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下

1、整体设计
设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topic和 bytes 进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。
public class MsgQueue
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}
2、zeroMq功能实现
这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client,一个server。
2.1 zeroMq 介绍
第一次使用zeroMq ,稍微介绍下;ZeroMQ 是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ 的一些关键特性和概念:ZeroMQ 支持多种消息模式,包括:
-
请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。
-
发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。
-
推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。
-
管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个
TCP通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq的在C#上主要是通过netMq库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装
Install-Package NetMQ
不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dll到plugins拷贝的时候注意依赖项,总共有三个,要不然会报错

2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}
这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列,主要是用在IOT,简单来说就是一个TCP服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx ,还行,挺好用。工具客户端使用的是mqttx,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例:https://github.com/emqx/MQTT-Client-Examples
我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity,所以打开了mono的库

编译之后

将生成的dll拷贝到plugins下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +" ==== >" + Encoding.Default.GetString(e.Message));}
}
这里需要注意的是m2mqtt 会自动开启线程,不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq的收发

验证下Mqtt的收发

5、Java代码zeromq的使用
pom中加入
<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>
zeromq 发布者
package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}
zeromq 订阅者
package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +" " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}
6、总结
第一次搞zeromq的消息队列,和平常用的kafka和rocketmq 差的很多,甚至完全不在同一个讨论方向。
还有一些需要研究
-
在
unity中nuget的学习 -
c#不同平台的学习 -
启动后台线程之后无法关闭,导致
unity死掉。
byte[] bytes = BitConverter.GetBytes(66666);
-
C#侧生成的bytes 是小端序列
相关文章:
unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验&…...
四、k8s快速入门之Kubernetes资源清单
kubernetes中的资源 ⭐️ k8s中所有的内容都抽象为资源,资源实列化之后,叫做对象 1️⃣名称空间级别 ⭐️ kubeadm在执行k8s的pod的时候会在kube-system这个名称空间下执行,所以说当你kubectl get pod 的时候是查看不到的查看的是默认的po…...
掌握ElasticSearch(六):分析过程
文章目录 一、什么是分析1. 字符过滤 (Character Filtering)2. 分词 (Breaking into Tokens)3. 词条过滤 (Token Filtering)4. 词条索引 (Token Indexing) 二、内置分析器分类1. 标准分析器 (Standard Analyzer)2. 简单分析器 (Simple Analyzer)3. 语言分析器 (Language Analyz…...
【设计模式】使用python 实践框架设计
单一职责原则(SRP):一个类应该只有一个职责,意味着该类只应该有一个引起变化的原因。这使得代码更易于维护和理解。 开放封闭原则(OCP):软件实体(类、模块、函数等)应该…...
Apache paimon-CDC
CDC集成 paimon支持五种方式通过模式转化数据提取到paimon表中。添加的列会实时同步到Paimon表中 MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。MySQL同步数据库:将MySQL的整个数据库同步到一个Paimon数据库中。API同步表:将您的自定义DataStream输入同步到一…...
如何分析算法的执行效率和资源消耗
分析算法的执行效率和资源消耗可以从以下几个方面入手: 一、时间复杂度分析 定义和概念 时间复杂度是衡量算法执行时间随输入规模增长的速度的指标。它通常用大 O 符号表示,表示算法执行时间与输入规模之间的关系。例如,一个算法的时间复杂度为 O(n),表示该算法的执行时间…...
提示工程(Prompt Engineering)指南(进阶篇)
在 Prompt Engineering 的进阶阶段,我们着重关注提示的结构化、复杂任务的分解、反馈循环以及模型的高级特性利用。随着生成式 AI 技术的快速发展,Prompt Engineering 已经从基础的单一指令优化转向了更具系统性的设计思维,并应用于多轮对话、…...
音视频入门基础:FLV专题(19)——FFmpeg源码中,解码Audio Tag的AudioTagHeader,并提取AUDIODATA的实现
一、引言 从《音视频入门基础:FLV专题(18)——Audio Tag简介》可以知道,未加密的情况下,FLV文件中的一个Audio Tag Tag header AudioTagHeader AUDIODATA。本文讲述FFmpeg源码中是怎样解码Audio Tag的AudioTagHead…...
前端零基础入门到上班:【Day3】从零开始构建网页骨架HTML
HTML 基础入门:从零开始构建网页骨架 目录 1. 什么是 HTML?HTML 的核心作用 2. HTML 基本结构2.1 DOCTYPE 声明2.2 <html> 标签2.3 <head> 标签2.4 <body> 标签 3. HTML 常用标签详解3.1 标题标签3.2 段落和文本标签3.3 链接标签3.4 图…...
字符脱敏工具类
1、字符脱敏工具类 import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils;/*** 数据脱敏工具类** date 2024/10/30 13:44*/Slf4j public class DataDesensitizationUtils {public static final String STAR_1 "*";public static final …...
【jvm】jvm对象都分配在堆上吗
目录 1. 说明2. 堆上分配3. 栈上分配(逃逸分析和标量替换)4. 方法区分配5. 直接内存(非堆内存) 1. 说明 1.JVM的对象并不总是分配在堆上。2.堆是JVM用于存储对象实例的主要内存区域,存在一些特殊情况,对象…...
@AutoWired和 @Resource原理深度分析!
嗨,你好呀,我是猿java Autowired和Resource是 Java程序员经常用来实现依赖注入的两个注解,这篇文章,我们将详细分析这两个注解的工作原理、使用示例和它们之间的对比。 依赖注入概述 依赖注入是一种常见的设计模式,…...
C++设计模式创建型模式———原型模式
文章目录 一、引言二、原型模式三、总结 一、引言 与工厂模式相同,原型模式(Prototype)也是创建型模式。原型模式通过一个对象(原型对象)克隆出多个一模一样的对象。实际上,该模式与其说是一种设计模式&am…...
重学SpringBoot3-Spring WebFlux之SSE服务器发送事件
更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ Spring WebFlux之SSE服务器发送事件 1. 什么是 SSE?2. Spring Boot 3 响应式编程与 SSE为什么选择响应式编程实现 SSE? 3. 实现 SSE 的基本步骤3.…...
YOLO即插即用模块---AgentAttention
Agent Attention: On the Integration of Softmax and Linear Attention 论文地址:https://arxiv.org/pdf/2312.08874 问题: 普遍使用的 Softmax 注意力机制在视觉 Transformer 模型中计算复杂度过高,限制了其在各种场景中的应用。 方法&a…...
探索开源语音识别的未来:高效利用先进的自动语音识别技术20241030
🚀 探索开源语音识别的未来:高效利用自动语音识别技术 🌟 引言 在数字化时代,语音识别技术正在引领人机交互的新潮流,为各行业带来了颠覆性的改变。开源的自动语音识别(ASR)系统,如…...
学习路之TP6--workman安装
一、安装 首先通过 composer 安装 composer require topthink/think-worker 报错: 分析:最新版本需要TP8,或装低版本的 composer require topthink/think-worker:^3.*安装后, 增加目录 vendor\workerman vendor\topthink\think-w…...
.NET内网实战:通过白名单文件反序列化漏洞绕过UAC
01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏,主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧,对内网和后渗透感兴趣的朋友们可以订阅该电子报刊,解锁更多的报刊内容。 02基本介绍 03原理分析 在渗透测试和红…...
AI Agents - 自动化项目:计划、评估和分配
Agents: Role 角色Goal 目标Backstory 背景故事 Tasks: Description 描述Expected Output 期望输出Agent 代理 Automated Project: Planning, Estimation, and Allocation Initial Imports 1.本地文件helper.py # Add your utilities or helper functions to…...
Git的.gitignore文件
一、各语言对应的.gitignore模板文件 项目地址:https://github.com/github/gitignore 二、.gitignore文件不生效 .gitignore文件只是ignore没有被追踪的文件,已被追踪的文件,要先删除缓存文件。 # 单个文件 git rm --cached file/path/to…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
