package org.apache.kyuubi.util; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.locks.ReentrantLock; /** *
 *
 * Created by zhenqin.
 * User: zhenqin
 * Date: 2026/3/13
 * Time: 13:57
 * Vendor: yiidata.com
 *
 * 
/**
 * Sliding-window event counter keyed by string.
 *
 * <p>Each key owns a window that stores the timestamps of the recorded events together with
 * an optional numeric value per event. Events older than {@code windowSizeMs} relative to the
 * timestamp of the current operation are discarded whenever the window is touched.
 *
 * <p>Expired events are only looked for at the head of each window, so timestamps are expected
 * to be recorded in non-decreasing order per key; an out-of-order event is kept until every
 * event recorded before it has expired.
 *
 * <p>Mutations of a key ({@link #addEvent}, {@link #addEventCount}, {@link #cleanup()}) are
 * performed atomically per key through the backing {@link ConcurrentHashMap}, and every window
 * guards its own state with a lock.
 *
 * @author zhenqin
 */
public class TimestampSlidingWindowCounter {

    /** Window of recorded events for every active key. */
    private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();

    /** Window length in milliseconds. */
    private final long windowSizeMs;

    /** Whether a background timer periodically evicts keys whose window is empty. */
    private final boolean autoCleanup;

    /** Timer that runs {@link #cleanup()}; {@code null} when auto cleanup is disabled. */
    private final Timer cleanupTimer;

