av一区二区在线观看_亚洲男人的天堂网站_日韩亚洲视频_在线成人免费_欧美日韩精品免费观看视频_久草视

您的位置:首頁技術(shù)文章
文章詳情頁

Python如何實現(xiàn)線程間通信

瀏覽:7日期:2022-07-15 17:49:12

問題

你的程序中有多個線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)

解決方案

從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊列了。創(chuàng)建一個被多個線程共享的 Queue 對象,這些線程通過使用 put() 和 get() 操作來向隊列中添加或者刪除元素。 例如:

from queue import Queuefrom threading import Thread# A thread that produces datadef producer(out_q): while True: # Produce some data ... out_q.put(data)# A thread that consumes datadef consumer(in_q): while True:# Get some data data = in_q.get() # Process the data ...# Create the shared queue and launch both threadsq = Queue()t1 = Thread(target=consumer, args=(q,))t2 = Thread(target=producer, args=(q,))t1.start()t2.start()

Queue 對象已經(jīng)包含了必要的鎖,所以你可以通過它在多個線程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊列時,協(xié)調(diào)生產(chǎn)者和消費者的關(guān)閉問題可能會有一些麻煩。一個通用的解決方法是在隊列中放置一個特殊的值,當(dāng)消費者讀到這個值的時候,終止執(zhí)行。例如:

from queue import Queuefrom threading import Thread# Object that signals shutdown_sentinel = object()# A thread that produces datadef producer(out_q): while running: # Produce some data ... out_q.put(data) # Put the sentinel on the queue to indicate completion out_q.put(_sentinel)# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Check for termination if data is _sentinel: in_q.put(_sentinel) break # Process the data ...

本例中有一個特殊的地方:消費者在讀到這個特殊值之后立即又把它放回到隊列中,將之傳遞下去。這樣,所有監(jiān)聽這個隊列的消費者線程就可以全部關(guān)閉了。 盡管隊列是最常見的線程間通信機制,但是仍然可以自己通過創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機制來實現(xiàn)線程間通信。最常見的方法是使用 Condition 變量來包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個例子演示了如何創(chuàng)建一個線程安全的優(yōu)先級隊列

import heapqimport threadingclass PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0:self._cv.wait() return heapq.heappop(self._queue)[-1]

使用隊列來進行線程間通信是一個單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時候接收到的數(shù)據(jù)并開始工作的。不過隊列對象提供一些基本完成的特性,比如下邊這個例子中的 task_done() 和 join() :

from queue import Queuefrom threading import Thread# A thread that produces datadef producer(out_q): while running: # Produce some data ... out_q.put(data)# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Indicate completion in_q.task_done()# Create the shared queue and launch both threadsq = Queue()t1 = Thread(target=consumer, args=(q,))t2 = Thread(target=producer, args=(q,))t1.start()t2.start()# Wait for all produced items to be consumedq.join()

如果一個線程需要在一個“消費者”線程處理完特定的數(shù)據(jù)項時立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個 Event 放到一起使用,這樣“生產(chǎn)者”就可以通過這個Event對象來監(jiān)測處理的過程了。示例如下:

from queue import Queuefrom threading import Thread, Event# A thread that produces datadef producer(out_q): while running: # Produce some data ... # Make an (data, event) pair and hand it to the consumer evt = Event() out_q.put((data, evt)) ... # Wait for the consumer to process the item evt.wait()# A thread that consumes datadef consumer(in_q): while True: # Get some data data, evt = in_q.get() # Process the data ... # Indicate completion evt.set()

討論

基于簡單隊列編寫多線程程序在多數(shù)情況下是一個比較明智的選擇。從線程安全隊列的底層實現(xiàn)來看,你無需在你的代碼中使用鎖和其他底層的同步機制,這些只會把你的程序弄得亂七八糟。此外,使用隊列這種基于消息的通信機制可以被擴展到更大的應(yīng)用范疇,比如,你可以把你的程序放入多個進程甚至是分布式系統(tǒng)而無需改變底層的隊列結(jié)構(gòu)。 使用線程隊列有一個要注意的問題是,向隊列中添加數(shù)據(jù)項時并不會復(fù)制此數(shù)據(jù)項,線程間通信實際上是在線程間傳遞對象引用。如果你擔(dān)心對象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結(jié)構(gòu)(如:整型、字符串或者元組)或者一個對象的深拷貝。例如:

from queue import Queuefrom threading import Threadimport copy# A thread that produces datadef producer(out_q): while True: # Produce some data ... out_q.put(copy.deepcopy(data))# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ...

Queue 對象提供一些在當(dāng)前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對象時提供可選的 size 參數(shù)來限制可以添加到隊列中的元素數(shù)量。對于“生產(chǎn)者”與“消費者”速度有差異的情況,為隊列中的元素數(shù)量添加上限是有意義的。比如,一個“生產(chǎn)者”產(chǎn)生項目的速度比“消費者” “消費”的速度快,那么使用固定大小的隊列就可以在隊列已滿的時候阻塞隊列,以免未預(yù)期的連鎖效應(yīng)擴散整個程序造成死鎖或者程序運行失常。在通信的線程之間進行“流量控制”是一個看起來容易實現(xiàn)起來困難的問題。如果你發(fā)現(xiàn)自己曾經(jīng)試圖通過擺弄隊列大小來解決一個問題,這也許就標(biāo)志著你的程序可能存在脆弱設(shè)計或者固有的可伸縮問題。 get() 和 put() 方法都支持非阻塞方式和設(shè)定超時,例如:

import queueq = queue.Queue()try: data = q.get(block=False)except queue.Empty: ...try: q.put(item, block=False)except queue.Full: ...try: data = q.get(timeout=5.0)except queue.Empty: ...

這些操作都可以用來避免當(dāng)執(zhí)行某些特定隊列操作時發(fā)生無限阻塞的情況,比如,一個非阻塞的 put() 方法和一個固定大小的隊列一起使用,這樣當(dāng)隊列已滿時就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。

def producer(q): ... try: q.put(item, block=False) except queue.Full: log.warning(’queued item %r discarded!’, item)

如果你試圖讓消費者線程在執(zhí)行像 q.get() 這樣的操作時,超時自動終止以便檢查終止標(biāo)志,你應(yīng)該使用 q.get() 的可選參數(shù) timeout ,如下:

_running = Truedef consumer(q): while _running: try: item = q.get(timeout=5.0) # Process item ... except queue.Empty: pass

最后,有 q.qsize() , q.full() , q.empty() 等實用方法可以獲取一個隊列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的??赡苣銓σ粋€隊列使用 empty() 判斷出這個隊列為空,但同時另外一個線程可能已經(jīng)向這個隊列中插入一個數(shù)據(jù)項。所以,你最好不要在你的代碼中使用這些方法。

以上就是Python如何實現(xiàn)線程間通信的詳細(xì)內(nèi)容,更多關(guān)于Python 線程間通信的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 91视频亚洲 | 91免费在线 | 亚洲成人国产综合 | 日韩aⅴ片| 一区二区三区网站 | 性做久久久久久免费观看欧美 | 国产激情精品视频 | 免费a网 | 日韩一区不卡 | 色婷婷综合久久久中字幕精品久久 | 久久久久综合 | 一级看片免费视频囗交动图 | 亚洲三级视频 | 黄色免费三级 | 九九热免费视频在线观看 | 久久99深爱久久99精品 | 久久午夜国产精品www忘忧草 | 麻豆一区一区三区四区 | 日本小电影在线 | 日韩精品在线免费观看视频 | 97国产精品视频人人做人人爱 | 欧美日韩一区二区三区视频 | 国产日韩精品视频 | 在线观看国产精品一区二区 | 国产1区| 国产一级黄色网 | 99亚洲精品 | 亚洲日本乱码在线观看 | 日韩在线国产精品 | 国产有码| 超碰97人人人人人蜜桃 | 精品一区二区三区四区 | 精品久久久久久久 | 91精品久久久久久久久中文字幕 | 超碰免费在线 | 99精品欧美一区二区三区综合在线 | 韩日在线 | 日韩欧美国产一区二区三区 | 亚洲国产成人精品女人久久久 | 啪视频在线 | 天天拍天天操 |