123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- package com.yiidata.intergration.web.task;
- import com.yiidata.intergration.web.config.TaskProperties;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.DisposableBean;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Component;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * 控制 Spark Task 队列
- *
- * <pre>
- *
- * Created by zhaopx.
- * User: zhaopx
- * Date: 2020/11/17
- * Time: 18:04
- *
- * </pre>
- *
- * @author zhaopx
- */
- @Component
- @Slf4j
- public class TaskActivationTask implements Callable<Map<String, Object>>, TaskActivation,
- InitializingBean, DisposableBean {
- @Autowired
- DefaultTaskFactory defaultTaskFactory;
- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- TaskProperties taskProperties;
- /**
- * 线程完成的句柄
- */
- private Future<Map<String, Object>> taskFuture;
- /**
- * 当前是否正在运行
- */
- private boolean running = true;
- @Override
- public void afterPropertiesSet() throws Exception {
- AsyncTaskQueue.getInstance(taskProperties.getParallel());
- taskFuture = taskExecutor.submit(this);
- log.info("start scan TaskActivationTask thread.");
- }
- @Override
- public void active() {
- try {
- synchronized (this) {
- this.notify();
- }
- } catch (Exception e) {
- log.error("notify TaskActivationTask thread error.", e);
- }
- }
- @Override
- public Map<String, Object> call() throws Exception {
- while (running) {
- // 开始提交任务
- try {
- // 如果待运行的队列非空,且当前运行的并发小于要求的值,则开启任务
- if(AsyncTaskQueue.getPendingTaskSize() > 0 && AsyncTaskQueue.getRunningTaskSize() < taskProperties.getParallel()) {
- Set<String> pendingTasks = AsyncTaskQueue.getPendingTasks();
- int count = taskProperties.getParallel() - AsyncTaskQueue.getRunningTaskSize(); // 当前可提交的任务数量
- // 开始提交 count 个任务
- int i = 0;
- for (String taskId : pendingTasks) {
- i++;
- AsyncTaskQueue.getInstance().execute(taskExecutor,
- defaultTaskFactory.newTask(taskId, (Map) AsyncTaskQueue.getTaskInfo(taskId)));
- if(i >= count) {
- break;
- }
- }
- }
- } catch (Exception e) {
- log.error("Submit InnerTaskExecution Error!", e);
- }
- try {
- synchronized (this) {
- // 停止 10s,采用 wait 比 sleep 好, 让出 CPU 时间,使执行其他线程
- this.wait(TimeUnit.SECONDS.toMillis(30));
- }
- } catch (InterruptedException e) {
- log.error("Interrupted TaskActivationTask Thread!");
- // 系统中断
- running = false;
- }
- }
- return new HashMap<>();
- }
- @Override
- public void destroy() throws Exception {
- log.warn("destroy spring context, will stop TaskActivationTask thread!");
- running = false;
- active();
- if(taskFuture != null) {
- taskFuture.cancel(true);
- }
- }
- }
|