深入理解Java 線程通信
當線程在系統內運行時,線程的調度具有一定的透明性,程序通常無法準確控制線程的輪換執行,但 Java 也提供了一些機制來保證線程協調運行。
傳統的線程通信
假設現在系統中有兩個線程,這兩個線程分別代表存款者和取錢者——現在假設系統有一種特殊的要求,系統要求存款者和取錢者不斷地重復存款、取錢的動作,而且要求每當存款者將錢存入指定賬戶后,取錢者就立即取出該筆錢。不允許存款者連續兩次存錢,也不允許取錢者連續兩次取錢。
為了實現這種功能,可以借助于 Object 類提供的 wait()、 notify() 和 notifyAll() 三個方法,這三個方法并不屬于 Thread 類,而是屬于 Object 類。但這三個方法必須由同步監視器對象來調用,這可分成以下兩種情況。
對于使用 synchronized 修飾的同步方法,因為該類的默認實例(this)就是同步監視器,所以可以在同步方法中直接調用這三個方法。 對于使用 synchronized 修飾的同步代碼塊,同步監視器是 synchronized 后括號里的對象,所以必須使用該對象調用這三個方法。關于這三個方法的解釋如下。
wait():導致當前線程等待,直到其他線程調用該同步監視器的 notify() 方法或 notifyAll() 方法來喚醒該線程。該 wait() 方法有三種形式——無時間參數的 wait (—直等待,直到其他線程通知 )、帶毫秒參數的 wait() 和帶毫秒、毫微秒參數的 wait() (這兩種方法都是等待指定時間后自動蘇醒)。調用 wait() 方法的當前線程會釋放對該同步監視器的鎖定。 notify():喚醒在此同步監視器上等待的單個線程。如果所有線程都在此同步監視器上等待,則會選擇喚醒其中一個線程。選擇是任意性的。只有當前線程放棄對該同步監視器的鎖定后(使用 wait() 方法),才可以執行被喚醒的線程。 notifyAll():喚醒在此同步監視器上等待的所有線程。只有當前線程放棄對該同步監視器的鎖定后,才可以執行被喚醒的線程。程序中可以通過一個旗標來標識賬戶中是否已有存款,當旗標為 false 時,表明賬戶中沒有存款,存款者線程可以向下執行,當存款者把錢存入賬戶后,將旗標設為 true ,并調用 notify() 或 notifyAll() 方法來喚醒其他線程;當存款者線程進入線程體后,如果旗標為 true 就調用 wait() 方法讓該線程等待。
當旗標為 true 時,表明賬戶中已經存入了存款,則取錢者線程可以向下執行,當取錢者把錢從賬戶中取出后,將旗標設為 false ,并調用 notify() 或 notifyAll() 方法來喚醒其他線程;當取錢者線程進入線程體后,如果旗標為 false 就調用 wait() 方法讓該線程等待。
本程序為 Account 類提供 draw() 和 deposit() 兩個方法,分別對應該賬戶的取錢、存款等操作,因為這兩個方法可能需要并發修改 Account 類的 balance 成員變量的值,所以這兩個方法都使用 synchronized 修飾成同步方法。除此之外,這兩個方法還使用了 wait() 和 notifyAll() 來控制線程的協作。
public class Account{ private String accountNo; private double balance; //標識賬戶中是否已有存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public synchronized void draw(double drawAmount){ try{ //如果flag為假,表明賬戶中還沒有人存錢進去,則取錢方法阻塞 if (!flag){wait(); }else{//執行取錢System.out.println(Thread.currentThread().getName() + ' 取錢:' + drawAmount);balance -= drawAmount;System.out.println('賬戶余額為:' + balance);//將標識賬戶是否已有存款的旗標設為false。flag = false;//喚醒其他線程notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public synchronized void deposit(double depositAmount){ try{ //如果flag為真,表明賬戶中已有人存錢進去,則存錢方法阻塞 if (flag){ // ①wait(); }else{//執行存款System.out.println(Thread.currentThread().getName() + ' 存款:' + depositAmount);balance += depositAmount;System.out.println('賬戶余額為:' + balance);//將表示賬戶是否已有存款的旗標設為trueflag = true;//喚醒其他線程notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; }}
上面程序中的粗體字代碼使用 wait() 和 notifyAll() 進行了控制,對存款者線程而言,當程序進入 deposit() 方法后,如果 flag 為 true ,則表明賬戶中已有存款,程序調用 wait() 方法阻塞;否則程序向下執行存款操作,當存款操作執行完成后,系統將 flag 設為 true,然后調用 notifyAll() 來喚醒其他被阻塞的線程——如果系統中有存款者線程,存款者線程也會被喚醒,但該存款者線程執行到①號代碼處時再次進入阻塞狀態,只有執行 draw() 方法的取錢者線程才可以向下執行。同理,取錢者線程的運行流程也是如此。
程序中的存款者線程循環100次重復存款,而取錢者線程則循環100次重復取錢,存款者線程和取錢者線程分別調用 Account 對象的 deposit()、 draw() 方法來實現。
public class DrawThread extends Thread{ //模擬用戶賬戶 private Account account; //當前取錢線程所希望取的錢數 private double drawAmount; public DrawThread(String name, Account account, double drawAmount){ super(name); this.account = account; this.drawAmount = drawAmount; } //重復100次執行取錢操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.draw(drawAmount); } }}
public class DepositThread extends Thread{ //模擬用戶賬戶 private Account account; //當前取錢線程所希望存款的錢數 private double depositAmount; public DepositThread(String name, Account account, double depositAmount){ super(name); this.account = account; this.depositAmount = depositAmount; } //重復100次執行存款操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.deposit(depositAmount); } }}
主程序可以啟動任意多個存款線程和取錢線程,可以看到所有的取錢線程必須等存款線程存錢后才可以向下執行,而存款線程也必須等取錢線程取錢后才可以向下執行。主程序代碼如下。
public class TestDraw{ public static void main(String[] args){ //創建一個賬戶 Account acct = new Account('1234567' , 0); new DrawThread('取錢者' , acct , 800).start(); new DepositThread('存款者甲' , acct , 800).start(); new DepositThread('存款者乙' , acct , 800).start(); new DepositThread('存款者丙' , acct , 800).start(); }}
運行該程序,可以看到存款者線程、取錢者線程交替執行的情形,每當存款者向賬戶中存入800元之后,取錢者線程立即從賬戶中取出這筆錢。存款完成后賬戶余額總是800元,取錢結束后賬戶余額總是0元。運行該程序,會看到如下圖所示的結果。
從上圖中可以看出 , 3個存款者線程隨機地向賬戶中存款,只有1個取錢者線程執行取錢操作。只有當取錢者取錢后,存款者才可以存款;同理,只有等存款者存款后,取錢者線程才可以取錢。
上圖顯示程序最后被阻塞無法繼續向下執行,這因為3個存款者線程共有300次存款操作,但1個取錢者線程只有100次取錢操作,所以程序最后被阻塞。
注意:上圖所示的阻塞并不是死鎖,對于這種情況,取錢者線程已經執行結束,而存款者線程只是在等待其他線程來取錢而已,并不是等待其他線程釋放同步監視器。不要把死鎖和程序阻塞等同起來!
使用 Condition 控制線程通信
如果程序不使用 synchronized 關鍵字來保證同步,而是直接使用 Lock 對象來保證同步,則系統中不存在隱式的同步監視器,也就不能使用 wait()、notify()、notifyAll() 方法進行線程通信了。
當使用 Lock 對象來保證同步時,Java 提供了一個 Condition 類來保持協調,使用 Condition 可以讓那些已經得到 Lock 對象卻無法繼續執行的線程釋放 Lock 對象,Condition 對象也可以喚醒其他處于等待的線程。
Condition 將同步監視器方法(wait()、notify() 和 notifyAll() )分解成截然不同的對象,以便通過將這些對象與 Lock 對象組合使用,為每個對象提供多個等待集(wait-set)。在這種情況下,Lock 替代了同步方法或同步代碼塊,Condition 替代了同步監視器的功能。
Condition 實例被綁定在一個 Lock 對象上。要獲得特定 Lock 實例的 Condition 實例,調用 Lock 對象的 newCondition() 方法即可。Condition 類提供了如下三個方法。
await():類似于隱式同步監視器上的 wait() 方法,導致當前線程等待,直到其他線程調用該 Condition 的 signal() 方法或 signalAll() 方法來喚醒該線程。該 await() 方法有更多變體,如 long awaitNanos(long nanosTimeout)、 void awaitUninterruptibly() 、 awaitUntil(Date deadline) 等,可以完成更豐富的等待操作。 signal():喚醒在此 Lock 對象上等待的單個線程。如果所有線程都在該 Lock 對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的。只有當前線程放棄對該 Lock 對象的鎖定后(使用 await() 方法),才可以執行被喚醒的線程。 signalAll():喚醒在此 Lock 對象上等待的所有線程。只有當前線程放棄對該 Lock 對象的鎖定后,才可以執行被喚醒的線程。下面程序中 Account 使用 Lock 對象來控制同步,并使用 Condition 對象來控制線程的協調運行。
public class Account{ //顯示定義Lock對象 private final Lock lock = new ReentrantLock(); //獲得指定Lock對象對應的條件變量 private final Condition cond = lock.newCondition(); private String accountNo; private double balance; //標識賬戶中是否已經存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public void draw(double drawAmount){ //加鎖 lock.lock(); try{ //如果賬戶中還沒有存入存款,該線程等待 if (!flag){cond.await(); }else{//執行取錢操作System.out.println(Thread.currentThread().getName() + ' 取錢:' + drawAmount);balance -= drawAmount;System.out.println('賬戶余額為:' + balance);//將標識是否成功存入存款的旗標設為falseflag = false;//喚醒該Lock對象對應的其他線程cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public void deposit(double depositAmount){ lock.lock(); try{ //如果賬戶中已經存入了存款,該線程等待 if(flag){cond.await(); }else{//執行存款操作System.out.println(Thread.currentThread().getName() + ' 存款:' + depositAmount);balance += depositAmount;System.out.println('賬戶余額為:' + balance);//將標識是否成功存入存款的旗標設為trueflag = true;//喚醒該Lock對象對應的其他線程cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; }}
顯式地使用 Lock 對象來充當同步監視器,則需要使用 Condition 對象來暫停、喚醒指定線程。存取錢的代碼和最上面相同。
使用阻塞隊列(BlockingQueue)控制線程通信
Java5 提供了一個 BlockingQueue 接口,雖然 BlockingQueue 也是 Queue 的子接口,但它的主要用途并不是作為容器,而是作為線程同步的工具。 BlockingQueue 具有一個特征:當生產者線程試圖向 BlockingQueue 中放入元素時,如果該隊列已滿,則該線程被阻塞;當消費者線程試圖從 BlockingQueue 中取出元素時,如果該隊列已空,則該線程被阻塞。
程序的兩個線程通過交替向 BlockingQueue 中放入元素、取出元素,即可很好地控制線程的通信。BlockingQueue 提供如下兩個支持阻塞的方法。
put(E e):嘗試把 E 元素放入 BlockingQueue 中,如果該隊列的元素己滿,則阻塞該線程。 take():嘗試從 BlockingQueue 的頭部取出元素,如果該隊列的元素已空,則阻塞該線程。BlockingQueue 繼承了 Queue 接口,當然也可使用 Queue 接口中的方法。這些方法歸納起來可分為如下三組。
在隊列尾部插入元素。包括 add(E e)、offer(E e) 和 put(E e) 方法,當該隊列已滿時,這三個方法分別會拋出異常、返回 false 、阻塞隊列。 在隊列頭部刪除并返回刪除的元素。包括 remove()、 poll() 和 take() 方法。當該隊列已空時,這三個方法分別會拋出異常、返回 false 、阻塞隊列。 在隊列頭部取出但不刪除元素。包括 element() 和 peek() 方法,當隊列已空時,這兩個方法分別拋出異常、返回 falseBlockingQueue 包含的方法之間的對應關系如下表所示:
BlockingQueue 與其實現類之間的類圖如下圖所示。
上圖中以黑色方框框出的都是 Java7 新增的阻塞隊列。可以看到 , BlockingQueue 包含如下5個實現類。
ArrayBlockingQueue:基于數組實現的 BlockingQueue 隊列。 LinkedBlockingQueue:基于鏈表實現的 BlockingQueue 隊列。 PriorityBlockingQueue:它并不是標準的阻塞隊列。與前面介紹的 PriorityQueue 類似,該隊列調用 remove()、poll()、take() 等方法取出元素時,并不是取出隊列中存在時間最長的元素,而是隊列中最小的元素。 PriorityBlockingQueue 判斷元素的大小即可根據元素(實現 Comparable 接口)的本身大小來自然排序,也可使用 Comparator 進行定制排序。 SynchronousQueue:同步隊列。對該隊列的存、取操作必須交替進行。 DelayQueue:它是一個特殊的 BlockingQueue ,底層基于 PriorityBlockingQueue 實現。不過,DelayQueue 要求集合元素都實現 Delay 接口(該接口里只有一個 long getDelay() 方法),DelayQueue 根據集合元素的 getDalay() 方法的返回值進行排序。下面以 ArrayBlockingQueue 為例介紹阻塞隊列的功能和用法。下面先用一個最簡單的程序來測試 BlockingQueue 的 put() 方法。
public class BlockingQueueTest { public static void main(String[] args) throws Exception { BlockingQueue<String> bq = new ArrayBlockingQueue<>(2); bq.put('Java'); // 與bq.add('Java')、bq.offer('Java') 相同 bq.put('Java'); // 與bq.add('Java')、bq.offer('Java') 相同 bq.put('Java'); // ① 阻塞線程 }}
上面程序先定義一個大小為2的 BlockingQueue,程序先向該隊列中放入兩個元素,此時隊列還沒有滿,兩個元紊都可以放入,因此使用 put()、add() 和 offer() 方法效果完全一樣。當程序試圖放入第三個元素時,如果使用 put() 方法嘗試放入元素將會阻寒線程,如上面程序①號代碼所示。如果使用 add() 方法嘗試放入元素將會引發異常;如果使用 offer() 方法嘗試放入元素則會返回 false,元素不會被放入。
與此類似的是,在 BlockingQueue 已空的情況下,程序使用 take() 方法嘗試取出元素將會阻塞線程:使用 remove() 方法嘗試取出元素將引發異常:使用 poll() 方法嘗試取出元素將返回 false,元索不會被刪除。
掌握了 BlodcingQuene 阻塞隊列的特性之后,下面程序就可以利用 BlockingQueue 來實現線程通信了。
public class Producer extends Thread { private BlockingQueue<String> bq; public Producer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { String[] strArr = new String[] {'Java','Struts','Spring' };for(int i=0;i<99999999;i++) { System.out.println(getName()+'生產者準備生產集合元素'); try {Thread.sleep(200);// 嘗試放入元素,如果隊列已滿,則線程被阻塞bq.put(strArr[i%3]); }catch(Exception ex) {ex.printStackTrace(); } System.out.println(getName()+'生產完成:'+bq); } }}public class Consumer extends Thread { private BlockingQueue<String> bq; public Consumer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { while(true) { System.out.println(getName()+'消費者準備消費集合元素!'); try {Thread.sleep(200);// 嘗試取出元素,如果隊列已空,則線程被阻塞bq.take(); }catch(Exception ex) {ex.printStackTrace(); } System.out.println(getName()+'消費完成:'+bq); } }}public class BlockingQueueTest2 { public static void main(String[] args) { // 創建一個容量為1的BlockingQueue BlockingQueue<String> bq = new ArrayBlockingQueue<>(1); // 啟動3個生產者線程 new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); // 啟動一個消費者線程 new Consumer(bq).start(); }}
上面程序啟動了 3個生產者線程向 BlockingQueue 集合放入元素,啟動了 1個消費者線程從 BlockingQueue 集合取出元素。本程序的 BlockingQueue 集合容量為1,因此3個生產者線程無法連續放入元素,必須等待消費者線程取出一個元素后 , 3個生產者線程的其中之一才能放入一個元素。運行該程序,會看到如下圖所示的結果。
從上圖可以看出,3個生產者線程都想向 BlockingQueue 中放入元素,但只要其中一個線程向該隊列中放入元素之后,其他生產者線程就必須等待,等待消費者線程取出 BlockingQueue 隊列里的元素。
以上就是深入理解Java 線程通信的詳細內容,更多關于Java 線程通信的資料請關注好吧啦網其它相關文章!
相關文章:
1. Python獲取抖音關注列表封號賬號的實現代碼2. Warning: require(): open_basedir restriction in effect,目錄配置open_basedir報錯問題分析3. php網絡安全中命令執行漏洞的產生及本質探究4. 解決Python 進程池Pool中一些坑5. php測試程序運行速度和頁面執行速度的代碼6. Python如何讀寫CSV文件7. 三個不常見的 HTML5 實用新特性簡介8. ajax請求添加自定義header參數代碼9. python利用os模塊編寫文件復制功能——copy()函數用法10. 無線標記語言(WML)基礎之WMLScript 基礎第1/2頁
