新聞中心
使用ElasticSearch、Kafka和Cassandra構(gòu)建流式數(shù)據(jù)中心
作者:佚名 2015-10-22 14:02:58
服務(wù)器
數(shù)據(jù)中心
Kafka 在過去的一年里,我遇到了一些軟件公司討論如何處理應(yīng)用程序的數(shù)據(jù)(通常以日志和metrics的形式)。在這些討論中,我經(jīng)常會聽到挫折感,他們不得不用一組零碎的工具,隨著時(shí)間的推移將這些數(shù)據(jù)匯總起來。

在過去的一年里,我遇到了一些軟件公司討論如何處理應(yīng)用程序的數(shù)據(jù)(通常以日志和metrics的形式)。在這些討論中,我經(jīng)常會聽到挫折感,他們不得不用一組零碎的工具,隨著時(shí)間的推移將這些數(shù)據(jù)匯總起來。這些工具,如: - 運(yùn)維人員使用的,用于監(jiān)控和告警的工具
- 開發(fā)人員用于跟蹤性能和定位問題的工具
- 一個(gè)完整獨(dú)立的系統(tǒng),商業(yè)智能(BI)和業(yè)務(wù)依賴其分析用戶行為。
雖然這些工具使用不同的視角,適用不同的場景,但是他們同樣都是關(guān)注數(shù)據(jù)來源和類型。因此,許多軟件團(tuán)隊(duì)說,“如果時(shí)間充裕,我們可以建立一個(gè)更好的”,坦率地說,現(xiàn)在有很多出色的開源代碼,自己重頭建立一套是否更有意義值得商榷。在Jut我們就是這樣做的。我們使用開源的大數(shù)據(jù)組件建立了一個(gè)流式數(shù)據(jù)分析系統(tǒng),這篇文章描述了我們使用的片段以及我們?nèi)绾伟阉鼈兘M合在一起。我們將介紹:
- 數(shù)據(jù)攝?。喝绾我氩煌愋偷臄?shù)據(jù)流
- 索引及保存數(shù)據(jù):高效存儲以及統(tǒng)一查詢
- 串聯(lián):系統(tǒng)中的數(shù)據(jù)流過程
- 調(diào)優(yōu):讓整個(gè)過程真正的快速,用戶才會真的使用它。
我希望通過閱讀這篇文章將有助于您的系統(tǒng)在一個(gè)理智的,可擴(kuò)展的方式避免一些我們遇到的陷阱。
1數(shù)據(jù)攝取
當(dāng)涉及到業(yè)務(wù)分析和監(jiān)控,大部分相關(guān)的數(shù)據(jù)類型,格式和傳輸協(xié)議并不是固定的。你需要能夠支持系統(tǒng)不同的數(shù)據(jù)來源和數(shù)據(jù)發(fā)送者。例如,您的數(shù)據(jù)可能包括下列任何一種:
- 自定義的應(yīng)用程序事件。
- 容器級指標(biāo)和日志。
- statsd或收集的度量指標(biāo)。
- 來自第三方的webhook事件,像GitHub或Stripe。
- 應(yīng)用程序或服務(wù)器日志。
- 用戶行為。 雖然這些都有不同的格式和象征,他們在系統(tǒng)內(nèi)部需要一個(gè)統(tǒng)一的格式。無論你選擇哪一個(gè)格式,你都需要對輸入的數(shù)據(jù)流做轉(zhuǎn)換。
我們選擇了簡單靈活的數(shù)據(jù)格式:每個(gè)記錄(“點(diǎn)”)是一系列的鍵/值對,它可以方便地表示為一個(gè)JSON對象。所有的點(diǎn)都有一個(gè)“時(shí)間”字段,度量點(diǎn)也有一個(gè)數(shù)值型的“值”字段;其他點(diǎn)可以有任何的“形狀”。前端HTTPS服務(wù)器(運(yùn)行Nginx)接收數(shù)據(jù),多路分配并發(fā)送到本地的每個(gè)數(shù)據(jù)類型“連接器”進(jìn)程(運(yùn)行Node.js)。這些進(jìn)程將傳入的數(shù)據(jù)轉(zhuǎn)換為系統(tǒng)的內(nèi)部格式,然后將它們發(fā)布到一個(gè)Kafka topic(可靠性),從中,它們可以被用于索引和/或處理。 除了上面的數(shù)據(jù)類型,多考慮使用連接器,能使您自己的團(tuán)隊(duì)最容易將輸入數(shù)據(jù)整合到您的數(shù)據(jù)總線。你可能不需要太多我在這里描述的通用性或靈活性,但設(shè)計(jì)一些靈活性總是好的,這使你系統(tǒng)能夠攝取更多的數(shù)據(jù)類型,防止以后新數(shù)據(jù)到來要重新建造。
2索引及保存數(shù)據(jù)
所有這些數(shù)據(jù)都需要保存在某個(gè)地方。***在一個(gè)數(shù)據(jù)庫中,當(dāng)您的數(shù)據(jù)需要的增長時(shí),將很容易擴(kuò)展。并且如果該數(shù)據(jù)庫提供對分析類型的查詢方式支持,那***不過了。如果這個(gè)數(shù)據(jù)中心只是為了存儲日志和事件,那么你可以選擇Elasticsearch。如果這只是關(guān)于度量指標(biāo),你可以選擇一個(gè)時(shí)間序列數(shù)據(jù)庫(TSDB)。但是我們都需要處理。我們最終建立了一個(gè)系統(tǒng),有多個(gè)本地?cái)?shù)據(jù)存儲,以便我們能夠最有效地處理不同類型的數(shù)據(jù)。
ElasticSearch保存日志以及Events
我們使用Elasticsearch作為事件數(shù)據(jù)庫。這些事件可以有不同的“形狀”,這取決于他們來自哪一個(gè)來源。我們使用了一些Elasticsearch API,效果很好,特別是查詢和聚合API。
Cassandra和ElasticSearch保存Metrics
而metrics,原則上,是完全存儲在Elasticsearch(或任何其他數(shù)據(jù)庫),使用一個(gè)專門的匹配metrics數(shù)據(jù)結(jié)構(gòu)以及metrics冗余數(shù)據(jù)的數(shù)據(jù)庫將更有效。 ***的方法是使用現(xiàn)有的開源時(shí)間序列數(shù)據(jù)庫(TSDB)。
我們最初是這么使用的 —— 我使用開源TSDB并使用Cassandra作為后端。這種方法的挑戰(zhàn)是,TSDB有自己的查詢API,它不同于Elasticsearch的API。由于API之間的不同,為事件和指標(biāo)提供一個(gè)統(tǒng)一的搜索和查詢界面是很難的。 這就是為什么我們最終決定寫自己的TSDB,通過Casandra和Elasticsearch存儲metrics。
具體來說,我們在Cassandra中存儲的時(shí)間/值的鍵值對,在Elasticsearch中存儲元數(shù)據(jù),并在頂部有一個(gè)查詢和管理層。這樣,搜索和查詢事件以及metrics可以統(tǒng)一在Elasticsearch做。 流式處理引擎 那么現(xiàn)在我們有一個(gè)攝取數(shù)據(jù)的途徑和一些數(shù)據(jù)庫。我們是否可以準(zhǔn)備添加前端應(yīng)用程序并使用我們的數(shù)據(jù)?并沒有!盡管Elasticsearch本身可以做一些日志和事件分析,我們?nèi)匀贿€需要一個(gè)處理引擎。因?yàn)椋?/p>
- 我們需要一個(gè)統(tǒng)一的方式來訪問事件和指標(biāo),包括實(shí)時(shí)或歷史的數(shù)據(jù)。
- 對于某些情況(監(jiān)控、報(bào)警),當(dāng)它發(fā)生時(shí),我們需要實(shí)時(shí)處理這些數(shù)據(jù)。
- 度量指標(biāo)!我們想要做的不只是尋找度量指標(biāo)并讀出來
- 度量指標(biāo)是為了優(yōu)化現(xiàn)有的度量。
- 即使是事件,我們需要一個(gè)比Elasticsearch API更通用的處理能力。例如,join不同的來源和數(shù)據(jù),或做字符串解析,或自定義聚合。 從這里開始,事情變得非常有趣。你可以花一天(或更多)研究別人是如何建立數(shù)據(jù)管道,了解Lambda,Kappa等數(shù)據(jù)架構(gòu)。實(shí)際上有很多非常好的資料在那里。我們就開門見山:我們達(dá)到的效果,是一個(gè)支持實(shí)時(shí)數(shù)據(jù)流和批處理計(jì)算的處理引擎。在這方面,我們完全支持,有興趣的可以看這里以及這里。
在這里,不同于存儲和攝取,我們從頭建立了自己的處理引擎,- 不是因?yàn)闆]有其他的流處理引擎,而是由于我們看重查詢的性能,我們將在下面的部分單獨(dú)討論。更具體地說,我們建立了一個(gè)流處理引擎,實(shí)現(xiàn)了數(shù)據(jù)流處理模型,計(jì)算表示被表示為一系列操作的有向圖,將輸入轉(zhuǎn)化為輸出的,這些操作包括聚合,窗口,過濾或join。這能很自然的將模型的查詢和計(jì)算組合起來,適合實(shí)時(shí)和批量,且適合分布式運(yùn)行。
當(dāng)然,除非你真的在尋找建立一個(gè)新的項(xiàng)目,然而我們推薦你使用一個(gè)開源的流處理引擎。我們建議你看看Riemann, Spark Streaming或者Apache Flink。
3查詢和計(jì)算
我們使用流處理引擎,基于數(shù)據(jù)流模型的計(jì)算。但用戶如何表達(dá)查詢和創(chuàng)建這樣的數(shù)據(jù)流圖?一個(gè)方法是提供一個(gè)API或嵌入式DSL。該接口將需要提供查詢和篩選數(shù)據(jù)、定義轉(zhuǎn)換和其他處理操作的方法,而且最重要的是,提供一種將多個(gè)處理階段組合并應(yīng)用到流圖的方法。上述每一個(gè)項(xiàng)目都有自己的API,而個(gè)人的偏好可能有所不同,API常見的一個(gè)挑戰(zhàn)是,SQL分析師或Excel用戶無法方便的使用。
一個(gè)可能的解決問題的方案,在這一點(diǎn)上,可以讓這些用戶通過基于這些API構(gòu)建的工具來訪問系統(tǒng)(例如,一個(gè)簡單的web應(yīng)用程序)。
另一種方法是提供一個(gè)簡單的查詢語言。這是我們Jut在做的。因?yàn)槟壳皼]有現(xiàn)有的數(shù)據(jù)流的查詢語言(如SQL之于關(guān)系查詢),我們創(chuàng)建了一個(gè)數(shù)據(jù)流查詢語言稱為Juttle。它的核心,Juttle的流圖查詢語言可以用簡單的語法,聲明處理管道,如上圖所示。
它具有這些原語,search,window,join,aggregation和group-by,語法簡單。當(dāng)然,在處理一個(gè)流程圖數(shù)據(jù)之前,你需要取得到數(shù)據(jù) - Juttle允許您定義查詢獲取數(shù)據(jù),通過事件和/或度量的任何組合,實(shí)時(shí)和/或歷史的,都具有相同的語法和結(jié)構(gòu)。下面是一個(gè)簡單的例子,遵循一個(gè)模式… query | analyze | view (注意鏈接使用管道操作符,語法類似shell)。 ``` read -from :1 day ago: datatype = 'weblog' | reduce -every :minute: count() by status_code | @timechart ```
4拼在一起:一個(gè)異常檢測的例子
到目前為止,我們已經(jīng)采取了一個(gè)組件為中心的視角-我們已經(jīng)討論了組成成分和它們的作用,但沒怎么提到關(guān)于如何將它們組合在一起?,F(xiàn)在我們將視角切換到以數(shù)據(jù)為中心,看看支持實(shí)時(shí)和歷史查詢需要哪些步驟。讓我們使用一個(gè)異常檢測算法的實(shí)例來解說。這是一個(gè)很好的例子,因?yàn)槲覀冃枰樵儦v史數(shù)據(jù)來訓(xùn)練潛在的統(tǒng)計(jì)模型,實(shí)時(shí)流數(shù)據(jù)來測試異常,然后我們需要把結(jié)果寫回系統(tǒng),同時(shí)異常告警。
但是,在我們做任何查詢之前,我們需要串聯(lián)下攝取的整個(gè)過程,傳入的數(shù)據(jù)是如何寫入索引存儲。這是由import服務(wù)完成的,服務(wù)完成了包括寫入時(shí)間序列數(shù)據(jù)庫,將指標(biāo)數(shù)據(jù)和元數(shù)據(jù)存儲在Elasticsearch和Cassandra。
現(xiàn)在一個(gè)用戶來了,啟動了一個(gè)異常檢測的job。這需要讀取歷史數(shù)據(jù),通過任務(wù)處理引擎直接查詢底層數(shù)據(jù)庫來進(jìn)行的。不同的查詢和數(shù)據(jù)可以進(jìn)一步做性能優(yōu)化(下面討論),和/或?qū)嵤┒攘繑?shù)據(jù)庫的讀取路徑(查詢Elasticsearch中的元數(shù)據(jù),獲取Cassandra中的度量值,并結(jié)合結(jié)果產(chǎn)生實(shí)際的度量點(diǎn))。
歷史數(shù)據(jù)涵蓋了一些過去范圍內(nèi)的數(shù)據(jù),處理引擎將歷史數(shù)據(jù)轉(zhuǎn)換成流向圖的實(shí)時(shí)數(shù)據(jù)。為了做到這一點(diǎn),處理引擎直接將數(shù)據(jù)導(dǎo)入import服務(wù)的入口點(diǎn)。請注意,這種切換必須小心,以免數(shù)據(jù)丟棄或者數(shù)據(jù)重復(fù)。
在這一點(diǎn)上,我們有一個(gè)訓(xùn)練有素的異常檢測流圖運(yùn)行在實(shí)時(shí)數(shù)據(jù)上。當(dāng)檢測到異常時(shí),我們希望它將警報(bào)發(fā)送給一些外部的系統(tǒng),這可以通過處理引擎向外部的HTTP服務(wù)POST數(shù)據(jù)。除了發(fā)送警報(bào),我們還希望保持對內(nèi)部系統(tǒng)的跟蹤。換句話說,我們希望能夠?qū)?shù)據(jù)流寫回系統(tǒng)中。從概念上講這是通過處理引擎管道返回?cái)?shù)據(jù)到攝取途徑。
5調(diào)優(yōu)
那么我們已有了一個(gè)攝取數(shù)據(jù)的工作系統(tǒng)的和一些數(shù)據(jù)庫以及處理引擎。我們可以準(zhǔn)備添加前端應(yīng)用程序并分析我們的數(shù)據(jù)了嗎?還沒有! 嗯,我們實(shí)際上可以這樣做,但問題是我們的查詢性能仍然會非常慢。而緩慢的查詢意味著……沒有人會使用我們的系統(tǒng)。
因此,讓我們重新審視一下“統(tǒng)一處理引擎”的概念。按照我們的解釋,它是同一個(gè)系統(tǒng)使用相同結(jié)構(gòu),抽象和查詢來處理歷史或?qū)崟r(shí)的數(shù)據(jù)。 性能挑戰(zhàn)來自于這樣的一個(gè)事實(shí),歷史數(shù)據(jù)比實(shí)時(shí)數(shù)據(jù)要多的多。例如,假設(shè)我們有一百萬點(diǎn)/秒的速度輸入到系統(tǒng),并有一個(gè)是足夠快處理過程,可以在數(shù)據(jù)錄入時(shí)進(jìn)行實(shí)時(shí)查詢?,F(xiàn)在采取相同的查詢語義查詢過去一天的數(shù)據(jù) 。
這將需要一次性處理數(shù)百億點(diǎn)(或者,至少,必須能跟的上從存儲點(diǎn)讀取的速度)。假設(shè)計(jì)算是分布式的,我們可以通過增加計(jì)算節(jié)點(diǎn)來解決,但在***的情況下,這將是低效和昂貴的。 所以這就是優(yōu)化的所在。有許多方法可以優(yōu)化數(shù)據(jù)查詢。其中一些包括對查詢本身進(jìn)行轉(zhuǎn)換 。例如,上游數(shù)據(jù)的filters或aggregations盡可能不改變查詢語義。我們說的這種優(yōu)化,是將數(shù)據(jù)的filter和處理盡量由數(shù)據(jù)庫去做。這需要做以下的:
- 自動識別可以由數(shù)據(jù)庫處理查詢的部分
- 將對應(yīng)的部分轉(zhuǎn)換成目標(biāo)數(shù)據(jù)庫的查詢語言
- 運(yùn)行后端查詢并將結(jié)果注入到數(shù)據(jù)流圖的正確位置
6結(jié)語
我們做到了!當(dāng)然,如果不需要一個(gè)可視化層,我們就完成了。只能通過API來查詢系統(tǒng)。建立一個(gè)客戶端應(yīng)用程序來創(chuàng)建查詢,流和可視化數(shù)據(jù),組合儀表板是另外一個(gè)棘手的問題,所以我們將改天討論這個(gè)。
現(xiàn)在,讓我們來總結(jié)一下我們在建設(shè)這個(gè)數(shù)據(jù)中心過程中的所見所聞:
- 一個(gè)攝取途徑,可以接受不同來源的輸入數(shù)據(jù),并將其轉(zhuǎn)換為統(tǒng)一的格式,并儲存起來供以后消費(fèi)。(在Jut,這是基于Kafka建立的)。
- 事件和度量的數(shù)據(jù)庫。在Jut,Events使用Elasticsearch,自己構(gòu)建的度量數(shù)據(jù)庫則基于Cassandra。
- 一個(gè)處理引擎(或是兩個(gè),如果你要用lambda ISH架構(gòu))。
- 在系統(tǒng)上運(yùn)行查詢的API或查詢語言。 唷。建立這套系統(tǒng),是一個(gè)漫長而有趣的旅程。即便你要建立你自己的系統(tǒng),可以先試試Jut。你可能會覺得很好用。
當(dāng)前名稱:使用Elasticsearch、Kafka和Cassandra構(gòu)建流式數(shù)據(jù)中心
瀏覽路徑:http://m.fisionsoft.com.cn/article/djojhic.html


咨詢
建站咨詢
