线程池在C + + 11

相关问题

关于C ++ 11:

  • C + + 11:std ::线程池?
  • C ++ 11中的async(launch :: async)是否会使线程池过时,以避免昂贵的线程创build?

关于Boost:

  • C ++ boost线程重用线程
  • boost ::线程并创build它们的池!

我如何获得一个线程池发送任务 ,而不是一遍又一遍地创build和删除它们? 这意味着持久线程不join就重新同步。


我有这样的代码:

namespace { std::vector<std::thread> workers; int total = 4; int arr[4] = {0}; void each_thread_does(int i) { arr[i] += 2; } } int main(int argc, char *argv[]) { for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { workers.push_back(std::thread(each_thread_does, j)); } for (std::thread &t: workers) { if (t.joinable()) { t.join(); } } arr[4] = std::min_element(arr, arr+4); } return 0; } 

我不想每次迭代创build和连接线程,而是每次迭代都向我的工作线程发送任务,只创build一次。

线程池意味着所有的线程都在运行,换句话说,线程函数永远不会返回。 为了给线程做一些有意义的事情,你必须devise一个线程间通信系统,以便告诉线程有什么需要做的事情,以及传达实际的工作数据。

通常这将涉及某种并发的数据结构,每个线程可能会睡在某种条件variables上,当有工作要做时会通知它们。 在收到通知后,一个或多个线程唤醒,从并发数据结构中恢复一个任务,处理它,并以类似的方式存储结果。

然后线程会继续检查是否还有更多的工作要做,如果不能回去睡觉。

结果是你必须自己devise所有这些,因为没有一个普遍适用的“工作”的自然概念。 这是相当多的工作,有一些微妙的问题,你必须得到正确的。 (如果你喜欢一个系统为你在后台pipe理线程pipe理,那么你可以用Go编程。)

您可以使用C ++线程池库, https://github.com/vit-vit/ctpl

那么你写的代码可以用下面的代码replace

 #include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library int main (int argc, char *argv[]) { ctpl::thread_pool p(2 /* two threads in the pool */); int arr[4] = {0}; std::vector<std::future<void>> results(4); for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { results[j] = p.push([&arr, j](int){ arr[j] +=2; }); } for (int j = 0; j < 4; ++j) { results[j].get(); } arr[4] = std::min_element(arr, arr + 4); } } 

您将获得所需的线程数,并且不会在迭代中一遍又一遍地创build和删除它们。

这是从我的答案复制到另一个非常类似的职位,希望可以帮助:

1)从系统可以支持的最大线程数开始:

 int Num_Threads = thread::hardware_concurrency(); 

2)对于高效的线程池实现,一旦线程根据Num_Threads创build,最好不要创build新线程,或者通过join来破坏旧线程。 性能会受到影响,甚至可能使您的应用程序比串行版本慢。

每个C ++ 11线程都应该以无限循环的方式运行,不断等待新任务的抓取和运行。

这里是如何将这样的函数附加到线程池:

 int Num_Threads = thread::hardware_concurrency(); vector<thread> Pool; for(int ii = 0; ii < Num_Threads; ii++) { Pool.push_back(thread(Infinite_loop_function));} 

3)Infinite_loop_function

这是等待任务队列的“while(true)”循环

 void The_Pool:: Infinite_loop_function() { while(true) { { unique_lock<mutex> lock(Queue_Mutex); condition.wait(lock, []{return !Queue.empty()}); Job = Queue.front(); Queue.pop(); } Job(); // function<void()> type } }; 

4)创build一个function,将作业添加到队列中

 void The_Pool:: Add_Job(function<void()> New_Job) { { unique_lock<mutex> lock(Queue_Mutex); Queue.push(New_Job); } condition.notify_one(); } 

5)绑定任意函数到你的队列

 Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); 

一旦你整合这些成分,你有自己的dynamic线程池。 这些线程总是运行,等待工作。

我很抱歉,如果有一些语法错误,我键入这些代码,而且我有一个不好的记忆。 对不起,我不能为您提供完整的线程池代码,这将违反我的工作完整性。

一个线程池在核心是一组线程,它们都绑定到一个作为事件循环的函数上。 这些线程将无休止地等待任务被执行,或者自己终止。

