Python多处理全局variables更新不返回给父

我正在尝试从subprocess返回值,但这些值是不可取的。 所以我在线程模块中使用了全局variables,但是在使用多处理模块时,还没有能够检索在subprocess中完成的更新。 我希望我失去了一些东西。

最后打印的结果始终与赋予variablesdataDV03和dataDV04的初始值相同。 subprocess正在更新这些全局variables,但是这些全局variables在父进程中保持不变。

import multiprocessing # NOT ABLE to get python to return values in passed variables. ants = ['DV03', 'DV04'] dataDV03 = ['', ''] dataDV04 = {'driver': '', 'status': ''} def getDV03CclDrivers(lib): # call global variable global dataDV03 dataDV03[1] = 1 dataDV03[0] = 0 # eval( 'CCL.' + lib + '.' + lib + '( "DV03" )' ) these are unpicklable instantiations def getDV04CclDrivers(lib, dataDV04): # pass global variable dataDV04['driver'] = 0 # eval( 'CCL.' + lib + '.' + lib + '( "DV04" )' ) if __name__ == "__main__": jobs = [] if 'DV03' in ants: j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',)) jobs.append(j) if 'DV04' in ants: j = multiprocessing.Process(target=getDV04CclDrivers, args=('LORR', dataDV04)) jobs.append(j) for j in jobs: j.start() for j in jobs: j.join() print 'Results:\n' print 'DV03', dataDV03 print 'DV04', dataDV04 

我不能张贴到我的问题,所以会尝试编辑原件。

这是不可挑选的对象:

 In [1]: from CCL import LORR In [2]: lorr=LORR.LORR('DV20', None) In [3]: lorr Out[3]: <CCL.LORR.LORR instance at 0x94b188c> 

这是使用multiprocessing.Pool将实例返回给父级时返回的错误:

 Thread getCcl (('DV20', 'LORR'),) Process PoolWorker-1: Traceback (most recent call last): File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap self.run() File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 88, in run self._target(*self._args, **self._kwargs) File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/pool.py", line 71, in worker put((job, i, result)) File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/queues.py", line 366, in put return send(obj) UnpickleableError: Cannot pickle <type 'thread.lock'> objects In [5]: dir(lorr) Out[5]: ['GET_AMBIENT_TEMPERATURE', 'GET_CAN_ERROR', 'GET_CAN_ERROR_COUNT', 'GET_CHANNEL_NUMBER', 'GET_COUNT_PER_C_OP', 'GET_COUNT_REMAINING_OP', 'GET_DCM_LOCKED', 'GET_EFC_125_MHZ', 'GET_EFC_COMB_LINE_PLL', 'GET_ERROR_CODE_LAST_CAN_ERROR', 'GET_INTERNAL_SLAVE_ERROR_CODE', 'GET_MAGNITUDE_CELSIUS_OP', 'GET_MAJOR_REV_LEVEL', 'GET_MINOR_REV_LEVEL', 'GET_MODULE_CODES_CDAY', 'GET_MODULE_CODES_CMONTH', 'GET_MODULE_CODES_DIG1', 'GET_MODULE_CODES_DIG2', 'GET_MODULE_CODES_DIG4', 'GET_MODULE_CODES_DIG6', 'GET_MODULE_CODES_SERIAL', 'GET_MODULE_CODES_VERSION_MAJOR', 'GET_MODULE_CODES_VERSION_MINOR', 'GET_MODULE_CODES_YEAR', 'GET_NODE_ADDRESS', 'GET_OPTICAL_POWER_OFF', 'GET_OUTPUT_125MHZ_LOCKED', 'GET_OUTPUT_2GHZ_LOCKED', 'GET_PATCH_LEVEL', 'GET_POWER_SUPPLY_12V_NOT_OK', 'GET_POWER_SUPPLY_15V_NOT_OK', 'GET_PROTOCOL_MAJOR_REV_LEVEL', 'GET_PROTOCOL_MINOR_REV_LEVEL', 'GET_PROTOCOL_PATCH_LEVEL', 'GET_PROTOCOL_REV_LEVEL', 'GET_PWR_125_MHZ', 'GET_PWR_25_MHZ', 'GET_PWR_2_GHZ', 'GET_READ_MODULE_CODES', 'GET_RX_OPT_PWR', 'GET_SERIAL_NUMBER', 'GET_SIGN_OP', 'GET_STATUS', 'GET_SW_REV_LEVEL', 'GET_TE_LENGTH', 'GET_TE_LONG_FLAG_SET', 'GET_TE_OFFSET_COUNTER', 'GET_TE_SHORT_FLAG_SET', 'GET_TRANS_NUM', 'GET_VDC_12', 'GET_VDC_15', 'GET_VDC_7', 'GET_VDC_MINUS_7', 'SET_CLEAR_FLAGS', 'SET_FPGA_LOGIC_RESET', 'SET_RESET_AMBSI', 'SET_RESET_DEVICE', 'SET_RESYNC_TE', 'STATUS', '_HardwareDevice__componentName', '_HardwareDevice__hw', '_HardwareDevice__stickyFlag', '_LORRBase__logger', '__del__', '__doc__', '__init__', '__module__', '_devices', 'clearDeviceCommunicationErrorAlarm', 'getControlList', 'getDeviceCommunicationErrorCounter', 'getErrorMessage', 'getHwState', 'getInternalSlaveCanErrorMsg', 'getLastCanErrorMsg', 'getMonitorList', 'hwConfigure', 'hwDiagnostic', 'hwInitialize', 'hwOperational', 'hwSimulation', 'hwStart', 'hwStop', 'inErrorState', 'isMonitoring', 'isSimulated'] In [6]: 

