显示Python多处理池地图调用的进度?

我有一个脚本,它成功地做了一个多处理池集合的任务与imap_unordered()调用:

 p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work p.join() # Wait for completion 

然而,我的num_tasks是大约25万,所以join()locking主线程10秒左右,我希望能够增量回显到命令行显示主进程没有被locking。 就像是:

 p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): remaining = rs.tasks_remaining() # How many of the map call haven't been done yet? if (remaining == 0): break # Jump out of while loop print "Waiting for", remaining, "tasks to complete..." time.sleep(2) 

是否有一个结果对象或池本身的方法,表示剩余的任务数量? 我尝试使用一个multiprocessing.Value对象作为计数器( do_work在完成任务后调用counter.value += 1动作),但是在停止递增之前,计数器只达到总值的85%。

不需要访问结果集的私有属性:

 from __future__ import division import sys for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1): sys.stderr.write('\rdone {0:%}'.format(i/num_tasks)) 

我个人最喜欢的 – 给你一个很好的进度条和完成ETA,而事情并行运行。

 from multiprocessing import Pool import tqdm pool = Pool(processes=8) for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): pass 

通过一些更深入的挖掘find了一个答案:看看imap_unordered结果对象的__dict__ ,我发现它有一个_index属性,每增加一个任务就完成。 所以这适用于日志logging,封装在while循环中:

 p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): completed = rs._index if (completed == num_tasks): break print "Waiting for", num_tasks-completed, "tasks to complete..." time.sleep(2) 

然而,我发现交换imap_unordered map_async导致更快的执行,虽然结果对象有点不同。 相反,来自map_async的结果对象有一个_number_left属性和一个ready()方法:

 p = multiprocessing.Pool() rs = p.map_async(do_work, xrange(num_tasks)) p.close() # No more work while (True): if (rs.ready()): break remaining = rs._number_left print "Waiting for", remaining, "tasks to complete..." time.sleep(0.5) 

我知道这是一个相当古老的问题,但是当我想要跟踪python中任务池的进展时,我正在做这个。

 from progressbar import ProgressBar, SimpleProgress import multiprocessing as mp from time import sleep def my_function(letter): sleep(2) return letter+letter dummy_args = ["A", "B", "C", "D"] pool = mp.Pool(processes=2) results = [] pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start() r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args] while len(results) != len(dummy_args): pbar.update(len(results)) sleep(0.5) pbar.finish() print results 

基本上,你使用apply_async和callbak(在这种情况下,它是将返回值附加到一个列表),所以你不必等待做其他事情。 然后,在一个while循环中,检查工作的进度。 在这种情况下,我添加了一个小部件,使其看起来更好。

输出:

 4 of 4 ['AA', 'BB', 'CC', 'DD'] 

希望能帮助到你。

当我试图检查它的进展时,我发现这项工作已经完成了。 这是为我工作。

 from multiprocessing import Pool import tqdm tasks = range(5) pool = Pool() pbar = tqdm(total=len(tasks)) def do_work(x): # do something with x pbar.update(1) pool.imap_unordered(do_work, tasks) pool.close() pool.join() pbar.close() 

这应该适用于所有types的多处理,无论是否阻塞。

我创build了一个自定义类来创build一个进度打印输出。 Maby这有助于:

 from multiprocessing import Pool, cpu_count class ParallelSim(object): def __init__(self, processes=cpu_count()): self.pool = Pool(processes=processes) self.total_processes = 0 self.completed_processes = 0 self.results = [] def add(self, func, args): self.pool.apply_async(func=func, args=args, callback=self.complete) self.total_processes += 1 def complete(self, result): self.results.extend(result) self.completed_processes += 1 print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100)) def run(self): self.pool.close() self.pool.join() def get_results(self): return self.results