| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- package org.apache.kyuubi.util;
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Deque;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.LinkedBlockingDeque;
- import java.util.concurrent.locks.ReentrantLock;
/**
 * Thread-safe sliding-window event counter keyed by string.
 *
 * <p>Each key owns a window holding the timestamps of its recent events plus one numeric value per
 * event. An event belongs to the window evaluated at time {@code t} when its timestamp is at least
 * {@code t - windowSizeMs}; older events are evicted permanently whenever the window is written to
 * or queried. Because eviction is destructive, timestamps passed to the add/query methods are
 * expected to be (roughly) non-decreasing per key.
 *
 * <p>When auto cleanup is enabled, a daemon {@link Timer} periodically removes keys whose windows
 * have become empty; call {@link #shutdown()} to stop it.
 *
 * @author zhenqin
 */
public class TimestampSlidingWindowCounter {

  /** Per-key windows holding the event timestamps and their attached values. */
  private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();

  /** Window length in milliseconds. */
  private final long windowSizeMs;

  /** Whether a background timer periodically drops keys whose windows are empty. */
  private final boolean autoCleanup;

  /** Background cleanup timer; {@code null} when auto cleanup is disabled. */
  private final Timer cleanupTimer;

  /**
   * Creates a counter.
   *
   * @param windowSizeMs window length in milliseconds
   * @param autoCleanup {@code true} to start a daemon timer that calls {@link #cleanup()} every
   *     half window (at least every millisecond)
   */
  public TimestampSlidingWindowCounter(long windowSizeMs, boolean autoCleanup) {
    this.windowSizeMs = windowSizeMs;
    this.autoCleanup = autoCleanup;
    this.cleanupTimer = autoCleanup ? startCleanupTask() : null;
  }

  /** Creates a counter with a one-minute window and auto cleanup enabled. */
  public static TimestampSlidingWindowCounter createMinuteWindow() {
    return new TimestampSlidingWindowCounter(Duration.ofMinutes(1).toMillis(), true);
  }

  /**
   * Creates a counter with a five-minute window and auto cleanup enabled.
   *
   * @deprecated marked deprecated without a recorded replacement; the public constructor accepts
   *     any window size.
   */
  @Deprecated
  public static TimestampSlidingWindowCounter create5MinuteWindow() {
    return new TimestampSlidingWindowCounter(Duration.ofMinutes(5).toMillis(), true);
  }

  /** Creates a counter with a one-hour window and auto cleanup enabled. */
  public static TimestampSlidingWindowCounter createHourWindow() {
    return new TimestampSlidingWindowCounter(Duration.ofMinutes(60).toMillis(), true);
  }

  /**
   * Records one event for {@code key} at the current wall-clock time.
   *
   * @return the number of events in the key's window after the event was added
   */
  public long addEvent(String key) {
    return addEvent(key, System.currentTimeMillis());
  }

  /**
   * Records one event for {@code key} at the given time, with an attached value of zero.
   *
   * @param timestamp event time in epoch milliseconds
   * @return the number of events in the key's window after the event was added
   */
  public long addEvent(String key, long timestamp) {
    return addEventCount(key, timestamp, 0L);
  }

  /**
   * Records one event for {@code key} at the current wall-clock time, carrying {@code count} as
   * its value.
   *
   * @param count value attached to the event; summed by {@link #getTotal(String)}
   * @return the number of events (not the sum of values) in the key's window after the add
   */
  public long addEventCount(String key, long count) {
    return addEventCount(key, System.currentTimeMillis(), count);
  }

  /**
   * Records one event for {@code key} at the given time, carrying {@code count} as its value.
   *
   * @param timestamp event time in epoch milliseconds
   * @param count value attached to the event; summed by {@link #getTotal(String, long)}
   * @return the number of events (not the sum of values) in the key's window after the add
   */
  public long addEventCount(String key, long timestamp, long count) {
    long[] sizeAfterAdd = new long[1];
    // compute() makes "look up or create the window, then add" atomic per key, so a concurrent
    // cleanup() cannot detach the window from the map between the lookup and the add.
    windows.compute(
        key,
        (k, existing) -> {
          Window window = existing != null ? existing : new Window(windowSizeMs);
          sizeAfterAdd[0] = window.add(timestamp, count);
          return window;
        });
    return sizeAfterAdd[0];
  }

