新聞中心
本文是Scala代碼實例之Kestrel的第五部分,繼續(xù)講述PersistentQueue處理消息隊列并發(fā)請求的方式。

成都創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設,景寧畬族自治企業(yè)網(wǎng)站建設,景寧畬族自治品牌網(wǎng)站建設,網(wǎng)站定制,景寧畬族自治網(wǎng)站建設報價,網(wǎng)絡營銷,網(wǎng)絡優(yōu)化,景寧畬族自治網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學習、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。
回顧一下之前我們讀過的兩個文件,Kestrel.scala, QueueCollection.scala。Kestrel.scala是啟動文件,并且通過一個actor,保持整個項目不會因為沒有線程運行而退出,同時注冊了一個acceptor,當建立起新的鏈接的時候,訪問 KestrelHandler.scala(這個稍后我們再讀)。QueueCollection.scala,維護一個PersistentQueue的隊列,如果訪問的queue_name不存在,則創(chuàng)建一個,如果存在,就對相應的QueueCollection進行操作。如果留心的話,我們還可以看到QueueCollection在啟動的時候,queue_name的來源是一個文件目錄。
我們就從這個入口繼續(xù)往下,看看PersistentQueue是如何處理消息隊列的并發(fā)請求的:
在前幾篇文章里面,我們曾經(jīng)提到過PersistentQueue有兩個“類”,一個是object PersistentQueue,一個是class PersistentQueue。而object在scala是一個單例模式,也就是singleton。也可以看做是只有static類型的java類。現(xiàn)在讓我們關注一下,看看class PersistentQueue和object Persistent之間的關系是怎樣的。
剛開始的一段代碼有點嚇人:
- class OverlaySetting[T](base: => T) {
- @volatile private var local: Option[T] = None
- def set(value: Option[T]) = local = value
- def apply() = local.getOrElse(base)
- }
我們先跳過去,直接往下看,看到這里:
- def overlay[T](base: => T) = new OverlaySetting(base)
- // attempting to add an item after the queue reaches this size (in items) will fail.
- val maxItems = overlay(PersistentQueue.maxItems)
- // attempting to add an item after the queue reaches this size (in bytes) will fail.
- val maxSize = overlay(PersistentQueue.maxSize)
- ……
如果我們不細究overlay的內(nèi)容,這段代碼其實就是把object PersisitentQueue中的變量賦值給class PersistentQueue中,那么overlay究竟做了什么呢?其實,overlay是將變量做了一個封裝,封裝在一個叫做OverlaySetting的類里面。這個類,根據(jù)我們之前對scala語法的了解,可以知道,它是一個OverlaySetting[T]的類,并且在創(chuàng)建的時候,需要帶入方法,方法沒有參數(shù),但是有一個返回值,類型就是T。(關于class類的語法規(guī)則,可以參考http://programming-scala.labs.oreilly.com/ch05.html#Constructors,不過里面的例子比OverlaySetting還復雜……-_-|||)
這個類在每次創(chuàng)建對象的時候,都會被賦值。我們也看到只有在使用apply方法的時候才會被調(diào)用(不過我沒有太想明白,如何通過函數(shù)的返回值來確定模板中的類型T,也許這就是Scala這種更加靈活的編譯算法,可以在new對象的時候,通過審查變量類型來獲取T的吧,畢竟Scala是一個靜態(tài)語言,如果是動態(tài)語言就不太成為一個問題了)。
這里面還存在一個Scala概念,就是方法=變量。當然在很多動態(tài)語言里面就已經(jīng)這么做了。在Scala里面,我們可以把def看作是val的一種特殊寫法,def聲明的方法,也可以用 def func_name() = {} 這樣的語法規(guī)則,跟val基本就是一回事了。當然,這一改變在Scala里面并不簡單是一個語法規(guī)則的問題,更進一步的,所有的變量也都是類,所以我們可以把一個變量,看做一個類,也可以看做類的建構(gòu)函數(shù),返回的就是類本身……有點繞,不過這樣理解,就比較好理解為什么可以用常量,當作沒有參數(shù)的方法調(diào)用了。
說了那么多,結(jié)論很簡單,maxSize是一個OverlaySetting[LONG]的類,如果maxSize沒有設置過,那么返回的就是object PersistentQueue里面的maxSize。LONG類型。
在主程序體里面,我們看到了Journal類,然后是調(diào)用 configure 方法,這個方法印證了我們的對OverlaySetting的解釋,它從配置文件里面把參數(shù)都讀出來賦值給class PersistentQueue里面的那些常量,用的是set。這里是一個Scala的語法細節(jié),它省略了一些不必要的”.”和”()”。
休息一下。我們開始討論在PersistentQueue里面的Actor
……
休息完畢
Scala中,消息傳遞的方式有一個特殊的語法結(jié)構(gòu):“Object ! MessageType” 就好像在源代碼里面出現(xiàn)的:“w.actor ! ItemArrived?!保P于Scala的Actor,詳細的語法說明在http://programming-scala.labs.oreilly.com/ch09.html可以看到,建議先看一下,好對actor有一個比較深入的了解)
我們發(fā)現(xiàn)PersistentQueue中Actor的實現(xiàn),跟語法說明里面的很不一樣,在語法說明里面的Actor都是作為一個獨立的線程出現(xiàn)的,而在PersistentQueue中,你甚至看不見一個對Actor的重載,但我們可以發(fā)現(xiàn)與Actor相關的幾個地方,一個是Waiter的定義,它是一個case class,并且有一個成員變量叫做actor,類型是Actor:
- private case class Waiter(actor: Actor)
- ……
- private val waiters = new mutable.ArrayBuffer[Waiter]
- ……
- val w = Waiter(Actor.self)
- waiters += w
- ……
需要注意:之前我們提過一個Scala的語法規(guī)則,那就是類后面的建構(gòu)函數(shù)的參數(shù),就是類中的成員變量?。ú贿^這是在解釋,為什么在建構(gòu)函數(shù)里面會有private關鍵字時提到的……)所以,我們知道了一點,就是每一個Waiter內(nèi)部都有一個actor,這些actor通過Actor.self共享了一個線程,當然也和其他的PersistentQueue共享了一個Actor。這是有點讓人不習慣,因為這么要緊的一個線程的創(chuàng)建,竟然可以出現(xiàn)得那么隱蔽。甚至連一個大括號都沒有。
接下來,我們來看看Actor是怎么在PersistentQueue里面工作了——這有點難,因為它的機制有點復雜,不是簡單的象語法說明里面的那樣,是一個完整的獨立的函數(shù),而是在一些函數(shù)中,突然切入進來,分享了Actor.self的一部分線程資源,就像下面代碼一樣:
- ……
- f operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {
- operateOrWait(op, timeoutAbsolute) match {
- case (item, None) =>
- f(item)
- case (None, Some(w)) =>
- Actor.self.reactWithin((timeoutAbsolute - Time.now) max 0) {
- case ItemArrived => operateReact(op, timeoutAbsolute)(f)
- case TIMEOUT => synchronized {
- waiters -= w
- // race: someone could have done an add() between the timeout and grabbing the lock.
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
- }
- }
- case _ => throw new RuntimeException()
- }
- ……
其中:
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
就是Actor的一個語法,在一段時間里面等待消息,如果有消息就如何……,如果沒有消息(TIMEOUT),就如何……。但是在整個函數(shù)里面套用了兩層 Actor.self.reactWithin,有點讓人要暈菜的感覺,再加上之前有一個match…case的結(jié)構(gòu),調(diào)用了operateOrWait(op, timeoutAbsolute)方法。要了解整個消息處理的機制,就需要把這三個部分聯(lián)系起來看了。
先簡單看一下operateOrWait函數(shù),比較容易理解:
- private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {
- val item = op
- if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {
- val w = Waiter(Actor.self)
- waiters += w
- (None, Some(w))
- } else {
- (item, None)
- }
- }
返回值是一個map,包括兩個被Option封裝的類型QItem和Waiter,從QItem.scala中可以知道(代碼很簡單),QItem就是把原始數(shù)據(jù)打了一個包,而Waiter之前我們也已經(jīng)說過了。程序體中的判斷是這樣的:如果item,也就是op這個參數(shù)沒有定義,并且PersistentQueue也沒有停止,關閉,而且處理時間AbsoluteTime不是0,那么就創(chuàng)建一個Waiter,返回(None, Some[Waiter]);如果不滿足這些條件,那么就直接返回(op, None)。簡單的說,就是如果系統(tǒng)還能等,就讓他等待正常一段時間然后操作,如果不能等,就直接返回操作指令。返回值只有兩種類型。
然后再看operateReact,如果返回的是時間參數(shù)是None(詳細的可以參考 actor .. case 的語法,地址是:http://programming-scala.labs.oreilly.com/ch03.html#MatchingOnCaseClasses),那么就直接執(zhí)行f(op)的函數(shù),把op這個方法,作為參數(shù)傳遞給f函數(shù)。如果返回的是一個時間戳,Some(w),那么我們就等待AbsoluteTime 到 Time.now()這段時間,如果在這段事件里面有ItemArrived事件發(fā)生,那么就處理一下,直到Time.now 等于或者大于 AbsoluteTime,那就會得到一個TIMEOUT,然后就退出了。(有一個異常的情況,需要清空一下事件隊列,通過reactWithin(0){})
這么理解這段actor還是不太清晰,那么讓我們回到上一層的調(diào)用。看看這個f(op)到底是什么,然后我們看到了:
- def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit = {
- operateReact(remove(transaction), timeoutAbsolute)(f)
- }
我們就知道op其實是一個remove的操作,并且返回remove得到的QItem對象。再往上一層到QueueCollection,我們看到:
- q.removeReact(if (timeout == 0) timeout else Time.now + timeout, transaction) {
- case None =>
- queueMisses.incr
- f(None)
- case Some(item) =>
- queueHits.incr
- f(Some(item))
- }
f方法的操作,如果之前的remove返回的是一個None,則記錄queueMess(未命中)添加1,如果返回的是一個QItem的值,那么就記錄queueHits(命中)添加1,并且,對這個QItem進行操作(注意:這里的f是QueueCollection中remove帶入的那個方法,而不是前面提到的removeReact里面提到的f。
從QueueCollection的remove調(diào)用到***層PersistentQueue的operateReact調(diào)用,我們大致可以了解這么曲折的調(diào)用關系解決了一個什么問題——從消息隊列里面獲取QItem。
回顧一下QueueCollection其他的代碼,我們發(fā)現(xiàn),只有waiter.size > 0的時候,有新的QItem添加,才會發(fā)出ItemArrived事件。也就是說,只有有一個獲取消息隊列的進程存在的時候,才會觸發(fā)ItemArrived事件。獲取消息隊列,則通過使用reactWithin,允許在一個規(guī)定的時間內(nèi),連續(xù)處理一系列的ItemArrived事件。看QueueCollection的remove方法,我們還可以知道,當啟動q.removeReact之前,首先會調(diào)用q.peek來檢查,隊列是不是為空,如果不是空的話,就直接返回隊列里面最前面的那個元素。所以我們可以把這個消息隊列理解成——如果消息隊列為空的情況下,讓獲取消息隊列的Client等待一段時間的機制,以降低反復進行SOCKET連接帶來的不必要的耗損。
這個機制,可以讓我們比較好地理解,為什么Kestrel提示說,如果運行多個獨立的進程來處理消息隊列的時候,會讓這個消息隊列的處理變成一個缺乏時序,但是處理并發(fā)能力很強的集群。每個連接對應的是一個Waiter,但是當ItemArrived觸發(fā)的時候,只可能有其中的一個reactWithin得到了這個事件,發(fā)送給對應的那個線程處理這個消息。
我現(xiàn)在手上的是Kestrel-1.1.2版本的代碼,走讀這部分代碼的時候,其實發(fā)現(xiàn)作者在寫這段代碼的時候,多了一些冗余的內(nèi)容——比如說removeReceive方法,從而看出作者在使用Scala的特性中,也是逐步地把代碼優(yōu)化成如今的樣子。畢竟Scala和Java之間的差別很大,如果做到Type Less, Do More。是需要一個逐步積累的過程,誰都不是天生就能把Scala寫得很好的,更何況是需要性能非常高的時候。
【相關閱讀】
- 從Java走進Scala(Scala經(jīng)典讀物)
- A Scala Tutorial for Java programmers
- 專題:Scala編程語言
- 從Scala看canEqual與正確的的equals實現(xiàn)
- Scala快速入門:從下載安裝到定義方法
分享標題:細說Kestrel.scala中的PersistentQueue
文章源于:http://m.fisionsoft.com.cn/article/djjojhs.html


咨詢
建站咨詢
