键盘中断与python的多处理池

我怎样才能处理与python的多处理池的KeyboardInterrupt事件? 这是一个简单的例子:

from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): sleep(1) return i*i def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # **** THIS PART NEVER EXECUTES. **** pool.terminate() print "You cancelled the program!" sys.exit(1) print "\nFinally, here are the results: ", results if __name__ == "__main__": go() 

当运行上面的代码时, KeyboardInterrupt在我按下^C时被引发,但是这个过程简单地挂在那个点上,我必须从外部杀死它。

我希望能够随时按下^C ,并使所有进程正常退出。

这是一个Python错误。 当在threading.Condition.wait()中等待一个条件时,KeyboardInterrupt永远不会被发送。 摄制:

 import threading cond = threading.Condition(threading.Lock()) cond.acquire() cond.wait(None) print "done" 

KeyboardInterruptexception将不会被传递,直到wait()返回,它永远不会返回,所以中断从不发生。 KeyboardInterrupt几乎肯定会中断一个等待状态。

请注意,如果指定超时,则不会发生这种情况。 cond.wait(1)会立即收到中断。 所以,一个解决方法是指定一个超时。 为此,请replace

  results = pool.map(slowly_square, range(40)) 

  results = pool.map_async(slowly_square, range(40)).get(9999999) 

或类似的。

从我最近发现的,最好的解决办法是设置工作进程忽略SIGINT,并将所有清理代码限制在父进程中。 这解决了空闲和繁忙工作进程的问题,并且在subprocess中不需要error handling代码。

 import signal ... def init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN) ... def main() pool = multiprocessing.Pool(size, init_worker) ... except KeyboardInterrupt: pool.terminate() pool.join() 

解释和完整的示例代码可以分别在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/和http://github.com/jreese/multiprocessing-keyboardinterruptfind。;

由于某些原因,通常只处理从基本Exception类inheritance的Exception 。 作为一种解决方法,您可能会重新提升您的KeyboardInterrupt作为一个Exception实例:

 from multiprocessing import Pool import time class KeyboardInterruptError(Exception): pass def f(x): try: time.sleep(x) return x except KeyboardInterrupt: raise KeyboardInterruptError() def main(): p = Pool(processes=4) try: print 'starting the pool map' print p.map(f, range(10)) p.close() print 'pool map complete' except KeyboardInterrupt: print 'got ^C while pool mapping, terminating the pool' p.terminate() print 'pool is terminated' except Exception, e: print 'got exception: %r, terminating the pool' % (e,) p.terminate() print 'pool is terminated' finally: print 'joining pool processes' p.join() print 'join complete' print 'the end' if __name__ == '__main__': main() 

通常你会得到以下输出:

 staring the pool map [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] pool map complete joining pool processes join complete the end 

所以如果你打了^C ,你会得到:

 staring the pool map got ^C while pool mapping, terminating the pool pool is terminated joining pool processes join complete the end 

通常这个简单的结构适用于CtrlC on Pool:

 def signal_handle(_signal, frame): print "Stopping the Jobs." signal.signal(signal.SIGINT, signal_handle) 

正如在几个类似的post中所述:

在没有尝试的情况下在Python中捕获键盘中断

似乎有两个问题,使得多处理恼人的exception。 第一个(由Glenn注意到)是你需要使用带有超时的map_async而不是map来获得即时的响应(即,不要完成处理整个列表)。 第二个(由Andrey指出)是多处理不捕获从Exceptioninheritance的Exception (例如, SystemExit )。 所以这是我的解决scheme,处理这两个:

 import sys import functools import traceback import multiprocessing def _poolFunctionWrapper(function, arg): """Run function under the pool Wrapper around function to catch exceptions that don't inherit from Exception (which aren't caught by multiprocessing, so that you end up hitting the timeout). """ try: return function(arg) except: cls, exc, tb = sys.exc_info() if issubclass(cls, Exception): raise # No worries # Need to wrap the exception with something multiprocessing will recognise import traceback print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc()) raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc)) def _runPool(pool, timeout, function, iterable): """Run the pool Wrapper around pool.map_async, to handle timeout. This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool Further wraps the function in _poolFunctionWrapper to catch exceptions that don't inherit from Exception. """ return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout) def myMap(function, iterable, numProcesses=1, timeout=9999): """Run the function on the iterable, optionally with multiprocessing""" if numProcesses > 1: pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1) mapFunc = functools.partial(_runPool, pool, timeout) else: pool = None mapFunc = map results = mapFunc(function, iterable) if pool is not None: pool.close() pool.join() return results 

我发现,目前最好的解决scheme是不使用multiprocessing.poolfunction,而是使用自己的池function。 我提供了一个示例,演示apply_async的错误以及一个示例,显示如何完全避免使用池function。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

投票答案没有解决核心问题,但也有类似的副作用。

多处理库作者Jesse Noller解释了如何在旧博客文章中使用multiprocessing.Pool时正确处理CTRL + C。

 import signal from multiprocessing import Pool def initializer(): """Ignore CTRL+C in the worker process.""" signal.signal(signal.SIGINT, signal.SIG_IGN) pool = Pool(initializer=initializer) try: pool.map(perform_download, dowloads) except KeyboardInterrupt: pool.terminate() pool.join() 

我是Python的新手。 我到处寻找答案,并偶然发现这个和其他一些博客和YouTubevideo。 我试图复制上面粘贴作者的代码,并在Windows 7 64位我的python 2.7.13重现它。 这接近我想要达到的目标。

我让我的subprocess忽略ControlC并使父进程终止。 看起来像绕过subprocess确实避免了这个问题对我来说。

 #!/usr/bin/python from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): try: print "<slowly_square> Sleeping and later running a square calculation..." sleep(1) return i * i except KeyboardInterrupt: print "<child processor> Don't care if you say CtrlC" pass def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # *** THIS PART NEVER EXECUTES. *** :( pool.terminate() pool.close() print "You cancelled the program!" exit(1) print "Finally, here are the results", results if __name__ == '__main__': go() 

奇怪的是,它看起来像你必须处理在子项中的KeyboardInterrupt 。 我会期望这样做的书面工作…尝试改变slowly_square

 def slowly_square(i): try: sleep(1) return i * i except KeyboardInterrupt: print 'You EVIL bastard!' return 0 

这应该如你所料。