YARN DistributedShell源码分析与修改(3)

// Modified client constructor: delegate to the (masterClass, conf) constructor,
// pointing the client at the customized DshellApplicationMaster instead of the
// stock DistributedShell ApplicationMaster.
// NOTE: the original text read "publicDshellClient" — the missing space between
// the modifier and the class name is a compile error and is fixed here.
public DshellClient(Configuration conf) throws Exception {
    this("org.apache.hadoop.yarn.applications.distributedshell.DshellApplicationMaster",
            conf);
}

init方法

增加container_files和container_archives两个参数的解析

// 初始化选项container_files、container_archives↓↓↓↓↓↓↓ this.opts.addOption("container_files", true,"The files that containers will run . Separated by comma"); this.opts.addOption("container_archives", true,"The archives that containers will unzip. Separated by comma"); // ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑

run方法

上传container_files和container_archives两个参数指定的依赖包和缓存目录至HDFS

// 上传container_files指定的jar包到HDFS ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ if (this.containerJarPaths.length != 0) for (int i = 0; i < this.containerJarPaths.length; i++) { String hdfsJarLocation = ""; String[] jarNameSplit = this.containerJarPaths[i].split("/"); String jarName = jarNameSplit[(jarNameSplit.length - 1)]; long hdfsJarLen = 0L; long hdfsJarTimestamp = 0L; if (!this.containerJarPaths[i].isEmpty()) { Path jarSrc = new Path(this.containerJarPaths[i]); String jarPathSuffix = this.appName + "/" + appId.toString() + "/" + jarName; Path jarDst = new Path(fs.getHomeDirectory(), jarPathSuffix); fs.copyFromLocalFile(false, true, jarSrc, jarDst); hdfsJarLocation = jarDst.toUri().toString(); FileStatus jarFileStatus = fs.getFileStatus(jarDst); hdfsJarLen = jarFileStatus.getLen(); hdfsJarTimestamp = jarFileStatus.getModificationTime(); env.put(DshellDSConstants.DISTRIBUTEDJARLOCATION + i, hdfsJarLocation); env.put(DshellDSConstants.DISTRIBUTEDJARTIMESTAMP + i, Long.toString(hdfsJarTimestamp)); env.put(DshellDSConstants.DISTRIBUTEDJARLEN + i, Long.toString(hdfsJarLen)); } } // ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑ // 上传container_archives到HDFS↓↓↓↓↓↓↓↓↓↓↓↓↓↓ long hdfsArchiveLen; String archivePathSuffix; Path archiveDst; FileStatus archiveFileStatus; if (this.containerArchivePaths.length != 0) { for (int i = 0; i < this.containerArchivePaths.length; i++) { String hdfsArchiveLocation = ""; String[] archiveNameSplit = this.containerArchivePaths[i].split("/"); String archiveName = archiveNameSplit[(archiveNameSplit.length - 1)]; hdfsArchiveLen = 0L; long hdfsArchiveTimestamp = 0L; if (!this.containerArchivePaths[i].isEmpty()) { Path archiveSrc = new Path(this.containerArchivePaths[i]); archivePathSuffix = this.appName + "/" + appId.toString() + "/" + archiveName; archiveDst = new Path(fs.getHomeDirectory(), archivePathSuffix); fs.copyFromLocalFile(false, true, archiveSrc, archiveDst); hdfsArchiveLocation = archiveDst.toUri().toString(); archiveFileStatus = 
fs.getFileStatus(archiveDst); hdfsArchiveLen = archiveFileStatus.getLen(); hdfsArchiveTimestamp = archiveFileStatus .getModificationTime(); env.put(DshellDSConstants.DISTRIBUTEDARCHIVELOCATION + i, hdfsArchiveLocation); env.put(DshellDSConstants.DISTRIBUTEDARCHIVETIMESTAMP + i, Long.toString(hdfsArchiveTimestamp)); env.put(DshellDSConstants.DISTRIBUTEDARCHIVELEN + i, Long.toString(hdfsArchiveLen)); } } } // ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑

4.2 ApplicationMaster类 4.2.1 ApplicationMaster源码逻辑

一个ApplicationMaster将启动一个或多个container,在container上执行shell命令或脚本。ApplicationMaster运行逻辑为:

ResourceManager启动一个container用于运行ApplicationMaster。

ApplicationMaster连接ResourceManager,向ResourceManager注册自己。

向ResourceManager注册的信息有:

ApplicationMaster的ip:port

ApplicationMaster所在主机的hostname

ApplicationMaster的tracking url。客户端可以用tracking url来跟踪任务的状态和历史记录。

需要注意的是:在DistributedShell中,不需要注册tracking url和 appMasterHost:appMasterRpcPort,只需要设置hostname。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/cb101bc55df5e3fc8538c314a0022fe0.html