package com.yiidata.intergration.web.task; import com.yiidata.intergration.web.config.TaskProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * * 控制 Spark Task 队列 * *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/11/17
 * Time: 18:04
 *
 * 
* * @author zhaopx */ @Component @Slf4j public class TaskActivationTask implements Callable>, TaskActivation, InitializingBean, DisposableBean { @Autowired DefaultTaskFactory defaultTaskFactory; @Autowired ThreadPoolTaskExecutor taskExecutor; @Autowired TaskProperties taskProperties; /** * 线程完成的句柄 */ private Future> taskFuture; /** * 当前是否正在运行 */ private boolean running = true; @Override public void afterPropertiesSet() throws Exception { AsyncTaskQueue.getInstance(taskProperties.getParallel()); taskFuture = taskExecutor.submit(this); log.info("start scan TaskActivationTask thread."); } @Override public void active() { try { synchronized (this) { this.notify(); } } catch (Exception e) { log.error("notify TaskActivationTask thread error.", e); } } @Override public Map call() throws Exception { while (running) { // 开始提交任务 try { // 如果待运行的队列非空,且当前运行的并发小于要求的值,则开启任务 if(AsyncTaskQueue.getPendingTaskSize() > 0 && AsyncTaskQueue.getRunningTaskSize() < taskProperties.getParallel()) { Set pendingTasks = AsyncTaskQueue.getPendingTasks(); int count = taskProperties.getParallel() - AsyncTaskQueue.getRunningTaskSize(); // 当前可提交的任务数量 // 开始提交 count 个任务 int i = 0; for (String taskId : pendingTasks) { i++; AsyncTaskQueue.getInstance().execute(taskExecutor, defaultTaskFactory.newTask(taskId, (Map) AsyncTaskQueue.getTaskInfo(taskId))); if(i >= count) { break; } } } } catch (Exception e) { log.error("Submit InnerTaskExecution Error!", e); } try { synchronized (this) { // 停止 10s,采用 wait 比 sleep 好, 让出 CPU 时间,使执行其他线程 this.wait(TimeUnit.SECONDS.toMillis(30)); } } catch (InterruptedException e) { log.error("Interrupted TaskActivationTask Thread!"); // 系统中断 running = false; } } return new HashMap<>(); } @Override public void destroy() throws Exception { log.warn("destroy spring context, will stop TaskActivationTask thread!"); running = false; active(); if(taskFuture != null) { taskFuture.cancel(true); } } }