如何在Python中循环multithreading操作
说我有一个非常大的名单,我正在执行一个这样的操作:
for item in items: try: api.my_operation(item) except: print 'error with item'
我的问题是双重的:
- 有很多项目
- api.my_operation需要永远返回
我想使用multithreading立即启动一堆api.my_operations,所以我可以同时处理5个或10个甚至100个项目。
如果my_operation()返回一个exception(因为可能我已经处理了这个项目) – 没关系。 它不会破坏任何东西。 循环可以继续到下一个项目。
注意 :这是针对Python 2.7.3的
首先,在Python中,如果您的代码是CPU绑定的,multithreading将无济于事,因为一次只有一个线程可以保存全局解释器锁,因此可以运行Python代码。 所以,你需要使用进程,而不是线程。
如果你的操作是“永久返回”,因为它是IO绑定的,也就是说,等待在networking或磁盘拷贝之类。 我稍后再回来。
接下来,一次处理5个,10个或100个项目的方法是创build一个由5个或10个或100个工作人员组成的池,并将这些项目放入工作人员服务的队列中。 幸运的是,stdlib multiprocessing
和concurrent.futures
库都包含了大部分的细节。
前者对传统编程更加强大和灵活; 后者更简单,如果你需要撰写未来的等待; 对于微不足道的情况,你select的并不重要。 (在这种情况下,最明显的实现是每行3行, futures
4行, multiprocessing
。
如果您使用2.6-2.7或3.0-3.1, futures
不是内置的,但您可以从PyPI ( pip install futures
) pip install futures
。
最后,如果可以将整个循环迭代转换为函数调用(例如,可以传递给map
),那么平行化通常要简单得多,所以我们先来做:
def try_my_operation(item): try: api.my_operation(item) except: print('error with item')
把它放在一起:
executor = concurrent.futures.ProcessPoolExecutor(10) futures = [executor.submit(try_my_operation, item) for item in items] concurrent.futures.wait(futures)
如果你有很多相对较小的工作,多处理的开销可能会使收益大大减less。 解决这个问题的方法是把工作分成更大的工作。 例如(使用itertools
食谱中的itertools
,可以复制并粘贴到代码中,或者从PyPI上的more-itertools
项目中获取):
def try_multiple_operations(items): for item in items: try: api.my_operation(item) except: print('error with item') executor = concurrent.futures.ProcessPoolExecutor(10) futures = [executor.submit(try_multiple_operations, group) for group in grouper(5, items)] concurrent.futures.wait(futures)
最后,如果你的代码是IO绑定的呢? 然后线程和进程一样好,开销less(限制less,但这些限制在这种情况下通常不会影响你)。 有时候,“less开销”就足以说明你不需要使用线程进行批处理,但是你可以使用进程,这是一个不错的胜利。
那么,你如何使用线程而不是进程呢? 只需将ProcessPoolExecutor
更改为ThreadPoolExecutor
。
如果您不确定您的代码是CPU限制还是IO限制,请尝试两种方法。
我可以为我的python脚本中的多个function吗? 例如,如果我想要并行化其他代码中的其他代码。 是否有可能在同一个脚本中执行两个multithreading函数?
是。 事实上,有两种不同的方式来做到这一点。
首先,你可以共享相同的(线程或进程)执行程序,并从多个地方使用它,没有问题。 任务和未来的重点在于它们是独立的; 你不关心他们在哪里跑,只是你排队他们,最终得到答案。
或者,你可以在同一个程序中有两个执行程序,没有问题。 这具有性能成本 – 如果同时使用两个执行程序,则最终将尝试在8个内核上运行(例如)16个繁忙线程,这意味着将会发生一些上下文切换。 但是有时候这样做是值得的,因为这两个执行者很less在同一时间忙碌,这使得你的代码变得更简单了。 或者,也许一个执行者正在运行可能需要一段时间才能完成的非常大的任务,另一个正在运行需要尽快完成的非常小的任务,因为对于部分程序来说,响应速度比吞吐量更重要。
如果你不知道哪一个适合你的程序,通常是第一个。
编辑 :忘了提及这在Python 2.7.x上工作
有multiprocesing.pool,下面的示例说明如何使用其中之一:
from multiprocessing.pool import ThreadPool as Pool # from multiprocessing import Pool pool_size = 5 # your "parallelness" pool = Pool(pool_size) def worker(item): try: api.my_operation(item) except: print('error with item') for item in items: pool.apply_async(worker, (item,)) pool.close() pool.join()
现在,如果您确实确定您的进程是以@abarnert提到的CPU绑定的,请将ThreadPool更改为进程池实现(在ThreadPool导入下进行评论)。 你可以在这里find更多的细节: http : //docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
您可以使用如下方法将处理分割成指定数量的线程:
import threading def process(items, start, end): for item in items[start:end]: try: api.my_operation(item) except Exception: print('error with item') def split_processing(items, num_splits=4): split_size = len(items) // num_splits threads = [] for i in range(num_splits): # determine the indices of the list this thread will handle start = i * split_size # special case on the last chunk to account for uneven splits end = None if i+1 == num_splits else (i+1) * split_size # create the thread threads.append( threading.Thread(target=process, args=(items, start, end))) threads[-1].start() # start the thread we just created # wait for all threads to finish for t in threads: t.join() split_processing(items)