boost :: asio :: io_service运行方法块/解锁时混淆

作为Boost.Asio的初学者,我很困惑io_service::run() 。 我将不胜感激,如果有人可以向我解释,当这种方法阻止/解锁。 文件指出:

run()函数阻塞,直到所有的工作完成,并且没有更多的处理程序被分派,或者直到io_service被停止。

多个线程可能会调用run()函数来设置一个线程池, io_service可以从中执行处理程序。 在池中等待的所有线程都是相等的, io_service可以select其中的任何一个来调用处理程序。

run()函数的正常退出意味着io_service对象被停止( stopped()函数返回true)。 除非事先调用reset()否则对run()run_one()poll()poll_one()后续调用将立即返回。

以下声明是什么意思?

没有更多的处理者被派遣[…]


在试图理解io_service::run()的行为的时候,我遇到了这个例子 (例3a)。 在其中,我观察到io_service->run()阻塞并等待工作订单。

 // WorkerThread invines io_service->run() void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service); void CalculateFib(size_t); boost::shared_ptr<boost::asio::io_service> io_service( new boost::asio::io_service); boost::shared_ptr<boost::asio::io_service::work> work( new boost::asio::io_service::work(*io_service)); // ... boost::thread_group worker_threads; for(int x = 0; x < 2; ++x) { worker_threads.create_thread(boost::bind(&WorkerThread, io_service)); } io_service->post( boost::bind(CalculateFib, 3)); io_service->post( boost::bind(CalculateFib, 4)); io_service->post( boost::bind(CalculateFib, 5)); work.reset(); worker_threads.join_all(); 

但是,在我正在处理的以下代码中,客户端使用TCP / IP和运行方法块进行连接,直到asynchronous接收数据为止。

 typedef boost::asio::ip::tcp tcp; boost::shared_ptr<boost::asio::io_service> io_service( new boost::asio::io_service); boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service)); // Connect to 127.0.0.1:9100. tcp::resolver resolver(*io_service); tcp::resolver::query query("127.0.0.1", boost::lexical_cast< std::string >(9100)); tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); socket->connect(endpoint_iterator->endpoint()); // Just blocks here until a message is received. socket->async_receive(boost::asio::buffer(buf_client, 3000), 0, ClientReceiveEvent); io_service->run(); // Write response. boost::system::error_code ignored_error; std::cout << "Sending message \n"; boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error); 

不胜感激,在下面两个例子中描述其行为的run()任何解释。

基础

让我们从一个简单的例子开始,检查相关的Boost.Asio作品:

 void handle_async_receive(...) { ... } void print() { ... } ... boost::asio::io_service io_service; boost::asio::ip::tcp::socket socket(io_service); ... io_service.post(&print); // 1 socket.connect(endpoint); // 2 socket.async_receive(buffer, &handle_async_receive); // 3 io_service.post(&print); // 4 io_service.run(); // 5 

什么是处理程序

处理程序只不过是一个callback。 在示例代码中,有3个处理程序:

  • print处理程序(1)。
  • handle_async_receive处理程序(3)。
  • print处理程序(4)。

即使使用相同的print()函数两次,每个使用都被认为是创build自己的唯一可识别的处理程序。 处理程序可以有多种forms和大小,从上面的基本函数到更复杂的构造,比如boost::bind()和lambdaexpression式生成的函子。 不pipe复杂性如何,处理程序仍然只是一个callback。

什么是工作

工作是Boost.Asio被要求代表应用程序代码进行的一些处理。 有时,Boost.Asio可能会在被告知后立即启动一些工作,有时候可能会等待稍后的工作。 一旦完成工作,Boost.Asio将通过调用提供的处理程序来通知应用程序

Boost.Asio保证处理程序只能在当前正在调用run()run_one()poll()poll_one()的线程中run() 。 这些是可以工作和调用处理程序的线程。 因此,在上面的例子中, print()在被发布到io_service (1)中时不被调用。 相反,它被添加到io_service并在以后的时间点被调用。 在这种情况下,它在io_service.run() (5)中。

什么是asynchronous操作?

一个asynchronous操作创build工作,Boost.Asio将调用一个处理程序通知应用程序何时完成工作。 asynchronous操作是通过调用名称前缀为async_的函数来创build的。 这些function也被称为启动function

asynchronous操作可以分解为三个独特的步骤:

  • 启动或通知需要完成的关联io_serviceasync_receive操作(3)通知io_service需要从套接字asynchronous读取数据,然后async_receive立即返回。
  • 做实际的工作。 在这种情况下,当socket接收到数据时,字节将被读取并复制到buffer 。 实际工作将在以下任一方面完成:
    • 启动函数(3),如果Boost.Asio可以确定它不会被阻塞。
    • 当应用程序显式运行io_service (5)时。
  • 调用handle_async_receive ReadHandler 。 再一次, 处理程序只在运行io_service线程中调用。 因此,不pipe工作完成的时间(3或5),只能在io_service.run() (5)中调用handle_async_receive() )。

这三个步骤之间的时间和空间分离被称为控制stream反演。 这是asynchronous编程困难的一个复杂性。 但是,有些技术可以帮助缓解这一点,比如使用协程 。

io_service.run()做什么?

当一个线程调用io_service.run() ,将在这个线程中调用工作和处理程序 。 在上面的例子中, io_service.run() (5)将阻塞,直到:

  • 它从两个print处理程序中调用并返回,接收操作以成功或失败结束,并且其handle_async_receive处理程序已被调用并返回。
  • io_service通过io_service::stop()明确停止。
  • 从处理程序中抛出exception。

