Python多处理pool.map多个参数
在Python多处理库中,是否有支持多个参数的pool.map的变体?
text = "test" def harvester(text, case): X = case[0] return text+ str(X) if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET pool.map(harvester(text,case),case, 1) pool.close() pool.join()
对此的答案是版本和情况相关的。 对于Python的最新版本(从3.3开始)最常见的答案在下面由JF Sebastian首先描述。 1它使用Pool.starmap
方法,它接受一系列参数元组。 然后它会自动解包每个元组的参数,并将它们传递给给定的函数:
import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.starmap(merge_names, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
对于较早的Python版本,您需要编写一个辅助函数来明确地解压参数。 如果你想使用,你还需要编写一个包装器来把Pool
变成一个上下文pipe理器。 (感谢muon指出了这一点。)
import multiprocessing from itertools import product from contextlib import contextmanager def merge_names(a, b): return '{} & {}'.format(a, b) def merge_names_unpack(args): return merge_names(*args) @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(merge_names_unpack, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
在更简单的情况下,使用固定的第二个参数,也可以使用partial
,但只能在Python 2.7+中使用。
import multiprocessing from functools import partial from contextlib import contextmanager @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with poolcontext(processes=3) as pool: results = pool.map(partial(merge_names, b='Sons'), names) print(results) # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
其中大部分是由他的答案所启发的,而这个答案可能应该被接受。 但是,由于这个问题一直处于顶端,对于未来的读者来说似乎是最好的。
有没有支持多个参数的pool.map的变种?
Python 3.3包含了pool.starmap()
方法 :
#!/usr/bin/env python3 from functools import partial from itertools import repeat from multiprocessing import Pool, freeze_support def func(a, b): return a + b def main(): a_args = [1,2,3] second_arg = 1 with Pool() as pool: L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)]) M = pool.starmap(func, zip(a_args, repeat(second_arg))) N = pool.map(partial(func, b=second_arg), a_args) assert L == M == N if __name__=="__main__": freeze_support() main()
对于旧版本:
#!/usr/bin/env python2 import itertools from multiprocessing import Pool, freeze_support def func(a, b): print a, b def func_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return func(*a_b) def main(): pool = Pool() a_args = [1,2,3] second_arg = 1 pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg))) if __name__=="__main__": freeze_support() main()
产量
1 1 2 1 3 1
注意itertools.izip()
和itertools.repeat()
在这里是如何使用的。
由于@unutbu提到的错误,你不能在Python 2.6上使用functools.partial()
或类似的function,所以应该明确定义简单的包装函数func_star()
。 另请参阅uptimebox
build议 的解决方法 。
我认为下面会更好
def multi_run_wrapper(args): return add(*args) def add(x,y): return x+y if __name__ == "__main__": from multiprocessing import Pool pool = Pool(4) results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)]) print results
产量
[3, 5, 7]
使用Python 3.3+与pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool def write(i, x): print(i, "---", x) a = ["1","2","3"] b = ["4","5","6"] pool = ThreadPool(2) pool.starmap(write, zip(a,b)) pool.close() pool.join()
结果:
1 --- 4 2 --- 5 3 --- 6
如果你喜欢,你也可以压缩()更多的参数: zip(a,b,c,d,e)
如果你想有一个常量值作为parameter passing,你必须使用import itertools
然后zip(itertools.repeat(constant), a)
。
在了解了JF Sebastian中的 itertools之后,我决定更进一步,编写一个parmap
包来处理并行化问题,在python-2.7和python-3.2(以及之后的版本)上提供map
和starmap
函数,的立场论点。
安装
pip install parmap
如何并行化:
import parmap # If you want to do: y = [myfunction(x, argument1, argument2) for x in mylist] # In parallel: y = parmap.map(myfunction, mylist, argument1, argument2) # If you want to do: z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist] # In parallel: z = parmap.starmap(myfunction, mylist, argument1, argument2) # If you want to do: listx = [1, 2, 3, 4, 5, 6] listy = [2, 3, 4, 5, 6, 7] param = 3.14 param2 = 42 listz = [] for (x, y) in zip(listx, listy): listz.append(myfunction(x, y, param1, param2)) # In parallel: listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
我已经将parmap上传到PyPI和github存储库 。
作为一个例子,这个问题可以回答如下:
import parmap def harvester(case, text): X = case[0] text+ str(X) if __name__ == "__main__": case = RAW_DATASET # assuming this is an iterable parmap.map(harvester, case, "test", chunksize=1)
有一个叫做starmap
的multiprocessing
分支( 注意:使用github上的版本 ),不需要starmap
– map函数镜像python的map的API,因此map可以带多个参数。 有了pathos
,你通常也可以在解释器中进行多处理,而不是卡在__main__
块中。 经过一些轻微的更新之后,Pathos将会发布一个版本 – 主要是转换到python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49) [GCC 4.2.1 (Apple Inc. build 5566)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> def func(a,b): ... print a,b ... >>> >>> from pathos.multiprocessing import ProcessingPool >>> pool = ProcessingPool(nodes=4) >>> pool.map(func, [1,2,3], [1,1,1]) 1 1 2 1 3 1 [None, None, None] >>> >>> # also can pickle stuff like lambdas >>> result = pool.map(lambda x: x**2, range(10)) >>> result [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> >>> # also does asynchronous map >>> result = pool.amap(pow, [1,2,3], [4,5,6]) >>> result.get() [1, 32, 729] >>> >>> # or can return a map iterator >>> result = pool.imap(pow, [1,2,3], [4,5,6]) >>> result <processing.pool.IMapIterator object at 0x110c2ffd0> >>> list(result) [1, 32, 729]
您可以使用以下两个函数来避免为每个新函数编写一个包装:
import itertools from multiprocessing import Pool def universal_worker(input_pair): function, args = input_pair return function(*args) def pool_args(function, *args): return zip(itertools.repeat(function), zip(*args))
使用带有arg_0
, arg_1
和arg_2
参数列表的函数function
,如下所示:
pool = Pool(n_core) list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2) pool.close() pool.join()
更好的方法是使用装饰器,而不是手工编写包装函数 。 特别是当你有很多映射的函数时,装饰器会避免为每个函数编写包装器,从而节省你的时间。 通常装饰的function是不可挑选的,但是我们可以使用functools
来解决它。 更多的消除可以在这里find。
这里的例子
def unpack_args(func): from functools import wraps @wraps(func) def wrapper(args): if isinstance(args, dict): return func(**args) else: return func(*args) return wrapper @unpack_args def func(x, y): return x + y
然后你可以用压缩参数来映射它
np, xlist, ylist = 2, range(10), range(10) pool = Pool(np) res = pool.map(func, zip(xlist, ylist)) pool.close() pool.join()
当然,您也可以在Python 3中使用Pool.starmap
(> = 3.3),如其他答案中所述。
另一个简单的select是将你的函数参数包装在一个元组中,然后包装应该传递给元组的参数。 在处理大量数据时,这可能并不理想。 我相信它会复制每个元组。
from multiprocessing import Pool def f((a,b,c,d)): print a,b,c,d return a + b + c +d if __name__ == '__main__': p = Pool(10) data = [(i+0,i+1,i+2,i+3) for i in xrange(10)] print(p.map(f, data)) p.close() p.join()
给出一些随机的输出:
0 1 2 3 1 2 3 4 2 3 4 5 3 4 5 6 4 5 6 7 5 6 7 8 7 8 9 10 6 7 8 9 8 9 10 11 9 10 11 12 [6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
另一种方法是将列表传递给一个参数的例程:
import os from multiprocessing import Pool def task(args): print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1] pool = Pool() pool.map(task, [ [1,2], [3,4], [5,6], [7,8] ])
人们可以用自己喜欢的方法构build一个参数列表。
从python 3.4.4开始,你可以使用multiprocessing.get_context()获得一个上下文对象来使用多个启动方法:
import multiprocessing as mp def foo(q, h, w): q.put(h + ' ' + w) print(h + ' ' + w) if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,'hello', 'world')) p.start() print(q.get()) p.join()
或者你只是简单地更换
pool.map(harvester(text,case),case, 1)
通过:
pool.apply_async(harvester(text,case),case, 1)
更好的解决schemepython2:
from multiprocessing import Pool def func((i, (a, b))): print i, a, b return a + b pool = Pool(3) pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
出[]:
[3,5,7]
在官方文档中指出它只支持一个可迭代的参数。 我喜欢在这种情况下使用apply_async。 在你的情况下,我会这样做:
from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join()