新聞中心
Kafka是一個(gè)分布式流處理平臺(tái),它被廣泛用于構(gòu)建可擴(kuò)展、高吞吐量的實(shí)時(shí)數(shù)據(jù)管道。然而,在處理大量數(shù)據(jù)時(shí),Kafka數(shù)據(jù)丟失的問(wèn)題會(huì)引起許多煩惱。解決這個(gè)問(wèn)題的一種方法是將Kafka的數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中,從而更加可靠地保存數(shù)據(jù)。

成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比朝陽(yáng)網(wǎng)站開(kāi)發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式朝陽(yáng)網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋朝陽(yáng)地區(qū)。費(fèi)用合理售后完善,10年實(shí)體公司更值得信賴。
Kafka數(shù)據(jù)丟失的原因和解決方法
Kafka的數(shù)據(jù)丟失問(wèn)題是由于Kafka的寫(xiě)入機(jī)制導(dǎo)致的。Kafka的寫(xiě)入機(jī)制是異步的,不能保證發(fā)布到Kafka的消息會(huì)被成功寫(xiě)入Kafka broker。因此,在某些情況下,Kafka會(huì)丟失消息,例如當(dāng)發(fā)生網(wǎng)絡(luò)斷開(kāi)或Kafka broker宕機(jī)時(shí)。
為了解決這個(gè)問(wèn)題,Kafka提供了一種常見(jiàn)的方法:使用Kafka的復(fù)制機(jī)制來(lái)保護(hù)數(shù)據(jù)。Kafka的復(fù)制機(jī)制將消息復(fù)制到備用副本中,以便在Kafka broker宕機(jī)或者數(shù)據(jù)丟失的時(shí)候,備用副本可以被用來(lái)恢復(fù)數(shù)據(jù)。但是,復(fù)制機(jī)制會(huì)增加寫(xiě)入延遲和消息存儲(chǔ)的開(kāi)銷,如果需要處理高并發(fā)或海量數(shù)據(jù),就需要考慮其他更可靠的方案。
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中的解決方案
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中是解決Kafka數(shù)據(jù)丟失問(wèn)題的一種可靠方法。這種方法的實(shí)現(xiàn)基于Kafka Connect,它是一個(gè)開(kāi)源工具,用于在Kafka和其他數(shù)據(jù)存儲(chǔ)系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸。
Kafka Connect的主要作用是將Kafka的數(shù)據(jù)轉(zhuǎn)換為其他數(shù)據(jù)格式并存儲(chǔ)到其他數(shù)據(jù)存儲(chǔ)系統(tǒng)中。要將Kafka的數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中,可以使用Kafka Connect的JDBC連接器。JDBC連接器可以將Kafka消息轉(zhuǎn)換為數(shù)據(jù)庫(kù)的記錄并將其插入到數(shù)據(jù)庫(kù)中。
以下步驟描述了將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)的過(guò)程:
1. 安裝Kafka Connect:將Kafka Connect安裝在您的本地機(jī)器或云服務(wù)器上。
2. 配置Kafka Connect:配置Kafka Connect以使其可以連接到Kafka和數(shù)據(jù)庫(kù)。
3. 創(chuàng)建JDBC連接器:使用Kafka Connect創(chuàng)建JDBC連接器,該連接器將消息轉(zhuǎn)換為數(shù)據(jù)庫(kù)的記錄,并將其插入到數(shù)據(jù)庫(kù)中。
4. 測(cè)試連接器:測(cè)試連接器以確保它可以正確地將消息保存到數(shù)據(jù)庫(kù)中。
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)的好處
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)的好處有:
1. 可靠性:數(shù)據(jù)會(huì)被持久化到數(shù)據(jù)庫(kù)中,從而保證數(shù)據(jù)不會(huì)丟失。
2. 可擴(kuò)展性:可以使用數(shù)據(jù)庫(kù)的擴(kuò)展性,無(wú)需考慮Kafka復(fù)制機(jī)制的限制。
3. 數(shù)據(jù)一致性:如果在Kafka broker宕機(jī)或網(wǎng)絡(luò)斷開(kāi)的情況下,可以使用數(shù)據(jù)庫(kù)恢復(fù)數(shù)據(jù)。
4. 數(shù)據(jù)備份:可以使用數(shù)據(jù)庫(kù)備份和還原機(jī)制對(duì)數(shù)據(jù)進(jìn)行備份和還原。
5. 數(shù)據(jù)安全性:可以使用數(shù)據(jù)庫(kù)的安全機(jī)制來(lái)保護(hù)數(shù)據(jù)。
結(jié)論
在處理大量實(shí)時(shí)數(shù)據(jù)時(shí),Kafka的數(shù)據(jù)丟失問(wèn)題是一個(gè)令人頭痛的問(wèn)題。解決這個(gè)問(wèn)題的一種方法是將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中,從而更加可靠地保存數(shù)據(jù)。使用Kafka Connect的JDBC連接器可以使持久化過(guò)程變得更加容易和可管理。因此,如果您在使用Kafka時(shí)遇到了數(shù)據(jù)丟失的問(wèn)題,將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)中可能是一個(gè)可靠的解決方案。
相關(guān)問(wèn)題拓展閱讀:
- kafka:replica副本同步機(jī)制
kafka:replica副本同步機(jī)制
Kafka的流行歸功于它設(shè)計(jì)和操作簡(jiǎn)單、存儲(chǔ)系統(tǒng)高效、充分利用磁盤(pán)順序讀寫(xiě)等特性、非常適合在線日志收集等高吞吐場(chǎng)景。
Kafka特性之一是它的復(fù)制協(xié)議。復(fù)制協(xié)議是保障kafka高可靠性的關(guān)鍵。對(duì)于單個(gè)集群中每個(gè)Broker不同工作負(fù)載情況下,如何自動(dòng)調(diào)優(yōu)Kafka副本的工作方式是比較有挑戰(zhàn)的。它的挑戰(zhàn)之一是要知道如何避免follower進(jìn)入和退出同步副本列表(即ISR)。從用戶的角度來(lái)看,如果生產(chǎn)者發(fā)送一大批海量消息,可能會(huì)引起Kafka Broker很多警告。這些警報(bào)表明一些topics處于“under replicated”狀態(tài),這些副本處于同步失敗或失效狀態(tài),更意味著數(shù)據(jù)沒(méi)有被復(fù)制到足夠數(shù)量Broker從而增加數(shù)據(jù)丟失的概率。因此Kafka集群中處于“under replicated”中Partition數(shù)要密切監(jiān)控。這個(gè)警告應(yīng)該來(lái)自于Broker失效,減慢或暫停等狀態(tài)而不是生產(chǎn)者寫(xiě)不同大小消息引起的。
Kafka中主題的每個(gè)Partition有一個(gè)預(yù)寫(xiě)式日志文件,每個(gè)Partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition中,Partition中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset, 確定它在分區(qū)日志中唯一的位置。
Kafka每個(gè)topic的partition有N個(gè)副本,其中N是topic的復(fù)制因子。Kafka通過(guò)多副本機(jī)制實(shí)現(xiàn)故障自動(dòng)轉(zhuǎn)移,當(dāng)Kafka集群中一個(gè)Broker失效情況下仍然保證服務(wù)可用。在Kafka中發(fā)生復(fù)制時(shí)確保partition的預(yù)寫(xiě)式日志有序地寫(xiě)到其他節(jié)點(diǎn)上。N個(gè)replicas中。其中一個(gè)replica為leader,其他都為follower,leader處理partition的所有讀寫(xiě)請(qǐng)求,與此同時(shí),follower會(huì)被動(dòng)定期地去復(fù)制leader上的數(shù)據(jù)。
如下圖所示,Kafka集群中有4個(gè)broker, 某topic有3個(gè)partition,且復(fù)制因子即副本個(gè)數(shù)也為3:
Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader發(fā)生故障或掛掉,一個(gè)新leader被選舉并被接受客戶端的消息成功寫(xiě)入。Kafka確保從同步副本列表中選舉一個(gè)副本為leader,或者說(shuō)follower追趕leader數(shù)據(jù)。leader負(fù)責(zé)維護(hù)和跟蹤ISR(In-Sync Replicas的縮寫(xiě),表示副本同步隊(duì)列,具體可參考下節(jié))中所有follower滯后的狀態(tài)。當(dāng)producer發(fā)送一條消息到broker后,leader寫(xiě)入消息并復(fù)制到所有follower。消息提交之后才被成功復(fù)制到所有的同步副本。消息復(fù)制延遲受最慢的follower限制,重要的是快速檢測(cè)慢副本,如果follower“落后”太多或者失效,leader將會(huì)把它從ISR中刪除。
副本同步隊(duì)列(ISR)
所謂同步,必須滿足如下兩個(gè)條件:
默認(rèn)情況下Kafka對(duì)應(yīng)的topic的replica數(shù)量為1,即每個(gè)partition都有一個(gè)唯一的肢指leader,為了確保消息的可靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設(shè)置為大于1,比如3。 所有的副本(replicas)統(tǒng)稱為Assigned Replicas,即AR。ISR是AR中的一個(gè)子集,由leader維護(hù)ISR列表,follower從leader同步數(shù)據(jù)有一些延遲。任意一個(gè)超過(guò)閾值都會(huì)把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表舉饑,新加入的follower也會(huì)先存放在OSR中。AR=ISR+OSR。
上一節(jié)中的HW俗稱高水位,是HighWatermark的縮寫(xiě),取一個(gè)partition對(duì)應(yīng)的ISR中最小的LEO作為HW,consumer最多只能消費(fèi)到HW所在的位置。另外每個(gè)replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)。對(duì)于leader新寫(xiě)入的消息,consumer不能立刻消費(fèi),leader會(huì)等歷答配待該消息被所有ISR中的replicas同步后更新HW,此時(shí)消息才能被consumer消費(fèi)。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對(duì)于來(lái)自內(nèi)部broKer的讀取請(qǐng)求,沒(méi)有HW的限制。
下圖詳細(xì)的說(shuō)明了當(dāng)producer生產(chǎn)消息至broker后,ISR以及HW和LEO的流轉(zhuǎn)過(guò)程:
由此可見(jiàn),Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,同步復(fù)制要求所有能工作的follower都復(fù)制完,這條消息才會(huì)被commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫(xiě)入log就被認(rèn)為已經(jīng)commit,這種情況下如果follower都還沒(méi)有復(fù)制完,落后于leader時(shí),突然leader宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。
副本不同步的異常情況
broker 分配的任何一個(gè) partition 都是以 Replica 對(duì)象實(shí)例的形式存在,而 Replica 在 Kafka 上是有兩個(gè)角色: leader 和 follower,只要這個(gè) Replica 是 follower,它便會(huì)向 leader 進(jìn)行數(shù)據(jù)同步。
反映在 ReplicaManager 上就是如果 Broker 的本地副本被選舉為 follower,那么它將會(huì)啟動(dòng)副本同步線程,其具體實(shí)現(xiàn)如下所示:
簡(jiǎn)單來(lái)說(shuō),makeFollowers() 的處理過(guò)程如下:
關(guān)于第6步,并不一定會(huì)為每一個(gè) partition 都啟動(dòng)一個(gè) fetcher 線程,對(duì)于一個(gè)目的 broker,只會(huì)啟動(dòng) num.replica.fetchers 個(gè)線程,具體這個(gè) topic-partition 會(huì)分配到哪個(gè) fetcher 線程上,是根據(jù) topic 名和 partition id 進(jìn)行計(jì)算得到,實(shí)現(xiàn)所示:
如上所示,在 ReplicaManager 調(diào)用 makeFollowers() 啟動(dòng) replica fetcher 線程后,它實(shí)際上是通過(guò) ReplicaFetcherManager 實(shí)例進(jìn)行相關(guān) topic-partition 同步線程的啟動(dòng)和關(guān)閉,其啟動(dòng)過(guò)程分為下面兩步:
addFetcherForPartitions() 的具體實(shí)現(xiàn)如下所示:
這個(gè)方法其實(shí)是做了下面這幾件事:
ReplicaFetcherManager 創(chuàng)建 replica Fetcher 線程的實(shí)現(xiàn)如下:
replica fetcher 線程在啟動(dòng)之后就開(kāi)始進(jìn)行正常數(shù)據(jù)同步流程了,這個(gè)過(guò)程都是在 ReplicaFetcherThread 線程中實(shí)現(xiàn)的。
ReplicaFetcherThread 的 doWork() 方法是一直在這個(gè)線程中的 run() 中調(diào)用的,實(shí)現(xiàn)方法如下:
在 doWork() 方法中主要做了兩件事:
processFetchRequest() 這個(gè)方法的作用是發(fā)送 Fetch 請(qǐng)求,并對(duì)返回的結(jié)果進(jìn)行處理,最終寫(xiě)入到本地副本的 Log 實(shí)例中,其具體實(shí)現(xiàn):
其處理過(guò)程簡(jiǎn)單總結(jié)一下:
fetch() 方法作用是發(fā)送 Fetch 請(qǐng)求,并返回相應(yīng)的結(jié)果,其具體的實(shí)現(xiàn),如下:
processPartitionData
這個(gè)方法的作用是,處理 Fetch 請(qǐng)求的具體數(shù)據(jù)內(nèi)容,簡(jiǎn)單來(lái)說(shuō)就是:檢查一下數(shù)據(jù)大小是否超過(guò)限制、將數(shù)據(jù)追加到本地副本的日志文件中、更新本地副本的 hw 值。
在副本同步的過(guò)程中,會(huì)遇到哪些異常情況呢?
大家一定會(huì)想到關(guān)于 offset 的問(wèn)題,在 Kafka 中,關(guān)于 offset 的處理,無(wú)論是 producer 端、consumer 端還是其他地方,offset 似乎都是一個(gè)形影不離的問(wèn)題。在副本同步時(shí),關(guān)于 offset,會(huì)遇到什么問(wèn)題呢?下面舉兩個(gè)異常的場(chǎng)景:
以上兩種情況都是 offset OutOfRange 的情況,只不過(guò):一是 Fetch Offset 超過(guò)了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介紹 Kafka 解決方案之前,我們先來(lái)自己思考一下這兩種情況應(yīng)該怎么處理?
上面是我們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很類似,不過(guò)遇到情況比上面我們列出的兩種情況多了一些復(fù)雜,其解決方案如下:
針對(duì)之一種情況,在 Kafka 中,實(shí)際上還會(huì)發(fā)生這樣一種情況,1 在收到 OutOfRange 錯(cuò)誤時(shí),這時(shí)去 leader 上獲取的 LEO 值與最小的 offset 值,這時(shí)候卻發(fā)現(xiàn) leader 的 LEO 已經(jīng)從 800 變成了 1100(這個(gè) topic-partition 的數(shù)據(jù)量增長(zhǎng)得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種情況,進(jìn)行重試就可以了,下次同步時(shí)就會(huì)正常了,但是依然會(huì)有上面說(shuō)的那個(gè)問(wèn)題。
replica fetcher 線程關(guān)閉的條件,在三種情況下會(huì)關(guān)閉對(duì)這個(gè) topic-partition 的拉取操作:
這里直接說(shuō)線程關(guān)閉,其實(shí)不是很準(zhǔn)確,因?yàn)槊總€(gè) replica fetcher 線程操作的是多個(gè) topic-partition,而在關(guān)閉的粒度是 partition 級(jí)別,只有這個(gè)線程分配的 partition 全部關(guān)閉后,這個(gè)線程才會(huì)真正被關(guān)閉。
stopReplica
StopReplica 的請(qǐng)求實(shí)際上是 Controller 發(fā)送過(guò)來(lái)的,這個(gè)在 controller 部分會(huì)講述,它觸發(fā)的條件有多種,比如:broker 下線、partition replica 遷移等等。
makeLeaders
makeLeaders() 方法的調(diào)用是在 broker 上這個(gè) partition 的副本被設(shè)置為 leader 時(shí)觸發(fā)的,其實(shí)現(xiàn)如下:
調(diào)用 ReplicaFetcherManager 的 removeFetcherForPartitions() 刪除對(duì)這些 topic-partition 的副本同步設(shè)置,這里在實(shí)現(xiàn)時(shí),會(huì)遍歷所有的 replica fetcher 線程,都執(zhí)行 removePartitions() 方法來(lái)移除對(duì)應(yīng)的 topic-partition 。
removePartitions
這個(gè)方法的作用是:ReplicaFetcherThread 將這些 topic-partition 從自己要拉取的 partition 列表中移除。
ReplicaFetcherThread的關(guān)閉
前面介紹那么多,似乎還是沒(méi)有真正去關(guān)閉,那么 ReplicaFetcherThread 真正關(guān)閉是哪里操作的呢?
實(shí)際上 ReplicaManager 每次處理完 LeaderAndIsr 請(qǐng)求后,都會(huì)調(diào)用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,如果 fetcher 線程要拉取的 topic-partition 為空,那么就會(huì)關(guān)閉掉對(duì)應(yīng)的 fetcher 線程。
關(guān)于kafka數(shù)據(jù)不丟失數(shù)據(jù)庫(kù)的介紹到此就結(jié)束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關(guān)注本站。
成都創(chuàng)新互聯(lián)建站主營(yíng):成都網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、網(wǎng)站改版的網(wǎng)站建設(shè)公司,提供成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、成都網(wǎng)站推廣、成都網(wǎng)站優(yōu)化seo、響應(yīng)式移動(dòng)網(wǎng)站開(kāi)發(fā)制作等網(wǎng)站服務(wù)。
當(dāng)前標(biāo)題:解決Kafka數(shù)據(jù)不丟失問(wèn)題,數(shù)據(jù)庫(kù)更加可靠(kafka數(shù)據(jù)不丟失數(shù)據(jù)庫(kù))
URL鏈接:http://m.fisionsoft.com.cn/article/djsdcep.html


咨詢
建站咨詢
