使用多处理时不能pickle <type'instancemethod'> Pool.map()

我正在尝试使用multiprocessingPool.map()函数来同时分配工作。 当我使用下面的代码,它工作正常:

 import multiprocessing def f(x): return x*x def go(): pool = multiprocessing.Pool(processes=4) print pool.map(f, range(10)) if __name__== '__main__' : go() 

但是,当我以更面向对象的方式使用它时,它不起作用。 它给出的错误信息是:

 PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

这发生在以下是我的主要程序时:

 import someClass if __name__== '__main__' : sc = someClass.someClass() sc.go() 

以下是我的someClass类:

 import multiprocessing class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): pool = multiprocessing.Pool(processes=4) print pool.map(self.f, range(10)) 

任何人都知道这个问题可能是一个简单的方法吗?

问题是多处理必须腌制一些东西来把它们放在进程中,绑定方法是不可挑选的。 解决方法(无论您认为“简单”还是不行;-)是将基础设施添加到您的程序,以允许这样的方法被腌制,注册与copy_reg标准库方法。

例如,Steven Bethard对这个线程的贡献(接近线程末尾)显示了一个完全可行的方法,允许通过copy_reg来进行酸洗/ copy_reg

所有这些解决scheme都是丑陋的,因为多处理和酸洗被破坏和限制,除非你跳出标准库。

如果使用名为pathos.multiprocesssingmultiprocessing pathos.multiprocesssing ,则可以在多处理map函数中直接使用类和类方法。 这是因为dill被用来代替picklecPickledill可以序列化几乎任何东西在Python中。

pathos.multiprocessing还提供了一个asynchronous映射函数…它可以map具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])

请参阅: 多处理和莳萝可以一起做什么?

和: http : //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

 >>> import pathos.pools as pp >>> p = pp.ProcessPool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> p.map(t.plus, x, y) [4, 6, 8, 10] 

如果你想明白的话,你可以完全想要你想做的事情,如果你愿意,你可以从口译员那里做。

 >>> import pathos.pools as pp >>> class someClass(object): ... def __init__(self): ... pass ... def f(self, x): ... return x*x ... def go(self): ... pool = pp.ProcessPool(4) ... print pool.map(self.f, range(10)) ... >>> sc = someClass() >>> sc.go() [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> 

获取代码在这里: https : //github.com/uqfoundation/pathos

你也可以在你的someClass()定义一个__call__()方法,该方法调用someClass.go() ,然后将someClass()的实例传递给池。 这个对象是pickleable,它工作正常(对我来说)…

尽pipeSteven Bethard的解决scheme有一些限制:

当你将类方法注册为一个函数时,每当你的方法处理完成时,你的类的析构函数就会被惊人地调用。 所以,如果你有一个你的类的实例调用n次方法,成员可能会消失在2次运行之间,你可能会得到一个消息malloc: *** error for object 0x...: pointer being freed was not allocated (例如打开成员文件)或pure virtual method called, terminate called without an active exception (这意味着比我所使用的成员对象的生命周期比我想象的更短)。 在处理大于池大小的n时,我得到了这个结果。 这里是一个简短的例子:

 from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult # --------- see Stenven's solution above ------------- from copy_reg import pickle from types import MethodType def _pickle_method(method): func_name = method.im_func.__name__ obj = method.im_self cls = method.im_class return _unpickle_method, (func_name, obj, cls) def _unpickle_method(func_name, obj, cls): for cls in cls.mro(): try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls) class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multi-processing pool = Pool(processes=workers) async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __del__(self): print "... Destructor" def process_obj(self, index): print "object %d" % index return "results" pickle(MethodType, _pickle_method, _unpickle_method) Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once) 

输出:

 Constructor ... object 0 object 1 object 2 ... Destructor object 3 ... Destructor object 4 ... Destructor object 5 ... Destructor object 6 ... Destructor object 7 ... Destructor ... Destructor ... Destructor ['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results'] ... Destructor 

__call__方法不是那么等价,因为从结果中读取[None,…]

 from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multiprocessing pool = Pool(processes=workers) async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __call__(self, i): self.process_obj(i) def __del__(self): print "... Destructor" def process_obj(self, i): print "obj %d" % i return "result" Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once), # **and** results are empty ! 

所以这两种方法都不能令人满意

还有另外一个捷径可以使用,尽pipe根据类实例中的内容可能是效率低下的。

正如大家所说的那样,问题在于multiprocessing代码必须腌制它发送给已经启动的subprocess的东西,picker不会执行实例方法。

但是,可以不发送实例方法,而是将实际的类实例以及要调用的函数的名称发送到普通函数,然后使用getattr调用实例方法,从而在Pool创build绑定方法子。 这与定义__call__方法类似,不同之处在于您可以调用多个成员函数。

从他的回答中窃取@ EricH的代码并注释了一下(我重新input了它,因此所有的名字都改变了,出于某种原因,这似乎比剪切粘贴更容易:-))来说明所有的魔法:

 import multiprocessing import os def call_it(instance, name, args=(), kwargs=None): "indirect caller for instance methods and multiprocessing" if kwargs is None: kwargs = {} return getattr(instance, name)(*args, **kwargs) class Klass(object): def __init__(self, nobj, workers=multiprocessing.cpu_count()): print "Constructor (in pid=%d)..." % os.getpid() self.count = 1 pool = multiprocessing.Pool(processes = workers) async_results = [pool.apply_async(call_it, args = (self, 'process_obj', (i,))) for i in range(nobj)] pool.close() map(multiprocessing.pool.ApplyResult.wait, async_results) lst_results = [r.get() for r in async_results] print lst_results def __del__(self): self.count -= 1 print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count) def process_obj(self, index): print "object %d" % index return "results" Klass(nobj=8, workers=3) 

输出结果显示,构造函数的确被调用了一次(在原始的pid中),而析构函数被调用了9次(每次复制一次=每个pool-worker-process需要2到3次,再加上一次处理)。 这通常是可以的,因为在这种情况下,由于默认pickler会复制整个实例,并且(semi-)会秘密地重新填充它,在这种情况下,

 obj = object.__new__(Klass) obj.__dict__.update({'count':1}) 

这就是为什么即使在三个工作进程中调用了八次析构函数的情况下,每一次都会从1减less到0,但是当然你仍然可以通过这种方式陷入困境。 如有必要,你可以提供你自己的__setstate__

  def __setstate__(self, adict): self.count = adict['count'] 

在这种情况下,例如。

你也可以在你的someClass()定义一个__call__()方法,该方法调用someClass.go() ,然后将someClass()的实例传递给池。 这个对象是pickleable,它工作正常(对我来说)…

 class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): p = Pool(4) sc = p.map(self, range(4)) print sc def __call__(self, x): return self.f(x) sc = someClass() sc.go() 

一个潜在的微不足道的解决办法是切换到使用multiprocessing.dummy 。 这是一个基于线程的多处理接口实现,在Python 2.7中似乎没有这个问题。 我在这里没有太多的经验,但是这个快速导入的改变允许我在类方法上调用apply_async。

有关multiprocessing.dummy一些很好的资源:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/