package com.primeton.datainsight.task;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableSet;
import com.primeton.datainsight.support.JobListener;
import org.apache.commons.exec.ExecuteWatchdog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Static, in-memory registry that tracks submitted jobs, the Yarn applications
 * reported for them, and the {@link ExecuteWatchdog} attached to each task.
 *
 * <p>All state is held in static {@link ConcurrentHashMap} instances, so {@code null}
 * keys and values are rejected by the underlying maps.
 *
 * Created by zhaopx on 2018/1/24 0024-14:56
 * Vendor: primeton.com
 */
public class JobQueue {

    /**
     * taskId -> job detail. Entries are added by {@link #pushNewTask} and keyed by task ID.
     */
    protected final static Map<String, JobEntity> taskId2Job = new ConcurrentHashMap<String, JobEntity>();

    /**
     * Cache of Yarn application IDs and their last reported state: appId -> {jobId, taskId, state}.
     */
    protected final static Map<String, AppEntity> appId2Task = new ConcurrentHashMap<String, AppEntity>();

    /**
     * taskId -> watchdog registered for that task.
     */
    protected final static Map<String, ExecuteWatchdog> TASKID_TO_TASKREF_MAP = new ConcurrentHashMap<String, ExecuteWatchdog>();

    protected static Logger LOG = LoggerFactory.getLogger(JobQueue.class);

    /** Yarn application states that precede RUNNING. */
    private static final Set<String> BEFORE_RUNNING_STATES =
            ImmutableSet.of("NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED");

    /** The single state that counts as running. */
    private static final Set<String> RUNNING_STATES = ImmutableSet.of("RUNNING");

    /** Terminal Yarn application states. */
    private static final Set<String> FINISHED_STATES =
            ImmutableSet.of("FINISHED", "FAILED", "KILLED");

    /**
     * Registers the watchdog observing a task.
     *
     * @param jobId    job ID (not used for the lookup key; the registry is keyed by task ID only)
     * @param taskId   task ID
     * @param watchdog watchdog to associate with the task
     */
    public static void addTaskObserver(String jobId, String taskId, ExecuteWatchdog watchdog) {
        TASKID_TO_TASKREF_MAP.put(taskId, watchdog);
    }

    /**
     * Removes the watchdog registered for a task, if any.
     *
     * @param jobId  job ID (not used for the lookup key)
     * @param taskId task ID
     */
    public static void removeTaskObserver(String jobId, String taskId) {
        TASKID_TO_TASKREF_MAP.remove(taskId);
    }

    /**
     * Looks up the watchdog registered for a task.
     *
     * @param jobId  job ID (not used for the lookup key)
     * @param taskId task ID
     * @return the registered watchdog, or {@code null} when none is registered
     */
    public static ExecuteWatchdog getTaskObserver(String jobId, String taskId) {
        return TASKID_TO_TASKREF_MAP.get(taskId);
    }

    /**
     * Returns the applications that have not started running yet.
     *
     * @return immutable set of application IDs whose state is exactly one of
     *         NEW, NEW_SAVING, SUBMITTED or ACCEPTED
     */
    public static Set<String> getBeforeRunning() {
        return appIdsInStates(BEFORE_RUNNING_STATES);
    }

    /**
     * Returns the applications that are currently running.
     *
     * @return immutable set of application IDs whose state is RUNNING
     */
    public static Set<String> getRunningJob() {
        return appIdsInStates(RUNNING_STATES);
    }

    /**
     * Returns every job currently held in the task registry (the running Quartz jobs).
     *
     * <p>{@link JobEntity} equality is based on the job ID only, so registry entries
     * that share a job ID collapse into a single element of the returned set.
     *
     * @return immutable snapshot of the registered jobs
     */
    public static Set<JobEntity> getRunningJob0() {
        return ImmutableSet.copyOf(taskId2Job.values());
    }

    /**
     * Returns the applications that have finished.
     *
     * @return immutable set of application IDs whose state is exactly one of
     *         FINISHED, FAILED or KILLED
     */
    public static Set<String> getFinishedJob() {
        return appIdsInStates(FINISHED_STATES);
    }

    /**
     * Collects the IDs of all cached applications whose state is a member of {@code states}.
     * Membership is an exact match; applications without a state are skipped.
     */
    private static Set<String> appIdsInStates(Set<String> states) {
        ImmutableSet.Builder<String> matched = ImmutableSet.builder();
        for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
            String state = entry.getValue().getState();
            if (state != null && states.contains(state)) {
                matched.add(entry.getKey());
            }
        }
        return matched.build();
    }

    /**
     * Registers a newly started Spark job in the task registry.
     *
     * @param jobId       job ID
     * @param taskId      task ID, used as the registry key
     * @param uuid        UUID stored with the job and echoed back in listener callbacks
     * @param map         job parameters
     * @param jobListener callback notified on state reports; may be {@code null}
     */
    public static void pushNewTask(String jobId, String taskId, String uuid, JSONObject map, JobListener jobListener) {
        JobEntity jobEntity = new JobEntity(jobId, taskId, uuid, map);
        jobEntity.setJobListener(jobListener);
        taskId2Job.put(taskId, jobEntity);
    }

    /**
     * Records a state report for a Yarn application (pushed by the log-parsing side)
     * and forwards it to the listener of the matching job, if there is one.
     *
     * @param appId  Yarn application ID
     * @param jobId  job ID
     * @param taskId task ID; when {@code null} the state is recorded but no listener is looked up
     * @param state  reported state
     */
    public static void runningTask(String appId, String jobId, String taskId, String state) {
        AppEntity appEntity = appId2Task.get(appId);
        if (appEntity == null) {
            appEntity = new AppEntity(appId);
        }
        appEntity.setJobId(jobId);
        appEntity.setTaskId(taskId);
        appEntity.setState(state);
        appId2Task.put(appId, appEntity);

        // The registry rejects null keys, so a report without a task ID cannot match any job.
        if (taskId == null) {
            return;
        }

        // Notify the job's listener. Terminal states are reported at least once anyway;
        // pre-run states matter because a job may sit in the queue for a while before it
        // starts, which is different from a timeout and must be reported to the caller.
        JobEntity entity = taskId2Job.get(taskId);
        if (entity == null) {
            return;
        }
        JobListener listener = entity.getJobListener();
        if (listener != null) {
            JSONObject json = new JSONObject();
            json.put("source", "console");
            json.put("appId", appId);
            json.put("uuid", entity.getUuid());
            listener.call(jobId, taskId, state, json);
        }
    }

    /**
     * Stops monitoring a Yarn application: removes it from the application cache and
     * removes the job registered under its task ID, if any.
     *
     * @param appId Yarn application ID
     */
    public static void removeMonitorJob(String appId) {
        AppEntity appEntity = appId2Task.remove(appId);
        if (appEntity == null) {
            return;
        }

        String taskId = appEntity.getTaskId();
        // An application recorded without a task ID has no job entry to remove.
        JobEntity jobEntity = taskId == null ? null : taskId2Job.remove(taskId);
        if (jobEntity == null) {
            LOG.warn("{} 已完成,移除该任务。", taskId);
            return;
        }
        LOG.warn("{}/{} 已完成,移除该任务。", appEntity.getJobId(), appId);
    }

    /**
     * Removes a task from the task registry.
     *
     * @param jobId  job ID (used for logging only)
     * @param taskId task ID
     */
    public static void removeMonitorTask(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 已完成,移除该任务。", jobId, taskId);
    }

    /**
     * Removes a failed task from the task registry.
     *
     * @param jobId  job ID (used for logging only)
     * @param taskId task ID
     */
    public static void removeErrorJob(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 运行失败,从动态列表中移除。", jobId, taskId);
    }

    /**
     * Returns the cached information of a Yarn application.
     *
     * @param appId Yarn application ID
     * @return the cached entry, or {@code null} when the application is unknown
     */
    public static AppEntity getApp(String appId) {
        return appId2Task.get(appId);
    }

    /**
     * Mutable record of a Yarn application: its ID plus the job, task and state last
     * reported for it. Equality and hash code are based on the application ID only.
     */
    public static class AppEntity {

        final String appId;

        String jobId;

        String taskId;

        String state;

        public AppEntity(String appId) {
            this.appId = appId;
        }

        public String getAppId() {
            return appId;
        }

        public String getJobId() {
            return jobId;
        }

        public void setJobId(String jobId) {
            this.jobId = jobId;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            AppEntity other = (AppEntity) o;
            return appId != null ? appId.equals(other.appId) : other.appId == null;
        }

        @Override
        public int hashCode() {
            return appId != null ? appId.hashCode() : 0;
        }
    }

    /**
     * A registered job: identifiers, configuration and an optional result listener.
     * Equality and hash code are based on the job ID only.
     */
    public static class JobEntity {

        /** Job ID. */
        String jobId;

        /** Task ID. */
        String taskId;

        /** UUID. */
        String uuid;

        /** Job configuration. */
        JSONObject map;

        /** Callback that receives task state messages. */
        JobListener jobListener;

        /**
         * Creates a job entry without a listener.
         *
         * @param jobId  job ID
         * @param taskId task ID
         * @param uuid   UUID
         * @param map    job configuration
         */
        public JobEntity(String jobId, String taskId, String uuid, JSONObject map) {
            this.jobId = jobId;
            this.taskId = taskId;
            this.uuid = uuid;
            this.map = map;
        }

        public String getJobId() {
            return jobId;
        }

        public String getTaskId() {
            return taskId;
        }

        public String getUuid() {
            return uuid;
        }

        public JSONObject getMap() {
            return map;
        }

        public JobListener getJobListener() {
            return jobListener;
        }

        public void setJobListener(JobListener jobListener) {
            this.jobListener = jobListener;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            JobEntity other = (JobEntity) o;
            return jobId != null ? jobId.equals(other.jobId) : other.jobId == null;
        }

        @Override
        public int hashCode() {
            return jobId != null ? jobId.hashCode() : 0;
        }
    }
}