在多处理进程之间共享较大的只读Numpy数组

我有一个60GB的SciPyarrays(matrix),我必须共享5个以上的multiprocessing Process对象。 我见过numpy-sharedmem并阅读SciPy列表上的这个讨论 。 似乎有两种方法 – numpy-sharedmem和使用multiprocessing.RawArray()和映射NumPy dtype s到ctype 。 现在, numpy-sharedmem似乎是要走的路,但我还没有看到一个很好的参考例子。 我不需要任何types的锁,因为数组(实际上是一个matrix)将是只读的。 现在,由于它的大小,我想避免一个副本。 这听起来像正确的方法是创build数组的唯一副本作为sharedmem数组,然后将其传递给Process对象? 几个具体的问题:

  1. 真正将sharedmem句柄传递给子Process() es的最好方法是什么? 我需要一个队列来传递一个数组吗? 一个pipe道会更好? 我可以只是将它作为parameter passing给Process()子类的init(我假设它被腌制)?

  2. 在上面的链接讨论中,提到numpy-sharedmem不是64位安全的? 我肯定使用一些不是32位寻址的结构。

  3. RawArray()方法有没有权衡? 越慢越好?

  4. 我需要numpy-sharedmem方法的任何ctype-to-dtype映射吗?

  5. 有没有人有一个这样的OpenSource代码的例子? 我是一个非常实际的学习,如果没有任何一个好的例子来看待这个问题,很难做到这一点。

如果有任何其他信息可以帮助我们澄清这一点,请发表评论,我会添加。 谢谢!

这需要在Ubuntu Linux和Maybe Mac OS上运行,但可移植性不是一个大问题。

@Velimir Mlaker给出了一个很好的答案。 我想我可以添加一些评论和一个小例子。

(我找不到有关sharedmem的很多文档 – 这些是我自己实验的结果。)

  1. 你是否需要在subprocess启动时或者启动之后传递句柄? 如果只是前者,你可以使用Process参数和args 。 这可能比使用全局variables更好。
  2. 从你链接的讨论页面看来,64位Linux的支持似乎已经添加到了sharedmem中,所以这可能不成问题。
  3. 我不知道这个。
  4. 否。请参阅下面的示例。

 #!/usr/bin/env python from multiprocessing import Process import sharedmem import numpy def do_work(data, start): data[start] = 0; def split_work(num): n = 20 width = n/num shared = sharedmem.empty(n) shared[:] = numpy.random.rand(1, n)[0] print "values are %s" % shared processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)] for p in processes: p.start() for p in processes: p.join() print "values are %s" % shared print "type is %s" % type(shared[0]) if __name__ == '__main__': split_work(4) 

产量

 values are [ 0.81397784 0.59667692 0.10761908 0.6736734 0.46349645 0.98340718 0.44056863 0.10701816 0.67167752 0.29158274 0.22242552 0.14273156 0.34912309 0.43812636 0.58484507 0.81697513 0.57758441 0.4284959 0.7292129 0.06063283] values are [ 0. 0.59667692 0.10761908 0.6736734 0.46349645 0. 0.44056863 0.10701816 0.67167752 0.29158274 0. 0.14273156 0.34912309 0.43812636 0.58484507 0. 0.57758441 0.4284959 0.7292129 0.06063283] type is <type 'numpy.float64'> 

这个相关的问题可能是有用的。

如果您在Linux(或任何符合POSIX的系统)上,则可以将此数组定义为全局variables。 当Linux启动一个新的subprocess时,多进程使用fork() 。 一个新产生的subprocess只要不改变它( 写时复制机制)就会自动与其父进程共享内存。

既然你说“我不需要任何types的锁,因为数组(实际上是一个matrix)将是只读的”利用这种行为将是一个非常简单,但非常有效的方法:所有的subprocess将访问读取这个庞大的numpy数组时,物理内存中的数据是相同的。

不要将你的数组交给Process()构造函数,这将指示multiprocessing将数据pickle到子Process() ,这对你来说将是非常低效或不可能的。 在Linux上,在fork() ,孩子是使用相同物理内存的父母的确切副本,所以您只需确保Pythonvariables“包含”matrix可以从target函数交给Process() 。 这通常可以用“全局”variables来实现。

示例代码:

 from multiprocessing import Process from numpy import random global_array = random.random(10**4) def child(): print sum(global_array) def main(): processes = [Process(target=child) for _ in xrange(10)] for p in processes: p.start() for p in processes: p.join() if __name__ == "__main__": main() 

在Windows上 – 不支持fork()multiprocessing正在使用win32 API调用CreateProcess 。 它从任何给定的可执行文件创build一个全新的过程。 这就是为什么在Windows上,如果需要在父级运行时创build数据,那么就需要将数据腌制到子级。

您可能对我写的一小段代码感兴趣: github.com/vmlaker/benchmark-sharedmem

唯一感兴趣的文件是main.py 这是numpy-sharedmem的基准 – 代码只是通过Pipe将数组( numpysharedmem )传递给派生进程。 工作人员只是对数据调用sum() 。 我只是比较两种实现之间的数据通信时间。

我还写了另一个更复杂的代码: github.com/vmlaker/sherlock 。

在这里,我使用numpy-sharedmem模块进行OpenCV的实时image processing – 图像是NumPy数组,按照OpenCV的新版cv2 API。 通过从multiprocessing.Manager创build的字典对象(而不是使用队列或pipe道),进程之间共享图像,实际上是其引用的过程。与使用普通的NumPy数组相比,我获得了巨大的性能改进。

pipe道与队列

根据我的经验,使用Pipe的IPC比Queue快。 这是有道理的,因为队列增加了locking,以保证多个生产者/消费者的安全。 pipe道没有。 但是,如果您只有两个进程来回访问,则使用Pipe是安全的,或者,如文档所示:

…同时使用不同pipe道的stream程不会有腐蚀风险。

sharedmem安全

sharedmem模块的主要问题是不正常程序退出时内存泄漏的可能性。 这在这里进行冗长的讨论。 尽pipe在2011年4月10日Sturla提到了内存泄漏的修复,但自那时起,我仍然经历了泄漏,使用Sturla Molden自己的GitHub( github.com/sturlamol/sharedmem-numpy )和Chris Lee-Messer的Bitbucket bitbucket.org/cleemesser/numpy-sharedmem )。

如果你的数组很大,你可以使用numpy.memmap 。 例如,如果你有一个存储在磁盘中的数组,比如'test.array' ,即使在“写入”模式下,你也可以使用同时进程访问数据,但是你的情况比较简单,因为你只需要“读取”模式。

创build数组:

 a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000)) 

然后你可以像用一个普通的数组一样填充这个数组。 例如:

 a[:10,:100]=1. a[10:,100:]=2. 

当您删除variablesa时,数据将存储在磁盘中。

稍后,您可以使用多个进程访问test.array的数据:

 # read-only mode b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000)) # read and writing mode c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000)) 

相关答案:

  • 在python和numpy中使用大数据,没有足够的内存,如何保存光盘上的部分结果?

  • 是否有可能映射磁盘上的不连续的数据与Python的数组?

您可能还会发现查看pyro的文档是很有用的,就像您可以正确分区您的任务一样,您可以使用它在不同的机器上以及在同一台机器的不同核心上执行不同的部分。