Understanding multiprocessing: shared memory management, locks and queues in Python

Multiprocessing is a powerful tool in Python, and I want to understand it in more depth. I want to know when to use regular locks and queues and when to use a multiprocessing Manager to share these among all processes.

I came up with a test scenario with four different conditions for multiprocessing:

  1. Using a pool and NO Manager

  2. Using a pool and a Manager

  3. Using individual processes and NO Manager

  4. Using individual processes and a Manager

The Job

All conditions execute a job function the_job. the_job consists of some printing that is secured by a lock. Moreover, the input to the function is simply put into a queue (to see whether it can be recovered from the queue). This input is simply an index idx from range(10), created in the main script start_scenario (shown at the bottom).

    def the_job(args):
        """The job for multiprocessing.

        Prints some stuff secured by a lock and finally puts
        the input into a queue.
        """
        idx = args[0]
        lock = args[1]
        queue = args[2]

        lock.acquire()
        print 'I'
        print 'was '
        print 'here '
        print '!!!!'
        print '1111'
        print 'einhundertelfzigelf\n'
        who = ' By run %d \n' % idx
        print who
        lock.release()
        queue.put(idx)

The success of a condition is defined as perfectly recalling the input from the queue; see the function read_queue at the bottom.

The Conditions

Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue and passing these to a process pool:

    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.

        FAILS!
        """
        mypool = mp.Pool(ncores)
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.imap(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)

(The helper function make_iterator is given at the bottom of this post.) Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.

Condition 2 is rather similar, but now the lock and queue are under the supervision of a Manager:

    def scenario_2_pool_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITH a Manager for the lock and queue.

        SUCCESSFUL!
        """
        mypool = mp.Pool(ncores)
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.imap(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)

In condition 3, new processes are started manually, and the lock and queue are created without a Manager:

    def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITHOUT a Manager.

        SUCCESSFUL!
        """
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)

Condition 4 is similar, but again now using a Manager:

    def scenario_4_single_processes_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITH a Manager.

        SUCCESSFUL!
        """
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)

In both conditions 3 and 4, I start a new process for each of the 10 tasks of the_job, with at most ncores processes operating at the same time. This is achieved with the following helper function:

    def do_job_single_processes(jobfunc, iterator, ncores):
        """Runs a job function by starting individual processes for every task.

        At most `ncores` processes operate at the same time

        :param jobfunc: Job to do
        :param iterator:
            Iterator over different parameter settings,
            contains a lock and a queue
        :param ncores:
            Number of processes operating at the same time
        """
        keep_running = True
        process_dict = {}  # Dict containing all subprocesses

        while len(process_dict) > 0 or keep_running:
            terminated_procs_pids = []

            # First check if some processes did finish their job
            for pid, proc in process_dict.iteritems():
                # Remember the terminated processes
                if not proc.is_alive():
                    terminated_procs_pids.append(pid)

            # And delete these from the process dict
            for terminated_proc in terminated_procs_pids:
                process_dict.pop(terminated_proc)

            # If we have less active processes than ncores and there is still
            # a job to do, add another process
            if len(process_dict) < ncores and keep_running:
                try:
                    task = iterator.next()
                    proc = mp.Process(target=jobfunc, args=(task,))
                    proc.start()
                    process_dict[proc.pid] = proc
                except StopIteration:
                    # All tasks have been started
                    keep_running = False

            time.sleep(0.1)

The Results

Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance), whereas the other 3 conditions are successful. I am trying to wrap my head around this outcome.

Why does the pool need a lock and queue to be shared among all processes, while the individual processes in condition 3 don't?

What I am aware of is that for the pool conditions (1 and 2) all data from the iterator is passed via pickling, whereas in the single-process conditions (3 and 4) all data from the iterator is passed by inheritance from the main process (I am using Linux). I guess that until the memory is changed from within a child process, the same memory that the parent process uses is accessed (copy-on-write). But as soon as one calls lock.acquire(), this should change, and the child processes would use different locks placed elsewhere in memory, wouldn't they? How does one child process know that a sibling has acquired a lock which is not shared via a Manager?

Finally, somewhat related is my question of how much conditions 3 and 4 differ. Both have individual processes, but they differ in the usage of a Manager. Are both considered valid code? Or should one avoid using a Manager if there is actually no need for one?


Full Script

For those who just want to copy and paste everything in order to run the code, here is the full script:

    __author__ = 'Me and myself'

    import multiprocessing as mp
    import time


    def the_job(args):
        """The job for multiprocessing.

        Prints some stuff secured by a lock and finally puts
        the input into a queue.
        """
        idx = args[0]
        lock = args[1]
        queue = args[2]

        lock.acquire()
        print 'I'
        print 'was '
        print 'here '
        print '!!!!'
        print '1111'
        print 'einhundertelfzigelf\n'
        who = ' By run %d \n' % idx
        print who
        lock.release()
        queue.put(idx)


    def read_queue(queue):
        """Turns a queue into a normal python list."""
        results = []
        while not queue.empty():
            result = queue.get()
            results.append(result)
        return results


    def make_iterator(args, lock, queue):
        """Makes an iterator over args and passes the lock and queue to each element."""
        return ((arg, lock, queue) for arg in args)


    def start_scenario(scenario_number=1):
        """Starts one of four multiprocessing scenarios.

        :param scenario_number: Index of scenario, 1 to 4
        """
        args = range(10)
        ncores = 3
        if scenario_number == 1:
            result = scenario_1_pool_no_manager(the_job, args, ncores)
        elif scenario_number == 2:
            result = scenario_2_pool_manager(the_job, args, ncores)
        elif scenario_number == 3:
            result = scenario_3_single_processes_no_manager(the_job, args, ncores)
        elif scenario_number == 4:
            result = scenario_4_single_processes_manager(the_job, args, ncores)

        if result != args:
            print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
        else:
            print 'Scenario %d successful!' % scenario_number


    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.

        FAILS!
        """
        mypool = mp.Pool(ncores)

        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.map(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)


    def scenario_2_pool_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITH a Manager for the lock and queue.

        SUCCESSFUL!
        """
        mypool = mp.Pool(ncores)

        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.map(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)


    def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITHOUT a Manager.

        SUCCESSFUL!
        """
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)


    def scenario_4_single_processes_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITH a Manager.

        SUCCESSFUL!
        """
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)


    def do_job_single_processes(jobfunc, iterator, ncores):
        """Runs a job function by starting individual processes for every task.

        At most `ncores` processes operate at the same time

        :param jobfunc: Job to do
        :param iterator:
            Iterator over different parameter settings,
            contains a lock and a queue
        :param ncores:
            Number of processes operating at the same time
        """
        keep_running = True
        process_dict = {}  # Dict containing all subprocesses

        while len(process_dict) > 0 or keep_running:
            terminated_procs_pids = []

            # First check if some processes did finish their job
            for pid, proc in process_dict.iteritems():
                # Remember the terminated processes
                if not proc.is_alive():
                    terminated_procs_pids.append(pid)

            # And delete these from the process dict
            for terminated_proc in terminated_procs_pids:
                process_dict.pop(terminated_proc)

            # If we have less active processes than ncores and there is still
            # a job to do, add another process
            if len(process_dict) < ncores and keep_running:
                try:
                    task = iterator.next()
                    proc = mp.Process(target=jobfunc, args=(task,))
                    proc.start()
                    process_dict[proc.pid] = proc
                except StopIteration:
                    # All tasks have been started
                    keep_running = False

            time.sleep(0.1)


    def main():
        """Runs 1 out of 4 different multiprocessing scenarios"""
        start_scenario(1)


    if __name__ == '__main__':
        main()

multiprocessing.Lock is implemented using a Semaphore object provided by the OS. On Linux, the child just inherits a handle to the Semaphore from the parent via os.fork. This isn't a copy of the semaphore; it is actually inheriting the same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows Semaphore used internally by the multiprocessing.Lock object, via the Windows DuplicateHandle API:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles.

The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling it. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.
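To make the Linux side of this concrete, here is a minimal sketch of my own (not part of the original scenarios; it assumes the default fork behaviour on Linux): the Lock is handed to the child as a Process argument, so it is never pickled, and the child ends up holding the very same OS-level semaphore handle as the parent:

    import multiprocessing as mp

    def child(lock):
        # On Linux this lock arrives via os.fork, not via pickling:
        # the child holds the same OS-level semaphore handle as the parent.
        lock.acquire()
        print 'child acquired the inherited lock'
        lock.release()

    if __name__ == '__main__':
        lock = mp.Lock()
        proc = mp.Process(target=child, args=(lock,))
        proc.start()
        proc.join()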

Here is the semaphore object in multiprocessing/synchronize.py:

    class SemLock(object):

        def __init__(self, kind, value, maxvalue):
            sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
            debug('created semlock with handle %s' % sl.handle)
            self._make_methods()

            if sys.platform != 'win32':
                def _after_fork(obj):
                    obj._semlock._after_fork()
                register_after_fork(self, _after_fork)

        def _make_methods(self):
            self.acquire = self._semlock.acquire
            self.release = self._semlock.release
            self.__enter__ = self._semlock.__enter__
            self.__exit__ = self._semlock.__exit__

        def __getstate__(self):
            # This is called when you try to pickle the `Lock`.
            assert_spawning(self)
            sl = self._semlock
            return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

        def __setstate__(self, state):
            # This is called when unpickling a `Lock`
            self._semlock = _multiprocessing.SemLock._rebuild(*state)
            debug('recreated blocker with handle %r' % state[0])
            self._make_methods()

Note the assert_spawning call in __getstate__, which gets called when pickling the object. Here is how it is implemented:

    #
    # Check that the current thread is spawning a child process
    #

    def assert_spawning(self):
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                '%s objects should only be shared between processes'
                ' through inheritance' % type(self).__name__
            )

That function is what makes sure you are "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:

    @staticmethod
    def thread_is_spawning():
        return False

This is because Linux doesn't need to pickle in order to inherit the Lock, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there is more going on:

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            ...
            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            return getattr(Popen._tls, 'process_handle', None) is not None

Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to the child using dump, and then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the same behavior as os.fork on Linux. Windows could actually support passing the Lock at any time, because DuplicateHandle can be run at any time.

All of the above applies to the Queue object as well, because it uses a Lock internally.
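As a quick sanity check (a sketch of my own, not from the scenarios above), pickling a multiprocessing.Queue by hand on Linux should trip the same guard:

    import multiprocessing as mp
    import pickle

    if __name__ == '__main__':
        queue = mp.Queue()
        try:
            pickle.dumps(queue)
        except RuntimeError as exc:
            # Expected on Linux: "Queue objects should only be shared
            # between processes through inheritance"
            print exc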

I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which is going to be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.
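To get a feeling for the cost difference, a rough micro-benchmark along these lines could be used (a sketch of my own; the numbers will vary by machine, the point is only that every acquire/release on the Manager proxy is a round trip to the manager process):

    import multiprocessing as mp
    import time

    def time_lock(lock, n=10000):
        """Time n acquire/release cycles on the given lock-like object."""
        start = time.time()
        for _ in xrange(n):
            lock.acquire()
            lock.release()
        return time.time() - start

    if __name__ == '__main__':
        manager = mp.Manager()
        print 'plain mp.Lock():  %.3f s' % time_lock(mp.Lock())
        print 'Manager().Lock(): %.3f s' % time_lock(manager.Lock())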

Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, by using the initializer/initargs keyword arguments:

    lock = None

    def initialize_lock(l):
        global lock
        lock = l

    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.
        """
        lock = mp.Lock()
        mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))

        queue = mp.Queue()

        iterator = make_iterator(args, queue)

        # Don't pass the lock. It has to be used as a global in the child.
        # (This means jobfunc would need to be re-written slightly.)
        mypool.imap(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)

This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited rather than pickled.
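For completeness, here is a sketch of how the_job could be adapted to this variant (hypothetical: the name the_job_with_global_lock and the two-element tuples are my own, assuming make_iterator is changed to yield only (arg, queue) pairs). The lock is no longer part of the arguments but is read from the module-level global that initialize_lock sets in every worker:

    def the_job_with_global_lock(args):
        """Variant of the_job for the initializer/initargs approach.

        The lock is NOT passed inside args; each worker uses the
        module-level global set by initialize_lock.
        """
        idx = args[0]
        queue = args[1]

        lock.acquire()
        print ' By run %d \n' % idx
        lock.release()
        queue.put(idx)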