线程池作业是提供一个接口来提交作业,定义(也许是修改)运行这些作业的策略(调度规则,线程实例化,池的大小),并监视线程和相关资源的状态。

因此,对于一个多function的游泳池,人们必须首先确定一个任务是什么,如何启动,中断,结果是什么(看到这个问题的承诺和未来的概念),线程将不得不回应什么样的事件他们将如何处理这些事件,如何将这些事件与任务处理的事件区别开来。 如您所见,这可能会变得相当复杂,并且会对线程的工作方式施加限制,因为解决scheme变得越来越复杂。

当前用于处理事件的工具是相当准系统(*):像互斥体,条件variables和几个抽象(锁,障碍)之类的原语。 但是,在某些情况下,这些抽象可能会变得不合适(见这个相关的问题 ),而且必须回到使用原语。

其他问题也必须pipe理:

  • 信号
  • I / O
  • 硬件(处理器亲和力,异构设置)

这些如何在你的设置中发挥出来?

这个类似的问题的答案指向了一个现有的实现意味着提升和stl。

我提供了一个针对另一个问题的线程池的非常粗糙的实现 ,它没有解决上面列出的许多问题。 你可能想要build立它。 你也许还想看看其他语言的现有框架,寻找灵感。


(*)我不认为这是一个问题,恰恰相反。 我认为这是从Cinheritance的C ++的精神。

像这样的东西可能会有帮助(从一个工作的应用程序采取)。

 #include <memory> #include <boost/asio.hpp> #include <boost/thread.hpp> struct thread_pool { typedef std::unique_ptr<boost::asio::io_service::work> asio_worker; thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) { for (int i = 0; i < threads; ++i) { auto worker = [this] { return service.run(); }; grp.add_thread(new boost::thread(worker)); } } template<class F> void enqueue(F f) { service.post(f); } ~thread_pool() { service_worker.reset(); grp.join_all(); service.stop(); } private: boost::asio::io_service service; asio_worker service_worker; boost::thread_group grp; }; 

你可以像这样使用它:

 thread_pool pool(2); pool.enqueue([] { std::cout << "Hello from Task 1\n"; }); pool.enqueue([] { std::cout << "Hello from Task 2\n"; }); 

请记住,重新创build一个高效的asynchronous排队机制并不是微不足道的。

Boost :: asio :: io_service是一个非常高效的实现,或者实际上是一个特定于平台的包装器的集合(例如它包装Windows上的I / O完成端口)。

编辑:这现在需要C ++ 17和概念。 (截至9/12/16,只有g ++ 6.0+就足够了。)

但是,模板演绎因为它更精确,所以值得努力获得更新的编译器。 我还没有find一个需要显式模板参数的函数。

它现在也采取任何适当的可调用对象( 并仍然静态types安全!!! )。

它现在还包含使用相同API的可选绿色线程优先级线程池。 不过这个类只是POSIX。 它使用ucontext_t API进行用户空间任务切换。


我为此创build了一个简单的库。 下面给出了一个使用的例子。 (我正在回答这个问题,因为在我决定自己写这篇文章之前,这是我发现的东西之一。)

 bool is_prime(int n){ // Determine if n is prime. } int main(){ thread_pool pool(8); // 8 threads list<future<bool>> results; for(int n = 2;n < 10000;n++){ // Submit a job to the pool. results.emplace_back(pool.async(is_prime, n)); } int n = 2; for(auto i = results.begin();i != results.end();i++, n++){ // i is an iterator pointing to a future representing the result of is_prime(n) cout << n << " "; bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result. if(prime) cout << "is prime"; else cout << "is not prime"; cout << endl; } } 

你可以通过任何(或无效)返回值和任何(或不)parameter passingasync任何函数,它将返回相应的std::future 。 为了得到结果(或者等到一个任务完成),你get()在将来调用get()

这里是github: https : //github.com/Tyler-Hardin/thread_pool 。

在STL之外没有依赖关系的线程池是完全可能的。 我最近写了一个小的头只有线程池库来解决完全相同的问题。 它支持dynamic池resize(在运行时更改工作人员数量),等待,停止,暂停,恢复等。 希望对你有帮助。