在这里,我们将给大家分享关于聊聊rocketmq的updateConsumeOffsetToBroker的知识,让您更了解rocketmqupdatetopic的本质,同时也会涉及到如何更有效地Roc
在这里,我们将给大家分享关于聊聊rocketmq的updateConsumeOffsetToBroker的知识,让您更了解rocketmq updatetopic的本质,同时也会涉及到如何更有效地RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目、RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试、RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer、RocketMQ源码学习(五)-Broker(与Consumer交互部分)的内容。
本文目录一览:- 聊聊rocketmq的updateConsumeOffsetToBroker(rocketmq updatetopic)
- RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目
- RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试
- RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer
- RocketMQ源码学习(五)-Broker(与Consumer交互部分)
聊聊rocketmq的updateConsumeOffsetToBroker(rocketmq updatetopic)
序
本文主要研究一下rocketmq的updateConsumeOffsetToBroker
updateConsumeOffsetToBroker
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
public class RemoteBrokerOffsetStore implements OffsetStore {
//......
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
* here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
//......
}
- RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通过mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
- 若返回null,则执行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然后再执行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
- 之后对于findBrokerResult不为null的情况构建UpdateConsumerOffsetRequestHeader,然后执行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset
findBrokerAddressInAdmin
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance {
//......
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
for (Map.Entry<Long, String> entry : map.entrySet()) {
Long id = entry.getKey();
brokerAddr = entry.getValue();
if (brokerAddr != null) {
found = true;
if (MixAll.MASTER_ID == id) {
slave = false;
} else {
slave = true;
}
break;
}
} // end of for
}
if (found) {
return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}
//......
}
- findBrokerAddressInAdmin方法首先从brokerAddrTable获取指定brokerName的brokerId及address的map,然后遍历map,对于brokerAddr不为null的标记found为true,标记brokerId为MixAll.MASTER_ID的slave为false,否则为true,最后跳出循环;若found为true则构造FindBrokerResult返回,否则返回null
小结
- RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通过mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
- 若返回null,则执行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然后再执行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
- 之后对于findBrokerResult不为null的情况构建UpdateConsumerOffsetRequestHeader,然后执行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset
doc
- RemoteBrokerOffsetStore
RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目
RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 介绍
由 Apache RocketMQ 社区贡献并维护的 Apache RocketMQ 扩展项目。
RocketMQ-Console
使用 Spring Boot 重新设计的 RocketMQ 控制台
RocketMQ-JMS
RocketMQ 的 JMS 1.1 规范实现
RocketMQ-Flume
Flume RocketMQ source 和 sink 的实现
RocketMQ-Flink
集成 Apache Flink 和 Apache RocketMQ。详细介绍请查看
README
RocketMQ-Spark
集成 Apache Spark 和 Apache RocketMQ,提供了 push & pull 消费者。详细介绍请查看
README
RocketMQ-Docker
Apache RocketMQ Docker 提供了 Dockerfile 和 bash 脚本用于构建和运行 Docker 镜像
RocketMQ-MysqL
该项目是 MysqL 和其他系统之间的数据复制器。详细介绍请查看 README
RocketMQ-CPP
稳定、广泛使用的 Apache RocketMQ C++ 客户端 SDK
其他
RocketMQ-Druid, RocketMQ-
Ignite 和
RocketMQ-Storm 的集成
RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 官网
https://github.com/apache/rocketmq-externals
RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试
1. 概念
Producer端重试:
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
这种消息失败重试我们可以手动设置发送失败重试的次数。
Consumer端重试:
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次,Consumer消费消息失败通常可以认为有以下几种情况
1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而且这条失败消息即使立刻重试消费,99%也不成功,所以最后提供一种定时的重试机制,即过10s再重试。
2. 由于依赖下游应用服务不可用,例如db连接不可用,外系统网络不可达等。
遇到这种错误,即使跳过当前失败的消息,消费其他消息也会报错,这种情况下建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
2. Broker消息重试策略
查看broker.log文件,可以看到启动有很多的启动参数,其中有一条如下:
这里就表示的是消息重试的时间,1s,5s....的间隔时间后再进行消息的重试,这里是消息消费的消息重试。
3. Producer端消息重试实现
package com.wangx.rocketmq.quickstart;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("myGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start();
//重试三秒 producer.setRetryTimesWhenSendFailed(3); for (int i = 0; i < 10; i++) { Message message = new Message("MyTopic", "tabA", ("Hello World" + i).getBytes());
//超时时间 SendResult result = producer.send(message,100); System.out.println(result); } producer.shutdown(); } }
如果消息在100ms之内发送失败,就重试三次
4. Consumer使用方式
在Consumer中,当消费消息的处理过程中,出现异常时,我们通常返回的是RECONSUME_LATER,表示一会儿之后再重试,当返回了这个状态之后,broker就会按照2中的时间间隔来重试消息。当然,最大也只能重试到2h.
在实际的运用场景中,我们并不想要消息无止境的一直重试下去,可能我们回想要消息重试几次之后,还是不能成功的情况下就将这条消息存储到db或log文件中,所以此时我们可以这样实现:
package com.wangx.rocketmq.quickstart;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup"); consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("MyTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt ext = msgs.get(0); int x = 0; String topic = ext.getTopic(); String body = new String(ext.getBody(),"utf-8"); if (Integer.parseInt(body) % 2 == 0) { //产生异常 x = Integer.parseInt(body) / 0; } System.out.println("收到来自topic: " + topic + ",的消息:" + body); } catch (Exception e) { try { MessageExt ext = msgs.get(0); String topic = ext.getTopic(); String body = new String(ext.getBody(),"utf-8"); if (ext.getReconsumeTimes() == 3) { //模拟将消息保存到db或日志文件中,返回成功状态,使消息不再重试 System.out.println("保存成功消息:" + body + "成功!!!"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } System.err.println("err:收到来自topic: " + topic + ",的消息:" + body); } catch (Exception e1) { e1.printStackTrace(); } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
控制台打印结果如下:
收到来自topic: MyTopic,的消息:1
err: 收到来自topic: MyTopic,的消息:2
err: 收到来自topic: MyTopic,的消息:6
err: 收到来自topic: MyTopic,的消息:2
err: 收到来自topic: MyTopic,的消息:6 收到来自topic: MyTopic,的消息:7 err: 收到来自topic: MyTopic,的消息:0 err: 收到来自topic: MyTopic,的消息:8 err: 收到来自topic: MyTopic,的消息:4 收到来自topic: MyTopic,的消息:3 收到来自topic: MyTopic,的消息:5 收到来自topic: MyTopic,的消息:9 err: 收到来自topic: MyTopic,的消息:0 err: 收到来自topic: MyTopic,的消息:8 err: 收到来自topic: MyTopic,的消息:4 err: 收到来自topic: MyTopic,的消息:2 err: 收到来自topic: MyTopic,的消息:6 err: 收到来自topic: MyTopic,的消息:0 err: 收到来自topic: MyTopic,的消息:8 err: 收到来自topic: MyTopic,的消息:4 保存成功消息:2成功!!! 保存成功消息:6成功!!! 保存成功消息:0成功!!! 保存成功消息:8成功!!! 保存成功消息:4成功!!
可以看到奇数的时候,正常消费,当消息为偶数时,会抛出异常,此时返回的是RECONSUME_LATER,所以消息将会重试消费,在MessageExt中保存了一个属性叫reconsumeTimes,表示消息重试次数,我们这里使用当消息重试三次之后,模拟将消息保存到db或日志文件中的操作,然后返回CONSUME_SUCCESS,结束消息的重试。这样就可以保证消息出现异常时我们可以做适当的操作避免消息一直重试或对于消息无法消费情况做一些补偿操作。
原文 RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试
RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer
1. 添加依赖
pom.xml如下:

<dependency>
<groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-srvutil</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.23.1-GA</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.0-alpha</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.1</version> </dependency>

2. Producer 的开发步骤
1. 实例化Producer Group,如下:
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:
producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
3. 调用start()方法启动:
producer.start();
4. 发送消息

for (int i = 0; i < 10; i++) {
//构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); }

5. 关闭生产者(根据自己需求确定是够需要关闭)
producer.shutdown();
完整示例如下:

package com.wangx.rocketmq.quickstart;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 创建一个消费者 */ public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //1. 实例化一个producer group DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); //2. 设置namesrvAddr,集群环境多个nameserver用;分割 producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //3. 启动 producer.start(); // 4. 发送消息 for (int i = 0; i < 10; i++) { //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); } //关闭生产者 producer.shutdown(); } }

使用方式可以说非常简单了。
3. Consumer开发步骤
1. 实例化Consumer Group,如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:
producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
3. 设置从什么位置开始都
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
4. 订阅topic.
consumer.subscribe("MyQuickStartTopic", "*");
5. 注册消息监听器
consumer.registerMessageListener();
6. 重写MessageListenerConcurrently接口的consumeMessage()方法
完整代码如下:

package com.wangx.rocketmq.quickstart;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 创建一个消费者 */ public class Consumer { public static void main(String[] args) throws MQClientException { //实例化一个consumer组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group"); //设置setNamesrvAddr,同生产者 consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //设置消息读取方式,这里设置的是队尾开始读取 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //设置订阅主题,第二个参数为过滤tabs的条件,可以写为tabA|tabB过滤Tab,*表示接受所有 consumer.subscribe("MyQuickStartTopic", "*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { //的到MessageExt MessageExt messageExt = list.get(0); String topic = messageExt.getTopic(); String message = new String(messageExt.getBody(),"UTF-8"); int queueId = messageExt.getQueueId(); System.out.println("收到来自topic:" + topic + ", queueId:" + queueId + "的消息:" + message); } catch (Exception e) { //失败,请求稍后重发 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }

consumeMessage返回一个枚举的两种状态,成功表示接受成功,否则返回稍后重发的状态。这里注意,启动的时候需要consumer先启动,因为它需要在生产者之前先订阅,否则将会收不到生产在consumer生产的消息,造成消息丢失。
启动consumer,在启动producer
producer控制台
consumer控制台:
rocketmq-console信息:
可以看到,我们前面部署的集群环境也是能够实现消息的负载均衡的,会使两个broker上都创建topic,且都能够接收生产者生产的消息。
进入topic,可以看到新增了两个我们自定义的topic
可能会出现的问题:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
在客户端运行Producer时,可能会出现如上异常,这是因为从 Windows 上开发连接 虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙一般都会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP 的会话,关闭后再读写,就有可能导致这个异常。
解决办法就是关闭防火墙,ubuntu下命令如下:
contOS下命令如下:
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)
原文 RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer
RocketMQ源码学习(五)-Broker(与Consumer交互部分)
问题列表
Broker 怎么响应Consumer请求?
Broker 怎么维护ConsumeQueue?
Broker 怎么处理事务消息的 ConsumeQueue ?
Broker 怎么处理定时消息的 ConsumeQueue?
Broker 怎么处理回溯消费请求?
Broker 的消息是 at least once还是exactly only once?
怎么响应Consumer请求?
原理:
如上图所示,RocketMQ将所有消息都放在CommitLog里面,消费是维护一个ConsumeQueue帮助Consumer消费.pull操作要读两次,先读ConsumeQueue得到offset,再读CommitLog得到消息内容.
ConsumeQueue有一个长度20的ByteBufferb变量byteBufferIndex,里面维护者消息偏移量,消息发现,tags的hashcode
//消息偏移量
this.byteBufferIndex.putLong(offset);
//消息大小
this.byteBufferIndex.putInt(size);
//tags的HashCode
this.byteBufferIndex.putLong(tagsCode);
BrokerController.initialize方法中会注册PullMessageProcessor来处理pull message 请求
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
在PullMessageProcessor.processRequest中又委托给DefaultMessageStore获取
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
DefaultMessageStore.getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
}
if (!this.runningFlags.isReadable()) {
log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
return null;
}
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();
//根据topic和queueId找到ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
//前面都在处理异常,这里开始真正获取
//获取consumeQueue offset之后所有可读的offset,consumeQueue也是文件存储
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//循环单个获取
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
boolean extRet = false;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can''t find ext content.Client will filter messages by tag also.
log.error("[BUG] can''t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
}
}
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
//根据偏移量和大小从CommitLog获取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
//将查到的消息放入返回值
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
CommitLog.getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
//根据offset获取到映射文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
//从映射文件中获取指定位置的数据
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
MappedFile.selectMappedBuffer
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();
if ((pos + size) <= readPosition) {
if (this.hold()) {
//Java NIO 获取指定pos数据
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
}
return null;
}
消息获取流程明白了,看下怎么获取ConsumeQueue,它是维护在一个以queueId为key的ConcurrentMap中,没有就新建一个.
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
//m
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
Broker 怎么维护ConsumeQueue?
很容易的想到,是在存放Message的时候维护了ConsumeQueue.但是相关代码没找到,我们从顶层往上层找
CosumeQueue.putMessagePositionInfo
-> CosumeQueue.putMessagePositionInfoWrapper
-> DefaultMessageStore.putMessagePositionInfo
-> CommitLogDispatcherBuildConsumeQueue.dispatch
-> DefaultMessageStore.doDispatch
-> ReputMessageService.doReput
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
//从CommitLog中读取上次偏移量之后的新消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
//顺序读
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//构建请求去维护consumeQueue的请求
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// FIXED BUG By shijia
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
逻辑:启个线程,不断地查询有没有新的提交,如果有就请求维护consumeQueue.
Broker 怎么处理事务消息的ConsumeQueue?
逻辑:
1.prepare消息不会放入ConsumeQueue
2.commit消息才能被ConsumeQueue
CommitLogDispatcherBuildConsumeQueue.dispatch
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
// commit操作会放入consumeQueue
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
// prepare消息不会放入ConsumeQueue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
Broker 怎么处理定时消息的 ConsumeQueue?
Producer那篇有讲,定时消息是吧topic和queueId存储到属性中,真正存储与ScheduleTopic上,时间到了才放入原有topic,故ConsumeQueue无需特殊维护.
Broker 怎么处理回溯消费请求?
逻辑:
1.在ConsumeQueue根据时间查询offset
2.请求Consumer Group中所有Consumer重置offset
代码:
AdminBrokerProcessor.processRequest
case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
return this.resetOffset(ctx, request);
Broker2Client.resetOffset
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce());
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
case CPP:
isC = true;
break;
}
return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
主要逻辑在此方法
Broker2Client.resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
long timeStampOffset;
if (timeStamp == -1) {
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
//根据时间找offset
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
if (timeStampOffset < 0) {
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
} else {
offsetTable.put(mq, consumerOffset);
}
}
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
//给Consumer发送请求重置offset
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.setBody(body.encode());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
}
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[] {topic, group}, e);
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
requestHeader.getGroup(),
requestHeader.getTopic(),
requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}
ConsumeQueue中没有时间戳啊,还是得去CommitLog中找
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
try {
//二分查找?
while (high >= low) {
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
//获取指定偏移量的时间戳
long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}
if (targetOffset != -1) {
offset = targetOffset;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
}
}
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
sbr.release();
}
}
}
return 0;
}
再来关注下消息存储时存储的额外信息 MessageExt
// 队列ID <PUT>
private int queueId;
//存储消息大小
private int storeSize;
//队列的offset
private long queueOffset;
//消息标志位
private int sysFlag;
//消息在客户端创建时间戳
private long bornTimestamp;
//producer 地址
private SocketAddress bornHost;
//消息在服务器存储时间戳
private long storeTimestamp;
//存储的broker地址
private SocketAddress storeHost;
//消息id
private String msgId;
//消息对应的Commit Log Offset
private long commitLogOffset;
//消息体CRC
private int bodyCRC;
// 当前消息被某个订阅组重新消费了几次(订阅组之间独立计数)
private int reconsumeTimes;
Broker 的消息是 at least once还是exactly only once?
at least once:
是指每个消息必须投递一次,RocketMQ Consumer 先 pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
exactly only once:
(1). 发送消息阶段,不允许发送重复的消息。
(2). 消费消息阶段,不允许消费重复的消息。只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重, 也就是说消费消息要做到幂等性。RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消 费情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问 题。
我们今天的关于聊聊rocketmq的updateConsumeOffsetToBroker和rocketmq updatetopic的分享就到这里,谢谢您的阅读,如果想了解更多关于RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目、RocketMQ学习笔记(13)----RocketMQ的Consumer消息重试、RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer、RocketMQ源码学习(五)-Broker(与Consumer交互部分)的相关信息,可以在本站进行搜索。
本文标签: