Python多处理 – Pipe vs Queue

Python的多处理包中的队列和pipe道有什么根本区别?

在什么情况下应该select一个吗? 什么时候使用Pipe()有利? 什么时候使用Queue()是有利的?

  • Pipe()只能有两个端点。

  • Queue()可以有多个生产者和消费者。

何时使用它们

如果您需要两点以上的沟通,请使用Queue()

如果你需要绝对的性能, Pipe()要快得多,因为Queue()build立在Pipe()之上。

性能标杆pipe理

假设您想要产生两个进程并尽快发送消息。 这些是使用Pipe()Queue()进行类似testing之间拖拽比赛的计时结果…这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上进行的。

仅供参考,我投入JoinableQueue()结果作为奖金; 当queue.task_done()被调用(它甚至不知道具体的任务,它只是对队列中的未完成的任务进行计数)时, queue.join()queue.join()任务,所以queue.join()知道工作已经完成。

这个答案的底部每个代码…

 mpenning@mpenning-T61:~$ python multi_pipe.py Sending 10000 numbers to Pipe() took 0.0369849205017 seconds Sending 100000 numbers to Pipe() took 0.328398942947 seconds Sending 1000000 numbers to Pipe() took 3.17266988754 seconds mpenning@mpenning-T61:~$ python multi_queue.py Sending 10000 numbers to Queue() took 0.105256080627 seconds Sending 100000 numbers to Queue() took 0.980564117432 seconds Sending 1000000 numbers to Queue() took 10.1611330509 seconds mpnening@mpenning-T61:~$ python multi_joinablequeue.py Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds mpenning@mpenning-T61:~$ 

总结Pipe()Queue()快大约三倍。 甚至不要考虑JoinableQueue()除非你真的必须得到好处。

奖励材料2

多处理引入了信息stream中的细微变化,除非您知道一些快捷方式,否则会使debugging变得困难。 例如,你可能有一个脚本,在许多条件下通过字典索引时工作正常,但很less有某些input失败。

通常,当整个python进程崩溃时,我们会得到失败的线索; 但是,如果多处理function崩溃,则不会获得打印到控制台的未经请求的故障回溯。 跟踪未知的多重处理崩溃是很难的,而不知道什么是崩溃的过程。

我发现跟踪多处理崩溃信息最简单的方法是将整个多处理函数封装在try / except并使用traceback.print_exc()

 import traceback def reader(args): try: # Insert stuff to be multiprocessed here return args[0]['that'] except: print "FATAL: reader({0}) exited while multiprocessing".format(args) traceback.print_exc() 

现在,当你发现一个崩溃,你会看到类似于:

 FATAL: reader([{'crash', 'this'}]) exited while multiprocessing Traceback (most recent call last): File "foo.py", line 19, in __init__ self.run(task_q, result_q) File "foo.py", line 46, in run raise ValueError ValueError 

源代码:


 """ multi_pipe.py """ from multiprocessing import Process, Pipe import time def reader(pipe): output_p, input_p = pipe input_p.close() # We are only reading while True: try: msg = output_p.recv() # Read from the output pipe and do nothing except EOFError: break def writer(count, input_p): for ii in xrange(0, count): input_p.send(ii) # Write 'count' numbers into the input pipe if __name__=='__main__': for count in [10**4, 10**5, 10**6]: output_p, input_p = Pipe() reader_p = Process(target=reader, args=((output_p, input_p),)) reader_p.start() # Launch the reader process output_p.close() # We no longer need this part of the Pipe() _start = time.time() writer(count, input_p) # Send a lot of stuff to reader() input_p.close() # Ask the reader to stop when it reads EOF reader_p.join() print "Sending %s numbers to Pipe() took %s seconds" % (count, (time.time() - _start)) 

 """ multi_queue.py """ from multiprocessing import Process, Queue import time def reader(queue): while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): for ii in xrange(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': for count in [10**4, 10**5, 10**6]: queue = Queue() # reader() reads from queue # writer() writes to queue reader_p = Process(target=reader, args=((queue),)) reader_p.daemon = True reader_p.start() # Launch the reader process _start = time.time() writer(count, queue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print "Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)) 

 """ multi_joinablequeue.py """ from multiprocessing import Process, JoinableQueue import time def reader(queue): while True: msg = queue.get() # Read from the queue and do nothing queue.task_done() def writer(count, queue): for ii in xrange(0, count): queue.put(ii) # Write 'count' numbers into the queue if __name__=='__main__': for count in [10**4, 10**5, 10**6]: queue = JoinableQueue() # reader() reads from queue # writer() writes to queue reader_p = Process(target=reader, args=((queue),)) reader_p.daemon = True reader_p.start() # Launch the reader process _start = time.time() writer(count, queue) # Send a lot of stuff to reader() queue.join() # Wait for the reader to finish print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, (time.time() - _start))