ApplicationMaster会按照设定的时间间隔向ResourceManager发送心跳。ResourceManager的ApplicationMasterService每次收到ApplicationMaster的心跳信息后,会同时在AMLivelinessMonitor更新其最近一次发送心跳的时间。
ApplicationMaster通过ContainerRequest方法向ResourceManager发送请求,申请相应数目的container。在发送申请container请求前,需要初始化Request,需要初始化的参数有:
Priority:请求的优先级
capability:当前支持CPU和Memory
nodes:申请的container所在的host(如果不需要指定,则设为null)
racks:申请的container所在的rack(如果不需要指定,则设为null)
ResourceManager返回ApplicationMaster的申请的containers信息,根据container的状态-containerStatus,更新已申请成功和还未申请的container数目。
申请成功的container,ApplicationMaster则通过ContainerLaunchContext初始化container的启动信息。初始化container后启动container。需要初始化的信息有:
Container id
执行资源(Shell脚本或命令、处理的数据)
运行环境
运行命令
container运行期间,ApplicationMaster对container进行监控。
job运行结束,ApplicationMaster发送FinishApplicationMasterRequest请求给ResourceManager,完成ApplicationMaster的注销。
具体代码如下(基于YARN2.6.0):
ApplicationMaster的入口main方法:
publicstaticvoidmain(String[] args) { boolean result = false; try { DshellApplicationMaster appMaster = new DshellApplicationMaster(); LOG.info("Initializing ApplicationMaster"); boolean doRun = appMaster.init(args); if (!doRun) { System.exit(0); } appMaster.run(); result = appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); ExitUtil.terminate(1, t); } if (result) { LOG.info("Application Master completed successfully. exiting"); System.exit(0); } else { LOG.info("Application Master failed. exiting"); System.exit(2); } }
main方法:
输入参数为Client提交的执行命令。
init方法完成对执行命令的解析,获取执行命令中参数指定的值。
run方法完成ApplicationMaster的启动、注册、containers的申请、分配、监控等功能的启动。
run方法中建立了与ResourceManager通信的Handle-AMRMClientAsync,其中的CallbackHandler是由RMCallbackHandler类实现的。
RMCallbackHandler类中实现了containers的申请、分配等方法。
containers的分配方法onContainersAllocated中通过LaunchContainerRunnable类中run方法完成container的启动。
finish方法完成container的停止、ApplicationMaster的注销。
4.2.2 对ApplicationMaster源码的修改在原有YARN DistributedShell的基础上做的修改如下: