package com.primeton.datainsight.task;

import com.primeton.datainsight.bo.WarnMessage;
import com.primeton.datainsight.service.WarnService;
import com.primeton.datainsight.utils.Constance;
import com.primeton.datainsight.utils.TimeUtils;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.exec.ExecuteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Execute-result handler that monitors a running job and raises warnings.
 *
 * <p>Two kinds of warning are emitted through the {@link WarnService}:
 * <ul>
 *   <li>a "task failed" warning from {@link #onProcessFailed(ExecuteException)};</li>
 *   <li>repeated "task timeout" warnings from {@link #waitFor()} while the job has
 *       not produced a result after the configured timeout.</li>
 * </ul>
 *
 * <p>Created by zhaopx.
 * User: zhaopx
 * Date: 2019/4/11
 * Time: 11:04
 *
 * @author zhaopx
 */
public class JobExecuteResultHandler extends DefaultExecuteResultHandler {

    private static final Logger LOG = LoggerFactory.getLogger(JobExecuteResultHandler.class);

    /** Timeout (ms) used before the first timeout warning when none is configured: one hour. */
    private static final long DEFAULT_WARN_TIMEOUT_MS = 3600000L;

    /** Interval (ms) between repeated timeout warnings when none is configured: ten minutes. */
    private static final long DEFAULT_WARN_INTERVAL_MS = 600000L;

    /** Length (ms) of one wait slice between two timeout checks in {@link #waitFor()}. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /**
     * Start time in epoch milliseconds. Captured when this handler is constructed,
     * so elapsed time is measured from handler creation.
     */
    public final long startTime = System.currentTimeMillis();

    /**
     * Warning service used to deliver warnings.
     */
    final WarnService warnService;

    /**
     * Whether timeout warnings are enabled for this handler.
     */
    final boolean enable;

    /**
     * Elapsed time (ms) after which the first timeout warning is raised.
     * Stored as -1 when a non-positive value was supplied, in which case
     * {@link #DEFAULT_WARN_TIMEOUT_MS} is used.
     */
    final long warnTimeout;

    /**
     * Interval (ms) between subsequent timeout warnings after the first one.
     * Stored as -1 when a non-positive value was supplied, in which case
     * {@link #DEFAULT_WARN_INTERVAL_MS} is used.
     */
    final long warnTimeoutInterval;

    /**
     * Number of timeout warnings raised so far.
     */
    final AtomicInteger warnCount = new AtomicInteger(0);

    /**
     * Task name, used only for display in warning content.
     */
    String taskName;

    /**
     * Creates a handler whose enable flag, timeout and repeat interval are read
     * from the given warning service.
     *
     * @param warnService warning service; must not be {@code null}
     */
    public JobExecuteResultHandler(WarnService warnService) {
        this(warnService,
                warnService.isWarnEnable(),
                warnService.getWarnJobTimeout(),
                warnService.getWarnJobPeriod());
    }

    /**
     * Creates a handler with explicit timeout-warning settings.
     *
     * @param warnService         warning service used to deliver warnings
     * @param enable              whether timeout warnings are raised by {@link #waitFor()}
     * @param warnTimeout         elapsed time (ms) without a result before the first
     *                            timeout warning; non-positive values select the default
     * @param warnTimeoutInterval interval (ms) between repeated timeout warnings;
     *                            non-positive values select the default
     */
    public JobExecuteResultHandler(WarnService warnService,
                                   boolean enable,
                                   long warnTimeout,
                                   long warnTimeoutInterval) {
        this.warnService = warnService;
        this.enable = enable;
        this.warnTimeout = warnTimeout > 0 ? warnTimeout : -1L;
        this.warnTimeoutInterval = warnTimeoutInterval > 0 ? warnTimeoutInterval : -1L;
    }

    /**
     * Records successful completion; a non-zero exit code is treated as a failure.
     *
     * @param exitValue exit code of the process
     * @throws IllegalStateException if {@code exitValue} is not 0; in that case the
     *                               result is NOT recorded by the superclass here
     */
    @Override
    public void onProcessComplete(int exitValue) {
        // A non-zero exit code means the task did not succeed.
        // NOTE(review): this presumably relies on the calling executor catching the
        // exception and invoking onProcessFailed(...) - confirm against the
        // commons-exec version in use.
        if (exitValue != 0) {
            throw new IllegalStateException("进程非正常退出。exitValue: " + exitValue);
        }
        super.onProcessComplete(exitValue);
    }

    /**
     * Records the failure and, if warnings are enabled on the service, raises a
     * "task execution failed" warning.
     *
     * @param e the failure reported for the process
     */
    @Override
    public void onProcessFailed(ExecuteException e) {
        super.onProcessFailed(e);
        // NOTE(review): this consults the service-level switch, not this.enable.
        if (warnService.isWarnEnable()) {
            WarnMessage warnMessage =
                    newTaskWarning("任务执行失败", taskName + "执行失败。REASON: " + e.getMessage());
            warnMessage.setWarnCatalog(Constance.WARN_CATALOG_TASK_EXE_FAILED);
            deliver(warnMessage);
        }
    }

    /**
     * Blocks until a result is available, checking roughly once per second whether
     * a timeout warning is due.
     *
     * <p>When enabled, the first warning is raised once the elapsed time reaches the
     * timeout; each further warning is raised one interval after the previous
     * threshold.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public void waitFor() throws InterruptedException {
        // Both values are fixed for the lifetime of the handler, so resolve them once.
        final long effectiveTimeout =
                warnTimeout > 0 ? warnTimeout : DEFAULT_WARN_TIMEOUT_MS;
        final long effectiveInterval =
                warnTimeoutInterval > 0 ? warnTimeoutInterval : DEFAULT_WARN_INTERVAL_MS;

        while (!hasResult()) {
            super.waitFor(POLL_INTERVAL_MS);
            if (!enable) {
                continue;
            }

            long elapsed = System.currentTimeMillis() - startTime;
            // Threshold for the next warning: timeout + (warnings already sent) * interval.
            long threshold = warnCount.get() * effectiveInterval + effectiveTimeout;
            if (elapsed >= threshold) {
                warnCount.incrementAndGet();
                String thresholdText = TimeUtils.getTimeString(threshold);
                LOG.warn("任务执行超时:{}", thresholdText);

                WarnMessage warnMessage =
                        newTaskWarning("任务执行超时", taskName + " 执行超时, 超时:" + thresholdText);
                warnMessage.setWarnCatalog(Constance.WARN_CATALOG_TASK_TIMEOUT);
                deliver(warnMessage);
            }
        }
    }

    /**
     * Sets the task name shown in warning content.
     *
     * @param taskName display name of the task
     */
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    /**
     * Builds a HIGH-level warning of type "task" stamped with the current time.
     * The caller is responsible for setting the warning catalog.
     */
    private WarnMessage newTaskWarning(String subtext, String content) {
        WarnMessage warnMessage = new WarnMessage();
        warnMessage.setWarnType("task");
        warnMessage.setWarnLevel("HIGH");
        warnMessage.setWarnStart(new Date());
        warnMessage.setWarnSubtext(subtext);
        warnMessage.setWarnContent(content);
        return warnMessage;
    }

    /**
     * Sends a warning through the service. A delivery failure is logged instead of
     * propagated so that it can neither abort {@link #waitFor()} while the job is
     * still running nor escape from the failure callback.
     */
    private void deliver(WarnMessage warnMessage) {
        try {
            warnService.warn(warnMessage);
        } catch (RuntimeException ex) {
            LOG.error("预警消息发送失败, taskName: {}", taskName, ex);
        }
    }
}