JobQueue.java 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package com.primeton.datainsight.task;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.google.common.collect.ImmutableSet;
  4. import com.primeton.datainsight.support.JobListener;
  5. import org.apache.commons.exec.ExecuteWatchdog;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.util.Map;
  9. import java.util.Set;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. /**
  12. * <p>
  13. * commons of this class
  14. * </p>
  15. *
  16. * Created by zhaopx on 2018/1/24 0024-14:56
  17. * Vendor: primeton.com
  18. */
  19. public class JobQueue {
  20. /**
  21. * jobId -> Job Detail
  22. */
  23. protected final static Map<String, JobEntity> taskId2Job = new ConcurrentHashMap();
  24. /**
  25. *
  26. * Yarn 上运行的 Application ID 和状态的缓存。appid -> {jobId, state}
  27. *
  28. */
  29. protected final static Map<String, AppEntity> appId2Task = new ConcurrentHashMap();
  30. /**
  31. *
  32. * Yarn 上运行的 Application ID 和状态的缓存。appid -> {jobId, state}
  33. *
  34. */
  35. protected final static Map<String, ExecuteWatchdog> TASKID_TO_TASKREF_MAP = new ConcurrentHashMap();
  36. protected static Logger LOG = LoggerFactory.getLogger(JobQueue.class);
  37. /**
  38. * 添加一个Task的 Observer 监听者
  39. * @param jobId Job ID
  40. * @param taskId TaskID
  41. * @param watchdog 监听者
  42. */
  43. public static void addTaskObserver(String jobId, String taskId, ExecuteWatchdog watchdog) {
  44. TASKID_TO_TASKREF_MAP.put(taskId, watchdog);
  45. }
  46. /**
  47. * 添加一个Task的 Observer 监听者
  48. * @param jobId Job ID
  49. * @param taskId TaskID
  50. */
  51. public static void removeTaskObserver(String jobId, String taskId) {
  52. TASKID_TO_TASKREF_MAP.remove(taskId);
  53. }
  54. /**
  55. * 添加一个Task的 Observer 监听者
  56. * @param jobId Job ID
  57. * @param taskId TaskID
  58. */
  59. public static ExecuteWatchdog getTaskObserver(String jobId, String taskId) {
  60. return TASKID_TO_TASKREF_MAP.get(taskId);
  61. }
  62. /**
  63. * 返回尚未运行的 Application
  64. * @return
  65. */
  66. public static Set<String> getBeforeRunning(){
  67. ImmutableSet.Builder<String> builder = ImmutableSet.builder();
  68. for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
  69. if(entry.getValue().getState() == null) {
  70. continue;
  71. }
  72. // Yarn 任务在运行前的几个状态
  73. if("NEW|NEW_SAVING|SUBMITTED|ACCEPTED".contains(entry.getValue().getState())) {
  74. builder.add(entry.getKey());
  75. }
  76. }
  77. return builder.build();
  78. }
  79. /**
  80. * 返回正在运行的 Application
  81. * @return
  82. */
  83. public static Set<String> getRunningJob(){
  84. ImmutableSet.Builder<String> builder = ImmutableSet.builder();
  85. for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
  86. if("RUNNING".equals(entry.getValue().getState())) {
  87. builder.add(entry.getKey());
  88. }
  89. }
  90. return builder.build();
  91. }
  92. /**
  93. * 返回正在运行的 Quartz 任务
  94. * @return
  95. */
  96. public static Set<JobEntity> getRunningJob0(){
  97. ImmutableSet.Builder<JobEntity> builder = ImmutableSet.builder();
  98. for (Map.Entry<String, JobEntity> entry : taskId2Job.entrySet()) {
  99. builder.add(entry.getValue());
  100. }
  101. return builder.build();
  102. }
  103. /**
  104. * 返回运行结束的 Application
  105. * @return
  106. */
  107. public static Set<String> getFinishedJob() {
  108. ImmutableSet.Builder<String> builder = ImmutableSet.builder();
  109. for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
  110. if(entry.getValue().getState() == null) {
  111. continue;
  112. }
  113. if("FINISHED|FAILED|KILLED".contains(entry.getValue().getState())) {
  114. builder.add(entry.getKey());
  115. }
  116. }
  117. return builder.build();
  118. }
  119. /**
  120. * Spark Job 启动后推送到队列
  121. * @param jobId 任务ID
  122. * @param taskId taskID
  123. * @param map 任务参数
  124. * @param jobListener 任务回调
  125. */
  126. public static void pushNewTask(String jobId, String taskId, String uuid, JSONObject map, JobListener jobListener) {
  127. JobEntity jobEntity = new JobEntity(jobId, taskId, uuid, map);
  128. jobEntity.setJobListener(jobListener);
  129. taskId2Job.put(taskId, jobEntity);
  130. }
  131. /**
  132. * Yarn 反向推送到该接口,日志解析端推送
  133. * @param appId AppID
  134. * @param jobId JobID
  135. * @param taskId TASK
  136. * @param state 状态
  137. */
  138. public static void runningTask(String appId, String jobId, String taskId, String state) {
  139. AppEntity appEntity = appId2Task.get(appId);
  140. if(appEntity == null){
  141. appEntity = new AppEntity(appId);
  142. }
  143. appEntity.setJobId(jobId);
  144. appEntity.setTaskId(taskId);
  145. appEntity.setState(state);
  146. appId2Task.put(appId, appEntity);
  147. // 运行前, 回调
  148. JobEntity entity = taskId2Job.get(taskId);
  149. if(entity != null && entity.getJobListener() != null){
  150. JSONObject json = new JSONObject();
  151. json.put("source", "console");
  152. json.put("appId", appId);
  153. json.put("uuid", entity.getUuid());
  154. entity.getJobListener().call(jobId, taskId, state, json);
  155. }
  156. // Job 运行结束,一定会发送一次或多次,不用特别处理,运行前可能队列阻塞,启动迟缓,跟超时不同,需要回调
  157. }
  158. /**
  159. * 移除 Job,移除后不再做监控
  160. * @param appId Yarn AppId
  161. */
  162. public static void removeMonitorJob(String appId) {
  163. AppEntity appEntity = appId2Task.get(appId);
  164. if(appEntity == null){
  165. return;
  166. }
  167. JobEntity jobEntity = taskId2Job.get(appEntity.getTaskId());
  168. if(jobEntity == null){
  169. LOG.warn("{} 已完成,移除该任务。", appEntity.getTaskId());
  170. appId2Task.remove(appId);
  171. return;
  172. }
  173. // 移除Job
  174. appId2Task.remove(appId);
  175. taskId2Job.remove(appEntity.getTaskId());
  176. LOG.warn("{}/{} 已完成,移除该任务。", appEntity.getJobId(), appId);
  177. }
  178. /**
  179. * 删除任务
  180. * @param jobId 任务JobID
  181. * @param taskId 任务TaskID
  182. */
  183. public static void removeMonitorTask(String jobId, String taskId) {
  184. taskId2Job.remove(taskId);
  185. LOG.warn("{}/{} 已完成,移除该任务。", jobId, taskId);
  186. }
  187. /**
  188. * 移除任务
  189. * @param jobId
  190. * @param taskId
  191. */
  192. public static void removeErrorJob(String jobId, String taskId){
  193. taskId2Job.remove(taskId);
  194. LOG.warn("{}/{} 运行失败,从动态列表中移除。", jobId, taskId);
  195. }
  196. /**
  197. * 获取 App Job 信息
  198. * @param appId
  199. * @return
  200. */
  201. public static AppEntity getApp(String appId) {
  202. return appId2Task.get(appId);
  203. }
  204. public static class AppEntity {
  205. final String appId;
  206. String jobId;
  207. String taskId;
  208. String state;
  209. public AppEntity(String appId) {
  210. this.appId = appId;
  211. }
  212. public String getAppId() {
  213. return appId;
  214. }
  215. public String getJobId() {
  216. return jobId;
  217. }
  218. public void setJobId(String jobId) {
  219. this.jobId = jobId;
  220. }
  221. public String getTaskId() {
  222. return taskId;
  223. }
  224. public void setTaskId(String taskId) {
  225. this.taskId = taskId;
  226. }
  227. public String getState() {
  228. return state;
  229. }
  230. public void setState(String state) {
  231. this.state = state;
  232. }
  233. @Override
  234. public boolean equals(Object o) {
  235. if (this == o) return true;
  236. if (o == null || getClass() != o.getClass()) return false;
  237. AppEntity appEntity = (AppEntity) o;
  238. return appId != null ? appId.equals(appEntity.appId) : appEntity.appId == null;
  239. }
  240. @Override
  241. public int hashCode() {
  242. return appId != null ? appId.hashCode() : 0;
  243. }
  244. }
  245. public static class JobEntity {
  246. /**
  247. * Job ID
  248. */
  249. String jobId;
  250. /**
  251. * Task ID
  252. */
  253. String taskId;
  254. /**
  255. * UUID
  256. */
  257. String uuid;
  258. /**
  259. * Job 配置
  260. */
  261. JSONObject map;
  262. /**
  263. * 任务消息结果回调
  264. */
  265. JobListener jobListener;
  266. /**
  267. * 构造方法
  268. * @param jobId Job ID
  269. * @param uuid Task ID
  270. * @param map Job 配置
  271. */
  272. public JobEntity(String jobId, String taskId, String uuid, JSONObject map) {
  273. this.jobId = jobId;
  274. this.taskId = taskId;
  275. this.uuid = uuid;
  276. this.map = map;
  277. }
  278. public String getJobId() {
  279. return jobId;
  280. }
  281. public String getTaskId() {
  282. return taskId;
  283. }
  284. public String getUuid() {
  285. return uuid;
  286. }
  287. public JSONObject getMap() {
  288. return map;
  289. }
  290. public JobListener getJobListener() {
  291. return jobListener;
  292. }
  293. public void setJobListener(JobListener jobListener) {
  294. this.jobListener = jobListener;
  295. }
  296. @Override
  297. public boolean equals(Object o) {
  298. if (this == o) return true;
  299. if (o == null || getClass() != o.getClass()) return false;
  300. JobEntity jobEntity = (JobEntity) o;
  301. return jobId != null ? jobId.equals(jobEntity.jobId) : jobEntity.jobId == null;
  302. }
  303. @Override
  304. public int hashCode() {
  305. return jobId != null ? jobId.hashCode() : 0;
  306. }
  307. }
  308. }