在Hadoop中链接多个MapReduce作业
在应用MapReduce的许多真实情况下,最终的algorithm最终会成为几个MapReduce步骤。
即Map1,Reduce1,Map2,Reduce2等等。
所以你有最后一次减less的输出作为下一个地图的input。
中间数据是您(一般情况下)一旦pipe道成功完成后不想保留的东西。 另外,因为这个中间数据通常是一些数据结构(比如'map'或者'set'),所以你不需要花费太多精力来编写和读取这些键值对。
在Hadoop中推荐的方法是什么?
是否有一个(简单的)例子说明如何以正确的方式处理这些中间数据,包括之后的清理?
我认为这个关于雅虎开发者networking的教程将帮助你: 链接作业
您使用JobClient.runJob()
。 来自第一份工作的数据的输出path成为第二份工作的inputpath。 这些需要通过适当的代码作为parameter passing给您的作业,以parsing它们并设置作业的参数。
我认为上面的方法可能是现在较老的mapred API的方式,但它应该仍然有效。 在新的mapreduce API中会有类似的方法,但我不确定它是什么。
至于在作业完成后删除中间数据,您可以在代码中执行此操作。 我之前完成的方式是使用类似于:
FileSystem.delete(Path f, boolean recursive);
path是数据在HDFS上的位置。 您需要确保只有在没有其他工作需要时才删除此数据。
有很多方法可以做到这一点。
(1)级联工作
为第一个作业创buildJobConf对象“job1”,并将所有参数设置为“input”作为input目录,“temp”作为输出目录。 执行此项工作:
JobClient.run(job1).
紧接着它,为第二个作业创buildJobConf对象“job2”,并将所有参数设置为“temp”作为input目录,“output”作为输出目录。 执行此项工作:
JobClient.run(job2).
(2)除了不使用JobClient.run之外,创build两个JobConf对象并将其中的所有参数设置为(1) 。
然后用jobconf作为参数创build两个Job对象:
Job job1=new Job(jobconf1); Job job2=new Job(jobconf2);
使用jobControl对象,指定作业依赖关系,然后运行作业:
JobControl jbcntrl=new JobControl("jbcntrl"); jbcntrl.addJob(job1); jbcntrl.addJob(job2); job2.addDependingJob(job1); jbcntrl.run();
(3)如果你需要一个像Map + |的结构 减less| Map *,可以使用Hadoop版本0.19及之后的ChainMapper和ChainReducer类。
干杯
实际上有很多方法可以做到这一点。 我将专注于两个。
一个是通过Riffle( http://github.com/cwensel/riffle )一个注释库来识别相关事物,并以依赖(拓扑)顺序“执行”它们。
或者你可以在Cascading( http://www.cascading.org/ )中使用Cascade(和MapReduceFlow)。 未来版本将支持Riffle注释,但现在使用原始MR JobConf作业效果很好。
一个变种就是不用手工pipe理MR作业,而是使用Cascading API开发你的应用程序。 然后,JobConf和作业链通过Cascading planner和Flow类在内部处理。
这样你就可以将时间花在关注问题上,而不是pipe理Hadoop作业的机制等方面。甚至可以将不同的语言层叠在一起(如clojure或jruby),以进一步简化开发和应用程序。 http://www.cascading.org/modules.html
我已经使用JobConf对象一个接一个地完成了作业链。 我以WordCount为例链接工作。 一份工作能够计算出在给定输出中一个字重复多less次。 第二份工作是将第一份工作输出作为input,并计算给定input中的总字数。 以下是需要放在Driver类中的代码。
//First Job - Counts, how many times a word encountered in a given file JobConf job1 = new JobConf(WordCount.class); job1.setJobName("WordCount"); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); job1.setMapperClass(WordCountMapper.class); job1.setCombinerClass(WordCountReducer.class); job1.setReducerClass(WordCountReducer.class); job1.setInputFormat(TextInputFormat.class); job1.setOutputFormat(TextOutputFormat.class); //Ensure that a folder with the "input_data" exists on HDFS and contains the input files FileInputFormat.setInputPaths(job1, new Path("input_data")); //"first_job_output" contains data that how many times a word occurred in the given file //This will be the input to the second job. For second job, input data name should be //"first_job_output". FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); JobClient.runJob(job1); //Second Job - Counts total number of words in a given file JobConf job2 = new JobConf(TotalWords.class); job2.setJobName("TotalWords"); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); job2.setMapperClass(TotalWordsMapper.class); job2.setCombinerClass(TotalWordsReducer.class); job2.setReducerClass(TotalWordsReducer.class); job2.setInputFormat(TextInputFormat.class); job2.setOutputFormat(TextOutputFormat.class); //Path name for this job should match first job's output path name FileInputFormat.setInputPaths(job2, new Path("first_job_output")); //This will contain the final output. If you want to send this jobs output //as input to third job, then third jobs input path name should be "second_job_output" //In this way, jobs can be chained, sending output one to other as input and get the //final output FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); JobClient.runJob(job2);
运行这些作业的命令是:
bin / hadoop jar TotalWords。
我们需要为命令提供最终的作业名称。 在上面的例子中,是TotalWords。
你可以使用oozie来处理你的MapReduce作业。 http://issues.apache.org/jira/browse/HADOOP-5303
Apache Mahout项目中有一些例子,将多个MapReduce作业链接在一起。 其中一个例子可以在下面find:
RecommenderJob.java
我们可以利用Job的waitForCompletion(true)
方法来定义作业之间的依赖关系。
在我的情况下,我有三个互相依赖的工作。 在驱动程序类中,我使用了下面的代码,它按预期工作。
public static void main(String[] args) throws Exception { // TODO Auto-generated method stub CCJobExecution ccJobExecution = new CCJobExecution(); Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); System.out.println("****************Started Executing distanceTimeFraudJob ================"); distanceTimeFraudJob.submit(); if(distanceTimeFraudJob.waitForCompletion(true)) { System.out.println("=================Completed DistanceTimeFraudJob================= "); System.out.println("=================Started Executing spendingFraudJob ================"); spendingFraudJob.submit(); if(spendingFraudJob.waitForCompletion(true)) { System.out.println("=================Completed spendingFraudJob================= "); System.out.println("=================Started locationFraudJob================= "); locationFraudJob.submit(); if(locationFraudJob.waitForCompletion(true)) { System.out.println("=================Completed locationFraudJob================="); } } } }
您可以按照代码中给出的方式运行MR链。
请注意 :只提供驱动程序代码
public class WordCountSorting { // here the word keys shall be sorted //let us write the wordcount logic first public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { //THE DRIVER CODE FOR MR CHAIN Configuration conf1=new Configuration(); Job j1=Job.getInstance(conf1); j1.setJarByClass(WordCountSorting.class); j1.setMapperClass(MyMapper.class); j1.setReducerClass(MyReducer.class); j1.setMapOutputKeyClass(Text.class); j1.setMapOutputValueClass(IntWritable.class); j1.setOutputKeyClass(LongWritable.class); j1.setOutputValueClass(Text.class); Path outputPath=new Path("FirstMapper"); FileInputFormat.addInputPath(j1,new Path(args[0])); FileOutputFormat.setOutputPath(j1,outputPath); outputPath.getFileSystem(conf1).delete(outputPath); j1.waitForCompletion(true); Configuration conf2=new Configuration(); Job j2=Job.getInstance(conf2); j2.setJarByClass(WordCountSorting.class); j2.setMapperClass(MyMapper2.class); j2.setNumReduceTasks(0); j2.setOutputKeyClass(Text.class); j2.setOutputValueClass(IntWritable.class); Path outputPath1=new Path(args[1]); FileInputFormat.addInputPath(j2, outputPath); FileOutputFormat.setOutputPath(j2, outputPath1); outputPath1.getFileSystem(conf2).delete(outputPath1, true); System.exit(j2.waitForCompletion(true)?0:1); } }
序列是
( JOB1 )MAP-> REDUCE->( JOB2 )MAP
这样做是为了让键sorting,还有更多的方法,如使用树形图
但是我想把你的注意力集中到乔布斯被链接的方式上!
谢谢
新的类org.apache.hadoop.mapreduce.lib.chain.ChainMapper帮助这种情况
虽然有复杂的基于服务器的Hadoop工作stream引擎,例如oozie,但我有一个简单的java库,可以将多个Hadoop作业作为工作stream执行。 定义内部作业依赖性的作业configuration和工作stream程在JSON文件中configuration。 所有内容都是可以外部configuration的,并且不需要对现有地图缩减实施进行任何更改即可成为工作stream程的一部分。
详细信息可以在这里find。 源代码和jar在github中可用。
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
普拉纳布
我认为oozie帮助后续的工作直接从以前的工作接收投入。 这样可以避免使用jobcontrol执行的I / O操作。
如果你想以编程的方式链接你的工作,你将无法使用JobControl。 用法很简单:
JobControl jobControl = new JobControl(name);
之后,添加ControlledJob实例。 ControlledJob定义一个具有依赖关系的作业,从而自动插入input和输出以适合作业的“链”。
jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); jobControl.run();
开始链条。 你会想要把它放在一个敏捷的线程中。 这允许检查它运行的链条的状态:
while (!jobControl.allFinished()) { System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList(); System.out.println("Jobs in success state: " + successfulJobList.size()); List<ControlledJob> failedJobList = jobControl.getFailedJobList(); System.out.println("Jobs in failed state: " + failedJobList.size()); }