- package com.primeton.datainsight.task;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.google.common.collect.ImmutableList;
- import com.google.common.collect.Sets;
- import com.primeton.datainsight.configure.TaskProperties;
- import com.primeton.datainsight.exec.SparkTaskSubmits;
- import com.primeton.datainsight.job.IJob;
- import com.primeton.datainsight.service.WarnService;
- import com.primeton.datainsight.support.JobListener;
- import com.primeton.datainsight.support.ProcessPublisher;
- import com.primeton.datainsight.utils.ClasspathPackageScanner;
- import com.primeton.datainsight.utils.Constance;
- import com.primeton.datainsight.utils.NetworkInterfaceManager;
- import org.apache.commons.codec.binary.Base64;
- import org.apache.commons.exec.CommandLine;
- import org.apache.commons.exec.DefaultExecutor;
- import org.apache.commons.exec.ExecuteWatchdog;
- import org.apache.commons.exec.PumpStreamHandler;
- import org.apache.commons.lang.ClassUtils;
- import org.apache.commons.lang.StringUtils;
- import org.joda.time.DateTime;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.File;
- import java.net.URL;
- import java.net.URLClassLoader;
- import java.nio.charset.StandardCharsets;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.Set;
- import java.util.UUID;
- /**
- * <p>
- * Builds a spark-submit command line for a Spark job, runs it as a local OS
- * process, and reports progress and results back through the job listener and
- * the process publisher.
- * </p>
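- *
- * <p>A minimal usage sketch, assuming the collaborators (taskProperties,
- * jobListener, processPublisher, warnService) are wired from the surrounding
- * application context; class names and values below are illustrative:</p>
- * <pre>{@code
- * SparkNativeLocalTask task = new SparkNativeLocalTask(
- *         "job-001", "com.example.MyJob", "my-app",
- *         Collections.singletonMap("input", "/data/in"));
- * task.setTaskProperties(taskProperties);
- * task.setJobListener(jobListener);
- * task.setProcessPublisher(processPublisher);
- * task.setWarnService(warnService);
- * task.doExecute();
- * }</pre>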
- * Created by zhaopx on 2018/1/3 18:06
- */
- public class SparkNativeLocalTask extends SparkTask {
- private static final Logger LOG = LoggerFactory.getLogger(SparkNativeLocalTask.class);
- /**
- * Common list of dependency jars added to every submit
- */
- private final List<String> commonDependList;
- /**
- * Job ID of the task
- */
- protected final String jobId;
- /**
- * taskId passed from the front end
- */
- protected final String originTaskId;
- /**
- * Entry class
- */
- protected final String jobClassStr;
- /**
- * Spark App Name
- */
- protected final String appName;
- /**
- * Parameters passed to the Task
- */
- protected final Map<String, String> taskParams;
- /**
- * Class that implements the job interface
- */
- protected Class<?> jobClass = null;
- /**
- * Alerting service
- */
- WarnService warnService;
- JobListener jobListener;
- ProcessPublisher processPublisher;
- /**
- * Task parameter configuration
- */
- TaskProperties taskProperties;
- /**
- * dependency jar
- */
- protected URL[] dependJars = null;
- /**
- * originTaskId is not supplied here; a UUID is generated instead.
- * @param jobId the job ID
- * @param taskParams entry parameters passed to the App
- */
- public SparkNativeLocalTask(String jobId,
- String jobClass,
- String appName,
- Map<String, String> taskParams) {
- this(jobId, jobClass, appName, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
- }
- /**
- * Pass in the dataFile location, file or hdfs.
- * @param originTaskId here originTaskId means the taskInstanceId. The ID of the actual Spark run and the ID assigned up front cannot be used directly with each other; this parameter correlates the two.
- * @param taskParams entry parameters passed to the App
- */
- public SparkNativeLocalTask(String jobId,
- String jobClass,
- String appName,
- String originTaskId,
- Map<String, String> taskParams) {
- super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
- this.jobId = jobId;
- this.originTaskId = getTaskId();
- this.jobClassStr = jobClass;
- this.appName = appName;
- this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
- ImmutableList.Builder<String> builder = ImmutableList.<String>builder();
- //String appLibHome = ServerConfig.getAppHome() + "/lib/";
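- // ClasspathPackageScanner.findContainingJar presumably resolves the filesystem
- // path of the jar that contains the given class; the fastjson jar is likely
- // shipped with every job so the submitted app can parse its JSON parameters.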
- try {
- // Already an absolute path, no prefix needed
- String fastJsonJar = ClasspathPackageScanner.findContainingJar(JSON.class);
- builder.add(fastJsonJar);
- } catch (Exception e) {
- LOG.warn("find data3c commom jar, server jar, fastjson jar error.", e);
- }
- this.commonDependList = builder.build();
- }
- @Override
- public void doExecute() throws Exception {
- JSONObject params = new JSONObject();
- params.put("id", jobId);
- params.put("originTaskId", originTaskId);
- params.put("jobClass", jobClassStr);
- params.put("rpcPort", 8088);
- params.put("jobName", appName);
- params.put("master", taskProperties.getMaster());
- params.put("deployMode", taskProperties.getDeployMode());
- params.put("queue", taskProperties.getQueue());
- params.put("appName", appName);
- params.put("files", taskProperties.getFiles());
- execute(params);
- }
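- /*
-  * For reference, the params object assembled above looks roughly like this
-  * (values are illustrative):
-  *
-  * {
-  *   "id": "job-001",
-  *   "originTaskId": "9f2c4e...",
-  *   "jobClass": "com.example.MyJob",
-  *   "rpcPort": 8088,
-  *   "jobName": "my-app",
-  *   "master": "yarn",
-  *   "deployMode": "cluster",
-  *   "queue": "default",
-  *   "appName": "my-app",
-  *   "files": "/etc/app/job.conf"
-  * }
-  */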
- private void execute(JSONObject map) throws Exception {
- final String jobId = map.getString("id");
- // taskId from the front end, needed inside the task
- final String originTaskId = map.getString("originTaskId");
- // quartz job name, related to the jobId
- LOG.info("Start SparkJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
- final String jobClass = map.getString("jobClass");
- // TASK UUID
- String uuid = UUID.randomUUID().toString().replaceAll("-", "");
- try {
- Set<String> dependJars = new HashSet<>();
- if(StringUtils.isNotBlank(taskProperties.getJars())) {
- String[] split = taskProperties.getJars().split(",");
- dependJars.addAll(Sets.newHashSet(split));
- }
- List<URL> jars = getJars(jobId, dependJars);
- if(!dependJars.isEmpty()){
- this.dependJars = jars.toArray(new URL[0]);
- if(!jars.isEmpty()) {
- LOG.info("add jars: " + Arrays.toString(this.dependJars));
- // URLClassLoader loads external lib resources for dynamic loading, so some jobs can be added without restarting the server
- URLClassLoader classLoader = new URLClassLoader(this.dependJars, this.getClass().getClassLoader());
- this.jobClass = ClassUtils.getClass(classLoader, jobClass, false);
- }
- }
- if(this.jobClass == null) {
- this.jobClass = ClassUtils.getClass(jobClass, false);
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- String serverHost = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
- if(StringUtils.isBlank(serverHost)){
- throw new IllegalArgumentException("unknown data3c server. env[DATA3C_HOST] or sysprop[data3c.host] to set.");
- }
- // Address for the RPC callback
- String endPoint = serverHost + ":" + map.getIntValue("rpcPort");
- String[] args = this.parseArgs(jobId, originTaskId, map, endPoint);
- try {
- JSONObject object = new JSONObject();
- object.put("finalStatus", "UNDEFINED");
- object.put("appState", "NEW");
- object.put("source", "data3c");
- object.put("jobType", "SPARK");
- object.put("taskName", map.getString("jobName"));
- object.put("uuid", uuid);
- object.put("__ORIGIN_TASK_ID", originTaskId);
- jobListener.call(jobId, originTaskId, "NEW", object);
- processPublisher.publish(Constance.JOB_PROCESS_TOPIC, object.toJSONString());
- // Put the running task into the queue
- JobQueue.pushNewTask(jobId, originTaskId, uuid, map, jobListener);
- int exitCode = runAsRunTimeProcess(jobId,
- originTaskId,
- map,
- args);
- if(exitCode != 0) {
- // Abnormal execution
- throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
- }
- JSONObject result = new JSONObject();
- result.put("source", "data3c");
- result.put("jobType", "SPARK");
- result.put("exitCode", exitCode);
- object.put("appState", "SUCCEEDED");
- result.put("finalStatus", "FINISHED");
- result.put("taskName", map.getString("jobName"));
- result.put("process", 1);
- result.put("uuid", uuid);
- result.put("__ORIGIN_TASK_ID", originTaskId);
- jobListener.call(jobId, originTaskId, "SUCCEEDED", result);
- processPublisher.publish(Constance.JOB_PROCESS_TOPIC, result.toJSONString());
- LOG.info("{}/{} 运行结束。 STATE: {}", jobId, originTaskId, result.toJSONString());
- } catch (Exception e) {
- JSONObject error = new JSONObject();
- error.put("source", "data3c");
- error.put("jobType", "SPARK");
- error.put("appState", "FAILED");
- error.put("finalStatus", "FAILED");
- error.put("taskName", map.getString("jobName"));
- error.put("message", e.getMessage());
- error.put("uuid", uuid);
- error.put("__ORIGIN_TASK_ID", originTaskId);
- //error.put("error", ExceptionUtils.getFullStackTrace(e));
- jobListener.call(jobId, originTaskId, "FAILED", error);
- processPublisher.publish(Constance.JOB_PROCESS_TOPIC, error.toJSONString());
- JobQueue.removeErrorJob(jobId, originTaskId);
- LOG.error("run job error: ", e);
- }
- }
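- /*
-  * For reference, the status lifecycle published to JOB_PROCESS_TOPIC above is
-  * NEW -> SUCCEEDED | FAILED. Every message carries "uuid" and
-  * "__ORIGIN_TASK_ID" so downstream consumers can correlate updates with the
-  * originating front-end task.
-  */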
- /**
- * Launch the Job by invoking a system command.
- *
- * @param args spark-submit command-line arguments
- * @return 0 on normal completion; non-zero on abnormal termination
- */
- private int runAsRunTimeProcess(final String jobId,
- final String taskId,
- final JSONObject taskConfig,
- String[] args){
- String execute = "spark-submit";
- // If SPARK_HOME is configured, use the spark-submit under it
- if(System.getenv("SPARK_HOME") != null){
- //execute = System.getenv("SPARK_HOME") + "bin" + execute;
- String sparkHome = System.getenv("SPARK_HOME");
- execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
- }
- String cmd = execute + " " + StringUtils.join(args, " ");
- LOG.info("execute command: " + cmd);
- try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)){
- PumpStreamHandler streamHandler = new PumpStreamHandler(out);
- // No timeout
- final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- DefaultExecutor executor = new DefaultExecutor();
- executor.setExitValue(0);
- PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
- executor.setStreamHandler(streamHandler);
- executor.setWatchdog(watchDog);
- executor.setProcessDestroyer(pidProcessDestroyer);
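- // Register the watchdog with the queue so that, presumably, an external cancel
- // request can stop the running spark-submit via ExecuteWatchdog.destroyProcess().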
- JobQueue.addTaskObserver(jobId, taskId, watchDog);
- Map<String, String> environment = new HashMap<>(System.getenv());
- environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
- executor.setWorkingDirectory(new File(out.getTaskDir()));
- // Result handler for the asynchronously executed submit command
- JobExecuteResultHandler handler = new JobExecuteResultHandler(
- warnService);
- handler.setTaskName(taskConfig.getString("jobName"));
- environment.put("TASK_DIR", out.getTaskDir());
- executor.execute(CommandLine.parse(cmd), environment, handler);
- handler.waitFor();
- return handler.getExitValue();
- } catch (Exception e) {
- LOG.error(jobId + "/" + taskId + " 执行任务提交命令失败。");
- throw new IllegalStateException(e.getMessage());
- } finally {
- JobQueue.removeTaskObserver(jobId, taskId);
- }
- }
- /**
- * Parse the Job arguments into a spark-submit command line.
- * @param jobId job ID
- * @param taskId random unique ID
- * @param taskConfig task configuration
- * @param endPoint endpoint (host:port) the RPC callbacks are pushed to
- * @return the spark-submit argument array
- */
- protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig, final String endPoint) {
- String classInJar = null;
- // Identical paths are deduplicated by the set
- Set<String> jarPaths = new HashSet<>();
- try {
- // Add third-party dependency jars
- jarPaths.addAll(commonDependList);
- if(dependJars != null) {
- for (URL jar : dependJars) {
- jarPaths.add(jar.getPath());
- }
- }
- classInJar = ClasspathPackageScanner.findContainingJar(jobClass);
- //jarPaths.add(classInJar);
- jarPaths.remove(classInJar);
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
- List<String> list = new ArrayList<>();
- list.add("--master");
- list.add("'"+ taskConfig.getString("master") + "'");
- // local mode is used for testing and does not need a deployMode argument
- // when master is yarn there are two modes, yarn-client and yarn-cluster
- if(StringUtils.equals(taskConfig.getString("master"), "yarn")) {
- list.add("--deploy-mode");
- list.add(taskConfig.getString("deployMode"));
- }
- // JSON parameters
- JSONObject parameters = (JSONObject) taskConfig.get("parameters");
- if(parameters == null) {
- parameters = new JSONObject();
- }
- list.add("--name");
- String name = getTaskName(taskConfig);
- list.add(name);
- list.add("--class");
- if(IJob.class.isAssignableFrom(jobClass)) {
- // Job integrated with AMC
- list.add(SparkTaskSubmits.class.getName());
- } else {
- // Native job
- //list.add(taskConfig.getString("jobClass"));
- list.add(jobClass.getName());
- }
- if(StringUtils.isNotBlank(taskProperties.getJvmOpts())) {
- list.add(taskProperties.getJvmOpts());
- }
- list.add("--queue");
- list.add(Optional.ofNullable(taskConfig.getString("queue")).orElse("default"));
- list.add("--conf");
- list.add("spark.app.name=" + name);
- Set<String> confArgs = new HashSet<>();
- if(StringUtils.isNotBlank(taskProperties.getSparkParameters())) {
- String[] confSplit = taskProperties.getSparkParameters().split(",");
- confArgs.addAll(Sets.newHashSet(confSplit));
- }
- if(confArgs.size() > 0) {
- for (String confArg : confArgs) {
- list.add("--conf");
- list.add(confArg);
- }
- }
- // Add files
- String addFiles = (String)taskConfig.get("files");
- if(StringUtils.isNotBlank(addFiles)) {
- String[] files = addFiles.split(",");
- Set<String> fileSet = new HashSet<>(files.length);
- if(files.length > 0) {
- for (String file : files) {
- File file1 = new File(file);
- if(file1.exists() && file1.isFile()) {
- fileSet.add(file1.getAbsolutePath());
- LOG.info("add spark file: {}", file1.getAbsolutePath());
- }
- }
- }
- if(fileSet.size() > 0) {
- list.add("--files");
- list.add(StringUtils.join(fileSet, ","));
- }
- }
- // Handles a job jar present under both lib and jobs; the copy under jobs takes precedence
- String jobJarName = new File(classInJar).getName();
- String jobJar = classInJar;
- Set<String> dependJarsPath = new HashSet<>();
- for (String jarPath : jarPaths) {
- File jarFile = new File(jarPath);
- if(jarFile.exists() && jarFile.getName().equals(jobJarName)) {
- // If jobs contains a jar with the same name as one under lib, use the one under jobs
- jobJar = jarPath;
- } else {
- dependJarsPath.add(jarPath);
- }
- }
- list.add("--jars");
- String joinedJars = StringUtils.join(dependJarsPath, ",");
- list.add(joinedJars);
- list.add(jobJar);
- for (String jarPath : jarPaths) {
- LOG.info("add jar {}", jarPath);
- }
- // dataFile location, passed in as a parameter
- for (Map.Entry<String, String> taskParam : this.taskParams.entrySet()) {
- parameters.put(taskParam.getKey(), taskParam.getValue());
- }
- parameters.put("__JOB_ID", jobId);
- parameters.put("__TASK_ID", taskId);
- if(StringUtils.isNotBlank(originTaskId)){
- parameters.put("__ORIGIN_TASK_ID", originTaskId);
- }
- if(StringUtils.isNotBlank(taskProperties.getAppParameters())) {
- list.add(taskProperties.getAppParameters());
- } else {
- // Everything after this point is program arguments (args); argument 0
- list.add(jobClass.getName());
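- // Parameters are Base64-encoded so the JSON survives shell argument parsing;
- // the receiving side (presumably SparkTaskSubmits) must decode with the same
- // line separator settings.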
- Base64 base64 = new Base64(1, ":".getBytes(StandardCharsets.UTF_8));
- // argument 1
- list.add(base64.encodeToString(parameters.toJSONString().getBytes(StandardCharsets.UTF_8)));
- // argument 2
- list.add(endPoint);
- }
- LOG.info("args: " + StringUtils.join(list.iterator(), " "));
- return list.toArray(new String[list.size()]);
- }
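- /*
-  * A command assembled by parseArgs resembles the following (values are
-  * illustrative, not taken from a real run):
-  *
-  *   spark-submit --master 'yarn' --deploy-mode cluster --name my-app \
-  *       --class com.primeton.datainsight.exec.SparkTaskSubmits --queue default \
-  *       --conf spark.app.name=my-app --jars /libs/dep1.jar,/libs/dep2.jar \
-  *       /jobs/my-job.jar com.example.MyJob <base64-json-params> 10.0.0.5:8088
-  */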
- public void setWarnService(WarnService warnService) {
- this.warnService = warnService;
- }
- public void setJobListener(JobListener jobListener) {
- this.jobListener = jobListener;
- }
- public void setProcessPublisher(ProcessPublisher processPublisher) {
- this.processPublisher = processPublisher;
- }
- public void setTaskProperties(TaskProperties taskProperties) {
- this.taskProperties = taskProperties;
- }
- /**
- * Get the task name.
- * @param taskConfig Task Config
- * @return the task name
- */
- protected String getTaskName(final JSONObject taskConfig) {
- String name = taskConfig.getString("jobName");
- if(StringUtils.isBlank(name) || "null".equalsIgnoreCase(name)) {
- name = jobClass.getSimpleName();
- }
- // Append the front-end task name to the YARN name so it is easy to identify on YARN
- String forceTaskName = taskConfig.getString("taskName");
- if(StringUtils.isNotBlank(forceTaskName)){
- name = name + ":" + forceTaskName;
- }
- return name;
- }
- public List<URL> getJars(String jobId, Set<String> dependJars) throws Exception {
- List<URL> jars = new ArrayList<>(dependJars.size());
- for (String dependJar : dependJars) {
- File file = new File(dependJar);
- if (!file.isFile()) {
- // Not found as given; fall back to the spark lib directory
- file = new File(taskProperties.getSparkLibDir(), dependJar);
- }
- if (file.isFile()) {
- jars.add(file.toURI().toURL());
- } else {
- LOG.warn("{} does not exist, skipping file.", dependJar);
- }
- }
- return jars;
- }
- }