新聞中心
消費(fèi)者實(shí)現(xiàn)邏輯-Kafka知識體系(四)
作者:小汪哥寫代碼 2021-07-08 05:52:34
開發(fā)
架構(gòu)
Kafka Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。Kafka 僅僅使用 Consumer Group 這一種機(jī)制,卻同時實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:消息隊列模型、發(fā)布 / 訂閱模型。

上篇文章分享kafka broker 的實(shí)現(xiàn)原理、數(shù)據(jù)的存儲結(jié)構(gòu)和消息持久化相關(guān)的東西,那消息存儲完了之后,怎么被消費(fèi)端消費(fèi)呢,本文來聊一聊Kafka 消費(fèi)端的那些事兒。
1)拉取機(jī)制
Kafka生產(chǎn)端是推的機(jī)制即Push,消費(fèi)端是拉的機(jī)制即Pull。
2)Pull的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)是消費(fèi)端可以自己控制消息的讀取速度和數(shù)量;
缺點(diǎn)是不知道服務(wù)端有沒有數(shù)據(jù),所以要一直pull或隔一定時間pull,可能要pull多次并等待。
3)消息投遞語義:
Kafka默認(rèn)保證at-least-once delivery,容許用戶實(shí)現(xiàn)at-most-once語義,exactly-once的實(shí)現(xiàn)取決于目的存儲系統(tǒng)。
4)分區(qū)分配策略
RangeAssignor:按照分區(qū)范圍分配,當(dāng)前默認(rèn)策略;
RoundRobinAssignor:輪詢的方式分配;
StickyAssignor:Kafka 0.11版本引入,根據(jù)更多指標(biāo)比如負(fù)載,盡可能均勻。
這些前面的文章中也有提到。
消費(fèi)者組
Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。Kafka 僅僅使用 Consumer Group 這一種機(jī)制,卻同時實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:消息隊列模型、發(fā)布 / 訂閱模型。
理想情況下,Consumer 實(shí)例的數(shù)量應(yīng)該等于該 Group 訂閱主題的分區(qū)總數(shù)。
【消費(fèi)者和消費(fèi)組】
Kafka消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個消費(fèi)者形成一個消費(fèi)組來消費(fèi)主題時,每個消費(fèi)者會收到不同分區(qū)的消息。假設(shè)有一個T1主題,該主題有4個分區(qū);同時我們有一個消費(fèi)組G1,這個消費(fèi)組只有一個消費(fèi)者C1。那么消費(fèi)者C1將會收到這4個分區(qū)的消息,如下所示:
Kafka一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個消息。換句話說,每個應(yīng)用都可以讀到全量的消息。為了使得每個應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對于上面的例子,假如我們新增了一個新的消費(fèi)組G2,而這個消費(fèi)組有兩個消費(fèi)者,那么會是這樣的:
這里值得我們注意的是:
- 一個topic 可以被 多個 消費(fèi)者組 消費(fèi),
但是每個 消費(fèi)者組 消費(fèi)的數(shù)據(jù)是 互不干擾 的,也就是說,每個 消費(fèi)組 消費(fèi)的都是 完整的數(shù)據(jù) 。
- 一個分區(qū)只能被 同一個消費(fèi)組內(nèi) 的一個 消費(fèi)者 消費(fèi),
而 不能拆給多個消費(fèi)者 消費(fèi),也就是說如果你某個 消費(fèi)者組內(nèi)的消費(fèi)者數(shù) 比 該 Topic 的分區(qū)數(shù)還多,那么多余的消費(fèi)者是不起作用的
消費(fèi)者分區(qū)分配的過程
那么我們現(xiàn)在就來看看分配過程是怎么樣的。
1.確定 群組協(xié)調(diào)器
每當(dāng)我們創(chuàng)建一個消費(fèi)組,kafka 會為我們分配一個 broker 作為該消費(fèi)組的 coordinator(協(xié)調(diào)器)
2.注冊消費(fèi)者 并選出 leader consumer
當(dāng)我們的有了 coordinator 之后,消費(fèi)者將會開始往該 coordinator上進(jìn)行注冊,第一個注冊的 消費(fèi)者將成為該消費(fèi)組的 leader,后續(xù)的 作為 follower
3.當(dāng) leader 選出來后,
他會從coordinator那里實(shí)時獲取分區(qū) 和 consumer 信息,并根據(jù)分區(qū)策略給每個consumer 分配 分區(qū),并將分配結(jié)果告訴 coordinator。
4.follower 消費(fèi)者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進(jìn)行消費(fèi),
對于所有的 follower 消費(fèi)者而言,他們只知道自己消費(fèi)的分區(qū),并不知道其他消費(fèi)者的存在。
5.至此,消費(fèi)者都知道自己的消費(fèi)的分區(qū),
分區(qū)過程結(jié)束,當(dāng)發(fā)生 分區(qū)再均衡 的時候,leader 將會重復(fù)分配過程
具體的流程圖可以翻閱前面的文章。
關(guān)于位移
【位移 offset】
- 每個消費(fèi)者在消費(fèi)消息的過程中必然需要有個字段記錄它當(dāng)前消費(fèi)到了分區(qū)的哪個位置上,這個字段就是消費(fèi)者位移(Consumer Offset),它是消費(fèi)者消費(fèi)進(jìn)度的指示器。
- 看上去Offset 就是一個數(shù)值而已,其實(shí)對于 Consumer Group 而言,它是一組 KV 對,Key 是分區(qū),V 對應(yīng) Consumer 消費(fèi)該分區(qū)的最新位移 TopicPartition->long
- 不過切記的是消費(fèi)者位移是下一條消息的位移,而不是目前最新消費(fèi)消息的位移。
- 提交位移主要是為了表征 Consumer 的消費(fèi)進(jìn)度,這樣當(dāng) Consumer 發(fā)生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費(fèi),從而避免整個消費(fèi)過程重來一遍。
【位移的保存】
其實(shí)Consumer 端應(yīng)用程序在提交位移時,其實(shí)是向 Coordinator 所在的 Broker 提交位移。同樣地,當(dāng) Consumer 應(yīng)用啟動時,也是向 Coordinator 所在的 Broker 發(fā)送各種請求,然后由 Coordinator 負(fù)責(zé)執(zhí)行消費(fèi)者組的注冊、成員管理記錄等元數(shù)據(jù)管理操作。
老版本的 Consumer Group 把位移保存在 ZooKeeper 中,新版本的 Consumer Group 中,Kafka 社區(qū)重新設(shè)計了 Consumer Group 的位移管理方式,采用了將位移保存在 Kafka內(nèi)部主題的方法,也就是__consumer_offsets,俗稱位移主題。至于為什么放棄kafka 保存位移請看我前面的文章《基礎(chǔ)概念、架構(gòu)和新版的升級Kafka知識體系1》。
【位移主題的數(shù)據(jù)格式】
key
- 位移主題的 Key 中應(yīng)該保存 3 部分內(nèi)容:Group ID,主題名,分區(qū)號
value
- 主要保存的是offset 的信息,當(dāng)然還有時間戳等信息,你還記得你可以根據(jù)時間重置一個消費(fèi)者開始消費(fèi)的地方嗎
【位移的提交】
1. 自動提交
最簡單的提交方式是讓消費(fèi)者自動提交偏移量,如果 enable.auto.commit 被設(shè)為 true,那么每過 5s,消費(fèi)者會自動把從 poll() 方法接收到的最大偏移量提交上去。
可能造成的問題:數(shù)據(jù)重復(fù)讀
假設(shè)我們?nèi)匀皇褂媚J(rèn)的 5s 提交時間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡,再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經(jīng)落后了 3s,所以在這 3s內(nèi)到達(dá)的消息會被重復(fù)處理??梢酝ㄟ^修改提交時間間隔來更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時間窗,不過這種情況是無法完全避免的。
2. 手動提交
2.1 同步提交
同步存在的問題
- 從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現(xiàn)異常,該方法會將異常信息拋出。
- commitSync()的問題在于,Consumer程序會處于阻塞狀態(tài),直到遠(yuǎn)端的Broker返回提交結(jié)果,這個狀態(tài)才會結(jié)束,需要注意的是同步提交會在提交失敗之后進(jìn)行重試
- 在任何系統(tǒng)中,因?yàn)槌绦蚨琴Y源限制而導(dǎo)致的阻塞都可能是系統(tǒng)的瓶頸,會影響整個應(yīng)用程序的 TPS,影響吞吐量。
2.2 異步提交
手動提交有一個不足之處,在 broker 對提交請求作出回應(yīng)之前,應(yīng)用程序會一直阻塞,這樣會限制應(yīng)用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發(fā)生了再均衡,會增加重復(fù)消息的數(shù)量。
這時可以使用異步提交,只管發(fā)送提交請求,無需等待 broker 的響應(yīng)。它之所以不進(jìn)行重試,是因?yàn)樵谒盏椒?wù)器響應(yīng)的時候,可能有一個更大的偏移量已經(jīng)提交成功。
假設(shè)我們發(fā)出一個請求用于提交偏移量2000,這個時候發(fā)生了短暫的通信問題,服務(wù)器收不到請求,自然也不會作出任何響應(yīng)。與此同時,我們處理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000,它有可能在偏移量3000之后提交成功。這個時候如果發(fā)生再均衡,就會出現(xiàn)重復(fù)消息。
異步存在的問題
- commitAsync 的問題在于,出現(xiàn)問題時它不會自動重試。因?yàn)樗钱惒讲僮?,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經(jīng)“過期”或不是最新值了。因此,異步提交的重試其實(shí)沒有意義,所以 commitAsync 是不會重試的,所以只要在程序停止前最后一次提交成功即可。
- 這里提供一個解決方案,那就是不論成功還是失敗我們都將offsets信息記錄下來,如果最后一次提交成功那就忽略,如果最后一次沒有提交成功,我們可以在下次重啟的時候手動指定offset
綜合異步和同步來提交
同時使用了 commitSync() 和 commitAsync()。對于常規(guī)性、階段性的手動提交,我們調(diào)用 commitAsync() 避免程序阻塞,而在 Consumer 要關(guān)閉前,我們調(diào)用 commitSync() 方法執(zhí)行同步阻塞式的位移提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)。
關(guān)于再均衡Rebalance
分區(qū)的所有權(quán)從一個消費(fèi)者轉(zhuǎn)移到另一個消費(fèi)者,這樣的行為被稱為再均衡(Rebalance)。再均衡非常重要,為消費(fèi)者組帶來了高可用性和伸縮性,可以放心的增加或移除消費(fèi)者。以下是觸發(fā)再均衡的三種行為:
- 當(dāng)一個 消費(fèi)者 加入組時,讀取了原本由其他消費(fèi)者讀取的分區(qū),會觸發(fā)再均衡。
- 當(dāng)一個 消費(fèi)者 離開組時(被關(guān)閉或發(fā)生崩潰),原本由它讀取的分區(qū)將被組里的其他 消費(fèi)者 來讀取,會觸發(fā)再均衡。
- 當(dāng) Topic 發(fā)生變化時,比如添加了新的分區(qū),會發(fā)生分區(qū)重分配,會觸發(fā)再均衡。
分區(qū)再均衡 期間該 Topic 是不可用的,所以Rebalance 實(shí)在是太慢了!!!
這里再補(bǔ)充一下生產(chǎn)環(huán)境中因?yàn)椴徽_的配置引起的不需要的分區(qū)再均衡。
正常集群變動不再考慮范圍內(nèi):
1.防止 因?yàn)槲茨芗皶r發(fā)送心跳,導(dǎo)致Consumer 超時被踢出消費(fèi)者組。
這里可以設(shè)置 session.timeout.ms超時時間 和 heartbeat.interval.ms 心跳間隔一般可以把 超時時間設(shè)置為 心跳間隔的 3倍。
2.Consumer消費(fèi)時間過長導(dǎo)致的。
Consumer端如果無法在規(guī)定時間內(nèi)消費(fèi)完 poll 來的消息,那么就認(rèn)為該消費(fèi)者有問題,從而該消費(fèi)者會自主離組,所以我們可以設(shè)置 max.poll.interval.ms比處理時間略長。
3.從第二點(diǎn)我們還可能引申一點(diǎn)就是,如果集群經(jīng)常發(fā)生 分區(qū)在均衡,
那么你可能需要去觀察下消費(fèi)者執(zhí)行任務(wù)的耗時,特別注意觀察下 GC 的占用時間。
往往線上出問題也是因?yàn)榕渲貌缓侠韺?dǎo)致的。
網(wǎng)站題目:消費(fèi)者實(shí)現(xiàn)邏輯-kafka知識體系(四)
文章出自:http://m.fisionsoft.com.cn/article/djhgdpp.html


咨詢
建站咨詢
