最快的方法来sorting10个数字? (数字是32位)

我正在解决一个问题,它涉及很快sorting10个数字(int32)。 我的应用程序需要尽可能快地sorting10百万次。 我正在抽样数十亿个元素的数据集,每次我需要从中select10个数字(简化)并对它们进行sorting(并从sorting后的10个元素列表中作出结论)。

目前我正在使用插入sorting,但我想我可以实现一个非常快速的自定义sortingalgorithm为我的具体问题的10个数字将击败插入sorting。

有没有人有任何想法如何解决这个问题?

(跟进HelloWorld的build议,研究sortingnetworking。)

看起来29比较/交换networking是做10inputsorting的最快方法。 我使用了Waksman在1969年为Javascript编写的这个例子,它应该直接翻译成C语言,因为它只是一个if语句,比较和交换的列表。

 function sortNet10(data) { // ten-input sorting network by Waksman, 1969 var swap; if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; } if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; } if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; } if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; } if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; } if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; } if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; } if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; } if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; } if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; } if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; } if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; } if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; } if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; } if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; } if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; } if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; } if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; } if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; } if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; } if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; } if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; } if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; } if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; } if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; } if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; } if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; } if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; } if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; } return(data); } alert(sortNet10([5,7,1,8,4,3,6,9,2,0])); 

当你处理这个固定大小时,请看Sorting Networks 。 这些algorithm具有固定的运行时间并且独立于其input。 对于你的用例,你没有这样的开销,一些sortingalgorithm。

比特sorting是这种networking的实现。 这个在len(n)<= 32的CPU上效果最好。 在更大的input你可以考虑移动到GPU。 https://en.wikipedia.org/wiki/Sorting_network

顺便说一句,比较sortingalgorithm的一个很好的网页是这里(尽pipe它缺less了bitonic sort

http://www.sorting-algorithms.com

使用一个sortingnetworking,比较组数为4,所以你可以在SIMD寄存器中进行。 一对打包的最小/最大指令实现打包比较器function。 对不起,我现在没有时间寻找一个我记得看到这个网页,但希望searchSIMD或SSE分拣networking将转向一个页面。

x86 SSE确实为四个32位整数的vector打包了32位整数最小和最大指令。 AVX2(Haswell和更高版本)具有相同的8位整数的256b向量。 也有有效的洗牌说明。

如果你有很多独立的小类,可以使用向量并行执行4或8种sorting。 ESP。 如果你随机select元素(所以要sorting的数据不会在内存中连续),你可以避免洗牌,只需要按你需要的顺序进行比较。 10个寄存器保存来自4个(AVX2:8)10个整数的列表的所有数据仍然留下6个暂存空间的寄存器。

如果您还需要对关联的数据进行sorting,则向量sortingnetworking效率较低。 在这种情况下,最有效的方法似乎是使用打包比较来获取哪些元素已更改的掩码,并使用该掩码来混合(引用)关联数据的向量。

如何展开,无分支selectsorting?

 #include <iostream> #include <algorithm> #include <random> //return the index of the minimum element in array a int min(const int * const a) { int m = a[0]; int indx = 0; #define TEST(i) (m > a[i]) && (m = a[i], indx = i ); //see http://stackoverflow.com/a/7074042/2140449 TEST(1); TEST(2); TEST(3); TEST(4); TEST(5); TEST(6); TEST(7); TEST(8); TEST(9); #undef TEST return indx; } void sort( int * const a ){ int work[10]; int indx; #define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647; //get the minimum, copy it to work and set it at max_int in a GET(0); GET(1); GET(2); GET(3); GET(4); GET(5); GET(6); GET(7); GET(8); GET(9); #undef GET #define COPY(i) a[i] = work[i]; //copy back to a COPY(0); COPY(1); COPY(2); COPY(3); COPY(4); COPY(5); COPY(6); COPY(7); COPY(8); COPY(9); #undef COPY } int main() { //generating and printing a random array int a[10] = { 1,2,3,4,5,6,7,8,9,10 }; std::random_device rd; std::mt19937 g(rd()); std::shuffle( a, a+10, g); for (int i = 0; i < 10; i++) { std::cout << a[i] << ' '; } std::cout << std::endl; //sorting and printing again sort(a); for (int i = 0; i < 10; i++) { std::cout << a[i] << ' '; } return 0; } 

http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6

唯一相关的行是前两个#define

它使用两个列表,完全重新检查第一个十次,这将是一个不好的实现selectsorting,但它避免了分支和可变长度循环,这可能会补偿与现代处理器和这样一个小数据集。


基准

我对分类networking进行了基准testing,我的代码似乎比较慢。 但是我试图删除展开和复制。 运行这个代码:

 #include <iostream> #include <algorithm> #include <random> #include <chrono> int min(const int * const a, int i) { int m = a[i]; int indx = i++; for ( ; i<10; i++) //see http://stackoverflow.com/a/7074042/2140449 (m > a[i]) && (m = a[i], indx = i ); return indx; } void sort( int * const a ){ for (int i = 0; i<9; i++) std::swap(a[i], a[min(a,i)]); //search only forward } void sortNet10(int * const data) { // ten-input sorting network by Waksman, 1969 int swap; if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; } if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; } if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; } if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; } if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; } if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; } if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; } if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; } if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; } if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; } if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; } if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; } if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; } if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; } if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; } if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; } if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; } if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; } if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; } if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; } if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; } if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; } if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; } if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; } if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; } if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; } if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; } if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; } if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; } } std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) { std::mt19937 g(seed); int a[10] = {10,11,12,13,14,15,16,17,18,19}; std::chrono::high_resolution_clock::time_point t1, t2; t1 = std::chrono::high_resolution_clock::now(); for (long i = 0; i < 1e7; i++) { std::shuffle( a, a+10, g); func(a); } t2 = std::chrono::high_resolution_clock::now(); return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1); } int main() { std::random_device rd; for (int i = 0; i < 10; i++) { const int seed = rd(); std::cout << "seed = " << seed << std::endl; std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl; std::cout << "sort: " << benchmark(sort, seed).count() << std::endl; } return 0; } 

与分拣networking相比,我一直在获得更好的分支selectsorting结果

 $ gcc -v gcc version 5.2.0 (GCC) $ g++ -std=c++11 -Ofast sort.cpp && ./a.out seed = -1727396418 sortNet10: 2.24137 sort: 2.21828 seed = 2003959850 sortNet10: 2.23914 sort: 2.21641 seed = 1994540383 sortNet10: 2.23782 sort: 2.21778 seed = 1258259982 sortNet10: 2.25199 sort: 2.21801 seed = 1821086932 sortNet10: 2.25535 sort: 2.2173 seed = 412262735 sortNet10: 2.24489 sort: 2.21776 seed = 1059795817 sortNet10: 2.29226 sort: 2.21777 seed = -188551272 sortNet10: 2.23803 sort: 2.22996 seed = 1043757247 sortNet10: 2.2503 sort: 2.23604 seed = -268332483 sortNet10: 2.24455 sort: 2.24304 

这个问题并不是说这是一种基于networking的应用程序。 引起我注意的一件事是:

我正在抽样数十亿个元素的数据集,每次我需要从中select10个数字(简化)并对它们进行sorting(并从sorting后的10个元素列表中作出结论)。

作为一名软硬件工程师,这对我来说绝对是尖叫“FPGA” 。 我不知道你需要从有序的数字集合中得出什么样的结论,或者数据来自哪里,但是我知道,在这些“一目了然”的数亿亿之间处理一些东西几乎是微不足道的,分析“ 每秒操作。 过去我已经完成了FPGA辅助的DNA测序工作。 当问题非常适合这种types的解决scheme时,几乎不可能击败FPGA的巨大处理能力。

在某种程度上,唯一的限制因素就是能够将数据快速传送到FPGA中,以及能够多快地将其传出。

作为参考,我devise了一个高性能的实时image processing器,以每秒约3亿像素的速率接收32位RGB图像数据。 通过FIR滤波器,matrix乘法器,查找表,空间边缘检测模块和其他一些操作stream出数据,然后出现另一端。 所有这一切都在一个相对较小的Xilinx Virtex2 FPGA上进行,内部时钟频率范围从33MHz左右,如果我没有记错的话,400MHz。 哦,是的,它也有一个DDR2控制器的实施,并运行了两组DDR2内存。

在数百MHz的频率下,每个时钟转换中FPGA可以输出10位32位的数字。 由于数据填充处理stream水线,操作开始时会有短暂的延迟。 之后,你应该能够得到每个时钟一个结果。 或者更多,如果可以通过复制sorting和分析pipe道来并行处理。 原则上,解决scheme几乎是微不足道的。

重点是:如果应用程序不是PC绑定的,并且数据stream和处理与FPGA解决scheme(无论是独立的还是作为机器中的协处理器卡)“兼容”,那么您就没有办法不pipealgorithm如何,用任何语言编写的软件都能够达到可达到的性能水平。

编辑:

只是跑快速search,find一个可能对你有用的文件。 它看起来可以追溯​​到2012年。你可以在今天的performance更好(甚至当时)。 这里是:

在FPGA上分类networking

我最近编写了一个使用Bose-Nelsonalgorithm在编译时生成sortingnetworking的小类 。

它可以用来为10个数字创build一个非常快速的sorting。

 /** * A Functor class to create a sort for fixed sized arrays/containers with a * compile time generated Bose-Nelson sorting network. * \tparam NumElements The number of elements in the array or container to sort. * \tparam T The element type. * \tparam Compare A comparator functor class that returns true if lhs < rhs. */ template <unsigned NumElements, class Compare = void> class StaticSort { template <class A, class C> struct Swap { template <class T> inline void s(T &v0, T &v1) { T t = Compare()(v0, v1) ? v0 : v1; // Min v1 = Compare()(v0, v1) ? v1 : v0; // Max v0 = t; } inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); } }; template <class A> struct Swap <A, void> { template <class T> inline void s(T &v0, T &v1) { // Explicitly code out the Min and Max to nudge the compiler // to generate branchless code. T t = v0 < v1 ? v0 : v1; // Min v1 = v0 < v1 ? v1 : v0; // Max v0 = t; } inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); } }; template <class A, class C, int I, int J, int X, int Y> struct PB { inline PB(A &a) { enum { L = X >> 1, M = (X & 1 ? Y : Y + 1) >> 1, IAddL = I + L, XSubL = X - L }; PB<A, C, I, J, L, M> p0(a); PB<A, C, IAddL, J + M, XSubL, Y - M> p1(a); PB<A, C, IAddL, J, XSubL, M> p2(a); } }; template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 1> { inline PB(A &a) { Swap<A, C> s(a, I - 1, J - 1); } }; template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 2> { inline PB(A &a) { Swap<A, C> s0(a, I - 1, J); Swap<A, C> s1(a, I - 1, J - 1); } }; template <class A, class C, int I, int J> struct PB <A, C, I, J, 2, 1> { inline PB(A &a) { Swap<A, C> s0(a, I - 1, J - 1); Swap<A, C> s1(a, I, J - 1); } }; template <class A, class C, int I, int M, bool Stop = false> struct PS { inline PS(A &a) { enum { L = M >> 1, IAddL = I + L, MSubL = M - L}; PS<A, C, I, L, (L <= 1)> ps0(a); PS<A, C, IAddL, MSubL, (MSubL <= 1)> ps1(a); PB<A, C, I, IAddL, L, MSubL> pb(a); } }; template <class A, class C, int I, int M> struct PS <A, C, I, M, true> { inline PS(A &a) {} }; public: /** * Sorts the array/container arr. * \param arr The array/container to be sorted. */ template <class Container> inline void operator() (Container &arr) const { PS<Container, Compare, 1, NumElements, (NumElements <= 1)> ps(arr); }; /** * Sorts the array arr. * \param arr The array to be sorted. */ template <class T> inline void operator() (T *arr) const { PS<T*, Compare, 1, NumElements, (NumElements <= 1)> ps(arr); }; }; #include <iostream> #include <vector> int main(int argc, const char * argv[]) { enum { NumValues = 10 }; // Arrays { int rands[NumValues]; for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100; std::cout << "Before Sort: \t"; for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " "; std::cout << "\n"; StaticSort<NumValues> staticSort; staticSort(rands); std::cout << "After Sort: \t"; for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " "; std::cout << "\n"; } std::cout << "\n"; // STL Vector { std::vector<int> rands(NumValues); for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100; std::cout << "Before Sort: \t"; for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " "; std::cout << "\n"; StaticSort<NumValues> staticSort; staticSort(rands); std::cout << "After Sort: \t"; for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " "; std::cout << "\n"; } return 0; } 

请注意,我们没有使用if (compare) swap语句,而是明确地将三元运算符编码为最小值和最大值。 这是为了帮助推动编译器使用无分支代码。

基准

下面的基准与铿-O3编译,并运行在我2012年中期的MacBook空气。

sorting随机数据

比较它与DarioP的代码,这里是sorting100万个大小为10的32位int数组的毫秒数:

硬编码sortingnetworking10: 88.774毫秒
模板化的玻色纳尔逊sorting10: 27.815毫秒

使用这种模板化的方法,我们也可以在编译时为其他数量的元素生成sortingnetworking。

时间(以毫秒为单位)对100万个不同大小的数组进行sorting。
大小为2,4,8的数组的毫秒数分别为1.943,8.655,20.246。
C ++模板化Bose-Nelson静态排序时序

向Glenn Teitelbaum发放展示插入sorting的信息。

这里是6个元素的小arrays的每种平均时钟。 基准代码和示例可以在这个问题中find:
最快的固定长度6 int数组

 Direct call to qsort library function : 326.81 Naive implementation (insertion sort) : 132.98 Insertion Sort (Daniel Stutzbach) : 104.04 Insertion Sort Unrolled : 99.64 Insertion Sort Unrolled (Glenn Teitelbaum) : 81.55 Rank Order : 44.01 Rank Order with registers : 42.40 Sorting Networks (Daniel Stutzbach) : 88.06 Sorting Networks (Paul R) : 31.64 Sorting Networks 12 with Fast Swap : 29.68 Sorting Networks 12 reordered Swap : 28.61 Reordered Sorting Network w/ fast swap : 24.63 Templated Sorting Network (this class) : 25.37 

它在6个元素的问题中执行速度最快。

性能sortingsorting的数据

通常,input数组可能已经被sorting或者大部分sorting。
在这种情况下,插入sorting可以是更好的select。

在这里输入图像说明

您可能需要根据数据select适当的sortingalgorithm。

用于基准的代码可以在这里find。

虽然networkingsorting在小型数组上有很高的可能性,但是如果适当优化,有时你不能击败插入sorting。 例如,批量插入2个元素:

 { final int a=in[0]<in[1]?in[0]:in[1]; final int b=in[0]<in[1]?in[1]:in[0]; in[0]=a; in[1]=b; } for(int x=2;x<10;x+=2) { final int a=in[x]<in[x+1]?in[x]:in[x+1]; final int b=in[x]<in[x+1]?in[x+1]:in[x]; int y= x-1; while(y>=0&&in[y]>b) { in[y+2]= in[y]; --y; } in[y+2]=b; while(y>=0&&in[y]>a) { in[y+1]= in[y]; --y; } in[y+1]=a; } 

您可以完全展开insertion sort

为了更容易,可以使用recursiontemplate ,而不需要函数开销。 既然它已经是一个template ,那么int也可以是一个template参数。 这也使编码数组以外的其他数组大小创build。

注意,要对int x[10]进行sorting,调用是insert_sort<int, 9>::sort(x); 因为class级使用最后一个项目的索引。 这可以被包装,但是这将是更多的代码来通读。

 template <class T, int NUM> class insert_sort; template <class T> class insert_sort<T,0> // stop template recursion // sorting 1 item is a no-op { public: static void place(T *x) {} static void sort(T * x) {} }; template <class T, int NUM> class insert_sort // use template recursion to do insertion sort // NUM is the index of the last item, eg. for x[10] call <9> { public: static void place(T *x) { T t1=x[NUM-1]; T t2=x[NUM]; if (t1 > t2) { x[NUM-1]=t2; x[NUM]=t1; insert_sort<T,NUM-1>::place(x); } } static void sort(T * x) { insert_sort<T,NUM-1>::sort(x); // sort everything before place(x); // put this item in } }; 

在我的testing中,这比sortingnetworking示例更快。

插入sorting需要平均29.6次比较来sorting10个input,其中最好的情况是9,最差的是45(给定的input是相反的)。

{9,6,1} shell将平均需要25.5个比较来对10个input进行sorting。 最好的情况是14次比较,最差的是34次,sorting反向input需要22次。

由于您似乎熟悉插入sorting,因此您可以将algorithm实现为{9,6}的sortingnetworking,然后添加插入sorting({1}):

 i[0] with i[9] // {9} i[0] with i[6] // {6} i[1] with i[7] // {6} i[2] with i[8] // {6} i[3] with i[9] // {6} i[0 ... 9] // insertion sort 

由于类似于我在这里描述的原因,以下sorting函数sort6_iterator()sort10_iterator_local()应该执行得很好,其中sortingnetworking取自以下位置:

 template<class IterType> inline void sort10_iterator(IterType it) { #define SORT2(x,y) {if(data##x>data##y)std::swap(data##x,data##y);} #define DD1(a) auto data##a=*(data+a); #define DD2(a,b) auto data##a=*(data+a), data##b=*(data+b); #define CB1(a) *(data+a)=data##a; #define CB2(a,b) *(data+a)=data##a;*(data+b)=data##b; DD2(1,4) SORT2(1,4) DD2(7,8) SORT2(7,8) DD2(2,3) SORT2(2,3) DD2(5,6) SORT2(5,6) DD2(0,9) SORT2(0,9) SORT2(2,5) SORT2(0,7) SORT2(8,9) SORT2(3,6) SORT2(4,9) SORT2(0,1) SORT2(0,2) CB1(0) SORT2(6,9) CB1(9) SORT2(3,5) SORT2(4,7) SORT2(1,8) SORT2(3,4) SORT2(5,8) SORT2(6,7) SORT2(1,2) SORT2(7,8) CB1(8) SORT2(1,3) CB1(1) SORT2(2,5) SORT2(4,6) SORT2(2,3) CB1(2) SORT2(6,7) CB1(7) SORT2(4,5) SORT2(3,4) CB2(3,4) SORT2(5,6) CB2(5,6) #undef CB1 #undef CB2 #undef DD1 #undef DD2 #undef SORT2 } 

为了调用这个函数,我传递了一个std::vector迭代器。