| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package org.apache.kyuubi.events.handler
- import org.apache.commons.lang3.StringUtils
- import org.apache.commons.pool2.impl.GenericObjectPoolConfig
- import org.apache.kyuubi.Logging
- import redis.clients.jedis.{Jedis, JedisPool, JedisPubSub}
- import java.io.{Closeable, IOException}
- import java.time.Duration
- import java.util
- import java.util.Properties
- import scala.collection.JavaConverters._
- import scala.collection.mutable
- /**
- * 易点天下
- *
- * <pre>
- *
- * Created by zhenqin.
- * User: zhenqin
- * Date: 2026/1/7
- * Time: 12:53
- * Vendor: yiidata.com
- *
- * </pre>
- *
- * @author zhenqin
- */
- class Redised(props: Properties) extends Closeable with Logging {
- /**
- * Redis 的 Java Client
- */
- private val redisPool: JedisPool = initJedis(props)
- /**
- * Redis DB
- */
- val db = props.getProperty("db", "0").toInt;
- private def initJedis(props: Properties): JedisPool = {
- val poolConfig = new GenericObjectPoolConfig[Jedis]();
- // 设置连接池的最大空闲连接数等其他属性...// 设置连接池的最大空闲连接数等其他属性...
- poolConfig.setMaxTotal(props.getProperty("maxTotal", "100").toInt)
- poolConfig.setMaxIdle(Math.max(poolConfig.getMaxTotal, 5))
- poolConfig.setMinIdle(3)
- poolConfig.setTestOnBorrow(false)
- poolConfig.setTestWhileIdle(true)
- poolConfig.setTestOnCreate(false)
- poolConfig.setMinEvictableIdleTime(Duration.ofSeconds(95))
- logger.info("connect redis: " + props.getProperty("host", "localhost") + ":" + props.getProperty("port", "6379"))
- // 判断密码,如果有密码
- if(StringUtils.isBlank(props.getProperty("password"))) {
- return new JedisPool(poolConfig, props.getProperty("host", "localhost"), props.getProperty("port", "6379").toInt,
- props.getProperty("timeout", "3000").toInt)
- }
- new JedisPool(poolConfig, props.getProperty("host", "localhost"), props.getProperty("port", "6379").toInt,
- props.getProperty("timeout", "3000").toInt, props.getProperty("password"))
- }
- @throws[IOException]
- def zsetadd(sessionId: String, score: Double, statementId: String): Unit = {
- val jedis = redisPool.getResource
- try jedis.zadd("z:" + sessionId, score, statementId)
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def zcount(sessionId: String): Long = {
- val jedis = redisPool.getResource
- try jedis.zcard("z:" + sessionId)
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def zsettopn(sessionId: String, limit: Int): mutable.Set[String] = {
- val jedis = redisPool.getResource
- try {
- jedis.zrevrange("z:" + sessionId, 0, limit).asScala;
- }
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def mput(sessionId: String, statementId: String, json: String): Unit = {
- val jedis = redisPool.getResource
- try jedis.hset(sessionId, statementId, json)
- finally if (jedis != null) jedis.close()
- }
- /**
- * 获取单个信息
- * @param sessionId
- * @param statementId
- * @throws
- * @return
- */
- @throws[IOException]
- def mget(sessionId: String, statementId: String): String = {
- val jedis = redisPool.getResource
- try {
- jedis.hget(sessionId, statementId);
- }
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def mget(sessionId: String, statementIds: Array[String]): Map[String, String] = {
- val jedis = redisPool.getResource
- val r = scala.collection.mutable.Map[String, String]()
- try {
- for(id <- statementIds) {
- r += (id -> jedis.hget(sessionId, id))
- }
- return r.toMap[String, String]
- }
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def mgetAll(sessionId: String): util.Map[String, String] = {
- val jedis = redisPool.getResource
- try jedis.hgetAll(sessionId)
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def publish(topic: String, json: String): Unit = {
- val jedis = redisPool.getResource
- try jedis.publish(topic, json)
- finally if (jedis != null) jedis.close()
- }
- @throws[IOException]
- def subscribe(topic: String, jedisPubSub: JedisPubSub): Unit = {
- val jedis = redisPool.getResource
- try jedis.subscribe(jedisPubSub, topic)
- finally if (jedis != null) jedis.close()
- }
- def clear(): Unit = {
- val jedis = redisPool.getResource
- try {
- jedis.select(db)
- jedis.flushDB
- } finally if (jedis != null) jedis.close()
- }
- override def close(): Unit = {
- redisPool.close()
- }
- def shutdown(): Unit = {
- try close()
- catch {
- case e: IOException => logger.error("close redis connection error!", e)
- }
- }
- }
|