  /**
   * Records several events per key.
   *
   * @param events for each key, the timestamps (epoch milliseconds) of the events to record
   * @return for each key, the window size reported after its last event was added, or 0 when the
   *     key's timestamp list is empty
   */
  public Map<String, Long> addEvents(Map<String, List<Long>> events) {
    Map<String, Long> results = new HashMap<>();
    for (Map.Entry<String, List<Long>> entry : events.entrySet()) {
      long count = 0;
      for (long ts : entry.getValue()) {
        count = addEvent(entry.getKey(), ts);
      }
      results.put(entry.getKey(), count);
    }
    return results;
  }

  /**
   * Returns the timestamp of the most recently added event still held for {@code key}.
   *
   * @return the timestamp, or 0 when the key is unknown or its window holds no events
   */
  public long getLastTime(String key) {
    Window window = windows.get(key);
    return window != null ? window.getLastTime() : 0;
  }

  /** Returns the number of events for {@code key} in the window ending now. */
  public long getCount(String key) {
    return getCount(key, System.currentTimeMillis());
  }

  /**
   * Returns the number of events for {@code key} in the window ending at {@code timestamp}.
   *
   * <p>Events older than {@code timestamp - windowSizeMs} are evicted as a side effect.
   *
   * @return the event count, or 0 when the key is unknown
   */
  public long getCount(String key, long timestamp) {
    Window window = windows.get(key);
    return window != null ? window.getCount(timestamp) : 0;
  }

  /** Returns the sum of the values of the events for {@code key} in the window ending now. */
  public long getTotal(String key) {
    return getTotal(key, System.currentTimeMillis());
  }

  /**
   * Returns the sum of the values of the events for {@code key} in the window ending at
   * {@code timestamp}.
   *
   * <p>Events older than {@code timestamp - windowSizeMs} are evicted as a side effect.
   *
   * @return the sum of values, or 0 when the key is unknown
   */
  public long getTotal(String key, long timestamp) {
    Window window = windows.get(key);
    return window != null ? window.getTotal(timestamp) : 0;
  }

  /** Returns the current event count of every known key, evaluated at one shared "now". */
  public Map<String, Long> getAllCounts() {
    Map<String, Long> counts = new HashMap<>();
    long now = System.currentTimeMillis();
    windows.forEach((key, window) -> counts.put(key, window.getCount(now)));
    return counts;
  }

  /** Returns {@code true} when the current event count of {@code key} is above the threshold. */
  public boolean isExceedLimit(String key, long threshold) {
    return getCount(key) > threshold;
  }

  /**
   * Returns {@code true} when the event count of {@code key} in the window ending at
   * {@code timestamp} is above the threshold.
   */
  public boolean isExceedLimit(String key, long threshold, long timestamp) {
    return getCount(key, timestamp) > threshold;
  }

  /**
   * Returns a snapshot copy of the timestamps currently held for {@code key}, oldest first.
   *
   * <p>No eviction is performed, so the list may contain events that a later query will drop.
   *
   * @return the timestamps, or an empty list when the key is unknown
   */
  public List<Long> getEventTimestamps(String key) {
    Window window = windows.get(key);
    return window != null ? window.getTimestamps() : Collections.emptyList();
  }

  /** Removes all data recorded for {@code key}. */
  public void clear(String key) {
    windows.remove(key);
  }

  /** Removes all data for every key. */
  public void clearAll() {
    windows.clear();
  }

  /** Returns the number of keys currently tracked (including keys not yet cleaned up). */
  public int getActiveKeyCount() {
    return windows.size();
  }

  /** Returns the window length in milliseconds. */
  public long getWindowSizeMs() {
    return windowSizeMs;
  }

