新聞中心
圖解 Kafka 網(wǎng)絡層源碼實現(xiàn)機制之收發(fā)消息全過程
作者:王江華 2022-08-22 08:45:57
云計算
Kafka 今天我們主要對 Kafka 網(wǎng)絡層收發(fā)流程進行總結下。

成都創(chuàng)新互聯(lián)是一家專業(yè)提供陽信企業(yè)網(wǎng)站建設,專注與網(wǎng)站制作、成都網(wǎng)站制作、H5建站、小程序制作等業(yè)務。10年已為陽信眾多企業(yè)、政府機構等服務。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設計公司優(yōu)惠進行中。
大家好,我是 華仔, 又跟大家見面了。
今天我們主要對 Kafka 網(wǎng)絡層收發(fā)流程進行總結下,本系列總共分為3篇,這是下篇,主要剖析最后一個問題:
- 針對 Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實現(xiàn)最基礎的網(wǎng)絡連接以及讀寫操作的?
- 剖析 KafkaChannel 是如何對傳輸層、讀寫 buffer 操作進行封裝的?
- 剖析工業(yè)級 NIO 實戰(zhàn):如何基于位運算來控制事件的監(jiān)聽以及拆包、粘包是如何實現(xiàn)的?
- 剖析 Kafka 是如何封裝 Selector 多路復用器的?
- 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進行連接以及網(wǎng)絡讀寫的?
- 剖析 Kafka 網(wǎng)絡發(fā)送消息和接收響應的整個過程是怎樣的?
認真讀完這篇文章,我相信你會對 Kafka 網(wǎng)絡層源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
一、總體概述
通過場景驅動的方式,在網(wǎng)絡請求封裝和監(jiān)聽好后,我們來看看消息是如何進行網(wǎng)絡收發(fā)的,都需要做哪些工作。
- 發(fā)送消息流程剖析
- 消息預發(fā)送
- 消息真正發(fā)送
- 接收響應流程剖析
- 讀取響應結果
- 解析響應信息
- 處理回調
為了方便大家理解,所有的源碼只保留骨干。
二、發(fā)送消息流程剖析
1、消息預發(fā)送
這部分涉及的東西比較多,此處就簡單的說明下,后續(xù)會有專門篇章進行剖析。
客戶端先準備要發(fā)送的消息,流程如下:
- Sender 子線程會從 RecordAccumulator 緩沖區(qū)拉取要發(fā)送的消息集合,抽取到的數(shù)據(jù)會存放到下面幾個地方:
- 發(fā)送時會放入 inFlightRequests 集合和 KafkaChannel 的 send 對象,其中 inFlightRequests 后續(xù)篇章再進行剖析,這里簡單說明下,該集合用來存儲和操作待發(fā)送消息的緩存區(qū),當請求準備網(wǎng)絡發(fā)送時,會把請求從隊頭放入隊列;當接收到響應后,會把請求從隊尾刪除。
- 待發(fā)送完成后會放入 completedRequests 集合。
- 對已經(jīng)過期的數(shù)據(jù)進行處理。
- 封裝客戶端請求 ClientRequest,把 ClientRequest 類對象發(fā)送給 NetworkClient,它主要有以下2個工作要做:
- 根據(jù) ClientRequest 類對象構造 InFlightRequest 類對象。
- 根據(jù) ClientRequest 類對象構造 NetworkSend 類對象,并放入到 KafkaChannel 的緩存里。
- 此時消息預發(fā)送結束。
接下來我們依次看下 Selector 和 KafkaChannel 類的具體源碼實現(xiàn)。
(1)請求數(shù)據(jù)暫存內存中
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
/**
* 消息預發(fā)送
*/
public void send(Send send) {
// 1. 從服務端獲取 connectionId
String connectionId = send.destination();
// 2. 從數(shù)據(jù)包中獲取對應連接
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
// 3. 如果關閉連接集合中存在該連接
if (closingChannels.containsKey(connectionId)) {
// 把 connectionId 放入 failedSends 集合里
this.failedSends.add(connectionId);
} else {
try {
// 4. 暫存數(shù)據(jù)預發(fā)送,并沒有真正的發(fā)送,一次只能發(fā)送一個
channel.setSend(send);
} catch (Exception e) {
// 5. 更新 KafkaChannel 的狀態(tài)為發(fā)送失敗
channel.state(ChannelState.FAILED_SEND);
// 6. 把 connectionId 放入 failedSends 集合里
this.failedSends.add(connectionId);
// 7. 關閉連接
close(channel, CloseMode.DISCARD_NO_NOTIFY);
...
}
}
}
從源碼中可以看到調用了 KafkaChannel 類的 setSend() 方法。
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// 設置要發(fā)送消息的字段
this.send = send;
// 調用傳輸層增加寫事件
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// PlaintextTransportLayer 類方法
@Override
public void addInterestOps(int ops) {
//通過 key.interestOps() | ops 來添加事件
key.interestOps(key.interestOps() | ops);
}該方法主要用來預發(fā)送,即在發(fā)送網(wǎng)絡請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中,然后調用傳輸層方法增加對這個 channel 上「OP_WRITE」事件的關注,同時還保留了「OP_READ」事件,此時該 Channel 是同時可以進行讀寫的。當真正執(zhí)行發(fā)送的時候,會先從 send 中讀取數(shù)據(jù)。
2、消息真正發(fā)送
Sender 子線程會調用 Selector 的 「poll」方法把請求真正發(fā)送出去。
(1)poll()
@Override
public void poll(long timeout) throws IOException {
...
// 調用nioSelector.select線程阻塞等待I/O事件并設置阻塞時間,等待I/O事件就緒發(fā)生,然后返回已經(jīng)監(jiān)控到了多少準備就緒的事件
int numReadyKeys = select(timeout);
// 監(jiān)聽到事件發(fā)生或立即連接集合不為空或存在緩存數(shù)據(jù)
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
// 在SSL連接才可能會存在緩存數(shù)據(jù)
if (dataInBuffers) {
// 處理事件
pollSelectionKeys(toPoll, false, endSelect);
}
// 處理監(jiān)聽到的準備就緒事件
pollSelectionKeys(readyKeys, false, endSelect);
// 處理立即連接集合
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} else {
...
}
...
}
該方法就干了一件事,即收集準備就緒事件,并針對事件進行網(wǎng)絡操作,通過上述簡化代碼可以看出是調用了 「pollSelectionKeys」 方法,真正讀寫操作在該方法中,我們來看看:
(2)pollSelectionKeys()
void pollSelectionKeys(SetselectionKeys,boolean isImmediatelyConnected,long currentTimeNanos) {
//1. 循環(huán)調用當前監(jiān)聽到的事件(原順序或者洗牌后順序)
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
// 2. 之前創(chuàng)建連接,把kafkachanel注冊到key上,這里就是獲取對應的 channel
KafkaChannel channel = channel(key);
...
// 3. 獲取節(jié)點id
String nodeId = channel.id();
...
try {
...
// 4. 讀事件是否準備就緒了
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
// 嘗試處理讀事件
attemptRead(channel);
}
...
try {
// 5. 嘗試處理寫事件
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
} catch (Exception e) {
...
} finally {
....
}
}
}
該方法主要用來處理監(jiān)聽到的事件,包括連接事件、讀寫事件、以及立即完成的連接的。接下來我們看看嘗試進行網(wǎng)絡寫操作,如何才能進行真正寫。
(3)attemptWrite()
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
// 此處需要滿足4個條件才可以進行寫操作
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
// 進行寫操作
write(channel);
}
}
// channel 連接就緒
public boolean ready() {
return transportLayer.ready() && authenticator.complete();
}
// java nio SelectionKey
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}該方法主要用來嘗試進行網(wǎng)絡寫操作,方法很簡單,必須「同時滿足4個條件」:
- 「channel 還有數(shù)據(jù)可以發(fā)送」即數(shù)據(jù)還未發(fā)送完成。
- 「channel 連接就緒」。
- 「寫事件是可寫狀態(tài)」只要寫緩沖區(qū)未寫滿會一直產生「OP_WRITE」 事件,如果不寫數(shù)據(jù)或者寫滿時則需要取消 「OP_WRITE」 事件,防止產生不必要的資源消耗。
- 「客戶端驗證沒有開啟」。
當滿足以上4個條件后就可以進行寫操作了,接下來我們看看寫操作的過程。
(4)write()
// 執(zhí)行寫操作
void write(KafkaChannel channel) throws IOException {
// 1.獲取 channel 對應的節(jié)點id
String nodeId = channel.id();
// 2. 將保存在 send 上的數(shù)據(jù)真正發(fā)送出去,但是一次不一定能發(fā)送完,會返回已經(jīng)發(fā)出的字節(jié)數(shù)
long bytesSent = channel.write();
// 3. 判斷是否發(fā)送完成,未完成返回null,等待下次poll繼續(xù)發(fā)送
Send send = channel.maybeCompleteSend();
// 4. 說明已經(jīng)發(fā)出或者發(fā)送完成
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
// 記錄發(fā)送字節(jié) Metrics 信息
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
// 發(fā)送完成
if (send != null) {
// 將 send 添加到 completedSends
this.completedSends.add(send);
// 記錄發(fā)送完成 Metrics 信息
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
}
該方法主要用來真正執(zhí)行網(wǎng)絡寫操作的,大家知道在網(wǎng)絡編程過程中,不一定一次性可以發(fā)送完成,此時就需要判斷是否發(fā)送完成,如果未完成返回null,「等待下次輪詢 poll() 會繼續(xù)發(fā)送,并繼續(xù)關注這個 channel 的寫事件」,如果發(fā)送完成,「則返回 send,并取消 Selector 在這個 socketchannel 上 OP_WRITE 事件的關注」。這里調用了 KafkaChannel 類的 write() 進行寫操作發(fā)送,并調用 maybeCompleteSend() 判斷是否發(fā)送完成,我們先來看下 write() 寫操作:
(5)KafkaChannel.write()
public long write() throws IOException {
// 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了
if (send == null)
return 0;
midWrite = true;
// 調用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去
return send.writeTo(transportLayer);
}該方法主要用來把保存在 send 上的數(shù)據(jù)真正發(fā)送出去,調用 ByteBufferSend.writeTo 把數(shù)據(jù)真正發(fā)送出去,我們來看看 wirteTo() 方法:
@Override
// 將字節(jié)流數(shù)據(jù)寫入到channel中
public long writeTo(GatheringByteChannel channel) throws IOException {
// 1.調用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù)
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
// 2.計算還剩多少字節(jié)沒有寫入傳輸層
remaining -= written;
// 每次發(fā)送 都檢查是否
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
該方法主要用來把 buffers 數(shù)組寫入到 SocketChannel 里,因為在網(wǎng)絡編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調用java nio 底層 channel.write(buffers) 方法會返回「已經(jīng)寫入成功多少字節(jié)」的返回值,這樣調用一次后就知道已經(jīng)寫入多少字節(jié)了。
當調用 write() 以及一系列底層方法進行寫操作后,會返回已經(jīng)發(fā)出的字節(jié)數(shù),如果這次沒有發(fā)送完畢則返回 null,「等待下次輪詢 poll 繼續(xù)發(fā)送網(wǎng)絡寫操作,并繼續(xù)關注這個 channel 的寫事件」,所以需要判斷下本次是否發(fā)送完畢了,我們來看看:
(6)maybeCompleteSend()
// 可能完成發(fā)送
public Send maybeCompleteSend() {
// send 不為空且已經(jīng)發(fā)送完畢
if (send != null && send.completed()) {
midWrite = false;
// 當寫數(shù)據(jù)完畢后,取消傳輸層對 OP_WRITE 事件的監(jiān)聽,完成一次寫操作
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
// 將 send 賦值給結果集 result
Send result = send;
// 此時讀完后將 send 清空,以便下次寫
send = null;
// 最后返回結果集 result,完成一次寫操作
return result;
}
return null;
}
// PlaintextTransportLayer 類方法
@Override
public void removeInterestOps(int ops) {
// 通過 key.interestOps() & ~ops 來刪除事件
key.interestOps(key.interestOps() & ~ops);
}
// ByteBufferSend
@Override
public boolean completed() {
return remaining <= 0 && !pending;
}
該方法主要用來判斷是否寫數(shù)據(jù)完畢了,而判斷的寫數(shù)據(jù)完畢的條件是 buffer 中 remaining 沒有剩余且 pending 為 false。如果發(fā)送完成,把發(fā)送完成的請求添加到發(fā)送完成的集合 completedSends 里。
待消息請求發(fā)送完成后,又做了哪些工作呢?這里涉及到 NetworkClient 類的相關知識,這里簡單說明下,后續(xù)再剖析:
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
private void handleCompletedSends(Listresponses, long now) {
// if no response is expected then when the send is completed, return it
// 上面發(fā)送完成將 send 添加到 completedSends 集合,然后遍歷這個集合
for (Send send : this.selector.completedSends()) {
// 獲取 inFlightRequests 集合發(fā)往對應 Broker 的最后一個請求元素
InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
// 判斷是否期望進行響應
if (!request.expectResponse) {
// 如果不期望進行響應就刪除inFlightRequests集合發(fā)往對應 Broker 請求隊列的第一個元素
this.inFlightRequests.completeLastSent(send.destination());
// 把請求添加到 responses 集合里
responses.add(request.completed(null, now));
}
}
}
從源碼可以看出會對「completedSends」集合和「inFlightRequests」集合是一個「互相協(xié)作」的關系。
其中「completedSends」集合是指發(fā)送完成但還沒有返回的請求集合,而「inFlightRequests」集合則是保存了已經(jīng)發(fā)送出去但還沒有收到響應結果的 Request 集合。其中「completedSends」的元素對應著「inFlightRequests」集合對應隊列的最后一個元素。
到此發(fā)送消息流程剖析完畢,至于發(fā)送完成后續(xù)工作,我們待講解 Sender 和 NetWorkClient 的時候再詳細進行剖析,接下來我們來看看接收響應流程。
三、接收響應流程剖析
?
在上面剖析 Selector.pollSelectionKeys() 時候,當網(wǎng)絡讀事件就緒后會調用 attemptRead() 進行嘗試網(wǎng)絡讀操作,我們來看看:
1、讀取響應結果
(1)attemptRead()
private void attemptRead(KafkaChannel channel) throws IOException {
// 獲取 channel 對應的節(jié)點 id
String nodeId = channel.id();
// 將從傳輸層中讀取數(shù)據(jù)到NetworkReceive對象中
long bytesReceived = channel.read();
if (bytesReceived != 0) {
...
// 判斷 NetworkReceive 對象是否已經(jīng)讀完了
NetworkReceive receive = channel.maybeCompleteReceive();
// 當讀完后把這個 NetworkReceive 對象添加到已經(jīng)接收完畢網(wǎng)絡請求集合里
if (receive != null) {
addToCompletedReceives(channel, receive, currentTimeMs);
}
}
...
}
// KafkaChannel 方法
public long read() throws IOException {
if (receive == null) {
// 初始化 NetworkReceive 對象
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
// 嘗試把 channel 的數(shù)據(jù)讀到 NetworkReceive 對象中
long bytesReceived = receive(this.receive);
...
return bytesReceived;
}該方法主要用來嘗試讀取數(shù)據(jù)并添加已經(jīng)接收完畢的集合中。我們看到會先調用 KafkaChannel.read() 方法進行讀取,然后判斷是否讀完了,如果沒有讀完,下次輪詢時候接著讀取,如果讀完了就假如到請求讀完的集合 completedReceives 中。
我們來看下是如何判斷 NetworkReceive 對象是否已經(jīng)讀完了的:
(2)maybeCompleteReceive()
// 判斷 NetworkReceive 對象是否已經(jīng)讀完了
// 如果此時并沒有讀完一個完整的NetworkReceive對象,則下次觸發(fā)讀事件會繼續(xù)填充整個NetworkReceive對象,
// 如果讀完一個完整的NetworkReceive對象則將其置空,下次觸發(fā)讀事件時會創(chuàng)建一個全新的NetworkReceive對象。
public NetworkReceive maybeCompleteReceive() {
if (receive != null && receive.complete()) {
receive.payload().rewind();
NetworkReceive result = receive;
receive = null;
return result;
}
return null;
}
// NetworkReceive
public boolean complete() {
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
該方法主要用來判斷數(shù)據(jù)已經(jīng)讀取完畢了,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完,包括上面說過的表示響應消息頭 size ByteBuffer 和響應消息體本身的 buffer ByteBuffer,這兩個都讀完才算真正讀完了。
如果此時并沒有讀完一個完整的 NetworkReceive 對象,則下次觸發(fā)讀事件會繼續(xù)填充整個 NetworkReceive 對象,如果此時讀完一個完整的NetworkReceive 對象則將其置空,下次觸發(fā)讀事件時會創(chuàng)建一個全新的NetworkReceive 對象。
2、解析響應消息
等讀取完一個完整響應消息后,接下來要做哪些工作呢?那就是要解析這個響應消息,我們來看看是如何實現(xiàn)的:
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
private void handleCompletedReceives(Listresponses, long now) {
// 當讀完后把這個 NetworkReceive 對象添加到已經(jīng)接收完畢網(wǎng)絡請求集合里,然后遍歷這個集合
for (NetworkReceive receive : this.selector.completedReceives()) {
// 獲取發(fā)送請求的node id
String source = receive.source();
// 從 InFlightRequest 集合取出對應的元素并刪除
InFlightRequest req = inFlightRequests.completeNext(source);
// 解析該響應
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
....
// 添加響應到響應結果集合中
responses.add(req.completed(response, now));
}
}
該方法主要用來循環(huán)遍歷 completedReceives 集合做一些響應處理工作,在文章開始的時候就簡單說過,收到響應后會將其從「inFlightRequests」中刪除掉,然后去解析這個響應:
private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,Sensor throttleTimeSensor, long now) {
// 獲取響應頭
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer,requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
// 獲取響應體
Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
// 對比響應頭 correlationId 和響應體的 correlationId 是否一致,否則拋異常
correlate(requestHeader, responseHeader);
...
return responseBody;
}該方法主要用來解析響應的,并判斷響應頭跟響應體的 correlationId 值是否一致,否則拋異常。
此時只對響應做了解析但并沒有對響應進行處理,而響應處理是通過調用回調方法進行處理的,我們來看下。
3、處理回調
private void completeResponses(Listresponses) {
// 遍歷響應結果集合
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
//ClientResponse 類
public void onComplete() {
if (callback != null)
callback.onComplete(this);
}
到此接收響應消息流程剖析完畢。
四、總結
這里,我們一起來總結一下這篇文章的重點。
1、帶你先整體的梳理了 Kafka 網(wǎng)絡層收發(fā)流程,主要分為「發(fā)送消息流程」和「接收響應流程」。
2、又帶你分別剖析了發(fā)送消息流程和接收響應流程的源碼實現(xiàn)細節(jié)。
當前題目:圖解 Kafka 網(wǎng)絡層源碼實現(xiàn)機制之收發(fā)消息全過程
文章源于:http://m.fisionsoft.com.cn/article/cdieesi.html


咨詢
建站咨詢
