使用Python的Multiprocessing模块来执行SEAWAT / MODFLOW模型运行

我试图在我的8处理器64位Windows 7机器上完成100个模型的运行。 我想同时运行7个模型的实例,以减less我的总运行时间(每个模型运行大约9.5分钟)。 我已经看了几个有关Python的Multiprocessing模块的线程,但我仍然缺less一些东西。

使用多处理模块

如何在多处理器系统上产生并行subprocess?

Python多处理队列

我的过程:

我有100个不同的参数集,我想通过SEAWAT / MODFLOW来比较结果。 我已经为每个模型运行预build了模型input文件,并将它们存储在自己的目录中。 我希望能够做的是一次运行7个模型,直到所有的实现已经完成。 不需要进程之间的通信或显示结果。 到目前为止,我只能依次产生模型:

import os,subprocess import multiprocessing as mp ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' files = [] for f in os.listdir(ws + r'\fieldgen\reals'): if f.endswith('.npy'): files.append(f) ## def work(cmd): ## return subprocess.call(cmd, shell=False) def run(f,def_param=ws): real = f.split('_')[2].split('.')[0] print 'Realization %s' % real mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe ' mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe ' seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe ' seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe ' exe = seawatV4x64 swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real os.system( exe + swt_nam ) if __name__ == '__main__': p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes tasks = range(len(files)) results = [] for f in files: r = p.map_async(run(f), tasks, callback=results.append) 

我改变了if __name__ == 'main':以下是希望它能够解决缺乏并行性的问题,我感觉上面的脚本是通过for loop来传递的。 但是,该模型甚至无法运行(没有Python错误):

 if __name__ == '__main__': p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes p.map_async(run,((files[f],) for f in range(len(files)))) 

任何和所有的帮助,非常感谢!

编辑2012/3/26 13:31 EST

在下面的@JF Sebastian的答案中使用“Manual Pool”方法,我可以并行执行我的外部.exe。 模型实现一次调用8个批次,但在调用下一个批次之前不会等待这8个运行完成,依此类推:

 from __future__ import print_function import os,subprocess,sys import multiprocessing as mp from Queue import Queue from threading import Thread def run(f,ws): real = f.split('_')[-1].split('.')[0] print('Realization %s' % real) seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe ' swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real subprocess.check_call([seawatV4x64, swt_nam]) def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print('%r failed: %s' % (args, e,), file=sys.stderr) def main(): # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' wdir = os.path.join(ws, r'fieldgen\reals') q = Queue() for f in os.listdir(wdir): if f.endswith('.npy'): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(8)] for t in threads: t.daemon = True # threads die if the program dies t.start() for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion if __name__ == '__main__': mp.freeze_support() # optional if the program is not frozen main() 

没有错误追溯可用。 run()函数在调用单个模型实现文件(与多个文件一起run()执行其职责。 唯一的区别是对于多个文件,虽然每个实例立即closures,但只有一个模型运行允许完成,脚本正常退出(退出代码0),但它被称为len(files)时间。

main()添加一些打印语句会显示一些关于活动线程计数和线程状态的信息(注意,这只是对实现文件中的8个进行testing才能使屏幕截图更易于pipe理,理论上所有8个文件应该同时运行,但是,行为继续在他们产卵并立即死亡,除了一个):

 def main(): # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' wdir = os.path.join(ws, r'fieldgen\test') q = Queue() for f in os.listdir(wdir): if f.endswith('.npy'): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())] for t in threads: t.daemon = True # threads die if the program dies t.start() print('Active Count a',threading.activeCount()) for _ in threads: print(_) q.put_nowait(None) # signal no more files for t in threads: print(t) t.join() # wait for completion print('Active Count b',threading.activeCount()) 

截图

**读取“ D:\\Data\\Users... ”的行是我手动停止模型运行到完成时抛出的错误信息。 一旦我停止模型的运行,剩下的线程状态行会被报告,脚本退出。

编辑2012/3/26 16:24东部时间

SEAWAT允许并行执行,就像我过去所做的那样,手动使用iPython产生实例并从每个模型文件夹启动。 这一次,我从一个位置,即我的脚本所在的目录启动所有的模型运行。 看起来罪魁祸首可能是SEAWAT节省一些产出的方式。 当SEAWAT运行时,它会立即创build与模型运行相关的文件。 其中一个文件没有保存到模型实现所在的目录中,而是保存在脚本所在的顶层目录中。 这阻止了任何后续的线程在相同的位置保存相同的文件名(他们都希望这样做,因为这些文件名是通用的,并且对于每个实现都是非特定的)。 SEAWAT的窗户没有打开足够长的时间以供我阅读,甚至看到有错误信息,我只是在回去尝试使用iPython运行代码时才意识到这一点,它直接显示来自SEAWAT的打印输出而不是打开新窗口来运行程序。

我接受@JF Sebastian的答案,因为很可能一旦我解决了这个模型可执行的问题,他提供的线程代码将会使我得到我需要的地方。

最终代码

在subprocess.check_call中添加了cwd参数,以便在自己的目录中启动SEAWAT的每个实例。 非常关键。

 from __future__ import print_function import os,subprocess,sys import multiprocessing as mp from Queue import Queue from threading import Thread import threading def run(f,ws): real = f.split('_')[-1].split('.')[0] print('Realization %s' % real) seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe ' cwd = ws + r'\reals\real%s\ss' % real swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd) def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print('%r failed: %s' % (args, e,), file=sys.stderr) def main(): # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' wdir = os.path.join(ws, r'fieldgen\reals') q = Queue() for f in os.listdir(wdir): if f.endswith('.npy'): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)] for t in threads: t.daemon = True # threads die if the program dies t.start() for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion if __name__ == '__main__': mp.freeze_support() # optional if the program is not frozen main() 

