Python多处理和一个共享计数器
我遇到了多处理模块的麻烦。 我正在使用地图方法的工作者池从许多文件加载数据,并为他们每个我用自定义函数分析数据。 每次处理一个文件,我都想更新一个计数器,以便能够跟踪还有多less文件需要处理。 这里是示例代码:
def analyze_data( args ): # do something counter += 1 print counter if __name__ == '__main__': list_of_files = os.listdir(some_directory) global counter counter = 0 p = Pool() p.map(analyze_data, list_of_files)
我找不到解决scheme。
问题是counter
variables之间不共享:每个独立的进程创build它自己的本地实例,并增加它。
有关可用于在进程之间共享状态的一些技巧,请参阅本文档的这一部分 。 在你的情况下,你可能想要在工人之间共享一个Value
实例
这里是你的例子的工作版本(有一些虚拟input数据)。 请注意,它使用了我在实践中真正尝试避免的全局值:
from multiprocessing import Pool, Value from time import sleep counter = None def init(args): ''' store the counter for later use ''' global counter counter = args def analyze_data(args): ''' increment the global counter, do something with the input ''' global counter # += operation is not atomic, so we need to get a lock: with counter.get_lock(): counter.value += 1 print counter.value return args * 10 if __name__ == '__main__': #inputs = os.listdir(some_directory) # # initialize a cross-process counter and the input lists # counter = Value('i', 0) inputs = [1, 2, 3, 4] # # create the pool of workers, ensuring each one receives the counter # as it starts. # p = Pool(initializer = init, initargs = (counter, )) i = p.map_async(analyze_data, inputs, chunksize = 1) i.wait() print i.get()
没有比赛条件错误的计数器类:
class Counter(object): def __init__(self): self.val = multiprocessing.Value('i', 0) def increment(self, n=1): with self.val.get_lock(): self.val.value += n @property def value(self): return self.val.value