大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

5.4 示例三:采用ToolRunner的QuasiMonteCarlo

除新、老两种API外,Hadoop还提供了另一种开发Hadoop应用的方法和提交作业的途径,那就是把应用做成对Tool界面的实现,再借助Hadoop的ToolRunner来运行这个实现了Tool界面的对象。我们这里要介绍的示例三QuasiMonteCarlo就采用这种方法,其源码文件在hadoop-mapreduce-proj ect/hadoop-mapreduce-examples/src目录下面,相对路径名为main/java/org/apache/hadoop/examples/QuasiMonteCarlo.java。

QuasiMonteCarlo这个应用采用准蒙特卡罗方法计算圆周率π,即Pi。其思路是这样的:在单位正方形里以[0.5,0.5]为圆心、以0.5为半径作内切圆,则圆的面积为0.25π,即π/4。这样,只要能得出这个内切圆的面积,就知道π是什么数值了。那么,怎样才能算得这个内切圆的面积呢?我们可以把这个正方形打上很多网格,然后统计所有的格点,看有多少落在圆内(包括圆周上),有多少落在圆外,二者之和就是格点总数。因为格点总数代表着单位正方形的面积,即1,落在圆内的格点数与总数之比就是圆的面积。如果精度不够,就把网格打得再细一些。至于怎样判断一个格点是否落在圆内或圆周上,那是很简单的,因为根据每个格点的坐标可以算得其与圆心的距离,如果大于0.5就是落在圆外。

对于π的一次计算,网格一旦确定之后就不再变动,对每一个格点的计算都是独立的,并不依赖于其他格点的计算,理论上所有格点的计算都可以并行,但由于格点的数量很大(不然精度不够),实际上只能按块划分。可见,这是一种典型的“大数据小计算”的算法,这样的算法很适合MapReduce。不采用此类算法的话,π的计算是很难并行化的。

显然,这种方法是在拼计算能力,以计算能力换取算法的简化和并行化;某种意义上这也正是大数据处理的本质所在。

鉴于格点的数量很大,每个格点的计算都要并行是不现实的,所以实际上我们总是把这单位正方形划分成若干条块,每一条块中仍有很多个格点,把每一条块的格点计算交给一个Mapper。这样,条块之间是并行的,但是条块之内仍是串行的,至于分多少个条块(需要多少个Mapper)就视实际情况而定了。

先看一下QuasiMonteCarlo这个类的结构摘要:

      class QuasiMonteCarlo extends Configured implements Tool {}
      ]class HaltonSequence{} //二维Halton Sequence是用来生成取样点的算法
      ]class QmcMapper extends Mapper<… > {}
      ]]map(LongWritable offset, LongWritable size, Context context)
      ]class QmcReducer extends Reducer<…, WritableComparable<? >, Writable> {}
      ]]reduce(BooleanWritable isInside, Iterable<LongWritable> values, Context context)
      ]estimatePi(int numMaps, long numPoints, Path tmpDir, Configuration conf)
      ]run(String[]args)
      ]main(String[]argv)
        > System.exit(ToolRunner.run(null, new QuasiMonteCarlo(), argv))

这个Java类有main()方法,所以它本身是个独立的JVM可执行程序。此外,这个类一方面是对Configured类的扩充;另一方面又实现了Tool界面。那么Tool界面是什么样的呢?其实这是个很简单的界面:

      public interface Tool extends Configurable {
        int run(String []args)throws Exception;
      }

这是对界面Configurable的扩展,在Configurable的基础上增加了一个方法run()。而Configurable也是一个很简单的界面:

      public interface Configurable {
          /** Set the configuration to be used by this object.*/
        void setConf(Configuration conf);
          /** Return the configuration used by this object.*/
        Configuration getConf();
      }

