如何在python Tornado服务器的请求中执行多处理?

我正在使用I / O非阻塞python服务器Tornado。 我有一类GET请求可能需要大量的时间才能完成(想想在5-10秒的范围内)。 问题在于Tornado阻塞了这些请求,以便后续的快速请求被保留,直到缓慢的请求完成。

我看着: https : //github.com/facebook/tornado/wiki/Threading-and-concurrency ,得出结论,我想#3(其他进程)和#4(其他线程)的组合。 #4自己有问题,而当另一个线程正在进行“重型升降”时,我无法得到可靠的控制回到ioloop。 (我认为这是由于GIL和重负载任务具有高CPU负载,并保持远离主Ioloop的控制的事实,但这是一个猜测)。

所以我一直在做原型devise,通过在一个单独的进程中在这些缓慢的GET请求中完成“繁重的任务”,然后在完成请求的过程中将callback放回到Tornado ioloop中。 这释放了ioloop来处理其他请求。

我已经创build了一个简单的例子来演示一个可能的解决scheme,但是很好奇从社区那里得到反馈。

我的问题有两方面:如何简化现行的方法? 它可能存在哪些缺陷?

该方法

  1. 利用Tornado内置的asynchronous装饰器,允许请求保持打开状态,并继续使用ioloop。

  2. 使用python的multiprocessing模块为“繁重”任务生成一个单独的进程。 我第一次尝试使用threading模块,但无法得到任何可靠的控制权交给ioloop。 多核处理似乎也会利用多核。

  3. 在主Ioloop进程中使用正在监视multiprocessing.Queue处理的threading模块启动一个“观察者”线程。当完成时,“繁重”任务的结果将会被调用。 这是需要的,因为我需要一种方式来知道重载任务已经完成,同时还能够通知ioloop这个请求已经完成。

  4. 确保“监视器”线程经常使用time.sleep time.sleep(0)调用将控制权交给主Ioloop循环,以便继续处理其他请求。

  5. 当队列中存在结果时,使用tornado.ioloop.IOLoop.instance().add_callback()从“观察者”线程添加一个callbacktornado.ioloop.IOLoop.instance().add_callback() ,该函数被logging为从其他线程调用ioloop实例的唯一安全方法。

  6. 一定要在callback中调用finish()来完成请求并提交回复。

下面是显示这种方法的一些示例代码。 multi_tornado.py是实现上述大纲的服务器, call_multi.py是一个示例脚本,它以两种不同的方式调用服务器来testing服务器。 两个testing都使用3个慢速GET请求和20个快速GET请求来调用服务器。 在打开和closures线程的情况下都显示结果。

在使用“无线程”运行的情况下,3个缓慢的请求块(每个都需要一秒多的时间才能完成)。 20个快速请求中的一些挤入了ioloop中的一些缓慢请求之间(并不完全确定这是怎么发生的,但可能是我在同一台机器上同时运行服务器和客户端testing脚本的工件)。 这里的要点是,所有的快速请求都受到不同程度的影响。

在使用线程运行的情况下,20个快速请求全部首先立即完成,然后三个慢速请求几乎在同一时间完成,因为它们各自并行运行。 这是所需的行为。 三个缓慢的请求需要2.5秒才能并行完成 – 而在非线程的情况下,三个缓慢的请求总共花费约3.5秒。 所以整体上大概有35%的速度(我认为是由于多核共享)。 但更重要的是 – 快速的请求立即在缓慢的请求中处理。

我没有很多的multithreading编程经验 – 所以虽然这似乎在这里工作,我很好奇学习:

有一个更简单的方法来完成这个吗? 在这种方法中可能会潜伏什么?

(注意:未来的折衷可能是运行更多的Tornado实例,并使用像nginx这样的反向代理来进行负载均衡,无论我将使用负载均衡器运行多个实例 – 但是我只关心在这个问题上抛硬件因为看起来硬件在阻塞方面与问题直接相关)。

示例代码

multi_tornado.py (示例服务器):

 import time import threading import multiprocessing import math from tornado.web import RequestHandler, Application, asynchronous from tornado.ioloop import IOLoop # run in some other process - put result in q def heavy_lifting(q): t0 = time.time() for k in range(2000): math.factorial(k) t = time.time() q.put(t - t0) # report time to compute in queue class FastHandler(RequestHandler): def get(self): res = 'fast result ' + self.get_argument('id') print res self.write(res) self.flush() class MultiThreadedHandler(RequestHandler): # Note: This handler can be called with threaded = True or False def initialize(self, threaded=True): self._threaded = threaded self._q = multiprocessing.Queue() def start_process(self, worker, callback): # method to start process and watcher thread self._callback = callback if self._threaded: # launch process multiprocessing.Process(target=worker, args=(self._q,)).start() # start watching for process to finish threading.Thread(target=self._watcher).start() else: # threaded = False just call directly and block worker(self._q) self._watcher() def _watcher(self): # watches the queue for process result while self._q.empty(): time.sleep(0) # relinquish control if not ready # put callback back into the ioloop so we can finish request response = self._q.get(False) IOLoop.instance().add_callback(lambda: self._callback(response)) class SlowHandler(MultiThreadedHandler): @asynchronous def get(self): # start a thread to watch for self.start_process(heavy_lifting, self._on_response) def _on_response(self, delta): _id = self.get_argument('id') res = 'slow result {} <--- {:0.3f} s'.format(_id, delta) print res self.write(res) self.flush() self.finish() # be sure to finish request application = Application([ (r"/fast", FastHandler), (r"/slow", SlowHandler, dict(threaded=False)), (r"/slow_threaded", SlowHandler, dict(threaded=True)), ]) if __name__ == "__main__": application.listen(8888) IOLoop.instance().start() 

call_multi.py (客户端testing器):

 import sys from tornado.ioloop import IOLoop from tornado import httpclient def run(slow): def show_response(res): print res.body # make 3 "slow" requests on server requests = [] for k in xrange(3): uri = 'http://localhost:8888/{}?id={}' requests.append(uri.format(slow, str(k + 1))) # followed by 20 "fast" requests for k in xrange(20): uri = 'http://localhost:8888/fast?id={}' requests.append(uri.format(k + 1)) # show results as they return http_client = httpclient.AsyncHTTPClient() print 'Scheduling Get Requests:' print '------------------------' for req in requests: print req http_client.fetch(req, show_response) # execute requests on server print '\nStart sending requests....' IOLoop.instance().start() if __name__ == '__main__': scenario = sys.argv[1] if scenario == 'slow' or scenario == 'slow_threaded': run(scenario) 

检测结果

通过运行python call_multi.py slow (阻塞行为):

 Scheduling Get Requests: ------------------------ http://localhost:8888/slow?id=1 http://localhost:8888/slow?id=2 http://localhost:8888/slow?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... slow result 1 <--- 1.338 s fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 slow result 2 <--- 1.169 s slow result 3 <--- 1.130 s fast result 8 fast result 9 fast result 10 fast result 11 fast result 13 fast result 12 fast result 14 fast result 15 fast result 16 fast result 18 fast result 17 fast result 19 fast result 20 

通过运行python call_multi.py slow_threaded (期望的行为):

 Scheduling Get Requests: ------------------------ http://localhost:8888/slow_threaded?id=1 http://localhost:8888/slow_threaded?id=2 http://localhost:8888/slow_threaded?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 fast result 8 fast result 9 fast result 10 fast result 11 fast result 12 fast result 13 fast result 14 fast result 15 fast result 19 fast result 20 fast result 17 fast result 16 fast result 18 slow result 2 <--- 2.485 s slow result 3 <--- 2.491 s slow result 1 <--- 2.517 s 

如果你愿意使用concurrent.futures.ProcessPoolExecutor而不是multiprocessing ,这实际上很简单。 龙卷风的ioloop已经支持concurrent.futures.Future ,所以他们会玩的很好。 concurrent.futures包含在Python 3.2+中,并已被移植到Python 2.x。

这是一个例子:

 import time from concurrent.futures import ProcessPoolExecutor from tornado.ioloop import IOLoop from tornado import gen def f(a, b, c, blah=None): print "got %s %s %s and %s" % (a, b, c, blah) time.sleep(5) return "hey there" @gen.coroutine def test_it(): pool = ProcessPoolExecutor(max_workers=1) fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future print("running it asynchronously") ret = yield fut print("it returned %s" % ret) pool.shutdown() IOLoop.instance().run_sync(test_it) 

输出:

 running it asynchronously got 1 2 3 and ok it returned hey there 

ProcessPoolExecutor有一个比multiprocessing.Pool更有限的API,但是如果你不需要multiprocessing.Pool的更高级的function,那么值得使用,因为集成是非常简单的。

multiprocessing.Pool可以被集成到tornado I / O循环,但有点混乱。 使用concurrent.futures可以完成更清洁的集成(请参阅我的其他答案了解详细信息),但是如果您停留在Python 2.x中,并且无法安装concurrent.futures backport,那么您可以严格执行使用multiprocessing

multiprocessing.Pool.apply_asyncmultiprocessing.Pool.map_async方法都有一个可选的callback参数,这意味着两者都可能被插入到tornado.gen.Task 。 所以在大多数情况下,在subprocess中asynchronous运行代码就像这样简单:

 import multiprocessing import contextlib from tornado import gen from tornado.gen import Return from tornado.ioloop import IOLoop from functools import partial def worker(): print "async work here" @gen.coroutine def async_run(func, *args, **kwargs): result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result) if __name__ == "__main__": pool = multiprocessing.Pool(multiprocessing.cpu_count()) func = partial(async_run, worker) IOLoop().run_sync(func) 

