dwcs6网站建设视频/手机优化大师为什么扣钱
问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
1. 先说解决方案
这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
到此为止,是不是惊奇的发现,问题解决了?
2. 注意事项:订阅关系一致性
看下Rocket MQ官方文档给出的说明:
定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:
//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);
3. 剖析源码实现,分析原因
从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl
类:
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 这里初始化一个默认的push类型的Consumer实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}
然后继续进入到DefaultMQPushConsumerImpl
类中, 可以看见有一个成员变量MQClientInstance mQClientFactory
, 在DefaultMQPushConsumerImpl
类的start()
(启动消费者)方法中会通过MQClientManager初始化MQClientInstance
类.
接着跳转到MQClientInstance
构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
继续剖析RebalanceService
类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.
呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl
类中的doRebalance()
方法, 搞了半天又绕回来了. … … … … … …
直接进入到这里面, 看看rebalance的逻辑:
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.
case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 这里根据topic名称和Group进行获取到所有的consumerList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}
但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET
(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)
相关文章:

RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)
问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME&#…...

使用Golang开发硬件驱动
1. 介绍 Golang是一种简洁、高效的编程语言,它的强大并发性能和丰富的标准库使得它成为了开发硬件驱动的理想选择。在本文中,我们将探讨如何使用Golang开发硬件驱动程序,并提供一个实例来帮助你入门。 2. 准备工作 在开始之前,…...

设计模式(19)命令模式
一、介绍: 1、定义:命令模式(Command Pattern)是一种行为设计模式,它将请求封装为一个对象,从而使你可以使用不同的请求对客户端进行参数化。命令模式还支持请求的排队、记录日志、撤销操作等功能。 2、组…...

QModelIndex 与QStandardItem相互转换
目录 1、 QModelIndex 转换成QStandardItem 2 、QStandardItem 转换成 QModelIndex 3、示例 4、总结 1、 QModelIndex 转换成QStandardItem QStandardItem * itemQStandardItemModel::itemFromIndex(const QModelIndex & index) const 借助QStandardItemModel来完成…...

Linux - 进程地址空间
前言 首先,我们先要对 内存当中存储 各个数据之间的 结构要有一个 大概的了解: 各个区当中存储的数据使用类型不同,所以,这些数据在使用方式上是有差别的。比如下面这个例子: 在C 语言当中我们不能直接对 上述的 str…...

