日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
一行代碼完成并行任務(wù)

眾所周知,Python的并行處理能力很不理想。我認為如果不考慮線程和GIL的標準參數(shù)(它們大多是合法的),其原因 不是因為技術(shù)不到位而是我們的使用方法不恰當。大多數(shù)關(guān)于Python線程和多進程的教材雖然都很出色,但是內(nèi)容繁瑣冗長。它們的確在開篇鋪陳了許多有用 的信息,但往往都不會涉及真正能提高日常工作的部分。

目前成都創(chuàng)新互聯(lián)公司已為上千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)絡(luò)空間、網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計、鐵東網(wǎng)站維護等服務(wù),公司將堅持客戶導向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

經(jīng)典例子

DDG上以“Python threading tutorial (Python線程教程)”為關(guān)鍵字的熱門搜索結(jié)果表明:幾乎每篇文章中給出的例子都是相同的類+隊列。

事實上,它們就是以下這段使用producer/Consumer來處理線程/多進程的代碼示例:

 
 
  1. #Example.py
  2. '''''
  3. Standard Producer/Consumer Threading Pattern
  4. '''
  5.  
  6. import time
  7. import threading
  8. import Queue
  9.  
  10. class Consumer(threading.Thread):
  11.     def __init__(self, queue):
  12.         threading.Thread.__init__(self)
  13.         self._queue = queue
  14.  
  15.     def run(self):
  16.         while True:
  17.             # queue.get() blocks the current thread until
  18.             # an item is retrieved.
  19.             msg = self._queue.get()
  20.             # Checks if the current message is
  21.             # the "Poison Pill"
  22.             if isinstance(msg, str) and msg == 'quit':
  23.                 # if so, exists the loop
  24.                 break
  25.             # "Processes" (or in our case, prints) the queue item  
  26.             print "I'm a thread, and I received %s!!" % msg
  27.         # Always be friendly!
  28.         print 'Bye byes!'
  29.  
  30. def Producer():
  31.     # Queue is used to share items between
  32.     # the threads.
  33.     queue = Queue.Queue()
  34.  
  35.     # Create an instance of the worker
  36.     worker = Consumer(queue)
  37.     # start calls the internal run() method to
  38.     # kick off the thread
  39.     worker.start()
  40.  
  41.     # variable to keep track of when we started
  42.     start_time = time.time()
  43.     # While under 5 seconds..
  44.     while time.time() - start_time < 5:
  45.         # "Produce" a piece of work and stick it in
  46.         # the queue for the Consumer to process
  47.         queue.put('something at %s' % time.time())
  48.         # Sleep a bit just to avoid an absurd number of messages
  49.         time.sleep(1)
  50.  
  51.     # This the "poison pill" method of killing a thread.
  52.     queue.put('quit')
  53.     # wait for the thread to close down
  54.     worker.join()
  55.  
  56. if __name__ == '__main__':
  57.     Producer()

唔…….感覺有點像Java。

我現(xiàn)在并不想說明使用Producer / Consume來解決線程/多進程的方法是錯誤的——因為它肯定正確,而且在很多情況下它是最佳方法。但我不認為這是平時寫代碼的最佳選擇。

它的問題所在(個人觀點)

首先,你需要創(chuàng)建一個樣板式的鋪墊類。然后,你再創(chuàng)建一個隊列,通過其傳遞對象和監(jiān)管隊列的兩端來完成任務(wù)。(如果你想實現(xiàn)數(shù)據(jù)的交換或存儲,通常還涉及另一個隊列的參與)。

Worker越多,問題越多。

接下來,你應(yīng)該會創(chuàng)建一個worker類的pool來提高Python的速度。下面是IBM tutorial給出的較好的方法。這也是程序員們在利用多線程檢索web頁面時的常用方法。

 
 
  1. #Example2.py
  2. '''''
  3. A more realistic thread pool example
  4. '''
  5.  
  6. import time
  7. import threading
  8. import Queue
  9. import urllib2
  10.  
  11. class Consumer(threading.Thread):
  12.     def __init__(self, queue):
  13.         threading.Thread.__init__(self)
  14.         self._queue = queue
  15.  
  16.     def run(self):
  17.         while True:
  18.             content = self._queue.get()
  19.             if isinstance(content, str) and content == 'quit':
  20.                 break
  21.             response = urllib2.urlopen(content)
  22.         print 'Bye byes!'
  23.  
  24. def Producer():
  25.     urls = [
  26.         'http://www.python.org', 'http://www.yahoo.com'
  27.         'http://www.scala.org', 'http://www.google.com'
  28.         # etc..
  29.     ]
  30.     queue = Queue.Queue()
  31.     worker_threads = build_worker_pool(queue, 4)
  32.     start_time = time.time()
  33.  
  34.     # Add the urls to process
  35.     for url in urls:
  36.         queue.put(url) 
  37.     # Add the poison pillv
  38.     for worker in worker_threads:
  39.         queue.put('quit')
  40.     for worker in worker_threads:
  41.         worker.join()
  42.  
  43.     print 'Done! Time taken: {}'.format(time.time() - start_time)
  44.  
  45. def build_worker_pool(queue, size):
  46.     workers = []
  47.     for _ in range(size):
  48.         worker = Consumer(queue)
  49.         worker.start()
  50.         workers.append(worker)
  51.     return workers
  52.  
  53. if __name__ == '__main__':
  54.     Producer()

