联童科技基于incubator-dolphinscheduler从0到1构建大数据调度平台之路 (2)

联童科技基于incubator-dolphinscheduler从0到1构建大数据调度平台之路

 2、增加主动触发下游工作流实例运行

如下图所示,我们在原有并行执行的基础上,增加了主动触发下游一个或多个工作流实例运行的功能。

联童科技基于incubator-dolphinscheduler从0到1构建大数据调度平台之路

 运行后效果如下:

联童科技基于incubator-dolphinscheduler从0到1构建大数据调度平台之路

 

联童科技基于incubator-dolphinscheduler从0到1构建大数据调度平台之路

3、一些较大的Bug修复

  联童在使用 incubator-dolphinscheduler 时,同样也踩过不少坑,这里举其中一个例子:在内部使用时,同事反馈最多的问题是调度任务的日志刷新不及时,有时要等很久才能刷新出日志。后来经过源码分析,发现是源码中一些不够健壮的处理导致了这个问题。

 incubator-dolphinscheduler 中 AbstractCommandExecutor.java 的部分源码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.worker.task;

import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;

/**
 * Base class for executors that run a task's command file as an external
 * process and forward the process output to a log handler.
 *
 * <p>Subclasses supply the interpreter, optional interpreter arguments, the
 * command file path and the logic that creates the command file.
 *
 * <p>This listing is an excerpt: two sections of the class are omitted and are
 * marked with rows of dots below.
 */
public abstract class AbstractCommandExecutor {

