Tag: 多处理

在多处理进程之间共享较大的只读Numpy数组

我有一个60GB的SciPyarrays(matrix),我必须共享5个以上的multiprocessing Process对象。 我见过numpy-sharedmem并阅读SciPy列表上的这个讨论 。 似乎有两种方法 – numpy-sharedmem和使用multiprocessing.RawArray()和映射NumPy dtype s到ctype 。 现在, numpy-sharedmem似乎是要走的路,但我还没有看到一个很好的参考例子。 我不需要任何types的锁,因为数组(实际上是一个matrix)将是只读的。 现在,由于它的大小,我想避免一个副本。 这听起来像正确的方法是创build数组的唯一副本作为sharedmem数组,然后将其传递给Process对象? 几个具体的问题: 真正将sharedmem句柄传递给子Process() es的最好方法是什么? 我需要一个队列来传递一个数组吗? 一个pipe道会更好? 我可以只是将它作为parameter passing给Process()子类的init(我假设它被腌制)? 在上面的链接讨论中,提到numpy-sharedmem不是64位安全的? 我肯定使用一些不是32位寻址的结构。 对RawArray()方法有没有权衡? 越慢越好? 我需要numpy-sharedmem方法的任何ctype-to-dtype映射吗? 有没有人有一个这样的OpenSource代码的例子? 我是一个非常实际的学习,如果没有任何一个好的例子来看待这个问题,很难做到这一点。 如果有任何其他信息可以帮助我们澄清这一点,请发表评论,我会添加。 谢谢! 这需要在Ubuntu Linux和Maybe Mac OS上运行,但可移植性不是一个大问题。

使用多处理队列,池和锁的死简单的例子

我试图阅读在http://docs.python.org/dev/library/multiprocessing.html的文档,但我仍然挣扎着多处理队列,池和locking。 而现在我能够build立下面的例子。 关于队列和池,我不知道我是否以正确的方式理解了这个概念,所以如果我错了,请纠正我。 我试图实现的是在时间处理2个请求(数据列表在这个例子中有8个),所以我应该使用什么? 池创build2个进程,可以处理两个不同的队列(最多2个),或者我应该使用队列每次处理2个input? 锁将是正确打印输出。 import multiprocessing import time data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'], ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'] ) def mp_handler(var1): for indata in var1: p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1])) p.start() def mp_worker(inputs, the_time): print " Processs %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print […]

多处理中的共享内存对象

假设我有一个大内存numpy数组,我有一个函数func ,在这个巨大的数组作为input(连同一些其他参数)。 具有不同参数的func可以并行运行。 例如: def func(arr, param): # do stuff to arr, param # build array arr pool = Pool(processes = 6) results = [pool.apply_async(func, [arr, param]) for param in all_params] output = [res.get() for res in results] 如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中。 有没有办法让不同的进程共享相同的数组? 这个数组对象是只读的,不会被修改。 更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法共享呢? [EDITED] 我读了答案,但我仍然有点困惑。 由于fork()是copy-on-write,因此在python多处理库中产生新进程时,不应该调用任何额外的开销。 但是下面的代码表明有一个巨大的开销: from multiprocessing import Pool, Manager import numpy as np; […]

在Python中使用多处理时应该如何logging?

现在我在一个框架中有一个中心模块,它使用Python 2.6 multiprocessing模块产生多个进程。 因为它使用multiprocessing ,所以有模块级的多处理感知日志LOG = multiprocessing.get_logger() 。 根据文档 ,这个logging器拥有进程共享的锁,这样你就不会在sys.stderr (或任何文件句柄)中sys.stderr ,因为多个进程同时写入。 我现在的问题是,框架中的其他模块不支持多处理。 我看到它的方式,我需要使这个中央模块上的所有依赖使用多处理感知日志logging。 这是在框架内烦人的,更不用说为框架的所有客户。 有没有替代品我没有想到?

是否将共享只读数据复制到不同进程以进行多处理?

这段代码看起来像这样: glbl_array = # a 3 Gb array def my_func( args, def_param = glbl_array): #do stuff on args and def_param if __name__ == '__main__': pool = Pool(processes=4) pool.map(my_func, range(1000)) 有没有办法确保(或鼓励)不同的进程没有得到glbl_array的副本,但共享它。 如果没有办法停止拷贝,我将使用memmapped数组,但是我的访问模式不是很规则,所以我期望memmapped数组变慢。 以上似乎是第一个尝试。 这是在Linux上。 我只是想从Stackoverflow的一些build议,不想惹恼系统pipe理员。 如果第二个参数是像glbl_array.tostring()这样的真正的不可变对象,你认为这会有帮助glbl_array.tostring() ?

multiprocessing.Pool:何时使用apply,apply_async或map?

我还没有看到使用Pool.apply , Pool.apply_async和Pool.map的例子 。 我主要使用Pool.map ; 别人的优点是什么?

键盘中断与python的多处理池

我怎样才能处理与python的多处理池的KeyboardInterrupt事件? 这是一个简单的例子: from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): sleep(1) return i*i def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # **** THIS PART NEVER EXECUTES. **** pool.terminate() print "You cancelled the program!" sys.exit(1) print "\nFinally, here are the results: ", results if __name__ […]

为什么在我导入numpy之后多处理只使用一个核心?

我不确定这是否会成为一个操作系统问题,但我想我会问在这里,以防有人从Python的一端有所了解。 我一直在尝试使用joblib来并行执行一个CPU joblib循环,但是我发现,不是每个工作进程被分配到不同的内核,而是最终将所有这些进程分配给相同的内核,而且性能没有提高。 这是一个非常微不足道的例子… from joblib import Parallel,delayed import numpy as np def testfunc(data): # some very boneheaded CPU work for nn in xrange(1000): for ii in data[0,:]: for jj in data[1,:]: ii*jj def run(niter=10): data = (np.random.randn(2,100) for ii in xrange(niter)) pool = Parallel(n_jobs=-1,verbose=1,pre_dispatch='all') results = pool(delayed(testfunc)(dd) for dd in data) if __name__ == […]

在共享内存中使用numpy数组进行多处理

我想在共享内存中使用一个numpy数组用于多处理模块。 难点在于使用它像一个numpy数组,而不仅仅是一个ctypes数组。 from multiprocessing import Process, Array import scipy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) a = Array('d', unshared_arr) print "Originally, the first two elements of arr = %s"%(arr[:2]) # Create, start, and finish the child process p = Process(target=f, args=(a,)) p.start() […]

多处理与线程化Python

我想了解多 线程的多处理优势。 我知道全局解释器锁可以解决多处理问题 ,但是还有什么其他的优点,而且可以通过线程来做同样的事情呢?