使用多处理队列,池和锁的死简单的例子

我试图阅读在http://docs.python.org/dev/library/multiprocessing.html的文档,但我仍然挣扎着多处理队列,池和locking。 而现在我能够build立下面的例子。

关于队列和池,我不知道我是否以正确的方式理解了这个概念,所以如果我错了,请纠正我。 我试图实现的是在时间处理2个请求(数据列表在这个例子中有8个),所以我应该使用什么? 池创build2个进程,可以处理两个不同的队列(最多2个),或者我应该使用队列每次处理2个input? 锁将是正确打印输出。

import multiprocessing import time data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'], ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'] ) def mp_handler(var1): for indata in var1: p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1])) p.start() def mp_worker(inputs, the_time): print " Processs %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs if __name__ == '__main__': mp_handler(data) 

您的问题的最佳解决scheme是利用一个Pool 。 使用Queue并具有单独的“队列馈送”function可能是矫枉过正的。

这里是你的程序的一个稍微重新安排的版本,这次只有两个进程联合在一个Pool 。 我相信这是最简单的方法,只需对原始代码进行最小的更改:

 import multiprocessing import time data = ( ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'], ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'] ) def mp_worker((inputs, the_time)): print " Processs %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs def mp_handler(): p = multiprocessing.Pool(2) p.map(mp_worker, data) if __name__ == '__main__': mp_handler() 

请注意, mp_worker()函数现在接受一个参数(前两个参数的元组),因为map()函数将input数据分块成子列表,每个子列表作为工作函数的单个参数给出。

输出:

 Processs a Waiting 2 seconds Processs b Waiting 4 seconds Process a DONE Processs c Waiting 6 seconds Process b DONE Processs d Waiting 8 seconds Process c DONE Processs e Waiting 1 seconds Process e DONE Processs f Waiting 3 seconds Process d DONE Processs g Waiting 5 seconds Process f DONE Processs h Waiting 7 seconds Process g DONE Process h DONE 

按下面的@Thales评论进行编辑:

如果你想“每个池限制一个锁”,以便你的进程串联对运行,ala:

等待B等待| 完成,B完成| C在等待,D在等待| C完成,D完成| …

然后更改处理函数以为每对数据启动池(2个进程):

 def mp_handler(): subdata = zip(data[0::2], data[1::2]) for task1, task2 in subdata: p = multiprocessing.Pool(2) p.map(mp_worker, (task1, task2)) 

现在你的输出是:

  Processs a Waiting 2 seconds Processs b Waiting 4 seconds Process a DONE Process b DONE Processs c Waiting 6 seconds Processs d Waiting 8 seconds Process c DONE Process d DONE Processs e Waiting 1 seconds Processs f Waiting 3 seconds Process e DONE Process f DONE Processs g Waiting 5 seconds Processs h Waiting 7 seconds Process g DONE Process h DONE 

这是我的个人转到这个话题:

在这里,请(拉请求欢迎!): https : //gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

 import multiprocessing import sys THREADS = 3 # Used to prevent multiple threads from mixing thier output GLOBALLOCK = multiprocessing.Lock() def func_worker(args): """This function will be called by each thread. This function can not be a class method. """ # Expand list of args into named args. str1, str2 = args del args # Work # ... # Serial-only Portion GLOBALLOCK.acquire() print(str1) print(str2) GLOBALLOCK.release() def main(argp=None): """Multiprocessing Spawn Example """ # Create the number of threads you want pool = multiprocessing.Pool(THREADS) # Define two jobs, each with two args. func_args = [ ('Hello', 'World',), ('Goodbye', 'World',), ] try: # Spawn up to 9999999 jobs, I think this is the maximum possible. # I do not know what happens if you exceed this. pool.map_async(func_worker, func_args).get(9999999) except KeyboardInterrupt: # Allow ^C to interrupt from any thread. sys.stdout.write('\033[0m') sys.stdout.write('User Interupt\n') pool.close() if __name__ == '__main__': main() 

这可能不是100%相关的问题,但在我search一个队列使用多处理的例子,这首先出现在谷歌。

这是一个基本的示例类,您可以实例化并将项目放入队列,并可以等到队列完成。 这就是我需要的。

 from multiprocessing import JoinableQueue from multiprocessing.context import Process class Renderer: queue = None def __init__(self, nb_workers=2): self.queue = JoinableQueue() self.processes = [Process(target=self.upload) for i in range(nb_workers)] for p in self.processes: p.start() def render(self, item): self.queue.put(item) def upload(self): while True: item = self.queue.get() if item is None: break # process your item here self.queue.task_done() def terminate(self): """ wait until queue is empty and terminate processes """ self.queue.join() for p in self.processes: p.terminate() r = Renderer() r.render(item1) r.render(item2) r.terminate() 

这里是我的代码(对于线程池,但只是改变类的名称,你会有进程池)的一个例子:

 def execute_run(rp): ... do something pool = ThreadPoolExecutor(6) for mat in TESTED_MATERIAL: for en in TESTED_ENERGIES: for ecut in TESTED_E_CUT: rp = RunParams( simulations, DEST_DIR, PARTICLE, mat, 960, 0.125, ecut, en ) pool.submit(execute_run, rp) pool.join() 

基本上:

  • pool = ThreadPoolExecutor(6)为6个线程创build一个池
  • 那么你有一堆的任务添加到池中
  • pool.submit(execute_run, rp)将任务添加到池中,第一个arogument是在线程/进程中调用的函数,其余parameter passing给被调用的函数。
  • pool.join等到所有任务完成。

对于像Komodo Edit(win10)这样的编辑者来说,把sys.stdout.flush()添加到:

 def mp_worker((inputs, the_time)): print " Process %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs sys.stdout.flush() 

或作为第一行来:

  if __name__ == '__main__': sys.stdout.flush() 

这有助于了解脚本运行过程中发生的情况。 而不必看黑色的命令行框。