package org.apache.kyuubi.events.handler import org.apache.commons.lang3.StringUtils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import org.apache.kyuubi.Logging import redis.clients.jedis.{Jedis, JedisPool, JedisPubSub} import java.io.{Closeable, IOException} import java.time.Duration import java.util import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable /** * 易点天下 * *
 *
 * Created by zhenqin.
 * User: zhenqin
 * Date: 2026/1/7
 * Time: 12:53
 * Vendor: yiidata.com
 *
 * 
* * @author zhenqin */

/**
 * Small convenience wrapper around a pooled Jedis client.
 *
 * Sorted sets are stored under the key `"z:" + sessionId`, hashes under the
 * plain `sessionId` key.
 *
 * Recognised properties (default in parentheses): `host` (localhost),
 * `port` (6379), `timeout` (3000), `password` (none), `db` (0) and
 * `maxTotal` (100).
 *
 * @param props connection and pool settings
 */
class Redised(props: Properties) extends Closeable with Logging {

  /**
   * Redis logical database index used by every pooled connection.
   *
   * Declared before `redisPool` on purpose: the pool is created with this
   * value, so it has to be initialised first.
   */
  val db: Int = props.getProperty("db", "0").toInt

  /** Connection pool backing all operations of this instance. */
  private val redisPool: JedisPool = initJedis(props)

  /**
   * Builds the connection pool from the supplied properties.
   *
   * The pool is bound to `db`, so reads, writes and `clear()` all operate on
   * the same logical database.
   */
  private def initJedis(props: Properties): JedisPool = {
    val poolConfig = new GenericObjectPoolConfig[Jedis]()
    poolConfig.setMaxTotal(props.getProperty("maxTotal", "100").toInt)
    // NOTE(review): this makes maxIdle >= maxTotal (and at least 5); confirm
    // that Math.max, not Math.min, is what was intended.
    poolConfig.setMaxIdle(Math.max(poolConfig.getMaxTotal, 5))
    poolConfig.setMinIdle(3)
    poolConfig.setTestOnBorrow(false)
    poolConfig.setTestWhileIdle(true)
    poolConfig.setTestOnCreate(false)
    // NOTE(review): no eviction-run interval is configured here; confirm that
    // idle testing/eviction is actually active with this configuration.
    poolConfig.setMinEvictableIdleTime(Duration.ofSeconds(95))

    val host = props.getProperty("host", "localhost")
    val portText = props.getProperty("port", "6379")
    logger.info(s"connect redis: $host:$portText")

    val port = portText.toInt
    val timeout = props.getProperty("timeout", "3000").toInt
    // A blank password means "no authentication": hand null to JedisPool.
    val password: String =
      Option(props.getProperty("password")).filterNot(p => StringUtils.isBlank(p)).orNull

    new JedisPool(poolConfig, host, port, timeout, password, db)
  }

  /**
   * Borrows a connection, runs `action` with it and always gives the
   * connection back to the pool, even when `action` throws.
   */
  private def withJedis[T](action: Jedis => T): T = {
    val jedis = redisPool.getResource
    try action(jedis)
    finally jedis.close()
  }

  /**
   * Adds `statementId` with the given `score` to the sorted set of the session.
   *
   * @param sessionId   session whose sorted set (`"z:" + sessionId`) is updated
   * @param score       ordering score of the member
   * @param statementId member to add
   */
  @throws[IOException]
  def zsetadd(sessionId: String, score: Double, statementId: String): Unit =
    withJedis { jedis =>
      jedis.zadd("z:" + sessionId, score, statementId)
      ()
    }

  /**
   * Returns the number of members in the sorted set of the session.
   *
   * @param sessionId session whose sorted set (`"z:" + sessionId`) is counted
   */
  @throws[IOException]
  def zcount(sessionId: String): Long =
    withJedis[Long](jedis => jedis.zcard("z:" + sessionId))

