package com.primeton.datainsight.task;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.primeton.datainsight.configure.TaskProperties;
import com.primeton.datainsight.exec.SparkTaskSubmits;
import com.primeton.datainsight.job.IJob;
import com.primeton.datainsight.service.WarnService;
import com.primeton.datainsight.support.JobListener;
import com.primeton.datainsight.support.ProcessPublisher;
import com.primeton.datainsight.utils.ClasspathPackageScanner;
import com.primeton.datainsight.utils.Constance;
import com.primeton.datainsight.utils.NetworkInterfaceManager;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

/**
 * Submits a Spark job through the local spark-submit command and reports its
 * lifecycle via {@link JobListener} and {@link ProcessPublisher}.
 *
 * Created by zhaopx on 2018/1/3 0003-18:06
 */
public class SparkNativeLocalTask extends SparkTask {

    private static final Logger LOG = LoggerFactory.getLogger(SparkNativeLocalTask.class);

    /** Common dependency jars added to every submission. */
    private final List<String> commonDependList;

    /** JobId of this task. */
    protected final String jobId;

    /** taskId passed in from the front end. */
    protected final String originTaskId;

    /** Entry class name. */
    protected final String jobClassStr;

    /** Spark App Name. */
    protected final String appName;

    /** Parameters handed to the Task. */
    protected final Map<String, Object> taskParams;

    /** Class implementing the job interface. */
    protected Class<?> jobClass = null;

    /** Alert service. */
    WarnService warnService;

    JobListener jobListener;

    ProcessPublisher processPublisher;

    /** Task configuration properties. */
    TaskProperties taskProperties;

    /** Dependency jars. */
    protected URL[] dependJars = null;

    /**
     * Ignores originTaskId and generates a UUID instead.
     *
     * @param jobId the job id
     * @param taskParams entry parameters handed to the App
     */
    public SparkNativeLocalTask(String jobId, String jobClass, String appName, Map<String, Object> taskParams) {
        this(jobId, jobClass, appName, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
    }

    /**
     * Accepts a dataFile location, file or hdfs.
     *
     * @param originTaskId the taskInstanceId; the id Spark runs under and the id assigned
     *                     up front cannot be used interchangeably, so this value links the two
     * @param taskParams entry parameters handed to the App
     */
    public SparkNativeLocalTask(String jobId, String jobClass, String appName, String originTaskId, Map<String, Object> taskParams) {
        super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
        this.jobId = jobId;
        this.originTaskId = getTaskId();
        this.jobClassStr = jobClass;
        this.appName = appName;
        this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        //String appLibHome = ServerConfig.getAppHome() + "/lib/";
        try {
            // Already an absolute path; no prefix needed.
            String fastJsonJar = ClasspathPackageScanner.findContainingJar(JSON.class);
            builder.add(fastJsonJar);
        } catch (Exception e) {
            LOG.warn("find data3c common jar, server jar, fastjson jar error.", e);
        }
        this.commonDependList = builder.build();
    }

    @Override
    public void doExecute() throws Exception {
        JSONObject params = new JSONObject();
        params.put("id", jobId);
        params.put("originTaskId", originTaskId);
        params.put("jobClass", jobClassStr);
        params.put("rpcPort", 8088);
        params.put("jobName", appName);
        params.put("master", taskProperties.getMaster());
        params.put("deployMode", taskProperties.getDeployMode());
        params.put("queue", taskProperties.getQueue());
        params.put("appName", appName);
        params.put("files", taskProperties.getFiles());
        execute(params);
    }

    private void execute(JSONObject map) throws Exception {
        final String jobId = map.getString("id");
        // taskId from the front end; the task needs it later
        final String originTaskId = map.getString("originTaskId");
        // quartz job name, related to the jobId
        LOG.info("Start SparkJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
        final String jobClass = map.getString("jobClass");

        // TASK UUID
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        try {
            Set<String> dependJars = new HashSet<>();
            if (StringUtils.isNotBlank(taskProperties.getJars())) {
                String[] split = taskProperties.getJars().split(",");
                dependJars.addAll(Sets.newHashSet(split));
            }
            List<URL> jars = getJars(jobId, dependJars);
            if (!dependJars.isEmpty()) {
                this.dependJars = jars.toArray(new URL[jars.size()]);
                if (!jars.isEmpty()) {
                    LOG.info("add jars: " + Arrays.toString(this.dependJars));
                    // The URLClassLoader loads external lib resources dynamically, so some
                    // jobs can be added to a running system without a restart.
                    URLClassLoader classLoader = new URLClassLoader(this.dependJars, this.getClass().getClassLoader());
                    this.jobClass = ClassUtils.getClass(classLoader, jobClass, false);
                }
            }
            if (this.jobClass == null) {
                this.jobClass = ClassUtils.getClass(jobClass, false);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }

        String serverHost = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
        if (StringUtils.isBlank(serverHost)) {
            throw new IllegalArgumentException("unknown data3c server. Set env[DATA3C_HOST] or sysprop[data3c.host].");
        }
        // Address the job calls back to over RPC
        String endPoint = serverHost + ":" + map.getIntValue("rpcPort");
        String[] args = this.parseArgs(jobId, originTaskId, map, endPoint);
        try {
            JSONObject object = new JSONObject();
            object.put("finalStatus", "UNDEFINED");
            object.put("appState", "NEW");
            object.put("source", "data3c");
            object.put("jobType", "SPARK");
            object.put("taskName", map.getString("jobName"));
            object.put("uuid", uuid);
            object.put("__ORIGIN_TASK_ID", originTaskId);
            jobListener.call(jobId, originTaskId, "NEW", object);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, object.toJSONString());

            // Put the now-running task onto the queue
            JobQueue.pushNewTask(jobId, originTaskId, uuid, map, jobListener);
            int exitCode = runAsRunTimeProcess(jobId, originTaskId, map, args);
            if (exitCode != 0) {
                // Abnormal exit
                throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
            }
            JSONObject result = new JSONObject();
            result.put("source", "data3c");
            result.put("jobType", "SPARK");
            result.put("exitCode", exitCode);
            result.put("appState", "SUCCEEDED");
            result.put("finalStatus", "FINISHED");
            result.put("taskName", map.getString("jobName"));
            result.put("process", 1);
            result.put("uuid", uuid);
            result.put("__ORIGIN_TASK_ID", originTaskId);
            jobListener.call(jobId, originTaskId, "SUCCEEDED", result);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, result.toJSONString());
            LOG.info("{}/{} finished. STATE: {}", jobId, originTaskId, result.toJSONString());
        } catch (Exception e) {
            JSONObject error = new JSONObject();
            error.put("source", "data3c");
            error.put("jobType", "SPARK");
            error.put("appState", "FAILED");
            error.put("finalStatus", "FAILED");
            error.put("taskName", map.getString("jobName"));
            error.put("message", e.getMessage());
            error.put("uuid", uuid);
            error.put("__ORIGIN_TASK_ID", originTaskId);
            //error.put("error", ExceptionUtils.getFullStackTrace(e));
            jobListener.call(jobId, originTaskId, "FAILED", error);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, error.toJSONString());
            JobQueue.removeErrorJob(jobId, originTaskId);
            LOG.error("run job error: ", e);
        }
    }
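
    /*
     * For illustration only: the lifecycle message published to
     * Constance.JOB_PROCESS_TOPIC when a task starts looks roughly like the
     * following (the taskName, uuid and id values here are hypothetical):
     *
     *   {"finalStatus":"UNDEFINED","appState":"NEW","source":"data3c",
     *    "jobType":"SPARK","taskName":"demo-app","uuid":"5f2a...",
     *    "__ORIGIN_TASK_ID":"a1b2..."}
     *
     * The SUCCEEDED and FAILED messages reuse the same shape, adding exitCode
     * and process on success, or message on failure.
     */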
    /**
     * Starts the Job through a system command.
     *
     * @param args spark-submit arguments
     * @return 0 when the process finishes normally; any other exit code signals failure
     */
    private int runAsRunTimeProcess(final String jobId, final String taskId, final JSONObject taskConfig, String[] args) {
        String execute = "spark-submit";
        // If SPARK_HOME is configured, run the spark-submit found under it
        if (System.getenv("SPARK_HOME") != null) {
            //execute = System.getenv("SPARK_HOME") + "bin" + execute;
            String sparkHome = System.getenv("SPARK_HOME");
            execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
        }
        String cmd = execute + " " + StringUtils.join(args, " ");
        LOG.info("execute command: " + cmd);

        try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)) {
            PumpStreamHandler streamHandler = new PumpStreamHandler(out);
            // No time limit
            final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
            DefaultExecutor executor = new DefaultExecutor();
            executor.setExitValue(0);
            PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
            executor.setStreamHandler(streamHandler);
            executor.setWatchdog(watchDog);
            executor.setProcessDestroyer(pidProcessDestroyer);
            JobQueue.addTaskObserver(jobId, taskId, watchDog);

            Map<String, String> environment = new HashMap<>(System.getenv());
            environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
            executor.setWorkingDirectory(new File(out.getTaskDir()));
            // Handles process completion and raises alerts through the warn service
            JobExecuteResultHandler handler = new JobExecuteResultHandler(warnService);
            handler.setTaskName(taskConfig.getString("jobName"));
            environment.put("TASK_DIR", out.getTaskDir());
            executor.execute(CommandLine.parse(cmd), environment, handler);
            handler.waitFor();
            return handler.getExitValue();
        } catch (Exception e) {
            LOG.error(jobId + "/" + taskId + " failed to execute the task submit command.");
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            JobQueue.removeTaskObserver(jobId, taskId);
        }
    }
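
    /*
     * For illustration only: with SPARK_HOME set and a yarn master, the command
     * assembled above from the parseArgs() output comes out roughly as follows
     * (all paths, names and the endpoint here are hypothetical):
     *
     *   $SPARK_HOME/bin/spark-submit --master 'yarn' --deploy-mode cluster \
     *       --name MyJob --class com.primeton.datainsight.exec.SparkTaskSubmits \
     *       --queue default --conf spark.app.name=MyJob \
     *       --jars /opt/lib/fastjson.jar /opt/jobs/my-job.jar \
     *       com.example.MyJob <base64-encoded-parameters> 10.0.0.1:8088
     */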
    /**
     * Builds the spark-submit argument list for the Job.
     *
     * @param jobId the job id
     * @param taskId the random unique id
     * @param taskConfig the task config
     * @param endPoint host:port of the reverse-push RPC endpoint
     * @return the spark-submit arguments
     */
    protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig, final String endPoint) {
        String classInJar = null;
        // Keyed by path, so the same jar never appears twice
        Set<String> jarPaths = new HashSet<>();
        try {
            // Add the third-party dependency jars
            jarPaths.addAll(commonDependList);
            if (dependJars != null) {
                for (URL jar : dependJars) {
                    jarPaths.add(jar.getPath());
                }
            }
            classInJar = ClasspathPackageScanner.findContainingJar(jobClass);
            //jarPaths.add(classInJar);
            jarPaths.remove(classInJar);
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }

        List<String> list = new ArrayList<>();
        list.add("--master");
        list.add("'" + taskConfig.getString("master") + "'");
        // local mode is for testing and takes no deploy-mode argument;
        // yarn has two modes, yarn-client and yarn-cluster
        if (StringUtils.equals(taskConfig.getString("master"), "yarn")) {
            list.add("--deploy-mode");
            list.add(taskConfig.getString("deployMode"));
        }

        // JSON parameters
        JSONObject parameters = (JSONObject) taskConfig.get("parameters");
        if (parameters == null) {
            parameters = new JSONObject();
        }
        list.add("--name");
        String name = getTaskName(taskConfig);
        list.add(name);
        list.add("--class");
        boolean amcImplJob = true;
        if (IJob.class.isAssignableFrom(jobClass)) {
            // Job integrated with AMC: launch through the generic submitter
            list.add(SparkTaskSubmits.class.getName());
            amcImplJob = true;
        } else {
            //list.add(taskConfig.getString("jobClass"));
            // Native job: launch its own main class
            list.add(jobClass.getName());
            amcImplJob = false;
        }
        if (StringUtils.isNotBlank(taskProperties.getJvmOpts())) {
            list.add(taskProperties.getJvmOpts());
        }
        list.add("--queue");
        list.add(Optional.ofNullable(taskConfig.getString("queue")).orElse("default"));
        list.add("--conf");
        list.add("spark.app.name=" + name);

        Set<String> confArgs = new HashSet<>();
        if (StringUtils.isNotBlank(taskProperties.getSparkParameters())) {
            String[] confSplit = taskProperties.getSparkParameters().split(",");
            confArgs.addAll(Sets.newHashSet(confSplit));
        }
        for (String confArg : confArgs) {
            list.add("--conf");
            list.add(confArg);
        }

        // Add files
        String addFiles = (String) taskConfig.get("files");
        if (StringUtils.isNotBlank(addFiles)) {
            String[] files = addFiles.split(",");
            Set<String> fileSet = new HashSet<>(files.length);
            for (String file : files) {
                File candidate = new File(file);
                if (candidate.exists() && candidate.isFile()) {
                    fileSet.add(candidate.getAbsolutePath());
                    LOG.info("add spark file: {}", candidate.getAbsolutePath());
                }
            }
            if (!fileSet.isEmpty()) {
                list.add("--files");
                list.add(StringUtils.join(fileSet, ","));
            }
        }

        // A job jar may sit both under lib and under jobs; the one under jobs wins
        String jobJarName = new File(classInJar).getName();
        String jobJar = classInJar;
        Set<String> dependJarsPath = new HashSet<>();
        for (String jarPath : jarPaths) {
            File jarFile = new File(jarPath);
            if (jarFile.exists() && jarFile.getName().equals(jobJarName)) {
                // Same jar name under jobs as under lib: use the one under jobs
                jobJar = jarPath;
            } else {
                dependJarsPath.add(jarPath);
            }
        }
        list.add("--jars");
        String dependJars = StringUtils.join(dependJarsPath, ",");
        list.add(dependJars);
        list.add(jobJar);
        for (String jarPath : jarPaths) {
            LOG.info("add jar {}", jarPath);
        }

        // The dataFile location is passed through as a parameter
        for (Map.Entry<String, Object> taskParam : this.taskParams.entrySet()) {
            parameters.put(taskParam.getKey(), taskParam.getValue());
        }
        parameters.put("__JOB_ID", jobId);
        parameters.put("__TASK_ID", taskId);
        if (StringUtils.isNotBlank(originTaskId)) {
            parameters.put("__ORIGIN_TASK_ID", originTaskId);
        }
        if (StringUtils.isNotBlank(taskProperties.getAppParameters())) {
            list.add(taskProperties.getAppParameters());
        } else {
            // Everything from here on is a program argument; args[0] is the job class
            list.add(jobClass.getName());
            Base64 base64 = new Base64(1, ":".getBytes());
            // args[1] is the task parameters as Base64-encoded JSON
            list.add(base64.encodeToString(parameters.toJSONString().getBytes(StandardCharsets.UTF_8)));
            // args[2] is the RPC callback endpoint
            list.add(endPoint);
        }
        LOG.info("args: " + StringUtils.join(list.iterator(), " "));
        return list.toArray(new String[list.size()]);
    }
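
    /*
     * A minimal sketch of how a launched application could read the three program
     * arguments built above, assuming it uses the same commons-codec Base64
     * settings (the variable names here are illustrative, not part of this class):
     *
     *   String jobClassName = args[0];
     *   byte[] raw = new Base64(1, ":".getBytes()).decode(args[1]);
     *   JSONObject params = JSON.parseObject(new String(raw, StandardCharsets.UTF_8));
     *   String rpcEndPoint = args[2]; // host:port to call back over RPC
     */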
    public void setWarnService(WarnService warnService) {
        this.warnService = warnService;
    }

    public void setJobListener(JobListener jobListener) {
        this.jobListener = jobListener;
    }

    public void setProcessPublisher(ProcessPublisher processPublisher) {
        this.processPublisher = processPublisher;
    }

    public void setTaskProperties(TaskProperties taskProperties) {
        this.taskProperties = taskProperties;
    }

    /**
     * Resolves the Task name.
     *
     * @param taskConfig the Task Config
     * @return the task name
     */
    protected String getTaskName(final JSONObject taskConfig) {
        String name = taskConfig.getString("jobName");
        if (StringUtils.isBlank(name) || "null".equalsIgnoreCase(name)) {
            name = jobClass.getSimpleName();
        }
        // Append the front-end task name to the yarn name so the task is easy to spot on yarn
        String forceTaskName = taskConfig.getString("taskName");
        if (StringUtils.isNotBlank(forceTaskName)) {
            name = name + ":" + forceTaskName;
        }
        return name;
    }

    public List<URL> getJars(String jobId, Set<String> dependJars) throws Exception {
        List<URL> jars = new ArrayList<>(dependJars.size());
        for (String dependJar : dependJars) {
            File file = new File(dependJar);
            if (file.exists() && file.isFile()) {
                jars.add(file.toURI().toURL());
                continue;
            }
            // Not found as given; fall back to the Spark lib directory
            String jobJarDir = taskProperties.getSparkLibDir();
            file = new File(jobJarDir, dependJar);
            if (file.exists() && file.isFile()) {
                jars.add(file.toURI().toURL());
                continue;
            }
            LOG.warn("{} does not exist, skipping.", dependJar);
        }
        return jars;
    }
}
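
/*
 * A minimal usage sketch, assuming the collaborators are wired up by the hosting
 * framework (all ids, class names and paths below are hypothetical):
 *
 *   Map<String, Object> params = new HashMap<>();
 *   params.put("input", "/data/in");
 *   SparkNativeLocalTask task = new SparkNativeLocalTask(
 *           "job-001", "com.example.MyJob", "demo-app", params);
 *   task.setTaskProperties(taskProperties);     // master, queue, jars, log dir, ...
 *   task.setJobListener(jobListener);           // NEW/SUCCEEDED/FAILED callbacks
 *   task.setProcessPublisher(processPublisher); // lifecycle messages
 *   task.setWarnService(warnService);           // alerting on failure
 *   task.doExecute();
 */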