如何使用Python的asyncio模块正确地创build和运行并发任务?
我正在尝试使用Python 3相对较新的asyncio
模块正确理解和实现两个并发运行的Task
对象。
简而言之,asyncio似乎devise用于处理asynchronous进程和并发Task
在事件循环中的执行。 它促进使用await
(在asynchronous函数中应用)作为一个无callback的方式来等待并使用一个结果,而不会阻塞事件循环。 (期货和callback仍是一个可行的select。)
它还提供了asyncio.Task()
类,这是Future
一个专门的子类,用于包装协程。 最好通过使用asyncio.ensure_future()
方法来调用。 asyncio任务的预期用途是允许独立运行的任务与同一事件循环内的其他任务“同时”运行。 我的理解是Tasks
连接到事件循环,然后在await
语句之间自动继续驱动协程。
我喜欢能够使用并发任务而不需要使用某个Executor
类的想法,但是我没有find关于实现的详细说明。
这就是我目前的做法:
import asyncio print('running async test') async def say_boo(): i = 0 while True: await asyncio.sleep(0) print('...boo {0}'.format(i)) i += 1 async def say_baa(): i = 0 while True: await asyncio.sleep(0) print('...baa {0}'.format(i)) i += 1 # OPTION 1: wrap in Task object # -> automatically attaches to event loop and executes boo = asyncio.ensure_future(say_boo()) baa = asyncio.ensure_future(say_baa()) loop = asyncio.get_event_loop() loop.run_forever()
在试图同时运行两个循环任务的情况下,我注意到,除非Task有一个内部await
expression式,否则它将被卡住在while
循环中,有效地阻止其他任务运行(非常像普通的while
循环)。 然而,一旦任务不得不等待,他们似乎同时运行,没有问题。
因此, await
语句似乎为事件循环提供了在任务之间来回切换的立足点,从而产生了并发的效果。
内部await
示例输出:
running async test ...boo 0 ...baa 0 ...boo 1 ...baa 1 ...boo 2 ...baa 2
示例输出无内部await
:
...boo 0 ...boo 1 ...boo 2 ...boo 3 ...boo 4
问题
此实现是否传递asyncio
中并发循环任务的“适当”示例?
是否正确,这是唯一的方法是为了让一个Task
提供一个阻塞点( await
expression式),以便事件循环能够处理多个任务?
是的,在事件循环中运行的任何协程都会阻塞其他协程和任务,除非它运行
- 使用
yield from
或await
来调用另一个协程(如果使用Python 3.5+)。 - 回报。
这是因为asyncio
是单线程的; 事件循环运行的唯一方法是没有其他协程正在执行。 使用yield from
/ yield from
暂时暂停协程,给事件循环一个工作的机会。
你的示例代码很好,但是在许多情况下,你可能不希望长时间运行的代码在事件循环内部不运行asynchronousI / O。 在这些情况下,使用BaseEventLoop.run_in_executor
在后台线程或进程中运行代码通常更有意义。 如果您的任务是CPU绑定的, ProcessPoolExecutor
将是更好的select,如果您需要执行一些不是asyncio
友好的I / O,将使用ThreadPoolExecutor
。
例如,您的两个循环完全受CPU限制,不共享任何状态,因此使用ProcessPoolExecutor
可以在CPU之间并行运行每个循环:
import asyncio from concurrent.futures import ProcessPoolExecutor print('running async test') def say_boo(): i = 0 while True: print('...boo {0}'.format(i)) i += 1 def say_baa(): i = 0 while True: print('...baa {0}'.format(i)) i += 1 if __name__ == "__main__": executor = ProcessPoolExecutor(2) loop = asyncio.get_event_loop() boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo)) baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa)) loop.run_forever()
您不一定需要yield from x
产生控制权来控制事件循环。
在你的例子中,我认为正确的做法是做一个yield None
或者等价的一个简单的yield
,而不是yield from asyncio.sleep(0.001)
的yield from asyncio.sleep(0.001)
:
import asyncio @asyncio.coroutine def say_boo(): i = 0 while True: yield None print("...boo {0}".format(i)) i += 1 @asyncio.coroutine def say_baa(): i = 0 while True: yield print("...baa {0}".format(i)) i += 1 boo_task = asyncio.async(say_boo()) baa_task = asyncio.async(say_baa()) loop = asyncio.get_event_loop() loop.run_forever()
协程只是简单的旧Python生成器。 在内部, asyncio
事件循环会保存这些生成器的logging,并在每个循环中逐一调用gen.send()
。 无论何时你yield
,对gen.send()
的调用完成,循环可以继续。 (我正在简化它;查看实际代码的https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 )
也就是说,如果您需要在不共享数据的情况下执行CPU密集型计算,我仍然可以执行run_in_executor
路由。