在函数调用超时

我正在调用Python中的一个函数,我知道这个函数可能会停顿并迫使我重新启动脚本。

我该如何调用函数,或者如何包装它,如果这个过程需要5秒以上的时间,那么脚本将会取消它并执行其他操作?

如果您在UNIX上运行,则可以使用信号包:

In [1]: import signal # Register an handler for the timeout In [2]: def handler(signum, frame): ...: print "Forever is over!" ...: raise Exception("end of time") ...: # This function *may* run for an indetermined time... In [3]: def loop_forever(): ...: import time ...: while 1: ...: print "sec" ...: time.sleep(1) ...: ...: # Register the signal function handler In [4]: signal.signal(signal.SIGALRM, handler) Out[4]: 0 # Define a timeout for your function In [5]: signal.alarm(10) Out[5]: 0 In [6]: try: ...: loop_forever() ...: except Exception, exc: ...: print exc ....: sec sec sec sec sec sec sec sec Forever is over! end of time # Cancel the timer if the function returned before timeout # (ok, mine won't but yours maybe will :) In [7]: signal.alarm(0) Out[7]: 0 

在呼叫alarm.alarm(10) 10秒后,处理程序被调用。 这引发了一个例外,你可以从常规的Python代码中截取。

这个模块在线程中玩的不好(但是,谁呢?)

请注意,由于在发生超时时发生exception,因此可能最终会在函数内部捕获并忽略它们,例如一个这样的函数:

 def loop_forever(): while 1: print 'sec' try: time.sleep(10) except: continue 

你可以使用multiprocessing.Process来做到这一点。

 import multiprocessing import time # bar def bar(): for i in range(100): print "Tick" time.sleep(1) if __name__ == '__main__': # Start bar as a process p = multiprocessing.Process(target=bar) p.start() # Wait for 10 seconds or until process finishes p.join(10) # If thread is still active if p.is_alive(): print "running... let's kill it..." # Terminate p.terminate() p.join() 

我有一个不同的build议,这是一个纯函数(与线程build议相同的API),似乎工作正常(根据对这个线程的build议)

 def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None): import signal class TimeoutError(Exception): pass def handler(signum, frame): raise TimeoutError() # set the timeout handler signal.signal(signal.SIGALRM, handler) signal.alarm(timeout_duration) try: result = func(*args, **kwargs) except TimeoutError as exc: result = default finally: signal.alarm(0) return result 

我该如何调用这个函数,或者我怎样包装它,这样如果这个脚本需要超过5秒的时间,脚本就会取消它呢?

我发布了一个解决这个问题与装饰和threading.Timer 。 这是一个故障。

导入和设置兼容性

它是用Python 2和3testing的。它也应该在Unix / Linux和Windows下工作。

首先是import。 无论Python版本如何,这些尝试都保持代码一致:

 from __future__ import print_function import sys import threading from time import sleep try: import thread except ImportError: import _thread as thread 

使用版本无关的代码:

 try: range, _print = xrange, print def print(*args, **kwargs): flush = kwargs.pop('flush', False) _print(*args, **kwargs) if flush: kwargs.get('file', sys.stdout).flush() except NameError: pass 

现在我们已经从标准库中导入了我们的function。

exit_after装饰器

接下来,我们需要一个函数来终止子线程的main()

 def quit_function(fn_name): # print to stderr, unbuffered in Python 2. print('{0} took too long'.format(fn_name), file=sys.stderr) sys.stderr.flush() # Python 3 stderr is likely buffered. thread.interrupt_main() # raises KeyboardInterrupt 

这里是装饰者本身:

 def exit_after(s): ''' use as decorator to exit process if function takes longer than s seconds ''' def outer(fn): def inner(*args, **kwargs): timer = threading.Timer(s, quit_function, args=[fn.__name__]) timer.start() try: result = fn(*args, **kwargs) finally: timer.cancel() return result return inner return outer 

用法

这里是直接回答你5秒后退出的问题!

 @exit_after(5) def countdown(n): print('countdown started', flush=True) for i in range(n, -1, -1): print(i, end=', ', flush=True) sleep(1) print('countdown finished') 

演示:

 >>> countdown(3) countdown started 3, 2, 1, 0, countdown finished >>> countdown(10) countdown started 10, 9, 8, 7, 6, countdown took too long Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 6, in countdown KeyboardInterrupt 

第二个函数调用将不会完成,而是应该退出回溯!

KeyboardInterrupt并不总是停止睡眠线程

请注意,在Windows上的Python 2中,睡眠不会总是被键盘中断所中断,例如:

 @exit_after(1) def sleep10(): sleep(10) print('slept 10 seconds') >>> sleep10() sleep10 took too long # Note that it hangs here about 9 more seconds Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 3, in sleep10 KeyboardInterrupt 

也不会中断在扩展中运行的代码,除非它明确检查PyErr_CheckSignals() ,参见Cython,Python和KeyboardInterrupt被忽略

在任何情况下,我都会避免睡一会儿以上的话 – 这是处理时间的一个永恒。

我该如何调用函数,或者如何包装它,如果这个过程需要5秒以上的时间,那么脚本就会取消它并执行其他的操作。

要抓住它并做其他事情,你可以捕捉到KeyboardInterrupt。

 >>> try: ... countdown(10) ... except KeyboardInterrupt: ... print('do something else') ... countdown started 10, 9, 8, 7, 6, countdown took too long do something else 

当在unit testing中search超时调用时,我跑过了这个线程。 我在答案或第三方软件包中找不到任何简单的东西,所以我在下面写了装饰器,可以直接放入代码中:

 import multiprocessing.pool import functools def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(item, args, kwargs) # raises a TimeoutError if execution exceeds max_timeout return async_result.get(max_timeout) return func_wrapper return timeout_decorator 