它的確能運行,但是這些代碼多么復(fù)雜阿!它包括了初始化方法、線程跟蹤列表以及和我一樣容易在死鎖問題上出錯的人的噩夢——大量的join語句。而這些還僅僅只是繁瑣的開始!

我們目前為止都完成了什么?基本上什么都沒有。上面的代碼幾乎一直都只是在進行傳遞。這是很基礎(chǔ)的方法,很容易出錯(該死,我剛才忘了在隊列對象上還需要調(diào)用task_done()方法(但是我懶得修改了)),性價比很低。還好,我們還有更好的方法。

#p#

介紹:Map

Map是一個很棒的小功能,同時它也是Python并行代碼快速運行的關(guān)鍵。給不熟悉的人講解一下吧,map是從函數(shù)語言Lisp來的。map函數(shù)能夠按序映射出另一個函數(shù)。例如

 
 
  1. urls = ['http://www.yahoo.com', 'http://www.reddit.com']
  2. results = map(urllib2.urlopen, urls)

這里調(diào)用urlopen方法來把調(diào)用結(jié)果全部按序返回并存儲到一個列表里。就像:

 
 
  1. results = []
  2. for url in urls:
  3.     results.append(urllib2.urlopen(url))

Map按序處理這些迭代。調(diào)用這個函數(shù),它就會返回給我們一個按序存儲著結(jié)果的簡易列表。

為什么它這么厲害呢?因為只要有了合適的庫,map能使并行運行得十分流暢!

有兩個能夠支持通過map函數(shù)來完成并行的庫:一個是multiprocessing,另一個是鮮為人知但功能強大的子文件:multiprocessing.dummy。

題外話:這個是什么?你從來沒聽說過dummy多進程庫?我也是最近才知道的。它在多進程的說明文檔里面僅僅只被提到了句。而且那一句就是大概讓你知道有這么個東西的存在。我敢說,這樣幾近拋售的做法造成的后果是不堪設(shè)想的!

Dummy就是多進程模塊的克隆文件。唯一不同的是,多進程模塊使用的是進程,而dummy則使用線程(當然,它有所有 Python常見的限制)。也就是說,數(shù)據(jù)由一個傳遞給另一個。這能夠使得數(shù)據(jù)輕松的在這兩個之間進行前進和回躍,特別是對于探索性程序來說十分有用,因 為你不用確定框架調(diào)用到底是IO 還是CPU模式。

準備開始

要做到通過map函數(shù)來完成并行,你應(yīng)該先導入裝有它們的模塊:

 
 
  1. from multiprocessing import Pool
  2. from multiprocessing.dummy import Pool as ThreadPool

再初始化:

 
 
  1. pool = ThreadPool()

這簡單的一句就能代替我們的build_worker_pool 函數(shù)在example2.py中的所有工作。換句話說,它創(chuàng)建了許多有效的worker,啟動它們來為接下來的工作做準備,以及把它們存儲在不同的位置,方便使用。

Pool對象需要一些參數(shù),但最重要的是:進程。它決定pool中的worker數(shù)量。如果你不填的話,它就會默認為你電腦的內(nèi)核數(shù)值。

如果你在CPU模式下使用多進程pool,通常內(nèi)核數(shù)越大速度就越快(還有很多其它因素)。但是,當進行線程或者處理網(wǎng)絡(luò)綁定之類的工作時,情況會比較復(fù)雜所以應(yīng)該使用pool的準確大小。

 
 
  1. pool = ThreadPool(4) # Sets the pool size to 4

如果你運行過多線程,多線程間的切換將會浪費許多時間,所以你最好耐心調(diào)試出最適合的任務(wù)數(shù)。

我們現(xiàn)在已經(jīng)創(chuàng)建了pool對象,馬上就能有簡單的并行程序了,所以讓我們重新寫example2.py中的url opener吧!

 
 
  1. import urllib2
  2. from multiprocessing.dummy import Pool as ThreadPool
  3.  
  4. urls = [
  5.     'http://www.python.org',
  6.     'http://www.python.org/about/',
  7.     'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  8.     'http://www.python.org/doc/',
  9.     'http://www.python.org/download/',
  10.     'http://www.python.org/getit/',
  11.     'http://www.python.org/community/',
  12.     'https://wiki.python.org/moin/',
  13.     'http://planet.python.org/',
  14.     'https://wiki.python.org/moin/LocalUserGroups',
  15.     'http://www.python.org/psf/',
  16.     'http://docs.python.org/devguide/',
  17.     'http://www.python.org/community/awards/'
  18.     # etc..
  19.     ]
  20.  
  21. # Make the Pool of workers
  22. pool = ThreadPool(4)
  23. # Open the urls in their own threads
  24. # and return the results
  25. results = pool.map(urllib2.urlopen, urls)
  26. #close the pool and wait for the work to finish
  27. pool.close()
  28. pool.join()

