TimestampSlidingWindowCounter.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package org.apache.kyuubi.util;
  2. import java.time.Duration;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.Deque;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.Optional;
  10. import java.util.Timer;
  11. import java.util.TimerTask;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.LinkedBlockingDeque;
  14. import java.util.concurrent.locks.ReentrantLock;
  15. /**
  16. * <pre>
  17. *
  18. * Created by zhenqin.
  19. * User: zhenqin
  20. * Date: 2026/3/13
  21. * Time: 13:57
  22. * Vendor: yiidata.com
  23. *
  24. * </pre>
  25. *
  26. * @author zhenqin
  27. */
  28. public class TimestampSlidingWindowCounter {
  29. // 存储每个key的事件时间戳队列
  30. private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();
  31. // 窗口大小(毫秒)
  32. private final long windowSizeMs;
  33. // 是否自动清理过期数据
  34. private final boolean autoCleanup;
  35. // 清理线程
  36. private Timer cleanupTimer;
  37. public TimestampSlidingWindowCounter(long windowSizeMs, boolean autoCleanup) {
  38. this.windowSizeMs = windowSizeMs;
  39. this.autoCleanup = autoCleanup;
  40. if (autoCleanup) {
  41. startCleanupTask();
  42. }
  43. }
  44. /**
  45. * 创建1分钟窗口计数器
  46. */
  47. public static TimestampSlidingWindowCounter createMinuteWindow() {
  48. return new TimestampSlidingWindowCounter(Duration.ofMinutes(1).toMillis(), true);
  49. }
  50. /**
  51. * 创建5分钟窗口计数器
  52. */
  53. @Deprecated
  54. public static TimestampSlidingWindowCounter create5MinuteWindow() {
  55. return new TimestampSlidingWindowCounter(Duration.ofMinutes(5).toMillis(), true);
  56. }
  57. /**
  58. * 创建1小时窗口计数器
  59. */
  60. public static TimestampSlidingWindowCounter createHourWindow() {
  61. return new TimestampSlidingWindowCounter(Duration.ofMinutes(60).toMillis(), true);
  62. }
  63. /**
  64. * 记录一个事件
  65. */
  66. public long addEvent(String key) {
  67. return addEvent(key, System.currentTimeMillis());
  68. }
  69. /**
  70. * 记录一个指定时间的事件
  71. */
  72. public long addEvent(String key, long timestamp) {
  73. Window window = windows.computeIfAbsent(key, k -> new Window(windowSizeMs));
  74. return window.add(timestamp);
  75. }
  76. /**
  77. * 记录一个指定时间的事件, 并添加 count 值
  78. */
  79. public long addEventCount(String key, long count) {
  80. return addEventCount(key, System.currentTimeMillis(), count);
  81. }
  82. /**
  83. * 记录一个指定时间的事件, 并添加 count 值
  84. */
  85. public long addEventCount(String key, long timestamp, long count) {
  86. Window window = windows.computeIfAbsent(key, k -> new Window(windowSizeMs));
  87. return window.add(timestamp, count);
  88. }
  89. /**
  90. * 批量添加事件
  91. */
  92. public Map<String, Long> addEvents(Map<String, List<Long>> events) {
  93. Map<String, Long> results = new HashMap<>();
  94. events.forEach((key, timestamps) -> {
  95. long count = 0;
  96. for (long ts : timestamps) {
  97. count = addEvent(key, ts);
  98. }
  99. results.put(key, count);
  100. });
  101. return results;
  102. }
  103. /**
  104. * 获取最后放入的时间
  105. * @param key
  106. * @return
  107. */
  108. public long getLastTime(String key) {
  109. Window window = windows.get(key);
  110. return window != null ? window.getLastTime() : 0;
  111. }
  112. /**
  113. * 获取当前窗口内的事件数量
  114. */
  115. public long getCount(String key) {
  116. return getCount(key, System.currentTimeMillis());
  117. }
  118. /**
  119. * 获取指定时间点窗口内的事件数量
  120. */
  121. public long getCount(String key, long timestamp) {
  122. Window window = windows.get(key);
  123. return window != null ? window.getCount(timestamp) : 0;
  124. }
  125. /**
  126. * 获取指定时间点窗口内的计数器数量
  127. */
  128. public long getTotal(String key) {
  129. Window window = windows.get(key);
  130. return window != null ? window.getTotal(System.currentTimeMillis()) : 0;
  131. }
  132. /**
  133. * 获取指定时间点窗口内的计数器数量
  134. */
  135. public long getTotal(String key, long timestamp) {
  136. Window window = windows.get(key);
  137. return window != null ? window.getTotal(timestamp) : 0;
  138. }
  139. /**
  140. * 获取所有key的计数
  141. */
  142. public Map<String, Long> getAllCounts() {
  143. Map<String, Long> counts = new HashMap<>();
  144. long now = System.currentTimeMillis();
  145. windows.forEach((key, window) -> {
  146. counts.put(key, window.getCount(now));
  147. });
  148. return counts;
  149. }
  150. /**
  151. * 判断是否超过阈值
  152. */
  153. public boolean isExceedLimit(String key, long threshold) {
  154. return getCount(key) > threshold;
  155. }
  156. /**
  157. * 判断是否超过阈值(带自定义时间)
  158. */
  159. public boolean isExceedLimit(String key, long threshold, long timestamp) {
  160. return getCount(key, timestamp) > threshold;
  161. }
  162. /**
  163. * 获取窗口内的事件时间戳列表
  164. */
  165. public List<Long> getEventTimestamps(String key) {
  166. Window window = windows.get(key);
  167. return window != null ? window.getTimestamps() : Collections.emptyList();
  168. }
  169. /**
  170. * 清空指定key的数据
  171. */
  172. public void clear(String key) {
  173. windows.remove(key);
  174. }
  175. /**
  176. * 清空所有数据
  177. */
  178. public void clearAll() {
  179. windows.clear();
  180. }
  181. /**
  182. * 获取活跃的key数量
  183. */
  184. public int getActiveKeyCount() {
  185. return windows.size();
  186. }
  187. /**
  188. * 获取窗口大小
  189. */
  190. public long getWindowSizeMs() {
  191. return windowSizeMs;
  192. }
  193. /**
  194. * 启动清理任务
  195. */
  196. private void startCleanupTask() {
  197. cleanupTimer = new Timer("SlidingWindow-Cleanup", true);
  198. cleanupTimer.scheduleAtFixedRate(new TimerTask() {
  199. @Override
  200. public void run() {
  201. try {
  202. cleanup();
  203. } catch (Exception e) {
  204. // 忽略清理异常
  205. }
  206. }
  207. }, windowSizeMs / 2, windowSizeMs / 2);
  208. }
  209. /**
  210. * 手动清理过期数据
  211. */
  212. public void cleanup() {
  213. long now = System.currentTimeMillis();
  214. windows.entrySet().removeIf(entry -> {
  215. Window window = entry.getValue();
  216. return window.isEmpty(now);
  217. });
  218. }
  219. /**
  220. * 关闭清理任务
  221. */
  222. public void shutdown() {
  223. if (cleanupTimer != null) {
  224. cleanupTimer.cancel();
  225. }
  226. }
  227. /**
  228. * 内部窗口类
  229. */
  230. private static class Window {
  231. // 使用双端队列存储时间戳
  232. private final Deque<Long> timestamps = new LinkedBlockingDeque<>();
  233. /** 每隔时间戳的统计值 */
  234. private final Deque<Long> statistics = new LinkedBlockingDeque<>();
  235. // 窗口大小
  236. private final long windowSizeMs;
  237. // 锁(保证线程安全)
  238. private final ReentrantLock lock = new ReentrantLock();
  239. // 缓存当前计数(优化性能)
  240. private volatile long cachedCount = 0;
  241. private volatile long lastCleanupTime = System.currentTimeMillis();
  242. public Window(long windowSizeMs) {
  243. this.windowSizeMs = windowSizeMs;
  244. }
  245. /**
  246. * 添加事件
  247. */
  248. public long add(long timestamp) {
  249. lock.lock();
  250. try {
  251. // 移除过期的数据
  252. removeExpired(timestamp);
  253. // 添加新事件
  254. timestamps.addLast(timestamp);
  255. statistics.addLast(0L);
  256. cachedCount = timestamps.size();
  257. lastCleanupTime = timestamp;
  258. return cachedCount;
  259. } finally {
  260. lock.unlock();
  261. }
  262. }
  263. /**
  264. * 添加事件
  265. */
  266. public long add(long timestamp, long value) {
  267. lock.lock();
  268. try {
  269. // 移除过期的数据
  270. removeExpired(timestamp);
  271. // 添加新事件
  272. timestamps.addLast(timestamp);
  273. statistics.addLast(value);
  274. cachedCount = timestamps.size();
  275. lastCleanupTime = timestamp;
  276. return cachedCount;
  277. } finally {
  278. lock.unlock();
  279. }
  280. }
  281. /**
  282. * 获取当前计数
  283. */
  284. public long getCount(long timestamp) {
  285. // 如果最后清理时间太旧,主动清理一次
  286. if (timestamp - lastCleanupTime > windowSizeMs / 4) {
  287. lock.lock();
  288. try {
  289. removeExpired(timestamp);
  290. } finally {
  291. lock.unlock();
  292. }
  293. }
  294. return cachedCount;
  295. }
  296. /**
  297. * 获取当前计数总和
  298. */
  299. public long getTotal(long timestamp) {
  300. // 如果最后清理时间太旧,主动清理一次
  301. if (timestamp - lastCleanupTime > windowSizeMs / 4) {
  302. lock.lock();
  303. try {
  304. removeExpired(timestamp);
  305. } finally {
  306. lock.unlock();
  307. }
  308. }
  309. return statistics.stream().mapToLong(it->it).sum();
  310. }
  311. /**
  312. * 获取最后一次计数的拉入时间,可能返回 null
  313. */
  314. public long getLastTime() {
  315. return Optional.ofNullable(timestamps.peekLast()).orElse(0L);
  316. }
  317. /**
  318. * 获取所有时间戳
  319. */
  320. public List<Long> getTimestamps() {
  321. lock.lock();
  322. try {
  323. return new ArrayList<>(timestamps);
  324. } finally {
  325. lock.unlock();
  326. }
  327. }
  328. /**
  329. * 是否为空
  330. */
  331. public boolean isEmpty(long timestamp) {
  332. lock.lock();
  333. try {
  334. removeExpired(timestamp);
  335. return timestamps.isEmpty();
  336. } finally {
  337. lock.unlock();
  338. }
  339. }
  340. /**
  341. * 移除过期数据
  342. */
  343. private void removeExpired(long now) {
  344. long cutoff = now - windowSizeMs;
  345. while (!timestamps.isEmpty() && timestamps.peekFirst() < cutoff) {
  346. timestamps.removeFirst();
  347. statistics.removeFirst();
  348. }
  349. cachedCount = timestamps.size();
  350. lastCleanupTime = now;
  351. }
  352. }
  353. }