YARN DistributedShell源码分析与修改
1 概述
2 YARN DistributedShell不能满足当前需求
2.1 功能需求
2.2 YARN DistributedShell对需求的支持情况
2.3 需要对YARN DistributedShell进行的修改
3 YARN DistributedShell源码获取
4 YARN DistributedShell源码分析及修改
4.1 Client类
4.1.1 Client源码逻辑
4.1.2 对Client源码的修改
4.2 ApplicationMaster类
4.2.1 ApplicationMaster源码逻辑
4.2.2 对ApplicationMaster源码的修改
4.3 DSConstants类
4.4 Log4jPropertyHelper类
1 概述Hadoop YARN项目自带一个非常简单的应用程序编程实例--DistributedShell。DistributedShell是一个构建在YARN之上的non-MapReduce应用示例。它的主要功能是在Hadoop集群中的多个节点,并行执行用户提供的shell命令或shell脚本(将用户提交的一串shell命令或者一个shell脚本,由ApplicationMaster控制,分配到不同的container中执行)。
2 YARN DistributedShell不能满足当前需求 2.1 功能需求我所参与的项目通过融合Hive、MapReduce、Spark、Kafka等大数据开源组件,搭建了一个数据分析平台。
平台需要新增一个功能:
在集群中选取一个节点,执行用户提交的jar包。
该功能需要与平台已有的基于Hive、MR、Spark实现的业务以及YARN相融合。
简而言之,经分析与调研,我们需要基于YARN的DistributedShell实现该功能。
该功能需要实现:
单机执行用户自己提交的jar包
用户提交的jar包会有其他jar包的依赖
用户提交的jar包只能选取一个节点运行
用户提交的jar包需要有缓存数据的目录
2.2 YARN DistributedShell对需求的支持情况YARN的DistributedShell功能为:
支持执行用户提供的shell命令或脚本
执行节点数可以通过参数num_containers设置,默认值为1
不支持jar包的执行
更不支持依赖包的提交
不支持jar包缓存目录的设置
2.3 需要对YARN DistributedShell进行的修改增加支持执行jar包功能
增加支持缓存目录设置功能
删除执行节点数设置功能,不允许用户设置执行节点数,将执行节点数保证值为1
3 YARN DistributedShell源码获取YARN DistributedShell源码可以在GitHub上apache/hadoop获取,hadoop repository中DistributedShell的源代码路径为:
hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
这里修改的是2.6.0版本源码。
YARN DistributedShell包含4个java Class:
DistributedShell ├── Client.java ├── ApplicationMaster.java ├── DSConstants.java ├── Log4jPropertyHelper.java
Client:客户端提交application
ApplicationMaster:注册AM,申请分配container,启动container
DSConstants:Client类和ApplicationMaster类中的常量定义
Log4jPropertyHelper:加载Log4j配置
4.1 Client类 4.1.1 Client源码逻辑Client类是DistributedShell应用提交到YARN的客户端。Client将启动application master,然后application master启动多个containers用于运行shell命令或脚本。Client运行逻辑为:
使用ApplicationClientProtocol协议连接ResourceManager(也叫ApplicationsMaster或ASM),获取一个新的ApplicationId。(ApplicationClientProtocol提供给Client一个获取集群信息的方式)
在一个job提交过程中,Client首先创建一个ApplicationSubmissionContext。ApplicationSubmissionContext定义了application的详细信息,例如:ApplicationId、application name、application分配的优先级、application分配的队列。另外,ApplicationSubmissionContext还定义了一个Container,该Container用于启动ApplicationMaster。
在ContainerLaunchContext中需要初始化启动ApplicationMaster的资源:
运行ApplicationMaster的container的资源
jars(例:AppMaster.jar)、配置文件(例:log4j.properties)
运行环境(例:hadoop特定的类路径、java classpath)
启动ApplicationMaster的命令
Client使用ApplicationSubmissionContext提交application到ResourceManager,并通过按周期向ResourceManager请求ApplicationReport,完成对applicatoin的监控。
如果application运行时间超过timeout的限制(默认为600000毫秒,可通过-timeout进行设置),client将发送KillApplicationRequest到ResourceManager,将application杀死。
具体代码如下(基于YARN2.6.0):
Cilent的入口main方法:
publicstaticvoidmain(String[] args) { boolean result = false; try { DshellClient client = new DshellClient(); LOG.info("Initializing Client"); try { boolean doRun = client.init(args); if (!doRun) { System.exit(0); } } catch (IllegalArgumentException e) { System.err.println(e.getLocalizedMessage()); client.printUsage(); System.exit(-1); } result = client.run(); } catch (Throwable t) { LOG.fatal("Error running Client", t); System.exit(1); } if (result) { LOG.info("Application completed successfully"); System.exit(0); } LOG.error("Application failed to complete successfully"); System.exit(2); }
main方法: