Python多处理:如何在多个进程中共享字典?

一个创build多个进程的程序,这些进程在可连接的队列Q ,最终可能会操作全局字典D来存储结果。 (因此每个subprocess可以使用D来存储其结果,并且还可以查看其他subprocess正在产生的结果)

如果我在subprocess中打印字典D,则会看到对其进行的修改(即在D上)。 但是在主进程joinQ之后,如果我打印D,这是一个空的字典!

我知道这是一个同步/locking问题。 有人可以告诉我这里发生了什么事情吗?我可以如何同步访问D?

一般的答案涉及使用Manager对象。 从文档改编:

 from multiprocessing import Process, Manager def f(d): d[1] += '1' d['2'] += 2 if __name__ == '__main__': manager = Manager() d = manager.dict() d[1] = '1' d['2'] = 2 p1 = Process(target=f, args=(d,)) p2 = Process(target=f, args=(d,)) p1.start() p2.start() p1.join() p2.join() print d 

输出:

 $ python mul.py {1: '111', '2': 6} 

multithreading不像线程。 每个subprocess将获得主进程内存的副本。 通常状态是通过通信(pipe道/套接字),信号或共享内存共享的。

多处理使得一些抽象可用于您的用例 – 通过使用代理或共享内存将共享状态视为本地: http : //docs.python.org/library/multiprocessing.html#sharing-state-between-processes

相关部分:

我想分享我自己的工作,比Manager的字典更快,比使用大量内存的pyshmht库更简单,更稳定,并且不适用于Mac OS。 虽然我的字典只适用于简单的string,目前是不可变的。 我使用线性探测实现并将键和值对存储在表之后的单独的内存块中。

 from mmap import mmap import struct from timeit import default_timer from multiprocessing import Manager from pyshmht import HashTable class shared_immutable_dict: def __init__(self, a): self.hs = 1 << (len(a) * 3).bit_length() kvp = self.hs * 4 ht = [0xffffffff] * self.hs kvl = [] for k, v in a.iteritems(): h = self.hash(k) while ht[h] != 0xffffffff: h = (h + 1) & (self.hs - 1) ht[h] = kvp kvp += self.kvlen(k) + self.kvlen(v) kvl.append(k) kvl.append(v) self.m = mmap(-1, kvp) for p in ht: self.m.write(uint_format.pack(p)) for x in kvl: if len(x) <= 0x7f: self.m.write_byte(chr(len(x))) else: self.m.write(uint_format.pack(0x80000000 + len(x))) self.m.write(x) def hash(self, k): h = hash(k) h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) return h def get(self, k, d=None): h = self.hash(k) while True: x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] if x == 0xffffffff: return d self.m.seek(x) if k == self.read_kv(): return self.read_kv() h = (h + 1) & (self.hs - 1) def read_kv(self): sz = ord(self.m.read_byte()) if sz & 0x80: sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 return self.m.read(sz) def kvlen(self, k): return len(k) + (1 if len(k) <= 0x7f else 4) def __contains__(self, k): return self.get(k, None) is not None def close(self): self.m.close() uint_format = struct.Struct('>I') def uget(a, k, d=None): return to_unicode(a.get(to_str(k), d)) def uin(a, k): return to_str(k) in a def to_unicode(s): return s.decode('utf-8') if isinstance(s, str) else s def to_str(s): return s.encode('utf-8') if isinstance(s, unicode) else s def mmap_test(): n = 1000000 d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time)) def manager_test(): n = 100000 d = Manager().dict({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time)) def shm_test(): n = 1000000 d = HashTable('tmp', n) d.update({str(i * 2): '1' for i in xrange(n)}) start_time = default_timer() for i in xrange(n): if bool(d.get(str(i))) != (i % 2 == 0): raise Exception(i) print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time)) if __name__ == '__main__': mmap_test() manager_test() shm_test() 

在我的笔记本上的性能结果是:

 mmap speed: 247288 gets per sec manager speed: 33792 gets per sec shm speed: 691332 gets per sec 

简单的用法例子:

 ht = shared_immutable_dict({'a': '1', 'b': '2'}) print ht.get('a') 

也许你可以尝试pyshmht ,分享基于内存的哈希表扩展为Python。

注意

  1. 它没有完全testing,仅供您参考。

  2. 它目前缺乏多处理的locking/半机制。