Python:我怎样才能并行运行python函数?

我先研究,找不到我的问题的答案。 我想在Python中并行运行多个函数。

我有这样的东西:

files.py import common #common is a util class that handles all the IO stuff dir1 = 'C:\folder1' dir2 = 'C:\folder2' filename = 'test.txt' addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] def func1(): c = common.Common() for i in range(len(addFiles)): c.createFiles(addFiles[i], filename, dir1) c.getFiles(dir1) time.sleep(10) c.removeFiles(addFiles[i], dir1) c.getFiles(dir1) def func2(): c = common.Common() for i in range(len(addFiles)): c.createFiles(addFiles[i], filename, dir2) c.getFiles(dir2) time.sleep(10) c.removeFiles(addFiles[i], dir2) c.getFiles(dir2) 

我想调用func1和func2,让他们同时运行。 这些函数不会相互影响,也不会影响同一个对象。 现在我必须等func1在func2开始之前完成。 我如何做下面的事情:

 process.py from files import func1, func2 runBothFunc(func1(), func2()) 

我希望能够同时创build两个目录,因为每一分钟我都在计算正在创build多less个文件。 如果该目录不在那里,它将会抛弃我的时间。

你可以使用threading或多multiprocessing

由于CPython的特殊性 , threading不可能达到真正的并行性。 出于这个原因, multiprocessing通常是更好的select。

这是一个完整的例子:

 from multiprocessing import Process def func1(): print 'func1: starting' for i in xrange(10000000): pass print 'func1: finishing' def func2(): print 'func2: starting' for i in xrange(10000000): pass print 'func2: finishing' if __name__ == '__main__': p1 = Process(target=func1) p1.start() p2 = Process(target=func2) p2.start() p1.join() p2.join() 

启动/joinsubprocess的机制可以很容易地被封装到你的runBothFunc

 def runInParallel(*fns): proc = [] for fn in fns: p = Process(target=fn) p.start() proc.append(p) for p in proc: p.join() runInParallel(func1, func2) 

没有办法保证两个函数将会彼此同步执行,这似乎是你想要做的。

你可以做的最好的做法是把这个function分成几个步骤,然后等待两个关键的同步点使用Process.join来完成,比如@ aix的回答提及。

这比time.sleep(10)更好,因为你不能保证准确的时间。 在明确等待的情况下,你说的是在执行这个步骤之前必须完成这个function,而不是假设它将在10ms内完成,而不是基于机器上正在进行的操作来保证。

如果你是一个Windows用户,并使用python 3,那么这篇文章将帮助你在python中进行并行编程。当你运行一个通常的多处理库的池编程时,你会得到一个有关你的程序中的主函数的错误。 这是因为Windows没有fork()的function。 下面的文章是解决上述问题。

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

自从我使用python 3以后,我改变了这个程序:

 from types import FunctionType import marshal def _applicable(*args, **kwargs): name = kwargs['__pw_name'] code = marshal.loads(kwargs['__pw_code']) gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) defs = marshal.loads(kwargs['__pw_defs']) clsr = marshal.loads(kwargs['__pw_clsr']) fdct = marshal.loads(kwargs['__pw_fdct']) func = FunctionType(code, gbls, name, defs, clsr) func.fdct = fdct del kwargs['__pw_name'] del kwargs['__pw_code'] del kwargs['__pw_defs'] del kwargs['__pw_clsr'] del kwargs['__pw_fdct'] return func(*args, **kwargs) def make_applicable(f, *args, **kwargs): if not isinstance(f, FunctionType): raise ValueError('argument must be a function') kwargs['__pw_name'] = f.__name__ # edited kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited return _applicable, args, kwargs def _mappable(x): x,name,code,defs,clsr,fdct = x code = marshal.loads(code) gbls = globals() #gbls = marshal.loads(gbls) defs = marshal.loads(defs) clsr = marshal.loads(clsr) fdct = marshal.loads(fdct) func = FunctionType(code, gbls, name, defs, clsr) func.fdct = fdct return func(x) def make_mappable(f, iterable): if not isinstance(f, FunctionType): raise ValueError('argument must be a function') name = f.__name__ # edited code = marshal.dumps(f.__code__) # edited defs = marshal.dumps(f.__defaults__) # edited clsr = marshal.dumps(f.__closure__) # edited fdct = marshal.dumps(f.__dict__) # edited return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

在这个函数之后,上面的问题代码也是这样改变的:

 from multiprocessing import Pool from poolable import make_applicable, make_mappable def cube(x): return x**3 if __name__ == "__main__": pool = Pool(processes=2) results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] print([result.get(timeout=10) for result in results]) 

我得到的输出为:

 [1, 8, 27, 64, 125, 216] 

我在想这个post可能对一些windows用户有用。