当前位置: 首页 > news >正文

【MacOS】RocketMQ 搭建Java客户端

【MacOS】RocketMQ 搭建Java客户端

文章目录

  • 【MacOS】RocketMQ 搭建Java客户端
    • 一、引入RocketMQ客户端依赖
      • 1.maven工程,在你的`pom.xml`中添加RocketMQ客户端依赖:
      • 2.gradle工程添加库
    • 二、创建生产者和消费者
      • 1.创建一个生产者
      • 消费者
        • 1.创建一个PullConsumer
        • 2.创建一个PushConsumer
    • 三、遇到的问题
      • 1.连接失败的问题
      • 2.主题没有找到

一、引入RocketMQ客户端依赖

1.maven工程,在你的pom.xml中添加RocketMQ客户端依赖:

<dependencies><!-- 添加RocketMQ客户端依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>版本跟你下载的rocketmq版本一样</version></dependency>
</dependencies>

2.gradle工程添加库

compile 'org.apache.rocketmq:rocketmq-client:你的版本号'
  • 注意
    1. 客户端和服务端版本要一致,否则会发射管一些奇怪的问题
    2. 要到控制台创建Topic队列名称

二、创建生产者和消费者

1.创建一个生产者

mport com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;/*** @author pengxiaoping* @date 2024年10月18日 11:27*/
public class Producer {public static void main(String[] args) {Producer.producer();}public static void producer() {//创建DefaultMQProducer消息生产者对象DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");//设置NameServer 多个节点间用分号分割producer.setNamesrvAddr("localhost:9876");try {//与NameServer建立长连接producer.start();//发送十条数据for (int i = 1; i <= 10; i++) {//1S中发送一次Thread.sleep(1000);JSONObject json = new JSONObject();json.put("orderId",i+1);json.put("desc","这是第"+i+1+"个订单");//数据正文String data = json.toJSONString();/*创建消息Message消息三个参数topic 代表消息主题,自定义自定义TopicOrder代表订单主题代表订单主题tags 代表标志,用于消费者接收数据时进行数据筛选。PAY_TAG代表支付相关信息body 代表消息内容*/Message message = new Message("TopicOrder", "PAY_TAG", data.getBytes());//发送消息,获取发送结果SendResult result = producer.send(message);//将发送结果对象打印在控制台System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:"+ result.getSendStatus());}}catch (Exception e){e.printStackTrace();}finally {try {producer.shutdown();} catch (Exception e) {}}}}

消费者

对于Consumer来说,他有两种基础的工作方式:pull和push。
区别:push:broker端来了消息以后主动将消息从broker端向consumer端推送。
pull:对于consumer来说主动往broker发一个请求,然后broker在通过请求响应给consumer一批消息。一般用push模式。

1.创建一个PullConsumer
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PullConsumer {public static volatile boolean running = true;public static void consumer() {//创建pull消费者对象DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("TestPullConsumerGroup");//设置NameServer节点litePullConsumer.setNamesrvAddr("localhost:9876");try {//订阅主题,与Push相同litePullConsumer.subscribe("TopicOrder", "*");//每次拉取数据条目数litePullConsumer.setPullBatchSize(10);//启动消费者litePullConsumer.start();while (running) {List<MessageExt> messageExts = litePullConsumer.poll();//批量数据处理for (MessageExt msg : messageExts) {System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + newString(msg.getBody()));}}}catch (Exception e){e.printStackTrace();}finally {litePullConsumer.shutdown();}}public static void main(String[] args) {PullConsumer.consumer();}
}
2.创建一个PushConsumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PushConsumer {public static void consumer() {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumerGroup");try {//设置NameServer节点consumer.setNamesrvAddr("localhost:9876");/*订阅主题,consumer.subscribe包含两个参数:topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。subExpression: 子表达式用于筛选tags。同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。例如:设置为*,则代表接收所有tags数据。例如:设置为PAY_TAG,则Broker中只有tags=PAY_TAG的消息会被接收,而其他的就会被排除在外。*/consumer.subscribe("TopicOrder", "*");//创建监听,当有新的消息监听程序会及时捕捉并加以处理。consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//批量数据处理for (MessageExt msg : msgs) {System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + newString(msg.getBody()));}//返回数据已接收标识return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者,与Broker建立长连接,开始监听。consumer.start();} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {PushConsumer.consumer();}
}

三、遇到的问题

1.连接失败的问题

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failedat org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:572)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2050)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2041)at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:782)... 6 more
  • 检查代码中设置的NameServer地址是否正确,跟配置文件中的NAME_ADDR地址一致。确保没有拼写错误、IP 地址或域名准确,以及端口号正确。
  • 检查 RocketMQ 的 NameServer 是否已经启动并且正在运行。查看 NameServer 的日志文件,确认没有错误或异常情况。如果 NameServer 没有启动,需要启动它。确保 NameServer 的配置正确,并且没有与其他服务冲突。

