How can I recover the return value of a function passed to multiprocessing.Process?
In the example code below, I want to recover the return value of the function worker. How can I do this? Where is the value stored?
Example code:
import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
I can't seem to find the relevant attribute in the objects stored in jobs.
Thanks in advance, blz
Use a shared variable to communicate. For example, like this:
import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:
import multiprocessing
from os import getpid

def worker(procnum):
    print 'I am number %d in process %d' % (procnum, getpid())
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print pool.map(worker, range(5))
This will print the return values:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
If you are familiar with map (the Python built-in), this should not be too challenging. Otherwise, have a look at sega_Sai's link.
Note how little code is needed. (Also note how processes are reused.)
It looks like you should use the multiprocessing.Pool class instead, and use the methods .apply(), .apply_async(), or map():
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
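To illustrate the apply_async() route, here is a minimal sketch (not from the original answer; the doubling worker is a made-up example). apply_async() returns an AsyncResult immediately, and its get() method blocks until the worker's return value is available:

import multiprocessing

def worker(procnum):
    # hypothetical worker: just return a value derived from the argument
    return procnum * 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    # apply_async() hands back one AsyncResult per submitted task
    results = [pool.apply_async(worker, (i,)) for i in range(5)]
    print([r.get() for r in results])  # [0, 2, 4, 6, 8]
    pool.close()
    pool.join()

map() is simpler when every task calls the same function over an iterable; apply_async() gives you an individual handle per task.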
This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:
import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
This solution uses fewer resources than a multiprocessing.Queue, which uses:
- a pipe
- at least one lock
- a buffer
- a thread

or a multiprocessing.SimpleQueue (sketched below), which uses:
- a pipe
- at least one lock
It is very instructive to look at the source of these types.
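As a rough sketch of the SimpleQueue variant just mentioned (my own example, not from the answer; note that in Python 3 SimpleQueue is importable directly from multiprocessing, while in Python 2 it lives in multiprocessing.queues):

from multiprocessing import Process, SimpleQueue

def worker(procnum, queue):
    # send (index, result) back so the parent can match results to tasks
    queue.put((procnum, procnum * procnum))

if __name__ == '__main__':
    queue = SimpleQueue()
    jobs = [Process(target=worker, args=(i, queue)) for i in range(5)]
    for p in jobs:
        p.start()
    # collect one result per process before joining
    results = dict(queue.get() for _ in jobs)
    for p in jobs:
        p.join()
    print(results)  # e.g. {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}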
You can use the built-in exit to set the exit code of a process. It can be obtained from the process's exitcode attribute:
import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
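One caveat worth adding (my observation, not from the answer): on Unix-like systems an exit status is truncated to 8 bits, so this trick only round-trips values in the range 0-255. A small sketch of the pitfall:

import multiprocessing
import sys

def worker():
    sys.exit(300)  # only the low 8 bits of the status survive on Unix

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    print(p.exitcode)  # prints 44 (300 & 0xFF) on Unix, not 300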
For anyone else seeking how to get a value from a Process using a Queue:
import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print queue.get()  # Prints {"foo": True}
I modified vartec's answer, since I needed to get the error codes from the function. (Thanks vartec!!! It's an awesome trick.)
This can also be done with a manager.list, but I think it is better to have it in a dict and store a list within it. That way, we keep the function and the results, since we cannot be sure of the order in which the list will be populated.
from multiprocessing import Process
import time
import datetime
import multiprocessing

def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # no need for a return, since Process doesn't propagate it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # An uncaught exception means func3 never populates the manager dict
    # and exits with a nonzero exit code:
    raise ValueError("failed here")
    # Alternatively, catch the error to record it in the manager dict:
    # try:
    #     raise ValueError("failed here")
    #     m_list[fn] = "this is third"
    # except:
    #     m_list[fn] = "this is third and it fail horrible"
    # print 'func3: finishing'

def runInParallel(*fns):  # * accepts any number of functions
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # join() also accepts an optional timeout
    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # here you can check which process failed
    for i in proc:
        print i.name, i.exitcode  # name was set in the Process() call above
    # only the functions that managed to populate the manager dict show up here
    for i, j in manager.items():
        print i, j
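For completeness, a minimal sketch of the manager.list alternative mentioned above (my own example, with a made-up doubling worker). Since completion order is nondeterministic, each worker records its own index alongside its result:

import multiprocessing

def worker(procnum, results):
    # append an (index, result) pair to the shared list
    results.append((procnum, procnum * 2))

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    results = manager.list()  # proxy to a list living in the manager process
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, results))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()
    # completion order is nondeterministic, hence the (index, result) pairs
    print(sorted(results))  # [(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)]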
For some reason, I couldn't find a general example anywhere of how to do this with a Queue (even Python's doc examples don't spawn multiple processes), so here's what I got working after about 10 tries:
from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2):  # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add():  # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get()  # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets
Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join() the Processes, or else the queue fills up and blocks everything.
Update for those who are object-oriented (tested in Python 3.4):
from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc):  # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait()  # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)