在使用multiprocessing来打开第二个进程时,会创build一个具有其全局状态的全新 Python 实例 。 该全局状态不是共享的,因此subprocess对全局variables所做的更改对于父进程是不可见的。

另外, multiprocessing提供的大部分抽象使用pickle来传输数据。 所有使用代理传输的数据都必须是可以扫描的 ; 其中包含Manager提供的所有对象。 相关的引文(我的重点):

确保代理方法的参数是可挑选的。

和(在Manager部分):

其他进程可以通过使用代理来访问共享对象。

Queue还需要pickleable数据; 该文件不这样说,但一个快速testing证实:

 import multiprocessing import pickle class Thing(object): def __getstate__(self): print 'got pickled' return self.__dict__ def __setstate__(self, state): print 'got unpickled' self.__dict__.update(state) q = multiprocessing.Queue() p = multiprocessing.Process(target=q.put, args=(Thing(),)) p.start() print q.get() p.join() 

输出:

 $ python mp.py got pickled got unpickled <__main__.Thing object at 0x10056b350> 

一种可能适用于你的方法,如果你真的无法腌制数据,就是find一种方法将它存储为一个ctype对象; 对内存的引用可以传递给subprocess 。 这对我来说似乎相当狡猾。 我从来没有做过。 但它可能是一个可能的解决scheme。

鉴于你的更新,似乎你需要更多地了解LORR的内部。 LORR是一堂课吗? 你能从中inheritance吗? 它是别的东西的子类吗? 什么是MRO? (尝试LORR.__mro__并在输出后发送。)如果它是一个纯python对象,则可以对其进行子类化,创build一个__setstate__和一个__getstate__来启用酸洗。

另一种方法是找出如何从LORR实例中获取相关的数据,并通过一个简单的string来传递它。 既然你说你真的只是想调用对象的方法,为什么不使用Queue来来回发送消息呢? 换句话说,像这样(示意):

 Main Process Child 1 Child 2 LORR 1 LORR 2 child1_in_queue -> get message 'foo' call 'foo' method child1_out_queue <- return foo data string child2_in_queue -> get message 'bar' call 'bar' method child2_out_queue <- return bar data string 

使用multiprocess进程时,在进程之间传递对象的唯一方法是使用QueuePipe ; 全局variables不共享。 对象必须是pickleable,所以multiprocess进程不会帮你在这里。

