未检测到多处理池中引发的exception
看起来,当多处理.Pool进程发生exception时,没有堆栈跟踪或任何其他指示失败。 例:
from multiprocessing import Pool def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go) p.close() p.join() 打印1并停止静音。 有趣的是,引发一个BaseException反而起作用。 有什么办法使所有exception的行为与BaseException相同吗?
我有一个合理的解决scheme,至less出于debugging的目的。 目前我还没有一个解决scheme可以在主stream程中提高例外。 我的第一个想法是使用一个装饰器,但是你只能腌制在模块顶层定义的函数 ,所以这是正确的。
 相反,一个简单的包装类和一个池子类,使用apply_async (因此apply )。 我将离开map_async作为读者的练习。 
 import traceback from multiprocessing.pool import Pool import multiprocessing # Shortcut to multiprocessing's logger def error(msg, *args): return multiprocessing.get_logger().error(msg, *args) class LogExceptions(object): def __init__(self, callable): self.__callable = callable def __call__(self, *args, **kwargs): try: result = self.__callable(*args, **kwargs) except Exception as e: # Here we add some debugging help. If multiprocessing's # debugging is on, it will arrange to log the traceback error(traceback.format_exc()) # Re-raise the original exception so the Pool worker can # clean up raise # It was fine, give a normal answer return result class LoggingPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) def go(): print(1) raise Exception() print(2) multiprocessing.log_to_stderr() p = LoggingPool(processes=1) p.apply_async(go) p.close() p.join() 
这给了我:
 1 [ERROR/PoolWorker-1] Traceback (most recent call last): File "mpdebug.py", line 24, in __call__ result = self.__callable(*args, **kwargs) File "mpdebug.py", line 44, in go raise Exception() Exception 
 也许我失去了一些东西,但不是什么Result对象的get方法返回? 请参阅进程池 。 
类multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()。get([timeout])返回的结果的类
到达时返回结果。 如果超时不是“无”,并且结果没有在超时秒内到达,则会引发multiprocessing.TimeoutError。 如果远程调用引发exception,那么该exception将被get()重新调整。
所以,稍微修改你的例子,可以做
 from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() p.close() p.join() 
这给出了结果
 1 Traceback (most recent call last): File "rob.py", line 10, in <module> x.get() File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foobar 
这不是完全令人满意的,因为它不打印回溯,但总比没有好。
更新:这个bug已经在Python 3.4中修复,由Richard Oudkerk提供。 请参阅multiprocessing.pool的问题get方法.Async应该返回完整的回溯 。
在撰写本文时得票最多的解决scheme存在一个问题:
 from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() ## waiting here for go() to complete... p.close() p.join() 
 正如@dfrankow所指出的那样,它将在x.get()上等待,这会破坏asynchronous运行任务的点。 所以,为了更好的效率(特别是如果你的工作人员的function需要很长时间),我会改变它: 
 from multiprocessing import Pool def go(x): print(1) # task_that_takes_a_long_time() raise Exception("Can't go anywhere.") print(2) return x**2 p = Pool() results = [] for x in range(1000): results.append( p.apply_async(go, [x]) ) p.close() for r in results: r.get() 
优点 :worker函数是asynchronous运行的,例如,如果你在多个内核上运行很多任务,它将比原来的解决scheme高效得多。
  缺点 :如果worker函数中有一个exception,只有在池完成所有任务之后才会引发exception。  这可能是也可能不是理想的行为。 编辑根据@ colinfang的评论,这固定这个。 
我已经成功地logging这个装饰器的exception:
 import traceback, functools, multiprocessing def trace_unhandled_exceptions(func): @functools.wraps(func) def wrapped_func(*args, **kwargs): try: func(*args, **kwargs) except: print 'Exception in '+func.__name__ traceback.print_exc() return wrapped_func 
与问题中的代码,这是
 @trace_unhandled_exceptions def go(): print(1) raise Exception() print(2) p = multiprocessing.Pool(1) p.apply_async(go) p.close() p.join() 
 简单地装饰你传递给你的进程池的函数。 这个工作的关键是@functools.wraps(func)否则多处理会抛出一个PicklingError 。 
上面的代码给出
 1 Exception in go Traceback (most recent call last): File "<stdin>", line 5, in wrapped_func File "<stdin>", line 4, in go Exception 
我创build了一个RemoteException.py模块,它显示了进程中exception的完整回溯。 Python2。 下载并添加到您的代码:
 import RemoteException @RemoteException.showError def go(): raise Exception('Error!') if __name__ == '__main__': import multiprocessing p = multiprocessing.Pool(processes = 1) r = p.apply(go) # full traceback is shown here 
 import logging from multiprocessing import Pool def proc_wrapper(func, *args, **kwargs): """Print exception because multiprocessing lib doesn't return them right.""" try: return func(*args, **kwargs) except Exception as e: logging.exception(e) raise def go(x): print x raise Exception("foobar") p = Pool() p.apply_async(proc_wrapper, (go, 5)) p.join() p.close() 
我会尝试使用pdb:
 import pdb import sys def handler(type, value, tb): pdb.pm() sys.excepthook = handler 
 既然你已经使用了apply_sync ,我猜这个用例是想做一些同步任务。 使用callback进行处理是另一种select。 请注意,这个选项只适用于python3.2及以上版本,python2.7不可用。 
 from multiprocessing import Pool def callback(result): print('success', result) def callback_error(result): print('error', result) def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go, callback=callback, error_callback=callback_error) # You can do another things p.close() p.join() 
 由于multiprocessing.Pool已经有了不错的答案。可用的游戏multiprocessing.Pool ,我将提供一个解决scheme,使用不同的方法来完成。 
 对于python >= 3.2 ,下面的解决scheme似乎是最简单的: 
 from concurrent.futures import ProcessPoolExecutor, wait def go(): print(1) raise Exception() print(2) futures = [] with ProcessPoolExecutor() as p: for i in range(10): futures.append(p.submit(go)) results = [f.result() for f in futures] 
优点:
- 很less的代码
- 在主stream程中引发了一个例外
- 提供堆栈跟踪
- 没有外部依赖
有关API的更多信息,请查看: https : //docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
此外,如果您提交了大量任务,并且只要您的某个任务失败,您希望主进程失败,则可以使用以下代码片段:
 from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed import time def go(): print(1) time.sleep(0.3) raise Exception() print(2) futures = [] with ProcessPoolExecutor(1) as p: for i in range(10): futures.append(p.submit(go)) for f in as_completed(futures): if f.exception() is not None: for f in futures: f.cancel() break [f.result() for f in futures] 
所有其他答案只有在所有任务都被执行后才会失败。