C ++ 11线程安全队列

我正在使用的一个项目使用多个线程来处理文件的集合。 每个线程都可以将文件添加到要处理的文件列表中,所以我放在一起(我以为是)一个线程安全的队列。 相关部分如下:

// qMutex is a std::mutex intended to guard the queue // populatedNotifier is a std::condition_variable intended to // notify waiting threads of a new item in the queue void FileQueue::enqueue(std::string&& filename) { std::lock_guard<std::mutex> lock(qMutex); q.push(std::move(filename)); // Notify anyone waiting for additional files that more have arrived populatedNotifier.notify_one(); } std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) { std::unique_lock<std::mutex> lock(qMutex); if (q.empty()) { if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) { std::string ret = q.front(); q.pop(); return ret; } else { return std::string(); } } else { std::string ret = q.front(); q.pop(); return ret; } } 

然而,我偶尔会在if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }块中进行segfaulting,并且gdb中的检查表明由于队列为空而发生段错误。 这怎么可能? 这是我的理解, wait_for只有返回cv_status::no_timeout时,它已被通知,这应该只发生在FileQueue::enqueue刚刚推新项目的队列。

根据标准, condition_variablesvariables允许虚假地唤醒,即使事件没有发生。 在虚假唤醒的情况下,它将返回cv_status::no_timeout (因为它醒来而不是超时),即使它没有被通知。 正确的解决scheme当然是在进行之前检查唤醒是否真正合法。

细节在标准§30.5.1[thread.condition.condvar]中指定:

当notify_one()调用,notify_all()调用,abs_time指定的绝对超时(30.2.4)到期或虚假指定时,函数将会解除阻塞。

如果abs_time指定的绝对超时值(30.2.4)过期,则返回 cv_status :: timeout,否则返回 cv_status :: no_timeout。

看看它,当你检查一个条件variables时,最好使用一个while循环(所以如果它醒来,仍然无效,你再次检查)。 我只是写了一个asynchronous队列的模板,希望这有助于。

 #ifndef SAFE_QUEUE #define SAFE_QUEUE #include <queue> #include <mutex> #include <condition_variable> // A threadsafe-queue. template <class T> class SafeQueue { public: SafeQueue(void) : q() , m() , c() {} ~SafeQueue(void) {} // Add an element to the queue. void enqueue(T t) { std::lock_guard<std::mutex> lock(m); q.push(t); c.notify_one(); } // Get the "front"-element. // If the queue is empty, wait till a element is avaiable. T dequeue(void) { std::unique_lock<std::mutex> lock(m); while(q.empty()) { // release lock as long as the wait and reaquire it afterwards. c.wait(lock); } T val = q.front(); q.pop(); return val; } private: std::queue<T> q; mutable std::mutex m; std::condition_variable c; }; #endif 

这可能是你应该这样做的:

 void push(std::string&& filename) { { std::lock_guard<std::mutex> lock(qMutex); q.push(std::move(filename)); } populatedNotifier.notify_one(); } bool try_pop(std::string& filename, std::chrono::milliseconds timeout) { std::unique_lock<std::mutex> lock(qMutex); if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); })) return false; filename = std::move(q.front()); q.pop(); return true; } 

除了被接受的答案之外,我会说实现一个正确的多生产者/多消费者队列是困难的(尽pipeC ++ 11更容易)

我build议你尝试一下(非常好的)无锁的boost库 ,这个“queue”结构可以实现你想要的function, 无需等待/无锁保证, 而且不需要C ++ 11编译器 。

我现在添加这个答案,因为无锁库是非常新的提升(因为我相信1.53)

我会重写你的出队function:

 std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) { std::unique_lock<std::mutex> lock(qMutex); while(q.empty()) { if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) return std::string(); } std::string ret = q.front(); q.pop(); return ret; } 

它更短,并没有像你所做的重复的代码。 只有发出它可能会等待更长的时间。 为了防止在循环之前需要记住启动时间,请检查超时并相应地调整等待时间。 或者在等待条件下指定绝对时间。

我前一段时间遇到同样的问题,然后在C ++ 11中编写一个GNU线程安全asynchronous队列模板。 我在我的博客发布:

http://gnodebian.blogspot.com.es/2013/07/a-thread-safe-asynchronous-queue-in-c11.html

对于这种情况,也有GLib解决scheme,我还没有尝试,但我相信这是一个很好的解决scheme。 https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new