    /**
     * Pattern compiled from {@code Constants.APPLICATION_REGEX}; used to extract
     * application IDs.
     */
    protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);

    /**
     * Accumulates the payload of every {@code ${setValue(...)}} output line, each
     * followed by the separator {@code $VarPool$} (see {@link #parseProcessOutput}).
     */
    protected StringBuilder varPool = new StringBuilder();

    /**
     * The external process started by {@link #buildProcess}.
     */
    private Process process;

    /**
     * Callback that receives the buffered log lines whenever the buffer is flushed.
     */
    protected Consumer<List<String>> logHandler;

    /**
     * Logger supplied by the caller through the constructor.
     */
    protected Logger logger;

    /**
     * Buffer of output lines not yet handed to {@link #logHandler}; created as a
     * synchronized list in the constructor.
     */
    protected final List<String> logBuffer;

    /**
     * Execution context of the task; provides the execute path, tenant code and
     * task application id used in this class.
     */
    protected TaskExecutionContext taskExecutionContext;

    /**
     * Cache manager looked up from the Spring application context; not used in the
     * parts of the class shown in this excerpt.
     */
    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;

    /**
     * Creates the executor.
     *
     * @param logHandler           callback that consumes flushed log lines
     * @param taskExecutionContext execution context of the task to run
     * @param logger               logger used for error reporting
     */
    public AbstractCommandExecutor(Consumer<List<String>> logHandler,
                                   TaskExecutionContext taskExecutionContext,
                                   Logger logger) {
        this.logHandler = logHandler;
        this.taskExecutionContext = taskExecutionContext;
        this.logger = logger;
        this.logBuffer = Collections.synchronizedList(new ArrayList<>());
        this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
    }

    /**
     * Builds and starts the external process for the given command file.
     *
     * <p>The resulting command line is
     * {@code sudo -u <tenantCode> <interpreter> <options...> <commandFile>}, run in
     * the task's execute path with stderr merged into stdout.
     *
     * @param commandFile path of the command file passed to the interpreter
     * @throws IOException if the process cannot be started
     */
    private void buildProcess(String commandFile) throws IOException {
        // argument list of the command line to execute
        List<String> command = new LinkedList<>();

        // init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // run the process in the task's execute path
        processBuilder.directory(new File(taskExecutionContext.getExecutePath()));
        // merge the error stream into standard output so one reader sees both
        processBuilder.redirectErrorStream(true);

        // run the interpreter as the tenant user via sudo
        command.add("sudo");
        command.add("-u");
        command.add(taskExecutionContext.getTenantCode());
        command.add(commandInterpreter());
        command.addAll(commandOptions());
        command.add(commandFile);

        // set the command line and start the process
        processBuilder.command(command);
        process = processBuilder.start();

        // print command (printCommand is defined in a part of the class not shown here)
        printCommand(command);
    }

    // (part of the class is omitted here in the original listing)
    ..........

    /**
     * Reads the standard output of the process on a separate executor thread.
     *
     * <p>Lines starting with {@code ${setValue(} are appended to {@link #varPool};
     * every other line is added to {@link #logBuffer}, after which {@link #flush}
     * decides whether the buffer is handed to the log handler.
     *
     * <p>NOTE(review): the flush condition is only evaluated right after a line
     * has been read. While {@code readLine()} is waiting for more output, lines
     * already in the buffer are not delivered, however much time passes. This is
     * consistent with the delayed log refresh described in the article.
     *
     * @param process process whose output is read
     */
    private void parseProcessOutput(Process process) {
        // thread name: LoggerUtils.TASK_LOGGER_THREAD_NAME + "-" + task application id
        String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
        ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
        parseProcessOutputExecutorService.submit(new Runnable() {
            @Override
            public void run() {
                BufferedReader inReader = null;

                try {
                    // NOTE(review): no charset is passed, so the JVM default charset
                    // decodes the output - confirm this matches the task's encoding
                    inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                    String line;

                    long lastFlushTime = System.currentTimeMillis();

                    while ((line = inReader.readLine()) != null) {
                        if (line.startsWith("${setValue(")) {
                            // keep the text after the prefix, dropping the last two characters.
                            // NOTE(review): a line of only 11 or 12 characters makes substring
                            // throw; the catch below then ends the loop and reading stops
                            varPool.append(line.substring("${setValue(".length(), line.length() - 2));
                            varPool.append("$VarPool$");
                        } else {
                            logBuffer.add(line);
                            lastFlushTime = flush(lastFlushTime);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    // clear() is defined in a part of the class not shown here;
                    // presumably it delivers whatever is left in the buffer - TODO confirm
                    clear();
                    close(inReader);
                }
            }
        });
        // presumably a standard ExecutorService, where shutdown() lets the submitted
        // reader task run to completion and only rejects new tasks - TODO confirm
        parseProcessOutputExecutorService.shutdown();
    }

    // (part of the class is omitted here in the original listing)
    ................

    /**
     * Flushes the log buffer to the log handler when either the buffer size or the
     * time since the last flush reaches its threshold.
     *
     * @param lastFlushTime time of the previous flush, in epoch milliseconds
     * @return the current time if a flush happened, otherwise {@code lastFlushTime} unchanged
     */
    private long flush(long lastFlushTime) {
        long now = System.currentTimeMillis();

        // flush when the buffer holds at least DEFAULT_LOG_ROWS_NUM lines or more than
        // DEFAULT_LOG_FLUSH_INTERVAL has elapsed since the last flush
        if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) {
            lastFlushTime = now;
            // the handler receives the buffer itself, not a copy, and the buffer is
            // cleared right after the call returns
            logHandler.accept(logBuffer);

            logBuffer.clear();
        }
        return lastFlushTime;
    }

    /**
     * Closes the reader; an {@link IOException} raised while closing is logged and
     * not rethrown.
     *
     * @param inReader reader to close, may be {@code null}
     */
    private void close(BufferedReader inReader) {
        if (inReader != null) {
            try {
                inReader.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Extra arguments placed between the interpreter and the command file on the
     * command line.
     *
     * @return an empty list in this base implementation
     */
    protected List<String> commandOptions() {
        return Collections.emptyList();
    }

    /**
     * @return path of the command file (not called in the parts of the class shown here)
     */
    protected abstract String buildCommandFilePath();

    /**
     * @return the interpreter placed on the command line before the options and the command file
     */
    protected abstract String commandInterpreter();

    /**
     * Creates the command file if it does not exist yet (contract taken from the
     * method name; implemented by subclasses).
     *
     * @param execCommand command to execute
     * @param commandFile path of the command file
     * @throws IOException if the file cannot be created
     */
    protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxdpx.html