// AsyncTaskQueue.java (5.5 KB)
  1. package com.yiidata.intergration.web.task;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. import java.util.concurrent.Executor;
  9. import java.util.concurrent.Semaphore;
  10. import java.util.concurrent.locks.Lock;
  11. import java.util.concurrent.locks.ReentrantLock;
  12. /**
  13. *
  14. * 异步任务队列
  15. *
  16. * <pre>
  17. *
  18. * Created by zhaopx.
  19. * User: zhaopx
  20. * Date: 2020/11/16
  21. * Time: 15:36
  22. *
  23. * </pre>
  24. *
  25. * @author zhaopx
  26. */
  27. @Slf4j
  28. public class AsyncTaskQueue {
  29. /**
  30. * 任务缓存
  31. */
  32. private final static Map<String, Object> TASK_QUEUE = new ConcurrentHashMap<>();
  33. /**
  34. * 任务错误队列
  35. */
  36. private final static Map<String, Object> ERROR_TASK_QUEUE = new ConcurrentHashMap<>();
  37. /**
  38. * 正在运行的队列
  39. */
  40. private final static Set<String> RUNNING_TASK_QUEUE = new HashSet<>();
  41. /**
  42. * 控制 Spark 并发的信号量
  43. */
  44. private final Semaphore semaphore;
  45. /**
  46. * 公平锁
  47. */
  48. private final static Lock LOCK = new ReentrantLock();
  49. private static AsyncTaskQueue SPARK_QUEUE;
  50. private AsyncTaskQueue(int permits) {
  51. semaphore = new Semaphore(permits);
  52. }
  53. /**
  54. * 初次调用有效,
  55. * @return
  56. */
  57. public static AsyncTaskQueue getInstance() {
  58. return getInstance(3);
  59. }
  60. /**
  61. * 按照配置,设置并发量。 第一次调用有效
  62. * @param permits
  63. * @return
  64. */
  65. public static synchronized AsyncTaskQueue getInstance(int permits) {
  66. if(SPARK_QUEUE == null) {
  67. SPARK_QUEUE = new AsyncTaskQueue(permits);
  68. }
  69. return SPARK_QUEUE;
  70. }
  71. /**
  72. * 添加任务
  73. * @param taskId
  74. * @param taskInfo
  75. */
  76. public static boolean addTask(String taskId, Map taskInfo) {
  77. LOCK.lock();
  78. try {
  79. if(!TASK_QUEUE.containsKey(taskId)) {
  80. TASK_QUEUE.put(taskId, taskInfo);
  81. log.info("add task: {} , params: {}", taskId, String.valueOf(taskInfo));
  82. return true;
  83. }
  84. } finally {
  85. LOCK.unlock();
  86. }
  87. return false;
  88. }
  89. /**
  90. * 获取当前需要执行队列的长度
  91. * @return
  92. */
  93. public static int getPendingTaskSize() {
  94. LOCK.lock();
  95. try {
  96. HashMap<String, Object> tmpMap = new HashMap<>(TASK_QUEUE);
  97. for (String s : RUNNING_TASK_QUEUE) {
  98. tmpMap.remove(s);
  99. }
  100. return tmpMap.size();
  101. } finally {
  102. LOCK.unlock();
  103. }
  104. }
  105. /**
  106. * 获取当前需要执行队列
  107. * @return
  108. */
  109. public static Set<String> getPendingTasks() {
  110. LOCK.lock();
  111. try {
  112. HashMap<String, Object> tmpMap = new HashMap<>(TASK_QUEUE);
  113. for (String s : RUNNING_TASK_QUEUE) {
  114. tmpMap.remove(s);
  115. }
  116. return tmpMap.keySet();
  117. } finally {
  118. LOCK.unlock();
  119. }
  120. }
  121. /**
  122. * 获取当前正在执行任务的长度
  123. * @return
  124. */
  125. public static int getRunningTaskSize() {
  126. return RUNNING_TASK_QUEUE.size();
  127. }
  128. public static Object getTaskInfo(String taskId) {
  129. return TASK_QUEUE.get(taskId);
  130. }
  131. /**
  132. * 移除任务
  133. * @param taskId
  134. */
  135. public static void removeTask(String taskId) {
  136. LOCK.lock();
  137. try {
  138. TASK_QUEUE.remove(taskId);
  139. RUNNING_TASK_QUEUE.remove(taskId);
  140. log.info("remove task: {}", taskId);
  141. } finally {
  142. LOCK.unlock();
  143. }
  144. }
  145. /**
  146. * 错误的任务报告
  147. * @param taskId
  148. */
  149. public static void reportError(String taskId) {
  150. LOCK.lock();
  151. try {
  152. Object errorTaskInfo = TASK_QUEUE.remove(taskId);
  153. ERROR_TASK_QUEUE.put(taskId, errorTaskInfo);
  154. RUNNING_TASK_QUEUE.remove(taskId);
  155. } finally {
  156. LOCK.unlock();
  157. }
  158. }
  159. /**
  160. * 判断任务是否正在运行
  161. * @param taskId
  162. * @return
  163. */
  164. public static boolean runningTask(String taskId) {
  165. return RUNNING_TASK_QUEUE.contains(taskId);
  166. }
  167. /**
  168. * 执行该函数
  169. * @param executor
  170. * @param task
  171. */
  172. public void execute(Executor executor, final SuperTask task) {
  173. executor.execute(()->{
  174. final String runningTaskId = task.getTaskId();
  175. // 有任务需要运行
  176. if(AsyncTaskQueue.runningTask(runningTaskId)) {
  177. // 取得的待运行的task,不能是正在运行的列表中的
  178. log.info("task {} running.", runningTaskId);
  179. return;
  180. }
  181. // 获得一个许可
  182. try {
  183. semaphore.acquire();
  184. } catch (InterruptedException e) {
  185. return;
  186. }
  187. try {
  188. // 运行任务
  189. RUNNING_TASK_QUEUE.add(runningTaskId);
  190. log.info("running task: {}", runningTaskId);
  191. task.run();
  192. log.info("finished task: {}", runningTaskId);
  193. // 执行成功,移除
  194. removeTask(runningTaskId);
  195. } catch (Exception e) {
  196. log.info("执行任务异常。error task: " + runningTaskId, e);
  197. // 运行错误
  198. reportError(runningTaskId);
  199. } finally {
  200. // 释放许可
  201. semaphore.release();
  202. }
  203. });
  204. }
  205. }