多处理:如何在类中定义的函数上使用Pool.map?

当我运行如下的东西时:

from multiprocessing import Pool p = Pool(5) def f(x): return x*x p.map(f, [1,2,3]) 

它工作正常。 但是,把它作为一个类的function:

 class calculate(object): def run(self): def f(x): return x*x p = Pool() return p.map(f, [1,2,3]) cl = calculate() print cl.run() 

给我以下错误:

 Exception in thread Thread-1: Traceback (most recent call last): File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/sw/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我曾经见过Alex Martelli的一篇文章处理同样的问题,但还不够明确。

我也对pool.map可以接受哪些函数的限制感到恼火。 我写了以下来绕过这个。 它似乎工作,即使recursion使用parmap。

 from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(pipe,x): pipe.send(f(x)) pipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] [p.start() for p in proc] [p.join() for p in proc] return [p.recv() for (p,c) in pipe] if __name__ == '__main__': print parmap(lambda x:x**x,range(1,5)) 

我无法使用迄今发布的代码,原因有三。

  1. 使用“multiprocessing.Pool”的代码不适用于lambdaexpression式。
  2. 不使用“multiprocessing.Pool”的代码产生与工作项目一样多的进程。
  3. 所有的代码在做实际的工作之前都要遍历整个input列表。

2.)是一个性能问题,3.)禁止使用像http://code.google.com/p/python-progressbar/这样的进度条;

我调整了代码,它产生了预定义数量的工作人员,并且只有在存在闲置的工作人员时才迭代input列表。 我也为预期的工作人员启用了“守护进程”模式。

 import multiprocessing def fun(f, q_in, q_out): while True: i, x = q_in.get() if i is None: break q_out.put((i, f(x))) def parmap(f, X, nprocs=multiprocessing.cpu_count()): q_in = multiprocessing.Queue(1) q_out = multiprocessing.Queue() proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out)) for _ in range(nprocs)] for p in proc: p.daemon = True p.start() sent = [q_in.put((i, x)) for i, x in enumerate(X)] [q_in.put((None, None)) for _ in range(nprocs)] res = [q_out.get() for _ in range(len(sent))] [p.join() for p in proc] return [x for i, x in sorted(res)] if __name__ == '__main__': print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8])) 

目前还没有解决您的问题,据我所知:你给map()的函数必须通过导入你的模块来访问。 这就是为什么罗伯特的代码工作:函数f()可以通过导入下面的代码获得:

 def f(x): return x*x class Calculate(object): def run(self): p = Pool() return p.map(f, [1,2,3]) if __name__ == '__main__': cl = Calculate() print cl.run() 

我实际上添加了一个“主”部分,因为这遵循了Windows平台的build议 (“确保主模块可以通过新的Python解释器安全地导入而不会导致意外的副作用”)。

我还在Calculate前加上了一个大写字母,以便遵循PEP 8 。 🙂

除非您跳到标准库之外,否则多处理和酸洗会被破坏和限制。

如果使用名为pathos.multiprocesssingmultiprocessing pathos.multiprocesssing ,则可以在多处理map函数中直接使用类和类方法。 这是因为dill被用来代替picklecPickledill可以序列化几乎任何东西在Python中。

pathos.multiprocessing还提供了一个asynchronous映射函数…它可以map具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])

参见讨论: 多处理和莳萝可以一起做什么?

和: http : //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至可以处理你最初编写的代码,不需要修改,也可以处理解释器。 为什么还有其他更脆弱和更具体的案例呢?

 >>> from pathos.multiprocessing import ProcessingPool as Pool >>> class calculate(object): ... def run(self): ... def f(x): ... return x*x ... p = Pool() ... return p.map(f, [1,2,3]) ... >>> cl = calculate() >>> print cl.run() [1, 4, 9] 

获取代码在这里: https : //github.com/uqfoundation/pathos

而且,只是为了炫耀一下它可以做的事情:

 >>> from pathos.multiprocessing import ProcessingPool as Pool >>> >>> p = Pool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> res = p.amap(t.plus, x, y) >>> res.get() [4, 6, 8, 10] 

mrule的解决scheme是正确的,但有一个错误:如果孩子发回大量的数据,它可以填充pipe道的缓冲区,阻塞在孩子的pipe.send() ,而父母正在等待孩子退出pipe.join() 。 解决scheme是在join()孩子之前阅读孩子的数据。 此外,孩子应该closures父母的pipe道,以防止死锁。 下面的代码解决了这个问题。 另请注意,此parmapX中为每个元素创build一个进程。 更高级的解决scheme是使用multiprocessing.cpu_count()X分成多个块,然后在返回之前合并结果。 我把这个作为一个练习留给读者,以免破坏mrule所给出的正确答案的简洁。 ;)

 from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(ppipe, cpipe,x): ppipe.close() cpipe.send(f(x)) cpipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)] [p.start() for p in proc] ret = [p.recv() for (p,c) in pipe] [p.join() for p in proc] return ret if __name__ == '__main__': print parmap(lambda x:x**x,range(1,5)) 

我也为此而挣扎。 我有一个类的数据成员的function,作为一个简单的例子:

 from multiprocessing import Pool import itertools pool = Pool() class Example(object): def __init__(self, my_add): self.f = my_add def add_lists(self, list1, list2): # Needed to do something like this (the following line won't work) return pool.map(self.f,list1,list2) 

我需要在同一个类中的Pool.map()调用中使用self.f函数,而self.f没有将一个元组作为参数。 由于这个函数被embedded到一个类中,所以我不清楚如何编写其他答案的types。

我通过使用一个不同的包装器来解决这个问题,它包含一个元组/列表,其中第一个元素是函数,其余元素是该函数的参数,名为eval_func_tuple(f_args)。 使用这个,有问题的行可以被replace为返回pool.map(eval_func_tuple,itertools.izip(itertools.repeat(self.f),list1,list2))。 以下是完整的代码:

文件:util.py

 def add(a, b): return a+b def eval_func_tuple(f_args): """Takes a tuple of a function and args, evaluates and returns result""" return f_args[0](*f_args[1:]) 

文件:main.py

 from multiprocessing import Pool import itertools import util pool = Pool() class Example(object): def __init__(self, my_add): self.f = my_add def add_lists(self, list1, list2): # The following line will now work return pool.map(util.eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)) if __name__ == '__main__': myExample = Example(util.add) list1 = [1, 2, 3] list2 = [10, 20, 30] print myExample.add_lists(list1, list2) 

运行main.py会给[11,22,33]。 随意改进,例如eval_func_tuple也可以修改为采取关键字参数。

在另一个说明中,在另一个答案中,对于比可用CPU数量更多的进程,function“parmap”可以更高效。 我正在复制下面的编辑版本。 这是我的第一篇文章,我不确定是否应该直接编辑原始答案。 我也重命名了一些variables。

 from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(pipe,x): pipe.send(f(x)) pipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] numProcesses = len(processes) processNum = 0 outputList = [] while processNum < numProcesses: endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses) for proc in processes[processNum:endProcessNum]: proc.start() for proc in processes[processNum:endProcessNum]: proc.join() for proc,c in pipe[processNum:endProcessNum]: outputList.append(proc.recv()) processNum = endProcessNum return outputList if __name__ == '__main__': print parmap(lambda x:x**x,range(1,5)) 

在类中定义的函数(甚至在类内的函数内)并不真正的腌制。 但是,这工作:

 def f(x): return x*x class calculate(object): def run(self): p = Pool() return p.map(f, [1,2,3]) cl = calculate() print cl.run() 

