C中的滚动中值algorithm

我目前正在研究一种algorithm来实现C中滚动中值滤波器(类似于滚动平均滤波器)。从我的文献search中,似乎有两种合理有效的方法来实现它。 首先是对初始值窗口进行sorting,然后执行二进制search以插入新值,并在每次迭代中删除现有值。

第二个(来自Hardle和Steiger,1995,JRSS-C,algorithm296)构build了一个双头堆结构,一头是maxheap,另一头是minheap,中间是中间的。 这产生一个线性时间algorithm,而不是O(n log n)。

这是我的问题:执行前者是可行的,但是我需要在数百万个时间序列上运行这个,所以效率非常重要。 后者certificate很难实施。 我在R的stats包的代码的Trunmed.c文件中发现了代码,但这是相当难以理解的。

有没有人知道线性时间滚动中值algorithm的一个精心编写的C实现?

编辑:链接到Trunmed.c代码http://google.com/codesearch/p?hl=zh-CN&sa=N&cd=1&ct=rc#mYw3h_Lb_e0/R-2.2.0/src/library/stats/src/Trunmed.c

我曾多次查看过R的src/library/stats/src/Trunmed.c ,因为我想在独立的C ++类/ C子程序中使用类似的东西。 请注意,这实际上是两个实现中的一个,请参阅src/library/stats/man/runmed.Rd (帮助文件的来源)

 \details{ Apart from the end values, the result \code{y = runmed(x, k)} simply has \code{y[j] = median(x[(j-k2):(j+k2)])} (k = 2*k2+1), computed very efficiently. The two algorithms are internally entirely different: \describe{ \item{"Turlach"}{is the Härdle-Steiger algorithm (see Ref.) as implemented by Berwin Turlach. A tree algorithm is used, ensuring performance \eqn{O(n \log k)}{O(n * log(k))} where \code{n <- length(x)} which is asymptotically optimal.} \item{"Stuetzle"}{is the (older) Stuetzle-Friedman implementation which makes use of median \emph{updating} when one observation enters and one leaves the smoothing window. While this performs as \eqn{O(n \times k)}{O(n * k)} which is slower asymptotically, it is considerably faster for small \eqn{k} or \eqn{n}.} } } 

很高兴看到这个更独立的方式重新使用。 你是志愿者吗? 我可以帮助一些R位。

编辑1 :除了上面的Trunmed.c旧版本的链接,这里是当前的SVN副本

  • Srunmed.c (Stuetzle版本)
  • Trunmed.c (Turlach版本)
  • runmed.R为R函数调用这些

编辑2 :瑞安Tibshirani有一些C和Fortran代码快速中位数binning这可能是一个合适的开始点窗口的方法。

我无法find顺序统计的c ++数据结构的现代实现,所以最终在MAK( Match Editorial :向下滚动到FloatingMedian)build议的顶级编码器链接中实现了两个想法。

两个multisets

第一个想法是将数据分成两个数据结构(堆,多等),每个插入/删除使用O(ln N)不允许分位数dynamic变化,而没有大的成本。 即我们可以有一个滚动中位数,或滚动75%,但不是在同一时间。

段树

第二个想法使用O(ln N)进行插入/删除/查询的分段树,但是更灵活。 最好的“N”是你的数据范围的大小。 因此,如果您的滚动中位数有一百万个项目的窗口,但您的数据从1..65536变化,那么每移动一百万个滚动窗口,只需要16个操作!

c ++代码类似于Denis在上面发布的(“这是一个简单的量化数据algorithm”)

GNU顺序统计树

在放弃之前,我发现stdlibc ++包含顺序统计树!

这些有两个关键的操作:

 iter = tree.find_by_order(value) order = tree.order_of_key(value) 

请参阅libstdc ++手册policy_based_data_structures_test (search“拆分和连接”)。

为了支持c ++ 0x / c ++ 11 style partial typedefs的编译器,

 #if !defined(GNU_ORDER_STATISTIC_SET_H) #define GNU_ORDER_STATISTIC_SET_H #include <ext/pb_ds/assoc_container.hpp> #include <ext/pb_ds/tree_policy.hpp> // A red-black tree table storing ints and their order // statistics. Note that since the tree uses // tree_order_statistics_node_update as its update policy, then it // includes its methods by_order and order_of_key. template <typename T> using t_order_statistic_set = __gnu_pbds::tree< T, __gnu_pbds::null_type, std::less<T>, __gnu_pbds::rb_tree_tag, // This policy updates nodes' metadata for order statistics. __gnu_pbds::tree_order_statistics_node_update>; #endif //GNU_ORDER_STATISTIC_SET_H 

我在这里做了一个C实现 。 这个问题还有一些细节: 在C – Turlach实现中滚动中值 。

示例用法:

 int main(int argc, char* argv[]) { int i,v; Mediator* m = MediatorNew(15); for (i=0;i<30;i++) { v = rand()&127; printf("Inserting %3d \n",v); MediatorInsert(m,v); v=MediatorMedian(m); printf("Median = %3d.\n\n",v); ShowTree(m); } } 

下面是一个简单的量化数据algorithm(几个月后):

 """ median1.py: moving median 1d for quantized, eg 8-bit data Method: cache the median, so that wider windows are faster. The code is simple -- no heaps, no trees. Keywords: median filter, moving median, running median, numpy, scipy See Perreault + Hebert, Median Filtering in Constant Time, 2007, http://nomis80.org/ctmf.html: nice 6-page paper and C code, mainly for 2d images Example: y = medians( x, window=window, nlevel=nlevel ) uses: med = Median1( nlevel, window, counts=np.bincount( x[0:window] )) med.addsub( +, - ) -- see the picture in Perreault m = med.median() -- using cached m, summ How it works: picture nlevel=8, window=3 -- 3 1s in an array of 8 counters: counts: . 1 . . 1 . 1 . sums: 0 1 1 1 2 2 3 3 ^ sums[3] < 2 <= sums[4] <=> median 4 addsub( 0, 1 ) m, summ stay the same addsub( 5, 1 ) slide right addsub( 5, 6 ) slide left Updating `counts` in an `addsub` is trivial, updating `sums` is not. But we can cache the previous median `m` and the sum to m `summ`. The less often the median changes, the faster; so fewer levels or *wider* windows are faster. (Like any cache, run time varies a lot, depending on the input.) See also: scipy.signal.medfilt -- runtime roughly ~ window size http://stackoverflow.com/questions/1309263/rolling-median-algorithm-in-c """ from __future__ import division import numpy as np # bincount, pad0 __date__ = "2009-10-27 oct" __author_email__ = "denis-bz-py at t-online dot de" #............................................................................... class Median1: """ moving median 1d for quantized, eg 8-bit data """ def __init__( s, nlevel, window, counts ): s.nlevel = nlevel # >= len(counts) s.window = window # == sum(counts) s.half = (window // 2) + 1 # odd or even s.setcounts( counts ) def median( s ): """ step up or down until sum cnt to m-1 < half <= sum to m """ if s.summ - s.cnt[sm] < s.half <= s.summ: return sm j, sumj = sm, s.summ if sumj <= s.half: while j < s.nlevel - 1: j += 1 sumj += s.cnt[j] # print "j sumj:", j, sumj if sumj - s.cnt[j] < s.half <= sumj: break else: while j > 0: sumj -= s.cnt[j] j -= 1 # print "j sumj:", j, sumj if sumj - s.cnt[j] < s.half <= sumj: break sm, s.summ = j, sumj return sm def addsub( s, add, sub ): s.cnt[add] += 1 s.cnt[sub] -= 1 assert s.cnt[sub] >= 0, (add, sub) if add <= sm: s.summ += 1 if sub <= sm: s.summ -= 1 def setcounts( s, counts ): assert len(counts) <= s.nlevel, (len(counts), s.nlevel) if len(counts) < s.nlevel: counts = pad0__( counts, s.nlevel ) # numpy array / list sumcounts = sum(counts) assert sumcounts == s.window, (sumcounts, s.window) s.cnt = counts s.slowmedian() def slowmedian( s ): j, sumj = -1, 0 while sumj < s.half: j += 1 sumj += s.cnt[j] sm, s.summ = j, sumj def __str__( s ): return ("median %d: " % sm) + \ "".join([ (" ." if c == 0 else "%2d" % c) for c in s.cnt ]) #............................................................................... def medianfilter( x, window, nlevel=256 ): """ moving medians, y[j] = median( x[j:j+window] ) -> a shorter list, len(y) = len(x) - window + 1 """ assert len(x) >= window, (len(x), window) # np.clip( x, 0, nlevel-1, out=x ) # cf http://scipy.org/Cookbook/Rebinning cnt = np.bincount( x[0:window] ) med = Median1( nlevel=nlevel, window=window, counts=cnt ) y = (len(x) - window + 1) * [0] y[0] = med.median() for j in xrange( len(x) - window ): med.addsub( x[j+window], x[j] ) y[j+1] = med.median() return y # list # return np.array( y ) def pad0__( x, tolen ): """ pad x with 0 s, numpy array or list """ n = tolen - len(x) if n > 0: try: x = np.r_[ x, np.zeros( n, dtype=x[0].dtype )] except NameError: x += n * [0] return x #............................................................................... if __name__ == "__main__": Len = 10000 window = 3 nlevel = 256 period = 100 np.set_printoptions( 2, threshold=100, edgeitems=10 ) # print medians( np.arange(3), 3 ) sinwave = (np.sin( 2 * np.pi * np.arange(Len) / period ) + 1) * (nlevel-1) / 2 x = np.asarray( sinwave, int ) print "x:", x for window in ( 3, 31, 63, 127, 255 ): if window > Len: continue print "medianfilter: Len=%d window=%d nlevel=%d:" % (Len, window, nlevel) y = medianfilter( x, window=window, nlevel=nlevel ) print np.array( y ) # end median1.py 