一个潜在的虚拟stream程可以描述如下:

创buildio_service
创build套接字
添加打印处理程序到io_service(1)
等待socket连接(2)
将一个asynchronous读取工作请求添加到io_service(3)
添加打印处理程序到io_service(4)
运行io_service(5)
  有没有工作或处理?
    是的,有1个工作和2个处理程序
      套接字有数据吗? 不,不要做任何事情
      运行打印处理程序(1)
  有没有工作或处理?
    是的,有1个工作和1个处理程序
      套接字有数据吗? 不,不要做任何事情
      运行打印处理程序(4)
  有没有工作或处理?
    是的,有1项工作
      套接字有数据吗? 不,继续等待
   - 套接字接收数据 - 
      套接字有数据,读入缓冲区
      将handle_async_receive处理程序添加到io_service
  有没有工作或处理?
    是的,有一个处理程序
      运行handle_async_receive处理程序(3)
  有没有工作或处理?
    不,将io_service设置为停止并返回 

请注意读取完成后,如何向io_service添加另一个处理程序 。 这个细微的细节是asynchronous编程的一个重要特征。 它允许处理程序被链接在一起。 例如,如果handle_async_receive没有获得它所期望的所有数据,那么它的实现可能会发布另一个asynchronous读取操作,导致io_service有更多的工作,因此不会从io_service.run()返回。

请注意,当io_service运行不正常时,应用程序必须在reset()运行之前reset() io_service


示例问题和示例3a代码

现在,我们来看看问题中引用的两段代码。

问题代码

socket->async_receive将工作添加到io_service 。 因此, io_service->run()会阻塞,直到读取操作成功或错误完成,并且ClientReceiveEvent已经完成运行或抛出exception。

示例3a代码

为了更容易理解,下面是一个更小的注释示例3a:

 void CalculateFib(std::size_t n); int main() { boost::asio::io_service io_service; boost::optional<boost::asio::io_service::work> work = // '. 1 boost::in_place(boost::ref(io_service)); // .' boost::thread_group worker_threads; // -. for(int x = 0; x < 2; ++x) // : { // '. worker_threads.create_thread( // :- 2 boost::bind(&boost::asio::io_service::run, &io_service) // .' ); // : } // -' io_service.post(boost::bind(CalculateFib, 3)); // '. io_service.post(boost::bind(CalculateFib, 4)); // :- 3 io_service.post(boost::bind(CalculateFib, 5)); // .' work = boost::none; // 4 worker_threads.join_all(); // 5 } 

在高层次上,程序将创build2个线程来处理io_service的事件循环(2)。 这导致一个简单的线程池,将计算斐波纳契数字(3)。

问题代码和这个代码之间的一个主要区别在于, 实际的工作和处理程序被添加到io_service (3) 之前 ,这个代码调用io_service::run() (2)。 为了防止立即返回io_service::run() ,创build了一个io_service::work对象(1)。 这个对象可以防止io_service不能工作。 因此, io_service::run()不会因为没有工作而返回。

整体stream程如下:

  1. 创build并添加添加到io_service::work对象。
  2. 创build调用io_service::run()线程池。 由于io_service::work对象,这些工作线程不会从io_service返回。
  3. 将3个计算斐波那契数的处理程序添加到io_service ,并立即返回。 工作线程,而不是主线程,可能会立即开始运行这些处理程序。
  4. 删除io_service::work对象。
  5. 等待工作线程完成运行。 这只会在所有3个处理程序完成执行后才会发生,因为io_service既没有处理程序也没有工作。

代码可以写成不同的方式,就像处理程序添加到io_service的原始代码一样,然后处理io_service事件循环。 这消除了使用io_service::work的需要,并且得到以下代码:

 int main() { boost::asio::io_service io_service; io_service.post(boost::bind(CalculateFib, 3)); // '. io_service.post(boost::bind(CalculateFib, 4)); // :- 3 io_service.post(boost::bind(CalculateFib, 5)); // .' boost::thread_group worker_threads; // -. for(int x = 0; x < 2; ++x) // : { // '. worker_threads.create_thread( // :- 2 boost::bind(&boost::asio::io_service::run, &io_service) // .' ); // : } // -' worker_threads.join_all(); // 5 } 

同步与asynchronous

尽pipe问题中的代码使用了asynchronous操作,但它正在同步运行,因为它正在等待asynchronous操作完成:

 socket.async_receive(buffer, handler) io_service.run(); 

相当于:

 boost::asio::error_code error; std::size_t bytes_transferred = socket.receive(buffer, 0, error); handler(error, bytes_transferred); 

一般来说,尽量避免混合同步和asynchronous操作。 往往会把复杂的系统变成一个复杂的系统。 这个答案突出了asynchronous编程的优点,其中一些也包含在Boost.Asio 文档中 。

为了简化run ,把它看作是一个必须处理一堆纸的员工; 它只需要一张纸,就是纸张所告诉的,把纸张扔掉,拿走下一张; 当他用完床单时,就离开了办公室。 在每张纸上都可以有任何种类的说明,甚至可以添加一张新的纸张。 回到asio:你可以通过两种方式来实现io_service工作,本质上是:通过在链接的示例中使用post ,或者使用内部调用io_service上的post其他对象,如socket和它的async_*方法。