C ++ 11primefaces内存sorting – 这是放松(释放 – 消耗)sorting的正确用法吗?

我最近使用三重缓冲区的std :: atomic将C ++ 11作为端口,用作并发同步机制。 这个线程同步方法背后的想法是,对于生产者 – 消费者的情况,你有一个运行速度更快的生产者,消费者,三重缓冲可以给一些好处,因为生产者线程不会因为等待而“放慢”为消费者。 在我的情况下,我有一个更新为〜120fps的物理线程和一个以〜60fps运行的渲染线程。 很明显,我希望渲染线程总是获得最近的状态,但是我也知道,由于速率的不同,我将跳过物理线程中的很多帧。 另一方面,我希望我的物理线程保持其不变的更新速度,而不会被较慢的呈现线程locking我的数据所限制。

原来的C代码是由remis-thoughts制作的,完整的解释在他的博客里 。 我鼓励任何有兴趣阅读的人进一步了解原始实施。

我的实现可以在这里find。

基本思想是在任何给定的时间,有一个有3个位置(缓冲区)和一个primefaces标志的数组,它们被比较和交换来定义哪个数组元素对应于什么状态。 这样,只有一个primefacesvariables用于模型数组的所有3个索引和三重缓冲背后的逻辑。 缓冲区的3个位置被命名为Dirty,Clean和Snap。 生产者总是写入脏指数,并且可以翻转书写器以将脏指数与当前的干净指数交换。 使用者可以请求一个新的Snap,它使用Clean索引交换当前的Snap索引以获得最新的缓冲区。 消费者总是在Snap位置读取缓冲区。

该标志由一个8位无符号整数组成,这些位对应于:

(未使用)(新写入)(2个脏)(2个清理)(2个捕捉)

新的写额外位标志由写入器设置并由读取器清除。 读者可以使用它来检查自上次捕捉以来是否有任何写入,如果没有,则不会再捕捉。 标志和索引可以使用简单的按位操作来获得。

好的,现在的代码:

