如何从python的线程得到返回值?

如何获得线程返回的值'foo'

 from threading import Thread def foo(bar): print 'hello {}'.format(bar) return 'foo' thread = Thread(target=foo, args=('world!',)) thread.start() ret = thread.join() print ret 

上面显示的一个显而易见的方法是返回None

FWIW, multiprocessing模块使用Pool类有一个很好的界面。 如果你想坚持使用线程而不是进程,你可以使用multiprocessing.pool.ThreadPool类作为一个插件替换。

 def foo(bar, baz): print 'hello {0}'.format(bar) return 'foo' + baz from multiprocessing.pool import ThreadPool pool = ThreadPool(processes=1) async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo # do some other stuff in the main process return_val = async_result.get() # get the return value from your function. 

我见过的一种方法是将可变对象(如列表或字典)传递给线程的构造函数,以及某种索引或其他标识符。 线程然后可以将其结果存储在该对象的专用槽中。 例如:

 def foo(bar, result, index): print 'hello {0}'.format(bar) result[index] = "foo" from threading import Thread threads = [None] * 10 results = [None] * 10 for i in range(len(threads)): threads[i] = Thread(target=foo, args=('world!', results, i)) threads[i].start() # do some other stuff for i in range(len(threads)): threads[i].join() print " ".join(results) # what sound does a metasyntactic locomotive make? 

如果你真的想让join()返回被调用函数的返回值,那么可以使用如下所示的Thread子类来实现:

 from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): Thread.__init__(self, group, target, name, args, kwargs, Verbose) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print twrv.join() # prints foo 

由于某些名称的改变,这会变得有点毛骨悚然,并且它访问专用于Thread实现的“私有”数据结构…但是它起作用。

杰克的答案是好的,但如果你不想使用线程池(你不知道需要多少线程,但根据需要创建它们),那么在线程之间传输信息的好方法是内置Queue.Queue类,因为它提供了线程安全性。

我创建了下面的装饰器,使其与线程池的行为类似:

 def threaded(f, daemon=False): import Queue def wrapped_f(q, *args, **kwargs): '''this function calls the decorated function and puts the result in a queue''' ret = f(*args, **kwargs) q.put(ret) def wrap(*args, **kwargs): '''this is the function returned from the decorator. It fires off wrapped_f in a new thread and returns the thread object with the result queue attached''' q = Queue.Queue() t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs) t.daemon = daemon t.start() t.result_queue = q return t return wrap 

那么你只是使用它:

 @threaded def long_task(x): import time x = x + 5 time.sleep(5) return x # does not block, returns Thread object y = long_task(10) print y # this blocks, waiting for the result result = y.result_queue.get() print result 

装饰的函数每次调用时都会创建一个新线程,并返回一个包含将接收结果的队列的Thread对象。

我偷了所有的答案,并把它清理了一下。

关键部分是将* args和** kwargs添加到join()中以处理超时

 class threadWithReturn(Thread): def __init__(self, *args, **kwargs): super(threadWithReturn, self).__init__(*args, **kwargs) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self, *args, **kwargs): super(threadWithReturn, self).join(*args, **kwargs) return self._return 

另一个不需要更改现有代码的解决方案:

 import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() t.join() result = que.get() print result 

它也可以很容易地调整到多线程环境:

 import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() threads_list = list() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() threads_list.append(t) # Add more threads here ... threads_list.append(t2) ... threads_list.append(t3) ... # Join all the threads for t in threads_list: t.join() # Check thread's return value while not que.empty(): result = que.get() print result 

