C ++ 0x没有信号量? 如何同步线程?

C ++ 0x是否会出现没有信号量的情况? 关于使用信号量,Stack Overflow已经有一些问题了。 我一直使用它们(posix信号量)让一个线程在另一个线程中等待某个事件:

void thread0(...) { doSomething0(); event1.wait(); ... } void thread1(...) { doSomething1(); event1.post(); ... } 

如果我用互斥体做到这一点:

 void thread0(...) { doSomething0(); event1.lock(); event1.unlock(); ... } void thread1(...) { event1.lock(); doSomethingth1(); event1.unlock(); ... } 

问题:这是丑陋的,并不能保证thread1首先锁定互斥锁(假设同一个线程应该锁定和解锁互斥锁,在thread0和thread1开始之前,也不能锁定event1)。

所以既然boost还没有信号量,那么实现上面的最简单的方法是什么?

你可以很容易地从一个互斥体和一个条件变量中构建一个:

 #include <mutex> #include <condition_variable> class semaphore { private: std::mutex mutex_; std::condition_variable condition_; unsigned long count_ = 0; // Initialized as locked. public: void notify() { std::unique_lock<decltype(mutex_)> lock(mutex_); ++count_; condition_.notify_one(); } void wait() { std::unique_lock<decltype(mutex_)> lock(mutex_); while(!count_) // Handle spurious wake-ups. condition_.wait(lock); --count_; } bool try_wait() { std::unique_lock<decltype(mutex_)> lock(mutex_); if(count_) { --count_; return true; } return false; } }; 

基于“格言Yegorushkin”的答案,我试图用C ++ 11风格的例子。

 #include <mutex> #include <condition_variable> class Semaphore { public: Semaphore (int count_ = 0) : count(count_) {} inline void notify() { std::unique_lock<std::mutex> lock(mtx); count++; cv.notify_one(); } inline void wait() { std::unique_lock<std::mutex> lock(mtx); while(count == 0){ cv.wait(lock); } count--; } private: std::mutex mtx; std::condition_variable cv; int count; }; 

我决定用尽可能多的标准样式编写最强大/通用的C ++ 11信号量(注意using semaphore = ... ,通常你只需要使用类似于正常使用string的名字semaphorebasic_string ):

 template <typename Mutex, typename CondVar> class basic_semaphore { public: using native_handle_type = typename CondVar::native_handle_type; explicit basic_semaphore(size_t count = 0); basic_semaphore(const basic_semaphore&) = delete; basic_semaphore(basic_semaphore&&) = delete; basic_semaphore& operator=(const basic_semaphore&) = delete; basic_semaphore& operator=(basic_semaphore&&) = delete; void notify(); void wait(); bool try_wait(); template<class Rep, class Period> bool wait_for(const std::chrono::duration<Rep, Period>& d); template<class Clock, class Duration> bool wait_until(const std::chrono::time_point<Clock, Duration>& t); native_handle_type native_handle(); private: Mutex mMutex; CondVar mCv; size_t mCount; }; using semaphore = basic_semaphore<std::mutex, std::condition_variable>; template <typename Mutex, typename CondVar> basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count) : mCount{count} {} template <typename Mutex, typename CondVar> void basic_semaphore<Mutex, CondVar>::notify() { std::lock_guard<Mutex> lock{mMutex}; ++mCount; mCv.notify_one(); } template <typename Mutex, typename CondVar> void basic_semaphore<Mutex, CondVar>::wait() { std::unique_lock<Mutex> lock{mMutex}; mCv.wait(lock, [&]{ return mCount > 0; }); --mCount; } template <typename Mutex, typename CondVar> bool basic_semaphore<Mutex, CondVar>::try_wait() { std::lock_guard<Mutex> lock{mMutex}; if (mCount > 0) { --mCount; return true; } return false; } template <typename Mutex, typename CondVar> template<class Rep, class Period> bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) { std::unique_lock<Mutex> lock{mMutex}; auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; }); if (finished) --mCount; return finished; } template <typename Mutex, typename CondVar> template<class Clock, class Duration> bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) { std::unique_lock<Mutex> lock{mMutex}; auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; }); if (finished) --mCount; return finished; } template <typename Mutex, typename CondVar> typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() { return mCv.native_handle(); } 

