使用多处理时不能pickle <type'instancemethod'> Pool.map()
我正在尝试使用multiprocessing
的Pool.map()
函数来同时分配工作。 当我使用下面的代码,它工作正常:
import multiprocessing def f(x): return x*x def go(): pool = multiprocessing.Pool(processes=4) print pool.map(f, range(10)) if __name__== '__main__' : go()
但是,当我以更面向对象的方式使用它时,它不起作用。 它给出的错误信息是:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
这发生在以下是我的主要程序时:
import someClass if __name__== '__main__' : sc = someClass.someClass() sc.go()
以下是我的someClass
类:
import multiprocessing class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): pool = multiprocessing.Pool(processes=4) print pool.map(self.f, range(10))
任何人都知道这个问题可能是一个简单的方法吗?
问题是多处理必须腌制一些东西来把它们放在进程中,绑定方法是不可挑选的。 解决方法(无论您认为“简单”还是不行;-)是将基础设施添加到您的程序,以允许这样的方法被腌制,注册与copy_reg标准库方法。
例如,Steven Bethard对这个线程的贡献(接近线程末尾)显示了一个完全可行的方法,允许通过copy_reg
来进行酸洗/ copy_reg
。
所有这些解决scheme都是丑陋的,因为多处理和酸洗被破坏和限制,除非你跳出标准库。
如果使用名为pathos.multiprocesssing
的multiprocessing
pathos.multiprocesssing
,则可以在多处理map
函数中直接使用类和类方法。 这是因为dill
被用来代替pickle
或cPickle
, dill
可以序列化几乎任何东西在Python中。
pathos.multiprocessing
还提供了一个asynchronous映射函数…它可以map
具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])
)
请参阅: 多处理和莳萝可以一起做什么?
和: http : //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp >>> p = pp.ProcessPool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> p.map(t.plus, x, y) [4, 6, 8, 10]
如果你想明白的话,你可以完全想要你想做的事情,如果你愿意,你可以从口译员那里做。
>>> import pathos.pools as pp >>> class someClass(object): ... def __init__(self): ... pass ... def f(self, x): ... return x*x ... def go(self): ... pool = pp.ProcessPool(4) ... print pool.map(self.f, range(10)) ... >>> sc = someClass() >>> sc.go() [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>>
获取代码在这里: https : //github.com/uqfoundation/pathos
你也可以在你的someClass()
定义一个__call__()
方法,该方法调用someClass.go()
,然后将someClass()
的实例传递给池。 这个对象是pickleable,它工作正常(对我来说)…
尽pipeSteven Bethard的解决scheme有一些限制:
当你将类方法注册为一个函数时,每当你的方法处理完成时,你的类的析构函数就会被惊人地调用。 所以,如果你有一个你的类的实例调用n次方法,成员可能会消失在2次运行之间,你可能会得到一个消息malloc: *** error for object 0x...: pointer being freed was not allocated
(例如打开成员文件)或pure virtual method called, terminate called without an active exception
(这意味着比我所使用的成员对象的生命周期比我想象的更短)。 在处理大于池大小的n时,我得到了这个结果。 这里是一个简短的例子:
from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult # --------- see Stenven's solution above ------------- from copy_reg import pickle from types import MethodType def _pickle_method(method): func_name = method.im_func.__name__ obj = method.im_self cls = method.im_class return _unpickle_method, (func_name, obj, cls) def _unpickle_method(func_name, obj, cls): for cls in cls.mro(): try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls) class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multi-processing pool = Pool(processes=workers) async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __del__(self): print "... Destructor" def process_obj(self, index): print "object %d" % index return "results" pickle(MethodType, _pickle_method, _unpickle_method) Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once)
输出:
Constructor ... object 0 object 1 object 2 ... Destructor object 3 ... Destructor object 4 ... Destructor object 5 ... Destructor object 6 ... Destructor object 7 ... Destructor ... Destructor ... Destructor ['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results'] ... Destructor
__call__
方法不是那么等价,因为从结果中读取[None,…]
from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multiprocessing pool = Pool(processes=workers) async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __call__(self, i): self.process_obj(i) def __del__(self): print "... Destructor" def process_obj(self, i): print "obj %d" % i return "result" Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once), # **and** results are empty !
所以这两种方法都不能令人满意
还有另外一个捷径可以使用,尽pipe根据类实例中的内容可能是效率低下的。
正如大家所说的那样,问题在于multiprocessing
代码必须腌制它发送给已经启动的subprocess的东西,picker不会执行实例方法。
但是,可以不发送实例方法,而是将实际的类实例以及要调用的函数的名称发送到普通函数,然后使用getattr
调用实例方法,从而在Pool
创build绑定方法子。 这与定义__call__
方法类似,不同之处在于您可以调用多个成员函数。
从他的回答中窃取@ EricH的代码并注释了一下(我重新input了它,因此所有的名字都改变了,出于某种原因,这似乎比剪切粘贴更容易:-))来说明所有的魔法:
import multiprocessing import os def call_it(instance, name, args=(), kwargs=None): "indirect caller for instance methods and multiprocessing" if kwargs is None: kwargs = {} return getattr(instance, name)(*args, **kwargs) class Klass(object): def __init__(self, nobj, workers=multiprocessing.cpu_count()): print "Constructor (in pid=%d)..." % os.getpid() self.count = 1 pool = multiprocessing.Pool(processes = workers) async_results = [pool.apply_async(call_it, args = (self, 'process_obj', (i,))) for i in range(nobj)] pool.close() map(multiprocessing.pool.ApplyResult.wait, async_results) lst_results = [r.get() for r in async_results] print lst_results def __del__(self): self.count -= 1 print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count) def process_obj(self, index): print "object %d" % index return "results" Klass(nobj=8, workers=3)
输出结果显示,构造函数的确被调用了一次(在原始的pid中),而析构函数被调用了9次(每次复制一次=每个pool-worker-process需要2到3次,再加上一次处理)。 这通常是可以的,因为在这种情况下,由于默认pickler会复制整个实例,并且(semi-)会秘密地重新填充它,在这种情况下,
obj = object.__new__(Klass) obj.__dict__.update({'count':1})
这就是为什么即使在三个工作进程中调用了八次析构函数的情况下,每一次都会从1减less到0,但是当然你仍然可以通过这种方式陷入困境。 如有必要,你可以提供你自己的__setstate__
:
def __setstate__(self, adict): self.count = adict['count']
在这种情况下,例如。
你也可以在你的someClass()
定义一个__call__()
方法,该方法调用someClass.go()
,然后将someClass()
的实例传递给池。 这个对象是pickleable,它工作正常(对我来说)…
class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): p = Pool(4) sc = p.map(self, range(4)) print sc def __call__(self, x): return self.f(x) sc = someClass() sc.go()
一个潜在的微不足道的解决办法是切换到使用multiprocessing.dummy
。 这是一个基于线程的多处理接口实现,在Python 2.7中似乎没有这个问题。 我在这里没有太多的经验,但是这个快速导入的改变允许我在类方法上调用apply_async。
有关multiprocessing.dummy
一些很好的资源:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy