Java并發(fā)編程之CountDownLatch源碼解析
CountDownLatch維護(hù)了一個(gè)計(jì)數(shù)器(還是是state字段),調(diào)用countDown方法會(huì)將計(jì)數(shù)器減1,調(diào)用await方法會(huì)阻塞線(xiàn)程直到計(jì)數(shù)器變?yōu)?。可以用于實(shí)現(xiàn)一個(gè)線(xiàn)程等待所有子線(xiàn)程任務(wù)完成之后再繼續(xù)執(zhí)行的邏輯,也可以實(shí)現(xiàn)類(lèi)似簡(jiǎn)易CyclicBarrier的功能,達(dá)到讓多個(gè)線(xiàn)程等待同時(shí)開(kāi)始執(zhí)行某一段邏輯目的。
二、使用 一個(gè)線(xiàn)程等待其它線(xiàn)程執(zhí)行完再繼續(xù)執(zhí)行......CountDownLatch cdl = new CountDownLatch(10);ExecutorService es = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {es.execute(() -> {//do somethingcdl.countDown();});}cdl.await();...... 實(shí)現(xiàn)類(lèi)似CyclicBarrier的功能,先await,再countDown
......CountDownLatch cdl = new CountDownLatch(1);ExecutorService es = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) { es.execute(() -> {cdl.await();//do something });}Thread.sleep(10000L);cdl.countDown();......三、源碼分析
CountDownLatch的結(jié)構(gòu)和ReentrantLock、Semaphore的結(jié)構(gòu)類(lèi)似,也是使用的內(nèi)部類(lèi)Sync繼承AQS的方式,并且重寫(xiě)了tryAcquireShared和tryReleaseShared方法。
還是首先來(lái)看構(gòu)造函數(shù):
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException('count < 0');this.sync = new Sync(count); }
需要傳入一個(gè)大于0的count,代表CountDownLatch計(jì)數(shù)器的初始值,通過(guò)Sync的構(gòu)造函數(shù)最終賦值給父類(lèi)AQS的state字段。可一個(gè)看到這個(gè)state字段用法多多,在ReentrantLock中使用0和1來(lái)標(biāo)識(shí)鎖的狀態(tài),Semaphore中用來(lái)標(biāo)識(shí)信號(hào)量,此處又用來(lái)表示計(jì)數(shù)器。
CountDownLatch要通過(guò)await方法完成阻塞,先來(lái)看看這個(gè)方法是如何實(shí)現(xiàn)的:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }
調(diào)用的是sync的acquireSharedInterruptibly方法,該方法定義在A(yíng)QS中,Semaphore也調(diào)用的這個(gè)方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) throw new InterruptedException();if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
這個(gè)方法的邏輯前面在解析SemaPhore的時(shí)候細(xì)說(shuō)過(guò)了,這里不再贅述,主要就是兩個(gè)方法的調(diào)用,先通過(guò)tryAcquireShared方法嘗試獲取'許可',返回值代表此次獲取后的剩余量,如果大于等于0表示獲取成功,否則表示失敗。如果失敗,那么就會(huì)進(jìn)入doAcquireSharedInterruptibly方法執(zhí)行入隊(duì)阻塞的邏輯。這里我們主要到CountDownLatch中看看tryAcquireShared方法的實(shí)現(xiàn):
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
和Semaphore的實(shí)現(xiàn)中每次將state減去requires不同,這里直接判斷state是否為0,如果為0那么返回1,表示獲取'許可'成功;如果不為0,表示失敗,則需要入隊(duì)阻塞。從這個(gè)tryAcquireShared方法就能看出CountDownLatch的邏輯了:等到state變?yōu)榱?,那么所有線(xiàn)程都能獲取運(yùn)行許可。
那么我們接下來(lái)來(lái)到countDown方法:
public void countDown() {sync.releaseShared(1); }
調(diào)用的是sync的releaseShared方法,該方法定義在父類(lèi)AQS中,Semaphore使用的也是這個(gè)方法:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//當(dāng)state從非 doReleaseShared(); return true;}return false; }
前面提到了CountDownLatch也重寫(xiě)了tryReleaseShared方法:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) {int c = getState();if (c == 0)//如果state等于0了直接返回false//保證在并發(fā)情況下,最多只會(huì)有一個(gè)線(xiàn)程返回true//也包括調(diào)用countDown的次數(shù)超過(guò)state的初始值 return false;int nextc = c-1;if (compareAndSetState(c, nextc))//如果返回true,表示state從非0變?yōu)榱?//那么后續(xù)需要喚醒阻塞線(xiàn)程 return nextc == 0; }}
Semaphore在釋放信號(hào)量的時(shí)候,是將獲取的許可歸還到state中,但是CountDownLatch沒(méi)有獲取許可的邏輯(獲取許可的時(shí)候是判斷state是否等于0),所以在countDown的時(shí)候沒(méi)有釋放的邏輯,就是將state減1,然后根據(jù)state減1之后的值是否為0判斷release是否成功,如果state本來(lái)大于0,經(jīng)過(guò)減1之后變?yōu)榱?,那么返回true。tryReleaseShared方法的返回值決定了后續(xù)需不需要調(diào)用doReleaseShared方法喚醒阻塞線(xiàn)程。
這里有個(gè)邏輯:如果state已經(jīng)為0,那么返回false。這個(gè)主要應(yīng)對(duì)兩種情況:
調(diào)用countDown的次數(shù)超過(guò)了state的初始值多 線(xiàn)程并發(fā)調(diào)用的時(shí)候保證只有一個(gè)線(xiàn)程去完成阻塞線(xiàn)程的喚醒操作可以看到CountDownLatch沒(méi)有鎖的概念,countDown方法可以被一個(gè)線(xiàn)程重復(fù)調(diào)用,只需要對(duì)state做reduce操作,而不用關(guān)心是誰(shuí)做的reduce。如果tryReleaseShared返回true,那么表示需要在后面進(jìn)入doReleaseShared方法,該方法和Semaphore中調(diào)用的方法是同一個(gè),主要是喚醒阻塞線(xiàn)程或者設(shè)置PROPAGAGE狀態(tài),這里也不再贅述~
阻塞線(xiàn)程被喚醒之后,會(huì)在doAcquireSharedInterruptibly方法中繼續(xù)循環(huán),雖然和Semaphore調(diào)用的是同樣的方法,但是這里有不一樣的地方,所以還是提一句。我們首先回到doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try { for (;;) {final Node p = node.predecessor();if (p == head) {//如果head.next被unpark喚醒,說(shuō)明此時(shí)state==0//那么tryAcquireShared會(huì)返回1 int r = tryAcquireShared(arg); //r==1 if (r >= 0) { //node節(jié)點(diǎn)被喚醒后,還會(huì)繼續(xù)喚醒node.next //這樣依次傳遞,因?yàn)樵谶@里的r一定為1setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return; }}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); }} finally { if (failed)cancelAcquire(node);} }
當(dāng)head.next線(xiàn)程被unpark喚醒后,會(huì)進(jìn)入tryAcquireShared方法判斷,由于此時(shí)state已經(jīng)為0(只有當(dāng)state變?yōu)?時(shí),才會(huì)unpark喚醒線(xiàn)程),而前面提到了在CountDownLatch重寫(xiě)的tryAcquireShared中,如果state==0,那么會(huì)返回1,所以會(huì)進(jìn)入setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())doReleaseShared();} }
該方法在Semaphore中詳細(xì)介紹過(guò),這里我們就站在CountDownLatch的角度來(lái)看看。其實(shí)很簡(jiǎn)單了,注意此時(shí)該方法的propagate參數(shù)值是1,那么就會(huì)進(jìn)入到下面的if邏輯里,繼續(xù)喚醒下一個(gè)node。當(dāng)下一個(gè)node對(duì)應(yīng)的線(xiàn)程被喚醒后,同樣會(huì)進(jìn)入setHeadAndPropagate方法,propagage同樣為1,那么繼續(xù)喚醒下一個(gè)node,就這樣依次將整個(gè)CLH隊(duì)列的節(jié)點(diǎn)都喚醒。
四、總結(jié)如果單獨(dú)把CountDownLatch拿出來(lái)看其實(shí)是很復(fù)雜的,只是CountDownLatch(包括Semaphore和ReentrantLock)都高度共用了AQS提供的一些方法,而這些方法在前面介紹Semaphore和ReentrantLock的時(shí)候已經(jīng)詳細(xì)分析過(guò),所以到本文分析CountDownLatch的時(shí)候,只需要關(guān)注它內(nèi)部類(lèi)Sync重寫(xiě)的兩個(gè)方法:tryAcquireShared和tryReleaseShared,也就是'獲取許可'和'釋放許可'的邏輯。
CountDownLatch在await的邏輯里,如果當(dāng)前state的值大于0,那么會(huì)進(jìn)入CLH隊(duì)列進(jìn)行阻塞等待unpark喚醒(或者中斷喚醒);在countDown的邏輯里,就是簡(jiǎn)單的將state-1,如果一個(gè)線(xiàn)程把state從1減為0,那么該線(xiàn)程就會(huì)負(fù)責(zé)喚醒head.next節(jié)點(diǎn),head.next節(jié)點(diǎn)被喚醒后,又會(huì)在setHeadAndPropagate方法中喚醒next.next節(jié)點(diǎn),這樣依次喚醒所有CLH隊(duì)列中的阻塞節(jié)點(diǎn)。當(dāng)然,如果線(xiàn)程被中斷喚醒,那么也會(huì)進(jìn)入cancelAcquire中進(jìn)行無(wú)效節(jié)點(diǎn)的移除邏輯。
到此這篇關(guān)于Java并發(fā)編程之CountDownLatch源碼解析的文章就介紹到這了,更多相關(guān)Java中CountDownLatch源碼解析內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. Python獲取抖音關(guān)注列表封號(hào)賬號(hào)的實(shí)現(xiàn)代碼2. Warning: require(): open_basedir restriction in effect,目錄配置open_basedir報(bào)錯(cuò)問(wèn)題分析3. php網(wǎng)絡(luò)安全中命令執(zhí)行漏洞的產(chǎn)生及本質(zhì)探究4. 解決Python 進(jìn)程池Pool中一些坑5. php測(cè)試程序運(yùn)行速度和頁(yè)面執(zhí)行速度的代碼6. Python如何讀寫(xiě)CSV文件7. 三個(gè)不常見(jiàn)的 HTML5 實(shí)用新特性簡(jiǎn)介8. ajax請(qǐng)求添加自定義header參數(shù)代碼9. python利用os模塊編寫(xiě)文件復(fù)制功能——copy()函數(shù)用法10. 無(wú)線(xiàn)標(biāo)記語(yǔ)言(WML)基礎(chǔ)之WMLScript 基礎(chǔ)第1/2頁(yè)
