package com.primeton.datainsight.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.primeton.datainsight.configure.TaskProperties;
import com.primeton.datainsight.exec.SparkTaskSubmits;
import com.primeton.datainsight.job.IJob;
import com.primeton.datainsight.service.WarnService;
import com.primeton.datainsight.support.JobListener;
import com.primeton.datainsight.support.ProcessPublisher;
import com.primeton.datainsight.utils.ClasspathPackageScanner;
import com.primeton.datainsight.utils.Constance;
import com.primeton.datainsight.utils.NetworkInterfaceManager;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
/**
* Submits a Spark job as a local {@code spark-submit} process and reports its progress
* through the configured {@link JobListener} and {@link ProcessPublisher}.
*
* Created by zhaopx on 2018/1/3 18:06
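*
* A minimal usage sketch (illustrative only; assumes a configured TaskProperties, JobListener,
* ProcessPublisher and WarnService exist elsewhere, and "com.example.MyJob" is a placeholder class):
* <pre>{@code
* SparkNativeLocalTask task = new SparkNativeLocalTask(
*         "job-1", "com.example.MyJob", "my-app", new HashMap<>());
* task.setTaskProperties(taskProperties);
* task.setJobListener(jobListener);
* task.setProcessPublisher(processPublisher);
* task.setWarnService(warnService);
* task.doExecute();
* }</pre>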
*/
public class SparkNativeLocalTask extends SparkTask {
private static final Logger LOG = LoggerFactory.getLogger(SparkNativeLocalTask.class);
/**
* List of common dependency jars added to every submission.
*/
private final List<String> commonDependList;
/**
* The job id of the task.
*/
protected final String jobId;
/**
* Task id passed in from the front end.
*/
protected final String originTaskId;
/**
* Entry class of the job.
*/
protected final String jobClassStr;
/**
* Spark App Name
*/
protected final String appName;
/**
* Parameters passed to the task.
*/
protected final Map<String, Object> taskParams;
/**
* The class implementing the job interface.
*/
protected Class<? extends IJob> jobClass = null;
/**
* Alerting service.
*/
WarnService warnService;
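/**
* Listener notified of job state changes.
*/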
JobListener jobListener;
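/**
* Publisher for job progress messages.
*/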
ProcessPublisher processPublisher;
/**
* Task configuration properties.
*/
TaskProperties taskProperties;
/**
* dependency jar
*/
protected java.net.URL[] dependJars = null;
/**
* Ignores the originTaskId and generates one from a random UUID instead.
* @param jobId job id
* @param jobClass fully qualified name of the job class
* @param appName Spark application name
* @param taskParams entry parameters passed to the application
*/
public SparkNativeLocalTask(String jobId,
String jobClass,
String appName,
Map<String, Object> taskParams) {
this(jobId, jobClass, appName, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
}
/**
* Accepts a dataFile location, either a local file or an HDFS path.
* @param originTaskId the taskInstanceId from the caller. The id Spark actually runs under cannot
*                     be used directly, so this id only serves to correlate the two.
* @param taskParams entry parameters passed to the application
*/
public SparkNativeLocalTask(String jobId,
String jobClass,
String appName,
String originTaskId,
Map<String, Object> taskParams) {
super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
this.jobId = jobId;
this.originTaskId = getTaskId();
this.jobClassStr = jobClass;
this.appName = appName;
this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
ImmutableList.Builder<String> builder = ImmutableList.builder();
//String appLibHome = ServerConfig.getAppHome() + "/lib/";
try {
// Already an absolute path, no prefix needed
String fastJsonJar = ClasspathPackageScanner.findContainingJar(JSON.class);
builder.add(fastJsonJar);
} catch (Exception e) {
LOG.warn("find data3c commom jar, server jar, fastjson jar error.", e);
}
this.commonDependList = builder.build();
}
@Override
public void doExecute() throws Exception {
JSONObject params = new JSONObject();
params.put("id", jobId);
params.put("originTaskId", originTaskId);
params.put("jobClass", jobClassStr);
params.put("rpcPort", 8088);
params.put("jobName", appName);
params.put("master", taskProperties.getMaster());
params.put("deployMode", taskProperties.getDeployMode());
params.put("queue", taskProperties.getQueue());
params.put("appName", appName);
params.put("files", taskProperties.getFiles());
execute(params);
}
private void execute(Map<String, Object> params) throws Exception {
JSONObject map = (JSONObject)params;
final String jobId = map.getString("id");
// Task id from the front end, needed inside the task
final String originTaskId = map.getString("originTaskId");
// Quartz job name, related to the jobId
LOG.info("Start SparkJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
final String jobClass = map.getString("jobClass");
// TASK UUID
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
try {
Set<String> dependJars = new HashSet<>();
if(StringUtils.isNotBlank(taskProperties.getJars())) {
String[] split = taskProperties.getJars().split(",");
dependJars.addAll(Sets.newHashSet(split));
}
List<java.net.URL> jars = getJars(jobId, dependJars);
if(!dependJars.isEmpty()){
this.dependJars = jars.toArray(new java.net.URL[jars.size()]);
if(!jars.isEmpty()) {
LOG.info("add jars: " + Arrays.toString(this.dependJars));
// URLClassLoader that loads external lib resources dynamically, so jobs added on the system take effect without a restart
java.net.URLClassLoader classLoader = new java.net.URLClassLoader(this.dependJars, this.getClass().getClassLoader());
this.jobClass = (Class<? extends IJob>) ClassUtils.getClass(classLoader, jobClass, false);
}
}
if(this.jobClass == null) {
this.jobClass = (Class<? extends IJob>) ClassUtils.getClass(jobClass, false);
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
String serverHost = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
if(StringUtils.isBlank(serverHost)){
throw new IllegalArgumentException("unknown data3c server. env[DATA3C_HOST] or sysprop[data3c.host] to set.");
}
// Address the job uses for RPC callbacks to the server
String endPoint = serverHost + ":" + map.getIntValue("rpcPort");
String[] args = this.parseArgs(jobId, originTaskId, map, endPoint);
try {
JSONObject object = new JSONObject();
object.put("finalStatus", "UNDEFINED");
object.put("appState", "NEW");
object.put("source", "data3c");
object.put("jobType", "SPARK");
object.put("taskName", map.getString("jobName"));
object.put("uuid", uuid);
object.put("__ORIGIN_TASK_ID", originTaskId);
jobListener.call(jobId, originTaskId, "NEW", object);
processPublisher.publish(Constance.JOB_PROCESS_TOPIC, object.toJSONString());
// Put the running task into the queue
JobQueue.pushNewTask(jobId, originTaskId, uuid, map, jobListener);
int exitCode = runAsRunTimeProcess(jobId,
originTaskId,
map,
args);
if(exitCode != 0) {
// Abnormal execution
throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
}
JSONObject result = new JSONObject();
result.put("source", "data3c");
result.put("jobType", "SPARK");
result.put("exitCode", exitCode);
object.put("appState", "SUCCEEDED");
result.put("finalStatus", "FINISHED");
result.put("taskName", map.getString("jobName"));
result.put("process", 1);
result.put("uuid", uuid);
result.put("__ORIGIN_TASK_ID", originTaskId);
jobListener.call(jobId, originTaskId, "SUCCEEDED", result);
processPublisher.publish(Constance.JOB_PROCESS_TOPIC, result.toJSONString());
LOG.info("{}/{} 运行结束。 STATE: {}", jobId, originTaskId, result.toJSONString());
} catch (Exception e) {
JSONObject error = new JSONObject();
error.put("source", "data3c");
error.put("jobType", "SPARK");
error.put("appState", "FAILED");
error.put("finalStatus", "FAILED");
error.put("taskName", map.getString("jobName"));
error.put("message", e.getMessage());
error.put("uuid", uuid);
error.put("__ORIGIN_TASK_ID", originTaskId);
//error.put("error", ExceptionUtils.getFullStackTrace(e));
jobListener.call(jobId, originTaskId, "FAILED", error);
processPublisher.publish(Constance.JOB_PROCESS_TOPIC, error.toJSONString());
JobQueue.removeErrorJob(jobId, originTaskId);
LOG.error("run job error: ", e);
}
}
/**
* Starts the job by invoking the spark-submit system command.
*
* @param jobId job id
* @param taskId task id
* @param taskConfig task configuration
* @param args spark-submit arguments
* @return 0 when the process finishes normally; a non-zero exit code otherwise
*/
private int runAsRunTimeProcess(final String jobId,
final String taskId,
final JSONObject taskConfig,
String[] args){
String execute = "spark-submit";
// If SPARK_HOME is configured, use the spark-submit under it
if(System.getenv("SPARK_HOME") != null){
//execute = System.getenv("SPARK_HOME") + "bin" + execute;
String sparkHome = System.getenv("SPARK_HOME");
execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
}
String cmd = execute + " " + StringUtils.join(args, " ");
LOG.info("execute command: " + cmd);
try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)){
PumpStreamHandler streamHandler = new PumpStreamHandler(out);
// No timeout
final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
DefaultExecutor executor = new DefaultExecutor();
executor.setExitValue(0);
PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
executor.setStreamHandler(streamHandler);
executor.setWatchdog(watchDog);
executor.setProcessDestroyer(pidProcessDestroyer);
JobQueue.addTaskObserver(jobId, taskId, watchDog);
Map<String, String> environment = new HashMap<>(System.getenv());
environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
executor.setWorkingDirectory(new File(out.getTaskDir()));
// Result handler that reports completion and raises alerts via the WarnService
JobExecuteResultHandler handler = new JobExecuteResultHandler(
warnService);
handler.setTaskName(taskConfig.getString("jobName"));
environment.put("TASK_DIR", out.getTaskDir());
executor.execute(CommandLine.parse(cmd), environment, handler);
handler.waitFor();
return handler.getExitValue();
} catch (Exception e) {
LOG.error(jobId + "/" + taskId + " failed to execute the job submit command.", e);
throw new IllegalStateException(e.getMessage(), e);
} finally {
JobQueue.removeTaskObserver(jobId, taskId);
}
}
/**
* Builds the spark-submit argument list for the job.
* @param jobId job id
* @param taskId random unique task id
* @param taskConfig task configuration
* @param endPoint host:port that the job pushes RPC progress back to
* @return the spark-submit arguments
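*
* The resulting argument list roughly takes the following shape (illustrative only; the exact
* arguments depend on TaskProperties and the task configuration, and "com.example.MyJob" is a
* placeholder job class):
* <pre>
* --master 'yarn' --deploy-mode cluster --name MyJob
* --class com.primeton.datainsight.exec.SparkTaskSubmits --queue default
* --conf spark.app.name=MyJob --jars dep1.jar,dep2.jar job.jar
* com.example.MyJob BASE64_JSON_PARAMS host:rpcPort
* </pre>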
*/
protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig, final String endPoint) {
String classInJar = null;
// Identical paths collapse into a single entry
Set<String> jarPaths = new HashSet<>();
try {
// Add third-party dependency jars
jarPaths.addAll(commonDependList);
if(dependJars != null) {
for (java.net.URL jar : dependJars) {
jarPaths.add(jar.getPath());
}
}
classInJar = ClasspathPackageScanner.findContainingJar(jobClass);
//jarPaths.add(classInJar);
jarPaths.remove(classInJar);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
List<String> list = new ArrayList<>();
list.add("--master");
list.add("'"+ taskConfig.getString("master") + "'");
// In local mode (used for testing) the deployMode argument is not submitted
// For yarn there are two modes: yarn-client and yarn-cluster
if(StringUtils.equals(taskConfig.getString("master"), "yarn")) {
list.add("--deploy-mode");
list.add(taskConfig.getString("deployMode"));
}
// JSON parameters
JSONObject parameters = (JSONObject) taskConfig.get("parameters");
if(parameters == null) {
parameters = new JSONObject();
}
list.add("--name");
String name = getTaskName(taskConfig);
list.add(name);
list.add("--class");
boolean amcImplJob = true;
if(IJob.class.isAssignableFrom(jobClass)) {
list.add(SparkTaskSubmits.class.getName());
// Job integrated with AMC
amcImplJob = true;
} else {
//list.add(taskConfig.getString("jobClass"));
list.add(jobClass.getName());
// Native Spark job
amcImplJob = false;
}
if(StringUtils.isNotBlank(taskProperties.getJvmOpts())) {
list.add(taskProperties.getJvmOpts());
}
list.add("--queue");
list.add(Optional.ofNullable(taskConfig.getString("queue")).orElse("default"));
list.add("--conf");
list.add("spark.app.name=" + name);
Set<String> confArgs = new HashSet<>();
if(StringUtils.isNotBlank(taskProperties.getSparkParameters())) {
String[] confSplit = taskProperties.getSparkParameters().split(",");
confArgs.addAll(Sets.newHashSet(confSplit));
}
if(confArgs.size() > 0) {
for (String confArg : confArgs) {
list.add("--conf");
list.add(confArg);
}
}
// Add files
String addFiles = (String)taskConfig.get("files");
if(StringUtils.isNotBlank(addFiles)) {
String[] files = addFiles.split(",");
Set<String> fileSet = new HashSet<>(files.length);
if(files.length > 0) {
for (String file : files) {
File file1 = new File(file);
if(file1.exists() && file1.isFile()) {
fileSet.add(file1.getAbsolutePath());
LOG.info("add spark file: {}", file1.getAbsolutePath());
}
}
}
if(fileSet.size() > 0) {
list.add("--files");
list.add(StringUtils.join(fileSet, ","));
}
}
// Handles job jars present both under lib and under jobs; the one under jobs takes precedence
String jobJarName = new File(classInJar).getName();
String jobJar = classInJar;
Set<String> dependJarsPath = new HashSet<>();
for (String jarPath : jarPaths) {
File jarFile = new File(jarPath);
if(jarFile.exists() && jarFile.getName().equals(jobJarName)) {
// If jobs contains a jar with the same name as one under lib, use the one under jobs.
jobJar = jarPath;
} else {
dependJarsPath.add(jarPath);
}
}
list.add("--jars");
String dependJars = StringUtils.join(dependJarsPath, ",");
list.add(dependJars);
list.add(jobJar);
for (String jarPath : jarPaths) {
LOG.info("add jar {}", jarPath);
}
// The dataFile location is passed in as a parameter
for (Map.Entry<String, Object> taskParam : this.taskParams.entrySet()) {
parameters.put(taskParam.getKey(), taskParam.getValue());
}
parameters.put("__JOB_ID", jobId);
parameters.put("__TASK_ID", taskId);
if(StringUtils.isNotBlank(originTaskId)){
parameters.put("__ORIGIN_TASK_ID", originTaskId);
}
if(StringUtils.isNotBlank(taskProperties.getAppParameters())) {
list.add(taskProperties.getAppParameters());
} else {
// Everything from here on is the program's own arguments; argument 0 is the job class name
list.add(jobClass.getName());
org.apache.commons.codec.binary.Base64 base64 = new org.apache.commons.codec.binary.Base64(1, ":".getBytes());
// Argument 1: Base64-encoded JSON parameters
list.add(base64.encodeToString(parameters.toJSONString().getBytes(StandardCharsets.UTF_8)));
// Argument 2: the RPC callback endpoint
list.add(endPoint);
}
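// A hedged sketch of how the launched process could decode argument 1 (it mirrors the encoding
// above; the real decoding lives in SparkTaskSubmits / the job itself and may differ):
//   byte[] raw = org.apache.commons.codec.binary.Base64.decodeBase64(args[1]);
//   JSONObject jobParams = JSON.parseObject(new String(raw, StandardCharsets.UTF_8));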
LOG.info("args: " + StringUtils.join(list.iterator(), " "));
return list.toArray(new String[list.size()]);
}
public void setWarnService(WarnService warnService) {
this.warnService = warnService;
}
public void setJobListener(JobListener jobListener) {
this.jobListener = jobListener;
}
public void setProcessPublisher(ProcessPublisher processPublisher) {
this.processPublisher = processPublisher;
}
public void setTaskProperties(TaskProperties taskProperties) {
this.taskProperties = taskProperties;
}
/**
* Gets the task name.
* @param taskConfig task configuration
* @return the task name
*/
protected String getTaskName(final JSONObject taskConfig) {
String name = taskConfig.getString("jobName");
if(StringUtils.isBlank(name) || "null".equalsIgnoreCase(name)) {
name = jobClass.getSimpleName();
}
// Append the front-end task name to the YARN application name so it is easier to identify on YARN
String forceTaskName = taskConfig.getString("taskName");
if(StringUtils.isNotBlank(forceTaskName)){
name = name + ":" + forceTaskName;
}
return name;
}
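/**
* Resolves dependency jar names to URLs: each entry is tried first as a standalone path and then,
* if not found, relative to the Spark lib dir from {@link TaskProperties}; missing entries are
* logged and skipped.
* @param jobId job id (not used in the lookup itself)
* @param dependJars jar paths or file names to resolve
* @return the resolved jar URLs
* @throws Exception if a file path cannot be converted to a URL
*/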
public List<java.net.URL> getJars(String jobId, Set<String> dependJars) throws Exception {
List<java.net.URL> jars = new ArrayList<>(dependJars.size());
for (String dependJar : dependJars) {
File file = new File(dependJar);
if (file.exists() && file.isFile()) {
jars.add(file.toURI().toURL());
continue;
} else {
String jobJarDir = taskProperties.getSparkLibDir();
// Not found as a standalone file, look it up under the Spark lib dir
file = new File(jobJarDir, dependJar);
if (file.exists() && file.isFile()) {
jars.add(file.toURI().toURL());
continue;
}
}
LOG.warn("{} not exists, skip file.", dependJar);
}
return jars;
}
}