SparkNativeLocalTask.java

package com.primeton.datainsight.task;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.primeton.datainsight.configure.TaskProperties;
import com.primeton.datainsight.exec.SparkTaskSubmits;
import com.primeton.datainsight.job.IJob;
import com.primeton.datainsight.service.WarnService;
import com.primeton.datainsight.support.JobListener;
import com.primeton.datainsight.support.ProcessPublisher;
import com.primeton.datainsight.utils.ClasspathPackageScanner;
import com.primeton.datainsight.utils.Constance;
import com.primeton.datainsight.utils.NetworkInterfaceManager;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
/**
 * Runs a Spark job as a local {@code spark-submit} process and reports task
 * state back through {@link JobListener} and {@link ProcessPublisher}.
 *
 * Created by zhaopx on 2018/1/3 18:06.
 */
public class SparkNativeLocalTask extends SparkTask {

    private static final Logger LOG = LoggerFactory.getLogger(SparkNativeLocalTask.class);

    /**
     * Common dependency jars added to every submit.
     */
    private final List<String> commonDependList;

    /**
     * Job id of this task.
     */
    protected final String jobId;

    /**
     * Task id passed in from the front end.
     */
    protected final String originTaskId;

    /**
     * Fully qualified name of the entry class.
     */
    protected final String jobClassStr;

    /**
     * Spark application name.
     */
    protected final String appName;

    /**
     * Parameters handed to the task.
     */
    protected final Map<String, String> taskParams;

    /**
     * Resolved entry class; it may or may not implement {@link IJob}.
     */
    protected Class<?> jobClass = null;

    /**
     * Alerting service.
     */
    WarnService warnService;

    JobListener jobListener;

    ProcessPublisher processPublisher;

    /**
     * Task configuration.
     */
    TaskProperties taskProperties;

    /**
     * Dependency jars resolved to URLs.
     */
    protected URL[] dependJars = null;
    /**
     * Ignores originTaskId and generates one from a UUID instead.
     *
     * @param jobId      job id
     * @param taskParams entry parameters handed to the app
     */
    public SparkNativeLocalTask(String jobId,
                                String jobClass,
                                String appName,
                                Map<String, String> taskParams) {
        this(jobId, jobClass, appName, UUID.randomUUID().toString().replaceAll("-", ""), taskParams);
    }

    /**
     * Accepts a dataFile location, either a local file or an HDFS path.
     *
     * @param originTaskId the task instance id. The id Spark assigns at runtime cannot be
     *                     used in place of the id chosen up front, so this value ties the
     *                     two together.
     * @param taskParams   entry parameters handed to the app
     */
    public SparkNativeLocalTask(String jobId,
                                String jobClass,
                                String appName,
                                String originTaskId,
                                Map<String, String> taskParams) {
        super(StringUtils.isBlank(originTaskId) ? UUID.randomUUID().toString().replaceAll("-", "") : originTaskId);
        this.jobId = jobId;
        this.originTaskId = getTaskId();
        this.jobClassStr = jobClass;
        this.appName = appName;
        this.taskParams = Optional.ofNullable(taskParams).orElse(new HashMap<>());
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        try {
            // findContainingJar returns an absolute path, so no prefix is needed.
            String fastJsonJar = ClasspathPackageScanner.findContainingJar(JSON.class);
            builder.add(fastJsonJar);
        } catch (Exception e) {
            LOG.warn("Failed to locate the fastjson jar on the classpath.", e);
        }
        this.commonDependList = builder.build();
    }
    @Override
    public void doExecute() throws Exception {
        JSONObject params = new JSONObject();
        params.put("id", jobId);
        params.put("originTaskId", originTaskId);
        params.put("jobClass", jobClassStr);
        params.put("rpcPort", 8088);
        params.put("jobName", appName);
        params.put("master", taskProperties.getMaster());
        params.put("deployMode", taskProperties.getDeployMode());
        params.put("queue", taskProperties.getQueue());
        params.put("appName", appName);
        params.put("files", taskProperties.getFiles());
        execute(params);
    }
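    /*
     * Illustrative shape of the JSONObject handed from doExecute() to execute()
     * below (the values here are hypothetical, not from the original source):
     *
     *   {"id": "job-001", "originTaskId": "9f8a...", "jobClass": "com.example.MyJob",
     *    "rpcPort": 8088, "jobName": "my-app", "master": "yarn",
     *    "deployMode": "cluster", "queue": "default", "appName": "my-app",
     *    "files": "/etc/app/app.conf"}
     */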
    private void execute(JSONObject map) throws Exception {
        final String jobId = map.getString("id");
        // Task id from the front end; the running task needs it.
        final String originTaskId = map.getString("originTaskId");
        // The quartz job name is derived from the jobId.
        LOG.info("Start SparkJob {} at: {}", jobId, DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
        final String jobClass = map.getString("jobClass");
        // Task UUID.
        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
        try {
            Set<String> dependJars = new HashSet<>();
            if (StringUtils.isNotBlank(taskProperties.getJars())) {
                String[] split = taskProperties.getJars().split(",");
                dependJars.addAll(Sets.newHashSet(split));
            }
            List<URL> jars = getJars(jobId, dependJars);
            if (!dependJars.isEmpty()) {
                this.dependJars = jars.toArray(new URL[0]);
                if (!jars.isEmpty()) {
                    LOG.info("add jars: " + Arrays.toString(this.dependJars));
                    // Load external lib resources through a URLClassLoader, so jobs added
                    // on the system can be picked up without a restart.
                    URLClassLoader classLoader = new URLClassLoader(this.dependJars, this.getClass().getClassLoader());
                    this.jobClass = ClassUtils.getClass(classLoader, jobClass, false);
                }
            }
            if (this.jobClass == null) {
                this.jobClass = ClassUtils.getClass(jobClass, false);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        String serverHost = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
        if (StringUtils.isBlank(serverHost)) {
            throw new IllegalArgumentException("unknown data3c server. Set env[DATA3C_HOST] or sysprop[data3c.host].");
        }
        // Address the RPC progress callbacks are pushed back to.
        String endPoint = serverHost + ":" + map.getIntValue("rpcPort");
        String[] args = this.parseArgs(jobId, originTaskId, map, endPoint);
        try {
            JSONObject object = new JSONObject();
            object.put("finalStatus", "UNDEFINED");
            object.put("appState", "NEW");
            object.put("source", "data3c");
            object.put("jobType", "SPARK");
            object.put("taskName", map.getString("jobName"));
            object.put("uuid", uuid);
            object.put("__ORIGIN_TASK_ID", originTaskId);
            jobListener.call(jobId, originTaskId, "NEW", object);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, object.toJSONString());
            // Put the now-running task on the queue.
            JobQueue.pushNewTask(jobId, originTaskId, uuid, map, jobListener);
            int exitCode = runAsRunTimeProcess(jobId, originTaskId, map, args);
            if (exitCode != 0) {
                // Abnormal termination.
                throw new IllegalStateException("Spark Task Invalid exitCode: " + exitCode);
            }
            JSONObject result = new JSONObject();
            result.put("source", "data3c");
            result.put("jobType", "SPARK");
            result.put("exitCode", exitCode);
            result.put("appState", "SUCCEEDED");
            result.put("finalStatus", "FINISHED");
            result.put("taskName", map.getString("jobName"));
            result.put("process", 1);
            result.put("uuid", uuid);
            result.put("__ORIGIN_TASK_ID", originTaskId);
            jobListener.call(jobId, originTaskId, "SUCCEEDED", result);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, result.toJSONString());
            LOG.info("{}/{} finished. STATE: {}", jobId, originTaskId, result.toJSONString());
        } catch (Exception e) {
            JSONObject error = new JSONObject();
            error.put("source", "data3c");
            error.put("jobType", "SPARK");
            error.put("appState", "FAILED");
            error.put("finalStatus", "FAILED");
            error.put("taskName", map.getString("jobName"));
            error.put("message", e.getMessage());
            error.put("uuid", uuid);
            error.put("__ORIGIN_TASK_ID", originTaskId);
            jobListener.call(jobId, originTaskId, "FAILED", error);
            processPublisher.publish(Constance.JOB_PROCESS_TOPIC, error.toJSONString());
            JobQueue.removeErrorJob(jobId, originTaskId);
            LOG.error("run job error: ", e);
        }
    }
    /**
     * Starts the job through the system command line.
     *
     * @param args spark-submit arguments
     * @return 0 when the run ends normally; non-zero otherwise
     */
    private int runAsRunTimeProcess(final String jobId,
                                    final String taskId,
                                    final JSONObject taskConfig,
                                    String[] args) {
        String execute = "spark-submit";
        // If SPARK_HOME is configured, use the spark-submit under $SPARK_HOME/bin.
        if (System.getenv("SPARK_HOME") != null) {
            String sparkHome = System.getenv("SPARK_HOME");
            execute = StringUtils.join(Arrays.asList(sparkHome, "bin", execute).iterator(), File.separator);
        }
        String cmd = execute + " " + StringUtils.join(args, " ");
        LOG.info("execute command: " + cmd);
        try (SparkJobOutputStream out = new SparkJobOutputStream(taskProperties.getTaskLogDir(), jobId, taskId)) {
            PumpStreamHandler streamHandler = new PumpStreamHandler(out);
            // No time limit on the child process.
            final ExecuteWatchdog watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
            DefaultExecutor executor = new DefaultExecutor();
            executor.setExitValue(0);
            PidProcessDestroyer pidProcessDestroyer = new PidProcessDestroyer(taskId);
            executor.setStreamHandler(streamHandler);
            executor.setWatchdog(watchDog);
            executor.setProcessDestroyer(pidProcessDestroyer);
            JobQueue.addTaskObserver(jobId, taskId, watchDog);
            Map<String, String> environment = new HashMap<>(System.getenv());
            environment.put("SPARK_SUBMIT_OPTS", taskProperties.getSparkSubmitOpts());
            executor.setWorkingDirectory(new File(out.getTaskDir()));
            // Watches for completion and raises alerts through the WarnService.
            JobExecuteResultHandler handler = new JobExecuteResultHandler(warnService);
            handler.setTaskName(taskConfig.getString("jobName"));
            environment.put("TASK_DIR", out.getTaskDir());
            executor.execute(CommandLine.parse(cmd), environment, handler);
            handler.waitFor();
            return handler.getExitValue();
        } catch (Exception e) {
            LOG.error(jobId + "/" + taskId + " failed to execute the submit command.", e);
            throw new IllegalStateException(e);
        } finally {
            JobQueue.removeTaskObserver(jobId, taskId);
        }
    }
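    /*
     * Example of a command assembled from the arguments built by parseArgs() below
     * (illustrative only; the job class, jar paths, and endpoint are hypothetical):
     *
     *   spark-submit --master 'yarn' --deploy-mode cluster --name MyJob:demo \
     *       --class com.primeton.datainsight.exec.SparkTaskSubmits --queue default \
     *       --conf spark.app.name=MyJob:demo --jars /app/lib/fastjson.jar \
     *       /app/jobs/my-job.jar com.example.MyJob <base64-encoded parameters> 10.0.0.5:8088
     */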
    /**
     * Builds the spark-submit argument list for the job.
     *
     * @param jobId      job id
     * @param taskId     random unique id
     * @param taskConfig task configuration
     * @param endPoint   host:port that RPC progress is pushed back to
     * @return the spark-submit arguments
     */
    protected String[] parseArgs(final String jobId, final String taskId, final JSONObject taskConfig, final String endPoint) {
        String classInJar = null;
        // Identical paths collapse, so a set holds each jar only once.
        Set<String> jarPaths = new HashSet<>();
        try {
            // Add the third-party dependency jars.
            jarPaths.addAll(commonDependList);
            if (dependJars != null) {
                for (URL jar : dependJars) {
                    jarPaths.add(jar.getPath());
                }
            }
            classInJar = ClasspathPackageScanner.findContainingJar(jobClass);
            jarPaths.remove(classInJar);
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
        List<String> list = new ArrayList<>();
        list.add("--master");
        list.add("'" + taskConfig.getString("master") + "'");
        // local mode is for testing and takes no deploy-mode argument;
        // with yarn there are two modes, yarn-client and yarn-cluster.
        if (StringUtils.equals(taskConfig.getString("master"), "yarn")) {
            list.add("--deploy-mode");
            list.add(taskConfig.getString("deployMode"));
        }
        // JSON parameters.
        JSONObject parameters = (JSONObject) taskConfig.get("parameters");
        if (parameters == null) {
            parameters = new JSONObject();
        }
        list.add("--name");
        String name = getTaskName(taskConfig);
        list.add(name);
        list.add("--class");
        if (IJob.class.isAssignableFrom(jobClass)) {
            // An AMC-integrated job: launch it through the generic submit wrapper.
            list.add(SparkTaskSubmits.class.getName());
        } else {
            // A native job: its own class is the entry point.
            list.add(jobClass.getName());
        }
        if (StringUtils.isNotBlank(taskProperties.getJvmOpts())) {
            list.add(taskProperties.getJvmOpts());
        }
        list.add("--queue");
        list.add(Optional.ofNullable(taskConfig.getString("queue")).orElse("default"));
        list.add("--conf");
        list.add("spark.app.name=" + name);
        Set<String> confArgs = new HashSet<>();
        if (StringUtils.isNotBlank(taskProperties.getSparkParameters())) {
            String[] confSplit = taskProperties.getSparkParameters().split(",");
            confArgs.addAll(Sets.newHashSet(confSplit));
        }
        for (String confArg : confArgs) {
            list.add("--conf");
            list.add(confArg);
        }
        // Attach files.
        String addFiles = (String) taskConfig.get("files");
        if (StringUtils.isNotBlank(addFiles)) {
            String[] files = addFiles.split(",");
            Set<String> fileSet = new HashSet<>(files.length);
            for (String file : files) {
                File file1 = new File(file);
                if (file1.exists() && file1.isFile()) {
                    fileSet.add(file1.getAbsolutePath());
                    LOG.info("add spark file: {}", file1.getAbsolutePath());
                }
            }
            if (!fileSet.isEmpty()) {
                list.add("--files");
                list.add(StringUtils.join(fileSet, ","));
            }
        }
        // A job jar can sit both under lib and under jobs; prefer the one under jobs.
        String jobJarName = new File(classInJar).getName();
        String jobJar = classInJar;
        Set<String> dependJarsPath = new HashSet<>();
        for (String jarPath : jarPaths) {
            File jarFile = new File(jarPath);
            if (jarFile.exists() && jarFile.getName().equals(jobJarName)) {
                // Same jar name under jobs as under lib: use the jobs copy.
                jobJar = jarPath;
            } else {
                dependJarsPath.add(jarPath);
            }
        }
        list.add("--jars");
        String dependJars = StringUtils.join(dependJarsPath, ",");
        list.add(dependJars);
        list.add(jobJar);
        for (String jarPath : jarPaths) {
            LOG.info("add jar {}", jarPath);
        }
        // The dataFile location travels to the app as a parameter.
        for (Map.Entry<String, String> taskParam : this.taskParams.entrySet()) {
            parameters.put(taskParam.getKey(), taskParam.getValue());
        }
        parameters.put("__JOB_ID", jobId);
        parameters.put("__TASK_ID", taskId);
        if (StringUtils.isNotBlank(originTaskId)) {
            parameters.put("__ORIGIN_TASK_ID", originTaskId);
        }
        if (StringUtils.isNotBlank(taskProperties.getAppParameters())) {
            list.add(taskProperties.getAppParameters());
        } else {
            // Everything from here on is a program argument. args[0]: the job class.
            list.add(jobClass.getName());
            // args[1]: the parameters JSON, Base64-encoded (commons-codec rounds the
            // line length of 1 down to 0, so the ':' separator is effectively unused).
            Base64 base64 = new Base64(1, ":".getBytes());
            list.add(base64.encodeToString(parameters.toJSONString().getBytes(StandardCharsets.UTF_8)));
            // args[2]: the RPC callback endpoint.
            list.add(endPoint);
        }
        LOG.info("args: " + StringUtils.join(list.iterator(), " "));
        return list.toArray(new String[0]);
    }
    public void setWarnService(WarnService warnService) {
        this.warnService = warnService;
    }

    public void setJobListener(JobListener jobListener) {
        this.jobListener = jobListener;
    }

    public void setProcessPublisher(ProcessPublisher processPublisher) {
        this.processPublisher = processPublisher;
    }

    public void setTaskProperties(TaskProperties taskProperties) {
        this.taskProperties = taskProperties;
    }
    /**
     * Resolves the task name.
     *
     * @param taskConfig task config
     * @return the task name
     */
    protected String getTaskName(final JSONObject taskConfig) {
        String name = taskConfig.getString("jobName");
        if (StringUtils.isBlank(name) || "null".equalsIgnoreCase(name)) {
            name = jobClass.getSimpleName();
        }
        // Append the front-end task name to the YARN name so the task is easy to spot on YARN.
        String forceTaskName = taskConfig.getString("taskName");
        if (StringUtils.isNotBlank(forceTaskName)) {
            name = name + ":" + forceTaskName;
        }
        return name;
    }
    /**
     * Resolves dependency jar names to URLs: first as given, then relative to the
     * Spark lib directory. Jars that cannot be found are skipped with a warning.
     */
    public List<URL> getJars(String jobId, Set<String> dependJars) throws Exception {
        List<URL> jars = new ArrayList<>(dependJars.size());
        for (String dependJar : dependJars) {
            File file = new File(dependJar);
            if (file.exists() && file.isFile()) {
                jars.add(file.toURI().toURL());
                continue;
            }
            // Not found as a single file; fall back to the Spark lib directory.
            String jobJarDir = taskProperties.getSparkLibDir();
            file = new File(jobJarDir, dependJar);
            if (file.exists() && file.isFile()) {
                jars.add(file.toURI().toURL());
                continue;
            }
            LOG.warn("{} does not exist, skipping.", dependJar);
        }
        return jars;
    }
}
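/*
 * Usage sketch (not part of the original source). It assumes the collaborators
 * are wired externally, e.g. by Spring, and that TaskProperties points at a
 * working Spark installation; names like "com.example.MyJob" are hypothetical.
 *
 *   SparkNativeLocalTask task = new SparkNativeLocalTask(
 *           "job-001", "com.example.MyJob", "my-app",
 *           java.util.Collections.singletonMap("inputPath", "/data/in"));
 *   task.setTaskProperties(taskProperties);
 *   task.setJobListener(jobListener);
 *   task.setProcessPublisher(processPublisher);
 *   task.setWarnService(warnService);
 *   task.doExecute();  // builds the spark-submit command and blocks until it exits
 */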