4.4 RPC Services on the RM Node
Let us first look at how a concrete RPC server is put together.
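We start with YARN's master node, the RM (ResourceManager). ResourceManager is a class with a main() function, so it runs as a separate JVM process (in other words, on its own Java virtual machine). Below is a summary of the ResourceManager class; we begin with its main() function: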
class ResourceManager extends CompositeService {}
]main(String argv[])
> conf=new YarnConfiguration()
> resourceManager=new ResourceManager()
> hook=new CompositeServiceShutdownHook(resourceManager)
> ShutdownHookManager.get().addShutdownHook(hook, …)
> resourceManager.init(conf) //calls ResourceManager.serviceInit()
> resourceManager.start()
]ResourceManager() //constructor
> super("ResourceManager")==CompositeService("ResourceManager")
>> super(name)==AbstractService(name)
]serviceInit(Configuration conf)
> …//load core-site.xml
> …//load yarn-site.xml
> this.rmLoginUGI=UserGroupInformation.getCurrentUser()
> doSecureLogin()
> rmDispatcher=setupDispatcher()
> adminService=createAdminService()
> createAndInitActiveServices()
>> activeServices=new RMActiveServices(this) //defined below
>> activeServices.init(conf) //calls RMActiveServices.serviceInit() below
> super.serviceInit(this.conf)
]class RMActiveServices extends CompositeService {}
]]serviceInit(Configuration configuration)
> …
> clientRM=createClientRMService()
>> return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, …) //creates the ClientRMService, in effect the RM's service counter for clients
> addService(clientRM)
> …
]]serviceStart()
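The way init() and start() cascade from a composite service down to the services registered with addService() can be pictured with a small sketch. The classes below are simplified stand-ins invented for illustration (SketchService, SketchCompositeService, LifecycleSketch); they are not Hadoop's AbstractService or CompositeService, and the order in which the real classes initialize children relative to themselves is not taken from the source.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the pattern only; NOT Hadoop's AbstractService/CompositeService.
abstract class SketchService {
    private final String name;
    SketchService(String name) { this.name = name; }
    String getName() { return name; }

    // init()/start() are the entry points; serviceInit()/serviceStart()
    // are the hooks that subclasses override.
    final void init(List<String> log) { serviceInit(log); }
    final void start(List<String> log) { serviceStart(log); }

    protected void serviceInit(List<String> log) { log.add("init " + name); }
    protected void serviceStart(List<String> log) { log.add("start " + name); }
}

class SketchCompositeService extends SketchService {
    private final List<SketchService> children = new ArrayList<>();
    SketchCompositeService(String name) { super(name); }

    protected void addService(SketchService s) { children.add(s); }

    @Override protected void serviceInit(List<String> log) {
        for (SketchService s : children) s.init(log);    // cascade to the children
        super.serviceInit(log);
    }

    @Override protected void serviceStart(List<String> log) {
        for (SketchService s : children) s.start(log);   // cascade to the children
        super.serviceStart(log);
    }
}

class LifecycleSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchCompositeService rm = new SketchCompositeService("ResourceManager") {
            @Override protected void serviceInit(List<String> l) {
                // The child is registered during init, then the parent hook runs.
                addService(new SketchService("ClientRMService") {});
                super.serviceInit(l);
            }
        };
        rm.init(log);
        rm.start(log);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch a single call to init() or start() on the composite is enough to reach the child, which is the property the listing relies on when main() only calls resourceManager.init(conf) and resourceManager.start().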
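Starting from main(), the reader should be able to follow the program flow into createClientRMService(), which creates clientRM, an object of class ClientRMService. Next we look at that class's serviceStart():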
[ResourceManager.main() > serviceInit() > createAndInitActiveServices() > RMActiveServices.serviceInit() > createClientRMService() > ClientRMService.serviceStart()]
class ClientRMService extends AbstractService implements ApplicationClientProtocol {}
]serviceStart()
> YarnRPC rpc=YarnRPC.create(conf) //what is actually created is a HadoopYarnProtoRPC
> this.server=rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, …)
  ==HadoopYarnProtoRPC.getServer(ApplicationClientProtocol.class, …)
> this.server.start()
> super.serviceStart()
]getNewApplication(GetNewApplicationRequest request)
]submitApplication(SubmitApplicationRequest request)
> …
> user=UserGroupInformation.getCurrentUser().getShortUserName()
> rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user)
]… //many more, omitted here
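The ClientRMService object on the RM node exists specifically to serve the RM's clients, mainly Apps. That is why the class provides functions such as getNewApplication() and submitApplication() for clients to call via RPC. To make this possible it has to set up its own RPC server, which is why it calls YarnRPC.create():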
[ResourceManager.serviceInit() > createAndInitActiveServices() > RMActiveServices.serviceInit() > createClientRMService() > ClientRMService.serviceStart() > YarnRPC.create()]
abstract class YarnRPC {}
]create(Configuration conf)
> clazzName=conf.get(YarnConfiguration.IPC_RPC_IMPL)
  =="org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC" //the "yarn.ipc.rpc.class" property defined in yarn-default.xml
> if (clazzName==null) clazzName=YarnConfiguration.DEFAULT_IPC_RPC_IMPL //also "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"
> return (YarnRPC)Class.forName(clazzName).newInstance() //creates a HadoopYarnProtoRPC object, which extends YarnRPC
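Which class to instantiate, however, has to be looked up in the configuration object conf under the key "yarn.ipc.rpc.class". The lookup should return HadoopYarnProtoRPC, so what gets created here is a HadoopYarnProtoRPC object, whose getServer() is then called: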
[ResourceManager.serviceInit() > createAndInitActiveServices() > RMActiveServices.serviceInit() > createClientRMService() > ClientRMService.serviceStart() > YarnRPC.create() > HadoopYarnProtoRPC.getServer()]
class HadoopYarnProtoRPC extends YarnRPC {}
]getServer(Class protocol, Object instance, InetSocketAddress addr, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, String portRangeConfig)
> LOG.debug("Creating a HadoopYarnProtoRpc server for protocol "+protocol+" with "+numHandlers+" handlers")
> serverFactory=RpcFactoryProvider.getServerFactory(conf)
>> serverFactoryClassName=conf.get(YarnConfiguration.IPC_SERVER_FACTORY_CLASS, YarnConfiguration.DEFAULT_IPC_SERVER_FACTORY_CLASS)
>> return (RpcServerFactory)getFactoryClassInstance(serverFactoryClassName) //this is RpcServerFactoryPBImpl
> return serverFactory.getServer(protocol, instance, addr, conf, secretManager, numHandlers, portRangeConfig)
]getProxy(Class protocol, InetSocketAddress addr, Configuration conf)
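To create the RPC server, we first need the serverFactory that builds it, which again requires a lookup in the configuration object conf. This time the result is RpcServerFactoryPBImpl, because that is the default.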
DEFAULT_IPC_SERVER_FACTORY_CLASS=
"org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl"
//the default serverFactory
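Both YarnRPC.create() and the serverFactory lookup follow the same idiom: read a class name from the configuration, fall back to a default when none is set, and instantiate the class by reflection. A minimal sketch of that idiom follows. A plain Map stands in for conf, and the key, the interface and the classes (SketchServerFactory, DefaultSketchServerFactory, ConfiguredFactoryLookup) are all hypothetical names introduced for illustration, not Hadoop types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins: a Map plays the role of conf, and the interface and
// classes below are invented for illustration; none of them are Hadoop types.
interface SketchServerFactory {
    String describe();
}

class DefaultSketchServerFactory implements SketchServerFactory {
    public DefaultSketchServerFactory() {}
    @Override public String describe() { return "default factory"; }
}

class ConfiguredFactoryLookup {
    static final String KEY = "sketch.server.factory.class";   // hypothetical key
    static final String DEFAULT_CLASS = DefaultSketchServerFactory.class.getName();

    static SketchServerFactory create(Map<String, String> conf) {
        // Use the configured class name, or the default when the key is absent.
        String clazzName = conf.getOrDefault(KEY, DEFAULT_CLASS);
        try {
            Class<?> clazz = Class.forName(clazzName);
            // Instantiate through the no-argument constructor.
            return (SketchServerFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + clazzName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(new HashMap<>()).describe());
    }
}
```

The point of the indirection is that the caller only ever sees the interface; swapping the implementation is a configuration change, not a code change.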
So this serverFactory is an RpcServerFactoryPBImpl. Calling its getServer() method then yields the server-side RPC-layer Server for the given protocol. Note that in our scenario the protocol argument is ApplicationClientProtocol.class.
[ResourceManager.serviceInit() > createAndInitActiveServices() > RMActiveServices.serviceInit() > createClientRMService() > ClientRMService.serviceStart() > YarnRPC.create() > HadoopYarnProtoRPC.getServer() > RpcServerFactoryPBImpl.getServer()]
class RpcServerFactoryPBImpl implements RpcServerFactory {}
]getServer(Class<?> protocol, Object instance, InetSocketAddress addr, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, String portRangeConfig)
> Constructor<?> constructor=serviceCache.get(protocol) //may already be cached
  //in our scenario the protocol argument is ApplicationClientProtocol.class
> if (constructor==null){ //not in the cache yet
>+ clazzName=getPbServiceImplClassName(protocol)
>+> String srcPackagePart=getPackageName(clazz) //the parameter clazz is protocol
>+> String srcClassName=getClassName(clazz) //ApplicationClientProtocol
>+> String destPackagePart=srcPackagePart+"."+PB_IMPL_PACKAGE_SUFFIX //"impl.pb.service"
>+> String destClassPart=srcClassName+PB_IMPL_CLASS_SUFFIX //"PBServiceImpl"
>+> return destPackagePart+"."+destClassPart
>+ pbServiceImplClazz=localConf.getClassByName(clazzName) //this is "ApplicationClientProtocolPBServiceImpl"
>+ constructor=pbServiceImplClazz.getConstructor(protocol) //the constructor of ApplicationClientProtocolPBServiceImpl
>+ constructor.setAccessible(true)
>+ serviceCache.putIfAbsent(protocol, constructor) //add to the cache
> }
> Object service=constructor.newInstance(instance) //creates the ApplicationClientProtocolPBServiceImpl object
> Class<?> pbProtocol=service.getClass().getInterfaces()[0] //the first interface it implements
> Method method=protoCache.get(protocol) //look in the cache first
> if (method==null){ //not cached yet: look it up and put it in the cache
>+ clazzName=getProtoClassName(protocol)
>+ Class<?> protoClazz=localConf.getClassByName(clazzName) //this class is ApplicationClientProtocolService
>+ method=protoClazz.getMethod("newReflectiveBlockingService", pbProtocol.getInterfaces()[0])
>+ method.setAccessible(true)
>+ protoCache.putIfAbsent(protocol, method) //add to the cache
> }
> blkserv=(BlockingService)method.invoke(null, service)
  ==newReflectiveBlockingService.invoke(null, service)
  //calls ApplicationClientProtocolService.newReflectiveBlockingService()
  //to create an object implementing the protobuf BlockingService interface
  //the argument service, i.e. the ApplicationClientProtocolPBServiceImpl, becomes its internal component impl
> return createServer(pbProtocol, addr, conf, secretManager, numHandlers, blkserv, portRangeConfig)
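Two ideas in getServer() are worth isolating: the wrapper class is located purely by a naming convention (package plus a package suffix, class name plus a class suffix), and the reflective constructor is cached per protocol so the lookup happens only once. The sketch below shows both under stated assumptions: GreeterProtocol, GreeterProtocolPBServiceImpl and WrapperFactorySketch are hypothetical names, the suffixes are passed in as parameters rather than taken from Hadoop, and the wrapper class is handed in directly instead of being loaded by name.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical protocol interface and wrapper class, for illustration only.
interface GreeterProtocol {
    String greet(String who);
}

class GreeterProtocolPBServiceImpl {
    final GreeterProtocol real;   // the object that does the actual work
    public GreeterProtocolPBServiceImpl(GreeterProtocol real) { this.real = real; }
}

class WrapperFactorySketch {
    // One cached constructor per protocol class.
    private static final ConcurrentMap<Class<?>, Constructor<?>> CACHE =
            new ConcurrentHashMap<>();

    // Convention: <package>.<packageSuffix>.<SimpleName><classSuffix>
    static String wrapperClassName(String protocolName, String packageSuffix,
                                   String classSuffix) {
        int dot = protocolName.lastIndexOf('.');
        String pkgWithDot = dot < 0 ? "" : protocolName.substring(0, dot + 1);
        String simpleName = protocolName.substring(dot + 1);
        return pkgWithDot + packageSuffix + "." + simpleName + classSuffix;
    }

    // Wraps 'instance' in 'wrapperClass' through a constructor taking 'protocol'.
    static Object wrap(Class<?> protocol, Class<?> wrapperClass, Object instance) {
        try {
            Constructor<?> ctor = CACHE.get(protocol);
            if (ctor == null) {                       // not cached yet
                ctor = wrapperClass.getConstructor(protocol);
                ctor.setAccessible(true);
                CACHE.putIfAbsent(protocol, ctor);    // harmless if another thread won
            }
            return ctor.newInstance(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapperClassName(
                "com.example.api.GreeterProtocol", "impl.pb.service", "PBServiceImpl"));
        GreeterProtocol real = who -> "hello " + who;
        Object wrapped = wrap(GreeterProtocol.class, GreeterProtocolPBServiceImpl.class, real);
        System.out.println(((GreeterProtocolPBServiceImpl) wrapped).real.greet("RM"));
    }
}
```

Using putIfAbsent rather than put means two threads racing to fill the cache both end up with a usable constructor and neither overwrites the other's entry.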
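Here the result of method.invoke() is a BlockingService object. BlockingService is an interface defined in Google's Protobuf package, so the result is really an object of some class that implements BlockingService. That class is defined on the spot, as an anonymous class, inside the abstract class ApplicationClientProtocolService: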
class ApplicationClientProtocol {}
]abstract class ApplicationClientProtocolService {}
]]newReflectiveBlockingService(final BlockingInterface impl)
> return new com.google.protobuf.BlockingService() //creates an object implementing the BlockingService interface
]getDescriptorForType()
]callBlockingMethod(com.google.protobuf.Descriptors.MethodDescriptor method, com.google.protobuf.RpcController controller, com.google.protobuf.Message request) //supplies the methods the interface requires
> switch(method.getIndex()){
> case 0: return impl.getNewApplication(controller, request)
> case 1: return impl.getApplicationReport(controller, request)
> case 2: return impl.submitApplication(controller, request)
  ==ApplicationClientProtocolPBServiceImpl.submitApplication(controller, request)
>> response=real.submitApplication(request)
  ==ClientRMService.submitApplication(SubmitApplicationRequest request)
> …
> }
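So the result of invoking newReflectiveBlockingService() through reflection above is an object that implements the BlockingService interface, with the methods the interface requires defined on the spot, including the crucial callBlockingMethod(). From then on, whenever an RPC request arrives, the server first calls this callBlockingMethod(), which in turn dispatches to functions such as impl.submitApplication(controller, request). Here impl is the ApplicationClientProtocolPBServiceImpl object created earlier. Its field real is the instance argument of the enclosing getServer(), which, one level further up, is the this in ClientRMService.serviceStart(), that is, the ClientRMService object itself. The call to impl.submitApplication() therefore ends up as a call to ClientRMService.submitApplication(), the real target of this RPC.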
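The chain of delegation just described (anonymous service object, then impl, then real) can be reproduced in miniature without protobuf. Everything in the sketch below is a simplified, hypothetical stand-in: strings replace the request and response messages, a plain int replaces method.getIndex(), and the method indices are arbitrary rather than those of the generated code.

```java
// All types here are simplified, hypothetical stand-ins; none are protobuf or Hadoop classes.
interface ClientProtocolSketch {                 // role of ApplicationClientProtocol
    String getNewApplication(String request);
    String submitApplication(String request);
}

interface BlockingInterfaceSketch {              // role of the generated BlockingInterface
    String getNewApplication(Object controller, String request);
    String submitApplication(Object controller, String request);
}

interface BlockingServiceSketch {                // role of protobuf's BlockingService
    String callBlockingMethod(int methodIndex, Object controller, String request);
}

// The "springboard": adapts the wire-level interface to the real service.
class PbServiceImplSketch implements BlockingInterfaceSketch {
    private final ClientProtocolSketch real;
    PbServiceImplSketch(ClientProtocolSketch real) { this.real = real; }

    @Override public String getNewApplication(Object controller, String request) {
        return real.getNewApplication(request);
    }
    @Override public String submitApplication(Object controller, String request) {
        return real.submitApplication(request);
    }
}

class DispatchSketch {
    // Returns an anonymous object that captures impl and dispatches on the index.
    static BlockingServiceSketch newReflectiveBlockingService(final BlockingInterfaceSketch impl) {
        return new BlockingServiceSketch() {
            @Override
            public String callBlockingMethod(int methodIndex, Object controller, String request) {
                switch (methodIndex) {           // indices are arbitrary in this sketch
                    case 0: return impl.getNewApplication(controller, request);
                    case 1: return impl.submitApplication(controller, request);
                    default:
                        throw new IllegalArgumentException("unknown method index " + methodIndex);
                }
            }
        };
    }

    static String demo() {
        ClientProtocolSketch clientRmService = new ClientProtocolSketch() {
            @Override public String getNewApplication(String r) { return "new-app"; }
            @Override public String submitApplication(String r) { return "submitted:" + r; }
        };
        BlockingServiceSketch service =
                newReflectiveBlockingService(new PbServiceImplSketch(clientRmService));
        return service.callBlockingMethod(1, null, "job-1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The server layer only needs to know the single entry point callBlockingMethod(); which concrete method runs is decided by the index carried in the request.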
This interface, defined in the protobuf package, is called BlockingService because calls made through it are synchronous, blocking calls rather than asynchronous, non-blocking ones.
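The difference between the two call styles shows up in the method signatures: a blocking call hands back its result as the return value, while a non-blocking call takes a callback that receives the result whenever it is ready. The following sketch contrasts the two shapes with hypothetical interfaces (BlockingCallSketch, NonBlockingCallSketch); it is not protobuf's API, and the callback here happens to run immediately only to keep the example self-contained.

```java
import java.util.function.Consumer;

// Hypothetical interfaces contrasting the two call shapes; not protobuf types.
interface BlockingCallSketch {
    String call(String request);                         // result is the return value
}

interface NonBlockingCallSketch {
    void call(String request, Consumer<String> done);    // result is delivered to 'done'
}

class BlockingVsNonBlocking {
    static String viaBlocking(BlockingCallSketch service, String request) {
        // The caller cannot proceed until call() returns.
        return service.call(request);
    }

    static String viaCallback(NonBlockingCallSketch service, String request) {
        StringBuilder out = new StringBuilder();
        // In general the callback may fire later; in this sketch it fires right away.
        service.call(request, result -> out.append(result));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(viaBlocking(req -> "ok:" + req, "x"));
        System.out.println(viaCallback((req, done) -> done.accept("ok:" + req), "x"));
    }
}
```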
With this in hand, createServer() can build the RPC server on top of it:
[ResourceManager.serviceInit() > createAndInitActiveServices() > RMActiveServices.serviceInit() > createClientRMService() > ClientRMService.serviceStart() > YarnRPC.create() > HadoopYarnProtoRPC.getServer() > RpcServerFactoryPBImpl.getServer() > createServer()]
createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, BlockingService blockingService, String portRangeConfig)
> RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class)
> builder=new RPC.Builder(conf)
> builder.setProtocol(pbProtocol) //the builder targets the protocol given by pbProtocol
> builder.setInstance(blockingService) //instance==the protobuf BlockingService object
> builder.setBindAddress(addr.getHostName()).setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false).setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)
> RPC.Server server=builder.build() //creates an RPC.Server, actually an object of some class extending RPC.Server
> LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server")
> server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService)
> return server //an object of a concrete class that extends RPC.Server, and hence also an RPC.Server
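In this way, what the RM ends up creating is the complete server-side chain, that is, the whole protocol stack. At the bottom is the IPC-layer Server, with RPC.Server on top of it. Above that, at the RPC layer, sits an object of an anonymous class that implements com.google.protobuf.BlockingService and is defined and created by newReflectiveBlockingService() in ApplicationClientProtocolService. Above that again is the application-layer ApplicationClientProtocolPBServiceImpl.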
Thus, when an RPC request to submit a job arrives, the call path on the RM node is:
[RPC.Server > BlockingService.callBlockingMethod() > ApplicationClientProtocolPBServiceImpl.submitApplication() > ClientRMService.submitApplication()]
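That is, the IPC layer hands the request to RPC.Server at the RPC layer. RPC.Server recovers the relevant parameters from the received RPC request message and calls BlockingService.callBlockingMethod() at the protobuf layer. That method calls the springboard and intermediary, for example ApplicationClientProtocolPBServiceImpl.submitApplication(), which in turn calls the target function of this RPC, for example ClientRMService.submitApplication().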
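The server-side half of this path, from a received request to callBlockingMethod(), can be sketched as a registry keyed by protocol name. This is an illustration under loud assumptions: RpcServerSketch and CallableServiceSketch are hypothetical, the "protocol|methodIndex|payload" wire format is invented for the example, and how the real RPC.Server stores and looks up registered protocols is not described in the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the service object registered with the server.
interface CallableServiceSketch {
    String callBlockingMethod(int methodIndex, String request);
}

// Hypothetical stand-in for the RPC-layer server; NOT Hadoop's RPC.Server.
class RpcServerSketch {
    private final Map<String, CallableServiceSketch> protocols = new HashMap<>();

    // Registers a service object under a protocol name.
    void addProtocol(String protocolName, CallableServiceSketch service) {
        protocols.put(protocolName, service);
    }

    // Invented wire format for this sketch: "<protocol>|<methodIndex>|<payload>".
    String handle(String wireRequest) {
        String[] parts = wireRequest.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + wireRequest);
        }
        CallableServiceSketch service = protocols.get(parts[0]);
        if (service == null) {
            throw new IllegalArgumentException("unknown protocol " + parts[0]);
        }
        // Recover the parameters, then enter the service through its single entry point.
        return service.callBlockingMethod(Integer.parseInt(parts[1]), parts[2]);
    }

    public static void main(String[] args) {
        RpcServerSketch server = new RpcServerSketch();
        server.addProtocol("ApplicationClientProtocolPB",
                (index, payload) -> index == 2 ? "submitted " + payload : "other");
        System.out.println(server.handle("ApplicationClientProtocolPB|2|job-42"));
    }
}
```

Keeping the registry separate from the services lets one server host several protocols, each reached through the same generic entry point.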