Branchless K-means (or other optimizations)

Note: I'd appreciate a guide to how to approach and come up with these kinds of solutions more than the solution itself.

In my system, I have a very performance-critical function that shows up as the number-one profiling hotspot in certain contexts. It's in the middle of a k-means iteration (already multithreaded, with each worker thread processing a sub-range of the points).

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist)     // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

The time spent processing this section of code counts heavily, so I'm often fiddling with it. It might be worth putting the centroid loop on the outside, for example, and iterating over the points in parallel for a given centroid. The number of cluster points here spans the millions, while the number of centroids spans the thousands. The algorithm is applied for a handful of iterations (typically under 10). It doesn't seek perfect convergence/stability, just some "reasonable" approximation.

Any ideas are appreciated, but what I'm really eager to discover is whether this code can be made branchless, since that might allow a SIMD version. I haven't really developed the kind of mental ability that lets me easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days, so a guide on how to write branchless code and how to develop the appropriate mindset for it would also be helpful.

In short, I'm looking for any guides, hints, and suggestions (not necessarily solutions) on how to micro-optimize this code. There is most likely room for algorithmic improvements, but my blind spot has always been micro-optimization solutions (and I'm curious to learn how to apply them more effectively without going overboard with them). The logic is already tightly multithreaded, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try short of a smarter algorithm outright. We're completely free to change the memory layout.

Response to Algorithmic Suggestions

About micro-optimizing an O(knm) algorithm that could clearly be improved at the algorithmic level, I wholeheartedly agree. That pushes this specific question into a somewhat academic and impractical realm. However, if I may be allowed an anecdote: I come from an original background of high-level programming, with a big emphasis on the broad, large-scale viewpoint and on safety, and very little on low-level implementation details. I've recently switched to projects of a very different, modern flavor, and I'm learning all kinds of new tricks from my peers about cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose memory allocators that actually outperform malloc (but for specific scenarios), and so on.

It's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that those old data structures I often favored in the 90s, which were often linked/tree-type structures, are actually being vastly outperformed by much more naive, micro-tuned, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time, since I feel like we're fitting the algorithms more to the machine now and narrowing the possibilities that way (especially with GPGPU).

The funniest thing is that I find this kind of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, it's easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, slap on a parallel-for and possibly some SIMD, and call it done with a decent speed-up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness with which these micro-optimizations can be applied make me want to learn more in that area, as reading papers on better algorithms can take some time (as well as require more extensive changes). So I've been jumping on the micro-optimization bandwagon a bit more lately, perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

Disassembly

Note: I am really, really bad at assembly, so I have often tuned things in more of a trial-and-error fashion, coming up with somewhat-educated guesses about why a hotspot shown in vtune might be the bottleneck and then trying things out to see if the times improve, assuming the guesses have some hint of truth if the times do improve, or that I completely missed the mark if they don't.

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h)
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2]
000007FEEE3FB8B0  add         rax,rax
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8]
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx]
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8]
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4]
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8]
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2
000007FEEE3FB8DB  mulss       xmm0,xmm0
000007FEEE3FB8DF  mulss       xmm1,xmm1
000007FEEE3FB8E3  addss       xmm2,xmm0
000007FEEE3FB8E7  addss       xmm2,xmm1
            if (dist < pt.min_dist) // VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h)
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx
000007FEEE3FB8F9  inc         ecx
000007FEEE3FB8FB  add         r8,30h
000007FEEE3FB8FF  cmp         ecx,r10d
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h)
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi
000007FEEE3FB906  add         rdx,20h
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4]
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913  mov         rbx,qword ptr [irange]
            }
        }
    }
}

We're forced to target SSE 2, a bit behind the times, but the user base actually tripped up once when we assumed even SSE 4 as a minimum requirement (some of the users had prototype Intel machines).

Update with Standalone Test: ~5.6 secs

I'm very appreciative of all the help offered! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across many threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out, so that I can experiment with all of these graciously offered solutions.

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>

#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y + z*z;
            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << ","
            << centroids[c][1] << "," << centroids[c][2] << endl;
}

I'm aware of the dangers of superficial tests, but since it's already considered to be a hotspot from previous real-world sessions, I hope it's a forgivable offense. I'm also just interested in the general techniques associated with micro-optimizing such code.

I did get slightly different results profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members and hoisted the min_dist member out, making it a local variable). The exact ratio between centroids and points is also a bit different, but hopefully close enough to translate improvements here back to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different, so I may be risking optimizing this superficial test without the original (a risk I'm willing to take for now, since I'm more interested in expanding my knowledge of techniques that could optimize these cases than in a solution for this exact case).

VTune visualization

Update with Yochai Timmer's Suggestion - ~12.5 secs

Ah, I'm facing the woes of micro-optimization without understanding assembly very well. I replaced this:

-if (min_dist_squared > dist_squared)
-{
-    min_dist_squared = dist_squared;
-    pt.centroid = c;
-}

With this:

+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

...only to find the times escalating from 5.6 secs up to 12.5 secs. Nevertheless, that's not his fault, nor does it take away from the value of his solution; that's my fault for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed, and apparently I was not the victim of branch misprediction as I originally thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. Now for round 2.

Harold's SIMD Solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster rep to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output, suggesting more than slight precision differences, including some centroids towards the end with values of 0 (implying that they were not found in the search). I've been trying to go through the SIMD logic with the debugger to see what might be up; it could merely be a transcription error on my part, but here's the code in case someone can spot the error.

If the error could be corrected without slowing down the results, then this speed improvement is more than I ever imagined possible from a pure micro-optimization!

// New version of Centroids::find_nearest (from harold's solution):
int find_nearest(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;

    for (int i=4; i < num_centroids; i += 4)
    {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

Harold's SIMD Solution (Corrected) - ~2.5 secs

After applying the corrections and testing them out, the results are intact and function correctly, with similar improvements to the original codebase!

Since this hits the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution some extra props for more than doubling the speed of the operation. I have my homework cut out for me in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand my personal understanding of possible solutions for dealing with them.

Nevertheless, I'm grateful for all the contributions here, from the algorithmic suggestions to the really cool bitselect trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now I've got my homework cut out for me in understanding some of these non-arithmetical SIMD ops.

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;

    for (int i=4; i < num_centroids; i += 4)
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

Too bad we can't use SSE 4.1, but very well, SSE 2 it is. I haven't tested this, just compiled it to see if there were syntax errors and to see whether the assembly made sense (it's mostly fine, though GCC spills min_index even with some xmm registers unused; not sure why that happens).

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n)
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;

    for (int i = 4; i < n; i += 4)
    {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
    }

    float mdist[4];
    _mm_store_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

As usual, it expects the pointers to be 16-aligned. Also, the padding should be filled with points at infinity (so they're never closest to the target).
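A sketch of what that padding might look like (the helper name and destination buffers here are made up for illustration): round the count up to a multiple of 4 and fill the trailing lanes with a huge coordinate, so those lanes always lose the minimum-distance comparison.

```cpp
#include <cfloat>
#include <cstring>

// Hypothetical helper: copies count centroids into destination arrays that
// hold at least ((count + 3) & ~3) floats each, padding with FLT_MAX.
// FLT_MAX squared overflows to +inf in the distance calculation, so the
// padded lanes always compare as "further away" and can never be selected.
int pad_centroids(const float* x, const float* y, const float* z, int count,
                  float* dst_x, float* dst_y, float* dst_z)
{
    const int padded_count = (count + 3) & ~3;  // round up to a multiple of 4
    std::memcpy(dst_x, x, count * sizeof(float));
    std::memcpy(dst_y, y, count * sizeof(float));
    std::memcpy(dst_z, z, count * sizeof(float));
    for (int i = count; i < padded_count; ++i)
    {
        dst_x[i] = FLT_MAX;
        dst_y[i] = FLT_MAX;
        dst_z[i] = FLT_MAX;
    }
    return padded_count;
}
```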

SSE 4.1 would let you replace this

 min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index)); 

with this:

 min_index = _mm_blendv_epi8(min_index, index, mask); 

Here's an asm version, made for vsyasm, tested a bit (seems to work):

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]    ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minimum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

In C++:

 extern "C" int find_closest(int n, float** points, float* reference_point); 

You could use the branchless ternary operator, sometimes called bitselect (condition ? true : false).
Just use it for the 2 members, defaulting to doing nothing.
Don't worry about the extra operations; they are nothing compared to the branching of the if statement.

bitselect implementation:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Would work because it's just a bit select, no matter the actual value
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    return reinterpret_cast<float&>(res);
}

Your loop should look something like this:

for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    const bool isSmaller = dist < pt.min_dist;

    // Use the same value if it's not smaller.
    pt.min_index = bitselect(isSmaller, i, pt.min_index);
    pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
}

C++ is a high-level language. The assumption that control flow in the C++ source code translates into branching instructions is flawed. I don't have the definitions of some types from your example, so I made a simple test program with a similar conditional assignment:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

Note that the undefined "g" is there merely to prevent the optimizer from deleting everything. I translated it into x86_64 assembly with G++ 4.9.2 using -O3 and -S (I didn't even have to change the default -march), and the (not terribly surprising) result is that the loop body contains no branches:

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

Apart from that, the assumption that branchless is necessarily faster may also be flawed, because the probability that a new distance "beats" the old one decreases the more elements you have looked at. It's not a coin toss. The "bitselect" trick was invented back when compilers were far less aggressive at generating "as-if" assembly than they are now. I would rather suggest taking a look at the kind of assembly your compiler actually generates, and then either trying to rework the code so the compiler can optimize it better, or taking the result as a basis for hand-written assembly. If you want to look into SIMD, I would suggest trying a "minimum of minimums" approach with reduced data dependencies (in my example, the dependencies on "min" are probably a bottleneck).
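As a scalar illustration of that "minimum of minimums" idea (a sketch only; index tracking is omitted to keep the dependency structure visible, and it assumes the count is a multiple of 4): four independent running minimums break the chain where every iteration depends on the same "min" variable, and they are combined once at the end.

```cpp
// Track four independent running minimums so consecutive iterations don't
// all serialize on one "min" variable; reduce them to a single value at the
// end. This mirrors what a 4-wide SIMD lane layout does implicitly.
float min_of_mins(const float* dist, int n)
{
    float m0 = dist[0], m1 = dist[1], m2 = dist[2], m3 = dist[3];
    for (int i = 4; i < n; i += 4)
    {
        if (dist[i + 0] < m0) m0 = dist[i + 0];
        if (dist[i + 1] < m1) m1 = dist[i + 1];
        if (dist[i + 2] < m2) m2 = dist[i + 2];
        if (dist[i + 3] < m3) m3 = dist[i + 3];
    }
    // Horizontal reduction of the four partial minimums.
    const float a = m0 < m1 ? m0 : m1;
    const float b = m2 < m3 ? m2 : m3;
    return a < b ? a : b;
}
```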

First of all, I'd suggest that before you try any code changes you look at the disassembly in an optimized build. Ideally you want to look at the profiler data at the assembly level. This can show up all sorts of things, for example:

  1. The compiler may not have generated an actual branch instruction.
  2. The line of code that has the bottleneck may have many more instructions associated with it than you might think, e.g. the dist calculation.

Beyond that, there's the standard trick that when you're talking about distances, computing them often requires a square root. You should do that square root at the end of the process, on the minimum squared value.
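A minimal sketch of that trick (hypothetical helper, 2D for brevity): since sqrt is monotonic, comparing squared distances finds the same winner, so the loop needs no sqrt at all and a single one suffices at the end.

```cpp
#include <cmath>
#include <limits>

// Compare squared distances inside the loop; take one sqrt at the very end.
// Valid because x < y implies sqrt(x) < sqrt(y) for non-negative values.
float nearest_distance(const float* xs, const float* ys, int n,
                       float px, float py)
{
    float min_sq = std::numeric_limits<float>::max();
    for (int i = 0; i < n; ++i)
    {
        const float dx = xs[i] - px;
        const float dy = ys[i] - py;
        const float d_sq = dx*dx + dy*dy;   // no sqrt here
        if (d_sq < min_sq) min_sq = d_sq;
    }
    return std::sqrt(min_sq);               // one sqrt, at the end
}
```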

SSE can process four values at once, without any branches, using _mm_min_ps. If you really need speed, then you want to be using SSE (or AVX) intrinsics. Here's a basic example:

float MinimumDistance(const float *values, int count)
{
    __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        min = _mm_min_ps(min, distances);
    }
    // Combine the four separate minimums to a single value
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));

    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, min);
    for (; i < count; i++)
    {
        result = min(values[i], result);
    }

    return result;
}

For best SSE performance you should make sure the loads happen at aligned addresses. You can handle the first few misaligned elements in the same way as the last few in the code above if necessary.

The other thing to watch out for is memory bandwidth. If there are several members of the ClusterCentroid structure that you don't use during that loop, then you'll be reading far more data from memory than you really need to, since memory is read in cache-line-sized chunks, which are 64 bytes each.
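For example, a hypothetical hot/cold split might look like this (the field names are invented for illustration): keeping only the members the inner loop reads in one compact struct means each 64-byte cache line delivers several useful centroids instead of mostly dead payload.

```cpp
// Illustrative split: the hot loop only ever reads positions, so they get
// their own tightly-packed array; rarely-touched data lives elsewhere.
struct CentroidHot        // read on every inner-loop iteration
{
    float x, y, z;        // 12 bytes -> 5 centroids per 64-byte cache line
};

struct CentroidCold       // only touched when centroids are rebuilt
{
    int  count;
    char debug_name[32];  // hypothetical extra payload
};
```

With an interleaved 48-byte struct, a cache line would cover barely one centroid's hot data; after the split it covers five.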

This might go both ways, but I'd give the following structure a try:

std::vector<float> centDists(num_centroids); //<-- one for each thread
for (size_t p=0; p<num_points; ++p)
{
    Point& pt = points[p];
    for (size_t c=0; c<num_centroids; ++c)
    {
        const float dist = ...;
        centDists[c] = dist;
    }
    pt.min_idx = min_element(centDists.begin(), centDists.end()) - centDists.begin();
}

Obviously, you now have to iterate over memory twice, which probably hurts the cache hit-to-miss ratio (you could also split it into sub-ranges), but on the other hand, each of the inner loops should be easy to vectorize and unroll, so you just have to measure whether it is worth it.

And even if you stick with your version, I'd try using local variables to keep track of the minimum index and distance and apply the results to the point at the end.
The rationale is that each read or write to pt.min_dist is effectively done through a pointer, which, depending on the compiler optimizations, may or may not decrease your performance.
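A sketch of what that might look like (with simplified, made-up types): the running best lives in locals that the compiler can keep in registers, and the point is written through the reference exactly once.

```cpp
#include <limits>

struct Centroid { float x, y, z; };
struct Point    { float x, y, z; float min_dist; int min_index; };

// Track the minimum in locals; store through the point reference once.
void assign_point(Point& pt, const Centroid* centroids, int num_centroids)
{
    float best_dist  = std::numeric_limits<float>::max();
    int   best_index = -1;
    for (int c = 0; c < num_centroids; ++c)
    {
        const float dx = pt.x - centroids[c].x;
        const float dy = pt.y - centroids[c].y;
        const float dz = pt.z - centroids[c].z;
        const float d  = dx*dx + dy*dy + dz*dz;
        if (d < best_dist) { best_dist = d; best_index = c; }
    }
    pt.min_dist  = best_dist;   // single store at the end
    pt.min_index = best_index;
}
```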

Another thing that is important for vectorization is to turn an array of structs (in this case the centroids) into a struct of arrays (so, e.g., one array for each coordinate of the points), because that way you don't need extra gather instructions in order to load the data for use with SIMD instructions. See Eric Brumer's talk for more information on that topic.
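A minimal sketch of that transformation (sizes and names invented for the example): after the conversion, four consecutive x values sit contiguously, so a single aligned 16-byte load fills a SIMD register without any gathering.

```cpp
// AoS -> SoA for the centroids: one contiguous array per coordinate.
struct CentroidsAoS
{
    struct C { float x, y, z; } c[512];
};

struct CentroidsSoA
{
    float x[512];   // x[i], x[i+1], x[i+2], x[i+3] are one _mm_load_ps away
    float y[512];
    float z[512];
};

void aos_to_soa(const CentroidsAoS& in, CentroidsSoA& out)
{
    for (int i = 0; i < 512; ++i)
    {
        out.x[i] = in.c[i].x;
        out.y[i] = in.c[i].y;
        out.z[i] = in.c[i].z;
    }
}
```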

EDIT: Some numbers for my system (Haswell, clang 3.5):
I did a short test with your benchmark, and on my system the above code slowed the algorithm down by about 10%; essentially, nothing could be vectorized.

However, when applying the AoS to SoA transformation for your centroids, the distance calculation was vectorized, which led to a reduction of the overall runtime of about 40% compared to your original structure with the AoS to SoA transformation applied.

One possible micro-optimization: store min_dist and min_index in local variables. The compiler may have to write to memory more often the way you have it written; on some architectures this can have a big performance impact. See my answer here for another example.

Adams's suggestion of doing 4 compares at once is also a good one.

However, your best speedup is going to come from reducing the number of centroids you have to check. Ideally, build a kd-tree (or similar) around the centroids, then query that to find the closest point.

If you don't have any tree building code lying around, here's my favorite "poor-man's" closest point search:

  1. Sort the points by one coordinate, e.g. cent.pos[0].
  2. Pick a starting index for the query point (pt).
  3. Iterate forwards through the candidate points until you reach the end, OR until abs(pt.pos[0] - cent.pos[0]) > min_dist.
  4. Repeat the previous step going the opposite direction.

The extra stopping condition for the search means that you should skip a fair amount of points; you're also guaranteed not to skip any points closer than the best you've already found.

So for your code, this looks something like

// Sort centroids by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();

// Pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (cur_x - min_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids));

// Forward search
for (int i=start_index; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];

    // Everything to the right of this must be further than min_dist, so break.
    // This is where the savings comes from!
    if (fabs(cent.pos[0] - pt.pos[0]) > min_dist)
        break;

    const float dist = ...;
    if (dist < min_dist)
    {
        min_dist = dist;
        min_index = i;
    }
}

// Backwards search
for (int i=start_index; i >= 0; --i)
{
    // same as above
}

pt.min_dist = min_dist;
pt.min_index = min_index;

(Note that this assumes you're computing the distance between points, but your assembly indicates it's the distance squared. Adjust the break condition accordingly).
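For instance, the adjusted break condition might look like this (a sketch): instead of taking the square root of the tracked minimum, square the axis gap and compare it against the squared minimum directly.

```cpp
// Early-out test for a squared-distance search: |gap| > sqrt(min_dist_squared)
// is equivalent to gap*gap > min_dist_squared, so no sqrt is needed.
inline bool can_stop(float cent_x, float pt_x, float min_dist_squared)
{
    const float gap = cent_x - pt_x;
    return gap * gap > min_dist_squared;
}
```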

There's slight overhead to building the tree or sorting the centroids, but this should be offset by making the calculations faster in the bigger loop (over the number of points).