新聞中心
如何將Kafka中的數(shù)據(jù)快速導(dǎo)入hadoop?
作者:趙鈺瑩 2018-10-22 14:48:39
大數(shù)據(jù)
Kafka
Hadoop Kafka是一個(gè)分布式發(fā)布—訂閱系統(tǒng),由于其強(qiáng)大的分布式和性能特性,迅速成為數(shù)據(jù)管道的關(guān)鍵部分。它可完成許多工作,例如消息傳遞、指標(biāo)收集、流處理和日志聚合。Kafka的另一個(gè)有效用途是將數(shù)據(jù)導(dǎo)入Hadoop。

創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)、江西網(wǎng)絡(luò)推廣、微信小程序、江西網(wǎng)絡(luò)營(yíng)銷、江西企業(yè)策劃、江西品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供江西建站搭建服務(wù),24小時(shí)服務(wù)熱線:13518219792,官方網(wǎng)址:www.cdcxhl.com
Kafka是一個(gè)分布式發(fā)布—訂閱系統(tǒng),由于其強(qiáng)大的分布式和性能特性,迅速成為數(shù)據(jù)管道的關(guān)鍵部分。它可完成許多工作,例如消息傳遞、指標(biāo)收集、流處理和日志聚合。Kafka的另一個(gè)有效用途是將數(shù)據(jù)導(dǎo)入Hadoop。使用Kafka的關(guān)鍵原因是它將數(shù)據(jù)生產(chǎn)者和消費(fèi)者分離,允許擁有多個(gè)獨(dú)立的生產(chǎn)者(可能由不同的開(kāi)發(fā)團(tuán)隊(duì)編寫)。同樣,還有多個(gè)獨(dú)立的消費(fèi)者(也可能由不同的團(tuán)隊(duì)編寫)。此外,消費(fèi)者可以是實(shí)時(shí)/同步或批量/離線/異步。當(dāng)對(duì)比RabbitMQ等其他pub-sub工具時(shí),后一個(gè)屬性有很大區(qū)別。
要使用Kafka,有一些需要理解的概念:
- topic—topic是相關(guān)消息的訂閱源;
- 分區(qū)—每個(gè)topic由一個(gè)或多個(gè)分區(qū)組成,這些分區(qū)是由日志文件支持的有序消息隊(duì)列;
- 生產(chǎn)者和消費(fèi)者—生產(chǎn)者和消費(fèi)者將消息寫入分區(qū)并從分區(qū)讀取。
Brokers—Brokers是管理topic和分區(qū)并為生產(chǎn)者和消費(fèi)者請(qǐng)求提供服務(wù)的Kafka流程。
Kafka不保證對(duì)topic的“完全”排序,只保證組成topic的各個(gè)分區(qū)是有序的。消費(fèi)者應(yīng)用程序可以根據(jù)需要強(qiáng)制執(zhí)行對(duì)“全局”topic排序。
圖5.14 顯示了Kafka的概念模型
圖5.15 顯示了如何在Kafka部署分發(fā)分區(qū)的示例
為了支持容錯(cuò),可以復(fù)制topic,這意味著每個(gè)分區(qū)可以在不同主機(jī)上具有可配置數(shù)量的副本。這提供了更高的容錯(cuò)能力,這意味著單個(gè)服務(wù)器死亡對(duì)數(shù)據(jù)或生產(chǎn)者和消費(fèi)者的可用性來(lái)說(shuō)不是災(zāi)難性的。
此處采用Kafka版本0.8和Camus的0.8.X。
實(shí)踐:使用Camus將Avro數(shù)據(jù)從Kafka復(fù)制到HDFS
該技巧在已經(jīng)將數(shù)據(jù)流入Kafka用于其他目的并且希望將數(shù)據(jù)置于HDFS中的情況下非常有用。
問(wèn)題
希望使用Kafka作為數(shù)據(jù)傳遞機(jī)制來(lái)將數(shù)據(jù)導(dǎo)入HDFS。
解決方案
使用LinkedIn開(kāi)發(fā)的解決方案Camus將Kafka中的數(shù)據(jù)復(fù)制到HDFS。
討論
Camus是LinkedIn開(kāi)發(fā)的一個(gè)開(kāi)源項(xiàng)目。Kafka在LinkedIn大量部署,而Camus則用作將數(shù)據(jù)從Kafka復(fù)制到HDFS。
開(kāi)箱即用,Camus支持Kafka中的兩種數(shù)據(jù)格式:JSON和Avro。在這種技術(shù)中,我們將通過(guò)Camus使用Avro數(shù)據(jù)。Camus對(duì)Avro的內(nèi)置支持要求Kafka發(fā)布者以專有方式編寫Avro數(shù)據(jù),因此對(duì)于這種技術(shù),我們假設(shè)希望在Kafka中使用vanilla序列化數(shù)據(jù)。
讓這項(xiàng)技術(shù)發(fā)揮作用需要完成三個(gè)部分的工作:首先要將一些Avro數(shù)據(jù)寫入Kafka,然后編寫一個(gè)簡(jiǎn)單的類來(lái)幫助Camus反序列化Avro數(shù)據(jù),最后運(yùn)行一個(gè)Camus作業(yè)來(lái)執(zhí)行數(shù)據(jù)導(dǎo)入。
為了把Avro記錄寫入Kafka,在以下代碼中,需要通過(guò)配置必需的Kafka屬性來(lái)設(shè)置Kafka生成器,從文件加載一些Avro記錄,并將它們寫出到Kafka:
可以使用以下命令將樣本數(shù)據(jù)加載到名為test的Kafka的topic中:
Kafka控制臺(tái)使用者可用于驗(yàn)證數(shù)據(jù)是否已寫入Kafka,這會(huì)將二進(jìn)制Avro數(shù)據(jù)轉(zhuǎn)儲(chǔ)到控制臺(tái):
完成后,編寫一些Camus代碼,以便可以在Camus中閱讀這些Avro記錄。
實(shí)踐:編寫Camus和模式注冊(cè)表
首先,需要了解三種Camus概念:
- 解碼器—解碼器的工作是將從Kafka提取的原始數(shù)據(jù)轉(zhuǎn)換為Camus格式。
- 編碼器—編碼器將解碼數(shù)據(jù)序列化為將存儲(chǔ)在HDFS中的格式。
- Schema注冊(cè)表—提供有關(guān)正在編碼的Avro數(shù)據(jù)的schema信息。
正如前面提到的,Camus支持Avro數(shù)據(jù),但確實(shí)需要Kafka生產(chǎn)者使用Camus KafkaAvroMessageEncoder類來(lái)編寫數(shù)據(jù),該類為Avro序列化二進(jìn)制數(shù)據(jù)添加了部分專有數(shù)據(jù),可能是因?yàn)镃amus中的解碼器可以驗(yàn)證它是由該類編寫的。
在此示例中,使用 Avro serialization進(jìn)行序列化,因此需要編寫自己的解碼器。幸運(yùn)的是,這很簡(jiǎn)單:
你可能已經(jīng)注意到我們?cè)贙afka中寫了一個(gè)特定的Avro記錄,但在Camus中我們將該記錄讀作通用的Avro記錄,而不是特定的Avro記錄,這是因?yàn)镃amusWrapper類僅支持通用Avro記錄。否則,特定的Avro記錄可以更簡(jiǎn)單地使用,因?yàn)榭梢允褂蒙傻拇a并具有隨之而來(lái)的所有安全特征。
CamusWrapper對(duì)象是從Kafka提取的數(shù)據(jù)。此類存在的原因是允許將元數(shù)據(jù)粘貼到envelope中,例如時(shí)間戳,服務(wù)器名稱和服務(wù)詳細(xì)信息。強(qiáng)烈建議使用的任何數(shù)據(jù)都有一些與每條記錄相關(guān)的有意義的時(shí)間戳(通常這將是創(chuàng)建或生成記錄的時(shí)間)。然后,可以使用接受時(shí)間戳作為參數(shù)的CamusWrapper構(gòu)造函數(shù):
- public CamusWrapper(R record, long timestamp) { ... }
如果未設(shè)置時(shí)間戳,則Camus將在創(chuàng)建包裝器時(shí)創(chuàng)建新的時(shí)間戳。在確定輸出記錄的HDFS位置時(shí),在Camus中使用此時(shí)間戳和其他元數(shù)據(jù)。
接下來(lái),需要編寫一個(gè)schema注冊(cè)表,以便Camus Avro編碼器知道正在寫入HDFS的Avro記錄的schema詳細(xì)信息。注冊(cè)架構(gòu)時(shí),還要指定從中拉出Avro記錄的Kafka的topic名稱:
運(yùn)行Camus
Camus在Hadoop集群上作為MapReduce作業(yè)運(yùn)行,希望在該集群中導(dǎo)入Kafka數(shù)據(jù)。需要向Camus提供一堆屬性,可以使用命令行或者使用屬性文件來(lái)執(zhí)行此操作,我們將使用此技術(shù)的屬性文件:
從屬性中可以看出,無(wú)需明確告訴Camus要導(dǎo)入哪些topic。Camus自動(dòng)與Kafka通信以發(fā)現(xiàn)topic(和分區(qū))以及當(dāng)前的開(kāi)始和結(jié)束偏移。
如果想要精確控制導(dǎo)入的topic,可以分別使用kafka.whitelist.topics和kafka.blacklist.topics列舉白名單(限制topic)和黑名單(排除topic),可以使用逗號(hào)作為分隔符指定多個(gè)topic,還支持正則表達(dá)式,如以下示例所示,其匹配topic的“topic1”或以“abc”開(kāi)頭,后跟一個(gè)或多個(gè)數(shù)字的任何topic,可以使用與value完全相同的語(yǔ)法指定黑名單:
- kafka.whitelist.topics=topic1,abc[0-9]+
一旦屬性全部設(shè)置完畢,就可以運(yùn)行Camus作業(yè)了:
這將導(dǎo)致Avro數(shù)據(jù)在HDFS中著陸。我們來(lái)看看HDFS中的內(nèi)容:
第一個(gè)文件包含已導(dǎo)入的數(shù)據(jù),其他供Camus管理。
可以使用AvroDump實(shí)用程序查看HDFS中的數(shù)據(jù)文件:
那么,當(dāng)Camus工作正在運(yùn)行時(shí)究竟發(fā)生了什么? Camus導(dǎo)入過(guò)程作為MapReduce作業(yè)執(zhí)行,如圖5.16所示。
隨著MapReduce中的Camus任務(wù)成功,Camus OutputCommitter(允許在任務(wù)完成時(shí)執(zhí)行自定義工作的MapReduce構(gòu)造)以原子方式將任務(wù)的數(shù)據(jù)文件移動(dòng)到目標(biāo)目錄。OutputCommitter還為任務(wù)正在處理的所有分區(qū)創(chuàng)建偏移文件,同一作業(yè)中的其他任務(wù)可能會(huì)失敗,但這不會(huì)影響成功任務(wù)的狀態(tài)——成功任務(wù)的數(shù)據(jù)和偏移輸出仍然存在,因此后續(xù)的Camus執(zhí)行將從最后一個(gè)已知的成功狀態(tài)恢復(fù)處理。
接下來(lái),讓我們看看Camus導(dǎo)入數(shù)據(jù)的位置以及如何控制行為。
數(shù)據(jù)分區(qū)
之前,我們看到了Camus導(dǎo)入位于Kafka的Avro數(shù)據(jù),讓我們仔細(xì)看看HDFS路徑結(jié)構(gòu),如圖5.17所示,看看可以做些什么來(lái)確定位置。
圖5.17 在HDFS中解析導(dǎo)出數(shù)據(jù)的Camus輸出路徑
路徑的日期/時(shí)間由從CamusWrapper中提取的時(shí)間戳確定,可以從MessageDecoder中的Kafka記錄中提取時(shí)間戳,并將它們提供給CamusWrapper,這將允許按照有意義的日期對(duì)數(shù)據(jù)進(jìn)行分區(qū),而不是默認(rèn)值,這只是在MapReduce中讀取Kafka記錄的時(shí)間。
Camus支持可插拔分區(qū)程序,允許控制圖5.18所示路徑的一部分。
圖5.18 Camus分區(qū)路徑
Camus Partitioner接口提供了兩種必須實(shí)現(xiàn)的方法:
例如,自定義分區(qū)程序可創(chuàng)建用于Hive分區(qū)的路徑。
總結(jié)
Camus提供了一個(gè)完整的解決方案,可以在HDFS中從Kafka獲取數(shù)據(jù),并在出現(xiàn)問(wèn)題時(shí)負(fù)責(zé)維護(hù)狀態(tài)和進(jìn)行錯(cuò)誤處理。通過(guò)將其與Azkaban或Oozie集成,可以輕松實(shí)現(xiàn)自動(dòng)化,并根據(jù)消息時(shí)間組織HDFS數(shù)據(jù)執(zhí)行簡(jiǎn)單的數(shù)據(jù)管理。值得一提的是,當(dāng)涉及到ETL時(shí),與Flume相比,它的功能是無(wú)懈可擊的。
Kafka捆綁了一種將數(shù)據(jù)導(dǎo)入HDFS的機(jī)制。它有一個(gè)KafkaETLInputFormat輸入格式類,可用于在MapReduce作業(yè)中從Kafka提取數(shù)據(jù)。要求編寫MapReduce作業(yè)以執(zhí)行導(dǎo)入,但優(yōu)點(diǎn)是可以直接在MapReduce流中使用數(shù)據(jù),而不是將HDFS用作數(shù)據(jù)的中間存儲(chǔ)。接下來(lái),我們將討論如何將駐留在Hadoop中的數(shù)據(jù)傳輸?shù)狡渌到y(tǒng),例如文件系統(tǒng)和其他地方。
當(dāng)前名稱:如何將kafka中的數(shù)據(jù)快速導(dǎo)入Hadoop?
當(dāng)前地址:http://m.fisionsoft.com.cn/article/djsgdip.html


咨詢
建站咨詢