所以,界面Tool只定义了三个方法,即run()、setConf()和getConf()。其中setConf()和getConf()已由Configuradle类实现,QuasiMonteCarlo类则补上了run()的实现。后面我们会看到,只要把实现了Tool界面的类交给ToolRunner.run(),后者就会为其创建一个对象并调用其run()方法,其余就是这个run()方法里面的事了。QuasiMonteCarlo的其他方法都是用户自己定义的。

从QuasiMonteCarlo的摘要可以看出,这个类里面有内嵌的QmcMapper和QmcReducer,二者分别是对Mapper和Reducer的扩充。由此可见,实际上采用的也是新API,而实现着Tool界面的QuasiMonteCarlo类在某种意义上是对新API的包装。不过,这并不意味着Tool界面跟新API有什么内在的联系,实现Tool界面的类也可以采用老API,那都是run()以内的事。

此外还有个内嵌的类HaltonSequence,这跟MapReduce框架没什么关系,只是应用层上由用户定义的一个类,是实现前述准蒙特卡罗方法之所需。

除了这几个内嵌的类和方法run()、main()以外,还有个函数就是estimatePi(),等一下我们会看到它的代码。

如上所述,实现Tool界面的类只需提供一个run()方法,让ToolRunner调用就行了。所以QuasiMonteCarlo的main()方法很简单,就是创建一个QuasiMonteCarlo对象,把它连同命令行参数一起交给ToolRunner,后者就会将其当成一个命令行来执行。

      [QuasiMonteCarlo.main()]


        public static void main(String[]argv)throws Exception {
          System.exit(ToolRunner.run(null, new QuasiMonteCarlo(), argv));
        }

注意,通过new操作创建一个对象,例如QuasiMonteCarlo类的对象时,执行的是这个类的构造方法QuasiMonteCarlo(),与其main()方法无关;而在启动一个JVM时,作为目标可执行类加载以后执行的则是其main()方法,但却并不直接创建这个类。

在这里,在QuasiMonteCarlo的main()方法中,则通过new操作创建一个QuasiMonteCarlo对象,并将其作为参数之一来调用ToolRunner.run(),成为其调用参数tool。我们看一下ToolRunner的摘要,ToolRunner也是由Hadoop定义和提供的:

      class ToolRunner {}
      ]run(Configuration conf, Tool tool, String[]args)
        > parser=new GenericOptionsParser(conf, args)//创建一个可选项解析器
        > tool.setConf(conf)
        > String[]toolArgs=parser.getRemainingArgs()//获取命令行中对于所启用工具的参数tool.run(toolArgs)                     //启动工具的运行
      ]printGenericCommandUsage(PrintStream out)
        > GenericOptionsParser.printGenericCommandUsage(out)
      ]confirmPrompt(String prompt)

可见ToolRunner的核心就是run()。运行什么呢?那就是实现了Tool界面的某个类的对象。除此之外,还有个方法是printGenericCommandUsage(),那只是在人机界面上显示一段怎么使用命令行的提示。另一个方法是confirmPrompt(),用来在屏幕上显示一个提示,然后要求用户输入Y或N。所以,ToolRunner就是用来帮助运行Tool的,是帮助我们运用“工具”的工具。

从ToolRunner的摘要可见,它的run()方法也很简单,说到底就是调用想要运行的Tool的run()方法,只是加上了对命令行的解析处理而已。读者或许要问:既然如此,那我在QuasiMonteCarlo.main()中直接调用run(),而不到ToolRunner来这么绕一下,是否可以呢?其实也无妨,ToolRunner只是为你提供帮助,让你的编程方便一些而已。

