未检测到多处理池中引发的exception

看起来,当多处理.Pool进程发生exception时,没有堆栈跟踪或任何其他指示失败。 例:

from multiprocessing import Pool def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go) p.close() p.join() 

打印1并停止静音。 有趣的是,引发一个BaseException反而起作用。 有什么办法使所有exception的行为与BaseException相同吗?

我有一个合理的解决scheme,至less出于debugging的目的。 目前我还没有一个解决scheme可以在主stream程中提高例外。 我的第一个想法是使用一个装饰器,但是你只能腌制在模块顶层定义的函数 ,所以这是正确的。

相反,一个简单的包装类和一个池子类,使用apply_async (因此apply )。 我将离开map_async作为读者的练习。

 import traceback from multiprocessing.pool import Pool import multiprocessing # Shortcut to multiprocessing's logger def error(msg, *args): return multiprocessing.get_logger().error(msg, *args) class LogExceptions(object): def __init__(self, callable): self.__callable = callable def __call__(self, *args, **kwargs): try: result = self.__callable(*args, **kwargs) except Exception as e: # Here we add some debugging help. If multiprocessing's # debugging is on, it will arrange to log the traceback error(traceback.format_exc()) # Re-raise the original exception so the Pool worker can # clean up raise # It was fine, give a normal answer return result class LoggingPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) def go(): print(1) raise Exception() print(2) multiprocessing.log_to_stderr() p = LoggingPool(processes=1) p.apply_async(go) p.close() p.join() 

这给了我:

 1 [ERROR/PoolWorker-1] Traceback (most recent call last): File "mpdebug.py", line 24, in __call__ result = self.__callable(*args, **kwargs) File "mpdebug.py", line 44, in go raise Exception() Exception 

也许我失去了一些东西,但不是什么Result对象的get方法返回? 请参阅进程池 。

类multiprocessing.pool.AsyncResult

Pool.apply_async()和Pool.map_async()。get([timeout])返回的结果的类
到达时返回结果。 如果超时不是“无”,并且结果没有在超时秒内到达,则会引发multiprocessing.TimeoutError。 如果远程调用引发exception,那么该exception将被get()重新调整。

所以,稍微修改你的例子,可以做

 from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() p.close() p.join() 

这给出了结果

 1 Traceback (most recent call last): File "rob.py", line 10, in <module> x.get() File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foobar 

这不是完全令人满意的,因为它不打印回溯,但总比没有好。

更新:这个bug已经在Python 3.4中修复,由Richard Oudkerk提供。 请参阅multiprocessing.pool的问题get方法.Async应该返回完整的回溯 。

在撰写本文时得票最多的解决scheme存在一个问题:

 from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() ## waiting here for go() to complete... p.close() p.join() 

正如@dfrankow所指出的那样,它将在x.get()上等待,这会破坏asynchronous运行任务的点。 所以,为了更好的效率(特别是如果你的工作人员的function需要很长时间),我会改变它:

 from multiprocessing import Pool def go(x): print(1) # task_that_takes_a_long_time() raise Exception("Can't go anywhere.") print(2) return x**2 p = Pool() results = [] for x in range(1000): results.append( p.apply_async(go, [x]) ) p.close() for r in results: r.get() 

优点 :worker函数是asynchronous运行的,例如,如果你在多个内核上运行很多任务,它将比原来的解决scheme高效得多。

缺点 :如果worker函数中有一个exception,只有在池完成所有任务之后才会引发exception。 这可能是也可能不是理想的行为。 编辑根据@ colinfang的评论,这固定这个。

我已经成功地logging这个装饰器的exception:

 import traceback, functools, multiprocessing def trace_unhandled_exceptions(func): @functools.wraps(func) def wrapped_func(*args, **kwargs): try: func(*args, **kwargs) except: print 'Exception in '+func.__name__ traceback.print_exc() return wrapped_func 

与问题中的代码,这是

 @trace_unhandled_exceptions def go(): print(1) raise Exception() print(2) p = multiprocessing.Pool(1) p.apply_async(go) p.close() p.join() 

简单地装饰你传递给你的进程池的函数。 这个工作的关键是@functools.wraps(func)否则多处理会抛出一个PicklingError

上面的代码给出

 1 Exception in go Traceback (most recent call last): File "<stdin>", line 5, in wrapped_func File "<stdin>", line 4, in go Exception 

我创build了一个RemoteException.py模块,它显示了进程中exception的完整回溯。 Python2。 下载并添加到您的代码:

 import RemoteException @RemoteException.showError def go(): raise Exception('Error!') if __name__ == '__main__': import multiprocessing p = multiprocessing.Pool(processes = 1) r = p.apply(go) # full traceback is shown here 
 import logging from multiprocessing import Pool def proc_wrapper(func, *args, **kwargs): """Print exception because multiprocessing lib doesn't return them right.""" try: return func(*args, **kwargs) except Exception as e: logging.exception(e) raise def go(x): print x raise Exception("foobar") p = Pool() p.apply_async(proc_wrapper, (go, 5)) p.join() p.close() 

我会尝试使用pdb:

 import pdb import sys def handler(type, value, tb): pdb.pm() sys.excepthook = handler 

既然你已经使用了apply_sync ,我猜这个用例是想做一些同步任务。 使用callback进行处理是另一种select。 请注意,这个选项只适用于python3.2及以上版本,python2.7不可用。

 from multiprocessing import Pool def callback(result): print('success', result) def callback_error(result): print('error', result) def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go, callback=callback, error_callback=callback_error) # You can do another things p.close() p.join() 

由于multiprocessing.Pool已经有了不错的答案。可用的游戏multiprocessing.Pool ,我将提供一个解决scheme,使用不同的方法来完成。

对于python >= 3.2 ,下面的解决scheme似乎是最简单的:

 from concurrent.futures import ProcessPoolExecutor, wait def go(): print(1) raise Exception() print(2) futures = [] with ProcessPoolExecutor() as p: for i in range(10): futures.append(p.submit(go)) results = [f.result() for f in futures] 

优点:

  • 很less的代码
  • 在主stream程中引发了一个例外
  • 提供堆栈跟踪
  • 没有外部依赖

有关API的更多信息,请查看: https : //docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

此外,如果您提交了大量任务,并且只要您的某个任务失败,您希望主进程失败,则可以使用以下代码片段:

 from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed import time def go(): print(1) time.sleep(0.3) raise Exception() print(2) futures = [] with ProcessPoolExecutor(1) as p: for i in range(10): futures.append(p.submit(go)) for f in as_completed(futures): if f.exception() is not None: for f in futures: f.cancel() break [f.result() for f in futures] 

所有其他答案只有在所有任务都被执行后才会失败。