新聞中心
大家好,我是君哥。

創(chuàng)新互聯(lián)建站提供高防主機(jī)、云服務(wù)器、香港服務(wù)器、德陽機(jī)房服務(wù)器托管等
今天來分享 RocketMQ 的定時(shí)任務(wù)。通過這些定時(shí)任務(wù),能讓我們更加理解 RocketMQ 的消息處理機(jī)制和設(shè)計(jì)理念。
從 RocketMQ 4.9.4 的源代碼上看,RocketMQ 的定時(shí)任務(wù)有很多,今天主要講解一些核心的定時(shí)任務(wù)。
1、架構(gòu)回顧
首先再來回顧一下 RocketMQ 的架構(gòu)圖:
Name Server 集群部署,但是節(jié)點(diǎn)之間并不會同步數(shù)據(jù),因?yàn)槊總€(gè)節(jié)點(diǎn)都會保存完整的數(shù)據(jù)。因此單個(gè)節(jié)點(diǎn)掛掉,并不會對集群產(chǎn)生影響。
Broker 可以采用主從集群部署,實(shí)現(xiàn)多副本存儲和高可用。每個(gè) Broker 節(jié)點(diǎn)都要跟所有的 Name Server 節(jié)點(diǎn)建立長連接,定義注冊 Topic 路由信息和發(fā)送心跳。
Producer 和 Consumer 跟 Name Server 的任意一個(gè)節(jié)點(diǎn)建立長連接,定期從 Name Server 拉取 Topic 路由信息。
2、Producer 和 Consumer
2.1 獲取 NameServer 地址
Producer 和 Consumer 要跟 Name Server 建立連接,就必須首先獲取 Name Server 地址。Producer 和 Consumer 采用定時(shí)任務(wù)每兩分鐘獲取 Name Server 地址并更新本地緩存。代碼如下:
//MQClientInstance類
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
2.2 更新路由信息
Producer 和 Consumer 會定時(shí)從 Name Server 獲取定時(shí)訂閱信息,更新本地緩存,默認(rèn)間隔是 30s(可以配置)。代碼如下:
//MQClientInstance類
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
2.3 向 Broker 發(fā)送心跳
Producer 和 Consumer 會從本地緩存的 Broker 列表中定時(shí)清除離線的 Broker,并且向 Broker 發(fā)送心跳,默認(rèn)間隔是 30s(可以配置)。代碼如下:
//MQClientInstance類
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
2.4 持久化 Offset
消費(fèi)者需要定時(shí)持久化 MessageQueue 的偏移量,默認(rèn)每 5s 更新一次(可以配置)。
注意:集群模式需要向 Broker 發(fā)送持久化消息,因?yàn)榧耗J狡屏勘4嬖?Broker 端,而廣播模式只需要把偏移量保存在消費(fèi)者本地文件。代碼如下:
//MQClientInstance類
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
2.5 調(diào)整核心線程數(shù)
對于消費(fèi)者采用推模式的情況,消費(fèi)者會根據(jù)未消費(fèi)的消息數(shù)量,定期更新核心線程數(shù),默認(rèn)每 1m 一次。
注意:在 4.9.4 這個(gè)版本,更新核心線程數(shù)的代碼并沒有實(shí)現(xiàn),只是預(yù)留了接口。代碼如下:
//MQClientInstance類
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
2.6 失效過期請求
Producer 和 Consumer 會定時(shí)掃描緩存在本地的請求,如果請求開始時(shí)間加超時(shí)時(shí)間(再加 1s)小于當(dāng)前時(shí)間,則這個(gè)請求過期。通過定時(shí)任務(wù)(3s 一次)讓過期請求失效,并且觸發(fā)回調(diào)函數(shù)。
//NettyRemotingClient.java
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run(){
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
2.7 生產(chǎn)者
2.7.1 性能記錄
生產(chǎn)者發(fā)送消息后,會對成功失敗的狀態(tài)、花費(fèi)時(shí)間進(jìn)行記錄,以此來計(jì)算吞吐量 TPS,響應(yīng)時(shí)間 RT,代碼如下:
//Producer.java
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run(){
snapshotList.addLast(statsBenchmark.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats(){
if (snapshotList.size() >= 10) {
doPrintStats(snapshotList, statsBenchmark, false);
}
}
@Override
public void run(){
try {
this.printStats();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 10000, 10000, TimeUnit.MILLISECONDS);
2.8 消費(fèi)者
2.8.1 MessageQueue 加鎖
對于順序消息,要保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 消費(fèi)。消費(fèi)者初始化的時(shí)候,會啟動一個(gè)定時(shí)任務(wù),定時(shí)(默認(rèn) 20s,可以配置)地向 Broker 發(fā)送鎖定消息,Broker 收到請求后,就會把 MessageQueue、group 和 clientId 進(jìn)行綁定,這樣其他客戶端就不能從這個(gè) MessageQueue 拉取消息。
代碼如下:
//ConsumeMessageOrderlyService.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
注意:Broker 的加鎖是有時(shí)效的(默認(rèn) 60s,可以配置),過期后,有可能被其他 Consumer 進(jìn)行消費(fèi)。
2.8.2 性能快照
Consumer 每秒會記錄一次性能快照,比如消息從創(chuàng)建到消費(fèi)花費(fèi)的時(shí)間,消息從保存到消費(fèi)花費(fèi)的時(shí)間,接收到消息的總數(shù)量,失敗總數(shù)量。代碼如下:
//Consumer.java
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run(){
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
上面記錄了性能快照后,Consumer 會每隔 10s 進(jìn)行性能參數(shù)計(jì)算和打印。代碼如下:
//Consumer.java
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats(){
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long consumeTps =
(long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
final long failCount = end[4] - begin[4];
final long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
final long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
);
}
}
通過性能參數(shù)的日志輸出,可以很方便的對 RocketMQ 的消費(fèi)者進(jìn)行監(jiān)控。
2.8.3 清除過期消息
消費(fèi)者會定期檢查本地拉取的消息列表,如果列表中的消息已經(jīng)過期(默認(rèn) 15 分鐘過期,可以配置),則把過期消息再次發(fā)送給 Broker,然后從本地消息列表刪除。代碼如下:
//ConsumeMessageConcurrentlyService.java
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
cleanExpireMsg();
} catch (Throwable e) {
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
}
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
2.8.4 清除過期消息
消費(fèi)者會每隔 30s 向 NameServer 拉取 MessageQueue 信息,然后跟本地保存的進(jìn)行比較,如果不一致,則更新本地緩存信息。代碼如下:
//DefaultLitePullConsumerImpl.java
scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run(){
try {
fetchTopicMessageQueuesAndCompare();
} catch (Exception e) {
log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
}
}
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
3 Broker
3.1 狀態(tài)采樣
Broker 端會對狀態(tài)進(jìn)行采用,比如一個(gè) Topic、MessageQueue、Group 總共發(fā)送了多少條消息,Topic 總共發(fā)送的消息大小。Broker 會對這些狀態(tài)按照秒、分鐘、小時(shí)為單位進(jìn)行采樣并且定時(shí)打印,這里一共有 6 個(gè)定時(shí)任務(wù)。比如下面是按照秒進(jìn)行采樣的定時(shí)任務(wù):
//StatsItemSet.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
samplingInSeconds();
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.SECONDS);
3.2 記錄消息延時(shí)
Broker 讀取消息時(shí)會記錄消息從保存磁盤到被讀取的時(shí)間差并定時(shí)打印。定時(shí)任務(wù)代碼如下:
//MomentStatsItemSet.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
printAtMinutes();
} catch (Throwable ignored) {
}
}
}, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
3.3 持久化數(shù)據(jù)
Broker 會定時(shí)持久化消費(fèi)偏移量、Topic 配置、定閱組配置等,默認(rèn) 10s 一次(可以配置)。代碼如下:
//ScheduleMessageService.java
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
3.4 失效過期請求
Broker 會定時(shí)掃描緩存在本地的請求,如果請求開始時(shí)間加超時(shí)時(shí)間(再加 1s)小于當(dāng)前時(shí)間,則這個(gè)請求過期。通過定時(shí)任務(wù)(3s 一次)讓過期請求失效,并且觸發(fā)回調(diào)函數(shù)。
//NettyRemotingServer.java
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run(){
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
3.5 過濾服務(wù)
消費(fèi)者可能會向 Broker 注冊 filterClass 用來過濾消息。Broker 收到消費(fèi)者注冊的 filterClass 后會用定時(shí)任務(wù)來創(chuàng)建 FilterServer。代碼如下:
//FilterServerManager.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
FilterServerManager.this.createFilterServer();
} catch (Exception e) {
log.error("", e);
}
}
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
這樣消費(fèi)者拉取消息時(shí)首先從 FilterServer 拉取消息,F(xiàn)ilterServer 從 Broker 拉取消息后進(jìn)行過濾,只把消費(fèi)者感興趣的消息返回給消費(fèi)者。一個(gè) Broker 可以有多個(gè) FilterServer。如下圖:
3.6 記錄消息總量
Broker 每天會記錄前一天收發(fā)消息的總數(shù)量,定時(shí)任務(wù)如下(period 是 1 天):
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
3.7 持久化 Offset
Broker 默認(rèn)每隔 5s(可以配置) 會持久化一次消息的 Offset,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
3.8 持久化過濾參數(shù)
上面提到過,消費(fèi)者可能會向 Broker 注冊 filterClass,Broker 解析消費(fèi)者注冊的 filterClass 后,會把解析后的 FilterData 持久化到文件,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
3.9 Broker 自我保護(hù)
當(dāng)消費(fèi)者讀取消息緩慢時(shí),Broker 為了保護(hù)自己,會把這個(gè)消費(fèi)者設(shè)置為不允許讀取的狀態(tài),這樣這個(gè)消費(fèi)組就不能再拉取消息了,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
3.10 Broker 打印水位
Broker 會每隔 1s 打印一次水位,包括發(fā)送消息的延遲、接收消息的延遲、事務(wù)消息的延遲、查詢消息的延遲,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
3.11 Broker 打印 Offset 差
Broker 會定時(shí)打印最新的消息 Offset 和已經(jīng)分發(fā)給 MessageQueue 和 Index 索引的 Offset 差距,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.12 獲取 NameServer 地址
Broker 會定期獲取 NameServer 的地址,并更新本地緩存,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
3.13 打印主從偏移量差距
Broker 會定時(shí)打印 master 節(jié)點(diǎn)和 slave 節(jié)點(diǎn)消息 Offset 的差距,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.14 向 NameServer 注冊
Broker 會定時(shí)向(默認(rèn) 30s,可配置,最高不超過 60s)所有 NameServer 發(fā)送注冊消息,代碼如下:
//BrokerController.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
3.15 同步 Slave
Broker 的 Master 節(jié)點(diǎn)會每間隔 10s 向 Slave 節(jié)點(diǎn)同步數(shù)據(jù),包括 Topic 配置、消費(fèi)偏移量、延遲偏移量、消費(fèi)組配置,代碼如下:
//BrokerController.java
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
3.16 刪除過期文件
Broker 會周期性(默認(rèn) 10s,可以配置)地執(zhí)行刪除任務(wù),刪除過期的 CommitLog 文件和 ConsumeQueue 文件,代碼如下:
//DefaultMessageStore.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
3.17 文件大小檢查
Broker 會每隔 10 分鐘檢查 CommitLog 文件和 ConsumeQueue 文件,用當(dāng)前文件的最?。ㄆ鹗迹?Offset 減去上一個(gè)文件最?。ㄆ鹗迹?Offset,如果不等于一個(gè)文件的大小,就說明文件存在問題。代碼如下:
//DefaultMessageStore.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);
3.18 保存堆棧映射
Broker 會每隔 1s 記錄所有存活線程的堆棧映射信息,前提是 debugLockEnable 開關(guān)配置是打開的。代碼如下:
//DefaultMessageStore.java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
if (lockTime > 1000 && lockTime < 10000000) {
String stack = UtilAll 當(dāng)前文章:40個(gè)定時(shí)任務(wù),帶你理解RocketMQ設(shè)計(jì)精髓!
本文路徑:http://m.fisionsoft.com.cn/article/cccgeee.html


咨詢
建站咨詢