我在Python代码中看不到任何计算。 如果您只需要并行执行多个外部程序,则使用subprocess进程运行程序和threading模块以保持运行的进程数不变,但最简单的代码使用multiprocessing.Pool就足够了。 subprocess

 #!/usr/bin/env python import os import multiprocessing as mp def run(filename_def_param): filename, def_param = filename_def_param # unpack arguments ... # call external program on `filename` def safe_run(*args, **kwargs): """Call run(), catch exceptions.""" try: run(*args, **kwargs) except Exception as e: print("error: %s run(*%r, **%r)" % (e, args, kwargs)) def main(): # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' workdir = os.path.join(ws, r'fieldgen\reals') files = ((os.path.join(workdir, f), ws) for f in os.listdir(workdir) if f.endswith('.npy')) # start processes pool = mp.Pool() # use all available CPUs pool.map(safe_run, files) if __name__=="__main__": mp.freeze_support() # optional if the program is not frozen main() 

如果有多个文件,则可以将pool.map()replacefor _ in pool.imap_unordered(safe_run, files): pass

还有一个mutiprocessing.dummy.Pool提供了与multiprocessing.Pool相同的接口,但是使用线程而不是在这种情况下可能更合适的进程。

你不需要保留一些CPU空闲。 只需使用一个命令,以低优先级启动您的可执行文件(在Linux上它是一个nice程序)。

ThreadPoolExecutor示例

concurrent.futures.ThreadPoolExecutor既简单又足够,但是它需要Python 2.x的第三方依赖 (它自Python 3.2以来就是在stdlib中)。

 #!/usr/bin/env python import os import concurrent.futures def run(filename, def_param): ... # call external program on `filename` # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' wdir = os.path.join(ws, r'fieldgen\reals') files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy')) # start threads with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: future_to_file = dict((executor.submit(run, f, ws), f) for f in files) for future in concurrent.futures.as_completed(future_to_file): f = future_to_file[future] if future.exception() is not None: print('%r generated an exception: %s' % (f, future.exception())) # run() doesn't return anything so `future.result()` is always `None` 

或者,如果我们忽略run()引发的exception:

 from itertools import repeat ... # the same # start threads with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: executor.map(run, files, repeat(ws)) # run() doesn't return anything so `map()` results can be ignored 

subprocess + threading (手动池)解决scheme

 #!/usr/bin/env python from __future__ import print_function import os import subprocess import sys from Queue import Queue from threading import Thread def run(filename, def_param): ... # define exe, swt_nam subprocess.check_call([exe, swt_nam]) # run external program def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print('%r failed: %s' % (args, e,), file=sys.stderr) # start threads q = Queue() threads = [Thread(target=worker, args=(q,)) for _ in range(8)] for t in threads: t.daemon = True # threads die if the program dies t.start() # populate files ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a' wdir = os.path.join(ws, r'fieldgen\reals') for f in os.listdir(wdir): if f.endswith('.npy'): q.put_nowait((os.path.join(wdir, f), ws)) for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion 

这是我保持内存中最小x线程数的方法。 它是线程和多处理模块的组合。 其他技术如上面所解释的其他技术可能是不寻常的,但可能是相当可观的。 为了解释起见,我正在一次抓取至less5个网站的情况。

所以这里是:

 #importing dependencies. from multiprocessing import Process from threading import Thread import threading # Crawler function def crawler(domain): # define crawler technique here. output.write(scrapeddata + "\n") pass 

接下来是threadController函数。 这个函数将控制线程stream向主内存。 它将继续激活线程以维持threadNum“最小”限制,即。 5.它也不会退出,直到所有的活动线程(acitveCount)完成。

它将维持最less的threadNum(5)startProcess函数线程(这些线程最终将从processList启动进程,同时以60秒的时间join它们)。 在注视threadController之后,会有2个线程,这些线程不包含在5的上述限制内。 主线程和线程控制器线程本身。 这就是为什么threading.activeCount()!= 2已被使用。

 def threadController(): print "Thread count before child thread starts is:-", threading.activeCount(), len(processList) # staring first thread. This will make the activeCount=3 Thread(target = startProcess).start() # loop while thread List is not empty OR active threads have not finished up. while len(processList) != 0 or threading.activeCount() != 2: if (threading.activeCount() < (threadNum + 2) and # if count of active threads are less than the Minimum AND len(processList) != 0): # processList is not empty Thread(target = startProcess).start() # This line would start startThreads function as a seperate thread ** 

startProcess函数作为一个单独的线程,将启动processlist中的进程。 这个函数的目的(**作为一个不同的线程开始)是它将成为进程的父线程。 所以当它以60秒的超时join它们时,这将阻止startProcess线程前进,但是这不会停止threadController的执行。 所以这样,threadController将按需要工作。

 def startProcess(): pr = processList.pop(0) pr.start() pr.join(60.00) # joining the thread with time out of 60 seconds as a float. if __name__ == '__main__': # a file holding a list of domains domains = open("Domains.txt", "r").read().split("\n") output = open("test.txt", "a") processList = [] # thread list threadNum = 5 # number of thread initiated processes to be run at one time # making process List for r in range(0, len(domains), 1): domain = domains[r].strip() p = Process(target = crawler, args = (domain,)) processList.append(p) # making a list of performer threads. # starting the threadController as a seperate thread. mt = Thread(target = threadController) mt.start() mt.join() # won't let go next until threadController thread finishes. output.close() print "Done" 

除了在内存中保持最less的线程数之外,我的目标也是避免在内存中卡住线程或进程。 我用超时function做了这个。 我对任何input错误表示歉意。

我希望这个build筑能够帮助这个世界上的任何人。 问候,Vikas Gautam