Python: running cat subprocesses in parallel

I am running several cat | zgrep commands on a remote server and collecting their output individually for further processing:

    class MainProcessor(mp.Process):
        def __init__(self, peaks_array):
            super(MainProcessor, self).__init__()
            self.peaks_array = peaks_array

        def run(self):
            for peak_arr in self.peaks_array:
                peak_processor = PeakProcessor(peak_arr)
                peak_processor.start()

    class PeakProcessor(mp.Process):
        def __init__(self, peak_arr):
            super(PeakProcessor, self).__init__()
            self.peak_arr = peak_arr

        def run(self):
            command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
            log_lines = subprocess.check_output(command, shell=True).split('\n')
            process_data(log_lines)

However, this results in sequential execution of the subprocess('ssh ... cat ...') commands. The second peak waits for the first to finish, and so on.

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output of each one individually?

An alternative approach (rather than the other suggestion of putting the shell processes in the background) is to use multithreading.

The run method that you have would then do something like this:

    thread.start_new_thread(myFuncThatDoesZGrep, ())

要收集结果,你可以做这样的事情:

    class MyThread(threading.Thread):
        def run(self):
            self.finished = False
            # Your code to run the command here.
            blahBlah()
            # When finished....
            self.finished = True
            self.results = []

Run the thread as explained at the multithreading link above. When your thread object has myThread.finished == True, you can collect the results via myThread.results.
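As a concrete sketch of that pattern (the class name `CommandThread` and the `echo` commands are placeholders, and `join()` is used here to wait for each thread rather than polling `finished`):

```python
import subprocess
import threading

class CommandThread(threading.Thread):
    """Runs one shell command and stores its output lines."""
    def __init__(self, command):
        super(CommandThread, self).__init__()
        self.command = command
        self.finished = False
        self.results = []

    def run(self):
        output = subprocess.check_output(self.command, shell=True)
        self.results = output.decode().splitlines()
        self.finished = True

# start all commands in parallel, then wait for each one
threads = [CommandThread('echo line-%d' % i) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until this thread has finished

outputs = [t.results for t in threads]
```

After the joins, each thread's `results` attribute holds that command's output, so the per-command outputs stay separate.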

You don't need multiprocessing or threading to run subprocesses in parallel, e.g.:

    #!/usr/bin/env python
    from subprocess import Popen

    # run commands in parallel
    processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
                 for i in range(5)]
    # collect statuses
    exitcodes = [p.wait() for p in processes]

It runs 5 shell commands simultaneously. Note: neither threads nor the multiprocessing module are used here. There is no point in adding an ampersand & to the shell commands: Popen doesn't wait for the command to complete. You need to call .wait() explicitly.
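To make the "Popen doesn't wait" point concrete, here is a small sketch (assuming a POSIX system with the `sleep` command available) showing that spawning returns almost immediately and only `.wait()` blocks:

```python
import time
from subprocess import Popen

start = time.time()
# Popen returns as soon as each child is spawned -- no trailing "&" needed
procs = [Popen(["sleep", "1"]) for _ in range(3)]
spawn_elapsed = time.time() - start  # well under a second

# .wait() is where we actually block, once per process
exitcodes = [p.wait() for p in procs]
total_elapsed = time.time() - start  # roughly 1s, not 3s: the sleeps overlapped
```

If the three `sleep 1` children ran sequentially, `total_elapsed` would be about 3 seconds; because they run in parallel, it stays close to 1.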

Using threads to collect the output from the subprocesses is convenient but not necessary:

    #!/usr/bin/env python
    from multiprocessing.dummy import Pool  # thread pool
    from subprocess import Popen, PIPE, STDOUT

    # run commands in parallel
    processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                       stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
                 for i in range(5)]

    # collect output in parallel
    def get_lines(process):
        return process.communicate()[0].splitlines()

    outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?

Here's a code example that gets output from several subprocesses concurrently in the same thread:

    #!/usr/bin/env python3
    import asyncio
    import sys
    from asyncio.subprocess import PIPE, STDOUT

    @asyncio.coroutine
    def get_lines(shell_command):
        p = yield from asyncio.create_subprocess_shell(
            shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
        return (yield from p.communicate())[0].splitlines()

    if sys.platform.startswith('win'):
        loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()

    # get commands output in parallel
    coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                       .format(i=i, e=sys.executable))
             for i in range(5)]
    print(loop.run_until_complete(asyncio.gather(*coros)))
    loop.close()
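On modern Python (3.8+), the generator-based @asyncio.coroutine style is deprecated; a rough equivalent using async/await and asyncio.run might look like this (same idea of gathering each command's output, with the per-command sleeps dropped to keep the sketch short):

```python
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    # spawn the subprocess and read its combined stdout/stderr
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    stdout, _ = await p.communicate()
    return stdout.splitlines()

async def main():
    # run the commands concurrently and gather their outputs in order
    coros = [get_lines('"{e}" -c "print({i:d})"'.format(i=i, e=sys.executable))
             for i in range(3)]
    return await asyncio.gather(*coros)

if __name__ == '__main__':
    print(asyncio.run(main()))
```

asyncio.run creates and closes the event loop itself, so the explicit loop setup (including the Windows ProactorEventLoop branch, which is the default there since Python 3.8) is no longer needed.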