根据posix信号量,我会补充一点

 class semaphore { ... bool trywait() { boost::mutex::scoped_lock lock(mutex_); if(count_) { --count_; return true; } else { return false; } } }; 

我更喜欢在方便的抽象级别使用同步机制,而不是总是使用更多基本的操作符来复制粘贴在一起的版本。

你也可以看看cpp11-on-multicore – 它有一个便携和最佳的信号量实现。

这个版本库还包含了补充c ++ 11线程的其他线程特性。

您可以使用互斥锁和条件变量。 您可以通过互斥体获得独占访问权限,检查您是要继续还是需要等待另一端。 如果您需要等待,您需要等待一段时间。 当另一个线程确定您可以继续时,它表示状态。

在boost :: thread库中有一个很简单的例子 ,你可能只是复制(C ++ 0x和boost线程库非常相似)。

也可以在线程中使用RAII信号量包装:

 class ScopedSemaphore { public: explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); } ScopedSemaphore(const ScopedSemaphore&) = delete; ~ScopedSemaphore() { m_Semaphore.Notify(); } ScopedSemaphore& operator=(const ScopedSemaphore&) = delete; private: Semaphore& m_Semaphore; }; 

多线程应用程序中的使用示例:

 boost::ptr_vector<std::thread> threads; Semaphore semaphore; for (...) { ... auto t = new std::thread([..., &semaphore] { ScopedSemaphore scopedSemaphore(semaphore); ... } ); threads.push_back(t); } for (auto& t : threads) t.join(); 

我发现shared_ptr和weak_ptr,一个长列表,做了我需要的工作。 我的问题是,我有几个客户希望与主机的内部数据交互。 通常,主机自己更新数据,但是,如果客户端请求了数据,则主机需要停止更新,直到没有客户端访问主机数据为止。 同时,客户端可以要求独占访问,以便其他客户端和主机不能修改该主机数据。

我如何做到这一点,我创建了一个结构:

 struct UpdateLock { typedef std::shared_ptr< UpdateLock > ptr; }; 

每个客户都有一个这样的成员:

 UpdateLock::ptr m_myLock; 

然后主机将有一个weak_ptr成员的排他性,以及非排他性锁的weak_ptrs列表:

 std::weak_ptr< UpdateLock > m_exclusiveLock; std::list< std::weak_ptr< UpdateLock > > m_locks; 

有一个功能来启用锁定,另一个功能是检查主机是否被锁定:

 UpdateLock::ptr LockUpdate( bool exclusive ); bool IsUpdateLocked( bool exclusive ) const; 

我测试LockUpdate,IsUpdateLocked中的锁,并定期在主机的Update例程中进行测试。 测试一个锁就像检查weak_ptr是否过期一样简单,并从m_locks列表中删除任何过期(我只在主机更新期间这样做),我可以检查列表是否为空; 同时,当客户端重置他们挂接的shared_ptr时,我会自动解锁,当客户端自动销毁时也会发生这种情况。

总的来说效果是,因为客户很少需要排他性(通常仅用于增加和删除),所以大部分时间LockUpdate(false)的请求,即非独占性的,只要(!m_exclusiveLock)成功。 并且LockUpdate(true)是一个排他性请求,只有当(!m_exclusiveLock)和(m_locks.empty())都成功。

可以添加一个队列来缓解排他锁和非排他锁之间的冲突,但是到目前为止我没有发生冲突,所以我打算等到发生这种情况时才添加解决方案(主要是我有一个真实世界的测试条件)。

到目前为止,这对我的需求是很好的。 我可以想象有必要扩大这个范围,以及一些扩大使用的问题,然而,这很快就实现了,而且只需要很少的定制代码。

如果有人对原子版本感兴趣,这里是实现。 性能预计比互斥和条件变量版本更好。

 class semaphore_atomic { public: void notify() { count_.fetch_add(1, std::memory_order_release); } void wait() { while (true) { int count = count_.load(std::memory_order_relaxed); if (count > 0) { if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) { break; } } } } bool try_wait() { int count = count_.load(std::memory_order_relaxed); if (count > 0) { if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) { return true; } } return false; } private: std::atomic_int count_{0}; };