一维数组的高效Numpy二维数组构造

我有一个这样的数组:

A = array([1,2,3,4,5,6,7,8,9,10]) 

我试图得到这样一个数组:

 B = array([[1,2,3], [2,3,4], [3,4,5], [4,5,6]]) 

每行(固定的任意宽度)移动一个。 A的数组长度为10K,我试图在Numpy中find这样做的有效方法。 目前我正在使用vstack和一个慢的循环。 有更快的方法吗?

编辑:

 width = 3 # fixed arbitrary width length = 10000 # length of A which I wish to use B = A[0:length + 1] for i in range (1, length): B = np.vstack((B, A[i, i + width + 1])) 

实际上,有一个更有效的方法来做到这一点…使用vstack等的缺点是你正在做一个数组的副本。

顺便说一句,这是相同的@保罗的答案,但我发布这只是为了解释一些更详细的事情…

有一种方法可以只用视图来做到这一点,以避免内存重复。

我直接从Erik Rigtorp的post中借用这个来讨论numpy ,而这个讨论又是从Keith Goodman的Bottleneck (这非常有用)借来的。

基本的技巧是直接操纵数组的步幅 (对于一维数组):

 import numpy as np def rolling(a, window): shape = (a.size - window + 1, window) strides = (a.itemsize, a.itemsize) return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) a = np.arange(10) print rolling(a, 3) 

其中a是你的input数组, window是你想要的窗口的长度(3,在你的情况下)。

这产生:

 [[0 1 2] [1 2 3] [2 3 4] [3 4 5] [4 5 6] [5 6 7] [6 7 8] [7 8 9]] 

但是,原始a和返回数组之间绝对不存在内存重复。 这意味着它比其他选项更快更好。

例如(使用a = np.arange(100000)window=3 ):

 %timeit np.vstack([a[i:i-window] for i in xrange(window)]).T 1000 loops, best of 3: 256 us per loop %timeit rolling(a, window) 100000 loops, best of 3: 12 us per loop 

如果我们把它推广到一个N维数组最后一个轴上的“滚动窗口”,我们得到了Erik Rigtorp的“滚动窗口”function:

 import numpy as np def rolling_window(a, window): """ Make an ndarray with a rolling window of the last dimension Parameters ---------- a : array_like Array to add rolling window to window : int Size of rolling window Returns ------- Array that is a view of the original array with a added dimension of size w. Examples -------- >>> x=np.arange(10).reshape((2,5)) >>> rolling_window(x, 3) array([[[0, 1, 2], [1, 2, 3], [2, 3, 4]], [[5, 6, 7], [6, 7, 8], [7, 8, 9]]]) Calculate rolling mean of last dimension: >>> np.mean(rolling_window(x, 3), -1) array([[ 1., 2., 3.], [ 6., 7., 8.]]) """ if window < 1: raise ValueError, "`window` must be at least 1." if window > a.shape[-1]: raise ValueError, "`window` is too long." shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) strides = a.strides + (a.strides[-1],) return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) 

所以,让我们来看看这里发生了什么…操纵一个数组的strides可能看起来有点神奇,但是一旦你明白发生了什么事情,这根本就不是。 numpy数组的步幅描述了沿给定轴递增一个值时必须采取的步骤的大小(以字节为单位)。 所以,对于64位浮点数的一维数组,每个项的长度是8个字节, x.strides(8,)

 x = np.arange(9) print x.strides 

现在,如果我们把它重塑成一个2D,3×3的数组,步长将是(3 * 8, 8) ,因为我们必须跳过24个字节,沿着第一个轴增加一步,而8个字节增加一步第二轴。

 y = x.reshape(3,3) print y.strides 

类似的,转置和刚才反转数组的步骤是一样的:

 print y y.strides = y.strides[::-1] print y 

显然,数组的步幅和数组的形状是紧密相连的。 如果我们改变一个,我们必须相应地改变另一个,否则我们将没有有效的描述实际上保存数组值的内存缓冲区。

因此,如果要同时更改数组的形状和大小,即使新的步幅和形状是兼容的,也x.shape设置x.stridesx.shape来完成。

这就是numpy.lib.as_strided的地方。它实际上是一个非常简单的函数,它可以同时设置数组的大小和形状。

