boost :: asio :: io_service运行方法块/解锁时混淆
作为Boost.Asio的初学者,我很困惑io_service::run()
。 我将不胜感激,如果有人可以向我解释,当这种方法阻止/解锁。 文件指出:
run()
函数阻塞,直到所有的工作完成,并且没有更多的处理程序被分派,或者直到io_service
被停止。多个线程可能会调用
run()
函数来设置一个线程池,io_service
可以从中执行处理程序。 在池中等待的所有线程都是相等的,io_service
可以select其中的任何一个来调用处理程序。
run()
函数的正常退出意味着io_service
对象被停止(stopped()
函数返回true)。 除非事先调用reset()
否则对run()
,run_one()
,poll()
或poll_one()
后续调用将立即返回。
以下声明是什么意思?
没有更多的处理者被派遣[…]
在试图理解io_service::run()
的行为的时候,我遇到了这个例子 (例3a)。 在其中,我观察到io_service->run()
阻塞并等待工作订单。
// WorkerThread invines io_service->run() void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service); void CalculateFib(size_t); boost::shared_ptr<boost::asio::io_service> io_service( new boost::asio::io_service); boost::shared_ptr<boost::asio::io_service::work> work( new boost::asio::io_service::work(*io_service)); // ... boost::thread_group worker_threads; for(int x = 0; x < 2; ++x) { worker_threads.create_thread(boost::bind(&WorkerThread, io_service)); } io_service->post( boost::bind(CalculateFib, 3)); io_service->post( boost::bind(CalculateFib, 4)); io_service->post( boost::bind(CalculateFib, 5)); work.reset(); worker_threads.join_all();
但是,在我正在处理的以下代码中,客户端使用TCP / IP和运行方法块进行连接,直到asynchronous接收数据为止。
typedef boost::asio::ip::tcp tcp; boost::shared_ptr<boost::asio::io_service> io_service( new boost::asio::io_service); boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service)); // Connect to 127.0.0.1:9100. tcp::resolver resolver(*io_service); tcp::resolver::query query("127.0.0.1", boost::lexical_cast< std::string >(9100)); tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); socket->connect(endpoint_iterator->endpoint()); // Just blocks here until a message is received. socket->async_receive(boost::asio::buffer(buf_client, 3000), 0, ClientReceiveEvent); io_service->run(); // Write response. boost::system::error_code ignored_error; std::cout << "Sending message \n"; boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);
不胜感激,在下面两个例子中描述其行为的run()
任何解释。
基础
让我们从一个简单的例子开始,检查相关的Boost.Asio作品:
void handle_async_receive(...) { ... } void print() { ... } ... boost::asio::io_service io_service; boost::asio::ip::tcp::socket socket(io_service); ... io_service.post(&print); // 1 socket.connect(endpoint); // 2 socket.async_receive(buffer, &handle_async_receive); // 3 io_service.post(&print); // 4 io_service.run(); // 5
什么是处理程序 ?
处理程序只不过是一个callback。 在示例代码中,有3个处理程序:
-
print
处理程序(1)。 -
handle_async_receive
处理程序(3)。 -
print
处理程序(4)。
即使使用相同的print()
函数两次,每个使用都被认为是创build自己的唯一可识别的处理程序。 处理程序可以有多种forms和大小,从上面的基本函数到更复杂的构造,比如boost::bind()
和lambdaexpression式生成的函子。 不pipe复杂性如何,处理程序仍然只是一个callback。
什么是工作 ?
工作是Boost.Asio被要求代表应用程序代码进行的一些处理。 有时,Boost.Asio可能会在被告知后立即启动一些工作,有时候可能会等待稍后的工作。 一旦完成工作,Boost.Asio将通过调用提供的处理程序来通知应用程序 。
Boost.Asio保证处理程序只能在当前正在调用run()
, run_one()
, poll()
或poll_one()
的线程中run()
。 这些是可以工作和调用处理程序的线程。 因此,在上面的例子中, print()
在被发布到io_service
(1)中时不被调用。 相反,它被添加到io_service
并在以后的时间点被调用。 在这种情况下,它在io_service.run()
(5)中。
什么是asynchronous操作?
一个asynchronous操作创build工作,Boost.Asio将调用一个处理程序通知应用程序何时完成工作。 asynchronous操作是通过调用名称前缀为async_
的函数来创build的。 这些function也被称为启动function 。
asynchronous操作可以分解为三个独特的步骤:
- 启动或通知需要完成的关联
io_service
。async_receive
操作(3)通知io_service
需要从套接字asynchronous读取数据,然后async_receive
立即返回。 - 做实际的工作。 在这种情况下,当
socket
接收到数据时,字节将被读取并复制到buffer
。 实际工作将在以下任一方面完成:- 启动函数(3),如果Boost.Asio可以确定它不会被阻塞。
- 当应用程序显式运行
io_service
(5)时。
- 调用
handle_async_receive
ReadHandler 。 再一次, 处理程序只在运行io_service
线程中调用。 因此,不pipe工作完成的时间(3或5),只能在io_service.run()
(5)中调用handle_async_receive()
)。
这三个步骤之间的时间和空间分离被称为控制stream反演。 这是asynchronous编程困难的一个复杂性。 但是,有些技术可以帮助缓解这一点,比如使用协程 。
io_service.run()
做什么?
当一个线程调用io_service.run()
,将在这个线程中调用工作和处理程序 。 在上面的例子中, io_service.run()
(5)将阻塞,直到:
- 它从两个
print
处理程序中调用并返回,接收操作以成功或失败结束,并且其handle_async_receive
处理程序已被调用并返回。 -
io_service
通过io_service::stop()
明确停止。 - 从处理程序中抛出exception。
一个潜在的虚拟stream程可以描述如下:
创buildio_service 创build套接字 添加打印处理程序到io_service(1) 等待socket连接(2) 将一个asynchronous读取工作请求添加到io_service(3) 添加打印处理程序到io_service(4) 运行io_service(5) 有没有工作或处理? 是的,有1个工作和2个处理程序 套接字有数据吗? 不,不要做任何事情 运行打印处理程序(1) 有没有工作或处理? 是的,有1个工作和1个处理程序 套接字有数据吗? 不,不要做任何事情 运行打印处理程序(4) 有没有工作或处理? 是的,有1项工作 套接字有数据吗? 不,继续等待 - 套接字接收数据 - 套接字有数据,读入缓冲区 将handle_async_receive处理程序添加到io_service 有没有工作或处理? 是的,有一个处理程序 运行handle_async_receive处理程序(3) 有没有工作或处理? 不,将io_service设置为停止并返回
请注意读取完成后,如何向io_service
添加另一个处理程序 。 这个细微的细节是asynchronous编程的一个重要特征。 它允许处理程序被链接在一起。 例如,如果handle_async_receive
没有获得它所期望的所有数据,那么它的实现可能会发布另一个asynchronous读取操作,导致io_service
有更多的工作,因此不会从io_service.run()
返回。
请注意,当io_service
运行不正常时,应用程序必须在reset()
运行之前reset()
io_service
。
示例问题和示例3a代码
现在,我们来看看问题中引用的两段代码。
问题代码
socket->async_receive
将工作添加到io_service
。 因此, io_service->run()
会阻塞,直到读取操作成功或错误完成,并且ClientReceiveEvent
已经完成运行或抛出exception。
示例3a代码
为了更容易理解,下面是一个更小的注释示例3a:
void CalculateFib(std::size_t n); int main() { boost::asio::io_service io_service; boost::optional<boost::asio::io_service::work> work = // '. 1 boost::in_place(boost::ref(io_service)); // .' boost::thread_group worker_threads; // -. for(int x = 0; x < 2; ++x) // : { // '. worker_threads.create_thread( // :- 2 boost::bind(&boost::asio::io_service::run, &io_service) // .' ); // : } // -' io_service.post(boost::bind(CalculateFib, 3)); // '. io_service.post(boost::bind(CalculateFib, 4)); // :- 3 io_service.post(boost::bind(CalculateFib, 5)); // .' work = boost::none; // 4 worker_threads.join_all(); // 5 }
在高层次上,程序将创build2个线程来处理io_service
的事件循环(2)。 这导致一个简单的线程池,将计算斐波纳契数字(3)。
问题代码和这个代码之间的一个主要区别在于, 在实际的工作和处理程序被添加到io_service
(3) 之前 ,这个代码调用io_service::run()
(2)。 为了防止立即返回io_service::run()
,创build了一个io_service::work
对象(1)。 这个对象可以防止io_service
不能工作。 因此, io_service::run()
不会因为没有工作而返回。
整体stream程如下:
- 创build并添加添加到
io_service::work
对象。 - 创build调用
io_service::run()
线程池。 由于io_service::work
对象,这些工作线程不会从io_service
返回。 - 将3个计算斐波那契数的处理程序添加到
io_service
,并立即返回。 工作线程,而不是主线程,可能会立即开始运行这些处理程序。 - 删除
io_service::work
对象。 - 等待工作线程完成运行。 这只会在所有3个处理程序完成执行后才会发生,因为
io_service
既没有处理程序也没有工作。
代码可以写成不同的方式,就像处理程序添加到io_service
的原始代码一样,然后处理io_service
事件循环。 这消除了使用io_service::work
的需要,并且得到以下代码:
int main() { boost::asio::io_service io_service; io_service.post(boost::bind(CalculateFib, 3)); // '. io_service.post(boost::bind(CalculateFib, 4)); // :- 3 io_service.post(boost::bind(CalculateFib, 5)); // .' boost::thread_group worker_threads; // -. for(int x = 0; x < 2; ++x) // : { // '. worker_threads.create_thread( // :- 2 boost::bind(&boost::asio::io_service::run, &io_service) // .' ); // : } // -' worker_threads.join_all(); // 5 }
同步与asynchronous
尽pipe问题中的代码使用了asynchronous操作,但它正在同步运行,因为它正在等待asynchronous操作完成:
socket.async_receive(buffer, handler) io_service.run();
相当于:
boost::asio::error_code error; std::size_t bytes_transferred = socket.receive(buffer, 0, error); handler(error, bytes_transferred);
一般来说,尽量避免混合同步和asynchronous操作。 往往会把复杂的系统变成一个复杂的系统。 这个答案突出了asynchronous编程的优点,其中一些也包含在Boost.Asio 文档中 。
为了简化run
,把它看作是一个必须处理一堆纸的员工; 它只需要一张纸,就是纸张所告诉的,把纸张扔掉,拿走下一张; 当他用完床单时,就离开了办公室。 在每张纸上都可以有任何种类的说明,甚至可以添加一张新的纸张。 回到asio:你可以通过两种方式来实现io_service
工作,本质上是:通过在链接的示例中使用post
,或者使用内部调用io_service
上的post
其他对象,如socket
和它的async_*
方法。