如何在多处理器系统上产生并行subprocess?

我有一个Python脚本,我想用作另一个Python脚本的控制器。 我有一个有64个处理器的服务器,所以想要产生多达64个这个第二个Python脚本的subprocess。 子脚本被调用:

$ python create_graphs.py --name=NAME 

NAME是像XYZ,ABC,NYU等

在我的父母控制器脚本中,我从列表中检索名称variables:

 my_list = [ 'XYZ', 'ABC', 'NYU' ] 

所以我的问题是,什么是最好的方式来产生这些进程作为孩子? 我想把孩子的数量限制在64个,所以需要跟踪状态(如果孩子的过程已经完成了),所以我可以有效地保持整个一代的运行。

我考虑使用subprocess包,但拒绝它,因为它一次只产生一个孩子。 我终于find了多处理器软件包,但是我承认被整个线程和subprocess文档所淹没。

现在,我的脚本使用subprocess.call来一次只产生一个孩子,看起来像这样:

 #!/path/to/python import subprocess, multiprocessing, Queue from multiprocessing import Process my_list = [ 'XYZ', 'ABC', 'NYU' ] if __name__ == '__main__': processors = multiprocessing.cpu_count() for i in range(len(my_list)): if( i < processors ): cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]] child = subprocess.call( cmd, shell=False ) 

我真的希望它一次产生64个孩子。 在其他的stackoverflow问题,我看到人们使用队列,但它似乎创造了性能打击?

你正在寻找的是多进程中的进程池类。

 import multiprocessing import subprocess def work(cmd): return subprocess.call(cmd, shell=False) if __name__ == '__main__': count = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=count) print pool.map(work, ['ls'] * count) 

这里是一个计算示例,使其更容易理解。 以下将在N个进程中划分10000个任务,其中N是CPU计数。 请注意,我将None作为进程数传递。 这将导致Pool类使用cpu_count作为进程的数量( 引用 )

 import multiprocessing import subprocess def calculate(value): return value * 10 if __name__ == '__main__': pool = multiprocessing.Pool(None) tasks = range(10000) results = [] r = pool.map_async(calculate, tasks, callback=results.append) r.wait() # Wait on the results print results 

根据Nadia和Jim的评论,这是我提出的解决scheme。 我不确定这是否是最好的方法,但它是有效的。 原来被调用的子脚本需要是一个shell脚本,因为我需要使用包括Matlab在内的一些第三方应用程序。 所以我不得不把它从Python中取出,并用bash编码。

 import sys import os import multiprocessing import subprocess def work(staname): print 'Processing station:',staname print 'Parent process:', os.getppid() print 'Process id:', os.getpid() cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ] return subprocess.call(cmd, shell=False) if __name__ == '__main__': my_list = [ 'XYZ', 'ABC', 'NYU' ] my_list.sort() print my_list # Get the number of processors available num_processes = multiprocessing.cpu_count() threads = [] len_stas = len(my_list) print "+++ Number of stations to process: %s" % (len_stas) # run until all the threads are done, and there is no data left for list_item in my_list: # if we aren't using all the processors AND there is still data left to # compute, then spawn another thread if( len(threads) < num_processes ): p = multiprocessing.Process(target=work,args=[list_item]) p.start() print p, p.is_alive() threads.append(p) else: for thread in threads: if not thread.is_alive(): threads.remove(thread) 

这似乎是一个合理的解决scheme? 我试图使用吉姆的while循环格式,但我的脚本只是没有返回。 我不知道为什么会这样。 下面是当我用Jim的'while'循环代替'for'循环运行脚本时的输出:

 hostname{me}2% controller.py ['ABC', 'NYU', 'XYZ'] Number of processes: 64 +++ Number of stations to process: 3 hostname{me}3% 

当我用'for'循环运行它时,我得到了更有意义的东西:

 hostname{me}6% controller.py ['ABC', 'NYU', 'XYZ'] Number of processes: 64 +++ Number of stations to process: 3 Processing station: ABC Parent process: 1056 Process id: 1068 Processing station: NYU Parent process: 1056 Process id: 1069 Processing station: XYZ Parent process: 1056 Process id: 1071 hostname{me}7% 

所以这个工作,我很高兴。 但是,我仍然不明白为什么我不能使用Jim的'while'样式循环,而不是我正在使用的'for'循环。 感谢所有的帮助 – 我对知识@ stackoverflow的广度印象深刻。

我不认为你需要队列,除非你打算从应用程序中获取数据(如果你需要数据,我认为它可能更容易将其添加到数据库)

但请尝试以下尺寸:

把你的create_graphs.py脚本的内容都放到一个名为“create_graphs”的函数中

 import threading from create_graphs import create_graphs num_processes = 64 my_list = [ 'XYZ', 'ABC', 'NYU' ] threads = [] # run until all the threads are done, and there is no data left while threads or my_list: # if we aren't using all the processors AND there is still data left to # compute, then spawn another thread if (len(threads) < num_processes) and my_list: t = threading.Thread(target=create_graphs, args=[ my_list.pop() ]) t.setDaemon(True) t.start() threads.append(t) # in the case that we have the maximum number of threads check if any of them # are done. (also do this when we run out of data, until all the threads are done) else: for thread in threads: if not thread.isAlive(): threads.remove(thread) 

我知道这会导致1个线程比处理器less,这可能是好的,它留下一个处理器来pipe理线程,磁盘I / O和其他事情发生在计算机上。 如果您决定要使用最后一个核心,只需添加一个

编辑 :我想我可能误解了my_list的目的。 你不需要my_list跟踪线程(因为它们都被threads列表中的项目引用)。 但是,这是一个很好的喂养过程input的方法 – 甚至更好:使用生成器函数;)

my_listthreads的目的

my_list保存你需要在你的函数中处理的数据
threads只是当前正在运行的线程的列表

while循环执行两件事情,启动新线程来处理数据,并检查是否有任何线程正在运行。

所以只要你有(一)更多的数据处理,或(b)线程没有完成运行….你想编程继续运行。 一旦两个列表都是空的,它们将评估为False ,while循环将退出

我肯定会使用多处理,而不是使用子stream程来滚动我自己的解决scheme。