Subprocess.Popen:将stdout和stderr复制到terminal和variables

是否有可能修改下面的代码,从“标准输出”和“标准错误”打印输出:

  • 打印在terminal上 (实时),
  • 最后存储在输出errsvariables?

代码:

#!/usr/bin/python3 # -*- coding: utf-8 -*- import subprocess def run_cmd(command, cwd=None): p = subprocess.Popen(command, cwd=cwd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) outs, errs = p.communicate() rc = p.returncode outs = outs.decode('utf-8') errs = errs.decode('utf-8') return (rc, (outs, errs)) 

感谢@unutbu,特别感谢@ jf-sebastian,最终的function:

 #!/usr/bin/python3 # -*- coding: utf-8 -*- import sys from queue import Queue from subprocess import PIPE, Popen from threading import Thread def read_output(pipe, funcs): for line in iter(pipe.readline, b''): for func in funcs: func(line.decode('utf-8')) pipe.close() def write_output(get): for line in iter(get, None): sys.stdout.write(line) def run_cmd(command, cwd=None, passthrough=True): outs, errs = None, None proc = Popen( command, cwd=cwd, shell=False, close_fds=True, stdout=PIPE, stderr=PIPE, bufsize=1 ) if passthrough: outs, errs = [], [] q = Queue() stdout_thread = Thread( target=read_output, args=(proc.stdout, [q.put, outs.append]) ) stderr_thread = Thread( target=read_output, args=(proc.stderr, [q.put, errs.append]) ) writer_thread = Thread( target=write_output, args=(q.get,) ) for t in (stdout_thread, stderr_thread, writer_thread): t.daemon = True t.start() proc.wait() for t in (stdout_thread, stderr_thread): t.join() q.put(None) outs = ' '.join(outs) errs = ' '.join(errs) else: outs, errs = proc.communicate() outs = '' if outs == None else outs.decode('utf-8') errs = '' if errs == None else errs.decode('utf-8') rc = proc.returncode return (rc, (outs, errs)) 

你可以派生线程来读取stdout和stderrpipe道,写入一个通用队列,并附加到列表。 然后使用第三个线程来打印队列中的项目。

 import time import Queue import sys import threading import subprocess PIPE = subprocess.PIPE def read_output(pipe, funcs): for line in iter(pipe.readline, ''): for func in funcs: func(line) # time.sleep(1) pipe.close() def write_output(get): for line in iter(get, None): sys.stdout.write(line) process = subprocess.Popen( ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1) q = Queue.Queue() out, err = [], [] tout = threading.Thread( target=read_output, args=(process.stdout, [q.put, out.append])) terr = threading.Thread( target=read_output, args=(process.stderr, [q.put, err.append])) twrite = threading.Thread(target=write_output, args=(q.get,)) for t in (tout, terr, twrite): t.daemon = True t.start() process.wait() for t in (tout, terr): t.join() q.put(None) print(out) print(err) 

使用第三个线程(而不是让前两个线程都直接打印到terminal)的原因是为了防止两个打印语句同时发生,这可能导致有时出现乱码文本。


以上调用random_print.py ,随机打印到stdout和stderr:

 import sys import time import random for i in range(50): f = random.choice([sys.stdout,sys.stderr]) f.write(str(i)+'\n') f.flush() time.sleep(0.1) 

这个解决scheme借鉴了JF Sebastian的代码和想法。


这是一个类Unix系统的替代解决scheme,使用select.select

 import collections import select import fcntl import os import time import Queue import sys import threading import subprocess PIPE = subprocess.PIPE def make_async(fd): # https://stackoverflow.com/a/7730201/190597 '''add the O_NONBLOCK flag to a file descriptor''' fcntl.fcntl( fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) def read_async(fd): # https://stackoverflow.com/a/7730201/190597 '''read some data from a file descriptor, ignoring EAGAIN errors''' # time.sleep(1) try: return fd.read() except IOError, e: if e.errno != errno.EAGAIN: raise e else: return '' def write_output(fds, outmap): for fd in fds: line = read_async(fd) sys.stdout.write(line) outmap[fd.fileno()].append(line) process = subprocess.Popen( ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True) make_async(process.stdout) make_async(process.stderr) outmap = collections.defaultdict(list) while True: rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], []) write_output(rlist, outmap) if process.poll() is not None: write_output([process.stdout, process.stderr], outmap) break fileno = {'stdout': process.stdout.fileno(), 'stderr': process.stderr.fileno()} print(outmap[fileno['stdout']]) print(outmap[fileno['stderr']]) 

这个解决scheme使用了Adam Rosenfield的post中的代码和想法。

要在一个线程中同时捕获和显示来自subprocess的stdout和stderr,可以使用asynchronousI / O:

 #!/usr/bin/env python3 import asyncio import os import sys from asyncio.subprocess import PIPE @asyncio.coroutine def read_stream_and_display(stream, display): """Read from stream line by line until EOF, display, and capture the lines. """ output = [] while True: line = yield from stream.readline() if not line: break output.append(line) display(line) # assume it doesn't block return b''.join(output) @asyncio.coroutine def read_and_display(*cmd): """Capture cmd's stdout, stderr while displaying them as they arrive (line by line). """ # start process process = yield from asyncio.create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) # read child's stdout/stderr concurrently (capture and display) try: stdout, stderr = yield from asyncio.gather( read_stream_and_display(process.stdout, sys.stdout.buffer.write), read_stream_and_display(process.stderr, sys.stderr.buffer.write)) except Exception: process.kill() raise finally: # wait for the process to exit rc = yield from process.wait() return rc, stdout, stderr # run the event loop if os.name == 'nt': loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() rc, *output = loop.run_until_complete(read_and_display(*cmd)) loop.close()