新聞中心
4.2 版中的新功能。

使用類似于標(biāo)準(zhǔn)庫提供給線程的同步原語來協(xié)調(diào)協(xié)程。 這些類與標(biāo)準(zhǔn)庫的 ?asyncio包中提供的類非常相似。
請(qǐng)注意,這些原語實(shí)際上不是線程安全的,并且不能用來代替標(biāo)準(zhǔn)庫的線程模塊中的那些原語——它們旨在協(xié)調(diào)單線程應(yīng)用程序中的 Tornado 協(xié)程,而不是保護(hù)多線程應(yīng)用程序中的共享對(duì)象。
Condition
class tornado.locks.Condition
一個(gè)?condition?允許一個(gè)或多個(gè)協(xié)程等待直到收到通知。
與標(biāo)準(zhǔn) ?threading.Condition? 類似,但不需要獲取和釋放的底層鎖。
使用 ?Condition?,協(xié)程可以等待其他協(xié)程的通知:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition
condition = Condition()
async def waiter():
print("I'll wait right here")
await condition.wait()
print("I'm done waiting")
async def notifier():
print("About to notify")
condition.notify()
print("Done notifying")
async def runner():
# Wait for waiter() and notifier() in parallel
await gen.multi([waiter(), notifier()])
IOLoop.current().run_sync(runner)結(jié)果為:
I'll wait right here
About to notify
Done notifying
I'm done waiting?wait接受一個(gè)可選的 ?timeout參數(shù),它可以是一個(gè)絕對(duì)時(shí)間戳:
io_loop = IOLoop.current()
# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)?datetime.timedelta? 表示相對(duì)于當(dāng)前時(shí)間的超時(shí):
# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))如果在截止日期之前沒有通知,則該方法返回 False。
wait(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[bool]
如果條件被通知,則返回一個(gè) ?Future解析 ?True?,或者在超時(shí)后解析為 ?False?。
notify(n: int = 1) → None
喚醒n個(gè)waiters
notify_all() → None
喚醒所有waiters
事件
class tornado.locks.Event
一個(gè)事件會(huì)阻塞協(xié)程,直到其內(nèi)部標(biāo)志設(shè)置為 ?True。
類似于?threading.Event?。
協(xié)程可以等待設(shè)置事件。 一旦設(shè)置,對(duì) ?yield event.wait() 的調(diào)用將不會(huì)阻塞,除非事件已被清除:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event
event = Event()
async def waiter():
print("Waiting for event")
await event.wait()
print("Not waiting this time")
await event.wait()
print("Done")
async def setter():
print("About to set the event")
event.set()
async def runner():
await gen.multi([waiter(), setter()])
IOLoop.current().run_sync(runner)結(jié)果如下:
Waiting for event
About to set the event
Not waiting this time
Doneis_set() → bool
如果內(nèi)部標(biāo)志為?True?,則返回?True?
set() → None
將內(nèi)部標(biāo)志設(shè)置為 ?True?。 所有的waiters都被喚醒了。
設(shè)置標(biāo)志后調(diào)用 ?wait不會(huì)阻塞。
clear() → None
將內(nèi)部標(biāo)志重置為 ?False?。
調(diào)用 ?wait將阻塞,直到調(diào)用 ?set ?。
wait(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[None]
阻塞直到內(nèi)部標(biāo)志為?True?。
返回一個(gè) ?awaitable?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError?。
信號(hào)
class tornado.locks.Semaphore(value: int = 1)
在阻塞之前可以獲取固定次數(shù)的鎖。
信號(hào)量管理一個(gè)計(jì)數(shù)器,表示釋放調(diào)用的數(shù)量減去獲取調(diào)用的數(shù)量,再加上一個(gè)初始值。 如果需要,?acquire方法會(huì)阻塞,直到它可以返回而不使計(jì)數(shù)器為負(fù)。
信號(hào)量限制對(duì)共享資源的訪問。 一次允許兩個(gè)worker訪問:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore
sem = Semaphore(2)
async def worker(worker_id):
await sem.acquire()
try:
print("Worker %d is working" % worker_id)
await use_some_resource()
finally:
print("Worker %d is done" % worker_id)
sem.release()
async def runner():
# Join all workers.
await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner)結(jié)果如下:
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is doneworker 0 和 1 被允許同時(shí)運(yùn)行,但worker 2 等到信號(hào)量被worker 0 釋放一次。
信號(hào)量可以用作異步上下文管理器:
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)為了與舊版本的 python 兼容,?acquire是一個(gè)上下文管理器,因此 worker 也可以寫成:
@gen.coroutine
def worker(worker_id):
with (yield sem.acquire()):
print("Worker %d is working" % worker_id)
yield use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)release() → None
增加計(jì)數(shù)器并喚醒一個(gè)waiter。
acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]
減少計(jì)數(shù)器。 返回一個(gè)可等待的。
如果計(jì)數(shù)器為零,則阻塞并等待釋放。awaitable在截止日期后引發(fā) ?TimeoutError?。
有界信號(hào)量
class tornado.locks.BoundedSemaphore(value: int = 1)
防止 ?release()? 被調(diào)用太多次的信號(hào)量。
如果 ?release增加信號(hào)量的值超過初始值,它會(huì)引發(fā) ?ValueError。 信號(hào)量主要用于保護(hù)容量有限的資源,因此信號(hào)量釋放次數(shù)過多是錯(cuò)誤的標(biāo)志。
release() → None
增加計(jì)數(shù)器并喚醒一個(gè)waiter。
acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]
減少計(jì)數(shù)器。 返回一個(gè)可等待的。
如果計(jì)數(shù)器為零,則阻塞并等待釋放。 ?awaitable在截止日期后引發(fā) ?TimeoutError?。
鎖
class tornado.locks.Lock
協(xié)程的鎖。
鎖開始解鎖,并立即獲取鎖。 當(dāng)它被鎖定時(shí),產(chǎn)生?acquire?的協(xié)程等待直到另一個(gè)協(xié)程調(diào)用?release?。
釋放未鎖定的鎖會(huì)引發(fā) ?RuntimeError?。
?Lock可以用作帶有 ?async with? 語句的異步上下文管理器:
>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
... async with lock:
... # Do something holding the lock.
... pass
...
... # Now the lock is released.為了與舊版本的 Python 兼容,?acquire方法異步返回一個(gè)常規(guī)上下文管理器:
>>> async def f2():
... with (yield lock.acquire()):
... # Do something holding the lock.
... pass
...
... # Now the lock is released.acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]
嘗試鎖定。 返回一個(gè)?awaitable?。
返回一個(gè) ?awaitable?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError?。
release() → None
解鎖。
排隊(duì)等待獲取的第一個(gè)協(xié)程獲得鎖。
如果未鎖定,則引發(fā) ?RuntimeError?。
文章標(biāo)題:創(chuàng)新互聯(lián)Tornado教程:Tornado 同步原語
轉(zhuǎn)載注明:http://m.5511xx.com/article/coheejs.html


咨詢
建站咨詢
