最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话,是不。于是就想改在一下之前最丑陋的一个地方——任务提交
本博客内容基于Spark2.2版本~在阅读文章并想实际操作前,请确保你有:
一台配置好Spark和yarn的服务器
支持正常spark-submit --master yarn xxxx的任务提交
老版本老版本任务提交是采用启动本地进程,执行脚本spark-submit xxx的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:
appplication_时间戳_数字原本老版本的spark通过参数spark.app.id就可以手动指定id,但是新版本的代码不是了。它是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的,具体的连接可以参考这个:
https://github.com/apache/spark/blob/e1dd03e42c2131b167b1e80c761291e88bfdf03f/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
感兴趣的同学可以看一下,最终真正常见applicaiton_id的地方式在hadoop-yarn里面,有个叫做ContainerId来生成的:
总结一句话就是,想要自定义id,甭想了!!!!
于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?
我事先生成一个自定义的id,当做参数传递到spark应用里面;
等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
然后再driver连接数据库,插入一条关联关系
新版本还是归结于互联网时代的信息传递,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现。他可以基于Java代码自动提交Spark任务,有两种模式:
new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样
new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器
当然是更倾向于第二种啦,因为好处很多:
自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能
可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)
返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!
一步一步,代码展示首先创建一个最基本的Spark程序:
import org.apache.spark.sql.SparkSession; import java.util.ArrayList; import java.util.List; public class HelloWorld { public static void main(String[] args) throws InterruptedException { SparkSession spark = SparkSession .builder() //.master("yarn") //.appName("hello-wrold") //.config("spark.some.config.option", "some-value") .getOrCreate(); List<Person> persons = new ArrayList<>(); persons.add(new Person("zhangsan", 22, "male")); persons.add(new Person("lisi", 25, "male")); persons.add(new Person("wangwu", 23, "female")); spark.createDataFrame(persons, Person.class).show(false); spark.close(); } }然后创建SparkLauncher类:
import org.apache.spark.launcher.SparkAppHandle; import org.apache.spark.launcher.SparkLauncher; import java.io.IOException; public class Launcher { public static void main(String[] args) throws IOException { SparkAppHandle handler = new SparkLauncher() .setAppName("hello-world") .setSparkHome(args[0]) .setMaster(args[1]) .setConf("spark.driver.memory", "2g") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.cores", "3") .setAppResource("/home/xinghailong/launcher/launcher_test.jar") .setMainClass("HelloWorld") .addAppArgs("I come from Launcher") .setDeployMode("cluster") .startApplication(new SparkAppHandle.Listener(){ @Override public void stateChanged(SparkAppHandle handle) { System.out.println("********** state changed **********"); } @Override public void infoChanged(SparkAppHandle handle) { System.out.println("********** info changed **********"); } }); while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){ System.out.println("id "+handler.getAppId()); System.out.println("state "+handler.getState()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }然后打包工程,打包过程可以参考之前的博客: