package org.apache.kyuubi.util;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;
/**
*
*
* Created by zhenqin.
* User: zhenqin
* Date: 2026/3/13
* Time: 13:57
* Vendor: yiidata.com
*
*
*
* @author zhenqin
*/
public class TimestampSlidingWindowCounter {
// 存储每个key的事件时间戳队列
private final ConcurrentHashMap windows = new ConcurrentHashMap<>();
// 窗口大小(毫秒)
private final long windowSizeMs;
// 是否自动清理过期数据
private final boolean autoCleanup;
// 清理线程
private Timer cleanupTimer;
public TimestampSlidingWindowCounter(long windowSizeMs, boolean autoCleanup) {
this.windowSizeMs = windowSizeMs;
this.autoCleanup = autoCleanup;
if (autoCleanup) {
startCleanupTask();
}
}
/**
* 创建1分钟窗口计数器
*/
public static TimestampSlidingWindowCounter createMinuteWindow() {
return new TimestampSlidingWindowCounter(Duration.ofMinutes(1).toMillis(), true);
}
/**
* 创建5分钟窗口计数器
*/
@Deprecated
public static TimestampSlidingWindowCounter create5MinuteWindow() {
return new TimestampSlidingWindowCounter(Duration.ofMinutes(5).toMillis(), true);
}
/**
* 创建1小时窗口计数器
*/
public static TimestampSlidingWindowCounter createHourWindow() {
return new TimestampSlidingWindowCounter(Duration.ofMinutes(60).toMillis(), true);
}
/**
* 记录一个事件
*/
public long addEvent(String key) {
return addEvent(key, System.currentTimeMillis());
}
/**
* 记录一个指定时间的事件
*/
public long addEvent(String key, long timestamp) {
Window window = windows.computeIfAbsent(key, k -> new Window(windowSizeMs));
return window.add(timestamp);
}
/**
* 记录一个指定时间的事件, 并添加 count 值
*/
public long addEventCount(String key, long count) {
return addEventCount(key, System.currentTimeMillis(), count);
}
/**
* 记录一个指定时间的事件, 并添加 count 值
*/
public long addEventCount(String key, long timestamp, long count) {
Window window = windows.computeIfAbsent(key, k -> new Window(windowSizeMs));
return window.add(timestamp, count);
}
/**
* 批量添加事件
*/
public Map addEvents(Map> events) {
Map results = new HashMap<>();
events.forEach((key, timestamps) -> {
long count = 0;
for (long ts : timestamps) {
count = addEvent(key, ts);
}
results.put(key, count);
});
return results;
}
/**
* 获取最后放入的时间
* @param key
* @return
*/
public long getLastTime(String key) {
Window window = windows.get(key);
return window != null ? window.getLastTime() : 0;
}
/**
* 获取当前窗口内的事件数量
*/
public long getCount(String key) {
return getCount(key, System.currentTimeMillis());
}
/**
* 获取指定时间点窗口内的事件数量
*/
public long getCount(String key, long timestamp) {
Window window = windows.get(key);
return window != null ? window.getCount(timestamp) : 0;
}
/**
* 获取指定时间点窗口内的计数器数量
*/
public long getTotal(String key) {
Window window = windows.get(key);
return window != null ? window.getTotal(System.currentTimeMillis()) : 0;
}
/**
* 获取指定时间点窗口内的计数器数量
*/
public long getTotal(String key, long timestamp) {
Window window = windows.get(key);
return window != null ? window.getTotal(timestamp) : 0;
}
/**
* 获取所有key的计数
*/
public Map getAllCounts() {
Map counts = new HashMap<>();
long now = System.currentTimeMillis();
windows.forEach((key, window) -> {
counts.put(key, window.getCount(now));
});
return counts;
}
/**
* 判断是否超过阈值
*/
public boolean isExceedLimit(String key, long threshold) {
return getCount(key) > threshold;
}
/**
* 判断是否超过阈值(带自定义时间)
*/
public boolean isExceedLimit(String key, long threshold, long timestamp) {
return getCount(key, timestamp) > threshold;
}
/**
* 获取窗口内的事件时间戳列表
*/
public List getEventTimestamps(String key) {
Window window = windows.get(key);
return window != null ? window.getTimestamps() : Collections.emptyList();
}
/**
* 清空指定key的数据
*/
public void clear(String key) {
windows.remove(key);
}
/**
* 清空所有数据
*/
public void clearAll() {
windows.clear();
}
/**
* 获取活跃的key数量
*/
public int getActiveKeyCount() {
return windows.size();
}
/**
* 获取窗口大小
*/
public long getWindowSizeMs() {
return windowSizeMs;
}
/**
* 启动清理任务
*/
private void startCleanupTask() {
cleanupTimer = new Timer("SlidingWindow-Cleanup", true);
cleanupTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
cleanup();
} catch (Exception e) {
// 忽略清理异常
}
}
}, windowSizeMs / 2, windowSizeMs / 2);
}
/**
* 手动清理过期数据
*/
public void cleanup() {
long now = System.currentTimeMillis();
windows.entrySet().removeIf(entry -> {
Window window = entry.getValue();
return window.isEmpty(now);
});
}
/**
* 关闭清理任务
*/
public void shutdown() {
if (cleanupTimer != null) {
cleanupTimer.cancel();
}
}
/**
* 内部窗口类
*/
private static class Window {
// 使用双端队列存储时间戳
private final Deque timestamps = new LinkedBlockingDeque<>();
/** 每隔时间戳的统计值 */
private final Deque statistics = new LinkedBlockingDeque<>();
// 窗口大小
private final long windowSizeMs;
// 锁(保证线程安全)
private final ReentrantLock lock = new ReentrantLock();
// 缓存当前计数(优化性能)
private volatile long cachedCount = 0;
private volatile long lastCleanupTime = System.currentTimeMillis();
public Window(long windowSizeMs) {
this.windowSizeMs = windowSizeMs;
}
/**
* 添加事件
*/
public long add(long timestamp) {
lock.lock();
try {
// 移除过期的数据
removeExpired(timestamp);
// 添加新事件
timestamps.addLast(timestamp);
statistics.addLast(0L);
cachedCount = timestamps.size();
lastCleanupTime = timestamp;
return cachedCount;
} finally {
lock.unlock();
}
}
/**
* 添加事件
*/
public long add(long timestamp, long value) {
lock.lock();
try {
// 移除过期的数据
removeExpired(timestamp);
// 添加新事件
timestamps.addLast(timestamp);
statistics.addLast(value);
cachedCount = timestamps.size();
lastCleanupTime = timestamp;
return cachedCount;
} finally {
lock.unlock();
}
}
/**
* 获取当前计数
*/
public long getCount(long timestamp) {
// 如果最后清理时间太旧,主动清理一次
if (timestamp - lastCleanupTime > windowSizeMs / 4) {
lock.lock();
try {
removeExpired(timestamp);
} finally {
lock.unlock();
}
}
return cachedCount;
}
/**
* 获取当前计数总和
*/
public long getTotal(long timestamp) {
// 如果最后清理时间太旧,主动清理一次
if (timestamp - lastCleanupTime > windowSizeMs / 4) {
lock.lock();
try {
removeExpired(timestamp);
} finally {
lock.unlock();
}
}
return statistics.stream().mapToLong(it->it).sum();
}
/**
* 获取最后一次计数的拉入时间,可能返回 null
*/
public long getLastTime() {
return Optional.ofNullable(timestamps.peekLast()).orElse(0L);
}
/**
* 获取所有时间戳
*/
public List getTimestamps() {
lock.lock();
try {
return new ArrayList<>(timestamps);
} finally {
lock.unlock();
}
}
/**
* 是否为空
*/
public boolean isEmpty(long timestamp) {
lock.lock();
try {
removeExpired(timestamp);
return timestamps.isEmpty();
} finally {
lock.unlock();
}
}
/**
* 移除过期数据
*/
private void removeExpired(long now) {
long cutoff = now - windowSizeMs;
while (!timestamps.isEmpty() && timestamps.peekFirst() < cutoff) {
timestamps.removeFirst();
statistics.removeFirst();
}
cachedCount = timestamps.size();
lastCleanupTime = now;
}
}
}