那么就像这样简单的超时testing或任何你喜欢的function:

 @timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError def test_base_regression(self): ... 

在pypi上find的stopit软件包似乎很好地处理了超时。

我喜欢@stopit.threading_timeoutable装饰器,它为装饰的函数添加了一个timeout参数,它可以达到你所期望的,它会停止函数。

看看pypi: https ://pypi.python.org/pypi/stopit

有很多的build议,但没有使用concurrent.futures,我认为这是最清晰的方式来处理这个问题。

 from concurrent.futures import ProcessPoolExecutor # Warning: this does not terminate function if timeout def timeout_five(fnc, *args, **kwargs): with ProcessPoolExecutor() as p: f = p.submit(fnc, *args, **kwargs) return f.result(timeout=5) 

超级简单的阅读和维护。

我们build立一个池,提交一个进程,然后等待5秒钟,然后引发一个TimeoutError,然后你可以捕获和处理。

原生Python 3.2 +和backported 2.7(点安装期货)。

在线程和进程之间切换就像使用ThreadPoolExecutorreplaceProcessPoolExecutor一样简单。

如果你想在超时终止进程,我会build议寻找卵石 。

我需要嵌套的定时中断(SIGALARM不能这样做),它不会被time.sleep(基于线程的方法无法实现)阻塞。 我结束了从这里复制和轻微修改代码: http : //code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

 #!/usr/bin/python # lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/ """alarm.py: Permits multiple SIGALRM events to be queued. Uses a `heapq` to store the objects to be called when an alarm signal is raised, so that the next alarm is always at the top of the heap. """ import heapq import signal from time import time __version__ = '$Revision: 2539 $'.split()[1] alarmlist = [] __new_alarm = lambda t, f, a, k: (t + time(), f, a, k) __next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None __set_alarm = lambda: signal.alarm(max(__next_alarm(), 1)) class TimeoutError(Exception): def __init__(self, message, id_=None): self.message = message self.id_ = id_ class Timeout: ''' id_ allows for nested timeouts. ''' def __init__(self, id_=None, seconds=1, error_message='Timeout'): self.seconds = seconds self.error_message = error_message self.id_ = id_ def handle_timeout(self): raise TimeoutError(self.error_message, self.id_) def __enter__(self): self.this_alarm = alarm(self.seconds, self.handle_timeout) def __exit__(self, type, value, traceback): try: cancel(self.this_alarm) except ValueError: pass def __clear_alarm(): """Clear an existing alarm. If the alarm signal was set to a callable other than our own, queue the previous alarm settings. """ oldsec = signal.alarm(0) oldfunc = signal.signal(signal.SIGALRM, __alarm_handler) if oldsec > 0 and oldfunc != __alarm_handler: heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {}))) def __alarm_handler(*zargs): """Handle an alarm by calling any due heap entries and resetting the alarm. Note that multiple heap entries might get called, especially if calling an entry takes a lot of time. """ try: nextt = __next_alarm() while nextt is not None and nextt <= 0: (tm, func, args, keys) = heapq.heappop(alarmlist) func(*args, **keys) nextt = __next_alarm() finally: if alarmlist: __set_alarm() def alarm(sec, func, *args, **keys): """Set an alarm. When the alarm is raised in `sec` seconds, the handler will call `func`, passing `args` and `keys`. Return the heap entry (which is just a big tuple), so that it can be cancelled by calling `cancel()`. """ __clear_alarm() try: newalarm = __new_alarm(sec, func, args, keys) heapq.heappush(alarmlist, newalarm) return newalarm finally: __set_alarm() def cancel(alarm): """Cancel an alarm by passing the heap entry returned by `alarm()`. It is an error to try to cancel an alarm which has already occurred. """ __clear_alarm() try: alarmlist.remove(alarm) heapq.heapify(alarmlist) finally: if alarmlist: __set_alarm() 

和一个用法示例:

 import alarm from time import sleep try: with alarm.Timeout(id_='a', seconds=5): try: with alarm.Timeout(id_='b', seconds=2): sleep(3) except alarm.TimeoutError as e: print 'raised', e.id_ sleep(30) except alarm.TimeoutError as e: print 'raised', e.id_ else: print 'nope.' 
 #!/usr/bin/python2 import sys, subprocess, threading proc = subprocess.Popen(sys.argv[2:]) timer = threading.Timer(float(sys.argv[1]), proc.terminate) timer.start() proc.wait() timer.cancel() exit(proc.returncode) 

给定的基于线程的解决scheme略有改进。

下面的代码支持exception

 def runFunctionCatchExceptions(func, *args, **kwargs): try: result = func(*args, **kwargs) except Exception, message: return ["exception", message] return ["RESULT", result] def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None): import threading class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default def run(self): self.result = runFunctionCatchExceptions(func, *args, **kwargs) it = InterruptableThread() it.start() it.join(timeout_duration) if it.isAlive(): return default if it.result[0] == "exception": raise it.result[1] return it.result[1] 

用5秒超时调用它:

 result = timeout(remote_calculate, (myarg,), timeout_duration=5) 

我们可以使用相同的信号。 我想下面的例子对你有用。 与线程相比,它非常简单。

 import signal def timeout(signum, frame): raise myException #this is an infinite loop, never ending under normal circumstances def main(): print 'Starting Main ', while 1: print 'in main ', #SIGALRM is only usable on a unix platform signal.signal(signal.SIGALRM, timeout) #change 5 to however many seconds you need signal.alarm(5) try: main() except myException: print "whoops"