Use a numpy array in shared memory for multiprocessing

I would like to use a numpy array in shared memory with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

    from multiprocessing import Process, Array
    import scipy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        a = Array('d', unshared_arr)
        print "Originally, the first two elements of arr = %s"%(a[:2])

        # Create, start, and finish the child process
        p = Process(target=f, args=(a,))
        p.start()
        p.join()

        # Print out the changed values
        print "Now, the first two elements of arr = %s"%a[:2]

This produces output such as:

    Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
    Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. a[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*a or a.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don't believe it would be shared anymore.

It seems there would be a standard solution to what must be a common problem.

To add to @unutbu's (no longer available) and @Henry Gomersall's answers: you could use shared_arr.get_lock() to synchronize access when needed:

    shared_arr = mp.Array(ctypes.c_double, N)
    # ...
    def f(i): # could be anything numpy accepts as an index, such as another numpy array
        with shared_arr.get_lock(): # synchronize access
            arr = np.frombuffer(shared_arr.get_obj()) # no data copying
            arr[i] = -arr[i]

    import ctypes
    import logging
    import multiprocessing as mp

    from contextlib import closing

    import numpy as np

    info = mp.get_logger().info

    def main():
        logger = mp.log_to_stderr()
        logger.setLevel(logging.INFO)

        # create shared array
        N, M = 100, 11
        shared_arr = mp.Array(ctypes.c_double, N)
        arr = tonumpyarray(shared_arr)

        # fill with random values
        arr[:] = np.random.uniform(size=N)
        arr_orig = arr.copy()

        # write to arr from different processes
        with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
            # many processes access the same slice
            stop_f = N // 10
            p.map_async(f, [slice(stop_f)]*M)

            # many processes access different slices of the same array
            assert M % 2 # odd
            step = N // 10
            p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
        p.join()
        assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

    def init(shared_arr_):
        global shared_arr
        shared_arr = shared_arr_ # must be inherited, not passed as an argument

    def tonumpyarray(mp_arr):
        return np.frombuffer(mp_arr.get_obj())

    def f(i):
        """synchronized."""
        with shared_arr.get_lock(): # synchronize access
            g(i)

    def g(i):
        """no synchronization."""
        info("start %s" % (i,))
        arr = tonumpyarray(shared_arr)
        arr[i] = -1 * arr[i]
        info("end %s" % (i,))

    if __name__ == '__main__':
        mp.freeze_support()
        main()

If you don't need synchronized access, or if you create your own locks, then mp.Array() is unnecessary. In that case you could use mp.sharedctypes.RawArray.
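As a minimal sketch of that variant (the array size of 100 here is arbitrary, and any needed synchronization is assumed to be handled by you):

    import ctypes
    import multiprocessing as mp

    import numpy as np

    # RawArray allocates shared memory but, unlike mp.Array, attaches no lock
    raw = mp.sharedctypes.RawArray(ctypes.c_double, 100)

    # wrap the shared buffer as a numpy array; no data is copied
    arr = np.frombuffer(raw)
    arr[:] = np.random.uniform(size=100)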

The Array object has a get_obj() method associated with it, which returns the ctypes array presenting a buffer interface. I think the following should work...

    from multiprocessing import Process, Array
    import scipy
    import numpy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        a = Array('d', unshared_arr)
        print "Originally, the first two elements of arr = %s"%(a[:2])

        # Create, start, and finish the child process
        p = Process(target=f, args=(a,))
        p.start()
        p.join()

        # Print out the changed values
        print "Now, the first two elements of arr = %s"%a[:2]

        b = numpy.frombuffer(a.get_obj())

        b[0] = 10.0
        print a[0]

When run, this prints the first element of a as now being 10.0, showing that a and b are just two views into the same memory.

To make sure it is still multiprocessing-safe, I believe you will have to use the acquire and release methods that exist on the Array object a, and its built-in lock, to make sure it is all safely accessed (though I'm not an expert on the multiprocessing module).
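For instance, a minimal sketch of that idea, reusing a and numpy from the example above (the with statement calls acquire and release on the built-in lock for us):

    # hold the Array's built-in lock while writing through the numpy view
    with a.get_lock():
        b = numpy.frombuffer(a.get_obj())
        b[0] = 10.0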

You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here's your original code, this time using shared memory that behaves like a NumPy array (note the additional final statement calling a NumPy sum() function):

    from multiprocessing import Process
    import sharedmem
    import scipy

    def f(a):
        a[0] = -a[0]

    if __name__ == '__main__':
        # Create the array
        N = int(10)
        unshared_arr = scipy.rand(N)
        arr = sharedmem.empty(N)
        arr[:] = unshared_arr.copy()
        print "Originally, the first two elements of arr = %s"%(arr[:2])

        # Create, start, and finish the child process
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()

        # Print out the changed values
        print "Now, the first two elements of arr = %s"%arr[:2]

        # Perform some NumPy operation
        print arr.sum()

I've written a python module that uses POSIX shared memory to share numpy arrays between python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here's how it works:

    import numpy as np
    import SharedArray as sa

    # Create an array in shared memory
    a = sa.create("test1", 10)

    # Attach it as a different array. This can be done from another
    # python interpreter as long as it runs on the same computer.
    b = sa.attach("test1")

    # See how they are actually sharing the same memory block
    a[0] = 42
    print(b[0])

    # Destroying a does not affect b.
    del a
    print(b[0])

    # See how "test1" is still present in shared memory even though we
    # destroyed the array a.
    sa.list()

    # Now destroy the array "test1" from memory.
    sa.delete("test1")

    # The array b is not affected, but once you destroy it then the
    # data are lost.
    print(b[0])

While the answers already given are good, there is a much easier way to solve this problem, provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, Mac OSX); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created using a fork. A forked child automatically shares the parent's memory space. In the case of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool.

A simple example:

    import multiprocessing
    import numpy as np

    # will hold the (implicitly mem-shared) data
    data_array = None

    # child worker function
    def job_handler(num):
        # built-in id() returns unique memory ID of a variable
        return id(data_array), np.sum(data_array)

    def launch_jobs(data, num_jobs=5, num_worker=4):
        global data_array
        data_array = data

        pool = multiprocessing.Pool(num_worker)
        return pool.map(job_handler, range(num_jobs))

    # create some random data and execute the child jobs
    mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

    # this will print 'True' on POSIX OS, since the data was shared
    print(np.all(np.asarray(mem_ids) == id(data_array)))