123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- package com.primeton.datainsight.task;
- import com.alibaba.fastjson.JSONObject;
- import com.google.common.collect.ImmutableSet;
- import com.primeton.datainsight.support.JobListener;
- import org.apache.commons.exec.ExecuteWatchdog;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * <p>
- * commons of this class
- * </p>
- *
- * Created by zhaopx on 2018/1/24 0024-14:56
- * Vendor: primeton.com
- */
- public class JobQueue {
- /**
- * jobId -> Job Detail
- */
- protected final static Map<String, JobEntity> taskId2Job = new ConcurrentHashMap();
- /**
- *
- * Yarn 上运行的 Application ID 和状态的缓存。appid -> {jobId, state}
- *
- */
- protected final static Map<String, AppEntity> appId2Task = new ConcurrentHashMap();
- /**
- *
- * Yarn 上运行的 Application ID 和状态的缓存。appid -> {jobId, state}
- *
- */
- protected final static Map<String, ExecuteWatchdog> TASKID_TO_TASKREF_MAP = new ConcurrentHashMap();
- protected static Logger LOG = LoggerFactory.getLogger(JobQueue.class);
- /**
- * 添加一个Task的 Observer 监听者
- * @param jobId Job ID
- * @param taskId TaskID
- * @param watchdog 监听者
- */
- public static void addTaskObserver(String jobId, String taskId, ExecuteWatchdog watchdog) {
- TASKID_TO_TASKREF_MAP.put(taskId, watchdog);
- }
- /**
- * 添加一个Task的 Observer 监听者
- * @param jobId Job ID
- * @param taskId TaskID
- */
- public static void removeTaskObserver(String jobId, String taskId) {
- TASKID_TO_TASKREF_MAP.remove(taskId);
- }
- /**
- * 添加一个Task的 Observer 监听者
- * @param jobId Job ID
- * @param taskId TaskID
- */
- public static ExecuteWatchdog getTaskObserver(String jobId, String taskId) {
- return TASKID_TO_TASKREF_MAP.get(taskId);
- }
- /**
- * 返回尚未运行的 Application
- * @return
- */
- public static Set<String> getBeforeRunning(){
- ImmutableSet.Builder<String> builder = ImmutableSet.builder();
- for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
- if(entry.getValue().getState() == null) {
- continue;
- }
- // Yarn 任务在运行前的几个状态
- if("NEW|NEW_SAVING|SUBMITTED|ACCEPTED".contains(entry.getValue().getState())) {
- builder.add(entry.getKey());
- }
- }
- return builder.build();
- }
- /**
- * 返回正在运行的 Application
- * @return
- */
- public static Set<String> getRunningJob(){
- ImmutableSet.Builder<String> builder = ImmutableSet.builder();
- for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
- if("RUNNING".equals(entry.getValue().getState())) {
- builder.add(entry.getKey());
- }
- }
- return builder.build();
- }
- /**
- * 返回正在运行的 Quartz 任务
- * @return
- */
- public static Set<JobEntity> getRunningJob0(){
- ImmutableSet.Builder<JobEntity> builder = ImmutableSet.builder();
- for (Map.Entry<String, JobEntity> entry : taskId2Job.entrySet()) {
- builder.add(entry.getValue());
- }
- return builder.build();
- }
- /**
- * 返回运行结束的 Application
- * @return
- */
- public static Set<String> getFinishedJob() {
- ImmutableSet.Builder<String> builder = ImmutableSet.builder();
- for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
- if(entry.getValue().getState() == null) {
- continue;
- }
- if("FINISHED|FAILED|KILLED".contains(entry.getValue().getState())) {
- builder.add(entry.getKey());
- }
- }
- return builder.build();
- }
- /**
- * Spark Job 启动后推送到队列
- * @param jobId 任务ID
- * @param taskId taskID
- * @param map 任务参数
- * @param jobListener 任务回调
- */
- public static void pushNewTask(String jobId, String taskId, String uuid, JSONObject map, JobListener jobListener) {
- JobEntity jobEntity = new JobEntity(jobId, taskId, uuid, map);
- jobEntity.setJobListener(jobListener);
- taskId2Job.put(taskId, jobEntity);
- }
- /**
- * Yarn 反向推送到该接口,日志解析端推送
- * @param appId AppID
- * @param jobId JobID
- * @param taskId TASK
- * @param state 状态
- */
- public static void runningTask(String appId, String jobId, String taskId, String state) {
- AppEntity appEntity = appId2Task.get(appId);
- if(appEntity == null){
- appEntity = new AppEntity(appId);
- }
- appEntity.setJobId(jobId);
- appEntity.setTaskId(taskId);
- appEntity.setState(state);
- appId2Task.put(appId, appEntity);
- // 运行前, 回调
- JobEntity entity = taskId2Job.get(taskId);
- if(entity != null && entity.getJobListener() != null){
- JSONObject json = new JSONObject();
- json.put("source", "console");
- json.put("appId", appId);
- json.put("uuid", entity.getUuid());
- entity.getJobListener().call(jobId, taskId, state, json);
- }
- // Job 运行结束,一定会发送一次或多次,不用特别处理,运行前可能队列阻塞,启动迟缓,跟超时不同,需要回调
- }
- /**
- * 移除 Job,移除后不再做监控
- * @param appId Yarn AppId
- */
- public static void removeMonitorJob(String appId) {
- AppEntity appEntity = appId2Task.get(appId);
- if(appEntity == null){
- return;
- }
- JobEntity jobEntity = taskId2Job.get(appEntity.getTaskId());
- if(jobEntity == null){
- LOG.warn("{} 已完成,移除该任务。", appEntity.getTaskId());
- appId2Task.remove(appId);
- return;
- }
- // 移除Job
- appId2Task.remove(appId);
- taskId2Job.remove(appEntity.getTaskId());
- LOG.warn("{}/{} 已完成,移除该任务。", appEntity.getJobId(), appId);
- }
- /**
- * 删除任务
- * @param jobId 任务JobID
- * @param taskId 任务TaskID
- */
- public static void removeMonitorTask(String jobId, String taskId) {
- taskId2Job.remove(taskId);
- LOG.warn("{}/{} 已完成,移除该任务。", jobId, taskId);
- }
- /**
- * 移除任务
- * @param jobId
- * @param taskId
- */
- public static void removeErrorJob(String jobId, String taskId){
- taskId2Job.remove(taskId);
- LOG.warn("{}/{} 运行失败,从动态列表中移除。", jobId, taskId);
- }
- /**
- * 获取 App Job 信息
- * @param appId
- * @return
- */
- public static AppEntity getApp(String appId) {
- return appId2Task.get(appId);
- }
- public static class AppEntity {
- final String appId;
- String jobId;
- String taskId;
- String state;
- public AppEntity(String appId) {
- this.appId = appId;
- }
- public String getAppId() {
- return appId;
- }
- public String getJobId() {
- return jobId;
- }
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
- public String getTaskId() {
- return taskId;
- }
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
- public String getState() {
- return state;
- }
- public void setState(String state) {
- this.state = state;
- }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AppEntity appEntity = (AppEntity) o;
- return appId != null ? appId.equals(appEntity.appId) : appEntity.appId == null;
- }
- @Override
- public int hashCode() {
- return appId != null ? appId.hashCode() : 0;
- }
- }
- public static class JobEntity {
- /**
- * Job ID
- */
- String jobId;
- /**
- * Task ID
- */
- String taskId;
- /**
- * UUID
- */
- String uuid;
- /**
- * Job 配置
- */
- JSONObject map;
- /**
- * 任务消息结果回调
- */
- JobListener jobListener;
- /**
- * 构造方法
- * @param jobId Job ID
- * @param uuid Task ID
- * @param map Job 配置
- */
- public JobEntity(String jobId, String taskId, String uuid, JSONObject map) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.uuid = uuid;
- this.map = map;
- }
- public String getJobId() {
- return jobId;
- }
- public String getTaskId() {
- return taskId;
- }
- public String getUuid() {
- return uuid;
- }
- public JSONObject getMap() {
- return map;
- }
- public JobListener getJobListener() {
- return jobListener;
- }
- public void setJobListener(JobListener jobListener) {
- this.jobListener = jobListener;
- }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- JobEntity jobEntity = (JobEntity) o;
- return jobId != null ? jobId.equals(jobEntity.jobId) : jobEntity.jobId == null;
- }
- @Override
- public int hashCode() {
- return jobId != null ? jobId.hashCode() : 0;
- }
- }
- }
|