Python Multiprocessing.Pool惰性迭代
我想知道python的Multiprocessing.Pool类与地图,imap和map_async工作的方式。 我特别的问题是,我想映射一个迭代器,创build内存重的对象,并不希望所有这些对象同时生成内存。 我想看看各种map()函数是否会干扰我的迭代器,或者只是在subprocess缓慢前进时智能地调用next()函数,所以我篡改了一些testing:
def g(): for el in xrange(100): print el yield el def f(x): time.sleep(1) return x*x if __name__ == '__main__': pool = Pool(processes=4) # start 4 worker processes go = g() g2 = pool.imap(f, go) g2.next()
依此类推,使用map,imap和map_async。 然而,这是最公然的例子,因为简单地调用next()在g2上的一次打印出我的生成器g()中的所有元素,而如果imap是这样做的“懒惰”,我希望它只能调用go.next ()一次,因此只打印出'1'。
有人可以清理发生的事情吗?如果有什么办法让stream程池“懒惰地”根据需要评估迭代器?
谢谢,
加布
我们先看看程序的结尾。
程序结束时,多处理模块使用atexit
来调用multiprocessing.util._exit_function
。
如果删除g2.next()
,程序将很快结束。
_exit_function
最终调用Pool._terminate_pool
。 主线程将pool._task_handler._state
的状态从RUN
更改为TERMINATE
。 同时, pool._task_handler
线程在Pool._handle_tasks
循环,并在达到条件时退出
if thread._state: debug('task handler found thread._state != RUN') break
(请参阅/usr/lib/python2.6/multiprocessing/pool.py)
这是什么阻止任务处理程序完全消耗您的生成器, g()
。 如果你看看Pool._handle_tasks
你会看到
for i, task in enumerate(taskseq): ... try: put(task) except IOError: debug('could not put task on queue') break
这是消耗你的发电机的代码。 ( taskseq
不完全是你的生成器,但是由于taskseq
被消耗,你的生成器也是如此)。
相反,当你调用g2.next()
,主线程会调用IMapIterator.next
,并等待它到达self._cond.wait(timeout)
。
主线程正在等待,而不是调用_exit_function
是什么让任务处理程序线程正常运行,这意味着完全消耗生成器,因为它将任务放在worker
的inqueue
在Pool._handle_tasks
函数中。
底线是所有的Pool
映射函数都使用它给出的整个迭代。 如果你想要大块消耗发电机,你可以这样做:
import multiprocessing as mp import itertools import time def g(): for el in xrange(50): print el yield el def f(x): time.sleep(1) return x * x if __name__ == '__main__': pool = mp.Pool(processes=4) # start 4 worker processes go = g() result = [] N = 11 while True: g2 = pool.map(f, itertools.islice(go, N)) if g2: result.extend(g2) time.sleep(1) else: break print(result)
我也有这个问题,并且对地图消耗了所有元素感到失望。 我编写了一个函数,它在多处理中使用Queue数据types来延迟地使用迭代器。 这与@unutbu在回答评论时所描述的相似,但是正如他指出的那样,由于没有用于重新加载队列的callback机制。 Queue数据types暴露了一个超时参数,我已经用了100毫秒来达到很好的效果。
from multiprocessing import Process, Queue, cpu_count from Queue import Full as QueueFull from Queue import Empty as QueueEmpty def worker(recvq, sendq): for func, args in iter(recvq.get, None): result = func(*args) sendq.put(result) def pool_imap_unordered(function, iterable, procs=cpu_count()): # Create queues for sending/receiving items from iterable. sendq = Queue(procs) recvq = Queue() # Start worker processes. for rpt in xrange(procs): Process(target=worker, args=(sendq, recvq)).start() # Iterate iterable and communicate with worker processes. send_len = 0 recv_len = 0 itr = iter(iterable) try: value = itr.next() while True: try: sendq.put((function, value), True, 0.1) send_len += 1 value = itr.next() except QueueFull: while True: try: result = recvq.get(False) recv_len += 1 yield result except QueueEmpty: break except StopIteration: pass # Collect all remaining results. while recv_len < send_len: result = recvq.get() recv_len += 1 yield result # Terminate worker processes. for rpt in xrange(procs): sendq.put(None)
该解决scheme的优点是不会将请求分配到Pool.map。 一个人不能阻止别人进步。 因人而异。 请注意,您可能想要使用不同的对象来为工作人员发送终止信号。 在这个例子中,我使用了None。
testing“的Python 2.7(r27:82525,2010年7月4日,09:01:59)[MSC v.1500 32位(英特尔)]在win32”
你想要的是在NuMap包中实现的,从网站:
NuMap是一个并行(基于线程或进程,本地或远程),缓冲,多任务,itertools.imap或multiprocessing.Pool.imap函数的替代品。 就像imap一样,它对一个序列的元素或者可迭代的元素进行评估,而且这样做是懒惰的。 懒惰可以通过“步幅”和“缓冲”参数进行调整。