它检查两者是否兼容,但不是旧的步幅和新的形状是兼容的,如果你独立设置两个,就会发生这种情况。 (它实际上是通过numpy的__array_interface__来实现的 ,它允许任意类将内存缓冲区描述为一个numpy数组。

因此,我们所做的只是使一个项目沿着一个轴向前移动一个项目(在64位arrays的情况下是8个字节),而沿着另一个轴向前移动8个字节

换句话说,在“窗口”大小为3的情况下,数组的形状为(whatever, 3) ,而不是为第二维步进完整的3 * x.itemsize ,而只是向前推进一个项目 ,有效地使新数组的行成为“移动窗口”视图到原始数组中。

(这也意味着x.shape[0] * x.shape[1]将不会与新数组的x.size相同。)

无论如何,希望这会让事情变得更加清晰。

这个解决scheme不是由Python循环高效地实现的,因为它带有各种types的检查,最好避免使用numpy数组。 如果你的arrays特别高,你会注意到这个速度很快:

 newshape = (4,3) newstrides = (A.itemsize, A.itemsize) B = numpy.lib.stride_tricks.as_strided(A, shape=newshape, strides=newstrides) 

这给出了一个数组A的视图 。如果你想要一个新的数组,你可以编辑,在末尾使用.copy()做相同的操作。

详细步骤:

在这种情况下, newstrides元组将是(4,4),因为数组有4个字节的项目,并且您希望继续在i维中的单个项目步骤中逐步处理您的数据。 第二个值“4”是指j维度的步幅(在一个正常的4x4arrays中它将是16)。 因为在这种情况下,你也希望在j维中以4个字节的步长增加你的读缓冲区。

乔给出了一个很好的,详细的描述,当他说这一切都是同时改变步伐和形状的时候,就让事情变得晶莹剔透。

你使用哪种方法?

 import numpy as np A = np.array([1,2,3,4,5,6,7,8,9,10]) width = 3 np.vstack([A[i:i-len(A)+width] for i in xrange(len(A)-width)]) # needs 26.3µs np.vstack([A[i:i-width] for i in xrange(width)]).T # needs 13.2µs 

如果你的宽度相对较低(3),并且你有一个很大的A (10000个元素),那么差别就更重要了:第一个是32.4ms,第二个是44μs。

只是进一步去@Joe一般的答案

 import numpy as np def rolling(a, window): step = 2 shape = ( (a.size-window)/step + 1 , window) strides = (a.itemsize*step, a.itemsize) return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) a = np.arange(10) print rolling(a, 3) 

其输出:

 [[0 1 2] [2 3 4] [4 5 6] [6 7 8]] 

为了进一步概括第二种情况,即使用它从图像中提取斑点

 def rolling2d(a,win_h,win_w,step_h,step_w): h,w = a.shape shape = ( ((h-win_h)/step_h + 1) * ((w-win_w)/step_w + 1) , win_h , win_w) strides = (step_w*a.itemsize, h*a.itemsize,a.itemsize) return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) a = np.arange(36).reshape(6,6) print a print rolling2d (a,3,3,2,2) 

其输出:

 [[ 0 1 2 3 4 5] [ 6 7 8 9 10 11] [12 13 14 15 16 17] [18 19 20 21 22 23] [24 25 26 27 28 29] [30 31 32 33 34 35]] [[[ 0 1 2] [ 6 7 8] [12 13 14]] [[ 2 3 4] [ 8 9 10] [14 15 16]] [[ 4 5 6] [10 11 12] [16 17 18]] [[ 6 7 8] [12 13 14] [18 19 20]]] 

我认为这可能比循环更快,当宽度固定在一个较低的数字…

 import numpy a = numpy.array([1,2,3,4,5,6]) b = numpy.reshape(a, (numpy.shape(a)[0],1)) b = numpy.concatenate((b, numpy.roll(b,-1,0), numpy.roll(b,-2,0)), 1) b = b[0:(numpy.shape(a)[0]/2) + 1,:] 

编辑显然,使用步幅的解决scheme优于这个,唯一的主要缺点是,他们还没有很好的logging…

看看: view_as_windows 。

 import numpy as np from skimage.util.shape import view_as_windows window_shape = (4, ) aa = np.arange(1000000000) # 1 billion bb = view_as_windows(aa, window_shape) 

大约1秒。

我使用的是类似于@JustInTime的更一般的函数,但适用于ndarray

 def sliding_window(x, size, overlap=0): step = size - overlap # in npts nwin = (x.shape[-1]-size)//step + 1 shape = x.shape[:-1] + (nwin, size) strides = x.strides[:-1] + (step*x.strides[-1], x.strides[-1]) return stride_tricks.as_strided(x, shape=shape, strides=strides) 

一个例子,

 x = np.arange(10) M.sliding_window(x, 5, 3) Out[1]: array([[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [4, 5, 6, 7, 8]]) x = np.arange(10).reshape((2,5)) M.sliding_window(x, 3, 1) Out[2]: array([[[0, 1, 2], [2, 3, 4]], [[5, 6, 7], [7, 8, 9]]])