看吧!這次的代碼僅用了4行就完成了所有的工作。其中3句還是簡單的固定寫法。調(diào)用map就能完成我們前面例子中40行的內(nèi)容!為了更形象地表明兩種方法的差異,我還分別給它們運行的時間計時。

 
 
  1. # results = []
  2. # for url in urls:
  3. #   result = urllib2.urlopen(url)
  4. #   results.append(result)
  5.  
  6. # # ------- VERSUS ------- #
  7.  
  8. # # ------- 4 Pool ------- #
  9. # pool = ThreadPool(4)
  10. # results = pool.map(urllib2.urlopen, urls)
  11.  
  12. # # ------- 8 Pool ------- #
  13.  
  14. # pool = ThreadPool(8)
  15. # results = pool.map(urllib2.urlopen, urls)
  16.  
  17. # # ------- 13 Pool ------- #
  18.  
  19. # pool = ThreadPool(13)
  20. # results = pool.map(urllib2.urlopen, urls)

#p#

結(jié)果:

 
 
  1. #                       Single thread:  14.4 Seconds
  2. #                              4 Pool:   3.1 Seconds
  3. #                              8 Pool:   1.4 Seconds
  4. #                             13 Pool:   1.3 Seconds

相當出色!并且也表明了為什么要細心調(diào)試pool的大小。在這里,只要大于9,就能使其運行速度加快。

實例2

生成成千上萬的縮略圖

我們在CPU模式下來完成吧!我工作中就經(jīng)常需要處理大量的圖像文件夾。其任務(wù)之一就是創(chuàng)建縮略圖。這在并行任務(wù)中已經(jīng)有很成熟的方法了。

基礎(chǔ)的單線程創(chuàng)建

 
 
  1. import os
  2. import PIL
  3.  
  4. from multiprocessing import Pool
  5. from PIL import Image
  6.  
  7. SIZE = (75,75)
  8. SAVE_DIRECTORY = 'thumbs'
  9.  
  10. def get_image_paths(folder):
  11.     return (os.path.join(folder, f)
  12.             for f in os.listdir(folder)
  13.             if 'jpeg' in f)
  14.  
  15. def create_thumbnail(filename):
  16.     im = Image.open(filename)
  17.     im.thumbnail(SIZE, Image.ANTIALIAS)
  18.     base, fname = os.path.split(filename)
  19.     save_path = os.path.join(base, SAVE_DIRECTORY, fname)
  20.     im.save(save_path)
  21.  
  22. if __name__ == '__main__':
  23.     folder = os.path.abspath(
  24.         '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
  25.     os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
  26.  
  27.     images = get_image_paths(folder)
  28.  
  29.     for image in images:
  30.              create_thumbnail(Image)

對于一個例子來說,這是有點難,但本質(zhì)上,這就是向程序傳遞一個文件夾,然后將其中的所有圖片抓取出來,并最終在它們各自的目錄下創(chuàng)建和儲存縮略圖。

我的電腦處理大約6000張圖片用了27.9秒。

如果我們用并行調(diào)用map來代替for循環(huán)的話:

 
 
  1. import os
  2. import PIL
  3.  
  4. from multiprocessing import Pool
  5. from PIL import Image
  6.  
  7. SIZE = (75,75)
  8. SAVE_DIRECTORY = 'thumbs'
  9.  
  10. def get_image_paths(folder):
  11.     return (os.path.join(folder, f)
  12.             for f in os.listdir(folder)
  13.             if 'jpeg' in f)
  14.  
  15. def create_thumbnail(filename):
  16.     im = Image.open(filename)
  17.     im.thumbnail(SIZE, Image.ANTIALIAS)
  18.     base, fname = os.path.split(filename)
  19.     save_path = os.path.join(base, SAVE_DIRECTORY, fname)
  20.     im.save(save_path)
  21.  
  22. if __name__ == '__main__':
  23.     folder = os.path.abspath(
  24.         '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
  25.     os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
  26.  
  27.     images = get_image_paths(folder)
  28.  
  29.     pool = Pool()
  30.         pool.map(create_thumbnail,images)
  31.         pool.close()
  32.         pool.join()

5.6秒!

對于只改變了幾行代碼而言,這是大大地提升了運行速度。這個方法還能更快,只要你將cpu 和 io的任務(wù)分別用它們的進程和線程來運行——但也常造成死鎖??傊?,綜合考慮到 map這個實用的功能,以及人為線程管理的缺失,我覺得這是一個美觀,可靠還容易debug的方法。

好了,文章結(jié)束了。一行完成并行任務(wù)。


網(wǎng)頁題目:一行代碼完成并行任務(wù)
文章起源:http://m.5511xx.com/article/dpdejcp.html