是否将共享只读数据复制到不同进程以进行多处理?

这段代码看起来像这样:

glbl_array = # a 3 Gb array def my_func( args, def_param = glbl_array): #do stuff on args and def_param if __name__ == '__main__': pool = Pool(processes=4) pool.map(my_func, range(1000)) 

有没有办法确保(或鼓励)不同的进程没有得到glbl_array的副本,但共享它。 如果没有办法停止拷贝,我将使用memmapped数组,但是我的访问模式不是很规则,所以我期望memmapped数组变慢。 以上似乎是第一个尝试。 这是在Linux上。 我只是想从Stackoverflow的一些build议,不想惹恼系统pipe理员。 如果第二个参数是像glbl_array.tostring()这样的真正的不可变对象,你认为这会有帮助glbl_array.tostring()

你可以很容易地使用共享内存的东西从multiprocessing和Numpy一起:

 import multiprocessing import ctypes import numpy as np shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10) shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) shared_array = shared_array.reshape(10, 10) #-- edited 2015-05-01: the assert check below checks the wrong thing # with recent versions of Numpy/multiprocessing. That no copy is made # is indicated by the fact that the program prints the output shown below. ## No copy was made ##assert shared_array.base.base is shared_array_base.get_obj() # Parallel processing def my_func(i, def_param=shared_array): shared_array[i,:] = i if __name__ == '__main__': pool = multiprocessing.Pool(processes=4) pool.map(my_func, range(10)) print shared_array 

打印

  [[0.0.0.0.0.0.0.0.0]
  [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
  [2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
  [3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]
  [4. 4. 4. 4. 4. 4. 4. 4. 4. 4.]
  [5. 5. 5. 5. 5. 5. 5. 5. 5.]
  [6. 6. 6. 6. 6. 6. 6. 6. 6. 6.]
  [7. 7. 7. 7. 7. 7. 7. 7. 7. 7.]
  [8. 8. 8. 8. 8. 8. 8. 8. 8.]
  [9. 9. 9. 9. 9. 9. 9. 9. 9.]] 

但是,Linux在fork()上有copy-on-write语义,所以即使不使用multiprocessing.Array ,数据也不会被复制,除非被写入。

以下代码适用于Win7和Mac(可能在Linux上,但未经testing)。

 import multiprocessing import ctypes import numpy as np #-- edited 2015-05-01: the assert check below checks the wrong thing # with recent versions of Numpy/multiprocessing. That no copy is made # is indicated by the fact that the program prints the output shown below. ## No copy was made ##assert shared_array.base.base is shared_array_base.get_obj() shared_array = None def init(shared_array_base): global shared_array shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) shared_array = shared_array.reshape(10, 10) # Parallel processing def my_func(i): shared_array[i, :] = i if __name__ == '__main__': shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10) pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,)) pool.map(my_func, range(10)) shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) shared_array = shared_array.reshape(10, 10) print shared_array 

对于那些坚持使用Windows,不支持fork() (除非使用CygWin),pv的答案不起作用。 全局不可用于subprocess。

相反,您必须在Pool / Process的初始化程序中传递共享内存,如下所示:

 #! /usr/bin/python import time from multiprocessing import Process, Queue, Array def f(q,a): m = q.get() print m print a[0], a[1], a[2] m = q.get() print m print a[0], a[1], a[2] if __name__ == '__main__': a = Array('B', (1, 2, 3), lock=False) q = Queue() p = Process(target=f, args=(q,a)) p.start() q.put([1, 2, 3]) time.sleep(1) a[0:3] = (4, 5, 6) q.put([4, 5, 6]) p.join() 

(这不是numpy,它不是很好的代码,但它说明了这一点;-)

如果您正在寻找可在Windows上高效运行的选项,并且适用于不规则访问模式,分支和其他可能需要根据共享内存matrix和本地进程数据组合来分析不同matrix的场景, ParallelRegression包中的mathDict工具包旨在处理这种确切的情况。