如何并行化一个简单的Python循环?

这可能是一个微不足道的问题,但我如何并行Python中的以下循环?

# setup output lists output1 = list() output2 = list() output3 = list() for j in range(0, 10): # calc individual parameter value parameter = j * offset # call the calculation out1, out2, out3 = calc_stuff(parameter = parameter) # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) 

我知道如何在Python中启动单线程,但是我不知道如何“收集”结果。

多个进程也可以,无论这种情况最简单。 我目前使用的是Linux,但代码应该在Windows和Mac上运行。

并行化这些代码最简单的方法是什么?

由于全局解释器locking(GIL),在CPython上使用多个线程将不会为纯Python代码提供更好的性能。 我build议使用multiprocessing模块:

 pool = multiprocessing.Pool(4) out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset))) 

请注意,这在交互式解释器中不起作用。

为了避免GIL上通常的FUD:无论如何,这个例子中使用线程没有任何优势。 你在这里使用进程,而不是线程,因为它们避免了一大堆问题。

为了并行化一个简单的for循环, joblib为多处理的原始使用带来了很大的价值。 不仅是简短的语法,而且还包括当迭代速度非常快(消除开销)或捕获subprocess回溯时的透明迭代,以获得更好的错误报告。

免责声明:我是joblib的原作者。

并行化这些代码最简单的方法是什么?

我真的很喜欢这个版本 , 从3.2版本开始可以在Python3中使用,并且可以在PyPi上通过backport指向2.6和2.7 版本 。

您可以使用线程或进程并使用完全相同的接口。

把它放在一个文件 – futuretest.py:

 import concurrent.futures import time, random # add some random sleep time offset = 2 # you don't supply these so def calc_stuff(parameter=None): # these are examples. sleep_time = random.choice([0, 1, 2, 3, 4, 5]) time.sleep(sleep_time) return parameter / 2, sleep_time, parameter * parameter def procedure(j): # just factoring out the parameter = j * offset # procedure # call the calculation return calc_stuff(parameter=parameter) def main(): output1 = list() output2 = list() output3 = list() start = time.time() # let's see how long this takes # we can swap out ProcessPoolExecutor for ThreadPoolExecutor with concurrent.futures.ProcessPoolExecutor() as executor: for out1, out2, out3 in executor.map(procedure, range(0, 10)): # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) finish = time.time() # these kinds of format strings are only available on Python 3.6: # time to upgrade! print(f'original inputs: {repr(output1)}') print(f'total time to execute {sum(output2)} = sum({repr(output2)})') print(f'time saved by parallelizing: {sum(output2) - (finish-start)}') print(f'returned in order given: {repr(output3)}') if __name__ == '__main__': main() 

这里是输出:

 $ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4]) time saved by parallellizing: 27.68999981880188 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324] 

multithreading

现在将ProcessPoolExecutor更改为ThreadPoolExecutor ,然后再次运行该模块:

 $ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1]) time saved by parallellizing: 13.992000102996826 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324] 

现在你已经做了multithreading和多处理!

关于性能和使用两者的注意事项。

抽样太小,无法比较结果。

但是,我怀疑multithreading将比一般的多处理更快,特别是在Windows上,因为Windows不支持分叉,所以每个新进程都需要一定的时间才能启动。 在Linux或Mac上,他们可能会更接近。

您可以在多个进程中嵌套多个线程,但build议不要使用多个线程来分离多个进程。

为什么不使用线程和一个互斥体来保护一个全局列表?

 import os import re import time import sys import thread from threading import Thread class thread_it(Thread): def __init__ (self,param): Thread.__init__(self) self.param = param def run(self): mutex.acquire() output.append(calc_stuff(self.param)) mutex.release() threads = [] output = [] mutex = thread.allocate_lock() for j in range(0, 10): current = thread_it(j * offset) threads.append(current) current.start() for t in threads: t.join() #here you have output list filled with data 

请记住,你会像最慢的线程一样快

在Python中实现多处理和并行/分布式计算时,这可能很有用。

关于使用techila包的YouTube教程

Techila是一个分布式计算中间件,它使用techila软件包直接与Python集成。 包装中的桃子function可用于平行环路结构。 (以下代码片段来自Techila社区论坛 )

 techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers jobs = jobcount # Number of Jobs in the Project ) 

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的做法,但我会做一些类似的事情;

实际的代码;

 from multiprocessing import Process, JoinableQueue as Queue class CustomWorker(Process): def __init__(self,workQueue, out1,out2,out3): Process.__init__(self) self.input=workQueue self.out1=out1 self.out2=out2 self.out3=out3 def run(self): while True: try: value = self.input.get() #value modifier temp1,temp2,temp3 = self.calc_stuff(value) self.out1.put(temp1) self.out2.put(temp2) self.out3.put(temp3) self.input.task_done() except Queue.Empty: return #Catch things better here def calc_stuff(self,param): out1 = param * 2 out2 = param * 4 out3 = param * 8 return out1,out2,out3 def Main(): inputQueue = Queue() for i in range(10): inputQueue.put(i) out1 = Queue() out2 = Queue() out3 = Queue() processes = [] for x in range(2): p = CustomWorker(inputQueue,out1,out2,out3) p.daemon = True p.start() processes.append(p) inputQueue.join() while(not out1.empty()): print out1.get() print out2.get() print out3.get() if __name__ == '__main__': Main() 

希望有所帮助。