线程池与多处理池类似吗?

是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类 ?

我喜欢例如并行化地图function的简单方法

def long_running_func(p): c_func_no_gil(p) p = multiprocessing.Pool(4) xs = p.map(long_running_func, range(100)) 

但是我想这样做没有创build新的进程的开销。

我知道GIL。 然而,在我的用例中,函数将是一个IO绑定的C函数,python包装将在实际的函数调用之前释放GIL。

我是否必须编写自己的线程池?

我刚刚发现multiprocessing模块中实际上一个基于线程的Pool接口,但它被隐藏起来并没有被正确logging。

它可以通过导入

 from multiprocessing.pool import ThreadPool 

它是用一个包装python线程的虚拟Process类来实现的。 这个基于线程的Process类可以在文档中简单提及的multiprocessing.dummy中find。 这个虚拟模块据说提供了基于线程的整个多处理接口。

在Python 3中,您可以使用concurrent.futures.ThreadPoolExecutor ,即:

 executor = ThreadPoolExecutor(max_workers=10) a = executor.submit(my_function) 

有关更多信息和示例,请参阅文档 。

是的,它似乎有(或多或less)相同的API。

 import multiprocessing def worker(lnk): .... def start_process(): ..... .... if(PROCESS): pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process) else: pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, initializer=start_process) pool.map(worker, inputs) .... 

对于非常简单和轻量级的东西(稍微修改一下 ):

 from Queue import Queue from threading import Thread class Worker(Thread): """Thread executing tasks from a given tasks queue""" def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception, e: print e finally: self.tasks.task_done() class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """Add a task to the queue""" self.tasks.put((func, args, kargs)) def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() if __name__ == '__main__': from random import randrange from time import sleep delays = [randrange(1, 10) for i in range(100)] def wait_delay(d): print 'sleeping for (%d)sec' % d sleep(d) pool = ThreadPool(20) for i, d in enumerate(delays): pool.add_task(wait_delay, d) pool.wait_completion() 

要支持任务完成callback,只需将callback添加到任务元组即可。

以下是Python Cookbook中看起来很有希望的东西:

食谱576519:线程池与(多)processing.Pool(Python)相同的API

创build新进程的开销很小,尤其是当它们只有4个时。 我怀疑这是你的应用程序的性能热点。 保持简单,优化你需要的地方以及分析结果指向的地方。

您好在Python中使用线程池,你可以使用这个库:

 from multiprocessing.dummy import Pool as ThreadPool 

然后使用,这个库这样做:

 pool = ThreadPool(threads) results = pool.map(service, tasks) pool.close() pool.join() return results 

线程是你想要的线程数,任务是最对应于服务的任务列表。

没有内置的基于线程的池。 但是,使用Queue类实现生产者/消费者队列可能非常快。

来自: https : //docs.python.org/2/library/queue.html

 from threading import Thread from Queue import Queue def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # block until all tasks are done