线程池使用boost asio

我正在尝试使用boost :: asio创build一个有限的线程池类。 但是我被卡住了,有一个人可以帮助我。

唯一的问题是我应该减less的地方呢?

代码不能按预期方式工作。

问题是我不知道什么时候我的线程将完成执行,我将如何知道它已经返回到池中

#include <boost/asio.hpp> #include <iostream> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/thread/mutex.hpp> #include <stack> using namespace std; using namespace boost; class ThreadPool { static int count; int NoOfThread; thread_group grp; mutex mutex_; asio::io_service io_service; int counter; stack<thread*> thStk ; public: ThreadPool(int num) { NoOfThread = num; counter = 0; mutex::scoped_lock lock(mutex_); if(count == 0) count++; else return; for(int i=0 ; i<num ; ++i) { thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service))); } } ~ThreadPool() { io_service.stop(); grp.join_all(); } thread* getThread() { if(counter > NoOfThread) { cout<<"run out of threads \n"; return NULL; } counter++; thread* ptr = thStk.top(); thStk.pop(); return ptr; } }; int ThreadPool::count = 0; struct callable { void operator()() { cout<<"some task for thread \n"; } }; int main( int argc, char * argv[] ) { callable x; ThreadPool pool(10); thread* p = pool.getThread(); cout<<p->get_id(); //how i can assign some function to thread pointer ? //how i can return thread pointer after work done so i can add //it back to stack? return 0; } 

总之,您需要用另一个函数来包装用户提供的任务:

  • 调用用户函数或可调用对象。
  • locking互斥锁并减less计数器。

我可能不了解这个线程池的所有要求。 因此,为了清楚起见,这是一个明确的列表,我相信是要求:

  • 池pipe理线程的生命周期。 用户不应该能够删除驻留在池中的线​​程。
  • 用户可以以非侵入方式将任务分配给池。
  • 在分配任务时,如果池中的所有线程正在运行其他任务,则任务将被丢弃。

在我提供实施之前,我想强调一些要点:

  • 一旦线程启动,它将运行,直到完成,取消或终止。 线程正在执行的函数不能被重新分配。 为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列读取的函数(例如io_service::run() ,并且将可调用types发布到事件队列,如从io_service::post()
  • io_service::run()返回io_service::run()中没有挂起的工作, io_service停止,或者线程正在运行的处理程序抛出exception。 为了防止在没有未完成的工作时返回io_serivce::run() ,可以使用io_service::work类。
  • 定义任务的types需求(即任务的types必须可以通过object()语法来调用),而不是需要一个types(即任务必须从processinheritance),为用户提供更多的灵活性。 它允许用户提供一个任务作为一个函数指针或一个提供无符号operator()的typesoperator()

使用boost::asio

 #include <boost/asio.hpp> #include <boost/thread.hpp> class thread_pool { private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : work_( io_service_ ), available_( pool_size ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } /// @brief Destructor. ~thread_pool() { // Force all threads to return from io_service::run(). io_service_.stop(); // Suppress all exceptions. try { threads_.join_all(); } catch ( const std::exception& ) {} } /// @brief Adds a task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Post a wrapped task into the queue. io_service_.post( boost::bind( &thread_pool::wrap_task, this, boost::function< void() >( task ) ) ); } private: /// @brief Wrap a task so that the available count can be increased once /// the user provided task has completed. void wrap_task( boost::function< void() > task ) { // Run the user supplied task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} // Task has finished, so increment count of available threads. boost::unique_lock< boost::mutex > lock( mutex_ ); ++available_; } }; 

关于实施的一些评论:

  • exception处理需要在用户的任务周围进行。 如果用户的函数或可调用对象抛出一个不是boost::thread_interruptedtypes的exception,则boost::thread_interrupted std::terminate() 。 这是Boost.Thread 线程函数行为exception的结果。 这也是值得阅读Boost.Asio 从处理程序抛出exception的影响 。
  • 如果用户通过boost::bind提供task ,则嵌套boost::bind将无法编译。 以下选项之一是必需的:
    • 不支持由boost::bind创build的task
    • 如果boost::bind的结果是boost::protect ,则可以使用元编程来根据用户的types执行编译时分支,因为boost::protect只能在某些函数对象上正常运行。
    • 使用另一种types间接传递task对象。 我select使用boost::function来提高可读性,代价是失去了确切的types。 boost::tuple虽然可读性稍差,但也可用于保存确切types,如Boost.Asio的序列化示例所示。

应用程序代码现在可以非干扰地使用thread_pooltypes:

 void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. } 

thread_pool可以在没有Boost.Asio的情况下创build,而且对于维护者来说可能会稍微简单一些,因为他们不再需要了解Boost.Asio行为,比如io_service::run()何时返回,什么是io_service::work对象:

 #include <queue> #include <boost/bind.hpp> #include <boost/thread.hpp> class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };