新聞中心
大家好,我是君哥。今天來(lái)聊一聊 RocketMQ 客戶(hù)端消息消費(fèi)失敗,怎么辦?

下面是 RocketMQ 推模式的一段代碼:
public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} 從這段代碼可以看出,消費(fèi)者消費(fèi)消息后會(huì)返回一個(gè)消費(fèi)狀態(tài),那消費(fèi)狀態(tài)有哪些呢?參見(jiàn)類(lèi) ConsumeConcurrentlyStatus 中定義:
- 消費(fèi)成功,返回 CONSUME_SUCCESS。
- 消費(fèi)失敗,返回 RECONSUME_LATER。
下面代碼就是返回上面兩個(gè)狀態(tài)的邏輯,對(duì)于消費(fèi)狀態(tài),如果返回 null,會(huì)給它賦值 RECONSUME_LATER,處理邏輯如下:
//ConsumeRequest 類(lèi)
public void run() {
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//省略部分邏輯
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
//省略部分邏輯
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {}
//省略部分邏輯
if (null == status) {
//省略日志
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//省略部分邏輯
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {}
}
這部分代碼的 UML 類(lèi)圖如下:
上面代碼中的 processConsumeResult 方法就是消費(fèi)失敗后客戶(hù)端的處理邏輯:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//ackIndex 初始值是 Integer.MAX_VALUE;
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
//省略部分邏輯
break;
case RECONSUME_LATER:
ackIndex = -1;
//省略部分邏輯
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//廣播模式下這里只打印日志
break;
case CLUSTERING:
ListmsgBackFailed = new ArrayList (consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//發(fā)送回 Broker 失敗的消息,5s 后再次消費(fèi)
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//更新本地保存的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
一 、消費(fèi)成功
上面的代碼邏輯中,如果消費(fèi)成功,ackIndex 變量的值就是消息數(shù)量減 1,所以上面的 switch 邏輯是不會(huì)執(zhí)行的,因?yàn)?strong>廣播模式下,只是打印一段日志(沒(méi)有其他邏輯),而集群模式下,for 循環(huán)的起始 i 變量已經(jīng)等于消息數(shù)量,循環(huán)里面的代碼不會(huì)執(zhí)行。
因此,如果消息消費(fèi)成功,只會(huì)走最下面的邏輯,更新本地保存的消息偏移量。
二 、消費(fèi)失敗
ackIndex 變量值等于 -1。
1、廣播模式
在消費(fèi)失敗的情況下,廣播模式的代碼只是打印了一段日志,之后更新了本地保存的消息偏移量,因此我們知道廣播模式消息消費(fèi)失敗后就不會(huì)重新消費(fèi)了,相當(dāng)于丟棄了消息。
2、集群模式
從上面代碼的 for 循環(huán)中,會(huì)把所有的消息都發(fā)送回 Broker,這樣這批消息還能再次被拉取到進(jìn)行消費(fèi)。
對(duì)于發(fā)送給 Broker 失敗的消息,會(huì)延遲 5s 后再次消費(fèi)。代碼如下:
private void submitConsumeRequestLater(
final Listmsgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue
) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
}
}, 5000, TimeUnit.MILLISECONDS);
}
更新本地保存的消息偏移量時(shí),會(huì)從消息列表中把發(fā)送回 Broker 失敗的消息先刪除掉。
注意:從上面邏輯可以看到,在拉取到一批消息進(jìn)行消費(fèi)時(shí),只要有一條消息消費(fèi)失敗,這批消息都會(huì)進(jìn)行重試,因此消費(fèi)端做好冪等是必要的。
下面再看一下發(fā)送失敗消息給 Broker 的代碼,發(fā)送消息是,請(qǐng)求的 code 碼是 CONSUMER_SEND_MSG_BACK。根據(jù)這個(gè)請(qǐng)求碼就能找 Broker 端的處理邏輯。
如果發(fā)送回 Broker 時(shí)拋出異常,需要重新發(fā)送一個(gè)新的消息,這里有四點(diǎn)需要注意:
- 新消息的 Topic 變成【 %RETRY% + consumerGroup】。
- 新消息的 RETRY_TOPIC 這個(gè)屬性賦值為之前的 Topic。
- 新消息的重試次數(shù)屬性加 1;
- 新消息的 DELAY 屬性等于重試次數(shù) + 3。
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
//Topic 變成 %RETRY% + consumerGroup
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
//RETRY_TOPIC 賦值
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
//重試次數(shù)+1
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
//最大重試次數(shù)
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
//DELAY = 重試次數(shù) + 3
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
3、Broker 處理
上面已經(jīng)講過(guò),對(duì)于處理失敗的消息,消費(fèi)端會(huì)發(fā)送回 Broker,不過(guò)這里有一點(diǎn)需要注意,發(fā)送回 Broker 時(shí),消息的 Topic 變成【"%RETRY%" + namespace + "%" + 原始 topic】,封裝邏輯在源碼 ClientConfig.withNamespace。
根據(jù)請(qǐng)求碼 CONSUMER_SEND_MSG_BACK 可以定位到 Broker 的處理邏輯在類(lèi) SendMessageProcessor,方法 asyncConsumerSendMsgBack。
(1)進(jìn)死信隊(duì)列
如果重試次數(shù)超過(guò)了最大重試次數(shù)(默認(rèn) 16 次),或者 delayLevel 值小于0,則消息進(jìn)死信隊(duì)列,死信隊(duì)列的 Topic 為【"%DLQ%" + 消費(fèi)組】,代碼如下:
//asyncConsumerSendMsgBack 方法
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
msgExt.setDelayTimeLevel(0);
}
(2)發(fā)送 CommitLog
如果延遲級(jí)別(DELAY)等于 0,則延遲級(jí)別就等于重試次數(shù)加 3。
有個(gè)地方需要注意,發(fā)送到延遲隊(duì)列的消息重新進(jìn)行了封裝,封裝這個(gè)消息用的并不是客戶(hù)端發(fā)來(lái)的那個(gè)消息,而是從 CommitLog 中根據(jù)偏移量查找的,代碼如下:
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
如果查詢(xún)失敗,就會(huì)給客戶(hù)端返回系統(tǒng)錯(cuò)誤。
這里有個(gè)重要的細(xì)節(jié),這個(gè)消息寫(xiě)入 CommitLog 時(shí),會(huì)判斷 DELAY 是否大于 0,如果大于 0,就會(huì)修改 Topic。代碼如下:
//CommitLog 類(lèi) asyncPutMessage 方法
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
//從源碼看,這里最大值是18
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//queueId = delayLevel - 1
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
這里把 Topic 修改為 SCHEDULE_TOPIC_XXXX,供延時(shí)隊(duì)列來(lái)調(diào)度。進(jìn)入延時(shí)隊(duì)列后,延時(shí)隊(duì)列會(huì)按照下面的時(shí)間進(jìn)行調(diào)度:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
上面代碼可以看到,延時(shí)消息的調(diào)度有 18 個(gè)等級(jí),最小的 1s,最大的 2h。而從下面的代碼我們可以看到,調(diào)度使用第三個(gè)等級(jí)開(kāi)始的:
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);(3)延時(shí)隊(duì)列
延時(shí)隊(duì)列的代碼邏輯在類(lèi) ScheduleMessageService,這里的 start 方法觸發(fā)延時(shí)隊(duì)列的調(diào)度,而 start 方法的業(yè)務(wù)入口在 BrokerStartup 的初始化。
首先,會(huì)計(jì)算出每個(gè)延時(shí)等級(jí)對(duì)應(yīng)的延時(shí)時(shí)間(處理到 ms 級(jí)別),放到 delayLevelTable,它是一個(gè) ConcurrentHashMap,然后創(chuàng)建一個(gè)核心線(xiàn)程數(shù)等于 18 的定時(shí)線(xiàn)程池,依次對(duì)每個(gè)級(jí)別的延時(shí)進(jìn)行調(diào)度。這個(gè)任務(wù)啟動(dòng)后,會(huì)每 100ms 執(zhí)行一次。代碼如下:
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//省略異步
for (Map.Entry entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//省略異步
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//省略其他邏輯
}
} 調(diào)度邏輯中,首先根據(jù) Topic 和 queueId 找到對(duì)應(yīng)的消費(fèi)隊(duì)列,然后從里面連續(xù)讀取消息:
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
//省略空處理
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//省略空處理
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//CQ_STORE_UNIT_SIZE = 20,因?yàn)?ConsumeQueue 中一個(gè)元素占 20 字節(jié)
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//offset占8個(gè)字節(jié)
long offsetPy = bufferCQ.getByteBuffer().getLong();
//消息大小占4個(gè)字節(jié)
int sizePy = bufferCQ.getByteBuffer().getInt();
//ConsumeQueue中tagsCode是一個(gè)投遞時(shí)間點(diǎn)
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
//時(shí)間未到,等待下次調(diào)度
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
//省略事務(wù)消息
boolean deliverSuc;
//同步異步都有,只保留同步代碼
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
} finally {
bufferCQ.release();
}
//DELAY_FOR_A_WHILE是 100ms
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}因?yàn)?messageTimeup 方法使用了原始的 Topic 和 QueueId 新建了消息,所以上面的 syncDeliver 方式是將消息重新投遞到原始的隊(duì)列中,這樣消費(fèi)者可以再次拉取到這條消息進(jìn)行消費(fèi)。注意:上面 ConsumeQueue 的 tagsCode 是一個(gè)時(shí)間點(diǎn),很容易誤解為是 tag 的 hashCode,MessageQueue 的存儲(chǔ)元素中最后 8 字節(jié)確實(shí)是 tag 的 hashCode。
三、總結(jié)
消費(fèi)者消費(fèi)失敗后,會(huì)把消費(fèi)發(fā)回給 Broker 進(jìn)行處理。下圖是客戶(hù)端處理流程:
Broker 收到消息后,會(huì)把消息重新發(fā)送到 CommitLog,發(fā)送到 CommitLog 之前,首先會(huì)修改 Topic 為 SCHEDULE_TOPIC_XXXX,這樣就發(fā)送到了延時(shí)隊(duì)列,延時(shí)隊(duì)列再根據(jù)延時(shí)級(jí)別把消息投遞到原始的隊(duì)列,這樣消費(fèi)者就能再次拉取到。流程如下圖:
從流程來(lái)看,消費(fèi)者批量拉取消息,如果部分消息消費(fèi)失敗,那就會(huì)整批全部重試。所以做好冪等是必要的。
當(dāng)前名稱(chēng):阿里二面:RocketMQ消費(fèi)失敗了,怎么處理?
鏈接地址:http://m.fisionsoft.com.cn/article/ccdiseg.html


咨詢(xún)
建站咨詢(xún)
