新聞中心
前言

成都創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的浦城網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
控制并發(fā)流程的工具類,作用就是幫助我們程序員更容易的讓線程之間合作,讓線程之間相互配合來滿足業(yè)務(wù)邏輯。比如讓線程A等待線程B執(zhí)行完畢后再執(zhí)行等合作策略。
控制并發(fā)流程的工具類主要有:
簡(jiǎn)介
從字面意思看,這個(gè)類的中文意思是“循環(huán)柵欄”。大概的意思就是一個(gè)可循環(huán)利用的屏障。它的作用就是會(huì)讓所有線程都等待完成后才會(huì)繼續(xù)下一步行動(dòng)。
舉個(gè)例子,就像生活中我們會(huì)約朋友到某個(gè)餐廳一起吃飯,有些朋友可能會(huì)早到,有些朋友可能會(huì)晚到,但這個(gè)餐廳規(guī)定必須等到所有人到期之后才會(huì)讓我們進(jìn)去。這里的朋友們就各個(gè)線程,餐廳就是CyclicBarrier。
在JUC包中為我們提供了一個(gè)同步工具類能夠很好的模擬這類場(chǎng)景,它就是CyclicBarrier類。利用CyclicBarrier類可以實(shí)現(xiàn)一組線程相互等待,當(dāng)所有線程都到達(dá)某個(gè)屏障點(diǎn)后再進(jìn)行后續(xù)的操作。下圖演示了這一過程。
應(yīng)用場(chǎng)景
可用于多線程計(jì)數(shù)數(shù)據(jù),最后合并計(jì)數(shù)結(jié)果的場(chǎng)景。
使用CyclicBarrier實(shí)現(xiàn)等待的線程都被稱為參與方。參與方只需要執(zhí)行cyclicBarrier.await() 就可以實(shí)現(xiàn)等待。由于CyclicBarrier內(nèi)部維護(hù)了一個(gè)顯示鎖,這可以知道參與方中誰最后一個(gè)執(zhí)行cyclicBarrier.await() 。當(dāng)最后一個(gè)線程執(zhí)行完,會(huì)使得使用相應(yīng)CyclicBarrier實(shí)例的其他參與方被喚醒,而最后一個(gè)線程自身不會(huì)被暫停。其流程圖如下:
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() ->{
- System.out.println("****召喚神龍");
- });
- for(int i = 1;i <= 7; i++){
- int finalI = i;
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "\t 收集到第"+ finalI +"顆龍珠");
- try {
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- },String.valueOf(i)).start();
- }
- }
源碼分析
CyclicBarrier 類圖
CyclicBarrier是包含了 “ReentrantLock對(duì)象lock” 和 “Condition對(duì)象trip”,它是通過獨(dú)占鎖實(shí)現(xiàn)的。
其內(nèi)部主要變量和方法如下:
成員變量
//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
- //同步操作鎖
- private final ReentrantLock lock = new ReentrantLock();
- //線程攔截器
- private final Condition trip = lock.newCondition();
- //每次攔截的線程數(shù)
- private final int parties;
- //換代前執(zhí)行的任務(wù)
- private final Runnable barrierCommand;
- //表示柵欄的當(dāng)前代
- private Generation generation = new Generation();
- //計(jì)數(shù)器
- private int count;
- //靜態(tài)內(nèi)部類Generation
- private static class Generation {
- boolean broken = false;
- }
可以看到 CyclicBarrier 內(nèi)部是通過條件隊(duì)列 trip 來對(duì)線程進(jìn)行阻塞的,并且其內(nèi)部維護(hù)了兩個(gè) int 型的變量 parties 和 count:
- parties 表示每次攔截的線程數(shù),該值在構(gòu)造時(shí)進(jìn)行賦值;
- count 是內(nèi)部計(jì)數(shù)器,它的初始值和 parties 相同,以后隨著每次 await 方法的調(diào)用而減 1,直到減為 0 就將所有線程喚醒。
CycliBarrier 有一個(gè)靜態(tài)內(nèi)部類 Generation,該類的對(duì)象代表柵欄的當(dāng)前代,就像玩游戲時(shí)代表的本局有些,利用它可以實(shí)現(xiàn)循環(huán)等待。barrierCommand 表示換代前執(zhí)行的任務(wù),當(dāng) count 減為 0 時(shí)表示本局游戲結(jié)束,需要轉(zhuǎn)到下一句。在轉(zhuǎn)到下一句游戲之前會(huì)將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定 barrierCommand 來執(zhí)行自己的任務(wù)。
構(gòu)造函數(shù)
主要提供了兩個(gè)構(gòu)造方法
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- // parties表示“必須同時(shí)到達(dá)barrier的線程個(gè)數(shù)”。
- this.parties = parties;
- // count表示“處在等待狀態(tài)的線程個(gè)數(shù)”。
- this.count = parties;
- // barrierCommand表示“parties個(gè)線程到達(dá)barrier時(shí),會(huì)執(zhí)行的動(dòng)作”。
- this.barrierCommand = barrierAction;
- }
解析:
- parties 是參與線程的個(gè)數(shù)
- 第二個(gè)構(gòu)造方法有一個(gè)Runnable參數(shù),這個(gè)參數(shù)的意思是最后一個(gè)到達(dá)線程要執(zhí)行的動(dòng)作。
重要方法
CyclicBarrier類最主要的功能就是使先到達(dá)屏障點(diǎn)的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時(shí)等待和非定時(shí)等待。
await()方法
- //非定時(shí)等待
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe);
- }
- }
- //定時(shí)等待
- public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
解析:
- 線程調(diào)用await()表示總結(jié)已經(jīng)到達(dá)柵欄
- BrokenBarrierException表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個(gè)線程await()時(shí)被中斷或者超時(shí)。
dowait()方法
可以看到不管是定時(shí)等待還是非定時(shí)等待,它們都調(diào)用了dowait方法,只不過是傳入的參數(shù)不同而已。下面我們就來看看dowait方法都做了些什么。
- //核心等待方法
- private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
- // 顯示鎖
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- final Generation g = generation;
- //檢查當(dāng)前柵欄是否被打翻
- if (g.broken) {
- throw new BrokenBarrierException();
- }
- //檢查當(dāng)前線程是否被中斷
- if (Thread.interrupted()) {
- //如果當(dāng)前線程被中斷會(huì)做以下三件事
- //1.打翻當(dāng)前柵欄
- //2.喚醒攔截的所有線程
- //3.拋出中斷異常
- breakBarrier();
- throw new InterruptedException();
- }
- //每次都將計(jì)數(shù)器的值減1
- int index = --count;
- //計(jì)數(shù)器的值減為0則需喚醒所有線程并轉(zhuǎn)換到下一代
- if (index == 0) {
- boolean ranAction = false;
- try {
- //喚醒所有線程前先執(zhí)行指定的任務(wù)
- final Runnable command = barrierCommand;
- if (command != null) {
- command.run();
- }
- ranAction = true;
- //喚醒所有線程并轉(zhuǎn)到下一代
- nextGeneration();
- return 0;
- } finally {
- //確保在任務(wù)未成功執(zhí)行時(shí)能將所有線程喚醒
- if (!ranAction) {
- breakBarrier();
- }
- }
- }
- //如果計(jì)數(shù)器不為0則執(zhí)行此循環(huán)
- for (;;) {
- try {
- //根據(jù)傳入的參數(shù)來決定是定時(shí)等待還是非定時(shí)等待
- if (!timed) {
- trip.await();
- }else if (nanos > 0L) {
- nanos = trip.awaitNanos(nanos);
- }
- } catch (InterruptedException ie) {
- //若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其他線程
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- //若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待, 則直接調(diào)用中斷操作
- Thread.currentThread().interrupt();
- }
- }
- //如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常
- if (g.broken) {
- throw new BrokenBarrierException();
- }
- //如果線程因?yàn)閾Q代操作而被喚醒則返回計(jì)數(shù)器的值
- if (g != generation) {
- return index;
- }
- //如果線程因?yàn)闀r(shí)間到了而被喚醒則打翻柵欄并拋出異常
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
上面執(zhí)行的代碼相對(duì)比較容易看懂,我們?cè)賮砜匆幌聢?zhí)行流程:
- 獲得顯示鎖,判斷當(dāng)前線程狀態(tài)是否被中斷,如果是,則執(zhí)行 breakBarrier 方法,喚醒之前阻塞的所有線程,并將計(jì)數(shù)器重置;
- 計(jì)數(shù)器 count 減 1,如果 count == 0,表示最后一個(gè)線程達(dá)到柵欄,接著執(zhí)行之前指定的 Runnable 接口,同時(shí)執(zhí)行 nextGeneration 方法進(jìn)入下一代;
- 否則,進(jìn)入自旋,判斷當(dāng)前線程是進(jìn)入定時(shí)等待還是非定時(shí)等待,如果在等待過程中被中斷,執(zhí)行 breakBarrier 方法,喚醒之前阻塞的所有線程;
- 判斷是否是因?yàn)閳?zhí)行 breakBarrier 方法而被喚醒,如果是,則拋出異常;
- 判斷是否是正常的換代操作而被喚醒,如果是,則返回計(jì)數(shù)器的值;
- 判斷是否是超時(shí)而被喚醒,如果是,則喚醒之前阻塞的所有線程,并拋出異常;
- 釋放鎖。
breakBarrier()方法
- private void breakBarrier() {
- generation.broken = true;//柵欄被打破
- count = parties;//重置count
- trip.signalAll();//喚醒之前阻塞的線程
- }
nextGeneration()方法
- private void nextGeneration() {
- //喚醒所以的線程
- trip.signalAll();
- //重置計(jì)數(shù)器
- count = parties;
- //重新開始
- generation = new Generation();
- }
reset()方法
接下來看看柵欄重置的方法
- // 重置barrier到初始狀態(tài),所有還在等待中的線程最終會(huì)拋出BrokenBarrierException。
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); // break the current generation
- nextGeneration(); // start a new generation
- } finally {
- lock.unlock();
- }
- }
其它方法
CyclicBarrier 其它還提供了例如getParties,isBroken,getNumberWaiting等方法,都比較簡(jiǎn)單,其中除了getParties由于parties被final修飾不可變,其余方法都會(huì)先去獲得互斥鎖。
- /**
- * 獲取當(dāng)前這一輪是否已經(jīng)broken。
- */
- public boolean isBroken() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return generation.broken;
- } finally {
- lock.unlock();
- }
- }
- /**
- * 獲得當(dāng)前在barrier中等待的線程數(shù)。
- */
- public int getNumberWaiting() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return parties - count;
- } finally {
- lock.unlock();
- }
- }
總結(jié)
CountDownLatch和CyclicBarrier區(qū)別
- CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同:
- CountDownLatch一般用于一個(gè)或多個(gè)線程,等待其他線程執(zhí)行完任務(wù)后,再才執(zhí)行;
- CyclicBarrier一般用于一組線程互相等待至某個(gè)狀態(tài),然后這一組線程再同時(shí)執(zhí)行;
- CountDownLatch 是一次性的,CyclicBarrier 是可循環(huán)利用的;
- CountDownLathch是一個(gè)計(jì)數(shù)器,線程完成一個(gè)記錄一個(gè),計(jì)數(shù)器遞減,只能用一次。如下圖:
CyclicBarrier的計(jì)數(shù)器更像一個(gè)閥門,需要所有線程都到達(dá),然后繼續(xù)執(zhí)行,計(jì)數(shù)器遞減,提供reset功能,可以多次使用。如下圖:
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git
PS:這里有一個(gè)技術(shù)交流群(QQ群:1158819530),方便大家一起交流,持續(xù)學(xué)習(xí),共同進(jìn)步,有需要的可以加一下。
網(wǎng)頁名稱:并發(fā)編程之CyclicBarrier原理與使用
文章位置:http://m.fisionsoft.com.cn/article/copggoj.html


咨詢
建站咨詢
