淺談Java中的Queue家族
先看下Queue的繼承關(guān)系和其中定義的方法:
Queue繼承自Collection,Collection繼承自Iterable。
Queue有三類主要的方法,我們用個(gè)表格來看一下他們的區(qū)別:
方法類型 方法名稱 方法名稱 區(qū)別 Insert add offer 兩個(gè)方法都表示向Queue中添加某個(gè)元素,不同之處在于添加失敗的情況,add只會(huì)返回true,如果添加失敗,會(huì)拋出異常。offer在添加失敗的時(shí)候會(huì)返回false。所以對(duì)那些有固定長(zhǎng)度的Queue,優(yōu)先使用offer方法。 Remove remove poll 如果Queue是空的情況下,remove會(huì)拋出異常,而poll會(huì)返回null。 Examine element peek 獲取Queue頭部的元素,但不從Queue中刪除。兩者的區(qū)別還是在于Queue為空的情況下,element會(huì)拋出異常,而peek返回null。注意,因?yàn)閷?duì)poll和peek來說null是有特殊含義的,所以一般來說Queue中禁止插入null,但是在實(shí)現(xiàn)中還是有一些類允許插入null比如LinkedList。
盡管如此,我們?cè)谑褂弥羞€是要避免插入null元素。
Queue的分類一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。
BlockingQueueBlockingQueue是Queue的一種實(shí)現(xiàn),它提供了兩種額外的功能:
當(dāng)當(dāng)前Queue是空的時(shí)候,從BlockingQueue中獲取元素的操作會(huì)被阻塞。當(dāng)當(dāng)前Queue達(dá)到最大容量的時(shí)候,插入BlockingQueue的操作會(huì)被阻塞。
BlockingQueue的操作可以分為下面四類:
操作類型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable
第一類是會(huì)拋出異常的操作,當(dāng)遇到插入失敗,隊(duì)列為空的時(shí)候拋出異常。
第二類是不會(huì)拋出異常的操作。
第三類是會(huì)Block的操作。當(dāng)Queue為空或者達(dá)到最大容量的時(shí)候。
第四類是time out的操作,在給定的時(shí)間里會(huì)Block,超時(shí)會(huì)直接返回。
BlockingQueue是線程安全的Queue,可以在生產(chǎn)者消費(fèi)者模式的多線程中使用,如下所示:
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
最后,在一個(gè)線程中向BlockQueue中插入元素之前的操作happens-before另外一個(gè)線程中從BlockQueue中刪除或者獲取的操作。
DequeDeque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和刪除元素。
同樣的,我們也可以將Deque的方法用下面的表格來表示,Deque的方法可以分為對(duì)頭部的操作和對(duì)尾部的操作:
方法類型 Throws exception Special value Throws exception Special value Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e) Remove removeFirst() pollFirst() removeLast() pollLast() Examine getFirst() peekFirst() getLast() peekLast()和Queue的方法描述基本一致,這里就不多講了。
當(dāng)Deque以 FIFO (First-In-First-Out)的方法處理元素的時(shí)候,Deque就相當(dāng)于一個(gè)Queue。
當(dāng)Deque以LIFO (Last-In-First-Out)的方式處理元素的時(shí)候,Deque就相當(dāng)于一個(gè)Stack。
TransferQueueTransferQueue繼承自BlockingQueue,為什么叫Transfer呢?因?yàn)門ransferQueue提供了一個(gè)transfer的方法,生產(chǎn)者可以調(diào)用這個(gè)transfer方法,從而等待消費(fèi)者調(diào)用take或者poll方法從Queue中拿取數(shù)據(jù)。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
我們舉個(gè)TransferQueue實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者的問題。
先定義一個(gè)生產(chǎn)者:
@Slf4j@Data@AllArgsConstructorclass Producer implements Runnable { private TransferQueue<String> transferQueue; private String name; private Integer messageCount; public static final AtomicInteger messageProduced = new AtomicInteger(); @Override public void run() {for (int i = 0; i < messageCount; i++) { try {boolean added = transferQueue.tryTransfer( '第'+i+'個(gè)', 2000, TimeUnit.MILLISECONDS);log.info('transfered {} 是否成功: {}','第'+i+'個(gè)',added);if(added){ messageProduced.incrementAndGet();} } catch (InterruptedException e) {log.error(e.getMessage(),e); }}log.info('total transfered {}',messageProduced.get()); }}
在生產(chǎn)者的run方法中,我們調(diào)用了tryTransfer方法,等待2秒鐘,如果沒成功則直接返回。
再定義一個(gè)消費(fèi)者:
@Slf4j@Data@AllArgsConstructorpublic class Consumer implements Runnable { private TransferQueue<String> transferQueue; private String name; private int messageCount; public static final AtomicInteger messageConsumed = new AtomicInteger(); @Override public void run() {for (int i = 0; i < messageCount; i++) { try {String element = transferQueue.take();log.info('take {}',element );messageConsumed.incrementAndGet();Thread.sleep(500); } catch (InterruptedException e) {log.error(e.getMessage(),e); }}log.info('total consumed {}',messageConsumed.get()); }}
在run方法中,調(diào)用了transferQueue.take方法來取消息。
下面先看一下一個(gè)生產(chǎn)者,零個(gè)消費(fèi)者的情況:
@Testpublic void testOneProduceZeroConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, 'ProducerOne', 5); exService.execute(producer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown();}
輸出結(jié)果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個(gè) 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個(gè) 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個(gè) 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個(gè) 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個(gè) 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
可以看到,因?yàn)闆]有消費(fèi)者,所以消息并沒有發(fā)送成功。
再看下一個(gè)有消費(fèi)者的情況:
@Testpublic void testOneProduceOneConsumer() throws InterruptedException { TransferQueue<String> transferQueue = new LinkedTransferQueue<>(); ExecutorService exService = Executors.newFixedThreadPool(10); Producer producer = new Producer(transferQueue, 'ProducerOne', 2); Consumer consumer = new Consumer(transferQueue, 'ConsumerOne', 2); exService.execute(producer); exService.execute(consumer); exService.awaitTermination(50000, TimeUnit.MILLISECONDS); exService.shutdown();}
輸出結(jié)果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個(gè)
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個(gè) 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個(gè)
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個(gè) 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
可以看到Producer和Consumer是一個(gè)一個(gè)來生產(chǎn)和消費(fèi)的。
以上就是淺談Java中的Queue家族的詳細(xì)內(nèi)容,更多關(guān)于Java中的Queue家族的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. GIT相關(guān)-IDEA/ECLIPSE工具配置的教程詳解2. Django Auth用戶認(rèn)證組件實(shí)現(xiàn)代碼3. 八種Vue組件間通訊方式合集(推薦)4. idea設(shè)置代碼格式化的方法步驟5. vue3自定義插件的作用場(chǎng)景及使用示例詳解6. XML 增、刪、改和查示例7. php bugs代碼審計(jì)基礎(chǔ)詳解8. 完美實(shí)現(xiàn)CSS垂直居中的11種方法9. ASP中if語句、select 、while循環(huán)的使用方法10. vue項(xiàng)目登錄成功拿到令牌跳轉(zhuǎn)失敗401無登錄信息的解決
