Big Data Processing Systems: A Scenario-Based Analysis of the Hadoop Source Code

4.4 RPC Services on the RM Node

Let us first look at how a concrete RPC server is set up.

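We start from ResourceManager (RM), the master node of YARN. This class has a main() function, so it runs as an independent JVM process (in other words, on a Java virtual machine of its own). Below is a summary of the ResourceManager class; we begin with its main() function: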

      class ResourceManager extends CompositeService {}
      ]main(String argv[])
        > conf=new YarnConfiguration()
        > resourceManager=new ResourceManager()
        > hook=new CompositeServiceShutdownHook(resourceManager)
        > ShutdownHookManager.get().addShutdownHook(hook, …)
        > resourceManager.init(conf) //calls ResourceManager.serviceInit()
        > resourceManager.start()
      ]ResourceManager()  //constructor
        > super("ResourceManager")==CompositeService("ResourceManager")
        >> super(name)==AbstractService
      ]serviceInit(Configuration conf)
        > …//load core-site.xml
        > …//load yarn-site.xml
        > this.rmLoginUGI=UserGroupInformation.getCurrentUser()
        > doSecureLogin()
        > rmDispatcher=setupDispatcher()
        > adminService=createAdminService()
        > createAndInitActiveServices()
        >> activeServices=new RMActiveServices(this) //defined below
        >> activeServices.init(conf) //calls RMActiveServices.serviceInit() below
        > super.serviceInit(this.conf)
      ]class RMActiveServices extends CompositeService {}
      ]]serviceInit(Configuration configuration)
        > …
        > clientRM=createClientRMService()
        >> return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, …)
                                  //creates the ClientRMService, in effect the RM's service counter for clients
        > addService(clientRM)
        > …
      ]]serviceStart()
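
The init/start cascade in the summary above can be illustrated with a small sketch. MiniService and MiniCompositeService are hypothetical stand-ins written for this example, not Hadoop's AbstractService and CompositeService, and the sketch simplifies one point: here the active services are registered as an ordinary child, whereas the summary shows ResourceManager calling activeServices.init(conf) explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractService: entry points call overridable hooks.
abstract class MiniService {
    private final String name;

    MiniService(String name) { this.name = name; }

    final void init(List<String> log) { serviceInit(log); }
    final void start(List<String> log) { serviceStart(log); }

    protected void serviceInit(List<String> log) { log.add("init " + name); }
    protected void serviceStart(List<String> log) { log.add("start " + name); }
}

// Hypothetical stand-in for CompositeService: forwards init/start to its children.
class MiniCompositeService extends MiniService {
    private final List<MiniService> children = new ArrayList<>();

    MiniCompositeService(String name) { super(name); }

    protected void addService(MiniService child) { children.add(child); }

    @Override
    protected void serviceInit(List<String> log) {
        for (MiniService child : children) child.init(log);   // children first
        super.serviceInit(log);                                // then this service
    }

    @Override
    protected void serviceStart(List<String> log) {
        for (MiniService child : children) child.start(log);
        super.serviceStart(log);
    }
}

class LifecycleDemo {
    // Builds a three-level tree and returns the order in which hooks ran.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniCompositeService active = new MiniCompositeService("ActiveServices") {
            @Override
            protected void serviceInit(List<String> l) {
                // Children are created inside serviceInit(), as in the summary.
                addService(new MiniService("ClientService") { });
                super.serviceInit(l);
            }
        };
        MiniCompositeService rm = new MiniCompositeService("ResourceManager") {
            @Override
            protected void serviceInit(List<String> l) {
                addService(active);
                super.serviceInit(l);
            }
        };
        rm.init(log);
        rm.start(log);
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) System.out.println(line);
    }
}
```

The point of the pattern is that a single init() and a single start() on the root are enough: every nested service is reached through the hooks of its parent.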

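Starting from main(), the reader should be able to follow the program flow into createClientRMService(), which creates clientRM, an object of class ClientRMService. Next, look at the serviceStart() of that class: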

      [ResourceManager.main()> serviceInit()> createAndInitActiveServices()>
      RMActiveServices.serviceInit()> createClientRMService()> ClientRMService.serviceStart()]


      class ClientRMService extends AbstractService implements ApplicationClientProtocol{}
      ]serviceStart()
        > YarnRPC rpc=YarnRPC.create(conf) //what actually gets created is a HadoopYarnProtoRPC
        > this.server=rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, …)
          ==HadoopYarnProtoRPC.getServer(ApplicationClientProtocol.class, …)
        > this.server.start()
        > super.serviceStart()
      ]getNewApplication(GetNewApplicationRequest request)
      ]submitApplication(SubmitApplicationRequest request)
        > …
        > user=UserGroupInformation.getCurrentUser().getShortUserName()
        > rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user)
      ]…//many more, omitted here

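The ClientRMService object on the RM node exists specifically to serve the RM's clients, which are mainly Apps. That is why the class provides functions such as getNewApplication() and submitApplication(): they are meant to be called by clients over RPC. To make that possible it has to set up its own RPC server, which is why it calls YarnRPC.create():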

      [ResourceManager.serviceInit()> createAndInitActiveServices()
      > RMActiveServices.serviceInit()> createClientRMService()> ClientRMService.serviceStart()
      > YarnRPC.create()]


      abstract class YarnRPC {}
      ]create(Configuration conf)
        > clazzName=conf.get(YarnConfiguration.IPC_RPC_IMPL)
                      =="org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"
                                //the "yarn.ipc.rpc.class" property defined in yarn-default.xml
        > if (clazzName==null)clazzName=YarnConfiguration.DEFAULT_IPC_RPC_IMPL
                                //also "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"
        > return (YarnRPC)Class.forName(clazzName).newInstance()
                                //creates a HadoopYarnProtoRPC object, which extends YarnRPC
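
The lookup-then-instantiate step in create() is a common configuration-driven factory. A minimal sketch of the idea follows, under stated assumptions: MiniRpc, DefaultMiniRpc, AltMiniRpc and the key "mini.rpc.class" are all hypothetical names invented for the example, and java.util.Properties stands in for Hadoop's Configuration.

```java
import java.util.Properties;

// Hypothetical abstract base playing the role of YarnRPC.
abstract class MiniRpc {
    static final String IMPL_KEY = "mini.rpc.class";                    // hypothetical key
    static final String DEFAULT_IMPL = DefaultMiniRpc.class.getName();  // fallback class name

    abstract String name();

    // Reads the implementation class name from the configuration, falls back
    // to the default, and instantiates the class reflectively.
    static MiniRpc create(Properties conf) {
        String clazzName = conf.getProperty(IMPL_KEY);
        if (clazzName == null) {
            clazzName = DEFAULT_IMPL;
        }
        try {
            // getDeclaredConstructor().newInstance() replaces Class.newInstance(),
            // which has been deprecated since Java 9.
            return (MiniRpc) Class.forName(clazzName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + clazzName, e);
        }
    }
}

class DefaultMiniRpc extends MiniRpc {
    @Override
    String name() { return "default"; }
}

class AltMiniRpc extends MiniRpc {
    @Override
    String name() { return "alt"; }
}

class MiniRpcDemo {
    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(MiniRpc.create(conf).name());   // key not set: default is used
        conf.setProperty(MiniRpc.IMPL_KEY, AltMiniRpc.class.getName());
        System.out.println(MiniRpc.create(conf).name());   // key set: configured class is used
    }
}
```

Callers only ever see the abstract type, so the concrete implementation can be swapped by editing configuration rather than code.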

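Which class to instantiate, however, has to be looked up in the configuration object conf under the key "yarn.ipc.rpc.class". The result should be HadoopYarnProtoRPC, so what gets created here is a HadoopYarnProtoRPC object, whose getServer() is then called: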

      [ResourceManager.serviceInit()> createAndInitActiveServices()
      > RMActiveServices.serviceInit()> createClientRMService()> ClientRMService.serviceStart()
      > YarnRPC.create()> HadoopYarnProtoRPC.getServer()]


      class HadoopYarnProtoRPC extends YarnRPC {}
      ]getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf,
                              SecretManager<? extends TokenIdentifier> secretManager,
                                            int numHandlers, String portRangeConfig)
        > LOG.debug("Creating a HadoopYarnProtoRpc server for protocol "+protocol
                                            +" with "+numHandlers+" handlers")
        > serverFactory=RpcFactoryProvider.getServerFactory(conf)
        >> serverFactoryClassName=
                      conf.get(YarnConfiguration.IPC_SERVER_FACTORY_CLASS,
                          YarnConfiguration.DEFAULT_IPC_SERVER_FACTORY_CLASS)
        >> return (RpcServerFactory)getFactoryClassInstance(serverFactoryClassName)
                                                        //this is RpcServerFactoryPBImpl
        > return serverFactory.getServer(protocol, instance, addr, conf,
                                          secretManager, numHandlers, portRangeConfig)
      ]getProxy(Class protocol, InetSocketAddress addr, Configuration conf)

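To create the RPC server, we first need the serverFactory that will build it. This again requires a lookup in the configuration object conf; this time the result is RpcServerFactoryPBImpl, because that is the default.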

      DEFAULT_IPC_SERVER_FACTORY_CLASS=
                "org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl"
                                                      //the default serverFactory

So this serverFactory is an RpcServerFactoryPBImpl. Calling its getServer() method then yields the server-side RPC-layer Server for the given protocol. Note that in our current scenario the parameter protocol is ApplicationClientProtocol.class.

      [ResourceManager.serviceInit()> createAndInitActiveServices()
      > RMActiveServices.serviceInit()> createClientRMService()> ClientRMService.serviceStart()
      > YarnRPC.create()> HadoopYarnProtoRPC.getServer()
      > RpcServerFactoryPBImpl.getServer()]


      class RpcServerFactoryPBImpl implements RpcServerFactory{}
      ]getServer(Class<?> protocol, Object instance, InetSocketAddress addr, Configuration conf,
                        SecretManager<? extends TokenIdentifier> secretManager,
                                            int numHandlers, String portRangeConfig)
        > Constructor<?> constructor=serviceCache.get(protocol) //it may already be cached
                      //in our scenario the parameter protocol is ApplicationClientProtocol.class
        > if (constructor==null){ //not in the cache yet
        >+ clazzName=getPbServiceImplClassName(protocol)
        >+> String srcPackagePart=getPackageName(clazz)//the parameter clazz is protocol
        >+> String srcClassName=getClassName(clazz)  //ApplicationClientProtocol
        >+> String destPackagePart=srcPackagePart+"."+PB_IMPL_PACKAGE_SUFFIX
                                                          //"impl.pb.service"
        >+> String destClassPart=srcClassName+PB_IMPL_CLASS_SUFFIX //"PBServiceImpl"
        >+> return destPackagePart+"."+destClassPart
        >+ pbServiceImplClazz=localConf.getClassByName(clazzName)
                            //this is "ApplicationClientProtocolPBServiceImpl"
        >+ constructor=pbServiceImplClazz.getConstructor(protocol)
                            //the constructor of ApplicationClientProtocolPBServiceImpl
        >+ constructor.setAccessible(true)
        >+ serviceCache.putIfAbsent(protocol, constructor) //add to the cache
        > }
        > Object service=constructor.newInstance(instance)
                            //creates the ApplicationClientProtocolPBServiceImpl object
        > Class<?> pbProtocol=service.getClass().getInterfaces()[0]//the first interface it implements
        > Method method=protoCache.get(protocol) //look in the cache first
        > if (method==null){ //not cached yet: create it and put it in the cache
        >+ clazzName=getProtoClassName(protocol)
        >+ Class<?> protoClazz=localConf.getClassByName(clazzName)
                            //this class is ApplicationClientProtocolService
        >+ method=protoClazz.getMethod("newReflectiveBlockingService",
                                                      pbProtocol.getInterfaces()[0])
        >+ method.setAccessible(true)
        >+ protoCache.putIfAbsent(protocol, method) //add to the cache
        > }
        > blkserv=(BlockingService)method.invoke(null, service)
              ==newReflectiveBlockingService.invoke(null, service)
              //calls ApplicationClientProtocolService.newReflectiveBlockingService()
              //to create an object implementing the protobuf.BlockingService interface
              //the argument service, an ApplicationClientProtocolPBServiceImpl, becomes its internal member impl
        > return createServer(pbProtocol, addr, conf, secretManager, numHandlers, blkserv,
                                                                    portRangeConfig)
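
Two techniques in the summary above lend themselves to a short sketch: deriving the wrapper class name from the protocol class name by convention, and caching the reflectively obtained constructor so that the lookup happens only once per protocol. Everything named below (Greeter, GreeterWrapper, WrapperFactory, the org.example package and the suffix arguments) is hypothetical and chosen for illustration. Unlike the summary, the sketch receives the wrapper class directly instead of loading it by name.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

interface Greeter { String greet(String name); }   // hypothetical protocol interface

class GreeterWrapper {                               // hypothetical wrapper class
    final Greeter real;
    GreeterWrapper(Greeter real) { this.real = real; }
}

class WrapperFactory {
    private final ConcurrentMap<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<>();
    int reflectiveLookups = 0;   // counts cache misses, for demonstration only

    // Name derivation by convention: <package>.<packageSuffix>.<SimpleName><classSuffix>
    static String implClassName(String protocolClassName, String packageSuffix, String classSuffix) {
        int dot = protocolClassName.lastIndexOf('.');
        String pkg = dot < 0 ? "" : protocolClassName.substring(0, dot);
        String simple = protocolClassName.substring(dot + 1);
        String destPkg = pkg.isEmpty() ? packageSuffix : pkg + "." + packageSuffix;
        return destPkg + "." + simple + classSuffix;
    }

    // Finds the wrapper constructor taking the protocol type, caches it,
    // and uses it to wrap the given instance.
    Object wrap(Class<?> protocol, Class<?> wrapperClass, Object instance) {
        try {
            Constructor<?> constructor = cache.get(protocol);
            if (constructor == null) {
                constructor = wrapperClass.getDeclaredConstructor(protocol);
                constructor.setAccessible(true);
                cache.putIfAbsent(protocol, constructor);
                reflectiveLookups++;
            }
            return constructor.newInstance(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot wrap " + protocol.getName(), e);
        }
    }
}

class WrapperFactoryDemo {
    public static void main(String[] args) {
        // Illustrative suffix values passed as arguments.
        System.out.println(WrapperFactory.implClassName(
                "org.example.api.GreeterProtocol", "impl.pb.service", "PBServiceImpl"));
        WrapperFactory factory = new WrapperFactory();
        Greeter greeter = name -> "hi " + name;
        GreeterWrapper w = (GreeterWrapper) factory.wrap(Greeter.class, GreeterWrapper.class, greeter);
        factory.wrap(Greeter.class, GreeterWrapper.class, greeter);   // served from the cache
        System.out.println(w.real.greet("yarn") + ", lookups=" + factory.reflectiveLookups);
    }
}
```

The cache matters because the reflective constructor lookup is comparatively expensive, while the same protocol may be served by many server instances over the life of the process.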

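Here the result of method.invoke() is a BlockingService object. BlockingService is an interface defined in the Google Protobuf package, so the result is really an object of some class that implements the BlockingService interface. That class is defined on the fly, as an anonymous class, inside the abstract class ApplicationClientProtocolService: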

      class ApplicationClientProtocol {}
      ]abstract class ApplicationClientProtocolService{}
      ]]newReflectiveBlockingService(final BlockingInterface impl)
          > return new com.google.protobuf.BlockingService()
                                          //creates an object implementing the BlockingService interface
            ]getDescriptorForType()
            ]callBlockingMethod(
                  com.google.protobuf.Descriptors.MethodDescriptor method,
                  com.google.protobuf.RpcController controller,
                  com.google.protobuf.Message request)  //supplies the methods that must be implemented
              > switch(method.getIndex()){
              > case 0:return impl.getNewApplication(controller, request)
              > case 1:return impl.getApplicationReport(controller, request)
              > case 2:return impl.submitApplication(controller, request)
                ==ApplicationClientProtocolPBServiceImpl.submitApplication(controller, request)
              >> response=real.submitApplication(request)
                  ==ClientRMService.submitApplication(SubmitApplicationRequest request)
              > …
              > }

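So the reflective call to newReflectiveBlockingService() above creates an object that implements the BlockingService interface and defines, on the fly, the methods the interface requires, including the crucial callBlockingMethod(). From then on, whenever an RPC request arrives, the server first calls this callBlockingMethod(), which in turn dispatches by case to functions such as impl.submitApplication(controller, request). The impl here is the ApplicationClientProtocolPBServiceImpl object created earlier. Its field real is the parameter instance of the enclosing getServer(); one level further up, that is the this in ClientRMService.serviceStart(), that is, the ClientRMService object itself. The call to impl.submitApplication() therefore ends up as a call to ClientRMService.submitApplication(), which is the real target of this RPC call.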

This interface, defined in the protobuf package, is called BlockingService because calls made through it are synchronous (blocking) rather than asynchronous (non-blocking).
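
The delegation chain just described (an anonymous dispatcher that switches on a method index, an adapter holding a field real, and the real service behind it) can be sketched without any protobuf dependency. All types below are hypothetical stand-ins, requests are plain strings, and only two methods with indices 0 and 1 are modeled, which does not match the numbering of the real generated code.

```java
// Hypothetical stand-ins; none of these are the protobuf or YARN types.
interface Backend {                  // plays the role of ClientRMService
    String getNewApplication(String request);
    String submitApplication(String request);
}

interface BlockingApi {              // plays the role of the generated BlockingInterface
    String getNewApplication(String request);
    String submitApplication(String request);
}

interface MiniBlockingService {      // plays the role of protobuf's BlockingService
    String callBlockingMethod(int methodIndex, String request);
}

// Plays the role of the PBServiceImpl adapter: it only forwards to "real".
class PbServiceAdapter implements BlockingApi {
    private final Backend real;

    PbServiceAdapter(Backend real) { this.real = real; }

    @Override
    public String getNewApplication(String request) { return real.getNewApplication(request); }

    @Override
    public String submitApplication(String request) { return real.submitApplication(request); }
}

class DispatchDemo {
    // Returns an anonymous-class object that dispatches on the method index.
    static MiniBlockingService newReflectiveBlockingService(final BlockingApi impl) {
        return new MiniBlockingService() {
            @Override
            public String callBlockingMethod(int methodIndex, String request) {
                switch (methodIndex) {
                    case 0: return impl.getNewApplication(request);
                    case 1: return impl.submitApplication(request);
                    default:
                        throw new IllegalArgumentException("unknown method index " + methodIndex);
                }
            }
        };
    }

    // A trivial backend used for demonstration.
    static Backend sampleBackend() {
        return new Backend() {
            @Override
            public String getNewApplication(String request) { return "new application"; }

            @Override
            public String submitApplication(String request) { return "submitted " + request; }
        };
    }

    public static void main(String[] args) {
        MiniBlockingService service =
                newReflectiveBlockingService(new PbServiceAdapter(sampleBackend()));
        System.out.println(service.callBlockingMethod(1, "job-1"));
    }
}
```

The server layer only needs to know the single entry point callBlockingMethod(); which concrete business method runs is decided entirely by the index carried in the request.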

With this in place, createServer() can build the RPC server on top of it:

      [ResourceManager.serviceInit()> createAndInitActiveServices()
      > RMActiveServices.serviceInit()> createClientRMService()> ClientRMService.serviceStart()
      > YarnRPC.create()> HadoopYarnProtoRPC.getServer()
      > RpcServerFactoryPBImpl.getServer()> createServer()]


      createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
                SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
                BlockingService blockingService, String portRangeConfig)
      > RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class)
      > builder=new RPC.Builder(conf)
      > builder.setProtocol(pbProtocol) //the parameter pbProtocol becomes the protocol this builder targets
      > builder.setInstance(blockingService) //instance==protobuf.BlockingService
      > builder.setBindAddress(addr.getHostName()).setPort(addr.getPort()).
                    setNumHandlers(numHandlers).setVerbose(false).
                    setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)
      > RPC.Server server=builder.build()
                            //creates an RPC.Server, in fact an object of some class that extends RPC.Server
      > LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server")
      > server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService)
      > return server
                //an object of a class that extends and fills in RPC.Server, and hence also an RPC.Server

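In the end, what the RM has created is the complete server-side chain, that is, the whole protocol stack. At the bottom is the IPC-layer Server, with RPC.Server on top of it. Above that sits an object of an anonymous class that implements the com.google.protobuf.BlockingService interface at the RPC layer, defined and created on the fly by the newReflectiveBlockingService() method of ApplicationClientProtocolService. Above that again is the application-layer ApplicationClientProtocolPBServiceImpl.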

Thus, when an RPC request to submit a job arrives, the call path on the RM node is:

      [RPC.Server > BlockingService.callBlockingMethod()
      > ApplicationClientProtocolPBServiceImpl.submitApplication()
      > ClientRMService.submitApplication()]

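That is, the IPC layer hands the request to RPC.Server at the RPC layer. RPC.Server recovers the relevant parameters from the received RPC request message and calls BlockingService.callBlockingMethod() at the protobuf layer. That method calls a function acting as springboard and intermediary, for example ApplicationClientProtocolPBServiceImpl.submitApplication(), which in turn calls the target function of this RPC, for example ClientRMService.submitApplication().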
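
The internals of RPC.Server are not shown in this section, so the following is only a rough sketch of the server-side step under strong assumptions: a service is registered under a protocol name, and an incoming request is reduced to a string of the form protocol#method#payload. MiniRpcServer, the wire format and the protocol name "ClientProtocol" are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical miniature server: maps a protocol name to a registered service.
class MiniRpcServer {
    // A service takes (method name, payload) and returns a reply.
    private final Map<String, BiFunction<String, String, String>> protocols = new HashMap<>();

    // Registers a service under a protocol name, loosely mirroring addProtocol().
    MiniRpcServer addProtocol(String protocolName, BiFunction<String, String, String> service) {
        protocols.put(protocolName, service);
        return this;
    }

    // Recovers protocol, method and payload from the request, then dispatches.
    String handle(String wireRequest) {
        String[] parts = wireRequest.split("#", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + wireRequest);
        }
        BiFunction<String, String, String> service = protocols.get(parts[0]);
        if (service == null) {
            throw new IllegalArgumentException("unknown protocol: " + parts[0]);
        }
        return service.apply(parts[1], parts[2]);
    }
}

class MiniRpcServerDemo {
    // Builds a server with one registered protocol for demonstration.
    static MiniRpcServer newServer() {
        return new MiniRpcServer().addProtocol("ClientProtocol", (method, payload) -> {
            switch (method) {
                case "getNewApplication": return "new application";
                case "submitApplication": return "submitted " + payload;
                default: throw new IllegalArgumentException("unknown method: " + method);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(newServer().handle("ClientProtocol#submitApplication#job-1"));
    }
}
```

Keeping the registry keyed by protocol lets one server object host several protocols at once, with each request routed to the service registered for the protocol it names.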