  /**
   * Starts the daemon timer that runs {@link #cleanup()} every half window.
   *
   * @return the started timer
   */
  private Timer startCleanupTask() {
    // Timer rejects a zero period, which windowSizeMs / 2 yields for windows of 0 or 1 ms.
    long period = Math.max(1L, windowSizeMs / 2);
    Timer timer = new Timer("SlidingWindow-Cleanup", true);
    timer.scheduleAtFixedRate(
        new TimerTask() {
          @Override
          public void run() {
            try {
              cleanup();
            } catch (RuntimeException ignored) {
              // Best effort: an exception escaping run() would terminate the timer thread and
              // silently stop all future cleanups.
            }
          }
        },
        period,
        period);
    return timer;
  }

  /**
   * Evicts expired events (relative to the current wall-clock time) from every window and drops
   * the keys whose windows end up empty.
   */
  public void cleanup() {
    long now = System.currentTimeMillis();
    for (String key : windows.keySet()) {
      // computeIfPresent() keeps the emptiness check and the removal atomic with respect to
      // concurrent adds on the same key, so a freshly added event is never thrown away.
      windows.computeIfPresent(key, (k, window) -> window.isEmpty(now) ? null : window);
    }
  }

  /** Stops the background cleanup timer, if one was started. Recorded data is kept. */
  public void shutdown() {
    if (cleanupTimer != null) {
      cleanupTimer.cancel();
    }
  }

  /**
   * Events of a single key. The two deques are kept in lock-step (same length, same order) and
   * are only touched while {@link #lock} is held.
   */
  private static final class Window {

    /** Event timestamps, oldest first. */
    private final Deque<Long> timestamps = new LinkedBlockingDeque<>();

    /** Value attached to each event, parallel to {@link #timestamps}. */
    private final Deque<Long> statistics = new LinkedBlockingDeque<>();

    /** Window length in milliseconds. */
    private final long windowSizeMs;

    /** Guards both deques so they are always observed in a consistent state. */
    private final ReentrantLock lock = new ReentrantLock();

    Window(long windowSizeMs) {
      this.windowSizeMs = windowSizeMs;
    }

    /**
     * Evicts events expired at {@code timestamp}, then appends a new event.
     *
     * @return the number of events held after the add
     */
    long add(long timestamp, long value) {
      lock.lock();
      try {
        removeExpired(timestamp);
        timestamps.addLast(timestamp);
        statistics.addLast(value);
        return timestamps.size();
      } finally {
        lock.unlock();
      }
    }

    /** Evicts events expired at {@code timestamp} and returns how many remain. */
    long getCount(long timestamp) {
      lock.lock();
      try {
        removeExpired(timestamp);
        return timestamps.size();
      } finally {
        lock.unlock();
      }
    }

    /** Evicts events expired at {@code timestamp} and returns the sum of the remaining values. */
    long getTotal(long timestamp) {
      lock.lock();
      try {
        removeExpired(timestamp);
        long total = 0L;
        for (long value : statistics) {
          total += value;
        }
        return total;
      } finally {
        lock.unlock();
      }
    }

    /** Returns the timestamp of the newest held event, or 0 when the window is empty. */
    long getLastTime() {
      lock.lock();
      try {
        Long newest = timestamps.peekLast();
        return newest != null ? newest : 0L;
      } finally {
        lock.unlock();
      }
    }

    /** Returns a copy of the held timestamps, oldest first, without evicting anything. */
    List<Long> getTimestamps() {
      lock.lock();
      try {
        return new ArrayList<>(timestamps);
      } finally {
        lock.unlock();
      }
    }

    /** Evicts events expired at {@code timestamp} and reports whether none remain. */
    boolean isEmpty(long timestamp) {
      lock.lock();
      try {
        removeExpired(timestamp);
        return timestamps.isEmpty();
      } finally {
        lock.unlock();
      }
    }

    /**
     * Drops events from the head whose timestamp is strictly older than
     * {@code now - windowSizeMs}. Must be called with {@link #lock} held.
     */
    private void removeExpired(long now) {
      long cutoff = now - windowSizeMs;
      Long oldest;
      while ((oldest = timestamps.peekFirst()) != null && oldest < cutoff) {
        timestamps.removeFirst();
        statistics.removeFirst();
      }
    }
  }
}
|