新聞中心
Kafka 如何解決消息不丟失?
作者: 微觀技術 2021-08-04 07:47:18
開發(fā)
架構
Kafka Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個高性能的MQ服務來連接生產和消費兩個系統(tǒng),達到系統(tǒng)間的解耦,有很強的擴展性。

創(chuàng)新互聯(lián)服務項目包括漢南網(wǎng)站建設、漢南網(wǎng)站制作、漢南網(wǎng)頁制作以及漢南網(wǎng)絡營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關系等,向廣大中小型企業(yè)、政府機構等提供互聯(lián)網(wǎng)行業(yè)的解決方案,漢南網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務的客戶以成都為中心已經(jīng)輻射到漢南省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!
本文轉載自微信公眾號「微觀技術」,作者微觀技術。轉載本文請聯(lián)系微觀技術公眾號。
大家好,我是Tom哥~
Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個高性能的MQ服務來連接生產和消費兩個系統(tǒng),達到系統(tǒng)間的解耦,有很強的擴展性。
你可能會有疑問,如果中間某一個環(huán)節(jié)斷掉了,那怎么辦?
這種情況,我們稱之為消息丟失,會造成系統(tǒng)間的數(shù)據(jù)不一致。
那如何解決這個問題?需要從生產端、MQ服務端、消費端,三個維度來處理。
1、生產端
生產端的職責就是,確保生產的消息能到達MQ服務端,這里我們需要有一個響應來判斷本次的操作是否成功。
- Future
send(ProducerRecord record, Callback callback)
比如,上面的代碼就是通過一個Callback函數(shù),來判斷消息是否發(fā)送成功,如果失敗,我們需要補償處理。
另外,為了提升發(fā)送時的靈活性,kafka提供了多種參數(shù),供不同業(yè)務自己選擇
1.1 參數(shù) acks
該參數(shù)表示有多少個分區(qū)副本收到消息,才認為本次發(fā)送是成功的。
acks=0,只要發(fā)送消息就認為成功,生產端不等待服務器節(jié)點的響應
acks=1,表示生產者收到 leader 分區(qū)的響應就認為發(fā)送成功
acks=-1,只有當 ISR 中的副本全部收到消息時,生產端才會認為是成功的。這種配置是最安全的,但由于同步的節(jié)點較多,吞吐量會降低。
1.2 參數(shù) retries
表示生產端的重試次數(shù),如果重試次數(shù)用完后,還是失敗,會將消息臨時存儲在本地磁盤,待服務恢復后再重新發(fā)送。建議值 retries=3
1.3 參數(shù) retry.backoff.m
消息發(fā)送超時或失敗后,間隔的重試時間。一般推薦的設置時間是 300 毫秒。
這里要特別注意一種特殊情況,如果MQ服務沒有正常響應,不一定代表消息發(fā)送失敗,也有可能是響應時正好趕上網(wǎng)絡抖動,響應超時。
當生產端做完這些,一定能保證消息發(fā)送成功了,但可能發(fā)送多次,這樣就會導致消息重復,這個我們后面再講解決方案。
2、MQ服務端
MQ服務端作為消息的存儲介質,也有可能會丟失消息。比如:一個分區(qū)突然掛掉,那么怎么保證這個分區(qū)的數(shù)據(jù)不丟失,我們會引入副本概念,通過備份來解決這個問題。
具體可設置哪些參數(shù)?
2.1 參數(shù) replication.factor
表示分區(qū)副本的個數(shù),replication.factor >1 當leader 副本掛了,follower副本會被選舉為leader繼續(xù)提供服務。
2.2 參數(shù) min.insync.replicas
表示 ISR 最少的副本數(shù)量,通常設置 min.insync.replicas >1,這樣才有可用的follower副本執(zhí)行替換,保證消息不丟失
2.3 參數(shù) unclean.leader.election.enable
是否可以把非 ISR 集合中的副本選舉為 leader 副本。
如果設置為true,而follower副本的同步消息進度落后較多,此時被選舉為leader,會導致消息丟失,慎用。
3、消費端
消費端要做的是把消息完整的消費處理掉。但是這里面有個提交位移的步驟。
有的同學,考慮到業(yè)務處理消耗時間較長,會單獨啟動線程拉取消息存儲到本地內存隊列,然后再搞個線程池并行處理業(yè)務邏輯。這樣設計有個風險,本地消息如果沒有處理完,服務器宕機了,會造成消息丟失。
正確的做法:拉取消息 --- 業(yè)務處理 ---- 提交消費位移
關于提交位移,kafka提供了集中參數(shù)配置
參數(shù) enable.auto.commit
表示消費位移是否自動提交。
如果拉取了消息,業(yè)務邏輯還沒處理完,提交了消費位移但是消費端卻掛了,消費端恢復或其他消費端接管該分片再也拉取不到這條消息,會造成消息丟失。所以,我們通常設置 enable.auto.commit=false,手動提交消費位移。
- List
messages = consumer.poll(); - processMsg(messages);
- consumer.commitOffset();
這個方案,會產生另外一個問題,我們來看下這個圖:
拉取了消息4~消息8,業(yè)務處理后,在提交消費位移時,不湊巧系統(tǒng)宕機了,最后的提交位移并沒有保存到MQ 服務端,下次拉取消息時,依然是從消息4開始拉取,但是這部分消息已經(jīng)處理過了,這樣便會導致重復消費。
如何解決重復消費,避免引發(fā)數(shù)據(jù)不一致
首先,要解決MQ 服務端的重復消息。kafka 在 0.11.0 版本后,每條消息都有唯一的message id, MQ服務采用空間換時間方式,自動對重復消息過濾處理,保證接口的冪等性。
但這個不能根本上解決消息重復問題,即使MQ服務中存儲的消息沒有重復,但消費端是采用拉取方式,如果重復拉取,也會導致重復消費,如何解決這種場景問題?
方案一:只拉取一次(消費者拉取消息后,先提交 offset 后再處理消息),但是如果系統(tǒng)宕機,業(yè)務處理沒有正常結束,后面再也拉取不到這些消息,會導致數(shù)據(jù)不一致,該方案很少采用。
方案二:允許拉取重復消息,但是消費端自己做冪等性控制。保證只成功消費一次。
關于冪等技術方案很多,我們可以采用數(shù)據(jù)表或Redis緩存存儲處理標識,每次拉取到消息,處理前先校驗處理狀態(tài),再決定是處理還是丟棄消息。
文章名稱:Kafka如何解決消息不丟失?
當前路徑:http://m.fisionsoft.com.cn/article/djjidsg.html


咨詢
建站咨詢