    /**
     * Creates a counter.
     *
     * @param windowSizeMs window length in milliseconds, must be positive
     * @param autoCleanup  when {@code true}, a daemon timer calls {@link #cleanup()} every
     *                     half window; call {@link #shutdown()} to stop it
     * @throws IllegalArgumentException if {@code windowSizeMs} is not positive
     */
    public TimestampSlidingWindowCounter(long windowSizeMs, boolean autoCleanup) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive: " + windowSizeMs);
        }
        this.windowSizeMs = windowSizeMs;
        this.autoCleanup = autoCleanup;
        this.cleanupTimer = autoCleanup ? startCleanupTask() : null;
    }

    /**
     * Creates a counter with a one-minute window and auto cleanup enabled.
     *
     * @return the new counter
     */
    public static TimestampSlidingWindowCounter createMinuteWindow() {
        return new TimestampSlidingWindowCounter(Duration.ofMinutes(1).toMillis(), true);
    }

    /**
     * Creates a counter with a five-minute window and auto cleanup enabled.
     *
     * @return the new counter
     * @deprecated marked deprecated in the original source without a stated replacement; the
     *     public constructor accepts an arbitrary window size.
     */
    @Deprecated
    public static TimestampSlidingWindowCounter create5MinuteWindow() {
        return new TimestampSlidingWindowCounter(Duration.ofMinutes(5).toMillis(), true);
    }

    /**
     * Creates a counter with a one-hour window and auto cleanup enabled.
     *
     * @return the new counter
     */
    public static TimestampSlidingWindowCounter createHourWindow() {
        return new TimestampSlidingWindowCounter(Duration.ofHours(1).toMillis(), true);
    }

    /**
     * Records one event for {@code key} at the current system time.
     *
     * @param key the key the event belongs to
     * @return the number of events in the key's window after the insert
     */
    public long addEvent(String key) {
        return addEvent(key, System.currentTimeMillis());
    }

    /**
     * Records one event for {@code key} at the given time. The event carries the value 0, so
     * it does not contribute to {@link #getTotal(String, long)}.
     *
     * @param key       the key the event belongs to
     * @param timestamp event time in epoch milliseconds
     * @return the number of events in the key's window after the insert
     */
    public long addEvent(String key, long timestamp) {
        return record(key, timestamp, 0L);
    }

    /**
     * Records one event carrying {@code count} for {@code key} at the current system time.
     *
     * @param key   the key the event belongs to
     * @param count the value attached to the event
     * @return the number of events in the key's window after the insert
     */
    public long addEventCount(String key, long count) {
        return addEventCount(key, System.currentTimeMillis(), count);
    }

    /**
     * Records one event carrying {@code count} for {@code key} at the given time.
     *
     * @param key       the key the event belongs to
     * @param timestamp event time in epoch milliseconds
     * @param count     the value attached to the event
     * @return the number of events in the key's window after the insert
     */
    public long addEventCount(String key, long timestamp, long count) {
        return record(key, timestamp, count);
    }

    /**
     * Records a batch of events.
     *
     * @param events timestamps to record, grouped by key
     * @return for every key of {@code events}, the window size after its last insert, or 0
     *     when the key's timestamp list is empty
     */
    public Map<String, Long> addEvents(Map<String, List<Long>> events) {
        Map<String, Long> results = new HashMap<>();
        events.forEach((key, timestamps) -> {
            long count = 0;
            for (long ts : timestamps) {
                count = addEvent(key, ts);
            }
            results.put(key, count);
        });
        return results;
    }

    /**
     * Returns the timestamp of the most recently recorded event of {@code key}.
     *
     * @param key the key to look up
     * @return the last recorded timestamp, or 0 when the key has no retained events
     */
    public long getLastTime(String key) {
        Window window = windows.get(key);
        return window != null ? window.getLastTime() : 0;
    }

    /**
     * Returns the number of events of {@code key} inside the window ending now.
     *
     * @param key the key to look up
     * @return the event count, 0 for an unknown key
     */
    public long getCount(String key) {
        return getCount(key, System.currentTimeMillis());
    }

    /**
     * Returns the number of events of {@code key} that are not older than
     * {@code timestamp - windowSizeMs}. Older events are discarded by this call.
     *
     * @param key       the key to look up
     * @param timestamp reference time in epoch milliseconds
     * @return the event count, 0 for an unknown key
     */
    public long getCount(String key, long timestamp) {
        Window window = windows.get(key);
        return window != null ? window.getCount(timestamp) : 0;
    }

    /**
     * Returns the sum of the event values of {@code key} inside the window ending now.
     *
     * @param key the key to look up
     * @return the sum of values, 0 for an unknown key
     */
    public long getTotal(String key) {
        return getTotal(key, System.currentTimeMillis());
    }

    /**
     * Returns the sum of the values of the events of {@code key} that are not older than
     * {@code timestamp - windowSizeMs}. Older events are discarded by this call.
     *
     * @param key       the key to look up
     * @param timestamp reference time in epoch milliseconds
     * @return the sum of values, 0 for an unknown key
     */
    public long getTotal(String key, long timestamp) {
        Window window = windows.get(key);
        return window != null ? window.getTotal(timestamp) : 0;
    }

    /**
     * Returns the current event count of every active key.
     *
     * @return a new map from key to event count, evaluated at the current system time
     */
    public Map<String, Long> getAllCounts() {
        Map<String, Long> counts = new HashMap<>();
        long now = System.currentTimeMillis();
        windows.forEach((key, window) -> counts.put(key, window.getCount(now)));
        return counts;
    }

    /**
     * Tells whether the current event count of {@code key} is strictly above {@code threshold}.
     *
     * @param key       the key to look up
     * @param threshold the limit to compare against
     * @return {@code true} when the count exceeds the threshold
     */
    public boolean isExceedLimit(String key, long threshold) {
        return getCount(key) > threshold;
    }

    /**
     * Tells whether the event count of {@code key} at {@code timestamp} is strictly above
     * {@code threshold}.
     *
     * @param key       the key to look up
     * @param threshold the limit to compare against
     * @param timestamp reference time in epoch milliseconds
     * @return {@code true} when the count exceeds the threshold
     */
    public boolean isExceedLimit(String key, long threshold, long timestamp) {
        return getCount(key, timestamp) > threshold;
    }

    /**
     * Returns a snapshot of the retained event timestamps of {@code key}, oldest insert first.
     *
     * @param key the key to look up
     * @return a copy of the timestamps, or an empty list for an unknown key
     */
    public List<Long> getEventTimestamps(String key) {
        Window window = windows.get(key);
        return window != null ? window.getTimestamps() : Collections.emptyList();
    }

    /**
     * Drops all data of {@code key}.
     *
     * @param key the key to remove
     */
    public void clear(String key) {
        windows.remove(key);
    }

    /** Drops the data of every key. */
    public void clearAll() {
        windows.clear();
    }

    /**
     * Returns the number of keys that currently own a window.
     *
     * @return the number of active keys
     */
    public int getActiveKeyCount() {
        return windows.size();
    }

    /**
     * Returns the window length.
     *
     * @return the window length in milliseconds
     */
    public long getWindowSizeMs() {
        return windowSizeMs;
    }

    /**
     * Evicts every key whose window holds no event newer than the window length, measured
     * from the current system time.
     */
    public void cleanup() {
        long now = System.currentTimeMillis();
        for (String key : windows.keySet()) {
            // The emptiness check and the removal happen inside one atomic map operation, so an
            // event added concurrently for the same key cannot land in an evicted window.
            windows.computeIfPresent(key, (k, window) -> window.isEmpty(now) ? null : window);
        }
    }

    /** Stops the background cleanup timer, if one was started. */
    public void shutdown() {
        if (cleanupTimer != null) {
            cleanupTimer.cancel();
        }
    }

    /** Adds one event to the window of {@code key}, creating the window when necessary. */
    private long record(String key, long timestamp, long value) {
        long[] countAfterAdd = new long[1];
        // compute() keeps "look up or create the window" and "add to it" atomic with respect
        // to the eviction done by cleanup().
        windows.compute(key, (k, existing) -> {
            Window window = existing != null ? existing : new Window(windowSizeMs);
            countAfterAdd[0] = window.add(timestamp, value);
            return window;
        });
        return countAfterAdd[0];
    }

    /** Starts the daemon timer that calls {@link #cleanup()} every half window. */
    private Timer startCleanupTask() {
        // Timer rejects a zero period, which windowSizeMs / 2 yields for a 1 ms window.
        long periodMs = Math.max(1L, windowSizeMs / 2);
        Timer timer = new Timer("SlidingWindow-Cleanup", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    cleanup();
                } catch (Exception ignored) {
                    // Best effort: an exception escaping run() would terminate the timer
                    // thread and silently stop all future cleanups.
                }
            }
        }, periodMs, periodMs);
        return timer;
    }

    /**
     * Events of a single key. All state is guarded by {@link #lock}.
     */
    private static class Window {

        // LinkedBlockingDeque is the Deque implementation this file imports; access is
        // additionally serialized by the lock so both deques always stay index-aligned.

        /** Event timestamps in insertion order. */
        private final Deque<Long> timestamps = new LinkedBlockingDeque<>();

        /** Value of each event, parallel to {@link #timestamps}. */
        private final Deque<Long> statistics = new LinkedBlockingDeque<>();

        /** Window length in milliseconds. */
        private final long windowSizeMs;

        /** Guards both deques. */
        private final ReentrantLock lock = new ReentrantLock();

        Window(long windowSizeMs) {
            this.windowSizeMs = windowSizeMs;
        }

        /**
         * Discards the events expired at {@code timestamp}, then appends a new event.
         *
         * @return the number of retained events including the new one
         */
        long add(long timestamp, long value) {
            lock.lock();
            try {
                removeExpired(timestamp);
                timestamps.addLast(timestamp);
                statistics.addLast(value);
                return timestamps.size();
            } finally {
                lock.unlock();
            }
        }

        /** Discards the events expired at {@code timestamp} and returns how many remain. */
        long getCount(long timestamp) {
            lock.lock();
            try {
                removeExpired(timestamp);
                return timestamps.size();
            } finally {
                lock.unlock();
            }
        }

        /** Discards the events expired at {@code timestamp} and sums the remaining values. */
        long getTotal(long timestamp) {
            lock.lock();
            try {
                removeExpired(timestamp);
                long sum = 0;
                for (long value : statistics) {
                    sum += value;
                }
                return sum;
            } finally {
                lock.unlock();
            }
        }

        /** Returns the most recently appended timestamp, or 0 when no event is retained. */
        long getLastTime() {
            lock.lock();
            try {
                Long last = timestamps.peekLast();
                return last != null ? last : 0L;
            } finally {
                lock.unlock();
            }
        }

        /** Returns a copy of the retained timestamps in insertion order. */
        List<Long> getTimestamps() {
            lock.lock();
            try {
                return new ArrayList<>(timestamps);
            } finally {
                lock.unlock();
            }
        }

        /** Discards the events expired at {@code timestamp} and tells whether none remain. */
        boolean isEmpty(long timestamp) {
            lock.lock();
            try {
                removeExpired(timestamp);
                return timestamps.isEmpty();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes events from the head while they are older than {@code now - windowSizeMs}.
         * Must be called with the lock held.
         */
        private void removeExpired(long now) {
            long cutoff = now - windowSizeMs;
            Long oldest;
            while ((oldest = timestamps.peekFirst()) != null && oldest < cutoff) {
                timestamps.removeFirst();
                statistics.removeFirst();
            }
        }
    }
}