多处理中的共享内存对象

假设我有一个大内存numpy数组,我有一个函数func ,在这个巨大的数组作为input(连同一些其他参数)。 具有不同参数的func可以并行运行。 例如:

 def func(arr, param): # do stuff to arr, param # build array arr pool = Pool(processes = 6) results = [pool.apply_async(func, [arr, param]) for param in all_params] output = [res.get() for res in results] 

如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中。

有没有办法让不同的进程共享相同的数组? 这个数组对象是只读的,不会被修改。

更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法共享呢?

[EDITED]

我读了答案,但我仍然有点困惑。 由于fork()是copy-on-write,因此在python多处理库中产生新进程时,不应该调用任何额外的开销。 但是下面的代码表明有一个巨大的开销:

 from multiprocessing import Pool, Manager import numpy as np; import time def f(arr): return len(arr) t = time.time() arr = np.arange(10000000) print "construct array = ", time.time() - t; pool = Pool(processes = 6) t = time.time() res = pool.apply_async(f, [arr,]) res.get() print "multiprocessing overhead = ", time.time() - t; 

输出(顺便说一下,随着数组大小的增加,成本也会增加,所以我怀疑还有与内存复制相关的开销):

 construct array = 0.0178790092468 multiprocessing overhead = 0.252444982529 

为什么会有这么大的开销,如果我们没有复制数组? 共享内存中的哪一部分可以拯救我?

如果您使用的是使用copy-on-write fork()语义的操作系统(就像任何常见的unix一样),那么只要不改变数据结构,它就可以用于所有subprocess而不占用额外的内存。 你不需要做任何特别的事情(除非确保你不要改变对象)。

可以为你的问题做的最有效的事情是将你的数组打包成一个有效的数组结构(使用numpyarray ),把它放在共享内存中,用multiprocessing.Array包装它,然后把它传递给你的函数。 这个答案显示了如何做到这一点 。

如果你想要一个可写的共享对象,那么你需要用某种同步或locking来包装它。 multiprocessing提供了两种方法 :一种使用共享内存(适用于简单的值,数组或ctypes)或一个Manager代理,其中一个进程保存内存,一个pipe理器仲裁从其他进程(即使通过networking) 。

Manager方法可以与任意的Python对象一起使用,但是会比使用共享内存的等价方法慢,因为对象需要在进程之间进行序列化/反序列化和发送。

Python中有大量的并行处理库和方法 。 multiprocessing是一个很好的和完善的库,但如果你有特殊的需求,也许其他方法可能会更好。

我遇到了同样的问题,写了一个小共享内存实用程序类来解决它。

我正在使用multiprocessing.RawArray(lockfree),并且对数组的访问完全不同步(lockfree),注意不要自己动手。

有了这个解决scheme,我在四核i7上获得了大约3倍的加速。

这里的代码:随意使用和改进,并请回报任何错误。

 ''' Created on 14.05.2013 @author: martin ''' import multiprocessing import ctypes import numpy as np class SharedNumpyMemManagerError(Exception): pass ''' Singleton Pattern ''' class SharedNumpyMemManager: _initSize = 1024 _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(SharedNumpyMemManager, cls).__new__( cls, *args, **kwargs) return cls._instance def __init__(self): self.lock = multiprocessing.Lock() self.cur = 0 self.cnt = 0 self.shared_arrays = [None] * SharedNumpyMemManager._initSize def __createArray(self, dimensions, ctype=ctypes.c_double): self.lock.acquire() # double size if necessary if (self.cnt >= len(self.shared_arrays)): self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays) # next handle self.__getNextFreeHdl() # create array in shared memory segment shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions)) # convert to numpy array vie ctypeslib self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base) # do a reshape for correct dimensions # Returns a masked array containing the same data, but with a new shape. # The result is a view on the original array self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions) # update cnt self.cnt += 1 self.lock.release() # return handle to the shared memory numpy array return self.cur def __getNextFreeHdl(self): orgCur = self.cur while self.shared_arrays[self.cur] is not None: self.cur = (self.cur + 1) % len(self.shared_arrays) if orgCur == self.cur: raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!') def __freeArray(self, hdl): self.lock.acquire() # set reference to None if self.shared_arrays[hdl] is not None: # consider multiple calls to free self.shared_arrays[hdl] = None self.cnt -= 1 self.lock.release() def __getArray(self, i): return self.shared_arrays[i] @staticmethod def getInstance(): if not SharedNumpyMemManager._instance: SharedNumpyMemManager._instance = SharedNumpyMemManager() return SharedNumpyMemManager._instance @staticmethod def createArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs) @staticmethod def getArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs) @staticmethod def freeArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs) # Init Singleton on module load SharedNumpyMemManager.getInstance() if __name__ == '__main__': import timeit N_PROC = 8 INNER_LOOP = 10000 N = 1000 def propagate(t): i, shm_hdl, evidence = t a = SharedNumpyMemManager.getArray(shm_hdl) for j in range(INNER_LOOP): a[i] = i class Parallel_Dummy_PF: def __init__(self, N): self.N = N self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double) self.pool = multiprocessing.Pool(processes=N_PROC) def update_par(self, evidence): self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N)) def update_seq(self, evidence): for i in range(self.N): propagate((i, self.arrayHdl, evidence)) def getArray(self): return SharedNumpyMemManager.getArray(self.arrayHdl) def parallelExec(): pf = Parallel_Dummy_PF(N) print(pf.getArray()) pf.update_par(5) print(pf.getArray()) def sequentialExec(): pf = Parallel_Dummy_PF(N) print(pf.getArray()) pf.update_seq(5) print(pf.getArray()) t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec") t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec") print("Sequential: ", t1.timeit(number=1)) print("Parallel: ", t2.timeit(number=1))