@@ -0,0 +1,547 @@
+package com.primeton.datainsight.task;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.primeton.datainsight.configure.TaskProperties;
+import com.primeton.datainsight.exec.SparkTaskSubmits;
+import com.primeton.datainsight.job.IJob;
+import com.primeton.datainsight.service.WarnService;
+import com.primeton.datainsight.support.JobListener;
+import com.primeton.datainsight.support.ProcessPublisher;
+import com.primeton.datainsight.utils.ClasspathPackageScanner;
+import com.primeton.datainsight.utils.Constance;
+import com.primeton.datainsight.utils.NetworkInterfaceManager;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+
+/**
+ * <p>
+ * Runs a Spark job by shelling out to the local spark-submit command.
+ * </p>
+ * Created by zhaopx on 2018/1/3 0003-18:06
+ */
+public class SparkNativeLocalTask extends SparkTask {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkNativeLocalTask.class);
+
+    /**
+     * Common dependency jars added to every submission.
+     */
+    private final List<String> commonDependList;
+
+
+    /**
+     * The job id of this task.
+     */
+    protected final String jobId;
+
+    /**
+     * The task id passed in from the front end.
+     */
+    protected final String originTaskId;
+
+
+    /**
+     * Entry class name.
+     */
+    protected final String jobClassStr;
+
+
+    /**
+     * Spark App Name
+     */
+    protected final String appName;
+
+
+    /**
+     * Parameters handed to the Task.
+     */
+    protected final Map<String, String> taskParams;
+
+    /**
+     * The class that implements the job interface.
+     */
+    protected Class jobClass = null;
+
+
+    /**
+     * Alert service.
+     */
+    WarnService warnService;
+
+
+    JobListener jobListener;
+
+
+    ProcessPublisher processPublisher;
+
+
+    /**
+     * Task parameter configuration.
+     */
+    TaskProperties taskProperties;
+
+
+    /**
+     * Dependency jars.
+     */
+    protected java.net.URL[] dependJars = null;
+
+
+    /**
+     * The originTaskId is not supplied; a UUID is generated instead.
+     * @param jobId the job id
+     * @param taskParams entry parameters handed to the App
+     */
+    public SparkNativeLocalTask(String jobId,
+                                String jobClass,
+                                String appName,
+                                Map<String, String> taskParams) {
+        this(jobId, jobClass, appName, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
+    }
+
+
+    /**
+     * Accepts a dataFile location, either a local file or HDFS.
+     * @param originTaskId the originTaskId here is the taskInstanceId. The id Spark runs under cannot be used
+     *                     directly with the id assigned upstream, so this value serves as the link between the two
+     * @param taskParams entry parameters handed to the App
+     */
+    public SparkNativeLocalTask(String jobId,
+                                String jobClass,
+                                String appName,
+                                String originTaskId,
+                                Map<String, String> taskParams) {
+        super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
+        this.jobId = jobId;
+        this.originTaskId = getTaskId();
+        this.jobClassStr = jobClass;
+        this.appName = appName;
+        this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
+        ImmutableList.Builder<String> builder = ImmutableList.<String>builder();
+        //String appLibHome = ServerConfig.getAppHome() + "/lib/";
+        try {
+            // already an absolute path, no prefix needed
+            String fastJsonJar = ClasspathPackageScanner.findContainingJar(JSON.class);
+            builder.add(fastJsonJar);
+        } catch (Exception e) {
+            LOG.warn("find data3c common jar, server jar, fastjson jar error.", e);
+        }
+
+        this.commonDependList = builder.build();
+    }
+
+
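+    /**
+     * Builds the submission parameters for this task and hands them to {@link #execute(Map)}.
+     */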
+    @Override
+    public void doExecute() throws Exception {
+        JSONObject params = new JSONObject();
+        params.put("id", jobId);
+        params.put("originTaskId", originTaskId);
+        params.put("jobClass", jobClassStr);
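+        // port used to build the RPC callback endpoint that is handed to the launched Spark application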
+        params.put("rpcPort", 8088);
+        params.put("jobName", appName);
+
+        params.put("master", taskProperties.getMaster());
+        params.put("deployMode", taskProperties.getDeployMode());
+        params.put("queue", taskProperties.getQueue());
+        params.put("appName", appName);
+        params.put("files", taskProperties.getFiles());
+
+        execute(params);
+    }
+
+    private void execute(Map<String, Object> params) throws Exception {
+        JSONObject map = (JSONObject) params;
+        final String jobId = map.getString("id");
+        // taskId from the front end, needed inside the task
+        final String originTaskId = map.getString("originTaskId");
+
+        // quartz job name, related to the jobId
+        LOG.info("Start SparkJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
+
+        final String jobClass = map.getString("jobClass");
+
+        // TASK UUID
+        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+        try {
+            Set<String> dependJars = new HashSet<>();
+            if (StringUtils.isNotBlank(taskProperties.getJars())) {
+                String[] split = taskProperties.getJars().split(",");
+                dependJars.addAll(Sets.newHashSet(split));
+            }
+            List<java.net.URL> jars = getJars(jobId, dependJars);
+            if (!dependJars.isEmpty()) {
+                this.dependJars = jars.toArray(new java.net.URL[jars.size()]);
+                if (!jars.isEmpty()) {
+                    LOG.info("add jars: " + Arrays.toString(this.dependJars));
+                    // URLClassLoader that loads external lib resources dynamically, so some jobs can be added without a restart
+                    java.net.URLClassLoader classLoader = new java.net.URLClassLoader(this.dependJars, this.getClass().getClassLoader());
+                    this.jobClass = (Class<? extends IJob>) ClassUtils.getClass(classLoader, jobClass, false);
+                }
+            }
+
+            if (this.jobClass == null) {
+                this.jobClass = (Class<? extends IJob>) ClassUtils.getClass(jobClass, false);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+
+        String serverHost = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
+        if (StringUtils.isBlank(serverHost)) {
+            throw new IllegalArgumentException("unknown data3c server. env[DATA3C_HOST] or sysprop[data3c.host] to set.");
+        }
+
+        // address the RPC callback is sent to
+        String endPoint = serverHost + ":" + map.getIntValue("rpcPort");
+        String[] args = this.parseArgs(jobId, originTaskId, map, endPoint);
+
+        try {
+            JSONObject object = new JSONObject();
+            object.put("finalStatus", "UNDEFINED");
+            object.put("appState", "NEW");
+            object.put("source", "data3c");
+            object.put("jobType", "SPARK");
+            object.put("taskName", map.getString("jobName"));
+            object.put("uuid", uuid);
+            object.put("__ORIGIN_TASK_ID", originTaskId);
+
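+            // report the initial NEW state to the listener and publish it before spark-submit is launched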
+            jobListener.call(jobId, originTaskId, "NEW", object);
+            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, object.toJSONString());
+            // put the running task into the queue
+            JobQueue.pushNewTask(jobId, originTaskId, uuid, map, jobListener);
+
+
+            int exitCode = runAsRunTimeProcess(jobId,
+                    originTaskId,
+                    map,
+                    args);
+            if (exitCode != 0) {
+                // abnormal exit
+                throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
+            }
+            JSONObject result = new JSONObject();
+            result.put("source", "data3c");
+            result.put("jobType", "SPARK");
+            result.put("exitCode", exitCode);
+            result.put("appState", "SUCCEEDED");
+            result.put("finalStatus", "FINISHED");
+            result.put("taskName", map.getString("jobName"));
+            result.put("process", 1);
+            result.put("uuid", uuid);
+            result.put("__ORIGIN_TASK_ID", originTaskId);
+            jobListener.call(jobId, originTaskId, "SUCCEEDED", result);
+            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, result.toJSONString());
+            LOG.info("{}/{} finished. STATE: {}", jobId, originTaskId, result.toJSONString());
+        } catch (Exception e) {
+            JSONObject error = new JSONObject();
+            error.put("source", "data3c");
+            error.put("jobType", "SPARK");
+            error.put("appState", "FAILED");
+            error.put("finalStatus", "FAILED");
+            error.put("taskName", map.getString("jobName"));
+            error.put("message", e.getMessage());
+            error.put("uuid", uuid);
+            error.put("__ORIGIN_TASK_ID", originTaskId);
+            //error.put("error", ExceptionUtils.getFullStackTrace(e));
+            jobListener.call(jobId, originTaskId, "FAILED", error);
+            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, error.toJSONString());
+            JobQueue.removeErrorJob(jobId, originTaskId);
+            LOG.error("run job error: ", e);
+        }
+    }
+
+
+    /**
+     * Starts the Job by invoking the spark-submit system command.
+     *
+     * @param args spark-submit arguments
+     * @return 0 when the run finishes normally; a value less than 0 otherwise
+     */
+    private int runAsRunTimeProcess(final String jobId,
+                                    final String taskId,
+                                    final JSONObject taskConfig,
+                                    String[] args) {
+        String execute = "spark-submit";
+
+        // if SPARK_HOME is configured, use the spark-submit under it
+        if (System.getenv("SPARK_HOME") != null) {
+            //execute = System.getenv("SPARK_HOME") + "bin" + execute;
+            String sparkHome = System.getenv("SPARK_HOME");
+            execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
+        }
+        String cmd = execute + " " + StringUtils.join(args, " ");
+        LOG.info("execute command: " + cmd);
+        try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)) {
+            PumpStreamHandler streamHandler = new PumpStreamHandler(out);
+
+            // no time limit
+            final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+            DefaultExecutor executor = new DefaultExecutor();
+            executor.setExitValue(0);
+
+            PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
+            executor.setStreamHandler(streamHandler);
+            executor.setWatchdog(watchDog);
+            executor.setProcessDestroyer(pidProcessDestroyer);
+            JobQueue.addTaskObserver(jobId, taskId, watchDog);
+
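+            // inherit the current environment and add SPARK_SUBMIT_OPTS so the spark-submit JVM picks up the configured options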
+            Map<String, String> environment = new HashMap<>(System.getenv());
+            environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
+            executor.setWorkingDirectory(new File(out.getTaskDir()));
+
+            // whether the task is started
+            JobExecuteResultHandler handler = new JobExecuteResultHandler(
+                    warnService);
+            handler.setTaskName(taskConfig.getString("jobName"));
+            environment.put("TASK_DIR", out.getTaskDir());
+            executor.execute(CommandLine.parse(cmd), environment, handler);
+            handler.waitFor();
+            return handler.getExitValue();
+        } catch (Exception e) {
+            LOG.error(jobId + "/" + taskId + " failed to execute the submit command.");
+            throw new IllegalStateException(e.getMessage());
+        } finally {
+            JobQueue.removeTaskObserver(jobId, taskId);
+        }
+    }
+
+
+
+    /**
+     * Parses the Job arguments.
+     * @param jobId job id
+     * @param taskId random unique id
+     * @param taskConfig task configuration
+     * @param endPoint endpoint (host:port) for the reverse RPC push
+     * @return the spark-submit argument list
+     */
+    protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig, final String endPoint) {
+        String classInJar = null;
+        // identical paths collapse into one entry, so no duplicates occur
+        Set<String> jarPaths = new HashSet<>();
+
+        try {
+            // add third-party dependency jars
+            jarPaths.addAll(commonDependList);
+            if (dependJars != null) {
+                for (java.net.URL jar : dependJars) {
+                    jarPaths.add(jar.getPath());
+                }
+            }
+
+            classInJar = ClasspathPackageScanner.findContainingJar(jobClass);
+            //jarPaths.add(classInJar);
+            jarPaths.remove(classInJar);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+
+        List<String> list = new ArrayList<>();
+        list.add("--master");
+        list.add("'" + taskConfig.getString("master") + "'");
+
+        // local mode is used for testing and needs no deployMode argument;
+        // when the master is yarn there are two modes, yarn-client and yarn-cluster
+        if (StringUtils.equals(taskConfig.getString("master"), "yarn")) {
+            list.add("--deploy-mode");
+            list.add(taskConfig.getString("deployMode"));
+        }
+
+        // json parameters
+        JSONObject parameters = (JSONObject) taskConfig.get("parameters");
+        if (parameters == null) {
+            parameters = new JSONObject();
+        }
+
+        list.add("--name");
+        String name = getTaskName(taskConfig);
+        list.add(name);
+
+        list.add("--class");
+        boolean amcImplJob = true;
+        if (IJob.class.isAssignableFrom(jobClass)) {
+            list.add(SparkTaskSubmits.class.getName());
+            // a task that integrates with AMC
+            amcImplJob = true;
+        } else {
+            //list.add(taskConfig.getString("jobClass"));
+            list.add(jobClass.getName());
+            // a native task
+            amcImplJob = false;
+        }
+
+        if (StringUtils.isNotBlank(taskProperties.getJvmOpts())) {
+            list.add(taskProperties.getJvmOpts());
+        }
+
+        list.add("--queue");
+        list.add(Optional.ofNullable(taskConfig.getString("queue")).orElse("default"));
+
+        list.add("--conf");
+        list.add("spark.app.name=" + name);
+        Set<String> confArgs = new HashSet<>();
+        if (StringUtils.isNotBlank(taskProperties.getSparkParameters())) {
+            String[] confSplit = taskProperties.getSparkParameters().split(",");
+            confArgs.addAll(Sets.newHashSet(confSplit));
+        }
+        if (confArgs.size() > 0) {
+            for (String confArg : confArgs) {
+                list.add("--conf");
+                list.add(confArg);
+            }
+        }
+        // add files
+        String addFiles = (String) taskConfig.get("files");
+        if (StringUtils.isNotBlank(addFiles)) {
+            String[] files = addFiles.split(",");
+            Set<String> fileSet = new HashSet<>(files.length);
+            if (files.length > 0) {
+                for (String file : files) {
+                    File file1 = new File(file);
+                    if (file1.exists() && file1.isFile()) {
+                        fileSet.add(file1.getAbsolutePath());
+                        LOG.info("add spark file: {}", file1.getAbsolutePath());
+                    }
+                }
+            }
+            if (fileSet.size() > 0) {
+                list.add("--files");
+                list.add(StringUtils.join(fileSet, ","));
+            }
+        }
+
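+        // the jar that contains the job class becomes the application jar; all other jars are attached via --jars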
+        // handles a task jar that exists both under lib and under jobs; the one under jobs takes precedence
+        String jobJarName = new File(classInJar).getName();
+        String jobJar = classInJar;
+        Set<String> dependJarsPath = new HashSet<>();
+        for (String jarPath : jarPaths) {
+            File jarFile = new File(jarPath);
+            if (jarFile.exists() && jarFile.getName().equals(jobJarName)) {
+                // a jar under jobs with the same name as one under lib: use the one under jobs
+                jobJar = jarPath;
+            } else {
+                dependJarsPath.add(jarPath);
+            }
+        }
+        list.add("--jars");
+        String dependJars = StringUtils.join(dependJarsPath, ",");
+        list.add(dependJars);
+        list.add(jobJar);
+
+        for (String jarPath : jarPaths) {
+            LOG.info("add jar {}", jarPath);
+        }
+
+
+        // the dataFile location is passed in as a parameter
+        for (Map.Entry<String, String> taskParam : this.taskParams.entrySet()) {
+            parameters.put(taskParam.getKey(), taskParam.getValue());
+        }
+        parameters.put("__JOB_ID", jobId);
+        parameters.put("__TASK_ID", taskId);
+        if (StringUtils.isNotBlank(originTaskId)) {
+            parameters.put("__ORIGIN_TASK_ID", originTaskId);
+        }
+
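+        // application arguments: either the configured appParameters, or [job class name, Base64-encoded JSON parameters, RPC endpoint]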
+        if (StringUtils.isNotBlank(taskProperties.getAppParameters())) {
+            list.add(taskProperties.getAppParameters());
+        } else {
+            // everything from here on are program arguments; args[0]
+            list.add(jobClass.getName());
+
+            org.apache.commons.codec.binary.Base64 base64 = new org.apache.commons.codec.binary.Base64(1, ":".getBytes());
+            // args[1]
+            list.add(base64.encodeToString(parameters.toJSONString().getBytes(StandardCharsets.UTF_8)));
+
+            // args[2]
+            list.add(endPoint);
+        }
+
+        LOG.info("args: " + StringUtils.join(list.iterator(), " "));
+        return list.toArray(new String[list.size()]);
+    }
+
+    public void setWarnService(WarnService warnService) {
+        this.warnService = warnService;
+    }
+
+    public void setJobListener(JobListener jobListener) {
+        this.jobListener = jobListener;
+    }
+
+    public void setProcessPublisher(ProcessPublisher processPublisher) {
+        this.processPublisher = processPublisher;
+    }
+
+    public void setTaskProperties(TaskProperties taskProperties) {
+        this.taskProperties = taskProperties;
+    }
+
+    /**
+     * Resolves the Task name.
+     * @param taskConfig Task Config
+     * @return the task name
+     */
+    protected String getTaskName(final JSONObject taskConfig) {
+        String name = taskConfig.getString("jobName");
+        if (StringUtils.isBlank(name) || "null".equalsIgnoreCase(name)) {
+            name = jobClass.getSimpleName();
+        }
+
+        // append the front-end task name to the yarn name so it is easier to identify on yarn
+        String forceTaskName = taskConfig.getString("taskName");
+        if (StringUtils.isNotBlank(forceTaskName)) {
+            name = name + ":" + forceTaskName;
+        }
+        return name;
+    }
+
+
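+    /**
+     * Resolves dependency jar names to URLs: absolute paths are used as-is, otherwise the name is looked up
+     * under the configured Spark lib directory; entries that cannot be found are skipped with a warning.
+     */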
+    public List<java.net.URL> getJars(String jobId, Set<String> dependJars) throws Exception {
+        List<java.net.URL> jars = new ArrayList<>(dependJars.size());
+        for (String dependJar : dependJars) {
+            File file = new File(dependJar);
+            if (file.exists() && file.isFile()) {
+                jars.add(file.toURI().toURL());
+                continue;
+            } else {
+                String jobJarDir = taskProperties.getSparkLibDir();
+                // not found as given; try it as a single file under the Spark lib directory
+                file = new File(jobJarDir, dependJar);
+                if (file.exists() && file.isFile()) {
+                    jars.add(file.toURI().toURL());
+                    continue;
+                }
+            }
+
+            LOG.warn("{} not exists, skip file.", dependJar);
+        }
+        return jars;
+    }
+
+}