我使用这个增量中值估计器:

 median += eta * sgn(sample - median) 

它与更常见的均值估计量具有相同的forms:

 mean += eta * (sample - mean) 

这里eta是一个小的学习速率参数(例如0.001), sgn ()是返回{-1,0,1}之一的符号函数。 (如果数据是非平稳的,并且想要跟踪随时间变化的数据,则使用一个常数eta ;否则,对于固定源使用类似eta = 1 / n的数据来收敛,其中n是到目前为止所看到的样本数。 )

此外,我修改了中值估计器,使其适用于任意分位数。 一般来说,分位数函数(http://en.wikipedia.org/wiki/Quantile_function)会告诉您将数据分成两部分的值:p和1-p。; 以下估计值递增:

 quantile += eta * (sgn(sample - quantile) + 2.0 * p - 1.0) 

值p应该在[0,1]之内。 这实质上将sgn()函数的对称输出{-1,0,1}偏移到一侧,将数据样本分割成两个不相等大小的区域(数据的小数p和1-p小于/大于分位数估计值)。 注意,对于p = 0.5,这减less到中值估计器。

滚动中位数可以通过维护两个数字分区来find。

为了维护分区,使用Min Heap和Max Heap。

最大堆将包含小于等于中位数的数字。

Min Heap将包含大于等于中位数的数字。

平衡约束:如果元素的总数是偶数,那么这两个堆应该有相等的元素。

如果元素的总数是奇数,那么Max Heap将比Min Heap多一个元素。

中间元素:如果两个分区的元素数量相等,那么中值将是第一个分区的最大元素和第二个分区的最小元素的总和的一半。

否则,median将是第一个分区的最大元素。

algorithm-
 1-拿两堆(1分堆和1最大堆)
   最大堆将包含前半部分元素
    Min Heap将包含下半部分元素

 2-比较来自stream与Max Heap顶部的新号码, 
   如果它小于或等于在最大堆中添加该数字。 
   否则,在最小堆中添加数字。

 3-如果最小堆比最大堆有更多的元素 
   然后移除Min Heap的顶层元素并添加Max Heap。
   如果最大堆有超过最小堆中的元素 
   然后删除Max Heap的顶层元素并添加到Min Heap中。

 4-如果两堆都有相同数量的元素,那么
   中位数将是Max Heap中最大元素和Min Heap中最小元素的总和的一半。
   否则,median将是第一个分区的最大元素。
 public class Solution { public static void main(String[] args) { Scanner in = new Scanner(System.in); RunningMedianHeaps s = new RunningMedianHeaps(); int n = in.nextInt(); for(int a_i=0; a_i < n; a_i++){ printMedian(s,in.nextInt()); } in.close(); } public static void printMedian(RunningMedianHeaps s, int nextNum){ s.addNumberInHeap(nextNum); System.out.printf("%.1f\n",s.getMedian()); } } class RunningMedianHeaps{ PriorityQueue<Integer> minHeap = new PriorityQueue<Integer>(); PriorityQueue<Integer> maxHeap = new PriorityQueue<Integer>(Comparator.reverseOrder()); public double getMedian() { int size = minHeap.size() + maxHeap.size(); if(size % 2 == 0) return (maxHeap.peek()+minHeap.peek())/2.0; return maxHeap.peek()*1.0; } private void balanceHeaps() { if(maxHeap.size() < minHeap.size()) { maxHeap.add(minHeap.poll()); } else if(maxHeap.size() > 1+minHeap.size()) { minHeap.add(maxHeap.poll()); } } public void addNumberInHeap(int num) { if(maxHeap.size()==0 || num <= maxHeap.peek()) { maxHeap.add(num); } else { minHeap.add(num); } balanceHeaps(); } } 

如果您可以根据时间点来引用值,则可以使用replace值对值进行采样,应用引导程序在置信区间内生成引导值中值。 这可以让你计算一个近似的中位数,而不是不断的将input值sorting到一个数据结构中。

对于那些需要在Java中运行中位数的人… PriorityQueue是你的朋友。 O(log N)插入,O(1)当前中位数和O(N)删除。 如果你知道你的数据分布,你可以做得比这更好。

 public class RunningMedian { // Two priority queues, one of reversed order. PriorityQueue<Integer> lower = new PriorityQueue<Integer>(10, new Comparator<Integer>() { public int compare(Integer arg0, Integer arg1) { return (arg0 < arg1) ? 1 : arg0 == arg1 ? 0 : -1; } }), higher = new PriorityQueue<Integer>(); public void insert(Integer n) { if (lower.isEmpty() && higher.isEmpty()) lower.add(n); else { if (n <= lower.peek()) lower.add(n); else higher.add(n); rebalance(); } } void rebalance() { if (lower.size() < higher.size() - 1) lower.add(higher.remove()); else if (higher.size() < lower.size() - 1) higher.add(lower.remove()); } public Integer getMedian() { if (lower.isEmpty() && higher.isEmpty()) return null; else if (lower.size() == higher.size()) return (lower.peek() + higher.peek()) / 2; else return (lower.size() < higher.size()) ? higher.peek() : lower .peek(); } public void remove(Integer n) { if (lower.remove(n) || higher.remove(n)) rebalance(); } } 

值得指出的是,有一个特殊的情况,它有一个简单的确切的解决scheme:当stream中的所有值都是在相对小的定义范围内的整数时。 例如,假设它们都必须位于0和1023之间。在这种情况下,只需定义一个由1024个元素和一个计数组成的数组,并清除所有这些值。 对于stream中的每个值,增加相应的bin和count。 stream结束后,查找包含count / 2最高值的bin – 通过添加从0开始的连续bin可以很容易地完成。使用相同的方法,可以find任意排名的值。 (如果检测料桶饱和度并且在运行期间将存储料箱的尺寸“升级”到更大的types,则需要小的复杂度。)

这种特殊情况可能看起来是人为的,但在实践中是非常普遍的。 如果它们位于一个范围内并且已知“精度足够”的精度水平,它也可以用作实数的近似值。 这对几乎所有的“现实世界”物体的测量都适用。 例如,一群人的身高或体重。 不够大的一套? 对于这个星球上所有(个体)细菌的长度或重量来说,这也同样适用 – 假设有人可以提供数据!

它看起来像我误解了原来的 – 这似乎是想要一个滑动窗口中位数,而不是一个很长的stream的中位数。 这种方法仍然适用于此。 加载初始窗口的前N个stream值,然后为第N + 1个stream值递增对应的分箱,同时递减对应于第0个stream值的分箱。 在这种情况下,需要保留最后的N个值以允许递减,这可以通过周期性地处理大小为N的数组来有效地完成。由于中值的位置只能改变-2,-1,0,1 ,在滑动窗口的每一步2上,没有必要将每个步骤中的所有分箱求和到中值,只要根据哪一个(或哪些)分箱被修改来调整“中值指针”即可。 例如,如果新值和被删除的值都低于当前中值,那么它不会改变(offset = 0)。 当N变得太大而不能方便地存储在内存中时,该方法就会崩溃。

这是一个可以用于确切的输出不重要(用于显示等)您需要totalcount和lastmedian,再加上newvalue。

 { totalcount++; newmedian=lastmedian+(newvalue>lastmedian?1:-1)*(lastmedian==0?newvalue: lastmedian/totalcount*2); } 

对于诸如page_display_time之类的东西产生相当精确的结果。

规则:inputstream需要按照页面显示时间,数量大(> 30等)的顺序平滑,并且具有非零中值。

例如:页面加载时间800个项目,10ms … 3000ms,平均90ms,实际中值:11ms

经过30次input,中位数误差一般<= 20%(9ms..12ms),并且越来越less。 经过800次input后,误差为+ -2%。

另一个有类似解决scheme的思考者在这里: 中值filter超高效的实现

这是java的实现

 package MedianOfIntegerStream; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.TreeSet; public class MedianOfIntegerStream { public Set<Integer> rightMinSet; public Set<Integer> leftMaxSet; public int numOfElements; public MedianOfIntegerStream() { rightMinSet = new TreeSet<Integer>(); leftMaxSet = new TreeSet<Integer>(new DescendingComparator()); numOfElements = 0; } public void addNumberToStream(Integer num) { leftMaxSet.add(num); Iterator<Integer> iterMax = leftMaxSet.iterator(); Iterator<Integer> iterMin = rightMinSet.iterator(); int maxEl = iterMax.next(); int minEl = 0; if (iterMin.hasNext()) { minEl = iterMin.next(); } if (numOfElements % 2 == 0) { if (numOfElements == 0) { numOfElements++; return; } else if (maxEl > minEl) { iterMax.remove(); if (minEl != 0) { iterMin.remove(); } leftMaxSet.add(minEl); rightMinSet.add(maxEl); } } else { if (maxEl != 0) { iterMax.remove(); } rightMinSet.add(maxEl); } numOfElements++; } public Double getMedian() { if (numOfElements % 2 != 0) return new Double(leftMaxSet.iterator().next()); else return (leftMaxSet.iterator().next() + rightMinSet.iterator().next()) / 2.0; } private class DescendingComparator implements Comparator<Integer> { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } } public static void main(String[] args) { MedianOfIntegerStream streamMedian = new MedianOfIntegerStream(); streamMedian.addNumberToStream(1); System.out.println(streamMedian.getMedian()); // should be 1 streamMedian.addNumberToStream(5); streamMedian.addNumberToStream(10); streamMedian.addNumberToStream(12); streamMedian.addNumberToStream(2); System.out.println(streamMedian.getMedian()); // should be 5 streamMedian.addNumberToStream(3); streamMedian.addNumberToStream(8); streamMedian.addNumberToStream(9); System.out.println(streamMedian.getMedian()); // should be 6.5 } } 

如果你只需要一个平滑的平均值,一个快速/简单的方法是将最新的值乘以x,平均值乘以(1-x),然后将它们相加。 这就成了新的平均值。

编辑:不是什么用户要求,而不是统计有效,但足够多的用途。
我会留在这里(尽pipedownvotes)search!