  /**
   * Returns the highest-scored members of the session's sorted set.
   *
   * @param sessionId session whose sorted set (`"z:" + sessionId`) is read
   * @param limit     maximum number of members to return; `0` yields an empty
   *                  set, a negative value is forwarded unchanged as the Redis
   *                  stop index (so `-1` returns every member)
   * @return at most `limit` members for a positive `limit`
   */
  @throws[IOException]
  def zsettopn(sessionId: String, limit: Int): mutable.Set[String] =
    if (limit == 0) {
      mutable.Set.empty[String]
    } else {
      // ZREVRANGE's stop index is inclusive, hence limit - 1 for "top limit".
      val stop: Long = if (limit < 0) limit.toLong else limit - 1L
      withJedis[mutable.Set[String]](_.zrevrange("z:" + sessionId, 0L, stop).asScala)
    }

  /**
   * Stores `json` under field `statementId` of the session's hash.
   *
   * @param sessionId   hash key
   * @param statementId hash field
   * @param json        value to store
   */
  @throws[IOException]
  def mput(sessionId: String, statementId: String, json: String): Unit =
    withJedis { jedis =>
      jedis.hset(sessionId, statementId, json)
      ()
    }

  /**
   * Fetches a single field of the session's hash.
   *
   * @param sessionId   hash key
   * @param statementId hash field
   * @return whatever `hget` returns for that field
   */
  @throws[IOException]
  def mget(sessionId: String, statementId: String): String =
    withJedis(_.hget(sessionId, statementId))

  /**
   * Fetches several fields of the session's hash in one `HMGET` round trip.
   *
   * @param sessionId    hash key
   * @param statementIds hash fields to read
   * @return a map from each requested id to the value Redis returned for it;
   *         an empty map when no ids are given
   */
  @throws[IOException]
  def mget(sessionId: String, statementIds: Array[String]): Map[String, String] =
    if (statementIds.isEmpty) {
      // HMGET needs at least one field, so skip the round trip entirely.
      Map.empty[String, String]
    } else {
      withJedis { jedis =>
        // HMGET answers in the order of the requested fields.
        val values = jedis.hmget(sessionId, statementIds: _*).asScala
        statementIds.zip(values).toMap
      }
    }

  /**
   * Fetches every field/value pair of the session's hash.
   *
   * @param sessionId hash key
   */
  @throws[IOException]
  def mgetAll(sessionId: String): util.Map[String, String] =
    withJedis(_.hgetAll(sessionId))

  /**
   * Publishes `json` on the given pub/sub channel.
   *
   * @param topic channel name
   * @param json  message payload
   */
  @throws[IOException]
  def publish(topic: String, json: String): Unit =
    withJedis { jedis =>
      jedis.publish(topic, json)
      ()
    }

  /**
   * Subscribes `jedisPubSub` to the given channel.
   *
   * The borrowed connection is returned to the pool only after the underlying
   * `subscribe` call returns.
   * NOTE(review): Jedis' subscribe is expected to block until the listener
   * unsubscribes, so call this from a dedicated thread - confirm.
   *
   * @param topic       channel name
   * @param jedisPubSub listener receiving the messages
   */
  @throws[IOException]
  def subscribe(topic: String, jedisPubSub: JedisPubSub): Unit =
    withJedis(_.subscribe(jedisPubSub, topic))

  /** Flushes every key of the configured database `db`. */
  def clear(): Unit =
    withJedis { jedis =>
      // The pool is already bound to `db`; the explicit select keeps the
      // target of this destructive call obvious.
      jedis.select(db)
      jedis.flushDB()
      ()
    }

  /** Closes the underlying connection pool. */
  override def close(): Unit = redisPool.close()

  /** Closes the pool, logging (instead of propagating) any non-fatal failure. */
  def shutdown(): Unit = {
    import scala.util.control.NonFatal
    try close()
    catch {
      case NonFatal(e) => logger.error("close redis connection error!", e)
    }
  }
}