Python多处理酸洗错误

对不起,我不能用更简单的例子重现错误,而且我的代码太复杂了,不能发布。 如果我在IPython shell而不是普通的python中运行这个程序,那么结果会很好。

我查了一下以前有关这个问题的笔记。 它们都是由使用pool来调用类函数中定义的函数引起的。 但是对我来说情况并非如此。

Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我将不胜感激任何帮助。

更新:我腌菜function定义在模块的顶层。 虽然它调用一个包含嵌套函数的函数。 即,f()调用g()调用h(),它有一个嵌套的函数i(),我正在调用pool.apply_async(f)。 f(),g(),h()都是在顶层定义的。 我尝试了这个模式的更简单的例子,它虽然工作。

这是一个什么可以腌制的清单 。 特别是,只有在模块顶层定义的function才可以使用。

这段代码:

 import multiprocessing as mp class Foo(): @staticmethod def work(self): pass pool = mp.Pool() foo = Foo() pool.apply_async(foo.work) pool.close() pool.join() 

产生的错误几乎与您发布的错误相同:

 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

问题是pool方法都使用一个queue.Queue 。将任务传递给工作进程。 所有经过queue.Queue东西都是queue.Queuefoo.work不可用,因为它没有在模块的顶层定义。

它可以通过在顶层定义一个调用foo.work()的函数来解决:

 def work(foo): foo.work() pool.apply_async(work,args=(foo,)) 

注意foo是可选的,因为Foo是在顶层定义的,而foo.__dict__是可选的。

我会使用pathos.multiprocesssing ,而不是multiprocessingpathos.multiprocessing是使用dillmultiprocessing一个分支。 dill可以序列化几乎任何东西在Python中,所以你可以发送更多的东西并行。 pathos分支也可以直接使用多个参数函数,就像你需要的类方法一样。

 >>> from pathos.multiprocessing import ProcessingPool as Pool >>> p = Pool(4) >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> p.map(t.plus, x, y) [4, 6, 8, 10] >>> >>> class Foo(object): ... @staticmethod ... def work(self, x): ... return x+1 ... >>> f = Foo() >>> p.apipe(f.work, f, 100) <processing.pool.ApplyResult object at 0x10504f8d0> >>> res = _ >>> res.get() 101 

获得pathos (如果你喜欢, dill )在这里: https : //github.com/uqfoundation

正如其他人所说, multiprocessing只能将Python对象转移到可以被腌制的工作进程。 如果你不能像unutbu所描述的那样重新组织你的代码,你可以使用dill的扩展pickling / unpicklingfunction来传输数据(特别是代码数据),如下所示。

这个解决scheme只需要安装dill ,没有其他库作为pathos

 import os from multiprocessing import Pool import dill def run_dill_encoded(payload): fun, args = dill.loads(payload) return fun(*args) def apply_async(pool, fun, args): payload = dill.dumps((fun, args)) return pool.apply_async(run_dill_encoded, (payload,)) if __name__ == "__main__": pool = Pool(processes=5) # asyn execution of lambda jobs = [] for i in range(10): job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) jobs.append(job) for job in jobs: print job.get() print # async execution of static method class O(object): @staticmethod def calc(): return os.getpid() jobs = [] for i in range(10): job = apply_async(pool, O.calc, ()) jobs.append(job) for job in jobs: print job.get() 

我发现我也可以通过尝试使用探查器来在完美的代码片段上生成完全错误的输出结果。

请注意,这是在Windows(分叉是不太优雅)。

我之前在跑步:

 python -m profile -o output.pstats <script> 

并发现删除分析删除了错误,并把分析恢复它。 也是因为我知道代码用于工作,所以也是在驾驶着我。 我正在检查,看看有没有更新pool.py …然后有一个下沉的感觉,并消除了分析,就是这样。

如果有其他人遇到这种情况,请在这里张贴档案。

这个解决scheme只需要安装莳萝,没有其他库作为病态

 def apply_packed_function_for_map((dumped_function, item, args, kwargs),): """ Unpack dumped function as target function and call it with arguments. :param (dumped_function, item, args, kwargs): a tuple of dumped function and its arguments :return: result of target function """ target_function = dill.loads(dumped_function) res = target_function(item, *args, **kwargs) return res def pack_function_for_map(target_function, items, *args, **kwargs): """ Pack function and arguments to object that can be sent from one multiprocessing.Process to another. The main problem is: «multiprocessing.Pool.map*» or «apply*» cannot use class methods or closures. It solves this problem with «dill». It works with target function as argument, dumps it («with dill») and returns dumped function with arguments of target function. For more performance we dump only target function itself and don't dump its arguments. How to use (pseudo-code): ~>>> import multiprocessing ~>>> images = [...] ~>>> pool = multiprocessing.Pool(100500) ~>>> features = pool.map( ~... *pack_function_for_map( ~... super(Extractor, self).extract_features, ~... images, ~... type='png' ~... **options, ~... ) ~... ) ~>>> :param target_function: function, that you want to execute like target_function(item, *args, **kwargs). :param items: list of items for map :param args: positional arguments for target_function(item, *args, **kwargs) :param kwargs: named arguments for target_function(item, *args, **kwargs) :return: tuple(function_wrapper, dumped_items) It returs a tuple with * function wrapper, that unpack and call target function; * list of packed target function and its' arguments. """ dumped_function = dill.dumps(target_function) dumped_items = [(dumped_function, item, args, kwargs) for item in items] return apply_packed_function_for_map, dumped_items 

它也适用于numpy数组。

你有没有机会传递一个数组的string?

当我传递一个恰好包含空string的数组时,我有同样的确切的错误。 我认为这可能是由于这个bug: http : //projects.scipy.org/numpy/ticket/1658

 Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

如果在传递给asynchronous作业的模型对象中有任何内置函数,也会出现此错误。

所以请确保检查传递的模型对象没有内置函数。 (在我们的例子中,我们使用模型中的django-model-utils的 FieldTracker()函数来跟踪某个字段)。 这里是相关的GitHub问题的链接 。