5.4 示例三:采用ToolRunner的QuasiMonteCarlo
除新、老两种API外,Hadoop还提供了另一种开发Hadoop应用的方法和提交作业的途径,那就是把应用做成对Tool界面的实现,再借助Hadoop的ToolRunner来运行这个实现了Tool界面的对象。我们这里要介绍的示例三QuasiMonteCarlo就采用这种方法,其源码文件在hadoop-mapreduce-proj ect/hadoop-mapreduce-examples/src目录下面,相对路径名为main/java/org/apache/hadoop/examples/QuasiMonteCarlo.java。
QuasiMonteCarlo这个应用采用准蒙特卡罗方法计算圆周率π,即Pi。其思路是这样的:在单位正方形里以[0.5,0.5]为圆心、以0.5为半径作内切圆,则圆的面积为0.25π,即π/4。这样,只要能得出这个内切圆的面积,就知道π是什么数值了。那么,怎样才能算得这个内切圆的面积呢?我们可以把这个正方形打上很多网格,然后统计所有的格点,看有多少落在圆内(包括圆周上),有多少落在圆外,二者之和就是格点总数。因为格点总数代表着单位正方形的面积,即1,落在圆内的格点数与总数之比就是圆的面积。如果精度不够,就把网格打得再细一些。至于怎样判断一个格点是否落在圆内或圆周上,那是很简单的,因为根据每个格点的坐标可以算得其与圆心的距离,如果大于0.5就是落在圆外。
对于π的一次计算,网格一旦确定之后就不再变动,对每一个格点的计算都是独立的,并不依赖于其他格点的计算,理论上所有格点的计算都可以并行,但由于格点的数量很大(不然精度不够),实际上只能按块划分。可见,这是一种典型的“大数据小计算”的算法,这样的算法很适合MapReduce。不采用此类算法的话,π的计算是很难并行化的。
显然,这种方法是在拼计算能力,以计算能力换取算法的简化和并行化;某种意义上这也正是大数据处理的本质所在。
鉴于格点的数量很大,每个格点的计算都要并行是不现实的,所以实际上我们总是把这单位正方形划分成若干条块,每一条块中仍有很多个格点,把每一条块的格点计算交给一个Mapper。这样,条块之间是并行的,但是条块之内仍是串行的,至于分多少个条块(需要多少个Mapper)就视实际情况而定了。
先看一下QuasiMonteCarlo这个类的结构摘要:
class QuasiMonteCarlo extends Configured implements Tool {} ]class HaltonSequence{} //二维Halton Sequence是用来生成取样点的算法 ]class QmcMapper extends Mapper<… > {} ]]map(LongWritable offset, LongWritable size, Context context) ]class QmcReducer extends Reducer<…, WritableComparable<? >, Writable> {} ]]reduce(BooleanWritable isInside, Iterable<LongWritable> values, Context context) ]estimatePi(int numMaps, long numPoints, Path tmpDir, Configuration conf) ]run(String[]args) ]main(String[]argv) > System.exit(ToolRunner.run(null, new QuasiMonteCarlo(), argv))
这个Java类有main()方法,所以它本身是个独立的JVM可执行程序。此外,这个类一方面是对Configured类的扩充;另一方面又实现了Tool界面。那么Tool界面是什么样的呢?其实这是个很简单的界面:
public interface Tool extends Configurable { int run(String []args)throws Exception; }
这是对界面Configurable的扩展,在Configurable的基础上增加了一个方法run()。而Configurable也是一个很简单的界面:
public interface Configurable { /** Set the configuration to be used by this object.*/ void setConf(Configuration conf); /** Return the configuration used by this object.*/ Configuration getConf(); }
所以,界面Tool只定义了三个方法,即run()、setConf()和getConf()。其中setConf()和getConf()已由Configuradle类实现,QuasiMonteCarlo类则补上了run()的实现。后面我们会看到,只要把实现了Tool界面的类交给ToolRunner.run(),后者就会为其创建一个对象并调用其run()方法,其余就是这个run()方法里面的事了。QuasiMonteCarlo的其他方法都是用户自己定义的。
从QuasiMonteCarlo的摘要可以看出,这个类里面有内嵌的QmcMapper和QmcReducer,二者分别是对Mapper和Reducer的扩充。由此可见,实际上采用的也是新API,而实现着Tool界面的QuasiMonteCarlo类在某种意义上是对新API的包装。不过,这并不意味着Tool界面跟新API有什么内在的联系,实现Tool界面的类也可以采用老API,那都是run()以内的事。
此外还有个内嵌的类HaltonSequence,这跟MapReduce框架没什么关系,只是应用层上由用户定义的一个类,是实现前述准蒙特卡罗方法之所需。
除了这几个内嵌的类和方法run()、main()以外,还有个函数就是estimatePi(),等一下我们会看到它的代码。
如上所述,实现Tool界面的类只需提供一个run()方法,让ToolRunner调用就行了。所以QuasiMonteCarlo的main()方法很简单,就是创建一个QuasiMonteCarlo对象,把它连同命令行参数一起交给ToolRunner,后者就会将其当成一个命令行来执行。
[QuasiMonteCarlo.main()] public static void main(String[]argv)throws Exception { System.exit(ToolRunner.run(null, new QuasiMonteCarlo(), argv)); }
注意,通过new操作创建一个对象,例如QuasiMonteCarlo类的对象时,执行的是这个类的构造方法QuasiMonteCarlo(),与其main()方法无关;而在启动一个JVM时,作为目标可执行类加载以后执行的则是其main()方法,但却并不直接创建这个类。
在这里,在QuasiMonteCarlo的main()方法中,则通过new操作创建一个QuasiMonteCarlo对象,并将其作为参数之一来调用ToolRunner.run(),成为其调用参数tool。我们看一下ToolRunner的摘要,ToolRunner也是由Hadoop定义和提供的:
class ToolRunner {} ]run(Configuration conf, Tool tool, String[]args) > parser=new GenericOptionsParser(conf, args)//创建一个可选项解析器 > tool.setConf(conf) > String[]toolArgs=parser.getRemainingArgs()//获取命令行中对于所启用工具的参数 > tool.run(toolArgs) //启动工具的运行 ]printGenericCommandUsage(PrintStream out) > GenericOptionsParser.printGenericCommandUsage(out) ]confirmPrompt(String prompt)
可见ToolRunner的核心就是run()。运行什么呢?那就是实现了Tool界面的某个类的对象。除此之外,还有个方法是printGenericCommandUsage(),那只是在人机界面上显示一段怎么使用命令行的提示。另一个方法是confirmPrompt(),用来在屏幕上显示一个提示,然后要求用户输入Y或N。所以,ToolRunner就是用来帮助运行Tool的,是帮助我们运用“工具”的工具。
从ToolRunner的摘要可见,它的run()方法也很简单,说到底就是调用想要运行的Tool的run()方法,只是加上了对命令行的解析处理而已。读者或许要问:既然如此,那我在QuasiMonteCarlo.main()中直接调用run(),而不到ToolRunner来这么绕一下,是否可以呢?其实也无妨,ToolRunner只是为你提供帮助,让你的编程方便一些而已。
这样,我们的流程就到了QuasiMonteCarlo.run():
[QuasiMonteCarlo.main()> ToolRunner.run()> QuasiMonteCarlo.run()] public int run(String[]args)throws Exception { if (args.length! =2){ //应该有两个命令行参数,即nMaps和nSamples System.err.println("Usage:"+getClass().getName()+"<nMaps> <nSamples>"); ToolRunner.printGenericCommandUsage(System.err); //ToolRunner提供的便利 return 2; //出错返回 } final int nMaps=Integer.parseInt(args[0]); final long nSamples=Long.parseLong(args[1]); long now=System.currentTimeMillis(); int rand=new Random().nextInt(Integer.MAX_VALUE); //产生一个随机数 final Path tmpDir=new Path(TMP_DIR_PREFIX+"_"+now+"_"+rand); //临时目录 System.out.println("Number of Maps ="+nMaps); System.out.println("Samples per Map="+nSamples); //计算Pi并显示 System.out.println("Estimated value of Pi is" +estimatePi(nMaps, nSamples, tmpDir, getConf())); return 0; //正常返回 }
这也是很简单的程序,把代码在这里列出只是为了让读者对此类程序有点直观的印象。显然,这里的核心在于对方法estimatePi()的调用。如前所述,对Pi值的估算是典型的大数据小计算,是最适合MapReduce并行计算的,这当然要提交给YARN。
[QuasiMonteCarlo.main()> ToolRunner.run()> QuasiMonteCarlo.run()> estimatePi()] public static BigDecimal estimatePi(int numMaps, long numPoints, Path tmpDir, Configuration conf)throws IOException, ClassNotFoundException, InterruptedException{ //参数numMaps表示把正方形分成几块,实际上也是Mapper的个数 //参数numPoints表示每块之中有多少点数 Job j ob=new Job(conf); //setup j ob conf job.setJobName(QuasiMonteCarlo.class.getSimpleName()); job.setJarByClass(QuasiMonteCarlo.class);
job.setInputFormatClass(SequenceFileInputFormat.class); //输入文件格式 job.setOutputKeyClass(BooleanWritable.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); //输出文件格式 j ob.setMapperClass(QmcMapper.class); j ob.setReducerClass(QmcReducer.class); j ob.setNumReduceTasks(1); //只要1 个reducer //turn off speculative execution, because DFS doesn't handle //multiple writers to the same file. job.setSpeculativeExecution(false); //不要后备执行 //setup input/output directories final Path inDir=new Path(tmpDir, "in"); final Path outDir=new Path(tmpDir, "out"); FileInputFormat.setInputPaths(job, inDir); //本作业的输入目录为“in” FileOutputFormat.setOutputPath(job, outDir); //本作业的输出目录为“out” final FileSystem fs=FileSystem.get(conf); //获取所使用的文件系统 if (fs.exists(tmpDir)){ //如果参数给定的临时目录业已存在,有冲突 throw new IOException("Tmp directory"+fs.makeQualified(tmpDir) +"already exists. Please remove it first."); //因发生IOException异常而终止运行 } if (! fs.mkdirs(inDir)){//如果创建输入目录失败 throw new IOException("Cannot create input directory"+inDir); //异常,终止运行 } try { //generate an input file for each map task for(int i=0; i < numMaps; ++i){ final Path file=new Path(inDir, "part"+i); //输入目录中的文件名“part0”、“part1”…… final LongWritable offset=new LongWritable(i * numPoints); //本块起点位移 final LongWritable size=new LongWritable(numPoints); //本块大小(点数) final SequenceFile.Writer writer=SequenceFile.createWriter(fs, conf, file, LongWritable.class, LongWritable.class, CompressionType.NONE); try{ writer.append(offset, size); //将位移和大小写入本块的输入文件 }finally{ writer.close(); //写完后就关闭本块的输入文件
} System.out.println("Wrote input for Map #"+i); } //end for //start a map/reduce j ob System.out.println("Starting Job"); final long startTime=System.currentTimeMillis(); job.waitForCompletion(true); //通过新API提交作业 final double duration=(System.currentTimeMillis()-startTime)/1000.0; //统计运行时间 System.out.println("Job Finished in"+duration+"seconds"); //read outputs Path inFile=new Path(outDir, "reduce-out"); //输出目录中的文件名“reduce-out” LongWritable numInside=new LongWritable(); //创建对象,用于落在圆内的点数 LongWritable numOutside=new LongWritable(); //创建对象,用于落在圆外的点数 SequenceFile.Reader reader=new SequenceFile.Reader(fs, inFile, conf); //打开文件 try{ reader.next(numInside, numOutside); //从“reduce-out”读取落在圆内/圆外的点数 }finally{ reader.close(); //读完后关闭文件 } //compute estimated value final BigDecimal numTotal //这是总的点数 =BigDecimal.valueOf(numMaps).multiply(BigDecimal.valueOf(numPoints)); return BigDecimal.valueOf(4).setScale(20) //精度为小数点后20位 .multiply(BigDecimal.valueOf(numInside.get())) .divide(numTotal, RoundingMode.HALF_UP); //计算圆内点数与总数的比率 }finally { fs.delete(tmpDir, true); //最后把临时目录删掉 } }
代码中已添加了注释,再结合前面的说明,读者理解起来应该不会感到困难,就不再详细解释了。
这里调用job.waitForCompletion()之前的那一段就是我们已经熟悉了的准备作业单的过程。MapReduce的输出根据FileOutputFormat.setOutputPath()的设置写入目录“out”中的文件“reduce-out”。
同样,还是通过job.waitForCompletion()提交作业并监视其执行直到结束。当程序从job.waitForCompletion()返回时,MapReduce计算已经完成,然后就是从文件“reduce-out”读出落入圆内的格点数量,算出π的数值。不过这里采用的不是一般的整数和浮点数,而是LongWritable和BigDecimal,因为一般整数和浮点数的容量和精度可能不够(这里设置的精度为小数点后20位)。
这里要简单提一下“后备执行(speculative execution)”。这是指Hadoop提供的一种机制,就是在根据用户要求安排一定数量的Mapper和Reducer的同时,还要准备好若干作为后备的Mapper和Reducer,如果运行中发现某个Mapper或Reducer停滞不前或有别的反常现象,就安排后备的Mapper或Reducer顶上去,就像打仗时的“预备队”一样。但是,这里通过job.setSpeculativeExecution(false)否定了预备队的使用,注释中说这是因为Hadoop的分布式文件系统DFS不能处理多个线程同时写入同一文件。注意,这里Reducer的数量只是1,但是后备Reducer顶上去的时候可能会有一小段时间与处于“一线”的Reducer重叠。
至于QmcMapper和QmcReducer的代码,我们就不深入进去了,读者可以自己看源文件。这里只是简单提一下。QmcMapper的数量应该等于numMaps,每个Mapper对应着一个输入文件,如part0、part1等,并从输入文件读入一个offset和一个size。offset决定了分块所在的位置,size就是需要生成的点数。随后,每个Mapper都进入一个循环,依次调用HaltonSequence这个类所提供的方法,生成一个点,然后根据这个点的坐标计算其与圆心的距离,并对落在圆内和圆外的点分别计数。循环结束,即生成并计算了size个点之后,就把两个计数发送给Reducer。而Reducer则接受来自所有Mapper发来的计数并加以汇总,并把结果写入输出文件。可见,在这个示例中Reducer的负担是很轻的。
总之,通过ToolRunner提交作业的方法,实质上与前面两种并无不同,只不过这样一来增加了一些灵活性,可以像别的Tool一样被启用而已。
从Job.waitForCompletion()往前一步,就是Job.submit()了。