线程池使用boost asio
我正在尝试使用boost :: asio创build一个有限的线程池类。 但是我被卡住了,有一个人可以帮助我。
唯一的问题是我应该减less的地方呢?
代码不能按预期方式工作。
问题是我不知道什么时候我的线程将完成执行,我将如何知道它已经返回到池中
#include <boost/asio.hpp> #include <iostream> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/thread/mutex.hpp> #include <stack> using namespace std; using namespace boost; class ThreadPool { static int count; int NoOfThread; thread_group grp; mutex mutex_; asio::io_service io_service; int counter; stack<thread*> thStk ; public: ThreadPool(int num) { NoOfThread = num; counter = 0; mutex::scoped_lock lock(mutex_); if(count == 0) count++; else return; for(int i=0 ; i<num ; ++i) { thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service))); } } ~ThreadPool() { io_service.stop(); grp.join_all(); } thread* getThread() { if(counter > NoOfThread) { cout<<"run out of threads \n"; return NULL; } counter++; thread* ptr = thStk.top(); thStk.pop(); return ptr; } }; int ThreadPool::count = 0; struct callable { void operator()() { cout<<"some task for thread \n"; } }; int main( int argc, char * argv[] ) { callable x; ThreadPool pool(10); thread* p = pool.getThread(); cout<<p->get_id(); //how i can assign some function to thread pointer ? //how i can return thread pointer after work done so i can add //it back to stack? return 0; }
总之,您需要用另一个函数来包装用户提供的任务:
- 调用用户函数或可调用对象。
- locking互斥锁并减less计数器。
我可能不了解这个线程池的所有要求。 因此,为了清楚起见,这是一个明确的列表,我相信是要求:
- 池pipe理线程的生命周期。 用户不应该能够删除驻留在池中的线程。
- 用户可以以非侵入方式将任务分配给池。
- 在分配任务时,如果池中的所有线程正在运行其他任务,则任务将被丢弃。
在我提供实施之前,我想强调一些要点:
- 一旦线程启动,它将运行,直到完成,取消或终止。 线程正在执行的函数不能被重新分配。 为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列读取的函数(例如
io_service::run()
,并且将可调用types发布到事件队列,如从io_service::post()
。 -
io_service::run()
返回io_service::run()
中没有挂起的工作,io_service
停止,或者线程正在运行的处理程序抛出exception。 为了防止在没有未完成的工作时返回io_serivce::run()
,可以使用io_service::work
类。 - 定义任务的types需求(即任务的types必须可以通过
object()
语法来调用),而不是需要一个types(即任务必须从process
inheritance),为用户提供更多的灵活性。 它允许用户提供一个任务作为一个函数指针或一个提供无符号operator()
的typesoperator()
。
使用boost::asio
:
#include <boost/asio.hpp> #include <boost/thread.hpp> class thread_pool { private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : work_( io_service_ ), available_( pool_size ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } /// @brief Destructor. ~thread_pool() { // Force all threads to return from io_service::run(). io_service_.stop(); // Suppress all exceptions. try { threads_.join_all(); } catch ( const std::exception& ) {} } /// @brief Adds a task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Post a wrapped task into the queue. io_service_.post( boost::bind( &thread_pool::wrap_task, this, boost::function< void() >( task ) ) ); } private: /// @brief Wrap a task so that the available count can be increased once /// the user provided task has completed. void wrap_task( boost::function< void() > task ) { // Run the user supplied task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} // Task has finished, so increment count of available threads. boost::unique_lock< boost::mutex > lock( mutex_ ); ++available_; } };
关于实施的一些评论:
- exception处理需要在用户的任务周围进行。 如果用户的函数或可调用对象抛出一个不是
boost::thread_interrupted
types的exception,则boost::thread_interrupted
std::terminate()
。 这是Boost.Thread 线程函数行为exception的结果。 这也是值得阅读Boost.Asio 从处理程序抛出exception的影响 。 - 如果用户通过
boost::bind
提供task
,则嵌套boost::bind
将无法编译。 以下选项之一是必需的:- 不支持由
boost::bind
创build的task
。 - 如果
boost::bind
的结果是boost::protect
,则可以使用元编程来根据用户的types执行编译时分支,因为boost::protect
只能在某些函数对象上正常运行。 - 使用另一种types间接传递
task
对象。 我select使用boost::function
来提高可读性,代价是失去了确切的types。boost::tuple
虽然可读性稍差,但也可用于保存确切types,如Boost.Asio的序列化示例所示。
- 不支持由
应用程序代码现在可以非干扰地使用thread_pool
types:
void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. }
thread_pool
可以在没有Boost.Asio的情况下创build,而且对于维护者来说可能会稍微简单一些,因为他们不再需要了解Boost.Asio
行为,比如io_service::run()
何时返回,什么是io_service::work
对象:
#include <queue> #include <boost/bind.hpp> #include <boost/thread.hpp> class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };