Tag: python multithreading

如何在python Tornado服务器的请求中执行多处理?

我正在使用I / O非阻塞python服务器Tornado。 我有一类GET请求可能需要大量的时间才能完成(想想在5-10秒的范围内)。 问题在于Tornado阻塞了这些请求,以便后续的快速请求被保留,直到缓慢的请求完成。 我看着: https : //github.com/facebook/tornado/wiki/Threading-and-concurrency ,得出结论,我想#3(其他进程)和#4(其他线程)的组合。 #4自己有问题,而当另一个线程正在进行“重型升降”时,我无法得到可靠的控制回到ioloop。 (我认为这是由于GIL和重负载任务具有高CPU负载,并保持远离主Ioloop的控制的事实,但这是一个猜测)。 所以我一直在做原型devise,通过在一个单独的进程中在这些缓慢的GET请求中完成“繁重的任务”,然后在完成请求的过程中将callback放回到Tornado ioloop中。 这释放了ioloop来处理其他请求。 我已经创build了一个简单的例子来演示一个可能的解决scheme,但是很好奇从社区那里得到反馈。 我的问题有两方面:如何简化现行的方法? 它可能存在哪些缺陷? 该方法 利用Tornado内置的asynchronous装饰器,允许请求保持打开状态,并继续使用ioloop。 使用python的multiprocessing模块为“繁重”任务生成一个单独的进程。 我第一次尝试使用threading模块,但无法得到任何可靠的控制权交给ioloop。 多核处理似乎也会利用多核。 在主Ioloop进程中使用正在监视multiprocessing.Queue处理的threading模块启动一个“观察者”线程。当完成时,“繁重”任务的结果将会被调用。 这是需要的,因为我需要一种方式来知道重载任务已经完成,同时还能够通知ioloop这个请求已经完成。 确保“监视器”线程经常使用time.sleep time.sleep(0)调用将控制权交给主Ioloop循环,以便继续处理其他请求。 当队列中存在结果时,使用tornado.ioloop.IOLoop.instance().add_callback()从“观察者”线程添加一个callbacktornado.ioloop.IOLoop.instance().add_callback() ,该函数被logging为从其他线程调用ioloop实例的唯一安全方法。 一定要在callback中调用finish()来完成请求并提交回复。 下面是显示这种方法的一些示例代码。 multi_tornado.py是实现上述大纲的服务器, call_multi.py是一个示例脚本,它以两种不同的方式调用服务器来testing服务器。 两个testing都使用3个慢速GET请求和20个快速GET请求来调用服务器。 在打开和closures线程的情况下都显示结果。 在使用“无线程”运行的情况下,3个缓慢的请求块(每个都需要一秒多的时间才能完成)。 20个快速请求中的一些挤入了ioloop中的一些缓慢请求之间(并不完全确定这是怎么发生的,但可能是我在同一台机器上同时运行服务器和客户端testing脚本的工件)。 这里的要点是,所有的快速请求都受到不同程度的影响。 在使用线程运行的情况下,20个快速请求全部首先立即完成,然后三个慢速请求几乎在同一时间完成,因为它们各自并行运行。 这是所需的行为。 三个缓慢的请求需要2.5秒才能并行完成 – 而在非线程的情况下,三个缓慢的请求总共花费约3.5秒。 所以整体上大概有35%的速度(我认为是由于多核共享)。 但更重要的是 – 快速的请求立即在缓慢的请求中处理。 我没有很多的multithreading编程经验 – 所以虽然这似乎在这里工作,我很好奇学习: 有一个更简单的方法来完成这个吗? 在这种方法中可能会潜伏什么? (注意:未来的折衷可能是运行更多的Tornado实例,并使用像nginx这样的反向代理来进行负载均衡,无论我将使用负载均衡器运行多个实例 – 但是我只关心在这个问题上抛硬件因为看起来硬件在阻塞方面与问题直接相关)。 […]

如何在Python中循环multithreading操作

说我有一个非常大的名单,我正在执行一个这样的操作: for item in items: try: api.my_operation(item) except: print 'error with item' 我的问题是双重的: 有很多项目 api.my_operation需要永远返回 我想使用multithreading立即启动一堆api.my_operations,所以我可以同时处理5个或10个甚至100个项目。 如果my_operation()返回一个exception(因为可能我已经处理了这个项目) – 没关系。 它不会破坏任何东西。 循环可以继续到下一个项目。 注意 :这是针对Python 2.7.3的

Python 3定时input

我希望能够做的是使用input问一个用户一个问题。 例如: print('some scenario') prompt = input("You have 10 seconds to choose the correct answer…\n") 如果时间过去了,就打印一下 print('Sorry, times up.') 任何帮助指向我在正确的方向将不胜感激。

列表是线程安全的

我注意到它经常被build议使用带有多个排队的队列,而不是列表和.pop()。 这是因为列表不是线程安全的,还是其他原因?

Python:并行执行catsubprocess

我正在运行几个cat | zgrep 在远程服务器上执行cat | zgrep命令并单独收集输出以进一步处理: class MainProcessor(mp.Process): def __init__(self, peaks_array): super(MainProcessor, self).__init__() self.peaks_array = peaks_array def run(self): for peak_arr in self.peaks_array: peak_processor = PeakProcessor(peak_arr) peak_processor.start() class PeakProcessor(mp.Process): def __init__(self, peak_arr): super(PeakProcessor, self).__init__() self.peak_arr = peak_arr def run(self): command = 'ssh remote_host cat files_to_process | zgrep –mmap "regex" ' log_lines = (subprocess.check_output(command, shell=True)).split('\n') process_data(log_lines) 但是,这会导致顺序执行subprocess('ssh […]

在函数调用超时

我正在调用Python中的一个函数,我知道这个函数可能会停顿并迫使我重新启动脚本。 我该如何调用函数,或者如何包装它,如果这个过程需要5秒以上的时间,那么脚本将会取消它并执行其他操作?