TaskActivationTask.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package com.yiidata.intergration.web.task;
  2. import com.yiidata.intergration.web.config.TaskProperties;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.DisposableBean;
  5. import org.springframework.beans.factory.InitializingBean;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  8. import org.springframework.stereotype.Component;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. import java.util.Set;
  12. import java.util.concurrent.Callable;
  13. import java.util.concurrent.Future;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. *
  17. * 控制 Spark Task 队列
  18. *
  19. * <pre>
  20. *
  21. * Created by zhaopx.
  22. * User: zhaopx
  23. * Date: 2020/11/17
  24. * Time: 18:04
  25. *
  26. * </pre>
  27. *
  28. * @author zhaopx
  29. */
  30. @Component
  31. @Slf4j
  32. public class TaskActivationTask implements Callable<Map<String, Object>>, TaskActivation,
  33. InitializingBean, DisposableBean {
  34. @Autowired
  35. DefaultTaskFactory defaultTaskFactory;
  36. @Autowired
  37. ThreadPoolTaskExecutor taskExecutor;
  38. @Autowired
  39. TaskProperties taskProperties;
  40. /**
  41. * 线程完成的句柄
  42. */
  43. private Future<Map<String, Object>> taskFuture;
  44. /**
  45. * 当前是否正在运行
  46. */
  47. private boolean running = true;
  48. @Override
  49. public void afterPropertiesSet() throws Exception {
  50. AsyncTaskQueue.getInstance(taskProperties.getParallel());
  51. taskFuture = taskExecutor.submit(this);
  52. log.info("start scan TaskActivationTask thread.");
  53. }
  54. @Override
  55. public void active() {
  56. try {
  57. synchronized (this) {
  58. this.notify();
  59. }
  60. } catch (Exception e) {
  61. log.error("notify TaskActivationTask thread error.", e);
  62. }
  63. }
  64. @Override
  65. public Map<String, Object> call() throws Exception {
  66. while (running) {
  67. // 开始提交任务
  68. try {
  69. // 如果待运行的队列非空,且当前运行的并发小于要求的值,则开启任务
  70. if(AsyncTaskQueue.getPendingTaskSize() > 0 && AsyncTaskQueue.getRunningTaskSize() < taskProperties.getParallel()) {
  71. Set<String> pendingTasks = AsyncTaskQueue.getPendingTasks();
  72. int count = taskProperties.getParallel() - AsyncTaskQueue.getRunningTaskSize(); // 当前可提交的任务数量
  73. // 开始提交 count 个任务
  74. int i = 0;
  75. for (String taskId : pendingTasks) {
  76. i++;
  77. AsyncTaskQueue.getInstance().execute(taskExecutor,
  78. defaultTaskFactory.newTask(taskId, (Map) AsyncTaskQueue.getTaskInfo(taskId)));
  79. if(i >= count) {
  80. break;
  81. }
  82. }
  83. }
  84. } catch (Exception e) {
  85. log.error("Submit InnerTaskExecution Error!", e);
  86. }
  87. try {
  88. synchronized (this) {
  89. // 停止 10s,采用 wait 比 sleep 好, 让出 CPU 时间,使执行其他线程
  90. this.wait(TimeUnit.SECONDS.toMillis(30));
  91. }
  92. } catch (InterruptedException e) {
  93. log.error("Interrupted TaskActivationTask Thread!");
  94. // 系统中断
  95. running = false;
  96. }
  97. }
  98. return new HashMap<>();
  99. }
  100. @Override
  101. public void destroy() throws Exception {
  102. log.warn("destroy spring context, will stop TaskActivationTask thread!");
  103. running = false;
  104. active();
  105. if(taskFuture != null) {
  106. taskFuture.cancel(true);
  107. }
  108. }
  109. }