这样,我们的流程就到了QuasiMonteCarlo.run():

      [QuasiMonteCarlo.main()> ToolRunner.run()> QuasiMonteCarlo.run()]


      public int run(String[]args)throws Exception {
          if (args.length! =2){ //应该有两个命令行参数,即nMapsnSamples
            System.err.println("Usage:"+getClass().getName()+"<nMaps> <nSamples>");
            ToolRunner.printGenericCommandUsage(System.err); //ToolRunner提供的便利
            return 2; //出错返回
          }
          final int nMaps=Integer.parseInt(args[0]);
          final long nSamples=Long.parseLong(args[1]);
          long now=System.currentTimeMillis();
          int rand=new Random().nextInt(Integer.MAX_VALUE); //产生一个随机数
          final Path tmpDir=new Path(TMP_DIR_PREFIX+"_"+now+"_"+rand); //临时目录


          System.out.println("Number of Maps ="+nMaps);
          System.out.println("Samples per Map="+nSamples);
            //计算Pi并显示
          System.out.println("Estimated value of Pi is"
                                      +estimatePi(nMaps, nSamples, tmpDir, getConf()));
          return 0;  //正常返回
      }

这也是很简单的程序,把代码在这里列出只是为了让读者对此类程序有点直观的印象。显然,这里的核心在于对方法estimatePi()的调用。如前所述,对Pi值的估算是典型的大数据小计算,是最适合MapReduce并行计算的,这当然要提交给YARN。

      [QuasiMonteCarlo.main()> ToolRunner.run()> QuasiMonteCarlo.run()> estimatePi()]


      public static BigDecimal estimatePi(int numMaps, long numPoints, Path tmpDir,
          Configuration conf)throws IOException, ClassNotFoundException, InterruptedException{
          //参数numMaps表示把正方形分成几块,实际上也是Mapper的个数
          //参数numPoints表示每块之中有多少点数
          Job j ob=new Job(conf);
          //setup j ob conf
          job.setJobName(QuasiMonteCarlo.class.getSimpleName());
          job.setJarByClass(QuasiMonteCarlo.class);
          job.setInputFormatClass(SequenceFileInputFormat.class); //输入文件格式
          job.setOutputKeyClass(BooleanWritable.class);
          job.setOutputValueClass(LongWritable.class);
          job.setOutputFormatClass(SequenceFileOutputFormat.class); //输出文件格式


          j ob.setMapperClass(QmcMapper.class);
          j ob.setReducerClass(QmcReducer.class);
          j ob.setNumReduceTasks(1);  //只要1 reducer


          //turn off speculative execution, because DFS doesn't handle
          //multiple writers to the same file.
          job.setSpeculativeExecution(false); //不要后备执行


          //setup input/output directories
          final Path inDir=new Path(tmpDir, "in");
          final Path outDir=new Path(tmpDir, "out");
          FileInputFormat.setInputPaths(job, inDir);   //本作业的输入目录为“in”
          FileOutputFormat.setOutputPath(job, outDir); //本作业的输出目录为“out”


          final FileSystem fs=FileSystem.get(conf); //获取所使用的文件系统
          if (fs.exists(tmpDir)){ //如果参数给定的临时目录业已存在,有冲突
            throw new IOException("Tmp directory"+fs.makeQualified(tmpDir)
                +"already exists. Please remove it first."); //因发生IOException异常而终止运行
          }
          if (! fs.mkdirs(inDir)){//如果创建输入目录失败
            throw new IOException("Cannot create input directory"+inDir); //异常,终止运行
          }


          try {
            //generate an input file for each map task
            for(int i=0; i < numMaps; ++i){
              final Path file=new Path(inDir, "part"+i); //输入目录中的文件名“part0”、“part1”……
              final LongWritable offset=new LongWritable(i * numPoints); //本块起点位移
              final LongWritable size=new LongWritable(numPoints);    //本块大小(点数)
              final SequenceFile.Writer writer=SequenceFile.createWriter(fs, conf, file,
                          LongWritable.class, LongWritable.class, CompressionType.NONE);
              try{
                writer.append(offset, size); //将位移和大小写入本块的输入文件
              }finally{
                writer.close(); //写完后就关闭本块的输入文件
              }
              System.out.println("Wrote input for Map #"+i);
              } //end for


              //start a map/reduce j ob
              System.out.println("Starting Job");
              final long startTime=System.currentTimeMillis();
              job.waitForCompletion(true);     //通过新API提交作业
              final double duration=(System.currentTimeMillis()-startTime)/1000.0; //统计运行时间
              System.out.println("Job Finished in"+duration+"seconds");


              //read outputs
              Path inFile=new Path(outDir, "reduce-out"); //输出目录中的文件名“reduce-out”
              LongWritable numInside=new LongWritable(); //创建对象,用于落在圆内的点数
              LongWritable numOutside=new LongWritable(); //创建对象,用于落在圆外的点数
              SequenceFile.Reader reader=new SequenceFile.Reader(fs, inFile, conf); //打开文件
              try{
                reader.next(numInside, numOutside); //“reduce-out”读取落在圆内/圆外的点数
              }finally{
                reader.close(); //读完后关闭文件
              }


              //compute estimated value
              final BigDecimal numTotal //这是总的点数
                  =BigDecimal.valueOf(numMaps).multiply(BigDecimal.valueOf(numPoints));
              return BigDecimal.valueOf(4).setScale(20)   //精度为小数点后20
                  .multiply(BigDecimal.valueOf(numInside.get()))
                  .divide(numTotal, RoundingMode.HALF_UP);  //计算圆内点数与总数的比率
            }finally {
              fs.delete(tmpDir, true); //最后把临时目录删掉
            }
          }

代码中已添加了注释,再结合前面的说明,读者理解起来应该不会感到困难,就不再详细解释了。

这里调用job.waitForCompletion()之前的那一段就是我们已经熟悉了的准备作业单的过程。MapReduce的输出根据FileOutputFormat.setOutputPath()的设置写入目录“out”中的文件“reduce-out”。

同样,还是通过job.waitForCompletion()提交作业并监视其执行直到结束。当程序从job.waitForCompletion()返回时,MapReduce计算已经完成,然后就是从文件“reduce-out”读出落入圆内的格点数量,算出π的数值。不过这里采用的不是一般的整数和浮点数,而是LongWritable和BigDecimal,因为一般整数和浮点数的容量和精度可能不够(这里设置的精度为小数点后20位)。

这里要简单提一下“后备执行(speculative execution)”。这是指Hadoop提供的一种机制,就是在根据用户要求安排一定数量的Mapper和Reducer的同时,还要准备好若干作为后备的Mapper和Reducer,如果运行中发现某个Mapper或Reducer停滞不前或有别的反常现象,就安排后备的Mapper或Reducer顶上去,就像打仗时的“预备队”一样。但是,这里通过job.setSpeculativeExecution(false)否定了预备队的使用,注释中说这是因为Hadoop的分布式文件系统DFS不能处理多个线程同时写入同一文件。注意,这里Reducer的数量只是1,但是后备Reducer顶上去的时候可能会有一小段时间与处于“一线”的Reducer重叠。

至于QmcMapper和QmcReducer的代码,我们就不深入进去了,读者可以自己看源文件。这里只是简单提一下。QmcMapper的数量应该等于numMaps,每个Mapper对应着一个输入文件,如part0、part1等,并从输入文件读入一个offset和一个size。offset决定了分块所在的位置,size就是需要生成的点数。随后,每个Mapper都进入一个循环,依次调用HaltonSequence这个类所提供的方法,生成一个点,然后根据这个点的坐标计算其与圆心的距离,并对落在圆内和圆外的点分别计数。循环结束,即生成并计算了size个点之后,就把两个计数发送给Reducer。而Reducer则接受来自所有Mapper发来的计数并加以汇总,并把结果写入输出文件。可见,在这个示例中Reducer的负担是很轻的。

总之,通过ToolRunner提交作业的方法,实质上与前面两种并无不同,只不过这样一来增加了一些灵活性,可以像别的Tool一样被启用而已。

从Job.waitForCompletion()往前一步,就是Job.submit()了。