新聞中心
今天這篇我們先來聊聊 Kafka 生產(chǎn)者初始化時用到的核心組件以及發(fā)送的核心流程,帶你梳理生產(chǎn)者初始化整體的源碼分析脈絡(luò)。

認(rèn)真讀完這篇文章,我相信你會對 Kafka 生產(chǎn)初始化源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
1 總體概述
我們都知道在 Kafka 中,我們把產(chǎn)生消息的一方稱為生產(chǎn)者即 Producer,它是 Kafka 核心組件之一,也是消息的來源所在。那么這些生產(chǎn)者產(chǎn)生的消息是如何傳到 Kafka 服務(wù)端的呢?初始化過程是怎么樣的呢?接下來會逐一講解說明。
2 生產(chǎn)者初始化核心組件及流程剖析
我們先從生產(chǎn)者客戶端構(gòu)造 KafkaProducer開始講起:
Properties properties = new Properties();
//構(gòu)造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
//調(diào)用send異步回調(diào)發(fā)送
producer.send(record,new DemoCallBack(record.topic(), record.key(), record.value()));
上面代碼主要做了2件事情:
1)初始化 KafkaProducer 實(shí)例。
2)調(diào)用 send 接口發(fā)送數(shù)據(jù),支持同步和異步回調(diào)方式。
待構(gòu)造完 KafkaProducer 就正式進(jìn)入生產(chǎn)者源碼的入口了,如下圖所示:
接下來我們分析一下 KafkaProducer 的源碼, 先看下該類里面的「重要字段」:
public class KafkaProducerimplements Producer {
private final Logger log;
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
// 生產(chǎn)者客戶端Id
private final String clientId;
// 消息分區(qū)器
private final Partitioner partitioner;
// 消息的最大的長度,默認(rèn)1M,生產(chǎn)環(huán)境可以提高到10M
private final int maxRequestSize;
// 發(fā)送消息的緩沖區(qū)的大小,默認(rèn)32M
private final long totalMemorySize;
// 集群元數(shù)據(jù)
private final ProducerMetadata metadata;
// 消息累加器
private final RecordAccumulator accumulator;
// 執(zhí)行發(fā)送消息的類
private final Sender sender;
// 執(zhí)行發(fā)送消息的線程
private final Thread ioThread;
// 消息壓縮類型
private final CompressionType compressionType;
// key的序列化器
private final SerializerkeySerializer;
// value的序列化器
private final SerializervalueSerializer;
// 生產(chǎn)者客戶端參數(shù)配置
private final ProducerConfig producerConfig;
// 等待元數(shù)據(jù)更新的最大時間,默認(rèn)1分鐘
private final long maxBlockTimeMs;
// 生產(chǎn)者攔截器
private final ProducerInterceptorsinterceptors;
// api版本
private final ApiVersions apiVersions;
// 事務(wù)管理器
private final TransactionManager transactionManager;
........
}
重要且核心字段含義如下:
1)clientId:生產(chǎn)者客戶端的ID。
2)partitioner:消息的分區(qū)器,即通過某些算法將消息分配到某一個分區(qū)中。
3)maxRequestSize:消息的最大的長度,默認(rèn)1M,生產(chǎn)環(huán)境可以提高到10M。
4)totalMemorySize:發(fā)送消息的緩沖區(qū)的大小,默認(rèn)32M。
5)metadata:集群的元數(shù)據(jù)。
6)accumulator:消息累加器,主要負(fù)責(zé)緩沖消息。
7)sender:執(zhí)行發(fā)送消息的類,主要負(fù)責(zé)發(fā)送消息。
8)ioThread:執(zhí)行發(fā)送消息的線程,主要負(fù)責(zé)封裝Sender類。
9)compressionType:消息壓縮的類型。
10)keySerializer:key的序列化器。
11)valueSerializer:value的序列化器。
12)producerConfig:生產(chǎn)者客戶端的配置參數(shù)。
13)maxBlockTimeMs:等待元數(shù)據(jù)更新和緩沖區(qū)分配的最長時間,默認(rèn)1分鐘。
14)interceptors:生產(chǎn)者攔截器。主要負(fù)責(zé)在消息發(fā)送前后對消息進(jìn)行攔截和處理。
接下來我們看下 KafkaProducer 的構(gòu)造方法,來剖析生產(chǎn)者發(fā)送消息的過程中涉及到的「核心組件」。
源碼位置:
kafka\clients\src\main\java\org\apache\kafka\clients\producer\KafkaProducer.java 323行
如果有不會安裝源碼環(huán)境的話,可以參考之前的 Kafka源碼之旅入門篇。
public class KafkaProducerimplements Producer {
......
KafkaProducer(Mapconfigs,
SerializerkeySerializer,
SerializervalueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptorsinterceptors,
Time time) {
// 1.生產(chǎn)者配置初始化
ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
valueSerializer));
try {
// 2.獲取客戶端配置參數(shù)
MapuserProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = time;
// 3.用于事務(wù)傳遞的TransactionalId,保證會話的可靠性,如果配置表示啟用冪等+事務(wù)
String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 4.設(shè)置生產(chǎn)者客戶端id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
// 根據(jù)事務(wù)id是否配置來記錄不同日志
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
........省略Metrics
// 5.設(shè)置對應(yīng)的分區(qū)器
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// 6.失敗重試的退避時間,配置參數(shù):retry.backoff.ms 默認(rèn)100ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// 7.定義key、value對應(yīng)的序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
// 8.定義生產(chǎn)者攔截器列表
List> interceptorList = (List) configWithClientId.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
// 9.設(shè)置消息的最大的長度,默認(rèn)1M,生產(chǎn)環(huán)境可以提高到10M
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// 10.設(shè)置發(fā)送消息的緩沖區(qū)的大小,默認(rèn)32M
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// 11.設(shè)置消息壓縮類型
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 12.設(shè)置等待元數(shù)據(jù)更新的最大時間,默認(rèn)1分鐘
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 13.設(shè)置消息投遞的超時時間
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
// 事務(wù)管理器
this.transactionManager = configureTransactionState(config, logContext);
....省略,看下面各小節(jié)源碼
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
}
下面通過一張圖來描述 KafkaProducer 的初始化源碼過程:
Kafka Producer 初始化核心組件如下:
1)初始化生產(chǎn)者配置(ProducerConfig)。
2)設(shè)置客戶端配置文件的配置信息(userProvidedConfigs)。
3)設(shè)置事務(wù)ID(transactionaID)。
4)設(shè)置生產(chǎn)者客戶端ID(clientId)。
5)設(shè)置對應(yīng)的分區(qū)器(partitioner),支持自定義,用來將消息分配給某個主題的某個分區(qū)的。
6)設(shè)置失敗重試的退避時間(retryBackoffMs)。在客戶端請求服務(wù)端時,可能因為網(wǎng)絡(luò)或服務(wù)端異常造成請求超時。這時請求失敗會重試,但是如果重試的頻率過高又可能造成服務(wù)端網(wǎng)絡(luò)擁堵。因此必須等一段時間再請求,默認(rèn)100ms。
7)初始化key的序列化器(keySerializer)和value的序列化器(valueSerializer)。key和value的序列化器是用戶在初始化 KafkaProducer 的時候自定義的。
8)設(shè)置生產(chǎn)者攔截器(ProducerInterceptor),攔截器的主要作用是按照一定的規(guī)則統(tǒng)一對消息進(jìn)行處理。
9)設(shè)置消息的最大的長度(maxRequestSize)。默認(rèn)是1M,超了會報異常。在生產(chǎn)環(huán)境中建議設(shè)置為10M。
10)設(shè)置發(fā)送消息的緩沖區(qū)的大?。╰otalMemorySize),默認(rèn)是32M。
11)設(shè)置消息壓縮的類型(compressionType)。默認(rèn)是none表示不壓縮。在消息發(fā)送的過程中,為了提升發(fā)送消息的吞吐量會把消息進(jìn)行壓縮再發(fā)送。
12)設(shè)置等待元數(shù)據(jù)更新和緩沖區(qū)分配的最長時間(maxBlockTimeMs),默認(rèn)60S。
13)設(shè)置消息投遞超時時間(deliveryTimeoutMs),默認(rèn)120S。消息投遞時間是從發(fā)送到收到響應(yīng)的時間。
我們分析了 KafkaProducer 的核心組件,接下來我們分析下初始化過程中的核心流程。
(1)初始化消息累加器
// 初始化消息累加器---緩沖區(qū)
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
初始化消息累加器對象「accumulator」,部分重要參數(shù)如下:
1)batchSize :消息批次大小,默認(rèn)16KB;
2)compressionType:消息壓縮方式,主要包括none、gzip、snappy、lz4、zstd。默認(rèn)是不進(jìn)行壓縮,如果你的 Topic 占用的磁盤空間比較多的話,可以考慮啟用壓縮,以節(jié)省資源。
3)lingerMs:消息 batch 延遲多久再發(fā)送的時間,這是吞吐量與延時之間的權(quán)衡。為了不頻繁發(fā)送網(wǎng)絡(luò)請求,設(shè)置延遲時間后 batch 會盡量積累更多的消息再發(fā)送出去。
4)retryBackoffMs:設(shè)置失敗重試的退避時間。
5)deliveryTimeoutMs:設(shè)置消息投遞超時時間。
6)apiVersion:客戶端api版本。
7)transactionalManager:事務(wù)管理器。
8)BufferPool 分配:后續(xù)篇在進(jìn)行深度剖析。
消息累加器---緩沖區(qū)的設(shè)計是 Kafka Producer 非常優(yōu)秀和經(jīng)典的設(shè)計,Kafka 中消息不是生產(chǎn)后立馬就發(fā)送給服務(wù)端的,而是會先寫入一個緩沖池中,然后直到多條消息組成了一個 Batch,達(dá)到一定條件才會一次網(wǎng)絡(luò)通信把 Batch 發(fā)送過去,利用該設(shè)計來避免 JVM 頻繁的 Full GC 的問題,后續(xù)會單獨(dú)對其進(jìn)行深度剖析。
(2)初始化集群元數(shù)據(jù)
元數(shù)據(jù)的獲取涉及的組件比較多,主要分為:
1)KafkaProducer 主線程負(fù)責(zé)加載元數(shù)據(jù)。
2)Sender 子線程負(fù)責(zé)拉取元數(shù)據(jù)。
首先我們來看下 KafkaProducer 主線程是如何加載元數(shù)據(jù)。
元數(shù)據(jù)「metadata」的初始化的時候是在 KafkaProducer 主線程里面的,源代碼如下:
// 初始化 Kafka 集群元數(shù)據(jù),元數(shù)據(jù)會保存到客戶端中,并與服務(wù)端元數(shù)據(jù)保持一致
if (metadata != null) {
this.metadata = metadata;
} else {
// 初始化集群元數(shù)據(jù)
this.metadata = new ProducerMetadata(retryBackoffMs,
// 元數(shù)據(jù)過期時間:默認(rèn)5分鐘
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
// topic最大空閑時間,如果在規(guī)定時間沒有被訪問,將從緩存刪除,下次訪問時強(qiáng)制獲取元數(shù)據(jù)
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
// 啟動metadata的引導(dǎo)程序
this.metadata.bootstrap(addresses);
}
它會保存在客戶端內(nèi)存中,并與服務(wù)端保持準(zhǔn)實(shí)時的數(shù)據(jù)一致性,元數(shù)據(jù)主要包含:
1)Kafka 集群節(jié)點(diǎn)信息。
2)Topic 信息。
3)Topic對應(yīng)的分區(qū)信息
4)ISR列表信息以及分布情況
5)Leader Partition 所在節(jié)點(diǎn)
等等
從上面源代碼我們可以看出在 KafkaProducer 的構(gòu)造方法中初始化了元數(shù)據(jù)類「metadata」,然后調(diào)用 「metadata.bootstrap()」來啟動引導(dǎo)程序,這個時候 metaData 對象里并沒有具體的元數(shù)據(jù)信息,因為客戶端還沒發(fā)送元數(shù)據(jù)更新的請求「獲取是通過喚醒 Sender 線程進(jìn)行發(fā)送的」。
而具體的發(fā)送和拉取,我們將在下一篇中進(jìn)行剖析。
(3)初始化 Sender 線程
// 初始化 Sender 發(fā)送線程類,并同時初始化NetworkClient
this.sender = newSender(logContext, kafkaClient, this.metadata);
這里非常關(guān)鍵,初始化 「Sender」發(fā)送線程類,并同時初始化 「NetworkClient」,它為 sender 提供了網(wǎng)絡(luò)IO的能力,后續(xù)我們會對其深度剖析。
(4)ioThread 啟用 Sender 線程
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 用 ioThread 線程來封裝 Sender 線程類,使用 demon 守護(hù)線程方式來啟動 Sender 線程類
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}
private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
從上面源代碼可以看出使用「ioThread」線程來封裝 「Sender」 線程類,并使用 demon 守護(hù)線程方式來啟動 Sender 線程類。
這里的設(shè)計模式非常值得我們?nèi)W(xué)習(xí),就是在設(shè)計一些后臺線程的時候,可以把「線程本身」和「線程執(zhí)行」的邏輯分開,Sender 線程就是線程執(zhí)行的具體邏輯,而 KafkaThread 其實(shí)代表了這個「線程本身」、「線程的名字」、「未捕獲的異常處理」,「deamon 線程設(shè)置」。對 KafkaThread 的啟動會自動執(zhí)行 Sender 線程的 Run() 方法。
(5)doSend 發(fā)送
用戶可以直接使用 「producer.send()」 進(jìn)行數(shù)據(jù)的發(fā)送,先看一下 「Send()」接口的源碼實(shí)現(xiàn)。
// 向 topic 異步發(fā)送數(shù)據(jù),此時回調(diào)為空
public Futuresend(ProducerRecord record) {
return send(record, null);
}
// 向 topic 異步地發(fā)送數(shù)據(jù),當(dāng)發(fā)送確認(rèn)后喚起回調(diào)函數(shù)
public Futuresend(ProducerRecord record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecordinterceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
生產(chǎn)環(huán)境我們一般會使用帶回調(diào)函數(shù)的方式去發(fā)送,所以最終實(shí)現(xiàn)還是調(diào)用了 KafkaProducer 的 「doSend()」 接口。
該方法只是把消息發(fā)送到緩沖區(qū)后直接返回,真正的發(fā)送是需要等待 Sender 線程把消息從緩沖區(qū)將消息取出來后再進(jìn)行發(fā)送。
源碼比較長,這里只簡單的分析下都做了哪些事情,后續(xù)再進(jìn)行深度剖析,源碼如下:
private FuturedoSend(ProducerRecord record, Callback callback) {
TopicPartition tp = null;
try {
....省略
// 1.等待元數(shù)據(jù)更新即確認(rèn)數(shù)據(jù)要發(fā)送到的 topic 的 metadata 是可用的
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
....省略
// 2.序列化 record的key和value
byte[] serializedKey;
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue;
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// 3.獲取record消息對應(yīng)的分區(qū)
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
....省略
// 4.驗證消息的大小
ensureValidRecordSize(serializedSize);
// 5.組裝回調(diào)方法和攔截器為一個對象
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
....省略
// 6.向 accumulator 中追加數(shù)據(jù)
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// 7.新的批次需要重新進(jìn)行分區(qū)
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
// 8.如果 batch 已經(jīng)滿了, 則喚醒 sender 線程發(fā)送數(shù)據(jù)
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
} catch (ApiException e) {
....省略
return new FutureFailure(e);
} catch (InterruptedException e) {
....省略
當(dāng)前題目:圖解Kafka生產(chǎn)者初始化核心流程
分享路徑:http://m.fisionsoft.com.cn/article/ccegsei.html


咨詢
建站咨詢
