新聞中心
Kafka:消息是如何在服務(wù)端存儲與讀取的,你真的知道嗎?
作者:JAVA夢想口服液 2020-05-15 10:09:38
存儲
容災(zāi)備份
Kafka 小伙伴們肯定也比較好奇,Kafka 能夠處理千萬級消息,那它的消息是如何在 Partition 上存儲的呢?今天這篇文章就來為大家揭秘消息是如何存儲的。本文主要從消息的邏輯存儲和物理存儲兩個(gè)角度來介紹其實(shí)現(xiàn)原理。

前言
小伙伴們肯定也比較好奇,Kafka 能夠處理千萬級消息,那它的消息是如何在 Partition 上存儲的呢?今天這篇文章就來為大家揭秘消息是如何存儲的。本文主要從消息的邏輯存儲和物理存儲兩個(gè)角度來介紹其實(shí)現(xiàn)原理。
文章概覽
- Partition、Replica、Log 和 LogSegment 的關(guān)系。
- 寫入消息流程分析。
- 消費(fèi)消息及副本同步流程分析。
Partition、Replica、Log 和 LogSegment 的關(guān)系
假設(shè)有一個(gè) Kafka 集群,Broker 個(gè)數(shù)為 3,Topic 個(gè)數(shù)為 1,Partition 個(gè)數(shù)為 3,Replica 個(gè)數(shù)為 2。Partition 的物理分布如下圖所示。
Partition分布圖
從上圖可以看出,該 Topic 由三個(gè) Partition 構(gòu)成,并且每個(gè) Partition 由主從兩個(gè)副本構(gòu)成。每個(gè) Partition 的主從副本分布在不同的 Broker 上,通過這點(diǎn)也可以看出,當(dāng)某個(gè) Broker 宕機(jī)時(shí),可以將分布在其他 Broker 上的從副本設(shè)置為主副本,因?yàn)橹挥兄鞲北緦ν馓峁┳x寫請求,當(dāng)然在最新的 2.x 版本中從副本也可以對外讀請求了。將主從副本分布在不同的 Broker 上從而提高系統(tǒng)的可用性。
Partition 的實(shí)際物理存儲是以 Log 文件的形式展示的,而每個(gè) Log 文件又以多個(gè) LogSegment 組成。Kafka 為什么要這么設(shè)計(jì)呢?其實(shí)原因比較簡單,隨著消息的不斷寫入,Log 文件肯定是越來越大,Kafka 為了方便管理,將一個(gè)大文件切割成一個(gè)一個(gè)的 LogSegment 來進(jìn)行管理;每個(gè) LogSegment 由數(shù)據(jù)文件和索引文件構(gòu)成,數(shù)據(jù)文件是用來存儲實(shí)際的消息內(nèi)容,而索引文件是為了加快消息內(nèi)容的讀取。
可能又有朋友會(huì)問,Kafka 本身消費(fèi)是以 Partition 維度順序消費(fèi)消息的,磁盤在順序讀的時(shí)候效率很高完全沒有必要使用索引啊。其實(shí) Kafka 為了滿足一些特殊業(yè)務(wù)需求,比如要隨機(jī)消費(fèi) Partition 中的消息,此時(shí)可以先通過索引文件快速定位到消息的實(shí)際存儲位置,然后進(jìn)行處理。
總結(jié)一下 Partition、Replica、Log 和 LogSegment 之間的關(guān)系。消息是以 Partition 維度進(jìn)行管理的,為了提高系統(tǒng)的可用性,每個(gè) Partition 都可以設(shè)置相應(yīng)的 Replica 副本數(shù),一般在創(chuàng)建 Topic 的時(shí)候同時(shí)指定 Replica 的個(gè)數(shù);Partition 和 Replica 的實(shí)際物理存儲形式是通過 Log 文件展現(xiàn)的,為了防止消息不斷寫入,導(dǎo)致 Log 文件大小持續(xù)增長,所以將 Log 切割成一個(gè)一個(gè)的 LogSegment 文件。
注意: 在同一時(shí)刻,每個(gè)主 Partition 中有且只有一個(gè) LogSegment 被標(biāo)識為可寫入狀態(tài),當(dāng)一個(gè) LogSegment 文件大小超過一定大小后(比如當(dāng)文件大小超過 1G,這個(gè)就類似于 HDFS 存儲的數(shù)據(jù)文件,HDFS 中數(shù)據(jù)文件達(dá)到 128M 的時(shí)候就會(huì)被分出一個(gè)新的文件來存儲數(shù)據(jù)),就會(huì)新創(chuàng)建一個(gè) LogSegment 來繼續(xù)接收新寫入的消息。
寫入消息流程分析
消息寫入及落盤流程
流程解析
在第 3 篇文章講過,生產(chǎn)者客戶端對于每個(gè) Partition 一次會(huì)發(fā)送一批消息到服務(wù)端,服務(wù)端收到一批消息后寫入相應(yīng)的 Partition 上。上圖流程主要分為如下幾步:
- 客戶端消息收集器收集屬于同一個(gè)分區(qū)的消息,并對每條消息設(shè)置一個(gè)偏移量,且每一批消息總是從 0 開始單調(diào)遞增。比如第一次發(fā)送 3 條消息,則對三條消息依次編號 [0,1,2],第二次發(fā)送 4 條消息,則消息依次編號為 [0,1,2,3]。注意此處設(shè)置的消息偏移量是相對偏移量。
- 客戶端將消息發(fā)送給服務(wù)端,服務(wù)端拿到下一條消息的絕對偏移量,將傳到服務(wù)端的這批消息的相對偏移量修改成絕對偏移量。
- 將修改后的消息以追加的方式追加到當(dāng)前活躍的 LogSegment 后面,然后更新絕對偏移量。
- 將消息集寫入到文件通道。
- 文件通道將消息集 flush 到磁盤,完成消息的寫入操作。
了解以上過程后,我們在來看看消息的具體構(gòu)成情況。
消息構(gòu)成細(xì)節(jié)圖
一條消息由如下三部分構(gòu)成:
- OffSet:偏移量,消息在客戶端發(fā)送前將相對偏移量存儲到該位置,當(dāng)消息存儲到 LogSegment 前,先將其修改為絕對偏移量在寫入磁盤。
- Size:本條 Message 的內(nèi)容大小
- Message:消息的具體內(nèi)容,其具體又由 7 部分組成,crc 用于校驗(yàn)消息,Attribute 代表了屬性,key-length 和 value-length 分別代表 key 和 value 的長度,key 和 value 分別代表了其對應(yīng)的內(nèi)容。
消息偏移量的計(jì)算過程
通過以上流程可以看出,每條消息在被實(shí)際存儲到磁盤時(shí)都會(huì)被分配一個(gè)絕對偏移量后才能被寫入磁盤。在同一個(gè)分區(qū)內(nèi),消息的絕對偏移量都是從 0 開始,且單調(diào)遞增;在不同分區(qū)內(nèi),消息的絕對偏移量是沒有任何關(guān)系的。接下來討論下消息的絕對偏移量的計(jì)算規(guī)則。
確定消息偏移量有兩種方式,一種是順序讀取每一條消息來確定,此種方式代價(jià)比較大,實(shí)際上我們并不想知道消息的內(nèi)容,而只是想知道消息的偏移量;第二種是讀取每條消息的 Size 屬性,然后計(jì)算出下一條消息的起始偏移量。比如第一條消息內(nèi)容為 “abc”,寫入磁盤后的偏移量為:8(OffSet)+ 4(Message 大小)+ 3(Message 內(nèi)容的長度)= 15。第二條寫入的消息內(nèi)容為“defg”,其起始偏移量為 15,下一條消息的起始偏移量應(yīng)該是:15+8+4+4=31,以此類推。
消費(fèi)消息及副本同步流程分析
和寫入消息流程不同,讀取消息流程分為兩種情況,分別是消費(fèi)端消費(fèi)消息和從副本(備份副本)同步主副本的消息。在開始分析讀取流程之前,需要先明白幾個(gè)用到的變量,不然流程分析可能會(huì)看的比較糊涂。
- BaseOffSet:基準(zhǔn)偏移量,每個(gè) Partition 由 N 個(gè) LogSegment 組成,每個(gè) LogSegment 都有基準(zhǔn)偏移量,大概由如下構(gòu)成,數(shù)組中每個(gè)數(shù)代表一個(gè) LogSegment 的基準(zhǔn)偏移量:[0,200,400,600, ...]。
- StartOffSet:起始偏移量,由消費(fèi)端發(fā)起讀取消息請求時(shí),指定從哪個(gè)位置開始消費(fèi)消息。
- MaxLength:拉取大小,由消費(fèi)端發(fā)起讀取消息請求時(shí),指定本次最大拉取消息內(nèi)容的數(shù)據(jù)大小。該參數(shù)可以通過max.partition.fetch.bytes來指定,默認(rèn)大小為 1M。
- MaxOffSet:最大偏移量,消費(fèi)端拉取消息時(shí),最高可拉取消息的位置,即俗稱的“高水位”。該參數(shù)由服務(wù)端指定,其作用是為了防止生產(chǎn)端還未寫入的消息就被消費(fèi)端進(jìn)行消費(fèi)。此參數(shù)對于從副本同步主副本不會(huì)用到。
- MaxPosition:LogSegment 的最大位置,確定了起始偏移量在某個(gè) LogSegment 上開始,讀取 MaxLength 后,不能超過 MaxPosition。MaxPosition 是一個(gè)實(shí)際的物理位置,而非偏移量。
假設(shè)消費(fèi)端從 000000621 位置開始消費(fèi)消息,關(guān)于幾個(gè)變量的關(guān)系如下圖所示。
位置關(guān)系圖
消費(fèi)端和從副本拉取流程如下:
- 客戶端確定拉取的位置,即 StartOffSet 的值,找到主副本對應(yīng)的 LogSegment。
- LogSegment 由索引文件和數(shù)據(jù)文件構(gòu)成,由于索引文件是從小到大排列的,首先從索引文件確定一個(gè)小于等于 StartOffSet 最近的索引位置。
- 根據(jù)索引位置找到對應(yīng)的數(shù)據(jù)文件位置,由于數(shù)據(jù)文件也是從小到大排列的,從找到的數(shù)據(jù)文件位置順序向后遍歷,直到找到和 StartOffSet 相等的位置,即為消費(fèi)或拉取消息的位置。
從 StartOffSet 開始向后拉取 MaxLength 大小的數(shù)據(jù),返回給消費(fèi)端或者從副本進(jìn)行消費(fèi)或備份操作。
假設(shè)拉取消息起始位置為 00000313,消息拉取流程圖如下:
消息拉取流程圖
總結(jié)
本文從邏輯存儲和物理存儲的角度,分析了消息的寫入與消費(fèi)流程。其中邏輯存儲是以 Partition 來管理一批一批的消息,Partition 映射 Log 對象,Log 對象管理了多個(gè) LogSegment,多個(gè) Partition 構(gòu)成了一個(gè)完整的 Topic。消息的實(shí)際物理存儲是由一個(gè)一個(gè)的 LogSegment 構(gòu)成,每個(gè) LogSegment 又由索引文件和數(shù)據(jù)文件構(gòu)成。
網(wǎng)頁名稱:Kafka:消息是如何在服務(wù)端存儲與讀取的,你真的知道嗎?
標(biāo)題來源:http://m.fisionsoft.com.cn/article/coidphh.html


咨詢
建站咨詢