2.主题没有找到

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)
  • 主题不存在或者未被创建:

    #在控制台创建主题 -n namesrv 的地址。-t 主题名。-c 指定所在集群
    sh bin/mqadmin updateTopic -n localhost:9876 -t TopicOrder -c DefaultCluster
    
    #出现这个创建成功
    create topic to 172.16.224.140:10911 success.
    TopicConfig [topicName=TopicOrder, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
    

相关文章:

【MacOS】RocketMQ 搭建Java客户端

【MacOS】RocketMQ 搭建Java客户端 文章目录 【MacOS】RocketMQ 搭建Java客户端一、引入RocketMQ客户端依赖1.maven工程&#xff0c;在你的pom.xml中添加RocketMQ客户端依赖&#xff1a;2.gradle工程添加库 二、创建生产者和消费者1.创建一个生产者消费者1.创建一个PullConsume…...

前端学习---(5)js基础--3

ES 的全称是 ECMAScript&#xff0c;它是由 ECMA 国际标准化组织 制定的一套脚本语言的标准化规范。 ES6 的变量声明 let&#xff1a;定义变量 const&#xff1a;定义常量&#xff08;定义后&#xff0c;不可修改&#xff09; ES5中的 var 容易造成全局污染; ES6中的let可以在块…...

Spring Boot 3.3.4 升级导致 Logback 之前回滚策略配置不兼容问题解决

前言 在将 Spring Boot 项目升级至 3.3.4 版本后&#xff0c;遇到 Logback 配置的兼容性问题。本文将详细描述该问题的错误信息、原因分析&#xff0c;并提供调整日志回滚策略的解决方案。 错误描述 这是SpringBoot 3.3.3版本之前的回滚策略的配置 <!-- 日志记录器的滚动…...

如何开发属于自己的Hoobuy跨境独立站

以下是开发属于自己的类似 Pandabuy 或 Hoobuy 的跨境独立站的一般步骤&#xff1a; 市场调研与定位&#xff1a; 目标市场分析&#xff1a;确定您的独立站面向的海外目标市场&#xff0c;比如特定国家或地区。研究该市场的消费趋势、需求特点、竞争对手情况以及当地的法律法规…...

java智能物流管理系统源码(springboot)

项目简介 智能物流管理系统实现了以下功能&#xff1a; 智能物流管理系统的主要使用者分为管理员&#xff0c;顾客&#xff0c;员工&#xff0c;店主。功能有个人中心&#xff0c;顾客管理&#xff0c;员工管理&#xff0c;店主管理&#xff0c;门店信息管理&#xff0c;门店…...

全新语音图像数据集,以高质量训练数据加速提升模型性能

海天瑞声数据集上新&#xff1a;超60个国家地区口音英语语音识别数据集、多国口音西语语音识别数据集、印度多语种语音识别数据集、中文自然对话语音合成数据集、中文多音色语音合成数据集、多肤色高清人像数据集。海天瑞声高质量AI训练数据加速提升模型性能&#xff0c;让AI产…...

基于Springboot在线视频网站的设计与实现

基于Springboot视频网站的设计与实现 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;idea 源码获取&#xff1a;https://do…...

vue富文本使用editor

富文本【图片上传、缩放、拖动和不能复制只能根据点击图片上传到服务器】 <div id"editorId"><quill-editorref"myQuillEditor"v-model.trim"addForm.content":options"editorOption":disabled"isDisable"change&…...

Spring Boot植物健康系统:绿色科技的创新引擎

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了植物健康系统的开发全过程。通过分析植物健康系统管理的不足&#xff0c;创建了一个计算机管理植物健康系统的方案。文章介绍了植物健康系统的系统分析部分&…...

什么是域名?什么是泛域名?

域名 定义 域名是互联网上用于识别和定位网站或网络服务的名称。它是由一串用点分隔的字符组成&#xff0c;例如 “baidu.com”。就像是现实生活中建筑物的地址&#xff0c;方便用户在互联网的海量信息中找到特定的网站。 结构 域名从右到左依次为顶级域名&#xff08;TLD&…...

c#webapi远程调试方法asp.netcore

服务器安装 Visual Studio 2019 Remote Debugger 打开并运行 tools->Options 选择No Authorizaiton 确保IIS已经启动 打开本地项目->调试->添加到进程 参考&#xff1a; c#webapi远程调试方法asp.netcore - txwtech - 博客园...

XMLHttpRequest和FormData下载文件,ajax下载文件

1、前端请求只做下载功能 function downloadDatas3() {// 封装请求参数let formData new FormData();formData.append(page, 1);formData.append(rows, 10);// 创建xhr对象let xhr new XMLHttpRequest();xhr.open(POST, http://127.0.0.1:8080/getData);// xhr.setRequestHe…...

针对考研的C语言学习(2014二叉树大题代码实战)

题目描述 解析 1.递归思想遍历节点&#xff0c;若是叶子结点就累加计算的wpl&#xff0c;反之继续遍历 2.代码如下 //树 typedef struct trees {ElemType data;struct trees* lc;struct trees* rc; }treeNode, * Tree;3.算法设计 //deep路径长度也叫做深度&#xff0c;0开始 …...

webpack面试笔记(一)

一.webpack基础 1.模块化 什么是模块化? 模块化是把一个复杂的系统分解到多个模块以方便编码 为什么出现模块化 以前使用命名空间的方式来组织代码,比如jQuery,zepto, 它们有很多缺点: 命名空间冲突,两个库可能会使用同一个名称,例如zepto也被放在window.$下无法合理管理项目…...

雷池社区版有多个防护站点监听在同一个端口上,匹配顺序是怎么样的

如果域名处填写的分别为 IP 与域名&#xff0c;那么当使用进行 IP 请求时&#xff0c;则将会命中第一个配置的站点 以上图为例&#xff0c;如果用户使用 IP 访问&#xff0c;命中 example.com。 如果域名处填写的分别为域名与泛域名&#xff0c;除非准确命中域名&#xff0c;否…...

【小白学机器学习15】 概率论的世界观

目录 1 最近看的几本书和想说的 1.1 最近看的书 1.2 为什么写这个 2 概率论的观点看世界 2.1 上帝掷骰子&#xff0c;没有绝对的事情&#xff0c;所有事情都是概率决定的&#xff0c;都是相对的。 2.2 万物皆可能&#xff0c;无物是必然 2.3 什么是&#xff1a;可能性…...

合成数据用于大模型训练的3点理解

最近看国内对合成数据的研究讨论也变得多 ,而不单单是多模态,扩散模型这些偏视觉类的, 因此就合成数据写一下目前的情况。 2023年国外就有很多研究合成数据的论文, 包括Self-Consuming Generative Models Go MAD, Crowd Workers Widely Use Large Language Models for Text Pr…...

Safari 中 filter: blur() 高斯模糊引发的性能问题及解决方案

目录 引言问题背景&#xff1a;filter: blur() 引发的问题产生问题的原因分析解决方案&#xff1a;开启硬件加速实际应用示例性能优化建议常见的调试工具与分析方法 引言 在前端开发中&#xff0c;CSS滤镜&#xff08;如filter: blur()&#xff09;的广泛使用为页面带来了各种…...

浏览器实时更新esp32-c3 Supermini http server 数据

一利用此程序的思路就可以用浏览器显示esp32 采集的各种传感器的数据&#xff0c;也可以去控制各种传感器。省去编写针对各系统的app. 图片 1.浏览器每隔1秒更新一次数据 2.现在更新的是开机数据&#xff0c;运用此程序&#xff0c;可以实时显示各种传感器的实时数据 3.es…...

【亚马逊云】基于 Amazon EKS 搭建开源向量数据库 Milvus

文章目录 一、先决条件1.1 安装AWS CLI ✅1.2 安装 EKS 相关工具✅1.3 创建 Amazon S3 存储桶✅1.4 创建 Amazon MSK 实例✅ 二、创建EKS集群三、创建 ebs-sc StorageClass四、安装 AWS Load Balancer Controller五、部署 Milvus 数据库5.1 添加 Milvus Helm 仓库5.2 配置 S3 作…...

pytorch安装GPU版本,指定设备

安装了GPU版本的pytorch的时候&#xff0c;想要使用CPU&#xff0c;怎么操作呢&#xff1f; 设置环境变量&#xff1a; set TF_FORCE_GPU_ALLOW_GROWTHfalse set CUDA_VISIBLE_DEVICES如果想要使用固定序号的GUP设备&#xff0c;则指定ID set CUDA_VISIBLE_DEVICES0 # 使用第…...

草地杂草数据集野外草地数据集田间野草数据集YOLO格式VOC格式目标检测计算机视觉数据集

一、数据集概述 数据集名称&#xff1a;杂草图像数据集 数据集是一个包含野草种类的集合&#xff0c;其中每种野草都有详细的特征描述和标记。这些数据可以包括野草的图片、生长习性、叶片形状、颜色等特征。 1.1可能应用的领域 农业领域: 农业专家和农民可以利用这一数据集来…...

顺序表排序相关算法题|负数移到正数前面|奇数移到偶数前面|小于x的数移到大于x的数前面|快排思想(C)

负数移到正数前面 已知顺序表 ( a 1 , … , a n ) (a_{1},\dots,a_{n}) (a1​,…,an​)&#xff0c;每个元素都是整数&#xff0c;把所有值为负数的元素移到全部正数值元素前边 算法思想 快排的前后指针版本 排序|冒泡排序|快速排序|霍尔版本|挖坑版本|前后指针版本|非递归版…...

【小白学机器学习20】单变量分析 / 0因子分析 (只分析1个变量本身的数据)

目录 1 什么是单变量分析&#xff08;就是只分析数据本身&#xff09; 1.1 不同的名字 1.2 《戏说统计》这本书里很多概念和一般的书不一样 1.3 具体来说&#xff0c;各种概率分布都属于单变量分析 2 一维的数据分析的几个层次 2.1 数据分析的层次 2.2 一维的数据为什么…...

[软件工程]—桥接(Brige)模式与伪码推导

桥接&#xff08;Brige&#xff09;模式与伪码推导 1.基本概念 1.1 动机 由于某些类型的固有的实现逻辑&#xff0c;使它们具有两个变化的维度&#xff0c;乃至多个维度的变化。如何应对这种“多维度的变化”&#xff1f;如何利用面向对象技术是的类型可以轻松的沿着两个乃至…...

TensorFlow面试整理-TensorFlow 结构与组件

TensorFlow 的结构和组件是其功能强大、灵活性高的重要原因。掌握这些结构和组件有助于更好地理解和使用 TensorFlow 构建、训练和部署模型。以下是 TensorFlow 关键的结构与组件介绍: 1. Tensor(张量) 定义:张量是 TensorFlow 中的数据载体,类似于多维数组或矩阵。张量的…...

linux下gpio模拟spi三线时序

目录 前言一、配置内容二、驱动代码实现三、总结 前言 本笔记总结linux下使用gpio模拟spi时序的方法&#xff0c;基于arm64架构的一个SOC&#xff0c;linux内核版本为linux5.10.xxx&#xff0c;以驱动三线spi(时钟线sclk&#xff0c;片选cs&#xff0c;sdata数据读和写使用同一…...

makesense导出的压缩包是空的

md ,那些教程感觉都不是人写的&#xff0c;没说要在右边选标签&#xff0c;我本来就是一个标签&#xff0c;我以为他会自动识别打标&#xff0c;结果死活导出来空包 密码要在右边选标签&#xff0c;...

Spring Boot框架下的中小企业设备维护系统

5系统详细实现 5.1 用户信息管理 中小企业设备管理系统的系统管理员可以对用户信息添加修改删除以及查询操作。具体界面的展示如图5.1所示。 图5.1 用户信息管理界面 5.2 员工信息管理 管理员可以对员工信息进行添加修改删除操作。具体界面如图5.2所示。 图5.2 员工信息界面…...

处理文件上传和进度条的显示(进度条随文件上传进度值变化)

成品效果图&#xff1a; 解决问题&#xff1a;上传文件过大时&#xff0c;等待时间过长&#xff0c;但是进度条却不会动&#xff0c;只会在上传完成之后才会显示上传完成 上传文件的upload.component.html <nz-modal [(nzVisible)]"isVisible" [nzTitle]"文…...

合肥网站设计制作/个人做外贸怎样起步

这篇文章主要介绍了python plotly画柱状图代码实例,文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下代码import pandas as pdimport numpy as npimport plotly.plotly as pyimport plotly.graph_objs as gopath …...

怎样做吧网站排名做上去/网络推广费用大概价格

这道题同时涉及到深搜和广搜&#xff0c;是很好的搜索入门题&#xff0c;利用广搜快速求最短的查找次数。利用深搜求向左和向右方向的查找次数。 下面是代码: #include <stdio.h>#include <stdlib.h>#include <cstring>#include <queue>#define Max …...

建网站赚钱吗/如何在百度发布文章

在本地IDEA开发方便代码调试&#xff0c;测试通过后&#xff0c;才会打包上传服务器运行。 hbase客户端连接hbase服务端读写时&#xff0c;会出现莫名的异常&#xff0c;有时会报错看日志可以解决问题&#xff0c;有时控制台没有日志 本次遇到的问题是权限问题 服务器部署了…...

百度手机网站优化指南/营销网

企业老板还在为处理有机废气烦恼吗&#xff1f;您应该了解一下活性炭吸附法工业有机废气是指工业生产过程中排出的含挥发性有机物的气态污染物&#xff0c;这些废气如果得不到有效处理会对工作人员和周围环境造成严重的危害。因此治理有机废气&#xff0c;让废气排放达到环保标…...

建网站公司销售/优量汇广告平台

偶然在网上看到的编程题&#xff0c;感觉挺有意思的。但是没有在网上找到对应的题目和解析&#xff0c;所以没法测试算法的正确性&#xff0c;下面写一下思路&#xff0c;供大家参考&#xff0c;如果有纰漏之处还望指出。关于不曾下降过的序列递增应该由如下方式组成&#xff1…...

上海营销网站建设定制服务/热搜词排行榜关键词

C语言练习&#xff1a;第二大整数问题描述编写一个程序&#xff0c;读入一组整数(不超过20个)&#xff0c;当用户输入0时&#xff0c;表示输入结束。然后程序将从这组整数中&#xff0c;把第二大的那个整数找出来&#xff0c;并把它打印出来。说明&#xff1a;(1)0表示输入结束…...