A more efficient way to compute distances with numpy?

I have a question about how to compute distances quickly with numpy:

    def getR1(VVm, VVs, HHm, HHs):
        t0 = time.time()
        R = VVs.flatten()[numpy.newaxis, :] - VVm.flatten()[:, numpy.newaxis]
        R *= R
        R1 = HHs.flatten()[numpy.newaxis, :] - HHm.flatten()[:, numpy.newaxis]
        R1 *= R1
        R += R1
        del R1
        print "R1\t", time.time() - t0, R.shape,  # 11.7576191425 (108225, 10500)
        print numpy.max(R)  # 4176.26290975
        # uses 17.5 GB RAM
        return R

    def getR2(VVm, VVs, HHm, HHs):
        t0 = time.time()
        precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
        measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
        deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
        #print time.time() - t0, deltas.shape  # 5.861109972 (108225, 10500, 2)
        R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
        print "R2\t", time.time() - t0, R.shape,  # 14.5291359425 (108225, 10500)
        print numpy.max(R)  # 4176.26290975
        # uses 26 GB RAM
        return R

    def getR3(VVm, VVs, HHm, HHs):
        from numpy.core.umath_tests import inner1d
        t0 = time.time()
        precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
        measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
        deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
        #print time.time() - t0, deltas.shape  # 5.861109972 (108225, 10500, 2)
        R = inner1d(deltas, deltas)
        print "R3\t", time.time() - t0, R.shape,  # 12.6972110271 (108225, 10500)
        print numpy.max(R)  # 4176.26290975
        # uses 26 GB RAM
        return R

    def getR4(VVm, VVs, HHm, HHs):
        import scipy.spatial.distance as spdist
        t0 = time.time()
        precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
        measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
        R = spdist.cdist(precomputed_flat, measured_flat, 'sqeuclidean')  #.T
        print "R4\t", time.time() - t0, R.shape,  # 17.7022118568 (108225, 10500)
        print numpy.max(R)  # 4176.26290975
        # uses 9 GB RAM
        return R

    def getR5(VVm, VVs, HHm, HHs):
        import scipy.spatial.distance as spdist
        t0 = time.time()
        precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
        measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
        R = spdist.cdist(precomputed_flat, measured_flat, 'euclidean')  #.T
        print "R5\t", time.time() - t0, R.shape,  # 15.6070930958 (108225, 10500)
        print numpy.max(R)  # 64.6240118667
        # uses only 9 GB RAM
        return R

    def getR6(VVm, VVs, HHm, HHs):
        from scipy.weave import blitz
        t0 = time.time()
        R = VVs.flatten()[numpy.newaxis, :] - VVm.flatten()[:, numpy.newaxis]
        blitz("R=R*R")  # R *= R
        R1 = HHs.flatten()[numpy.newaxis, :] - HHm.flatten()[:, numpy.newaxis]
        blitz("R1=R1*R1")  # R1 *= R1
        blitz("R=R+R1")  # R += R1
        del R1
        print "R6\t", time.time() - t0, R.shape,  # 11.7576191425 (108225, 10500)
        print numpy.max(R)  # 4176.26290975
        return R

The results are as follows:

    R1  11.7737319469  (108225, 10500)  4909.66881791
    R2  15.1279799938  (108225, 10500)  4909.66881791
    R3  12.7408981323  (108225, 10500)  4909.66881791
    R4  17.3336868286  (10500, 108225)  4909.66881791
    R5  15.7530870438  (10500, 108225)  70.0690289494
    R6  11.670968771   (108225, 10500)  4909.66881791

The last one gives sqrt((VVm-VVs)^2 + (HHm-HHs)^2), while the others give (VVm-VVs)^2 + (HHm-HHs)^2. That doesn't matter, because later in my code I take the minimum of R[i, :] for each i, and sqrt doesn't change which element is the minimum. (If I want the actual distance, I can just take sqrt(value) on that single result instead of applying sqrt to the whole array, so there is no real time difference there either.)
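That invariance is easy to verify on a toy array (a minimal sketch; the array here is random stand-in data, not the R from the code above):

```python
import numpy as np

rng = np.random.default_rng(0)
sq = rng.random((5, 7))  # pretend these are squared distances, rows = R[i, :]

# sqrt is monotonically increasing, so the position of the row-wise
# minimum is identical whether we take it before or after the sqrt
assert np.array_equal(np.argmin(sq, axis=1), np.argmin(np.sqrt(sq), axis=1))

# and sqrt of the minimum equals the minimum of the sqrt'ed array
assert np.allclose(np.sqrt(sq.min(axis=1)), np.sqrt(sq).min(axis=1))
```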

The question remains: why is the first solution the best? (The second and third are slower because the deltas = ... line alone takes 5.8 s, which is also why those two methods need 26 GB.) And why is sqeuclidean slower than euclidean?

sqeuclidean should just compute (VVm-VVs)^2 + (HHm-HHs)^2, but I suspect it does something different. Does anyone know where to find the source code (C, or whatever is underneath) of that method? I suspect it computes sqrt((VVm-VVs)^2 + (HHm-HHs)^2)^2 (the only reason I can think of why it would be slower than (VVm-VVs)^2 + (HHm-HHs)^2 — I know it's a silly reason; does anyone have a more logical one?)
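Whatever cdist's C implementation does internally, its 'sqeuclidean' output does agree numerically with the hand-written sum of squares, so any difference is purely one of speed, not of result. A quick sanity-check sketch with made-up (n, 2) coordinate arrays standing in for the stacked (VV, HH) pairs:

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(1)
a = rng.random((50, 2))  # stand-in for precomputed_flat
b = rng.random((30, 2))  # stand-in for measured_flat

# hand-written (VVm-VVs)^2 + (HHm-HHs)^2, broadcast over all pairs
manual = (a[:, None, 0] - b[None, :, 0]) ** 2 + (a[:, None, 1] - b[None, :, 1]) ** 2

assert np.allclose(cdist(a, b, 'sqeuclidean'), manual)
assert np.allclose(cdist(a, b, 'euclidean') ** 2, manual)
```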

Since I know nothing about C: how would I combine this with scipy.weave? Can that code be compiled like ordinary Python, or do I need to install something special for it?

Edit: OK, I tried scipy.weave.blitz (the R6 method), which is slightly faster, but I assume someone who knows more C than I do can still speed this up. I just took the lines of the form a += b or a *= b, looked up how they would be written in C, and put them in blitz statements. I suspect that if I also moved the lines with the flatten and newaxis expressions into C it would get faster still, but I don't know how to do that (anyone who knows C care to explain?). Right now the gap between blitz and my first method isn't big enough to really make a C-vs-numpy case, I guess?

I suppose the other methods, like the deltas = ... line, could also be faster if I moved them to C?

Whenever you have multiplications and sums, try to use one of the dot-product functions or np.einsum. Since you are preallocating your arrays anyway, instead of keeping separate arrays for the horizontal and vertical coordinates, stack them together:

    precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
    measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
    deltas = precomputed_flat - measured_flat[:, None, :]

From there, the simplest would be:

 dist = np.einsum('ijk,ijk->ij', deltas, deltas) 

You could also try something like this:

    from numpy.core.umath_tests import inner1d
    dist = inner1d(deltas, deltas)

And of course there is cdist from SciPy's spatial module:

    from scipy.spatial.distance import cdist
    dist = cdist(precomputed_flat, measured_flat, 'euclidean')
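All of these routes produce the same matrix (up to the sqrt for 'euclidean'). Here is a minimal cross-check of the einsum and cdist variants on random stand-in data; inner1d is omitted because numpy.core.umath_tests is a private test module that has been removed from recent NumPy releases:

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(2)
precomputed_flat = rng.random((40, 2))
measured_flat = rng.random((25, 2))

# broadcast differences over all pairs: shape (25, 40, 2)
deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
d_einsum = np.einsum('ijk,ijk->ij', deltas, deltas)

# cdist(measured, precomputed) matches the broadcasting order used above
d_cdist = cdist(measured_flat, precomputed_flat, 'sqeuclidean')

assert np.allclose(d_einsum, d_cdist)
assert np.allclose(np.sqrt(d_einsum), cdist(measured_flat, precomputed_flat, 'euclidean'))
```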

Edit: I can't run tests on a dataset that huge, but these timings are quite enlightening:

    len_a, len_b = 10000, 1000
    a = np.random.rand(2, len_a)
    b = np.random.rand(2, len_b)
    c = np.random.rand(len_a, 2)
    d = np.random.rand(len_b, 2)

    In [3]: %timeit a[:, None, :] - b[..., None]
    10 loops, best of 3: 76.7 ms per loop

    In [4]: %timeit c[:, None, :] - d
    1 loops, best of 3: 221 ms per loop

With the smaller dataset above, I can get a slight speed-up over your method using scipy.spatial.distance.cdist, and match it with inner1d, by arranging the data differently in memory:

    precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
    measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
    deltas = precomputed_flat[:, None, :] - measured_flat

    import scipy.spatial.distance as spdist
    from numpy.core.umath_tests import inner1d
    aT, bT = a.T, b.T  # cdist wants the (n_points, 2) layout

    In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
    10 loops, best of 3: 146 ms per loop

    In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
    10 loops, best of 3: 145 ms per loop

    In [15]: %timeit spdist.cdist(aT, bT)
    10 loops, best of 3: 124 ms per loop

    In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
    10 loops, best of 3: 163 ms per loop
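The layout trick above can be checked on small arrays: broadcasting the (2, n) coordinate-per-row layout yields the same squared distances as cdist on the transposed (n, 2) point-per-row layout, just with the result axes swapped. A sketch with fresh random data:

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(3)
a = rng.random((2, 100))  # (coord, point) layout: row 0 = V, row 1 = H
b = rng.random((2, 40))

# deltas[c, j, k] = a[c, k] - b[c, j], shape (2, 40, 100)
deltas = a[:, None, :] - b[..., None]
d_fast = np.einsum('ijk,ijk->jk', deltas, deltas)  # sum over the coord axis

# identical numbers as cdist on the transposed (point, coord) arrays
assert np.allclose(d_fast, cdist(b.T, a.T, 'sqeuclidean'))
```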