如何在Python多处理中将Pool.map和Array(共享内存)结合起来?

我有一个非常大的(只读)数据数组,我想要由多个进程并行处理。

我喜欢Pool.map函数,并希望使用它来并行计算该数据上的函数。

我看到可以使用Value或Array类在进程之间使用共享内存数据。 但是当我尝试使用这个时,我得到了一个RuntimeError:'SynchronizedString对象应该只在使用Pool.map函数时通过inheritance在进程之间共享:

这是我正在尝试做的一个简单的例子:

from sys import stdin from multiprocessing import Pool, Array def count_it( arr, key ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array('c', testData) # this works print count_it( toShare, "a" ) pool = Pool() # RuntimeError here print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] ) 

任何人都可以告诉我我在这里做错了吗?

所以我想要做的是在进程池中创build新进程后,将有关新创build的共享内存分配数组的信息传递给进程。

我刚刚看到赏金,再次尝试;)

基本上我认为错误消息意味着它说什么 – 多处理共享内存数组不能作为parameter passing(通过酸洗)。 将数据序列化是没有意义的 – 关键是数据是共享内存。 所以你必须使共享数组成为全局的。 我认为把它作为一个模块的属性比较合适,就像在我的第一个答案中一样,但是在你的例子中把它作为一个全局variables也是适用的。 考虑到你不想在fork之前设置数据,这里是一个修改的例子。 如果你想有多个可能的共享数组(这就是为什么你想传递给Share作为参数),你可以类似地创build一个共享数组的全局列表,只要将索引传递给count_it(这将成为for c in toShare[i]:

 from sys import stdin from multiprocessing import Pool, Array, Process def count_it( key ): count = 0 for c in toShare: if c == key: count += 1 return count if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool() # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] ) 

[编辑:上述不工作在Windows上,因为不使用分叉。 但是,下面的工作在Windows上,仍然使用池,所以我认为这是最接近你想要的:

 from sys import stdin from multiprocessing import Pool, Array, Process import mymodule def count_it( key ): count = 0 for c in mymodule.toShare: if c == key: count += 1 return count def initProcess(share): mymodule.toShare = share if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool(initializer=initProcess,initargs=(toShare,)) # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] ) 

不知道为什么地图不会pickle数组,但进程和池将 – 我想也许它已经在窗口上的subprocess初始化点传输。 请注意,数据仍然在叉后设置。

我看到的问题是,池不支持通过参数列表来酸洗共享数据。 这就是错误信息所指的“对象只能通过inheritance在进程之间共享”。 共享数据需要被inheritance,也就是说,如果你想使用Pool类来共享数据。

如果您需要明确地传递它们,则可能必须使用多处理。 这是你重做的例子:

 from multiprocessing import Process, Array, Queue def count_it( q, arr, key ): count = 0 for c in arr: if c == key: count += 1 q.put((key, count)) if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array('c', testData) q = Queue() keys = ['a', 'b', 's', 'd'] workers = [Process(target=count_it, args = (q, toShare, key)) for key in keys] for p in workers: p.start() for p in workers: p.join() while not q.empty(): print q.get(), 

输出:('s',9)('a',2)('b',3)('d',12)

队列元素的sorting可能会有所不同。

为了使这个更通用和类似的池,你可以创build一个固定的N个进程,将键列表分成N个部分,然后使用一个包装函数作为处理目标,它将为列表中的每个键调用count_it它通过,如:

 def wrapper( q, arr, keys ): for k in keys: count_it(q, arr, k) 

如果数据是只读的,只需将它作为模块中的一个variables, 然后再从池中分叉。 那么所有的subprocess应该能够访问它,只要你不写就可以不被复制。

 import myglobals # anything (empty .py file) myglobals.data = [] def count_it( key ): count = 0 for c in myglobals.data: if c == key: count += 1 return count if __name__ == '__main__': myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map( count_it, ["a", "b", "s", "d"] ) 

如果你想尝试使用数组,尽pipe你可以尝试使用lock=False关键字参数(默认情况下是true)。

multiprocessing.sharedctypes模块提供了从共享内存中分配ctypes对象的function,这些对象可以被subprocessinheritance。

所以你使用sharedctypes是错误的。 你想从父进程inheritance这个数组还是你希望明确地传递它? 在前一种情况下,您必须创build一个全局variables,如其他答案所示。 但是您不需要使用sharedctypes来显式传递它,只需传递原始的testData

顺便说一下,你使用Pool.map()是错误的。 它和内build的map()函数有相同的接口(你是用starmap()搞砸了吗?)。 下面是工作示例,显式传递数组:

 from multiprocessing import Pool def count_it( (arr, key) ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])