并行化后,pandasgroupby

我已经使用了rosetta.parallel.pandas_easy并行化后应用,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index) 

然而,有没有人想出了如何并行化返回数据框的函数呢? 正如预期的那样,此代码不能正常工作。

 def tmpFunc(df): df['c'] = df.a + df.b return df df.groupby(df.index).apply(tmpFunc) groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index) 

这似乎工作,虽然它真的应该build在pandas

 import pandas as pd from joblib import Parallel, delayed import multiprocessing def tmpFunc(df): df['c'] = df.a + df.b return df def applyParallel(dfGrouped, func): retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) return pd.concat(retLst) if __name__ == '__main__': df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) print 'parallel version: ' print applyParallel(df.groupby(df.index), tmpFunc) print 'regular version: ' print df.groupby(df.index).apply(tmpFunc) print 'ideal version (does not work): ' print df.groupby(df.index).applyParallel(tmpFunc) 

伊万的答案是伟大的,但它看起来可以稍微简化,也取消了取决于joblib的需要:

 from multiprocessing import Pool, cpu_count def applyParallel(dfGrouped, func): with Pool(cpu_count()) as p: ret_list = p.map(func, [group for name, group in dfGrouped]) return pandas.concat(ret_list) 

顺便说一句:这不能代替任何 groupby.apply(),但它将覆盖典型的情况:例如它应该覆盖文档中的情况2和3,而你应该通过给出参数axis=1到最后的pandas.concat()调用。

我有一个黑客我使用pandas进行并行化。 我把我的数据框分成块,把每个块放到列表的元素中,然后用ipython的并行位对dataframe列表进行并行处理。 然后我使用pandas concat函数将列表重新放在一起。

然而,这通常不适用。 它适用于我,因为我想要应用于每个数据块的function需要一分钟左右的时间。 而把我的数据分开放在一起并不需要太长时间。 所以这显然是一个混乱。 这样说,这里是一个例子。 我正在使用Ipython笔记本,以便在代码中看到%%time magic:

 ## make some example data import pandas as pd np.random.seed(1) n=10000 df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 'data' : np.random.rand(n)}) grouped = df.groupby('mygroup') 

在这个例子中,我将根据上面的groupby来创build“块”,但这不一定是数据如何分块。 虽然这是一个很常见的模式。

 dflist = [] for name, group in grouped: dflist.append(group) 

设置并行位

 from IPython.parallel import Client rc = Client() lview = rc.load_balanced_view() lview.block = True 

写一个愚蠢的函数来适用于我们的数据

 def myFunc(inDf): inDf['newCol'] = inDf.data ** 10 return inDf 

现在让我们串行运行代码然后并行运行。 先串连:

 %%time serial_list = map(myFunc, dflist) CPU times: user 14 s, sys: 19.9 ms, total: 14 s Wall time: 14 s 

现在平行

 %%time parallel_list = lview.map(myFunc, dflist) CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s Wall time: 1.56 s 

那么只需要几ms就可以将它们合并回一个dataframe

 %%time combinedDf = pd.concat(parallel_list) CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms Wall time: 300 ms 

我在我的MacBook上运行了6个IPython引擎,但是你可以看到它将执行时间从14秒降低到2秒。

对于长时间运行的随机模拟,我可以通过使用StarCluster启动集群来使用AWS后端。 但是,大部分时间,我在MBP上只是并行处理8个CPU。

随同JD龙的回答一个简短的评论。 我发现如果组的数量非常大(比如说成千上万),并且你的apply函数正在做一些相当简单和快速的事情,那么把你的数据框分成块,并把每个块分配给一个worker来执行groupby-apply(连续)可以比并行groupby-apply更快,让工作人员读取包含多个组的队列。 例:

 import pandas as pd import numpy as np import time from concurrent.futures import ProcessPoolExecutor, as_completed nrows = 15000 np.random.seed(1980) df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))}) 

所以我们的数据框如下所示:

  a 0 3425 1 1016 2 8141 3 9263 4 8018 

请注意,“a”列有许多组(认为客户ID):

 len(df.a.unique()) 15000 

在我们的小组上运作的function:

 def f1(group): time.sleep(0.0001) return group 

开始游泳池:

 ppe = ProcessPoolExecutor(12) futures = [] results = [] 

做一个平行的小组:

 %%time for name, group in df.groupby('a'): p = ppe.submit(f1, group) futures.append(p) for future in as_completed(futures): r = future.result() results.append(r) df_output = pd.concat(results) del ppe CPU times: user 18.8 s, sys: 2.15 s, total: 21 s Wall time: 17.9 s 

现在我们来添加一个把df分成很多组的列:

 df['b'] = np.random.randint(0, 12, nrows) 

现在,而不是15000组,只有12:

 len(df.b.unique()) 12 

我们将分区我们的DF,并在每个块上做一个groupby-apply。

 ppe = ProcessPoolExecutor(12) 

包装乐趣:

 def f2(df): df.groupby('a').apply(f1) return df 

发出每个块在串行操作:

 %%time for i in df.b.unique(): p = ppe.submit(f2, df[df.b==i]) futures.append(p) for future in as_completed(futures): r = future.result() results.append(r) df_output = pd.concat(results) CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s Wall time: 12.4 s 

请注意,每个组的时间花费没有改变。 而改变的是工人从中读取的队列长度。 我怀疑现在发生的事情是工人不能同时访问共享内存,而是不停地回头看看队列,从而踩到对方脚趾。 随着更大块的操作,工人返回的频率减less,这个问题得到改善,整体执行速度加快。