当前位置: 首页 > news >正文

unity 中使用zeroMq和Mqtt 进行通讯

最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下

图片

1、整体设计

设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topicbytes 进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。

public class MsgQueue 
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}

2、zeroMq功能实现

这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client,一个server

2.1 zeroMq 介绍

第一次使用zeroMq ,稍微介绍下;ZeroMQ 是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ 的一些关键特性和概念:ZeroMQ 支持多种消息模式,包括:

  • 请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。

  • 发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。

  • 推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。

  • 管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个TCP通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/

2.2 插件介绍

zeromq的在C#上主要是通过netMq库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装

Install-Package NetMQ

不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dllplugins拷贝的时候注意依赖项,总共有三个,要不然会报错

图片

2.3 代码实现

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}

这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。

3、Mqtt功能实现

3.1 emqx介绍

mqtt也是一个消息队列,主要是用在IOT,简单来说就是一个TCP服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx ,还行,挺好用。工具客户端使用的是mqttx,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh

3.2 unity插件

这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html

官方示例:https://github.com/emqx/MQTT-Client-Examples

我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity,所以打开了mono的库

图片

编译之后

图片

将生成的dll拷贝到plugins

3.3 代码实现

using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort,  false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +"  ==== >" + Encoding.Default.GetString(e.Message));}
}

这里需要注意的是m2mqtt 会自动开启线程,不需要单独实现线程。其他的都是常规操作。

4、验证

先看下zeromq的收发

图片

验证下Mqtt的收发

图片

5、Java代码zeromq的使用

pom中加入

   <dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>

zeromq 发布者

package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}

zeromq 订阅者

package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +"  " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}

6、总结

第一次搞zeromq的消息队列,和平常用的kafkarocketmq 差的很多,甚至完全不在同一个讨论方向。

还有一些需要研究

  • unitynuget的学习

  • c# 不同平台的学习 

  • 启动后台线程之后无法关闭,导致unity死掉。

byte[] bytes = BitConverter.GetBytes(66666);
  • C#侧生成的bytes 是小端序列

相关文章:

unity 中使用zeroMq和Mqtt 进行通讯

最近我在做一个车上的HMI项目&#xff0c;也就是车机应用&#xff0c;需要与云端和域控进行通信。HMI的功能已经外包了&#xff0c;但消息的统一层留给我自己来做。因为项目组其他人都没有经验&#xff0c;所以这个任务就落到了我头上&#xff0c;尽管我自己也没有太多经验&…...

四、k8s快速入门之Kubernetes资源清单

kubernetes中的资源 ⭐️ k8s中所有的内容都抽象为资源&#xff0c;资源实列化之后&#xff0c;叫做对象 1️⃣名称空间级别 ⭐️ kubeadm在执行k8s的pod的时候会在kube-system这个名称空间下执行&#xff0c;所以说当你kubectl get pod 的时候是查看不到的查看的是默认的po…...

掌握ElasticSearch(六):分析过程

文章目录 一、什么是分析1. 字符过滤 (Character Filtering)2. 分词 (Breaking into Tokens)3. 词条过滤 (Token Filtering)4. 词条索引 (Token Indexing) 二、内置分析器分类1. 标准分析器 (Standard Analyzer)2. 简单分析器 (Simple Analyzer)3. 语言分析器 (Language Analyz…...

【设计模式】使用python 实践框架设计

单一职责原则&#xff08;SRP&#xff09;&#xff1a;一个类应该只有一个职责&#xff0c;意味着该类只应该有一个引起变化的原因。这使得代码更易于维护和理解。 开放封闭原则&#xff08;OCP&#xff09;&#xff1a;软件实体&#xff08;类、模块、函数等&#xff09;应该…...

Apache paimon-CDC

CDC集成 paimon支持五种方式通过模式转化数据提取到paimon表中。添加的列会实时同步到Paimon表中 MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。MySQL同步数据库:将MySQL的整个数据库同步到一个Paimon数据库中。API同步表:将您的自定义DataStream输入同步到一…...

如何分析算法的执行效率和资源消耗

分析算法的执行效率和资源消耗可以从以下几个方面入手: 一、时间复杂度分析 定义和概念 时间复杂度是衡量算法执行时间随输入规模增长的速度的指标。它通常用大 O 符号表示,表示算法执行时间与输入规模之间的关系。例如,一个算法的时间复杂度为 O(n),表示该算法的执行时间…...

提示工程(Prompt Engineering)指南(进阶篇)

在 Prompt Engineering 的进阶阶段&#xff0c;我们着重关注提示的结构化、复杂任务的分解、反馈循环以及模型的高级特性利用。随着生成式 AI 技术的快速发展&#xff0c;Prompt Engineering 已经从基础的单一指令优化转向了更具系统性的设计思维&#xff0c;并应用于多轮对话、…...

音视频入门基础:FLV专题(19)——FFmpeg源码中,解码Audio Tag的AudioTagHeader,并提取AUDIODATA的实现

一、引言 从《音视频入门基础&#xff1a;FLV专题&#xff08;18&#xff09;——Audio Tag简介》可以知道&#xff0c;未加密的情况下&#xff0c;FLV文件中的一个Audio Tag Tag header AudioTagHeader AUDIODATA。本文讲述FFmpeg源码中是怎样解码Audio Tag的AudioTagHead…...

前端零基础入门到上班:【Day3】从零开始构建网页骨架HTML

HTML 基础入门&#xff1a;从零开始构建网页骨架 目录 1. 什么是 HTML&#xff1f;HTML 的核心作用 2. HTML 基本结构2.1 DOCTYPE 声明2.2 <html> 标签2.3 <head> 标签2.4 <body> 标签 3. HTML 常用标签详解3.1 标题标签3.2 段落和文本标签3.3 链接标签3.4 图…...

字符脱敏工具类

1、字符脱敏工具类 import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils;/*** 数据脱敏工具类** date 2024/10/30 13:44*/Slf4j public class DataDesensitizationUtils {public static final String STAR_1 "*";public static final …...

【jvm】jvm对象都分配在堆上吗

目录 1. 说明2. 堆上分配3. 栈上分配&#xff08;逃逸分析和标量替换&#xff09;4. 方法区分配5. 直接内存&#xff08;非堆内存&#xff09; 1. 说明 1.JVM的对象并不总是分配在堆上。2.堆是JVM用于存储对象实例的主要内存区域&#xff0c;存在一些特殊情况&#xff0c;对象…...

@AutoWired和 @Resource原理深度分析!

嗨&#xff0c;你好呀&#xff0c;我是猿java Autowired和Resource是 Java程序员经常用来实现依赖注入的两个注解&#xff0c;这篇文章&#xff0c;我们将详细分析这两个注解的工作原理、使用示例和它们之间的对比。 依赖注入概述 依赖注入是一种常见的设计模式&#xff0c;…...

C++设计模式创建型模式———原型模式

文章目录 一、引言二、原型模式三、总结 一、引言 与工厂模式相同&#xff0c;原型模式&#xff08;Prototype&#xff09;也是创建型模式。原型模式通过一个对象&#xff08;原型对象&#xff09;克隆出多个一模一样的对象。实际上&#xff0c;该模式与其说是一种设计模式&am…...

重学SpringBoot3-Spring WebFlux之SSE服务器发送事件

更多SpringBoot3内容请关注我的专栏&#xff1a;《SpringBoot3》 期待您的点赞&#x1f44d;收藏⭐评论✍ Spring WebFlux之SSE服务器发送事件 1. 什么是 SSE&#xff1f;2. Spring Boot 3 响应式编程与 SSE为什么选择响应式编程实现 SSE&#xff1f; 3. 实现 SSE 的基本步骤3.…...

YOLO即插即用模块---AgentAttention

Agent Attention: On the Integration of Softmax and Linear Attention 论文地址&#xff1a;https://arxiv.org/pdf/2312.08874 问题&#xff1a; 普遍使用的 Softmax 注意力机制在视觉 Transformer 模型中计算复杂度过高&#xff0c;限制了其在各种场景中的应用。 方法&a…...

探索开源语音识别的未来:高效利用先进的自动语音识别技术20241030

&#x1f680; 探索开源语音识别的未来&#xff1a;高效利用自动语音识别技术 &#x1f31f; 引言 在数字化时代&#xff0c;语音识别技术正在引领人机交互的新潮流&#xff0c;为各行业带来了颠覆性的改变。开源的自动语音识别&#xff08;ASR&#xff09;系统&#xff0c;如…...

学习路之TP6--workman安装

一、安装 首先通过 composer 安装 composer require topthink/think-worker 报错&#xff1a; 分析&#xff1a;最新版本需要TP8&#xff0c;或装低版本的 composer require topthink/think-worker:^3.*安装后&#xff0c; 增加目录 vendor\workerman vendor\topthink\think-w…...

.NET内网实战:通过白名单文件反序列化漏洞绕过UAC

01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏&#xff0c;主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧&#xff0c;对内网和后渗透感兴趣的朋友们可以订阅该电子报刊&#xff0c;解锁更多的报刊内容。 02基本介绍 03原理分析 在渗透测试和红…...

AI Agents - 自动化项目:计划、评估和分配

Agents: Role 角色Goal 目标Backstory 背景故事 Tasks&#xff1a; Description 描述Expected Output 期望输出Agent 代理 Automated Project: Planning, Estimation, and Allocation Initial Imports 1.本地文件helper.py # Add your utilities or helper functions to…...

Git的.gitignore文件

一、各语言对应的.gitignore模板文件 项目地址&#xff1a;https://github.com/github/gitignore 二、.gitignore文件不生效 .gitignore文件只是ignore没有被追踪的文件&#xff0c;已被追踪的文件&#xff0c;要先删除缓存文件。 # 单个文件 git rm --cached file/path/to…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中&#xff0c;接口是一种抽象类型&#xff0c;它定义了一组方法的集合&#xff1a; // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的&#xff1a; // 矩形结构体…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...