在Python中的调用者线程中捕获一个线程的exception

一般来说,我对Python和multithreading编程都很陌生。 基本上,我有一个脚本,将文件复制到另一个位置。 我想这将被放置在另一个线程,所以我可以输出....指示该脚本仍在运行。

我遇到的问题是,如果文件不能被复制,将会抛出exception。 如果在主线程中运行,这是可以的; 但是,具有以下代码不起作用:

 try: threadClass = TheThread(param1, param2, etc.) threadClass.start() ##### **Exception takes place here** except: print "Caught an exception" 

在线程类本身,我试图重新抛出exception,但它不起作用。 我在这里看到有人问过类似的问题,但他们似乎都在做比我想做的更具体的事情(我不太了解提供的解决scheme)。 我已经看到人们提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

所有的帮助非常感谢!

编辑:线程类的代码如下:

 class TheThread(threading.Thread): def __init__(self, sourceFolder, destFolder): threading.Thread.__init__(self) self.sourceFolder = sourceFolder self.destFolder = destFolder def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except: raise 

问题是thread_obj.start()立即返回。 你产生的子线程在它自己的上下文中用自己的栈执行。 任何发生的exception都在子线程的上下文中,并且在它自己的堆栈中。 我现在可以想到的一种方式是将这些信息传递给父线程,这是通过使用某种消息传递的方式,所以你可以查看一下。

试试这个大小:

 import sys import threading import Queue class ExcThread(threading.Thread): def __init__(self, bucket): threading.Thread.__init__(self) self.bucket = bucket def run(self): try: raise Exception('An error occured here.') except Exception: self.bucket.put(sys.exc_info()) def main(): bucket = Queue.Queue() thread_obj = ExcThread(bucket) thread_obj.start() while True: try: exc = bucket.get(block=False) except Queue.Empty: pass else: exc_type, exc_obj, exc_trace = exc # deal with the exception print exc_type, exc_obj print exc_trace thread_obj.join(0.1) if thread_obj.isAlive(): continue else: break if __name__ == '__main__': main() 

你必须考虑电话的线程。

考虑这个。

你打电话到当地的市议会,问一个问题。 当他们find你的答案,你坚持。 当他们有答案时,他们会告诉你,然后你挂断电话。 如果由于某种原因他们找不到答案(例外),他们会告诉你的。

这是同步,正常的方法调用的工作方式。 你调用一个方法,当它返回时,你有答案(好或坏)。

但是,一个线程更像这样:

你打电话给当地市议会,问一个问题,并要求他们回答时给你回电话。 你然后挂断电话

在这一点上,你不知道他们是否会find答案,所以现在尝试处理调查结果的任何尝试都将失败,因为你根本没有结果。

相反,你必须对来电作出反应,并采取好消息或坏消息,然后处理。

就您的代码而言,您需要拥有能够对线程做出反应的代码,并logging或处理exception。 在你的问题中,你所说的代码不起作用,就像在挂断电话后立即处理电话结果,而你仍然没有答案。

尽pipe不可能直接捕获在不同线程中抛出的exception,但下面是一个代码,可以非常透明地获得与此function非常接近的内容。 您的子线程必须ExThread类而不是threading.Thread ExThread和父线程在等待线程完成其工作时必须调用child_thread.join_with_exception()方法而不是child_thread.join()

这个实现的技术细节:当子线程抛出一个exception时,它通过一个Queue传递给父进程,并在父线程中再次抛出。 注意,这种方法没有忙着等待。

 #!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except BaseException: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) def run_with_exception(self): thread_name = threading.current_thread().name raise MyException("An error in thread '{}'.".format(thread_name)) def main(): t = MyThread() t.start() try: t.join_with_exception() except MyException as ex: thread_name = threading.current_thread().name print "Caught a MyException in thread '{}': {}".format(thread_name, ex) if __name__ == '__main__': main() 

concurrent.futures模块使得在单独的线程(或进程)中工作变得简单,并处理任何产生的exception:

 import concurrent.futures import shutil def copytree_with_dots(src_path, dst_path): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: # Execute the copy on a separate thread, # creating a future object to track progress. future = executor.submit(shutil.copytree, src_path, dst_path) while future.running(): # Print pretty dots here. pass # Return the value returned by shutil.copytree(), None. # Raise any exceptions raised during the copy process. return future.result() 

concurrent.futures包含在Python 3.2中,并且可用作早期版本的backported futures模块 。

如果线程中发生exception,最好的方法是在join在调用者线程中重新引发exception。 您可以使用sys.exc_info()函数获取有关当前正在处理的exception的信息。 这个信息可以简单地存储为线程对象的一个​​属性,直到调用连接,在这一点上它可以被重新提出。

请注意, Queue.Queue (如其他答案中所build议的)在这种简单情况下不是必需的,在这种情况下,线程抛出最多1个exception在抛出exception后立即完成 。 我们通过简单地等待线程完成来避免竞争条件。

例如,扩展ExcThread (下面),覆盖excRun (而不是run )。

Python 2.x:

 import threading class ExcThread(threading.Thread): def excRun(self): pass def run(self): self.exc = None try: # Possibly throws an exception self.excRun() except: import sys self.exc = sys.exc_info() # Save details of the exception thrown but don't rethrow, # just complete the function def join(self): threading.Thread.join(self) if self.exc: msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1]) new_exc = Exception(msg) raise new_exc.__class__, new_exc, self.exc[2] 

Python 3.x:

在Python 3中, raise的3个参数表单已经消失,所以将最后一行改为:

 raise new_exc.with_traceback(self.exc[2]) 

这个问题有很多非常复杂的答案。 我简单地说明了这一点,因为这对我来说似乎足够了。

 from threading import Thread class PropagatingThread(Thread): def run(self): self.exc = None try: if hasattr(self, '_Thread__target'): # Thread uses name mangling prior to Python 3. self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) else: self.ret = self._target(*self._args, **self._kwargs) except BaseException as e: self.exc = e def join(self): super(PropagatingThread, self).join() if self.exc: raise self.exc return self.ret 

如果你确定你只能运行在一个或者另一个版本的Python上,你可以将run()方法简化为只有损坏的版本(如果你只能在3之前的Python版本上运行),或者干净的版本(如果你只能运行在以3开始的Python版本上)。

用法示例:

 def f(*args, **kwargs) print(args) print(kwargs) raise Exception('I suck') t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'}) t.start() t.join() 

当你join时,你会看到在另一个线程上发生的exception。

这是一个讨厌的小问题,我想抛出我的解决scheme。我发现一些其他的解决scheme(例如async.io)看起来很有前景,但也提出了一个黑盒子。 队列/事件循环方法将您与某个实现联系起来。 并发期货源代码只有1000行左右,易于理解 。 它允许我轻松地解决我的问题:创build临时工作线程,而不需要太多设置,并能够在主线程中捕获exception。

我的解决scheme使用并发期货API和线程API。 它可以让你创build一个工作者,让你既是线程也是未来。 这样,你可以join线程来等待结果:

 worker = Worker(test) thread = worker.start() thread.join() print(worker.future.result()) 

…或者你可以让工作人员在完成时发送一个回叫:

 worker = Worker(test) thread = worker.start(lambda x: print('callback', x)) 

…或者你可以循环,直到事件完成:

 worker = Worker(test) thread = worker.start() while True: print("waiting") if worker.future.done(): exc = worker.future.exception() print('exception?', exc) result = worker.future.result() print('result', result) break time.sleep(0.25) 

代码如下:

 from concurrent.futures import Future import threading import time class Worker(object): def __init__(self, fn, args=()): self.future = Future() self._fn = fn self._args = args def start(self, cb=None): self._cb = cb self.future.set_running_or_notify_cancel() thread = threading.Thread(target=self.run, args=()) thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process thread.start() return thread def run(self): try: self.future.set_result(self._fn(*self._args)) except BaseException as e: self.future.set_exception(e) if(self._cb): self._cb(self.future.result()) 

…和testingfunction:

 def test(*args): print('args are', args) time.sleep(2) raise Exception('foo') 

作为Threading的一个不好意思,我花了很长时间去理解如何实现Mateusz Kobos的代码(上面)。 这是一个澄清的版本,以帮助理解如何使用它。

 #!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except Exception: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) # This overrides the "run_with_exception" from class "ExThread" # Note, this is where the actual thread to be run lives. The thread # to be run could also call a method or be passed in as an object def run_with_exception(self): # Code will function until the int print "sleeping 5 seconds" import time for i in 1, 2, 3, 4, 5: print i time.sleep(1) # Thread should break here int("str") # I'm honestly not sure why these appear here? So, I removed them. # Perhaps Mateusz can clarify? # thread_name = threading.current_thread().name # raise MyException("An error in thread '{}'.".format(thread_name)) if __name__ == '__main__': # The code lives in MyThread in this example. So creating the MyThread # object set the code to be run (but does not start it yet) t = MyThread() # This actually starts the thread t.start() print print ("Notice 't.start()' is considered to have completed, although" " the countdown continues in its new thread. So you code " "can tinue into new processing.") # Now that the thread is running, the join allows for monitoring of it try: t.join_with_exception() # should be able to be replace "Exception" with specific error (untested) except Exception, e: print print "Exceptioon was caught and control passed back to the main thread" print "Do some handling here...or raise a custom exception " thread_name = threading.current_thread().name e = ("Caught a MyException in thread: '" + str(thread_name) + "' [" + str(e) + "]") raise Exception(e) # Or custom class of exception, such as MyException 

像RickardSjogren没有队列,sys等类似的方式,但也没有一些监听信号:直接执行一个exception处理程序,对应于一个except块。

 #!/usr/bin/env python3 import threading class ExceptionThread(threading.Thread): def __init__(self, callback=None, *args, **kwargs): """ Redirect exceptions of thread to an exception handler. :param callback: function to handle occured exception :type callback: function(thread, exception) :param args: arguments for threading.Thread() :type args: tuple :param kwargs: keyword arguments for threading.Thread() :type kwargs: dict """ self._callback = callback super().__init__(*args, **kwargs) def run(self): try: if self._target: self._target(*self._args, **self._kwargs) except BaseException as e: if self._callback is None: raise e else: self._callback(self, e) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs, self._callback 

只有self._callback和run()中的except块才是普通线程的附加。

我喜欢的一种方法是基于观察者模式 。 我定义了一个线程用来向侦听器发出exception的信号类。 它也可以用来从线程返回值。 例:

 import threading class Signal: def __init__(self): self._subscribers = list() def emit(self, *args, **kwargs): for func in self._subscribers: func(*args, **kwargs) def connect(self, func): self._subscribers.append(func) def disconnect(self, func): try: self._subscribers.remove(func) except ValueError: raise ValueError('Function {0} not removed from {1}'.format(func, self)) class WorkerThread(threading.Thread): def __init__(self, *args, **kwargs): super(WorkerThread, self).__init__(*args, **kwargs) self.Exception = Signal() self.Result = Signal() def run(self): if self._Thread__target is not None: try: self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) except Exception as e: self.Exception.emit(e) else: self.Result.emit(self._return_value) if __name__ == '__main__': import time def handle_exception(exc): print exc.message def handle_result(res): print res def a(): time.sleep(1) raise IOError('a failed') def b(): time.sleep(2) return 'b returns' t = WorkerThread(target=a) t2 = WorkerThread(target=b) t.Exception.connect(handle_exception) t2.Result.connect(handle_result) t.start() t2.start() print 'Threads started' t.join() t2.join() print 'Done' 

我没有足够的线程经验来声称这是一个完全安全的方法。 但它已经为我工作,我喜欢灵活性。

使用赤身裸体不是一个好的做法,因为你通常比你讨价还价。

我build议修改except只捕获你想处理的exception。 我不认为提高它有所期望的效果,因为当你去外部try实例化TheThread ,如果它引发一个exception,分配永远不会发生。

相反,您可能只想提醒一下,然后继续前进,例如:

 def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except OSError, err: print err 

那么当这个exception被捕获时,你可以在那里处理它。 然后,当外部tryTheThread捕获一个exception时,就知道它不会是你已经处理的exception,并且会帮助你隔离你的stream程。

捕获线程的exception并传callback用方法的简单方法可以是将字典或列表传递给worker方法。

示例(将字典传递给worker方法):

 import threading def my_method(throw_me): raise Exception(throw_me) def worker(shared_obj, *args, **kwargs): try: shared_obj['target'](*args, **kwargs) except Exception as err: shared_obj['err'] = err shared_obj = {'err':'', 'target': my_method} throw_me = "Test" th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={}) th.start() th.join() if shared_obj['err']: print(">>%s" % shared_obj['err'])