当前位置: 首页 > news >正文

2.13日学习打卡----初学RocketMQ(四)

2.13日学习打卡

目录:

  • 2.13日学习打卡
  • 一.RocketMQ之Java Class
    • DefaultMQProducer类
    • DefaultMQPushConsumer类
    • Message类
    • MessageExt类
  • 二.RocketMQ 消费幂
    • 消费过程幂等
    • 消费速度慢的处理方式
  • 三.RocketMQ 集群服务
    • 集群特点
    • 单master模式
    • 多master模式
    • 多master多Slave模式-异步复制
    • 多Master多Slave模式-同步双写
  • 四.RocketMQ消息消费
    • 集群消费
    • 广播消费
    • 消息消费时的权衡

一.RocketMQ之Java Class

DefaultMQProducer类

概述
DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。

方法

属性内容
DefaultMQProducerImpl defaultMQProducerImpl;生产者内部默认实现类
String producerGroup;Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
String createTopicKey;自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable
int defaultTopicQueueNums;创建默认topic的queue数量。默认4
int sendMsgTimeout;发送消息超时时间,默认值10000,单位毫秒
int compressMsgBodyOverHowmuch;消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩)
int retryTimesWhenSendFailed;同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
int retryTimesWhenSendAsyncFailed;异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
boolean retryAnotherBrokerWhenNotStoreOK;声明发送失败时,下次是否投递给其他Broker,默认false
int maxMessageSize;最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制
TraceDispatcher traceDispatcher消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息

DefaultMQPushConsumer类

概述
DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。

方法

字段内容
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现
String consumerGroup;消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic)
MessageModel messageModel;消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息
ConsumeFromWhere consumeFromWhere;消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始)
AllocateMessageQueueStrategy allocateMessageQueueStrategy;消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely
Map<String, String> subscription;topic对应的订阅tag
MessageListener messageListener;消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。
OffsetStore offsetStore;offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。

DefaultMQPushConsumer类

重要字段int consumeThreadMin = 20		线程池自动调整int consumeThreadMax = 64		线程池自动调整long adjustThreadPoolNumsThreshold = 100000 int consumeConcurrentlyMaxSpan = 2000单队列并行消费最大跨度,用于流控 int pullThresholdForQueue = 1000一个queue最大消费的消息个数,用于流控 long pullInterval = 0			检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控,也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535]consumeMessageBatchMaxSize = 1	并发消费时,一次消费消息的数量,默认为1,假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1pullBatchSize = 32				消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置boolean postSubscriptionWhenPull = false boolean unitMode = false 重要方法subscribe(String topic, String subExpression) 订阅某个topic,subExpression传*为订阅该topic所有消息 registerMessageListener(MessageListenerConcurrently messageListener) 注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现 start 							启动消息消费	

Message类

