package com.primeton.datainsight.task;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableSet;
import com.primeton.datainsight.support.JobListener;
import org.apache.commons.exec.ExecuteWatchdog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
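/**
 * Static, in-memory registry used to monitor jobs: it tracks submitted tasks,
 * the Yarn applications reported for them, and per-task watchdog observers.
 *
 * Created by zhaopx on 2018/1/24 0024-14:56
 * Vendor: primeton.com
 */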
public class JobQueue {

    /**
     * Tasks pushed via {@link #pushNewTask}: taskId -> job detail.
     */
    protected final static Map<String, JobEntity> taskId2Job = new ConcurrentHashMap<String, JobEntity>();

    /**
     * Cache of Yarn applications and their last reported state:
     * appId -> {jobId, taskId, state}.
     */
    protected final static Map<String, AppEntity> appId2Task = new ConcurrentHashMap<String, AppEntity>();

    /**
     * Watchdog observers registered per task: taskId -> watchdog.
     */
    protected final static Map<String, ExecuteWatchdog> TASKID_TO_TASKREF_MAP =
            new ConcurrentHashMap<String, ExecuteWatchdog>();

    protected static Logger LOG = LoggerFactory.getLogger(JobQueue.class);

    /**
     * Yarn application states that precede RUNNING.
     */
    private static final Set<String> BEFORE_RUNNING_STATES =
            ImmutableSet.of("NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED");

    /**
     * Yarn application states that mark the end of an application.
     */
    private static final Set<String> FINISHED_STATES =
            ImmutableSet.of("FINISHED", "FAILED", "KILLED");

    /**
     * Registers the watchdog observer of a task, replacing any previous one for the same taskId.
     *
     * @param jobId    Job ID (not used for the lookup key)
     * @param taskId   Task ID, used as the key
     * @param watchdog observer to register
     */
    public static void addTaskObserver(String jobId, String taskId, ExecuteWatchdog watchdog) {
        TASKID_TO_TASKREF_MAP.put(taskId, watchdog);
    }

    /**
     * Removes the watchdog observer of a task, if one is registered.
     *
     * @param jobId  Job ID (not used for the lookup key)
     * @param taskId Task ID, used as the key
     */
    public static void removeTaskObserver(String jobId, String taskId) {
        TASKID_TO_TASKREF_MAP.remove(taskId);
    }

    /**
     * Looks up the watchdog observer of a task.
     *
     * @param jobId  Job ID (not used for the lookup key)
     * @param taskId Task ID, used as the key
     * @return the registered watchdog, or {@code null} when none is registered for the task
     */
    public static ExecuteWatchdog getTaskObserver(String jobId, String taskId) {
        return TASKID_TO_TASKREF_MAP.get(taskId);
    }

    /**
     * Returns the applications that have not started running yet.
     *
     * @return immutable set of appIds whose state is exactly one of
     *         NEW, NEW_SAVING, SUBMITTED or ACCEPTED
     */
    public static Set<String> getBeforeRunning() {
        return appIdsInStates(BEFORE_RUNNING_STATES);
    }

    /**
     * Returns the applications that are currently running.
     *
     * @return immutable set of appIds whose state is RUNNING
     */
    public static Set<String> getRunningJob() {
        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
        for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
            if ("RUNNING".equals(entry.getValue().getState())) {
                builder.add(entry.getKey());
            }
        }
        return builder.build();
    }

    /**
     * Returns the Quartz jobs that are currently tracked.
     *
     * <p>{@link JobEntity} instances are equal when their jobId is equal, so several
     * tasks of the same job collapse into a single element of the returned set.
     *
     * @return immutable snapshot of the tracked job entities
     */
    public static Set<JobEntity> getRunningJob0() {
        return ImmutableSet.copyOf(taskId2Job.values());
    }

    /**
     * Returns the applications that have ended.
     *
     * @return immutable set of appIds whose state is exactly one of
     *         FINISHED, FAILED or KILLED
     */
    public static Set<String> getFinishedJob() {
        return appIdsInStates(FINISHED_STATES);
    }

    /**
     * Collects the appIds whose current state is a member of {@code states}.
     *
     * <p>Membership is an exact match; applications without a state are skipped.
     */
    private static Set<String> appIdsInStates(Set<String> states) {
        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
        for (Map.Entry<String, AppEntity> entry : appId2Task.entrySet()) {
            String state = entry.getValue().getState();
            if (state != null && states.contains(state)) {
                builder.add(entry.getKey());
            }
        }
        return builder.build();
    }

    /**
     * Pushes a job into the queue once the Spark job has been launched.
     *
     * @param jobId       Job ID
     * @param taskId      Task ID, used as the key; an existing entry for it is replaced
     * @param uuid        UUID stored with the job and echoed back in listener callbacks
     * @param map         job parameters
     * @param jobListener job callback, may be {@code null}
     */
    public static void pushNewTask(String jobId, String taskId, String uuid, JSONObject map, JobListener jobListener) {
        JobEntity jobEntity = new JobEntity(jobId, taskId, uuid, map);
        jobEntity.setJobListener(jobListener);
        taskId2Job.put(taskId, jobEntity);
    }

    /**
     * Records the state reported for a Yarn application and notifies the job listener.
     * Per the original author, Yarn pushes state back through this method from the
     * log-parsing side.
     *
     * @param appId  Yarn application ID
     * @param jobId  Job ID
     * @param taskId Task ID; when {@code null} the state is recorded but no listener is looked up
     * @param state  reported state
     */
    public static void runningTask(String appId, String jobId, String taskId, String state) {
        AppEntity appEntity = appId2Task.get(appId);
        if (appEntity == null) {
            appEntity = new AppEntity(appId);
        }
        appEntity.setJobId(jobId);
        appEntity.setTaskId(taskId);
        appEntity.setState(state);
        appId2Task.put(appId, appEntity);

        // ConcurrentHashMap rejects null keys, so without a taskId there is no job to call back.
        if (taskId == null) {
            return;
        }

        // Callback for the state change (including the states before running).
        JobEntity entity = taskId2Job.get(taskId);
        if (entity != null && entity.getJobListener() != null) {
            JSONObject json = new JSONObject();
            json.put("source", "console");
            json.put("appId", appId);
            json.put("uuid", entity.getUuid());
            entity.getJobListener().call(jobId, taskId, state, json);
        }
        // Completion is always reported one or more times, so it needs no special handling.
        // Before running, a job may be blocked in the queue and start slowly; that is not a
        // timeout, so the callback is still required.
    }

    /**
     * Removes an application, and the job it belongs to, from monitoring.
     * Does nothing when the application is unknown.
     *
     * @param appId Yarn application ID
     */
    public static void removeMonitorJob(String appId) {
        AppEntity appEntity = appId2Task.get(appId);
        if (appEntity == null) {
            return;
        }
        // Drop the application entry first so it is removed on every path below.
        appId2Task.remove(appId);

        // ConcurrentHashMap rejects null keys; an application recorded without a taskId has no job.
        String taskId = appEntity.getTaskId();
        JobEntity jobEntity = taskId == null ? null : taskId2Job.remove(taskId);
        if (jobEntity == null) {
            LOG.warn("{} 已完成,移除该任务。", taskId);
            return;
        }
        LOG.warn("{}/{} 已完成,移除该任务。", appEntity.getJobId(), appId);
    }

    /**
     * Removes a task from the job queue.
     *
     * @param jobId  Job ID (used for logging only)
     * @param taskId Task ID, used as the key
     */
    public static void removeMonitorTask(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 已完成,移除该任务。", jobId, taskId);
    }

    /**
     * Removes a failed task from the job queue.
     *
     * @param jobId  Job ID (used for logging only)
     * @param taskId Task ID, used as the key
     */
    public static void removeErrorJob(String jobId, String taskId) {
        taskId2Job.remove(taskId);
        LOG.warn("{}/{} 运行失败,从动态列表中移除。", jobId, taskId);
    }

    /**
     * Returns the cached information of a Yarn application.
     *
     * @param appId Yarn application ID
     * @return the cached entity, or {@code null} when the application is unknown
     */
    public static AppEntity getApp(String appId) {
        return appId2Task.get(appId);
    }

    /**
     * Last known information about a Yarn application. Identity (equals/hashCode)
     * is based on the appId only.
     */
    public static class AppEntity {

        final String appId;

        String jobId;

        String taskId;

        String state;

        public AppEntity(String appId) {
            this.appId = appId;
        }

        public String getAppId() {
            return appId;
        }

        public String getJobId() {
            return jobId;
        }

        public void setJobId(String jobId) {
            this.jobId = jobId;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            AppEntity appEntity = (AppEntity) o;

            return appId != null ? appId.equals(appEntity.appId) : appEntity.appId == null;
        }

        @Override
        public int hashCode() {
            return appId != null ? appId.hashCode() : 0;
        }
    }

    /**
     * A job pushed into the queue together with its configuration and callback.
     * Identity (equals/hashCode) is based on the jobId only.
     */
    public static class JobEntity {

        /**
         * Job ID
         */
        String jobId;

        /**
         * Task ID
         */
        String taskId;

        /**
         * UUID
         */
        String uuid;

        /**
         * Job configuration
         */
        JSONObject map;

        /**
         * Callback that receives task state messages
         */
        JobListener jobListener;

        /**
         * Creates a job entity without a listener.
         *
         * @param jobId  Job ID
         * @param taskId Task ID
         * @param uuid   UUID
         * @param map    job configuration
         */
        public JobEntity(String jobId, String taskId, String uuid, JSONObject map) {
            this.jobId = jobId;
            this.taskId = taskId;
            this.uuid = uuid;
            this.map = map;
        }

        public String getJobId() {
            return jobId;
        }

        public String getTaskId() {
            return taskId;
        }

        public String getUuid() {
            return uuid;
        }

        public JSONObject getMap() {
            return map;
        }

        public JobListener getJobListener() {
            return jobListener;
        }

        public void setJobListener(JobListener jobListener) {
            this.jobListener = jobListener;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            JobEntity jobEntity = (JobEntity) o;

            return jobId != null ? jobId.equals(jobEntity.jobId) : jobEntity.jobId == null;
        }

        @Override
        public int hashCode() {
            return jobId != null ? jobId.hashCode() : 0;
        }
    }
}