@DBlas给你一个快速的url和经理类在一个答案的参考,但我认为它仍然有点模糊,所以我认为它可能会对你有所帮助,只是看到它应用…

 import multiprocessing from multiprocessing import Manager ants = ['DV03', 'DV04'] def getDV03CclDrivers(lib, data_dict): data_dict[1] = 1 data_dict[0] = 0 def getDV04CclDrivers(lib, data_list): data_list['driver'] = 0 if __name__ == "__main__": manager = Manager() dataDV03 = manager.list(['', '']) dataDV04 = manager.dict({'driver': '', 'status': ''}) jobs = [] if 'DV03' in ants: j = multiprocessing.Process( target=getDV03CclDrivers, args=('LORR', dataDV03)) jobs.append(j) if 'DV04' in ants: j = multiprocessing.Process( target=getDV04CclDrivers, args=('LORR', dataDV04)) jobs.append(j) for j in jobs: j.start() for j in jobs: j.join() print 'Results:\n' print 'DV03', dataDV03 print 'DV04', dataDV04 

由于多处理实际上使用单独的进程,所以不能简单地共享全局variables,因为它们将在内存中完全不同的“空间”。 你在一个过程中做的全球化将不会反映在另一个过程中。 虽然我承认,从你看到的方式来看,它看起来很混乱,它们都在同一块代码中生存,所以“为什么这些方法不能进入全球”呢? 它很难将自己的头脑围绕着他们将在不同的过程中运行的想法。

Manager类被赋予作为数据结构的代理,可以在进程之间来回传递信息。 你要做的是从经理创build一个特殊的字典和列表,将它们传递到你的方法,并在本地进行操作。

不可腌制的数据

对于专门的LORR对象,您可能需要创build一些代表可以代表实例可选状态的代理。

不是超级健壮或testing了很多,但给你的想法。

 class LORRProxy(object): def __init__(self, lorrObject=None): self.instance = lorrObject def __getstate__(self): # how to get the state data out of a lorr instance inst = self.instance state = dict( foo = inst.a, bar = inst.b, ) return state def __setstate__(self, state): # rebuilt a lorr instance from state lorr = LORR.LORR() lorr.a = state['foo'] lorr.b = state['bar'] self.instance = lorr 

您也可以使用多处理arrays 。 这允许你在进程之间有一个共享状态,并且可能是最接近全局variables的东西。

在main的顶部,声明一个Array。 第一个论点“我”说这将是整数。 第二个参数给出了初始值:

 shared_dataDV03 = multiprocessing.Array ('i', (0, 0)) #a shared array 

然后将这个数组作为parameter passing给进程:

 j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',shared_dataDV03)) 

您必须在被调用的函数中接收数组参数,然后您可以在函数中对其进行修改:

 def getDV03CclDrivers(lib,arr): # call global variable arr[1]=1 arr[0]=0 

该数组与父级共享,因此您可以在父级末尾打印出值:

 print 'DV03', shared_dataDV03[:] 

它会显示更改:

 DV03 [0, 1] 

我使用p.map()将大量进程分离到远程服务器,并在不可预知的时间返回时显示结果:

 Servers=[...] from multiprocessing import Pool p=Pool(len(Servers)) p.map(DoIndividualSummary, Servers) 

如果DoIndividualSummary用于print结果,这个效果很好,但总体结果是不可预测的,这使得解释变得困难。 我尝试了一些使用全局variables的方法,但遇到了问题。 最后,我用sqlite3成功了。

p.map()之前,打开一个sqlite连接并创build一个表:

 import sqlite3 conn=sqlite3.connect('servers.db') # need conn for commit and close db=conn.cursor() try: db.execute('''drop table servers''') except: pass db.execute('''CREATE TABLE servers (server text, serverdetail text, readings text)''') conn.commit() 

然后,从DoIndividualSummary()返回时,将结果保存到表中:

 db.execute('''INSERT INTO servers VALUES (?,?,?)''', (server,serverdetail,readings)) conn.commit() return 

map()语句之后,打印结果:

 db.execute('''select * from servers order by server''') rows=db.fetchall() for server,serverdetail,readings in rows: print serverdetail,readings 

可能看起来像是矫枉过正,但对于我来说比build议的解决scheme更简单。