新聞中心
今天我就與RocketMQ Tag幾個(gè)值得關(guān)注的問題,和大家來做一個(gè)分享,看過后的朋友,如果覺得有幫助,期待你的點(diǎn)贊支持。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了保亭黎族免費(fèi)建站歡迎大家使用!
- 消費(fèi)組訂閱關(guān)系不一致為什么會(huì)到來消息丟失?
- 如果一個(gè)tag的消息數(shù)量很少,是否會(huì)顯示很高的延遲?
1.消費(fèi)組訂閱關(guān)系不一致導(dǎo)致消息丟失
從消息消費(fèi)的視角來看消費(fèi)組是一個(gè)基本的物理隔離單位,每一個(gè)消費(fèi)組擁有自己的消費(fèi)位點(diǎn)、消費(fèi)線程池等。
RocketMQ的初學(xué)者容易犯這樣一個(gè)錯(cuò)誤:消費(fèi)組中的不同消費(fèi)者,訂閱同一個(gè)topic的不同的tag,這樣會(huì)導(dǎo)致消息丟失(部分消息沒有消費(fèi)),在思考這個(gè)問題時(shí),我們不妨先來看一張圖:
簡單闡述一下其核心關(guān)鍵點(diǎn):
- 例如一個(gè)Topic共有4個(gè)隊(duì)列。
- 消息發(fā)送者連續(xù)發(fā)送4條tagA的消息后,再連續(xù)發(fā)送4條tagb的消息,消息發(fā)送者端默認(rèn)采取輪循的負(fù)載均衡機(jī)制,這樣topic的每一個(gè)隊(duì)列中都存在tagA、tabB兩個(gè)tag的消息。
- 消費(fèi)組dw_tag_test的IP為192.168.3.10的消費(fèi)者訂閱tagA,另外一個(gè)IP為192.168.3.11的消費(fèi)者訂閱tagB。
- 消費(fèi)組內(nèi)的消費(fèi)者在進(jìn)行消息消費(fèi)之前,首先會(huì)進(jìn)行隊(duì)列負(fù)載,默認(rèn)為平均分配,分配結(jié)果:
消費(fèi)者然后向Broker發(fā)起消息拉取請(qǐng)求,192.168.3.10消費(fèi)者會(huì)由于只訂閱了tagA,這樣存在q0、q1中的tagB的消息會(huì)被過濾,但被過濾的tagB并不會(huì)投遞到另外一個(gè)訂閱了tagB的消費(fèi)者,造成這部分消息沒有被投遞,從而導(dǎo)致消息丟失。
同樣192.168.3.11消費(fèi)者會(huì)由于只訂閱了tagB,這樣存在q2、q3中的tagA的消息會(huì)被過濾,但被過濾的tagA并不會(huì)投遞到另外一個(gè)訂閱了tagA的消費(fèi)者,造成這部分消息沒有被投遞,從而導(dǎo)致消息丟失。
192.168.3.10 分配到q0、q1。
192.168.3.11 分配到q2、q3。
2.如果一個(gè)tag的消息數(shù)量很少,是否會(huì)顯示很高的延遲?
開篇有群友會(huì)存在這樣一個(gè)擔(dān)憂,其場(chǎng)景大概如下圖所示:
消費(fèi)者在消費(fèi)offset=100的這條tag1消息后,后面連續(xù)出現(xiàn)1000W條非tag1的消息,這個(gè)消費(fèi)組的積壓會(huì)持續(xù)增加,直接到1000W嗎?
要想明白這個(gè)問題,我們至少應(yīng)該要重點(diǎn)去查看如下幾個(gè)功能的源碼:
- 消息拉取流程
- 位點(diǎn)提交機(jī)制
本文將從以問題為導(dǎo)向,經(jīng)過自己的思考,并找到關(guān)鍵源碼加以求證,最后進(jìn)行簡單的示例代碼進(jìn)行驗(yàn)證。
遇到問題之前,我們可以先嘗試思考一下,如果這個(gè)功能要我們實(shí)現(xiàn),我們大概會(huì)怎么去思考?
要判斷消費(fèi)組在消費(fèi)為offset=100的消息后,在接下來1000W條消息都會(huì)被過濾的情況下,如果我們希望位點(diǎn)能夠提交,我們應(yīng)該怎么設(shè)計(jì)?我覺得應(yīng)該至少有如下幾個(gè)關(guān)鍵點(diǎn):
- 消息消息拉取時(shí)連續(xù)1000W條消息找不到合適的消息,服務(wù)端會(huì)如何處理
- 客戶端拉取到消息與未拉取到消息兩種情況如何提交位點(diǎn)
2.1 消息拉取流程中的關(guān)鍵設(shè)計(jì)
客戶端向服務(wù)端拉取消息,連續(xù)1000W條消息都不符合條件,一次過濾查找這么多消息,肯定非常耗時(shí),客戶端也不能等待這么久,那服務(wù)端必須采取措施,必須觸發(fā)一個(gè)停止查找的條件并向客戶端返回NO_MESSAGE,客戶端在消息查找時(shí)會(huì)等待多久呢?
核心關(guān)鍵點(diǎn)一:客戶端在向服務(wù)端發(fā)起消息拉取請(qǐng)求時(shí)會(huì)設(shè)置超時(shí)時(shí)間,代碼如下所示:
其中與超時(shí)時(shí)間相關(guān)的兩個(gè)變量,其含義分別:
- long brokerSuspendMaxTimeMillis 在當(dāng)前沒有符合的消息時(shí)在Broker端允許掛起的時(shí)間,默認(rèn)為15s,暫時(shí)不支持自定義。
- long timeoutMillis 消息拉取的超時(shí)時(shí)間,默認(rèn)為30s,暫時(shí)不支持自定義。
即一次消息拉取最大的超時(shí)時(shí)間為30s。
核心關(guān)鍵點(diǎn)二:Broker端在處理消息拉取時(shí)設(shè)置了完備的退出條件,具體由DefaultMessageStore的getMessage方法事項(xiàng),具體代碼如下所述:
核心要點(diǎn):
- 首先客戶端在發(fā)起時(shí)會(huì)傳入一個(gè)本次期望拉取的消息數(shù)量,對(duì)應(yīng)上述代碼中的maxMsgNums,如果拉取到指定條數(shù)到消息(讀者朋友們?nèi)珞w代碼讀者可以查閱isTheBatchFull方法),則正常退出。
- 另外一個(gè)非常關(guān)鍵的過濾條件,即一次消息拉取過程中,服務(wù)端最大掃描的索引字節(jié)數(shù),即一次拉取掃描ConsumeQueue的字節(jié)數(shù)量,取16000與期望拉取條數(shù)乘以20,因?yàn)橐粋€(gè)consumequeue條目占20個(gè)字節(jié)。
- 服務(wù)端還蘊(yùn)含了一個(gè)長輪循機(jī)制,即如果掃描了指定的字節(jié)數(shù),但一條消息都沒查詢到,會(huì)在broker端掛起一段時(shí)間,如果有新消息到來并符合過濾條件,則會(huì)喚醒,向客戶端返回消息。
回到這個(gè)問題,如果服務(wù)端連續(xù)1000W條非tag1的消息,拉取請(qǐng)求不會(huì)一次性篩選,而是會(huì)返回,不至于讓客戶端超時(shí)。
從這里可以打消第一個(gè)顧慮:服務(wù)端在沒有找到消息時(shí)不會(huì)傻傻等待不返回,接下來看是否會(huì)有積壓的關(guān)鍵是看如何提交位點(diǎn)。
2.2 位點(diǎn)提交機(jī)制
2.2.1 客戶端拉取到合適的消息位點(diǎn)提交機(jī)制
Pull線程從服務(wù)端拉取到結(jié)構(gòu)后會(huì)將消息提交到消費(fèi)組線程池,主要定義在DefaultMQPushConsumerImpl的PullTask類中,具體代碼如下所示:
眾所周知,RocketMQ是在消費(fèi)成功后進(jìn)行位點(diǎn)提交,代碼在ConsumeMessageConcurrentlyService中,如下所示:
這里的核心要點(diǎn):
消費(fèi)端成功消息完消費(fèi)后,會(huì)采用最小位點(diǎn)提交機(jī)制,確保消費(fèi)不丟失。
最小位點(diǎn)提交機(jī)制,其實(shí)就是將拉取到的消息放入一個(gè)TreeMap中,然后消費(fèi)線程成功消費(fèi)一條消息后,將該消息從TreeMap中移除,再計(jì)算位點(diǎn):
如果當(dāng)前TreeMap中還有消息在處理,則返回TreeMap中的第一條消息(最小位點(diǎn))
如果當(dāng)前TreeMap中已沒有消息處理,返回的位點(diǎn)為this.queueOffsetMax,queueOffsetMax的表示的是當(dāng)前消費(fèi)隊(duì)列中拉取到的最大消費(fèi)位點(diǎn),因?yàn)榇藭r(shí)拉取到的消息全部消費(fèi)了。
最后調(diào)用updateoffset方法,更新本地的位點(diǎn)緩存(有定時(shí)持久機(jī)制)
2.2.2 客戶端沒有拉取到合適的消息位點(diǎn)提交機(jī)制
客戶端如果沒有拉取到合適的消息,例如全部被tag過濾了,在DefaultMqPushConsumerImpl的PullTask中定義了處理方式,具體如下所示:
其關(guān)鍵代碼在correctTasOffset中,具體代碼請(qǐng)看:
核心要點(diǎn):如果此時(shí)處理隊(duì)列中的消息為0時(shí),則會(huì)將下一次拉取偏移量當(dāng)成位點(diǎn),而這個(gè)值在服務(wù)端進(jìn)行消息查找時(shí)會(huì)向前驅(qū)動(dòng),代碼在DefaultMessageStore的getMessage中:
故從這里可以看到,就算消息全部過濾掉了,位點(diǎn)還是會(huì)向前驅(qū)動(dòng)的,不會(huì)造成大量積壓。
2.2.3 消息拉取時(shí)會(huì)附帶一次位點(diǎn)提交
其實(shí)RocketMQ的位點(diǎn)提交,客戶端提交位點(diǎn)時(shí)會(huì)先存儲(chǔ)在本地緩存中,然后定時(shí)將位點(diǎn)信息一次性提交到Broker端,其實(shí)還存在另外一種較為隱式位點(diǎn)提交機(jī)制:
即在消息拉取時(shí),如果本地緩存中存在位點(diǎn)信息,會(huì)設(shè)置一個(gè)系統(tǒng)標(biāo)記:FLAG_COMMIT_OFFSET,該標(biāo)記在服務(wù)端會(huì)觸發(fā)一次位點(diǎn)提交,具體代碼如下:
2.2.4 總結(jié)與驗(yàn)證
綜上述所述,使用TAG并不會(huì)因?yàn)閷?duì)應(yīng)tag數(shù)量比較少,從而造成大量積壓的情況。
為了驗(yàn)證這個(gè)觀點(diǎn),我也做了一個(gè)簡單的驗(yàn)證,具體方法是啟動(dòng)一個(gè)消息發(fā)送者,向指定topic發(fā)送tag B的消息,而消費(fèi)者只訂閱tag A,但消費(fèi)者并不會(huì)出現(xiàn)消費(fèi)積壓,測(cè)試代碼如下圖所示:
查看消費(fèi)組積壓情況如下圖所示:
網(wǎng)站欄目:沒想到RocketMQ的tag還有這個(gè)“坑”!
網(wǎng)頁URL:http://m.fisionsoft.com.cn/article/dpcjddi.html


咨詢
建站咨詢