Parris / kindall的答案 join / return移植到Python 3的答案:

 from threading import Thread def foo(bar): print('hello {0}'.format(bar)) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon) self._return = None def run(self): if self._target is not None: self._return = self._target(*self._args, **self._kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print(twrv.join()) # prints foo 

请注意, Thread类在Python 3中以不同的方式实现。

我解决这个问题的方法是将函数和线程包装到一个类中。 不需要使用池,队列或c类型变量传递。 它也是非阻塞的。 你检查状态,而不是。 查看如何在代码结束时使用它的示例。

 import threading class ThreadWorker(): ''' The basic idea is given a function create an object. The object can then run the function in a thread. It provides a wrapper to start it,check its status,and get data out the function. ''' def __init__(self,func): self.thread = None self.data = None self.func = self.save_data(func) def save_data(self,func): '''modify function to save its returned data''' def new_func(*args, **kwargs): self.data=func(*args, **kwargs) return new_func def start(self,params): self.data = None if self.thread is not None: if self.thread.isAlive(): return 'running' #could raise exception here #unless thread exists and is alive start or restart it self.thread = threading.Thread(target=self.func,args=params) self.thread.start() return 'started' def status(self): if self.thread is None: return 'not_started' else: if self.thread.isAlive(): return 'running' else: return 'finished' def get_results(self): if self.thread is None: return 'not_started' #could return exception else: if self.thread.isAlive(): return 'running' else: return self.data def add(x,y): return x +y add_worker = ThreadWorker(add) print add_worker.start((1,2,)) print add_worker.status() print add_worker.get_results() 

您可以使用Pool作为工作进程池,如下所示:

 from multiprocessing import Pool def f1(x, y): return x*y if __name__ == '__main__': with Pool(processes=10) as pool: result = pool.apply(f1, (2, 3)) print(result) 

join总是返回None ,我想你应该子类Thread来处理返回码等等。

定义你的目标
1)拿一个参数q
2)用q.put(foo); return替换任何return foo语句q.put(foo); return q.put(foo); return

所以一个功能

 def func(a): ans = a * a return ans 

会成为

 def func(a, q): ans = a * a q.put(ans) return 

然后你会这样做

 from Queue import Queue from threading import Thread ans_q = Queue() arg_tups = [(i, ans_q) for i in xrange(10)] threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups] _ = [t.start() for t in threads] _ = [t.join() for t in threads] results = [q.get() for _ in xrange(len(threads))] 

你可以使用函数装饰器/包装器来创建它,所以你可以使用你现有的函数作为target而不用修改它们,但是遵循这个基本的方案。

你可以在线程函数的作用域之上定义一个mutable,并将结果添加到该函数中。 (我也修改了python3兼容的代码)

 returns = {} def foo(bar): print('hello {0}'.format(bar)) returns[bar] = 'foo' from threading import Thread t = Thread(target=foo, args=('world!',)) t.start() t.join() print(returns) 

这返回{'world!': 'foo'}

如果使用函数输入作为结果字典的关键字,则每个唯一的输入都会保证在结果中输入一个条目

我正在使用这个包装器,它可以舒适地转换Thread运行的任何函数 – 处理返回值或异常。 它不会添加Queue开销。

 def threading_func(f): """Decorator for running a function in a thread and handling its return value or exception""" def start(*args, **kw): def run(): try: th.ret = f(*args, **kw) except: th.exc = sys.exc_info() def get(timeout=None): th.join(timeout) if th.exc: raise th.exc[0], th.exc[1], th.exc[2] # py2 ##raise th.exc[1] #py3 return th.ret th = threading.Thread(None, run) th.exc = None th.get = get th.start() return th return start 

用法示例

 def f(x): return 2.5 * x th = threading_func(f)(4) print("still running?:", th.is_alive()) print("result:", th.get(timeout=1.0)) @threading_func def th_mul(a, b): return a * b th = th_mul("text", 2.5) try: print(th.get()) except TypeError: print("exception thrown ok.") 

threading模块的注意事项

线程函数的舒适返回值和异常处理是一种频繁的“Pythonic”需求,应该已经由threading模块提供 – 可能直接在标准Thread类中提供。 ThreadPool对于简单的任务有太多的开销 – 3管理线程,很多官僚作风。 不幸的是, Thread的布局最初是从Java复制而来的 – 例如,您可以从仍然无用的1st(!)构造函数参数group看到该布局。

一个通常的解决方案是用你的装饰器来包装你的函数foo

 result = queue.Queue() def task_wrapper(*args): result.put(target(*args)) 

那么整个代码可能看起来像这样

 result = queue.Queue() def task_wrapper(*args): result.put(target(*args)) threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list] for t in threads: t.start() while(True): if(len(threading.enumerate()) < max_num): break for t in threads: t.join() return result 

注意

一个重要的问题是,返回值可能是无序的 。 (其实return value并不一定保存到queue ,因为你可以选择任意线程安全的数据结构)

为什么不使用全局变量?

 import threading class myThread(threading.Thread): def __init__(self, ind, lock): threading.Thread.__init__(self) self.ind = ind self.lock = lock def run(self): global results with self.lock: results.append(self.ind) results = [] lock = threading.Lock() threads = [myThread(x, lock) for x in range(1, 4)] for t in threads: t.start() for t in threads: t.join() print(results)