正如我所提到的,这在大多数情况下运作良好。 但是如果worker()抛出一个exception, callback永远不会被调用,这意味着gen.Task永远不会完成,并且永远挂起。 现在,如果你知道你的工作永远不会抛出一个exception(因为你try用例子来包装整个东西),你可以很高兴地使用这种方法。 然而,如果你想让exception从你的worker中逃脱出来,我发现的唯一解决scheme是inheritance一些多处理组件,并且让它们调用callback即使workersubprocess引发exception:

 from multiprocessing.pool import ApplyResult, Pool, RUN import multiprocessing class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result ... if __name__ == "__main__": pool = TornadoPool(multiprocessing.cpu_count()) ... 

随着这些变化,exception对象将由gen.Task返回,而不是无限地挂起的gen.Task 。 我还更新了我的async_run方法,以便在返回时重新引发exception,并进行了一些其他更改,以便为workersubprocess中引发的exception提供更好的回溯。 以下是完整的代码:

 import multiprocessing from multiprocessing.pool import Pool, ApplyResult, RUN from functools import wraps import tornado.web from tornado.ioloop import IOLoop from tornado.gen import Return from tornado import gen class WrapException(Exception): def __init__(self): exc_type, exc_value, exc_tb = sys.exc_info() self.exception = exc_value self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)) def __str__(self): return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted) class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) if isinstance(result, Exception): raise result raise Return(result) def handle_exceptions(func): """ Raise a WrapException so we get a more meaningful traceback""" @wraps(func) def inner(*args, **kwargs): try: return func(*args, **kwargs) except Exception: raise WrapException() return inner # Test worker functions @handle_exceptions def test2(x): raise Exception("eeee") @handle_exceptions def test(x): print x time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield async_run(test, "inside get") self.write("%s\n" % result) result = yield async_run(test2, "hi2") except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool(4) app.listen(8888) IOLoop.instance().start() 

以下是客户的行为方式:

 dan@dan:~$ curl localhost:8888/test done Caught an exception: Original traceback: Traceback (most recent call last): File "./mutli.py", line 123, in inner return func(*args, **kwargs) File "./mutli.py", line 131, in test2 raise Exception("eeee") Exception: eeee 

如果我发送两个同时curl请求,我们可以看到他们在服务器端asynchronous处理:

 dan@dan:~$ ./mutli.py inside get inside get caught exception inside get caught exception inside get 

编辑:

请注意,Python 3的代码变得更简单,因为它为所有asynchronousmultiprocessing.Pool方法引入了一个error_callback关键字参数。 这使得与Tornado集成变得更容易:

 class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' super().apply_async(func, args, kwds, callback=callback, error_callback=callback) @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result) 

我们在重写的apply_async需要做的就是调用带有error_callback关键字参数的父类,除了callback函数kwarg。 无需重写ApplyResult

我们可以通过在我们的TornadoPool使用MetaClass来获得更好的TornadoPool ,以允许直接调用它的*_async方法,就像它们是协程一样:

 import time from functools import wraps from multiprocessing.pool import Pool import tornado.web from tornado import gen from tornado.gen import Return from tornado import stack_context from tornado.ioloop import IOLoop from tornado.concurrent import Future def _argument_adapter(callback): def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper def PoolTask(func, *args, **kwargs): """ Task function for use with multiprocessing.Pool methods. This is very similar to tornado.gen.Task, except it sets the error_callback kwarg in addition to the callback kwarg. This way exceptions raised in pool worker methods get raised in the parent when the Task is yielded from. """ future = Future() def handle_exception(typ, value, tb): if future.done(): return False future.set_exc_info((typ, value, tb)) return True def set_result(result): if future.done(): return if isinstance(result, Exception): future.set_exception(result) else: future.set_result(result) with stack_context.ExceptionStackContext(handle_exception): cb = _argument_adapter(set_result) func(*args, callback=cb, error_callback=cb) return future def coro_runner(func): """ Wraps the given func in a PoolTask and returns it. """ @wraps(func) def wrapper(*args, **kwargs): return PoolTask(func, *args, **kwargs) return wrapper class MetaPool(type): """ Wrap all *_async methods in Pool with coro_runner. """ def __new__(cls, clsname, bases, dct): pdct = bases[0].__dict__ for attr in pdct: if attr.endswith("async") and not attr.startswith('_'): setattr(bases[0], attr, coro_runner(pdct[attr])) return super().__new__(cls, clsname, bases, dct) class TornadoPool(Pool, metaclass=MetaPool): pass # Test worker functions def test2(x): print("hi2") raise Exception("eeee") def test(x): print(x) time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield pool.apply_async(test, ("inside get",)) self.write("%s\n" % result) result = yield pool.apply_async(test2, ("hi2",)) self.write("%s\n" % result) except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) raise finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool() app.listen(8888) IOLoop.instance().start() 

如果你的请求花费很长时间,龙卷风是错误的框架。

我build议你使用nginx将快速获取到龙卷风和较慢的到不同的服务器。

PeterBe有一篇有趣的文章,他运行多个Tornado服务器,并将其中的一个设置为处理长时间运行请求的“缓慢”请参阅: 令人担忧的Io-blocking我会尝试这种方法。