unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下
1、整体设计
设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topic
和 bytes
进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。
public class MsgQueue
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}
2、zeroMq功能实现
这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client
,一个server
。
2.1 zeroMq 介绍
第一次使用zeroMq
,稍微介绍下;ZeroMQ
是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ
的一些关键特性和概念:ZeroMQ
支持多种消息模式,包括:
-
请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。
-
发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。
-
推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。
-
管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个
TCP
通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq
的在C#
上主要是通过netMq
库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装
Install-Package NetMQ
不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dll
到plugins
拷贝的时候注意依赖项,总共有三个,要不然会报错
2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}
这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列,主要是用在IOT
,简单来说就是一个TCP
服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx
,还行,挺好用。工具客户端使用的是mqttx
,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例:https://github.com/emqx/MQTT-Client-Examples
我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity
,所以打开了mono
的库
编译之后
将生成的dll
拷贝到plugins
下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +" ==== >" + Encoding.Default.GetString(e.Message));}
}
这里需要注意的是m2mqtt
会自动开启线程,不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq
的收发
验证下Mqtt的收发
5、Java代码zeromq的使用
pom中加入
<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>
zeromq 发布者
package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}
zeromq 订阅者
package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +" " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}
6、总结
第一次搞zeromq
的消息队列,和平常用的kafka
和rocketmq
差的很多,甚至完全不在同一个讨论方向。
还有一些需要研究
-
在
unity
中nuget
的学习 -
c#
不同平台的学习 -
启动后台线程之后无法关闭,导致
unity
死掉。
byte[] bytes = BitConverter.GetBytes(66666);
-
C#侧生成的bytes 是小端序列
相关文章:
unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验&…...
四、k8s快速入门之Kubernetes资源清单
kubernetes中的资源 ⭐️ k8s中所有的内容都抽象为资源,资源实列化之后,叫做对象 1️⃣名称空间级别 ⭐️ kubeadm在执行k8s的pod的时候会在kube-system这个名称空间下执行,所以说当你kubectl get pod 的时候是查看不到的查看的是默认的po…...
掌握ElasticSearch(六):分析过程
文章目录 一、什么是分析1. 字符过滤 (Character Filtering)2. 分词 (Breaking into Tokens)3. 词条过滤 (Token Filtering)4. 词条索引 (Token Indexing) 二、内置分析器分类1. 标准分析器 (Standard Analyzer)2. 简单分析器 (Simple Analyzer)3. 语言分析器 (Language Analyz…...
【设计模式】使用python 实践框架设计
单一职责原则(SRP):一个类应该只有一个职责,意味着该类只应该有一个引起变化的原因。这使得代码更易于维护和理解。 开放封闭原则(OCP):软件实体(类、模块、函数等)应该…...
Apache paimon-CDC
CDC集成 paimon支持五种方式通过模式转化数据提取到paimon表中。添加的列会实时同步到Paimon表中 MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。MySQL同步数据库:将MySQL的整个数据库同步到一个Paimon数据库中。API同步表:将您的自定义DataStream输入同步到一…...
如何分析算法的执行效率和资源消耗
分析算法的执行效率和资源消耗可以从以下几个方面入手: 一、时间复杂度分析 定义和概念 时间复杂度是衡量算法执行时间随输入规模增长的速度的指标。它通常用大 O 符号表示,表示算法执行时间与输入规模之间的关系。例如,一个算法的时间复杂度为 O(n),表示该算法的执行时间…...
提示工程(Prompt Engineering)指南(进阶篇)
在 Prompt Engineering 的进阶阶段,我们着重关注提示的结构化、复杂任务的分解、反馈循环以及模型的高级特性利用。随着生成式 AI 技术的快速发展,Prompt Engineering 已经从基础的单一指令优化转向了更具系统性的设计思维,并应用于多轮对话、…...
音视频入门基础:FLV专题(19)——FFmpeg源码中,解码Audio Tag的AudioTagHeader,并提取AUDIODATA的实现
一、引言 从《音视频入门基础:FLV专题(18)——Audio Tag简介》可以知道,未加密的情况下,FLV文件中的一个Audio Tag Tag header AudioTagHeader AUDIODATA。本文讲述FFmpeg源码中是怎样解码Audio Tag的AudioTagHead…...
前端零基础入门到上班:【Day3】从零开始构建网页骨架HTML
HTML 基础入门:从零开始构建网页骨架 目录 1. 什么是 HTML?HTML 的核心作用 2. HTML 基本结构2.1 DOCTYPE 声明2.2 <html> 标签2.3 <head> 标签2.4 <body> 标签 3. HTML 常用标签详解3.1 标题标签3.2 段落和文本标签3.3 链接标签3.4 图…...
字符脱敏工具类
1、字符脱敏工具类 import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils;/*** 数据脱敏工具类** date 2024/10/30 13:44*/Slf4j public class DataDesensitizationUtils {public static final String STAR_1 "*";public static final …...
【jvm】jvm对象都分配在堆上吗
目录 1. 说明2. 堆上分配3. 栈上分配(逃逸分析和标量替换)4. 方法区分配5. 直接内存(非堆内存) 1. 说明 1.JVM的对象并不总是分配在堆上。2.堆是JVM用于存储对象实例的主要内存区域,存在一些特殊情况,对象…...
@AutoWired和 @Resource原理深度分析!
嗨,你好呀,我是猿java Autowired和Resource是 Java程序员经常用来实现依赖注入的两个注解,这篇文章,我们将详细分析这两个注解的工作原理、使用示例和它们之间的对比。 依赖注入概述 依赖注入是一种常见的设计模式,…...
C++设计模式创建型模式———原型模式
文章目录 一、引言二、原型模式三、总结 一、引言 与工厂模式相同,原型模式(Prototype)也是创建型模式。原型模式通过一个对象(原型对象)克隆出多个一模一样的对象。实际上,该模式与其说是一种设计模式&am…...
重学SpringBoot3-Spring WebFlux之SSE服务器发送事件
更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ Spring WebFlux之SSE服务器发送事件 1. 什么是 SSE?2. Spring Boot 3 响应式编程与 SSE为什么选择响应式编程实现 SSE? 3. 实现 SSE 的基本步骤3.…...
YOLO即插即用模块---AgentAttention
Agent Attention: On the Integration of Softmax and Linear Attention 论文地址:https://arxiv.org/pdf/2312.08874 问题: 普遍使用的 Softmax 注意力机制在视觉 Transformer 模型中计算复杂度过高,限制了其在各种场景中的应用。 方法&a…...
探索开源语音识别的未来:高效利用先进的自动语音识别技术20241030
🚀 探索开源语音识别的未来:高效利用自动语音识别技术 🌟 引言 在数字化时代,语音识别技术正在引领人机交互的新潮流,为各行业带来了颠覆性的改变。开源的自动语音识别(ASR)系统,如…...
学习路之TP6--workman安装
一、安装 首先通过 composer 安装 composer require topthink/think-worker 报错: 分析:最新版本需要TP8,或装低版本的 composer require topthink/think-worker:^3.*安装后, 增加目录 vendor\workerman vendor\topthink\think-w…...
.NET内网实战:通过白名单文件反序列化漏洞绕过UAC
01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏,主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧,对内网和后渗透感兴趣的朋友们可以订阅该电子报刊,解锁更多的报刊内容。 02基本介绍 03原理分析 在渗透测试和红…...
AI Agents - 自动化项目:计划、评估和分配
Agents: Role 角色Goal 目标Backstory 背景故事 Tasks: Description 描述Expected Output 期望输出Agent 代理 Automated Project: Planning, Estimation, and Allocation Initial Imports 1.本地文件helper.py # Add your utilities or helper functions to…...
Git的.gitignore文件
一、各语言对应的.gitignore模板文件 项目地址:https://github.com/github/gitignore 二、.gitignore文件不生效 .gitignore文件只是ignore没有被追踪的文件,已被追踪的文件,要先删除缓存文件。 # 单个文件 git rm --cached file/path/to…...
网站安全,WAF网站保护暴力破解
雷池的核心功能 通过过滤和监控 Web 应用与互联网之间的 HTTP 流量,功能包括: SQL 注入保护:防止恶意 SQL 代码的注入,保护网站数据安全。跨站脚本攻击 (XSS):阻止攻击者在用户浏览器中执行恶意脚本。暴力破解防护&a…...
深度学习:梯度下降算法简介
梯度下降算法简介 梯度下降算法 我们思考这样一个问题,现在需要用一条直线来回归拟合这三个点,直线的方程是 y w ^ x b y \hat{w}x b yw^xb,我们假设斜率 w ^ \hat{w} w^是已知的,现在想要找到一个最好的截距 b b b。 一条…...
SparkSQL整合Hive后,如何启动hiveserver2服务
当spark sql与hive整合后,我们就无法启动hiveserver2的服务了,每次都要先启动hive的元数据服务(nohup hive --service metastore)才能启动hive,之前的beeline命令也用不了,hiveserver2的无法启动,这也导致我…...
前端路由如何从0开始配置?vue-router 的使用
在 Web 开发中,路由是指根据 URL 的不同部分将请求分发到不同的处理函数或页面的过程。路由是单页应用(SPA, Single Page Application)和服务器端渲染(SSR, Server-Side Rendering)应用中的一个重要概念。 在开发中如何…...
Java中的运算符【与C语言的区别】
目录 1. 算术运算符 1.0 赋值运算符: 1.1 四则运算符: - * / % 【取余与C有点不同】 1.2 增量运算符: - * / % * 【右侧运算结果会自动转换类型】 1.3 自增、自减:、-- 2. 关系运算符 3. 逻辑运算符 3.1 短路求值 3.2 【…...
二、基础语法
入门了解 注释 **作用:**在代码中加一些注释和说明,方便自己或者其他程序员阅读代码 两种格式: 单行注释:// 描述信息 通常放在一行代码的上方,或者一条语句的末尾,对该行代码进行说明 多行注释&#x…...
DB-GPT系列(一):DB-GPT能帮你做什么?
DB-GPT是一个开源的AI原生数据应用开发框架(AI Native Data App Development framework with AWEL and Agents),围绕大模型提供灵活、可拓展的AI原生数据应用管理与开发能力,可以帮助企业快速构建、部署智能AI数据应用,通过智能数据分析、洞察…...
【Python各个击破】numpy
简介 NumPy是一个开源的Python库,它提供了一个强大的N维数组对象和许多用于操作这些数组的函数。它是大多数Python科学计算的基础,包括Pandas、SciPy和scikit-learn等库都建立在NumPy之上。 安装 !pip install numpy导入 import numpy as np用法 # …...
【STM32 Blue Pill编程实例】-4位7段数码管使用
4位7段数码管使用 文章目录 4位7段数码管使用1、7段数码介绍2、硬件准备与接线3、模块配置4、代码实现在本文中,我们将介绍如何将 STM32 Blue Pill开发板与 4 位 7 段数码管连接,并在 STM32CubeIDE 中对其进行编程。 在文章中首先将介绍 4 位 7 段数码管及其与 STM32 Blue Pi…...
[进阶]java基础之集合(三)数据结构
文章目录 数据结构概述常见的数据结构数据结构(栈)数据结构(队列)数据结构(数组)数据结构(链表) 数据结构 概述 数据结构是计算机底层存储、组织数据的方式。是指数据相互之间是以什么方式排列在一起的。数据结构是为了更加方便的管理和使用数据,需要结合具体的业…...
南宁制作网站企业/手机上制作网页
动态规划算法 基本思想 动态规划算法通常用于求解具有某种最优性质的问题。在这类问题中,可能会有许多可行解。每一个解都对应于一个值,我们希望找到具有最优值的解。 动态规划算法与分治法类似,其基本思想也是将待求解问题分解成若干个子…...
做企业网站需要维护费吗/网站推广优化排名教程
图片模糊搜索项目一阶段项目小结 历时近一个月,经过多次的算法优化,加上工程上的调整,初期版本完成,尤其是元素种类相对清晰的图片搜索结果更能体现效果. 这里从以下几个方面展开表述 : 特征提取jni接口配置 / 图片特征保存 / 相关算法的调整优化 / 科学合理的测试策略 / 大数据…...
律师的网站模板/网络营销主要学什么
到目前为止,demo 的操作是不能回退的,点击格子以后,想要记录历史的操作,就需要 使用 slice() 函数为每一步创建 squares 数组的副本,同时把这个数组当作不可变对象。这样就可以把所有 squares 数组的历史版本都保存下来…...
上海做网站公/seo教程网站优化推广排名
windows:CtrlAltL MAC:ComOptionL...
哪里有配音的网站/京东seo搜索优化
供应商管理是正确优化和负责有效供应链的关键。 供应商几乎是每个组织活动和流程的核心。事实上,供应商涉及到业务的每一个部分,为了确保企业能够正常运营,它需要确保商品和产品的无缝流动。 如果供应链出现故障,企业将蒙受巨大…...
wordpress导入demo/上海公司网站seo
文章目录一、简单局域网的构成二、IP地址三、子网掩码四、IP地址详解五、网关六、DNS七、网络测试命令一、简单局域网的构成 局域网:一般称为内网。 简单的局域网构成:交换机、网线、pc机(其他终端) 交换机:用来组建内…...