新聞中心
1 背景
我們先介紹一下 RocketMQ 消息監(jiān)聽再來說明 RocketMQ 消費線程。

成都創(chuàng)新互聯(lián)公司專注于茂名企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,商城網(wǎng)站定制開發(fā)。茂名網(wǎng)站建設公司,為茂名等地區(qū)提供建站服務。全流程按需定制開發(fā),專業(yè)設計,全程項目跟蹤,成都創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務
2 RocketMQ 消息監(jiān)聽
設置消費者組為 my_consumer_group,監(jiān)聽 TopicTest 隊列,并使用并發(fā)消息監(jiān)聽器MessageListenerConcurrently
1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6 consumer.subscribe("TopicTest", "*");
7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
8 consumer.registerMessageListener(new MessageListenerConcurrently() {
9 @Override
10 public ConsumeConcurrentlyStatus consumeMessage(List msgs,
11 ConsumeConcurrentlyContext context) {
12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14 }
15 });
16 consumer.start();
17 System.out.printf("Consumer Started.%n");
18 }
19} 3 RocketMQ 中連接結(jié)構(gòu)圖
4 消費監(jiān)聽器
接口:org.apache.rocketmq.client.consumer.listener.MessageListener
有兩個子接口:
- 順序消費:MessageListenerOrderly
- 并發(fā)消費: MessageListenerConcurrently
4.1 MessageListenerConcurrently
作用:consumer并發(fā)消費消息的監(jiān)聽器
比如,在 quick start 中,就是使用的并發(fā)消費消息監(jiān)聽器:?
1 consumer.registerMessageListener(new MessageListenerConcurrently() {
2 @Override
3 public ConsumeConcurrentlyStatus consumeMessage(List msgs,
4 ConsumeConcurrentlyContext context) {
5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
7 }
8 }); 方法返回值,是個枚舉:
1 package org.apache.rocketmq.client.consumer.listener;
2
3/**
4 * 并發(fā)消費mq消息結(jié)果
5 */
6public enum ConsumeConcurrentlyStatus {
7
8 /**
9 * Success consumption
10 * 成功消費
11 */
12 CONSUME_SUCCESS,
13
14 /**
15 * Failure consumption,later try to consume
16 * 失敗消費,稍后嘗試消費
17 *
18 *
19 * 如果 {@link MessageListener}返回的消費結(jié)果為 RECONSUME_LATER,則需要將這些消息發(fā)送給Broker延遲消息。
20 * 如果給broker發(fā)送消息失敗,將延遲5s后提交線程池進行消費。
21 *
22 * RECONSUME_LATER的消息發(fā)送入口: MQClientAPIImpl#consumerSendMessageBack,
23 * 命令編碼: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}
24 */
25 RECONSUME_LATER;
26}
畫外音:
當前,我們在具體開發(fā)中,肯定不會直接使用這種方式來寫consumer。
常用的Consumer實現(xiàn)是:基于 推 的consumer:DefaultMQPushConsumer
4.2 MessageListenerOrderly
作用:consumer順序消費消息的監(jiān)聽器
5 消費線程池
5.1 DefaultMQPushConsumer
作用:基于 推 的consumer消費者
5.2 注冊并發(fā)消息監(jiān)聽器
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener
當使用這個方法注冊消息監(jiān)聽器時,實際上會把這個病發(fā)消息監(jiān)聽器設置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner屬性中。
5.3 設置 consumer 消費 service
可選有兩種:?
并發(fā)消費的service
順序消費的service
當consumer在啟動的時,會使用MessageListener具體實現(xiàn)類型進行判斷:
MessageListener 就有并發(fā)和順序兩種,所以service也有兩種。
1public synchronized void start() throws MQClientException {
2 switch (this.serviceState) {
3 case CREATE_JUST:
4
5 // 省略一部分代碼...........
6
7 // 根據(jù)注冊的監(jiān)聽器類型[并發(fā)消息監(jiān)聽器/順序執(zhí)行消息監(jiān)聽器],來確定使用哪種消費服務.
8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
9 this.consumeOrderly = true;
10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
12 this.consumeOrderly = false;
13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
14 }
15 this.consumeMessageService.start();
16
17 // 省略一部分代碼..........
18 this.serviceState = ServiceState.RUNNING;
19 break;
20 case RUNNING:
21 case START_FAILED:
22 case SHUTDOWN_ALREADY:
23 throw new MQClientException("The PushConsumer service state not OK, maybe started once");
24 default:
25 break;
26 }
27
28 // 省略一部分代碼..........
29 }如果使用的是并發(fā)消費的話,使用 ConsumeMessageConcurrentlyService :
在實例化的時候,會創(chuàng)建一個線程池:
1// 無界隊列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫無意義了.
2this.consumeRequestQueue = new LinkedBlockingQueue();
3this.consumeExecutor = new ThreadPoolExecutor(
4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默認20
5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默認64
6 1000 * 60,
7 TimeUnit.MILLISECONDS,
8 this.consumeRequestQueue,
9 new ThreadFactoryImpl("ConsumeMessageThread_"));
consumer消費線程池參數(shù):
- 默認最小消費線程數(shù) 20
- 默認最大消費線程數(shù) 64
- keepAliveTime = 60*1000 單位:秒
- 隊列:new LinkedBlockingQueue<>()? 無界隊列
- 線程名稱:前綴是:ConsumeMessageThread_
注意:因為線程池使用的是無界隊列,那么設置的最大線程數(shù),其實沒有什么意義。
5.4 修改線程池線程數(shù)
上面我們已經(jīng)知道了,設置線程池的最大線程數(shù)是沒什么用的。
那我們其實可以設置線程池的最小線程數(shù),來修改consumer消費消息時的線程池大小。
1public static void main(String[] args) throws InterruptedException, MQClientException {
2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
3
4 consumer.setConsumeThreadMin(30);
5 consumer.setConsumeThreadMax(64);
6
7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8 consumer.subscribe("TopicTest", "*");
9 consumer.registerMessageListener(new MessageListenerConcurrently() {
10
11 @Override
12 public ConsumeConcurrentlyStatus consumeMessage(List msgs,
13 ConsumeConcurrentlyContext context) {
14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16 }
17 });
18 consumer.start();
19 System.out.printf("Consumer Started.%n");
20 } 注意:consumeThreadMin? 如果大于64,則也需要設置 consumeThreadMax 參數(shù),因為有個校驗:
-修改線程池線程數(shù)-SpringBoot版
如果consumer是使用spring boot進行集成的,則可以這樣設置消費者線程數(shù):
本文題目:如何在SpringBoot項目中控制RocketMQ消費線程數(shù)量
轉(zhuǎn)載來于:http://m.fisionsoft.com.cn/article/dpcshdj.html


咨詢
建站咨詢
