多进程是Python中的一个强大的工具,我想更深入地理解它。 我想知道什么时候使用常规的 锁和队列以及何时使用多处理器pipe理器在所有进程之间共享这些。 我想出了四种不同的多处理条件下的testing场景: 使用池和NOpipe理器 使用一个池和一个经理 使用单个进程和NOpipe理器 使用单独的进程和pipe理器 工作 所有的条件执行工作职能the_job 。 the_job包含一些由锁保护的打印。 而且,函数的input只是简单地放入一个队列(查看是否可以从队列中恢复)。 这个input只是在主脚本start_scenario创build的range(10)的索引idx (显示在底部)。 def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print 'I' print 'was ' print 'here ' print '!!!!' […]
我试图运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换。 有什么办法可以做同样的事情? def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == "__main__": sc = SparkContext(appName="parallelTransforms") sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql("select * from tables.X_table") p1 = Process(target=doXTransforms , args=(rows_rdd,)) p1.start() p2 = Process(target=doYTransforms, args=(rows_rdd,)) p2.start() p1.join() p2.join() sc.stop() 这不起作用,我现在明白这是行不通的。 但是有没有其他方法可以使这项工作? 具体是否有任何python-spark特定的解决scheme?
在下面的示例代码中,我想恢复函数worker的返回值。 我怎么能这样做? 这个值在哪里存储? 示例代码: import multiprocessing def worker(procnum): '''worker function''' print str(procnum) + ' represent!' return procnum if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start() for proc in jobs: proc.join() print jobs 输出: 0 represent! 1 represent! 2 represent! 3 represent! 4 represent! [<Process(Process-1, stopped)>, <Process(Process-2, […]
下面的代码,不打印"here" 。 问题是什么? 我在我的两台机器(Windows 7,Ubuntu 12.10)和http://www.compileonline.com/execute_python_online.php上testing了它,在所有情况下都不打印"here" 。 from multiprocessing import Queue, Process def runLang(que): print "start" myDict=dict() for i in xrange(10000): myDict[i]=i que.put(myDict) print "finish" def run(fileToAnalyze): que=Queue() processList=[] dicList=[] langs= ["chi","eng"] for lang in langs: p=Process(target=runLang,args=(que,)) processList.append(p) p.start() for p1 in processList: p1.join() print "here" for _ in xrange(len(langs)): item=que.get() print item dicList.append(item) if […]