新聞中心
作者丨Shaolang Ai

譯者 | 楊曉娟
用Chronicle Queue構(gòu)建的應(yīng)用程序不會(huì)讓生產(chǎn)者放慢將消息放入隊(duì)列的速度(沒(méi)有背壓機(jī)制)。
Chronicle Queue(編年史隊(duì)列)是低延遲、無(wú)代理、持久的消息隊(duì)列。
與其最相近的是0MQ,但0MQ不存儲(chǔ)發(fā)布的消息。Chronicle Queue的開(kāi)源版本不支持跨機(jī)器通信。Chronicle Queue最與眾不同之處在于它使用RandomAccessFile做堆外存儲(chǔ)因而不會(huì)產(chǎn)生垃圾。
Chronicle Queue是以生產(chǎn)者為中心的,也就是說(shuō),用它構(gòu)建的應(yīng)用程序不會(huì)讓生產(chǎn)者放慢將消息放入隊(duì)列的速度(沒(méi)有背壓機(jī)制)。這種設(shè)計(jì)在對(duì)生產(chǎn)者的生產(chǎn)能力幾乎不可控的情況下非常有用,例如外匯價(jià)格更新。
術(shù)語(yǔ)
大多數(shù)消息隊(duì)列使用術(shù)語(yǔ)Producer(生產(chǎn)者)和Consumer(消費(fèi)者),Chronicle Queue使用Appender(附加器)和Tailer(零售商),用于區(qū)分它總是將消息附加到隊(duì)列中,并且零售商從隊(duì)列中讀取消息之后,從不“銷(xiāo)毀/丟棄”任何消息。與Message(消息)相比,Chronicle Queue更喜歡使用術(shù)語(yǔ)Excerpt(摘錄),因?yàn)閷?xiě)入Chronicle Queue的blob可以是字節(jié)數(shù)組、字符串以及域模型。
Hello, World!
我們用傳統(tǒng)的“Hello, World!”來(lái)演示基本用法。如果您使用的是Gradle,將以下內(nèi)容添加到build.gradle.kts:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile // line 1
plugins {
id("org.jetbrains.kotlin.jvm") version "1.3.71"
application
}
repositories {
mavenCentral()
mavenLocal()
}
dependencies {
implementation("org.jetbrains.kotlin:kotlin-bom")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("net.openhft.chronicle:chronicle-queue:5.19.8") // line 17
implementation("org.apache.logging.log4j:log4j-sl4fj18-impl:2.13.1")
}
application {
mainClass = "hello.AppKt"
}
tasks.withType{ // line 25
kotlinOptions.jvmTarget = "1.8"
}
導(dǎo)入KotlinCompile(第1行)允許將Java 1.8指定為編譯目標(biāo)(第25-27行)。第17-18行顯示了開(kāi)始使用Chronicle Queue所需的其它依賴(lài)項(xiàng)。請(qǐng)注意build.gradle.kts假定要使用的包是hello。接下來(lái)看看演示Chronicle Queue用法的代碼:
package hello
import net.openhft.chronicle.queue.ChronicleQueue
fun main(args: Array) {
val q: ChronicleQueue = ChronicleQueue.single("./build/hello-world")
try {
val appender: ExcerptAppender = q.acquireAppender()
appender.writeText("Hello, World!")
val tailer: ExcerptTailer = q.createTailer()
println(tailer.readText())
} finally {
q.close()
}
}
ChronicleQueue.single()返回一個(gè)新建的使用給定的路徑存儲(chǔ)摘錄的ChronicleQueue。其余的代碼幾乎是不言自明的:獲得的appender把摘錄“Hello, World!”追加到排隊(duì)中;tailer從隊(duì)列中讀取并將摘錄打印到標(biāo)準(zhǔn)輸出。程序結(jié)束時(shí)一定要關(guān)閉隊(duì)列。
還記得Chronicle Queue是持久的嗎?注釋掉兩個(gè)appender行,然后再用gradle run執(zhí)行程序。您將看到程序還是在標(biāo)準(zhǔn)輸出上打印了Hello, World!:tailer讀取的是上次運(yùn)行時(shí)寫(xiě)入到隊(duì)列中的數(shù)據(jù)。它的持久性允許在tailers崩潰時(shí)重放收到的摘錄。
便道:摘錄類(lèi)型
Chronicle Queue僅接受以下類(lèi)型的摘要:
1. Serializable對(duì)象:請(qǐng)注意,由于依賴(lài)于反射,序列化類(lèi)對(duì)象的效率很低
2. Externalizable對(duì)象:如果與Java的兼容性很重要,但以犧牲手寫(xiě)邏輯為代價(jià)
3. net.openhft.chronicle.wire.Marshallable對(duì)象:使用二進(jìn)制格式的高性能數(shù)據(jù)交換
4. net.openhft.chronicle.bytes.BytesMarshallable對(duì)象:底層二進(jìn)制或文本編碼
“Hello, World!”演示了字符串,我們順便看一個(gè)使用Chronicle Wire庫(kù)中Marshallable的例子。
package types
import net.openhft.chronicle.wire.Marshallable
import net.openhft.chronicle.wire.SelfDescribingMarshallable
class Person(val name: String, val age: Int): SelfDescribingMarshallable()
fun main(args: Array) {
val person = Person("Shaolang", 3)
val outputString = """
!types.Person {
name: Shaolang
age: 3
}
""".trimIndent()
println(person.toString() == outputString)
val p = Marshallable.fromString(outputString)
println(person == p)
println(person.hashCode() == p.hashCode())
}
運(yùn)行上面的代碼片段會(huì)看到標(biāo)準(zhǔn)輸出上打印了三個(gè)true。SelfDescribtingMarshallable可以輕松持久化Chronicle Queue 中的Marshallable類(lèi)。
寫(xiě)入和讀取域?qū)ο?/h2>
有了從上面小便道得來(lái)的經(jīng)驗(yàn),下面將演示向Chronicle Queue寫(xiě)入和讀取Marshallable對(duì)象:
package docs
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable
class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()
class Food(var name: String? = null): SelfDescribingMarshallable()
fun main(args: Array) {
ChronicleQueue.single("./build/documents").use { q ->
val appender = q.acquireAppender()
appender.writeDocument(Person("Shaolang", 3))
appender.writeText("Hello, World!")
appender.writeDocument(Food("Burger"))
val tailer = q.createTailer()
val person = Person()
tailer.readDocument(person)
println(person)
println("${tailer.readText()}\n")
val food = Food()
tailer.readDocument(food)
println(food)
}
}
盡管在不同的VM進(jìn)程中運(yùn)行appender和tailer會(huì)更有意義,但將兩者保持在同一個(gè)VM中可以更容易理解討論,不必篩選無(wú)關(guān)的代碼。運(yùn)行上面的代碼會(huì)看到如下輸出:
!docs.Person {
name: Shaolang,
age: 3
}
Hello, World!
!docs.Food {
name: Burger,
}有幾點(diǎn)需要注意:
1. 由于Chronicle Queue的目標(biāo)是不產(chǎn)生垃圾,因而要求域模型是可變對(duì)象;這就是為什么兩個(gè)類(lèi)在構(gòu)造器中使用var而不是val。
2. Chronicle Queue允許appender將不同的內(nèi)容寫(xiě)入同一隊(duì)列。
3. tailer需要知道它應(yīng)該讀什么才能得到正確的結(jié)果。
如果我們把最后一個(gè)tailer.readDocument(food)改成tailer.readDocument(person)然后打印person,將看到以下打印內(nèi)容(至少在Chronicle Queue 5.19.x中,它不會(huì)崩潰/拋出任何異常):
!docs.Person {
name: Burger,
age: !!null ""
}因?yàn)镻erson和Food有一個(gè)同名的屬性,Chronicle Queue會(huì)盡可能匹配Person,不能匹配的置為空。
上面注意事項(xiàng)中的最后一點(diǎn)“關(guān)于tailer需要知道他們?cè)谧x什么”會(huì)有點(diǎn)麻煩:它們(tailer)現(xiàn)在背負(fù)著過(guò)濾的重?fù)?dān),要從生產(chǎn)者不斷扔來(lái)的雪崩一樣的數(shù)據(jù)中獲得它們想要的信息。為了保持代碼庫(kù)穩(wěn)健,我們需要使用觀察者模式.
(有點(diǎn))只聽(tīng)感興趣的東西
除了直接使用摘錄附加器,另一種方法是使它具體化為傳給methodWriter方法的第一類(lèi)。下面的片段重點(diǎn)介紹指定偵聽(tīng)器的具體化:
package listener
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ChronicleReaderMain
import net.openhft.chronicle.wire.SelfDescribingMarshallable
class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()
interface PersonListener {
fun onPerson(person: Person)
}
fun main(args: Array) {
val directory = "./build/listener"
ChronicleQueue.single(directory).use { q ->
val observable: PersonListener = q.acquireAppender()
.methodWriter(PersonListener::class.java)
observable.onPerson(Person("Shaolang", 3))
observable.onPerson(Person("Elliot", 4))
}
ChronicleReaderMain.main(arrayOf("-d", directory))
}
第17-18行用指定的PersonListener調(diào)用methodWriter獲得附加器。請(qǐng)注意,賦予observable的類(lèi)型是PersonListener,不是ExcerptAppender。現(xiàn)在,任何對(duì)PersonListener的方法調(diào)用都會(huì)把給定的參數(shù)寫(xiě)入隊(duì)列。但是,直接使用附加器寫(xiě)入隊(duì)列和使用具體化的類(lèi)寫(xiě)入隊(duì)列是有區(qū)別的。為了看出區(qū)別,我們使用ChronicleReaderMain檢驗(yàn)隊(duì)列:
0x47c900000000:
onPerson {
name: Shaolang,
age: 3
}
0x47c900000001:
onPerson {
name: Elliot,
age: 4
}
注意,具體化類(lèi)寫(xiě)入隊(duì)列的摘錄用的是onPerson { ...} 而不是!listener.Person { ... }。 這種差異允許實(shí)現(xiàn)了PersonListener的tailer收到寫(xiě)入隊(duì)列的新Person對(duì)象的通知并忽略它們不感興趣的對(duì)象。
是的,你沒(méi)看錯(cuò):實(shí)現(xiàn)了PersonListener的tailer。不幸的是,Chronicle Queue(有點(diǎn))將被觀察者和觀察者混為一談,因此很難區(qū)分它們。我認(rèn)為區(qū)分差異的最簡(jiǎn)單方法是使用以下片段注釋中所示的啟發(fā)式方法:
interface PersonListener {
onPerson(person: Person)
}
// this is an observer because it implements the listener interface
class PersonRegistry: PersonListener {
override fun onPerson(person: Person) {
// code omitted for brevity
}
}
fun main(args: Array) {
// code omitted for brevity
val observable: PersonListener = q.acquireAppender() // this is an
.methodWriter(PersonListener::class.java) // observable
// another way to differentiate: the observer will never call the
// listener method, only observables do.
observable.onPerson(Person("Shaolang", 3))
// code omitted for brevity
} 再來(lái)看一下tailer。盡管Chronicle Queue確保每個(gè)tailer能看到每一條摘錄,通過(guò)實(shí)現(xiàn)偵聽(tīng)器類(lèi)/接口并用已實(shí)現(xiàn)的偵聽(tīng)器創(chuàng)建net.openhft.chronicle.bytes.MethodReader, tailer可以?xún)H過(guò)濾出它想看到的摘錄:
package listener
import net.openhft.chronicle.bytes.MethodReader
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable
class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallable()
class Food(var name: String? = null): SelfDescribingMarshallable()
interface PersonListener {
fun onPerson(person: Person)
}
class PersonRegistry: PersonListener {
override fun onPerson(person: Person) {
println("in registry: ${person.name}")
}
}
fun main(args: Array) {
ChronicleQueue.single("./build/listener2").use { q ->
val appender = q.acquireAppender()
val writer: PersonListener = appender.methodWriter(PersonListener::class.java)
writer.onPerson(Person("Shaolang", 3))
appender.writeDocument(Food("Burger"))
writer.onPerson(Person("Elliot", 4))
val registry: PersonRegistry = PersonRegistry()
val reader: MethodReader = q.createTailer().methodReader(registry)
reader.readOne()
reader.readOne()
reader.readOne()
}
}
這里的主要新內(nèi)容是PersonRegistry的實(shí)現(xiàn),它簡(jiǎn)單地打印出所給的person的name。 代碼片段并沒(méi)直接用ExcerptTailer從隊(duì)列中讀取而是用給定的PersonRegistry由tailer創(chuàng)建了一個(gè)MethodReader。
.methodWriter接受Class參數(shù),而.methodReader接受的是對(duì)象。appender向隊(duì)列寫(xiě)入三個(gè)摘錄:person(通過(guò)調(diào)用onPerson)、food(通過(guò).writeDocument)和person。 因?yàn)閠ailer可以看到每一個(gè)摘錄,所以閱讀者也會(huì)調(diào)用三次“讀取”所有摘錄,但卻只會(huì)看到兩個(gè)輸出:
in registry:Shaolang
in registry:Elliot
如果代碼片段只有兩個(gè).readOne()調(diào)用而不是三個(gè),那么輸出中就不會(huì)包含in registry:Elliot.
MethodReader使用鴨子類(lèi)型
還記得我們檢驗(yàn)由具體化的PersonListener填充隊(duì)列時(shí)ChronicleReaderMain的輸出嗎?輸出的不是類(lèi)名而是類(lèi)似于onPerson { ... }。這表明MethodReader過(guò)濾與方法簽名匹配的摘錄,即它不關(guān)心包含方法簽名的接口/類(lèi);或者簡(jiǎn)單地說(shuō),鴨子類(lèi)型:
package duck
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.wire.SelfDescribingMarshallable
class Person(var name: String? = null, var age: Int? = null): SelfDescribingMarshallabl()
interface PersonListener {
fun onPerson(person: Person)
}
interface VIPListener {
fun onPerson(person: Person)
}
class VIPClub: VIPListener {
override fun onPerson(person: Person) {
println("Welcome to the club, ${person.name}!")
}
}
fun main(args: Array) {
ChronicleQueue.single("./build/duck").use { q ->
val writer = q.acquireAppender().methodWriter(PersonListener::class.java)
writer.onPerson(Person("Shaolang", 3))
val club = VIPClub()
val reader = q.createTailer().methodReader(club)
reader.readOne()
}
}
注意,VIPClub實(shí)現(xiàn)了VIPListener,碰巧與PersonListener有相同的onPerson方法簽名。運(yùn)行上面的代碼,你會(huì)看到打印的Welcome to the club, Shaolang!
命名tailer
到目前為止,在所有的演示中,我們一直創(chuàng)建的都是匿名的tailer。因?yàn)樗鼈兪悄涿模悦看危ㄖ匦拢┻\(yùn)行都會(huì)讀取隊(duì)列中的所有摘錄。有時(shí),這樣的行為是可接受的,甚至是可取的,但有時(shí)卻不是。只需命名tailer就可以從上次停止的位置繼續(xù)讀取:
package restartable
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.ExcerptTailer
fun readQueue(tailerName: String, times: Int) {
ChronicleQueue.single("./build/restartable").use { q ->
val tailer = q.createTailer(tailerName) // tailer name given
for (_n in 1..times) {
println("$tailerName: ${tailer.readText()}")
}
println() // to separate outputs for easier visualization
}
}
fun main(args: Array) {
ChronicleQueue.single("./build/restartable").use { q ->
val appender = q.acquireAppender()
appender.writeText("Test Message 1")
appender.writeText("Test Message 2")
appender.writeText("Test Message 3")
appender.writeText("Test Message 4")
}
readQueue("foo", 1)
readQueue("bar", 2)
readQueue("foo", 3)
readQueue("bar", 1)
}
注意,tailer的名字是通過(guò)createTailer方法指定的。上面的代碼中有兩個(gè)tailer(命名為foo和bar)讀取隊(duì)列并在運(yùn)行時(shí)輸出以下內(nèi)容:
foo: Test Message 1
bar: Test Message 1
bar: Test Message 2
foo: Test Message 2
foo: Test Message 3
foo: Test Message 4
bar: Test Message 3
注意,foo和bar第二次從隊(duì)列中讀取數(shù)據(jù)時(shí),會(huì)從之前斷開(kāi)的位置開(kāi)始。
滾動(dòng)文件
Chronicle Queue根據(jù)創(chuàng)建隊(duì)列時(shí)定義的滾動(dòng)周期滾動(dòng)使用的文件;默認(rèn)情況下,每天滾動(dòng)文件。要改變滾動(dòng)周期,就不能使用簡(jiǎn)單的ChronicleQueue.single方法:
package roll
import net.openhft.chronicle.queue.ChronicleQueue
import net.openhft.chronicle.queue.RollCycles
import net.openhft.chronicle.impl.single.SingleChronicleQueueBuilder
fun main(args: Array) {
var qbuilder: SingleChronicleQueueBuilder = ChronicleQueue.singleBuilder("./build/roll")
qbuilder.rollCycle(RollCycles.HOURLY)
val q: ChronicleQueue = qbuilder.build()
// code omitted for brevity
}
首先,得到一個(gè)SingleChronicleQueueBuilder實(shí)例,并通過(guò).rollCycle方法設(shè)置滾動(dòng)周期。 上面的代碼段將隊(duì)列配置為每小時(shí)滾動(dòng)一次文件。配置好后,調(diào)用構(gòu)造器的.build()獲取ChronicleQueue實(shí)例。請(qǐng)注意,appender和tailer(s)在訪問(wèn)同一個(gè)隊(duì)列時(shí)必須使用相同的滾動(dòng)周期。
由于SingleChronicleQueueBuilder支持流式接口,代碼也可以做如下簡(jiǎn)化:
val q: ChronicleQueue = ChronicleQueue.singleBuilder("./build/roll")
.rollCycle(RollCycles.HOURLY)
.build()
接下來(lái)
這篇文章介紹了Chronicle Queue的術(shù)語(yǔ)和基礎(chǔ)知識(shí)。以下網(wǎng)站有更多信息可供挖掘:
1. Chronicle Queue GitHub repository
2. Stack Overflow tagged questions
3. Peter Lawre's Blog
網(wǎng)站欄目:ChronicleQueue入門(mén)
本文來(lái)源:http://m.fisionsoft.com.cn/article/dhehedo.html


咨詢(xún)
建站咨詢(xún)