含义Producer发送的消息定义为Message类位置org.apache.rocketmq.common.message字段定义如图字段详解topicMessage都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。flag网络通信层标记。bodyProducer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。transactionIdRocketMQ 4.3.0引入的事务消息相关的事务编号。properties该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。系统内置属性定义于org.apache.rocketmq.common.message.MessageConst	(如图)对于一些关键属性,Message类提供了一组set接口来进行设置,class Message {public void setTags(String tags) {...}public void setKeys(Collection<String> keys) {...}public void setDelayTimeLevel(int level) {...}public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}public void setBuyerId(String buyerId) {...}}这几个set接口对应的作用分别为为,属性								接口				用途MessageConst.PROPERTY_TAGS		setTags			在消费消息时可以通过tag进行消息过滤判定MessageConst.PROPERTY_KEYS		setKeys			可以设置业务相关标识,用于消费处理判定,或消息追踪查询MessageConst.PROPERTY_DELAY_TIME_LEVEL	setDelayTimeLevel	消息延迟处理级别,不同级别对应不同延迟时间MessageConst.PROPERTY_WAIT_STORE_MSG_OK	setWaitStoreMsgOK	在同步刷盘情况下是否需要等待数据落地才认为消息发送成功`MessageConst.PROPERTY_BUYER_ID	setBuyerId		没有在代码中找到使用的地方,所以暂不明白其用处	这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个动态字段更为方便,自然兼容。

MessageExt类

含义对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,字段字段							用途queueId						记录MessageQueue编号,消息会被发送到Topic下的MessageQueuestoreSize					记录消息在Broker存盘大小queueOffset					记录在ConsumeQueue中的偏移sysFlag						记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识bornTimestamp				消息创建时间,在Producer发送消息时设置storeHost					记录存储该消息的Broker地址msgId						消息IdcommitLogOffset				记录在Broker中存储便宜bodyCRC						消息内容CRC校验值reconsumeTimes				消息重试消费次数preparedTransactionOffset	事务详细相关字段	注意Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,在消息发送时由Producer生成创建。上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的,其构成为(如图)这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能获取到该值。RocketMQ也提供了相关命令,命令				实现类					描述queryMsgById	QueryMsgByIdSubCommand	根据MsgId查询消息		

二.RocketMQ 消费幂

在这里插入图片描述

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消费速度慢的处理方式

提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。

提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

批量方式消费

批量方式消费可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 参数值,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//队列偏移量long offset = msgs.get(0).getQueueOffset();//最大偏移量String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);long diff = Long.parseLong(maxOffset) - offset;if (diff > 100000) {// TODO 消息堆积情况的特殊处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} 

优化每条消息消费过程
举例如下,原来某条消息的消费过程如图中左侧的流程,优化后变成右侧
在这里插入图片描述
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

消费打印日志

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} 

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

三.RocketMQ 集群服务

在这里插入图片描述

集群特点

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有 NameServer。

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

单master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
启动 NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &### 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log 
The broker[broker-a, 192.169.1.2:10911] boot success...

多master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

 ### 首先启动Name Server$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...

启动Broker集群

 ### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &...

多master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

启动NameServer

### 首先启动Name Server$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量

四.RocketMQ消息消费

集群消费

在这里插入图片描述
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 集群消费消息(默认)* ClusterConsumer 类,用于从 RocketMQ 集群消费消息*/
public class ClusterConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// 指定 NameServer 地址。consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);consumer.setMessageModel(MessageModel.CLUSTERING); // 设置消息模式为集群消费模式(默认)。// 订阅一个或多个要消费的主题。consumer.subscribe("TopicTest", "*");// 注册回调函数,以便在从代理获取的消息到达时执行。consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。});// 启动消费者实例。consumer.start();System.out.printf("Consumer Started.%n");}
}
  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.CLUSTERING 设置消费模式为集群消费模式,默认情况下 RocketMQ 使用集群消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

广播消费

在这里插入图片描述
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 广播消费消息(默认)* BroadcastingConsumer 类,用于从 RocketMQ 广播消费消息*/
public class BroadcastingConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// 指定 NameServer 地址。consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);consumer.setMessageModel(MessageModel.BROADCASTING); // 设置消息模式为广播消费模式(默认)。// 订阅一个或多个要消费的主题。consumer.subscribe("TopicTest", "*");// 注册回调函数,以便在从代理获取的消息到达时执行。consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。});// 启动消费者实例。consumer.start();System.out.printf("Consumer Started.%n");}
}
  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.BROADCASTING 设置消费模式为广播消费模式,默认情况下 RocketMQ 使用广播消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

这种模式下消费者只能收到启动后发送MQ中的消息。

消息消费时的权衡

集群模式

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播模式

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。

广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!在这里插入图片描述

相关文章:

2.13日学习打卡----初学RocketMQ(四)

2.13日学习打卡 目录&#xff1a; 2.13日学习打卡一.RocketMQ之Java ClassDefaultMQProducer类DefaultMQPushConsumer类Message类MessageExt类 二.RocketMQ 消费幂消费过程幂等消费速度慢的处理方式 三.RocketMQ 集群服务集群特点单master模式多master模式多master多Slave模式-…...

ZigBee学习——BDB

✨本博客参考了善学坊的教程&#xff0c;并总结了在实现过程中遇到的问题。 善学坊官网 文章目录 一、BDB简介二、BDB Commissioning Modes2.1 Network Steering2.2 Network Formation2.3 Finding and Binding&#xff08;F & B&#xff09;2.4 Touchlink 三、BDB Commissi…...

使用Docker快速部署MySQL

部署MySQL 使用Docker安装&#xff0c;仅仅需要一步即可&#xff0c;在命令行输入下面的命令 docker run -d \--name mysql \-p 3306:3306 \-e TZAsia/Shanghai \-e MYSQL_ROOT_PASSWORD123456 \mysql MySQL安装完毕&#xff01;通过任意客户端工具即可连接到MySQL. 当我们执…...

力扣热题100_滑动窗口_3_无重复字符的最长子串

文章目录 题目链接解题思路解题代码 题目链接 3. 无重复字符的最长子串 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc”&#xff0c;所以其长度为 3。 示…...

RM电控工程讲义

HAL_CAN_RxFifo0MsgPendingCallback(CAN_HandleTypeDef *hcan) 是一个回调函数&#xff0c;通常在STM32的HAL库中用于处理CAN&#xff08;Controller Area Network&#xff09;接收FIFO 0中的消息。当CAN接口在FIFO 0中有待处理的消息时&#xff0c;这个函数会被调用。 HAL库C…...

论文阅读:《Deep Learning-Based Human Pose Estimation: A Survey》——Part 1:2D HPE

目录 人体姿态识别概述 论文框架 HPE分类 人体建模模型 二维单人姿态估计 回归方法 目前发展 优化 基于热图的方法 基于CNN的几个网络 利用身体结构信息提供构建HPE网络 视频序列中的人体姿态估计 2D多人姿态识别 方法 自上而下 自下而上 2D HPE 总结 数据集…...

C语言——oj刷题——杨氏矩阵

目录 1. 理解杨氏矩形的特点 2. 实现杨氏矩形查找算法 3. 编写示例代码 当我们谈到杨氏矩形时&#xff0c;我们指的是一种在二维数组中查找目标元素的高效算法。它是由杨氏&#xff08;Yan Shi&#xff09;教授提出的&#xff0c;因此得名为杨氏矩形。 杨氏矩形问题的场景是…...

C++ 50道面试题

1. static关键字 1.全局static变量 存储位置&#xff1a;静态存储区&#xff0c;在程序运行期间一直存在 初始化&#xff1a; 未手动初始化的变量自动初始化为0 作用域&#xff1a; 从定义之处开始&#xff0c;到文件结束&#xff0c;仅能在本文件中使用 2.局部static变量…...

寒假学习记录14:JS字符串

目录 查找字符串中的特定元素 String.indexOf() &#xff08;返回索引值&#xff09; 截取字符串的一部分 .substring() &#xff08;不影响原数组&#xff09;&#xff08;不允许负值&#xff09; 截取字符串的一部分 .slice() &#xff08;不影响原数…...

【数学建模】【2024年】【第40届】【MCM/ICM】【C题 网球运动中的“动量”】【解题思路】

一、题目 &#xff08;一&#xff09; 赛题原文 2024 MCM Problem C: Momentum in Tennis In the 2023 Wimbledon Gentlemen’s final, 20-year-old Spanish rising star Carlos Alcaraz defeated 36-year-old Novak Djokovic. The loss was Djokovic’s first at Wimbledon…...

无人驾驶LQR控制算法 c++ 实现

参考博客&#xff1a; &#xff08;1&#xff09;LQR的理解与运用 第一期——理解篇 &#xff08;2&#xff09;线性二次型调节器(LQR)原理详解 &#xff08;3&#xff09;LQR控制基本原理&#xff08;包括Riccati方程具体推导过程&#xff09; &#xff08;4&#xff09;【基础…...

Karnaugh map (卡诺图)

【Leetcode】 289. Game of Life According to Wikipedia’s article: “The Game of Life, also known simply as Life, is a cellular automaton devised by the British mathematician John Horton Conway in 1970.” The board is made up of an m x n grid of cells, wh…...

C# CAD 框选pdf输出

在C#中进行AutoCAD二次开发时&#xff0c;实现框选&#xff08;窗口选择&#xff09;实体并输出这些实体到PDF文件通常涉及以下步骤&#xff1a; public ObjectIdCollection GetSelectedEntities() {using (var acTrans HostApplicationServices.WorkingDatabase.Transaction…...

【Linux】 Linux 小项目—— 进度条

进度条 基础知识1 \r && \n2 行缓冲区3 函数介绍 进度条实现版本 1代码实现运行效果 版本2 Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读&#xff01;&#xff01;&#xff01;下一篇文章见&#xff01;&#xff01;&#xff01; 基础知识 1 \r &&a…...

Sora和Pika,RunwayMl,Stable Video对比!网友:Sora真王者,其他都是弟

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…...

Go内存优化与垃圾收集

Go提供了自动化的内存管理机制&#xff0c;但在某些情况下需要更精细的微调从而避免发生OOM错误。本文介绍了如何通过微调GOGC和GOMEMLIMIT在性能和内存效率之间取得平衡&#xff0c;并尽量避免OOM的产生。原文: Memory Optimization and Garbage Collector Management in Go 本…...

【Spring】Bean 的生命周期

一、Bean 的生命周期 Spring 其实就是一个管理 Bean 对象的工厂&#xff0c;它负责对象的创建&#xff0c;对象的销毁等 所谓的生命周期就是&#xff1a;对象从创建开始到最终销毁的整个过程 什么时候创建 Bean 对象&#xff1f;创建 Bean 对象的前后会调用什么方法&#xf…...

云计算基础-存储基础

存储概念 什么是存储&#xff1a; 存储就是根据不同的应用程序环境&#xff0c;通过采取合理、安全、有效的方式将数据保存到某些介质上&#xff0c;并能保证有效的访问&#xff0c;存储的本质是记录信息的载体。 存储的特性&#xff1a; 数据临时或长期驻留的物理介质需要保…...

问题:人的安全知识和技能是天生的。() #媒体#知识分享#学习方法

问题&#xff1a;人的安全知识和技能是天生的。&#xff08;) 人的安全知识和技能是天生的。() 参考答案如图所示 问题&#xff1a;&#xff08;&#xff09;是党和国家的根本所在、命脉所在&#xff0c;是全国各族人民的利益所在、幸福所在。 A.人民当家作主 B.坚持和完善…...

【数据分享】2001~2020年青藏高原植被净初级生产力数据集

各位同学们好&#xff0c;今天和大伙儿分享的是2001~2020年青藏高原植被净初级生产力数据集。如果大家有下载处理数据等方面的问题&#xff0c;您可以私信或评论。 朱军涛. (2022). 青藏高原植被净初级生产力数据集&#xff08;2001-2020&#xff09;. 国家青藏高原数据中心. …...

【Spring MVC篇】返回响应

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Spring MVC】 本专栏旨在分享学习Spring MVC的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 一、返回静态页面…...

阿里云BGP多线精品EIP香港CN2线路低时延,价格贵

阿里云香港等地域服务器的网络线路类型可以选择BGP&#xff08;多线&#xff09;和 BGP&#xff08;多线&#xff09;精品&#xff0c;普通的BGP多线和精品有什么区别&#xff1f;BGP&#xff08;多线&#xff09;适用于香港本地、香港和海外之间的互联网访问。使用BGP&#xf…...

(08)Hive——Join连接、谓词下推

前言 Hive-3.1.2版本支持6种join语法。分别是&#xff1a;inner join&#xff08;内连接&#xff09;、left join&#xff08;左连接&#xff09;、right join&#xff08;右连接&#xff09;、full outer join&#xff08;全外连接&#xff09;、left semi join&#xff08;左…...

创新技巧|迁移到 Google Analytics 4 时如何保存历史 Universal Analytics 数据

Google Universal Analytics 从 2023 年 7 月起停止收集数据&#xff08;除了付费 GA360 之外&#xff09;。它被Google Analytics 4取代。为此&#xff0c;不少用户疑惑&#xff1a;是否可以将累积&#xff08;历史&#xff09;数据从 Google Analytics Universal 传输到 Goog…...

一个小而实用的 Python 包 pangu,实现在中文和半宽字符(字母、数字和符号)之间自动插入空格

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一个小巧的库&#xff0c;可以避免自己重新开发功能。利用 Python 包 pangu&#xff0c;可以轻松实现在 CJK&#xff08;中文、日文、韩文&#xff09;和半宽字符&#xff08;字母、数字和符号&#xf…...

openJudge | 中位数 C语言

总时间限制: 2000ms 内存限制: 65536kB 描述 中位数定义&#xff1a;一组数据按从小到大的顺序依次排列&#xff0c;处在中间位置的一个数或最中间两个数据的平均值&#xff08;如果这组数的个数为奇数&#xff0c;则中位数为位于中间位置的那个数&#xff1b;如果这组数的个…...

ctfshow-文件上传(web151-web161)

目录 web151 web152 web153 web154 web155 web156 web157 web158 web159 web160 web161 web151 提示前台验证不可靠 那限制条件估计就是在前端设置的 上传php小马后 弹出了窗口说不支持的格式 查看源码 这一条很关键 这种不懂直接ai搜 意思就是限制了上传类型 允许…...

cudnn免登录下载

现在要下载cuDNN&#xff0c;点击下载的页面后都是出现要求先加入Nvidia developers才能进行下载&#xff0c;但这个注册的过程非常慢&#xff0c;常常卡在第二个步骤&#xff0c;这里根据亲身的经验介绍一个可以绕过这个注册或登陆步骤的方式直接下载cuDNN。遇到此类问题的可以…...

SQLyog安装配置(注册码)连接MySQL

下载资源 博主给你打包好了安装包&#xff0c;在网盘里&#xff0c;只有几Mb&#xff0c;防止你下载到钓鱼软件 快说谢谢博主&#xff08;然后心甘情愿的点个赞~&#x1f60a;&#xff09; SQLyog.zip 安装流程 ①下载好压缩包后并解压 ②打开文件夹&#xff0c;双击安装包 ③…...

java+SSM+mysql 开放式实验管理系统78512-计算机毕业设计项目选题推荐(免费领源码)

摘 要 我国高校开放式实验管理普遍存在实验设备使用率较低、管理制度不完善,实验设备共享程度不高等诸多问题。要在更大范围推行开放式实验管理,就必须在开放式实验教学管理流程中,通过引入信息化管理加大信息技术在其中的应用,才能真正发挥这种教学模式的开放性优势。 本系统…...

代码随想录算法训练营第三十三天|1005.K次取反后最大化的数组和、134.加油站、135.分发糖果

1005.K次取反后最大化的数组和 public class Solution {public int LargestSumAfterKNegations(int[] nums, int k) {int cnt0;int sum0;int minint.MaxValue;Array.Sort(nums);for(int i0;i<nums.Length;i){if(nums[i]>0){continue;}else{nums[i]-nums[i];cnt;}if(cntk…...

解决LeetCode编译器报错的技巧:正确处理位操作中的数据类型

一天我在leetcode上刷题时&#xff0c;遇到了这样的题目&#xff1a; 随即我写了如下的代码&#xff1a; int convertInteger(int A, int B) {int count 0;int C A ^ B;int flag 1;while(flag){if (C & flag){count;}flag<<1;}return count;} 但LeetCode显示如下…...

一周学会Django5 Python Web开发-Django5操作命令

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计11条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…...

反转链表【基础算法精讲 06】

视频地址 反转链表【基础算法精讲 06】_哔哩哔哩_bilibili 概念 链表的每一个结点都包含节点值 和1指向下一个结点的next指针 , 链表的最后一个结点指向空; 206 . 反转链表 用cur记录当前遍历到的结点 &#xff0c; 用pre表示下一个结点 &#xff0c; 用nxt表示cur的下一个…...

Git 初学

目录 一、需求的产生 二、版本控制系统理解 1. 认识版本控制系统 2. 版本控制系统分类 &#xff08;1&#xff09;集中式版本控制系统 缺点&#xff1a; &#xff08;2&#xff09;分布式版本控制系统 三、初识 git 四、git 的使用 例&#xff1a;将 “ OLED文件夹 ”…...

智胜未来,新时代IT技术人风口攻略-第四版(弃稿)

文章目录 前言鸿蒙生态科普调研人员画像高校助力鸿蒙高校鸿蒙课程开设占比教研力量并非唯一原因 企业布局规划全盘接纳仍需一段时间企业对鸿蒙的一些诉求 机构入场红利机构鸿蒙课程开设占比机构对鸿蒙的一些诉求 鸿蒙实际体验高校用户群体高度认同与影响体验企业用户群体未来可…...

渗透专用虚拟机(公开版)

0x01 工具介绍 okfafu渗透虚拟机公开版。解压密码&#xff1a;Mrl64Miku&#xff0c;压缩包大小&#xff1a;15.5G&#xff0c;解压后大小&#xff1a;16.5G。安装的软件已分类并在桌面中体现&#xff0c;也可以使用everything进行查找。包含一些常用的渗透工具以及一些基本工…...

HCIA-HarmonyOS设备开发认证V2.0-3.2.轻量系统内核基础-时间管理

目录 一、时间管理1.1、时间接口1.2、代码分析&#xff08;待续...&#xff09; 坚持就有收获 一、时间管理 时间管理以系统时钟为基础&#xff0c;给应用程序提供所有和时间有关的服务。系统时钟是由定时器/计数器产生的输出脉冲触发中断产生的&#xff0c;一般定义为整数或长…...

嵌入式培训机构四个月实训课程笔记(完整版)-Linux ARM驱动编程第五天-ARM Linux编程之file_operations详解 (物联技术666)

链接&#xff1a;https://pan.baidu.com/s/1V0E9IHSoLbpiWJsncmFgdA?pwd1688 提取码&#xff1a;1688 struct file_operations{ struct module *owner; // 指向拥有该结构的模块的指针&#xff0c;避免正在操作时被卸载&#xff0c;一般为初始化THIS_MODULES loff_t &#…...

第9章 网络编程

9.1 网络通信协议 通过计算机网络可以实现多台计算机连接&#xff0c;但是不同计算机的操作系统和硬件体系结构不同&#xff0c;为了提供通信支持&#xff0c;位于同一个网络中的计算机在进行连接和通信时必须要遵守一定的规则&#xff0c;这就好比在道路中行驶的汽车一定要遵…...

Python setattr函数

在Python编程中&#xff0c;setattr()函数是一个有用且灵活的内置函数&#xff0c;用于设置对象的属性值。它可以在运行时动态地设置对象的属性&#xff0c;无论是新建对象还是已有对象。本文将深入探讨setattr()函数的用法、语法、示例代码&#xff0c;并探讨其在实际编程中的…...

[C#]winform制作仪表盘好用的表盘控件和使用方法

【仪表盘一般创建流程】 在C#中制作仪表盘文案&#xff08;通常指仪表盘上的文本、数字或指标显示&#xff09;涉及到使用图形用户界面&#xff08;GUI&#xff09;组件&#xff0c;比如Windows Forms、WPF (Windows Presentation Foundation) 或 ASP.NET 等。以下是一个使用W…...

探索弗洛姆的思想:人类本质与爱的哲学

探索弗洛姆的思想&#xff1a;人类本质与爱的哲学 摘要&#xff1a;艾里希弗洛姆是20世纪著名的德裔美国哲学家和精神分析学家&#xff0c;他的理论在心理学、社会学和哲学领域均产生了广泛影响。弗洛姆的核心思想围绕人的本性、自由发展以及爱的重要性展开&#xff0c;强调了人…...

【碎片知识点】安装Linux系统 VMware与kali

天命&#xff1a;VMware就是可以运行操作系统的载体&#xff0c;kali就是Linux的其中一个分支 天命&#xff1a;Linux有两个分支版本&#xff1a;centos与ubuntu&#xff0c;kali底层就是ubuntu&#xff08;所有Linux用起来都差不多&#xff0c;没啥区别&#xff09; 天命&…...

Android 车载应用开发之SystemUI 详解

一、SystemUI SystemUI全称System User Interface,直译过来就是系统级用户交互界面,在 Android 系统中由SystemUI负责统一管理整个系统层的 UI,它是一个系统级应用程序(APK),源码在/frameworks/base/packages/目录下,而不是在/packages/目录下,这也说明了SystemUI这个…...

C# CAD-Xdata数据 添加(一)

运行环境Visual Studio 2022 c# cad2016 一、XData&#xff08;扩展数据&#xff09;特定代码值 XData&#xff08;扩展数据&#xff09;特定代码值 XData通过一系列DXF组码&#xff08;DxfCode&#xff09;存储不同类型的数据&#xff0c;包括但不限于ASCII字符串、已注册应…...

【NLP】MHA、MQA、GQA机制的区别

Note LLama2的注意力机制使用了GQA。三种机制的图如下&#xff1a; MHA机制&#xff08;Multi-head Attention&#xff09; MHA&#xff08;Multi-head Attention&#xff09;是标准的多头注意力机制&#xff0c;包含h个Query、Key 和 Value 矩阵。所有注意力头的 Key 和 V…...

nginx upstream server主动健康监测模块添加https检测功能

1 缘起 前面的《nginx upstream server主动健康检测模块ngx_http_upstream_check_module 使用和源码分析》系列已经分析了ngx_http_upstream_check_module的实现原理&#xff0c;并且在借助这个模块的框架实现了一个udp健康检测的新功能。   但是ngx_http_upstream_check_mod…...

OCP的operator——(4)用户任务:使用Operator创建etcd集群

文章目录 环境在namespace中安装Operator先决条件使用Web console从OperatorHub安装删除 使用CLI从OperatorHub安装从已安装的Operator创建应用使用Operator创建etcd集群报错从web console debug从命令行debug分析 参考 环境 RHEL 9.3Red Hat OpenShift Local 2.32 在namespa…...

win7自带截图工具保存失效解决办法

今日发现一台远航技术的win7中自带的截图工具使用时正常&#xff0c;保存图片时没有弹出保存位置的对话窗口&#xff0c;无法正常保存图片。解决方案如下&#xff1a; 1、进入注册表编辑器。开始-搜索程序和文件-输入 regedit 按下回车键&#xff0c;打开注册表&#xff1b; 2、…...