template <typename T> class TripleBuffer { public: TripleBuffer<T>(); TripleBuffer<T>(const T& init); // non-copyable behavior TripleBuffer<T>(const TripleBuffer<T>&) = delete; TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete; T snap() const; // get the current snap to read void write(const T newT); // write a new value bool newSnap(); // swap to the latest value, if any void flipWriter(); // flip writer positions dirty / clean T readLast(); // wrapper to read the last available element (newSnap + snap) void update(T newT); // wrapper to update with a new element (write + flipWriter) private: bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1 uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap) // newWrite = (flags & 0x40) // dirtyIndex = (flags & 0x30) >> 4 // cleanIndex = (flags & 0xC) >> 2 // snapIndex = (flags & 0x3) mutable atomic_uint_fast8_t flags; T buffer[3]; }; 

执行:

 template <typename T> TripleBuffer<T>::TripleBuffer(){ T dummy = T(); buffer[0] = dummy; buffer[1] = dummy; buffer[2] = dummy; flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 } template <typename T> TripleBuffer<T>::TripleBuffer(const T& init){ buffer[0] = init; buffer[1] = init; buffer[2] = init; flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 } template <typename T> T TripleBuffer<T>::snap() const{ return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index } template <typename T> void TripleBuffer<T>::write(const T newT){ buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index } template <typename T> bool TripleBuffer<T>::newSnap(){ uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); do { if( !isNewWrite(flagsNow) ) // nothing new, no need to swap return false; } while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume)); return true; } template <typename T> void TripleBuffer<T>::flipWriter(){ uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume)); } template <typename T> T TripleBuffer<T>::readLast(){ newSnap(); // get most recent value return snap(); // return it } template <typename T> void TripleBuffer<T>::update(T newT){ write(newT); // write new value flipWriter(); // change dirty/clean buffer positions for the next update } template <typename T> bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){ // check if the newWrite bit is 1 return ((flags & 0x40) != 0); } template <typename T> uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){ // swap snap with clean return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2); } template <typename T> uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){ // set newWrite bit to 1 and swap clean with dirty return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3); } 

正如你所看到的,我决定使用Release-Consume模式来进行内存sorting。 商店的发布 (memory_order_release)确保当前线程中的写入不会在存储之后重新sorting。 另一方面, Consume保证在当前线程中没有读取依赖于当前加载的值可以这个加载之前被重新sorting。 这可以确保写入到释放相同的primefacesvariables的其他线程中的从属variables在当前线程中可见。

如果我的理解是正确的,因为我只需要primefaces设置标志,对其他variables的操作不直接影响标志可以由编译器自由地重新sorting,允许更多的优化。 从阅读一些关于新内存模型的文档,我也意识到这些轻松的primefaces只会在ARM和POWER等平台上产生显着影响(主要是因为它们的引入)。 由于我的目标是ARM,我相信我可以从这些操作中受益,并且能够挤出更多的性能。

现在的问题是:

我是否正确使用Release-Consume这个特定问题的放松顺序?

谢谢,

安德烈

PS:很抱歉,这个post很长,但我相信为了更好地看待问题,需要一些体面的背景。

编辑:实施@雅克的build议:

  • 修正了在newSnap()flipWriter()中使用直接赋值的flags ,因此使用默认load(std::memory_order_seq_cst)
  • 为了清晰起见,将位操作移到专用函数。
  • newSnap()添加bool返回types,现在在没有新的情况下返回false,否则返回true。
  • 定义的类不可复制使用= delete习语,因为如果正在使用TripleBuffer ,复制和分配构造函数都是不安全的。

编辑2:修正描述,这是不正确的(谢谢@使用)。 消费者请求一个新的Snap并从Snap索引(而不是“writer”)中读取。 对不起,分心,并感谢无用指出。

编辑3:根据@Display Name的build议优化了newSnap()flipriter()函数,有效地消除了每个循环的2个冗余load()

你为什么要在CAS循环中加载旧的标志值两次? 第一次是通过flags.load() ,第二次通过compare_exchange_weak() ,标准在CAS失败时指定的第二次将先前的值加载到第一个参数中,在这种情况下是flagsNow。

根据http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange ,“ 否则,将存储在* this中的实际值加载到期望值(执行加载操作)。 ”所以你的循环做的是失败时, compare_exchange_weak()重载flagsNow ,然后循环重复,第一条语句在compare_exchange_weak()加载后立即再次加载它。 在我看来,你的循环应该把负载拉到循环之外。 例如, newSnap()将是:

 uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); do { if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap } while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume)); 

flipWriter()

 uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume)); 

是的,这是memory_order_acquire和memory_order_consume之间的区别,但是当您每秒使用180次左右时,您不会注意到它。 如果你想知道答案的数字,你可以运行我的testingm2 = memory_order_consume。 只要将producer_or_consumer_Thread更改为如下所示:

 TripleBuffer <int> tb; void producer_or_consumer_Thread(void *arg) { struct Arg * a = (struct Arg *) arg; bool succeeded = false; int i = 0, k, kold = -1, kcur; while (a->run) { while (a->wait) a->is_waiting = true; // busy wait if (a->producer) { i++; tb.update(i); a->counter[0]++; } else { kcur = tb.snap(); if (kold != -1 && kcur != kold) a->counter[1]++; succeeded = tb0.newSnap(); if (succeeded) { k = tb.readLast(); if (kold == -1) kold = k; else if (kold = k + 1) kold = k; else succeeded = false; } if (succeeded) a->counter[0]++; } } a->is_waiting = true; } 

testing结果:

 _#_ __Produced __Consumed _____Total 1 39258150 19509292 58767442 2 24598892 14730385 39329277 3 10615129 10016276 20631405 4 10617349 10026637 20643986 5 10600334 9976625 20576959 6 10624009 10069984 20693993 7 10609040 10016174 20625214 8 25864915 15136263 41001178 9 39847163 19809974 59657137 10 29981232 16139823 46121055 11 10555174 9870567 20425741 12 25975381 15171559 41146940 13 24311523 14490089 38801612 14 10512252 9686540 20198792 15 10520211 9693305 20213516 16 10523458 9720930 20244388 17 10576840 9917756 20494596 18 11048180 9528808 20576988 19 11500654 9530853 21031507 20 11264789 9746040 21010829