新聞中心
Java中有哪些隊(duì)列

創(chuàng)新互聯(lián)是一家專(zhuān)注于成都網(wǎng)站制作、做網(wǎng)站、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)與策劃設(shè)計(jì),通城網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專(zhuān)注于網(wǎng)站建設(shè)十載,網(wǎng)設(shè)計(jì)領(lǐng)域的專(zhuān)業(yè)建站公司;建站業(yè)務(wù)涵蓋:通城等地區(qū)。通城做網(wǎng)站價(jià)格咨詢(xún):13518219792
- ArrayBlockingQueue 使用ReentrantLock
- LinkedBlockingQueue 使用ReentrantLock
- ConcurrentLinkedQueue 使用CAS
- 等等
我們清楚使用鎖的性能比較低,盡量使用無(wú)鎖設(shè)計(jì)。接下來(lái)就我們來(lái)認(rèn)識(shí)下Disruptor。
Disruptor簡(jiǎn)單使用
github地址:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
先簡(jiǎn)單介紹下:
- Disruptor它是一個(gè)開(kāi)源的并發(fā)框架,并獲得2011 Duke’s程序框架創(chuàng)新獎(jiǎng)【Oracle】,能夠在無(wú)鎖的情況下實(shí)現(xiàn)網(wǎng)絡(luò)的Queue并發(fā)操作。英國(guó)外匯交易公司LMAX開(kāi)發(fā)的一個(gè)高性能隊(duì)列,號(hào)稱(chēng)單線(xiàn)程能支撐每秒600萬(wàn)訂單~
- 日志框架Log4j2 異步模式采用了Disruptor來(lái)處理
- 局限呢,他就是個(gè)內(nèi)存隊(duì)列,也就是說(shuō)無(wú)法支撐分布式場(chǎng)景。
簡(jiǎn)單使用
數(shù)據(jù)傳輸對(duì)象
@Data
public class EventData {
private Long value;
}
消費(fèi)者
public class EventConsumer implements WorkHandler{
/**
* 消費(fèi)回調(diào)
* @param eventData
* @throws Exception
*/
@Override
public void onEvent(EventData eventData) throws Exception {
Thread.sleep(5000);
System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
}
}
生產(chǎn)者
public class EventProducer {
private final RingBuffer ringBuffer;
public EventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(Long v){
// cas展位
long next = ringBuffer.next();
try {
EventData eventData = ringBuffer.get(next);
eventData.setValue(v);
} finally {
// 通知等待的消費(fèi)者
System.out.println("EventProducer send success, sequence:"+next);
ringBuffer.publish(next);
}
}
} 測(cè)試類(lèi)
public class DisruptorTest {
public static void main(String[] args) {
// 2的n次方
int bufferSize = 8;
Disruptor disruptor = new Disruptor(
() -> new EventData(), // 事件工廠(chǎng)
bufferSize, // 環(huán)形數(shù)組大小
Executors.defaultThreadFactory(), // 線(xiàn)程池工廠(chǎng)
ProducerType.MULTI, // 支持多事件發(fā)布者
new BlockingWaitStrategy()); // 等待策略
// 設(shè)置消費(fèi)者
disruptor.handleEventsWithWorkerPool(
new EventConsumer(),
new EventConsumer(),
new EventConsumer(),
new EventConsumer());
disruptor.start();
RingBuffer ringBuffer = disruptor.getRingBuffer();
EventProducer eventProducer = new EventProducer(ringBuffer);
long i = 0;
for(;;){
i++;
eventProducer.sendData(i);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} 核心組件
基于上面簡(jiǎn)單例子來(lái)看確實(shí)很簡(jiǎn)單,Disruptor幫我們封裝好了生產(chǎn)消費(fèi)模型的實(shí)現(xiàn),接下來(lái)我們來(lái)看下他是基于哪些核心組件來(lái)支撐起一個(gè)高性能無(wú)鎖隊(duì)列呢?
RingBuffer: 環(huán)形數(shù)組,底層使用數(shù)組entries,在初始化時(shí)填充數(shù)組,避免不斷新建對(duì)象帶來(lái)的開(kāi)銷(xiāo)。后續(xù)只會(huì)對(duì)entries做更新操作
Sequencer: 核心管家
- 定義生產(chǎn)同步的實(shí)現(xiàn):SingleProducerSequencer?單生產(chǎn)、MultiProducerSequencer多生產(chǎn)
- 當(dāng)前寫(xiě)的進(jìn)度Sequence cursor
- 所有消費(fèi)者進(jìn)度的數(shù)組Sequence[] gatingSequences
- MultiProducerSequencer?可用區(qū)availableBuffer【利用空間換取查詢(xún)效率】
Sequence: 本身就是一個(gè)序號(hào)器用來(lái)標(biāo)識(shí)處理進(jìn)度,也可以當(dāng)做是一個(gè)atomicInteger; 還有另外一個(gè)特點(diǎn),為了解決偽共享問(wèn)題而引入的:緩存行填充。這個(gè)在后面介紹。
workProcessor: 處理Event的循環(huán),在循環(huán)中獲取Disruptor的事件,然后把事件分配給各個(gè)handler
EventHandler: 負(fù)責(zé)業(yè)務(wù)邏輯的handler,自己實(shí)現(xiàn)。
WaitStrategy: 消費(fèi)者 如何等待 事件的策略,定義了如下策略
- leepingWaitStrategy:自旋 + yield + sleep
- BlockingWaitStrategy:加鎖,適合CPU資源緊張(不需要切換線(xiàn)程),系統(tǒng)吞吐量無(wú)要求的
- YieldingWaitStrategy:自旋 + yield + 自旋
- BusySpinWaitStrategy:自旋,減少線(xiàn)程之前切換
- PhasedBackoffWaitStrategy:自旋 + yield + 自定義策略
帶著問(wèn)題來(lái)解析代碼?
1、多生產(chǎn)者如何保證消息生產(chǎn)不會(huì)相互覆蓋?!救绾芜_(dá)到互斥效果】
每個(gè)線(xiàn)程獲取不同的一段數(shù)組空間,然后通過(guò)CAS判斷這段空間是否已經(jīng)分配出去。
接下來(lái)我們看下多生產(chǎn)類(lèi)MultiProducerSequencer中next方法【獲取生產(chǎn)序號(hào)】
// 消費(fèi)者上一次消費(fèi)的最小序號(hào) // 后續(xù)第二點(diǎn)會(huì)講到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 當(dāng)前進(jìn)度的序號(hào)
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消費(fèi)者的序號(hào) //后續(xù)第二點(diǎn)會(huì)講到
protected volatile Sequence[] gatingSequences = new Sequence[0];
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
// 當(dāng)前進(jìn)度的序號(hào),Sequence的value具有可見(jiàn)性,保證多線(xiàn)程間線(xiàn)程之間能感知到可申請(qǐng)的最新值
current = cursor.get();
// 要申請(qǐng)的序號(hào)空間:最大序列號(hào)
next = current + n;
long wrapPoint = next - bufferSize;
// 消費(fèi)者最小序列號(hào)
long cachedGatingSequence = gatingSequenceCache.get();
// 大于一圈 || 最小消費(fèi)序列號(hào)>當(dāng)前進(jìn)度
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 說(shuō)明大于1圈,并沒(méi)有多余空間可以申請(qǐng)
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 更新最小值到Sequence的value中
gatingSequenceCache.set(gatingSequence);
}
// CAS成功后更新當(dāng)前Sequence的value
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
2、生產(chǎn)者向序號(hào)器申請(qǐng)寫(xiě)的序號(hào),如序號(hào)正在被消費(fèi),Sequencer是如何知道哪些序號(hào)是可以被寫(xiě)入的呢?【未消費(fèi)則被覆蓋如何處理】
從gatingSequences中取得最小的序號(hào),生產(chǎn)者最多能寫(xiě)到這個(gè)序號(hào)的后一位。通俗來(lái)講就是申請(qǐng)的序號(hào)不能大于最小消費(fèi)者序號(hào)一圈【申請(qǐng)到最大序列號(hào)-buffersize 要小于/等于 最小消費(fèi)的序列號(hào)】的時(shí)候, 才能申請(qǐng)到當(dāng)前寫(xiě)的序號(hào)
public final EventHandlerGrouphandleEventsWithWorkerPool(final WorkHandler ... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroupcreateWorkerPool(
final Sequence[] barrierSequences, final WorkHandler super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPoolworkerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 消費(fèi)者啟動(dòng)后就會(huì)將所有消費(fèi)者存放入AbstractSequencer中g(shù)atingSequences
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
3、在多生產(chǎn)者情況下,生產(chǎn)者是申請(qǐng)到一段可寫(xiě)入的序號(hào),然后再寫(xiě)入這些序號(hào)中,那么消費(fèi)者是如何感知哪些序號(hào)是可以被消費(fèi)的呢?【借問(wèn)提1圖說(shuō)明】
這個(gè)前提是多生產(chǎn)者情況下,第一點(diǎn)我們說(shuō)過(guò)每個(gè)線(xiàn)程獲取不同的一段數(shù)組空間,那么現(xiàn)在單單通過(guò)序號(hào)已經(jīng)不夠用了,MultiProducerSequencer?使用了int 數(shù)組 【availableBuffer?】來(lái)標(biāo)識(shí)當(dāng)前序號(hào)是否可用。當(dāng)生產(chǎn)者成功生產(chǎn)事件后會(huì)將availableBuffer中當(dāng)前序列號(hào)置為1標(biāo)識(shí)可以讀取。
如此消費(fèi)者可以讀取的的最大序號(hào)就是我們availableBuffer中第一個(gè)不可用序號(hào)-1。
初始化availableBuffer流程
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
// 初始化可用數(shù)組
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
// 初始化默認(rèn)availableBuffer為-1
private void initialiseAvailableBuffer()
{
for (int i = availableBuffer.length - 1; i != 0; i--)
{
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
// 生產(chǎn)者成功生產(chǎn)事件將可用區(qū)數(shù)組置為1
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
消費(fèi)者消費(fèi)流程
WorkProcessor類(lèi)中消費(fèi)run方法
public void run()
{
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
// 先通過(guò)cas獲取消費(fèi)事件的占有權(quán)
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 數(shù)據(jù)就緒,可以消費(fèi)
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
// 觸發(fā)回調(diào)函數(shù)
workHandler.onEvent(event);
processedSequence = true;
}
else
{
// 獲取可以被讀取的下標(biāo)
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
// ....省略
}
notifyShutdown();
running.set(false);
}
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// 這個(gè)值獲取的current write 下標(biāo),可以認(rèn)為全局消費(fèi)下標(biāo)。此處與每一段的write1和write2下標(biāo)區(qū)分開(kāi)
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 通過(guò)availableBuffer篩選出第一個(gè)不可用序號(hào) -1
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
// 從current read下標(biāo)開(kāi)始, 循環(huán)至 current write,如果碰到availableBuffer 為-1 直接返回
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
解決偽共享問(wèn)題
什么是偽共享問(wèn)題呢?
為了提高CPU的速度,Cpu有高速緩存Cache,該緩存最小單位為緩存行CacheLine,他是從主內(nèi)存復(fù)制的Cache的最小單位,通常是64字節(jié)。一個(gè)Java的long類(lèi)型是8字節(jié),因此在一個(gè)緩存行中可以存8個(gè)long類(lèi)型的變量。如果你訪(fǎng)問(wèn)一個(gè)long數(shù)組,當(dāng)數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)額外加載另外7個(gè)。因此你能非常快地遍歷這個(gè)數(shù)組。
偽共享問(wèn)題是指,當(dāng)多個(gè)線(xiàn)程共享某份數(shù)據(jù)時(shí),線(xiàn)程1可能拉到線(xiàn)程2的數(shù)據(jù)在其cache line中,此時(shí)線(xiàn)程1修改數(shù)據(jù),線(xiàn)程2取其數(shù)據(jù)時(shí)就要重新從內(nèi)存中拉取,兩個(gè)線(xiàn)程互相影響,導(dǎo)致數(shù)據(jù)雖然在cache line中,每次卻要去內(nèi)存中拉取。
Disruptor是如何解決的呢?
在value前后統(tǒng)一都加入7個(gè)Long類(lèi)型進(jìn)行填充,線(xiàn)程拉取時(shí),不論如何都會(huì)占滿(mǎn)整個(gè)緩存
回顧總結(jié):Disuptor為何能稱(chēng)之為高性能的無(wú)鎖隊(duì)列框架呢?
- 緩存行填充,避免緩存頻繁失效?!緅ava8中也引入@sun.misc.Contended注解來(lái)避免偽共享】
- 無(wú)鎖競(jìng)爭(zhēng):通過(guò)CAS 【二階段提交】
- 環(huán)形數(shù)組:數(shù)據(jù)都是覆蓋,避免GC
- 底層更多的使用位運(yùn)算來(lái)提升效率
當(dāng)前名稱(chēng):構(gòu)建高性能內(nèi)存隊(duì)列:Disruptoryyds~
網(wǎng)頁(yè)網(wǎng)址:http://m.fisionsoft.com.cn/article/djhpesc.html


咨詢(xún)
建站咨詢(xún)