系统架构设计师-第16章-嵌入式系统架构设计理论与实践-软考学习笔记
嵌入式系统( Embedded System) 是为了特定应用而专门构建的计算机系统,其架构是随着嵌入式系统的逐步应用而发展形成的。嵌入式软件架构的设计与嵌入式系统的体系架构是密不可分的。因此,本常首先介绍嵌入式系统硬件相关知识(系统特征、硬件组…...

pod进阶
目录 资源限制 CPU 资源单位 内存 资源单位 实例 健康检查 探针的三种规则: Probe支持三种检查方法: 示例1:exec方式 示例2:httpGet方式 示例3:tcpSocket方式 示例4:就绪检测 扩展 资源限制 当定…...

系列四十七、Spring的事务传播行为案例演示(七)#NOT_SUPPORTED
一、演示Spring的传播行为(NOT_SUPPORTED) 1.1、StockServiceImplNOT_SUPPORTED /*** Author : 一叶浮萍归大海* Date: 2023/10/30 15:43* Description: 演示NOT_SUPPORTED的传播行为* 外部不存在事务:不开启新的事务* 外部存在…...

54.RabbitMQ快速实战以及核心概念详解
MQ MQ:MessageQueue,消息队列。这东西分两个部分来理解: 队列,是一种FIFO 先进先出的数据结构。 消息:在不同应用程序之间传递的数据。将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递&am…...

Qt TreeView 设置节点不可编辑
目录 1. 创建treeview 2、节点不可编辑 3、设置logo 4、实例代码 1. 创建treeview //声明模型 QStandardItemModel *model;//创建4行,1列的模型 model new QStandardItemModel(4,1);//添加标题 model->setHeaderData(0, Qt::Horizontal, tr("Tree View…...

python django获取某个角色的某个数据和——例如:获取所有订单的应付金额总和
model关系如下: class Order(models.Model):订单product models.ForeignKey(Product, on_deletemodels.SET_NULL, blankTrue, nullTrue, verbose_name"产品")no models.CharField(max_length50, blankTrue, nullTrue, verbose_name订单编号, db_indexT…...

如何在React项目中引用less
安装less npm install less less-loader --save-dev暴露 webpack 文件 利用 npx create-react-app 搭建的 React 项目,默认隐藏 webpack 配置文件,引入 less 需要修改 webpack 配置文件,因此我们需要执行命令暴露 webpack 配置文件。 请先将…...

NUXT前端服务端渲染技术框架
服务端渲染又称SSR(Server Side Render)实在服务端完成页面的内容,而不是在客户端通过AJAX获取数据 优势:更好的SEO,由于搜索引擎爬虫抓取工具可以直接查看完全渲染的页面 Nuxt.js是一个基于Vue.js的轻量级应用框架&a…...

力扣每日一题90:子集
题目描述: 给你一个整数数组 nums ,其中可能包含重复元素,请你返回该数组所有可能的子集(幂集)。 解集 不能 包含重复的子集。返回的解集中,子集可以按 任意顺序 排列。 示例 1: 输入&#x…...

「linux基础」上传代码到github/gitee
一、在gitee创建一个仓库 1.创建仓库 2.获取仓库地址 二、克隆仓库文件到linux中 1.查看Linux中是否安装git:git --version 如果没有,在root下使用指令 yum install -y git 安装。 2.使用 git clone 仓库地址,克隆仓库文件到linux中 三、第…...

Hafnium总体考虑
安全之安全(security)博客目录导读 目录 一、安全世界构建平台 二、安全分区调度 三、平台拓扑...

C#__对Json文件的解析和序列化
Json: 存储和交换文本信息的语法。(类似XML,语法独立) 一种轻量级的数据交换格式。(更小,更快,更易解析) 语法规则: 数据在键值对里面,数据由逗号分隔开。 …...

如果一定要在C++和JAVA中选择,是C++还是java?
如果一定要在C和JAVA中选择,是C还是java? 计算机专业的同学对这个问题有疑惑的,- 定要看一下这个回答! 上来直接给出最中肯的建议: 如果你是刚刚步入大学的大一时间非常充裕的同学,猪学长强烈建议先学C/C.因为C 非常 最近很多…...

如何运行深度学习项目代码
运行项目代码是第一步哦! 配环境 使用anaconda环境; conda 环境 按照项目提示的README.md,安装指定版本的python; 当然新版python会兼容旧版,也就是你的环境下python版本比它高也不要紧; 但是更新的pyt…...

C语言 每日一题 day9
求最大值及其下标 本题要求编写程序,找出给定的n个数中的最大值及其对应的最小下标(下标从0开始)。 输入格式 : 输入在第一行中给出一个正整数n(1 < n≤10)。第二行输入n个整数,用空格分开。 输出格式 …...

通讯网关软件032——利用CommGate X2OPC实现OPC客户端访问Modbus TCP设备
本文介绍利用CommGate X2OPC实现OPC客户端连接Modbus TCP设备。CommGate X2OPC是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示,SCADA系统上位机、PLC、设备具备Modbus TCP通讯接口ÿ…...

[计算机提升] 查看系统软件
3.1 查看系统软件 此处系统软件为系统安装后自带的一些软件、工具等。包括:管理工具、系统工具、轻松使用工具、附件等。 方法一:通过菜单打开系统软件 1、点击左下角windows菜单键,在弹出的菜单中,任一点击一个字母(示例中为C)&…...

【mysql】单表数据量过大解决方案
文章目录 背景问题方案数据库冷热数据分离方案 背景 包装码表单表数据量很大,造成查询瓶颈;目前单表数据量达到3000w,单表字段数16 问题 索引膨胀,查询耗时长,影响正常CRUD … 方案 ● 分区 按日期…范围&#x…...

Kafka - 3.x 消费者 生产经验不完全指北
文章目录 生产经验之Consumer事务生产经验—数据积压(消费者如何提高吞吐量) 生产经验之Consumer事务 Kafka引入了消费者事务(Consumer Transactions)来确保在消息处理期间维护端到端的数据一致性。这使得消费者能够以事务的方式…...

UDP网络编程的接受与发送信息
/发送端B>可以接受数据 public class UDPSenderB {public static void main(String[] args) throws IOException {//创建一个DatagramSocket 对象,准备发送和接受数据DatagramSocket socket new DatagramSocket(9998);//将需要发送的数据,封装到Data…...

RK3588开发笔记-USB3.0接口调试
目录 前言 一、资源介绍 二、硬件连接 三、设备树配置...

AI绘画|midjourney入门保姆教程,30秒出专业大片,国内直接使用
同学们,之前大家想用midjourney还需要魔法上网和很复杂的注册配置,现在微信里就能使用midjourney了, 还支持中文,大家赶紧来试试吧。 AI写稿专家 www.promptspower.comhttp://www.promptspower.com 我们还给大家提供了各个行业的…...

阿里发布AI编码助手:通义灵码,兼容 VS Code、IDEA等主流编程工具
今天是阿里云栖大会的第一天,相信场外的瓜,大家都吃过了。这里就不说了,有兴趣可以看看这里:云栖大会变成相亲现场,最新招婿鄙视链来了... 。 这里主要说说阿里还发布了一款AI编码助手,对于我们开发者来说…...

【Linux】-进程控制(深度理解写时拷贝、exit函数、return的含义以及makefile编译多个程序)-进程创建、进程终止、进程等待、进程程序替换
💖作者:小树苗渴望变成参天大树🎈 🎉作者宣言:认真写好每一篇博客💤 🎊作者gitee:gitee✨ 💞作者专栏:C语言,数据结构初阶,Linux,C 动态规划算法🎄 如 果 你 …...

【mfc/VS2022】计图实验:绘图工具设计知识笔记3
实现类对串行化的支持 如果要用CArchive类保存对象的话,那么这个对象的类必须支持串行化。一个可串行化的类通常有一个Serialize成员函数。要想使一个类可串行化,要经历以下5个步骤: 1、从CObject派生类 2、重写Serialize成员函数 3、使用DE…...