Python多处理安全写入文件
我正在尝试解决一个涉及大量子问题的大数值问题,我使用Python的多处理模块(特别是Pool.map)将不同的独立子问题分解到不同的核心上。 每个子问题涉及计算大量的子子问题,我试图通过将这些结果存储到文件中来有效地记忆这些结果,如果它们还没有被任何进程计算,则跳过计算并从文件中读取结果。
我遇到了并发问题:不同的进程有时会检查是否已经计算出子子问题(通过查找结果将被存储的文件),查看它没有,运行计算,然后尝试将结果写入同一个文件中。 我如何避免写这样的冲突?
@ GP89提到了一个很好的解决scheme。 使用队列将写入任务发送到具有唯一写入权限的专用进程。 所有其他工作人员只能读取访问权限。 这将消除冲突。 这是一个使用apply_async的例子,但它也可以和map一起工作:
import multiprocessing as mp import time fn = 'c:/temp/temp.txt' def worker(arg, q): '''stupidly simulates long running process''' start = time.clock() s = 'this is a test' txt = s for i in xrange(200000): txt += s done = time.clock() - start with open(fn, 'rb') as f: size = len(f.read()) res = 'Process' + str(arg), str(size), done q.put(res) return res def listener(q): '''listens for messages on the q, writes to file. ''' f = open(fn, 'wb') while 1: m = q.get() if m == 'kill': f.write('killed') break f.write(str(m) + '\n') f.flush() f.close() def main(): #must use Manager queue here, or will not work manager = mp.Manager() q = manager.Queue() pool = mp.Pool(mp.cpu_count() + 2) #put listener to work first watcher = pool.apply_async(listener, (q,)) #fire off workers jobs = [] for i in range(80): job = pool.apply_async(worker, (i, q)) jobs.append(job) # collect results from the workers through the pool result queue for job in jobs: job.get() #now we are done, kill the listener q.put('kill') pool.close() if __name__ == "__main__": main()
祝你好运,
麦克风
在我看来,您需要使用pipe理器来临时保存您的结果到列表,然后将结果从列表中写入文件。 另外,使用starmap传递要处理的对象和托pipe列表。 第一步是构build要传递给包含托pipe列表的starmap的参数。
from multiprocessing import Manager from multiprocessing import Pool import pandas as pd``` def worker(row, param): # do something here and then append it to row x = param**2 row.append(x) if __name__ == '__main__': pool_parameter = [] # list of objects to process with Manager() as mgr: row = mgr.list([]) # build list of parameters to send to starmap for param in pool_parameter: params.append([row,param]) with Pool() as p: p.starmap(worker, params)
从这一点上,你需要决定如何处理这个列表。 如果你有大量的RAM和一个庞大的数据集,可以自由地连接使用pandas。 然后你可以非常容易地保存文件作为CSV或泡菜。
df = pd.concat(row, ignore_index=True) df.to_pickle('data.pickle') df.to_csv('data.csv')