日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
一日一技:使用Asyncio如何限制協(xié)程的并發(fā)數

在昨天的直播中,有同學問道,如果使用 asyncio + httpx 實現并發(fā)請求,怎么限制請求的頻率呢?怎么限制最多只能有 x 個請求同時發(fā)出呢?我們今天給出兩種方案。

提出問題

假設如果我們同時發(fā)起12個請求,每個請求的時間不同,那么總共的請求時間大概跟最長耗時的請求差不多。我們先來寫一個用于測試的例子:

 
 
 
 
  1. import asyncio
  2. import httpx
  3. import time
  4. async def req(delay):
  5.     print(f'請求一個延遲為{delay}秒的接口')
  6.     async with httpx.AsyncClient(timeout=20) as client:
  7.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
  8.         result = resp.json()
  9.         print(result)
  10. async def main():
  11.     start = time.time()
  12.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
  13.     task_list = []
  14.     for delay in delay_list:
  15.         task = asyncio.create_task(req(delay))
  16.         task_list.append(task)
  17.     await asyncio.gather(*task_list)
  18.     end = time.time()
  19.     print(f'一共耗時:{end - start}')
  20. asyncio.run(main())

這段代碼,使用 for 循環(huán)創(chuàng)建了12個協(xié)程任務,這些任務幾乎同時運行,于是,請求完成所有的接口,總共耗時如下圖所示:

現在的問題是,由于網站有反爬蟲機制,最多只能同時發(fā)起3個請求。那么我們怎么確保同一時間最多只有3個協(xié)程在請求網絡呢?

限制協(xié)程任務數

第一個方案跟以前限制多線程的線程數的方案相同。我們創(chuàng)建一個列表,確保列表里面最多只有3個任務,然后持續(xù)循環(huán)檢查,發(fā)現有任務完成了,就移除這個完成的任務,并加入一個新的任務,直到待爬的列表為空,這個任務列表也為空。代碼如下:

 
 
 
 
  1. import asyncio
  2. import httpx
  3. import time
  4. async def req(delay):
  5.     print(f'請求一個延遲為{delay}秒的接口')
  6.     async with httpx.AsyncClient(timeout=20) as client:
  7.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
  8.         result = resp.json()
  9.         print(result)
  10. async def main():
  11.     start = time.time()
  12.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
  13.     task_list = []
  14.     while True:
  15.         if not delay_list and not task_list:
  16.             break
  17.         while len(task_list) < 3:
  18.             if delay_list:
  19.                 delay = delay_list.pop()
  20.                 task = asyncio.create_task(req(delay))
  21.                 task_list.append(task)
  22.             else:
  23.                 break
  24.         task_list = [task for task in task_list if not task.done()]
  25.         await asyncio.sleep(1)
  26.     end = time.time()
  27.     print(f'一共耗時:{end - start}')
  28. asyncio.run(main())

運行效果如下圖所示:

總共耗時大概28秒左右。比串行需要的58秒快了一半,但比全部同時并發(fā)多了一倍。

使用 Semaphore

asyncio 實際上自帶了一個限制協(xié)程數量的類,叫做Semaphore。我們只需要初始化它,傳入最大允許的協(xié)程數量,然后就可以通過上下文管理器來使用。我們看一下代碼:

 
 
 
 
  1. import asyncio
  2. import httpx
  3. import time
  4. async def req(delay, sem):
  5.     print(f'請求一個延遲為{delay}秒的接口')
  6.     async with sem:
  7.         async with httpx.AsyncClient(timeout=20) as client:
  8.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
  9.             result = resp.json()
  10.             print(result)
  11. async def main():
  12.     start = time.time()
  13.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
  14.     task_list = []
  15.     sem = asyncio.Semaphore(3)
  16.     for delay in delay_list:
  17.         task = asyncio.create_task(req(delay, sem))
  18.         task_list.append(task)
  19.     await asyncio.gather(*task_list)
  20.     end = time.time()
  21.     print(f'一共耗時:{end - start}')
  22. asyncio.run(main())

運行效果如下圖所示:

耗時為22秒,比第一個方案更快。

我們來看看Semaphore的用法,它的格式為:

 
 
 
 
  1. sem = asyncio.Semaphore(同時運行的協(xié)程數量)
  2. async def func(sem):
  3.     async with sem:
  4.         這里是并發(fā)執(zhí)行的代碼
  5. task_list = []
  6. for _ in range(總共需要執(zhí)行的任務數):
  7.     task = asyncio.create_task(func(sem))
  8.     task_list.append(task)
  9. await asyncio.gather(*task_list)

當我們要限制一個協(xié)程的并發(fā)數的時候,可以在調用協(xié)程之前,先初始化一個Semaphore對象。然后把這個對象傳到需要限制并發(fā)的協(xié)程里面,在協(xié)程里面,使用異步上下文管理器包住你的正式代碼:

 
 
 
 
  1. async with sem:
  2.     正式代碼

這樣一來,如果并發(fā)數沒有達到限制,那么async with sem會瞬間執(zhí)行完成,進入里面的正式代碼中。如果并發(fā)數已經達到了限制,那么其他的協(xié)程會阻塞在async with sem這個地方,直到正在運行的某個協(xié)程完成了,退出了,才會放行一個新的協(xié)程去替換掉這個已經完成的協(xié)程。

這個寫法其實跟多線程的加鎖很像。只不過鎖是確保同一個時間只有一個線程在運行,而Semaphore可以人為指定能有多少個協(xié)程同時運行。

如何限制1分鐘內能夠運行的協(xié)程數

可能同學看了上面的例子以后,只知道如何限制同時運行的協(xié)程數。但是怎么限制在一段時間里同時運行的協(xié)程數呢?

其實非常簡單,在并發(fā)的協(xié)程里面加個 asyncio.sleep 就可以了。例如上面的例子,我想限制每分鐘只能有3個協(xié)程,那么可以把代碼改為:

 
 
 
 
  1. async def req(delay, sem):
  2.     print(f'請求一個延遲為{delay}秒的接口')
  3.     async with sem:
  4.         async with httpx.AsyncClient(timeout=20) as client:
  5.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
  6.             result = resp.json()
  7.             print(result)
  8.     await asyncio.sleep(60)

總結

如果大家要限制協(xié)程的并發(fā)數,那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動協(xié)程之前初始化它,然后傳給協(xié)程。要確保所有并發(fā)協(xié)程拿到的是同一個Semaphore對象。

當然,你的程序里面,可能有多個不同的部分,有些部分限制并發(fā)數為 a,有些部分限制并發(fā)數為 b。那么你可以初始化多個Semaphore對象,分別傳給不同的協(xié)程。

本文轉載自微信公眾號「未聞Code」,可以通過以下二維碼關注。轉載本文請聯(lián)系未聞Code公眾號。


標題名稱:一日一技:使用Asyncio如何限制協(xié)程的并發(fā)數
文章地址:http://m.5511xx.com/article/cdcoeci.html