我采取了克劳斯se和aganders3的答案,并build立了一个文件化的模块,更具可读性,并保存在一个文件中。 您可以将其添加到您的项目。 它甚至有一个可选的进度条!

 """ The ``processes`` module provides some convenience functions for using parallel processes in python. Adapted from http://stackoverflow.com/a/16071616/287297 Example usage: print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) Comments: "It spawns a predefined amount of workers and only iterates through the input list if there exists an idle worker. I also enabled the "daemon" mode for the workers so that KeyboardInterupt works as expected." Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. Alternatively, use this fork of multiprocessing: https://github.com/uqfoundation/multiprocess """ # Modules # import multiprocessing from tqdm import tqdm ################################################################################ def apply_function(func_to_apply, queue_in, queue_out): while not queue_in.empty(): num, obj = queue_in.get() queue_out.put((num, func_to_apply(obj))) ################################################################################ def prll_map(func_to_apply, items, cpus=None, verbose=False): # Number of processes to use # if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) # Create queues # q_in = multiprocessing.Queue() q_out = multiprocessing.Queue() # Process list # new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] # Put all the items (objects) in the queue # sent = [q_in.put((i, x)) for i, x in enumerate(items)] # Start them all # for proc in processes: proc.daemon = True proc.start() # Display progress bar or not # if verbose: results = [q_out.get() for x in tqdm(range(len(sent)))] else: results = [q_out.get() for x in range(len(sent))] # Wait for them to finish # for proc in processes: proc.join() # Return results # return [x for i, x in sorted(results)] ################################################################################ def test(): def slow_square(x): import time time.sleep(2) return x**2 objs = range(20) squares = prll_map(slow_square, objs, 4, verbose=True) print "Result: %s" % squares 

编辑 :添加@ alexander-mcfarlanebuild议和testingfunction

我修改了klaus se的方法,因为当它用小列表工作时,当项目的数量大于或等于1000时,它会挂起。 我不是一次一个地按照None停止条件推送作业,而是一次加载input队列,让进程在其上进行咀嚼,直到它为空。

 from multiprocessing import cpu_count, Queue, Process def apply_func(f, q_in, q_out): while not q_in.empty(): i, x = q_in.get() q_out.put((i, f(x))) # map a function using a pool of processes def parmap(f, X, nprocs = cpu_count()): q_in, q_out = Queue(), Queue() proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)] sent = [q_in.put((i, x)) for i, x in enumerate(X)] [p.start() for p in proc] res = [q_out.get() for _ in sent] [p.join() for p in proc] return [x for i,x in sorted(res)] 

编辑:不幸的是,现在我遇到了我的系统上的这个错误: 多处理队列最大限制是32767 ,希望在那里的解决方法将有所帮助。

我知道这是6年前现在提出的问题,但是只是想增加我的解决scheme,因为上面的一些build议似乎非常复杂,但是我的解决scheme其实非常简单。

我所要做的就是将pool.map()调用包装到一个辅助函数中。 将类对象和args一起作为一个元组传递给方法,看起来有点像这样。

 def run_in_parallel(args): return args[0].method(args[1]) myclass = MyClass() method_args = [1,2,3,4,5,6] args_map = [ (myclass, arg) for arg in method_args ] pool = Pool() pool.map(run_in_parallel, args_map) 

我不知道如果这个方法已经采取,但我使用的工作是:

 from multiprocessing import Pool t = None def run(n): return tf(n) class Test(object): def __init__(self, number): self.number = number def f(self, x): print x * self.number def pool(self): pool = Pool(2) pool.map(run, range(10)) if __name__ == '__main__': t = Test(9) t.pool() pool = Pool(2) pool.map(run, range(10)) 

输出应该是:

 0 9 18 27 36 45 54 63 72 81 0 9 18 27 36 45 54 63 72 81 
 class Calculate(object): # Your instance method to be executed def f(self, x, y): return x*y if __name__ == '__main__': inp_list = [1,2,3] y = 2 cal_obj = Calculate() pool = Pool(2) results = pool.map(lambda x: cal_obj.f(x, y), inp_list) 

您可能希望将这个函数应用于每个不同类的实例。 那么这也是解决scheme

 class Calculate(object): # Your instance method to be executed def __init__(self, x): self.x = x def f(self, y): return self.x*y if __name__ == '__main__': inp_list = [Calculate(i) for i in range(3)] y = 2 pool = Pool(2) results = pool.map(lambda x: xf(y), inp_list)