Redised.scala 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package org.apache.kyuubi.events.handler
  2. import org.apache.commons.lang3.StringUtils
  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig
  4. import org.apache.kyuubi.Logging
  5. import redis.clients.jedis.{Jedis, JedisPool, JedisPubSub}
  6. import java.io.{Closeable, IOException}
  7. import java.time.Duration
  8. import java.util
  9. import java.util.Properties
  10. import scala.collection.JavaConverters._
  11. import scala.collection.mutable
  12. /**
  13. * 易点天下
  14. *
  15. * <pre>
  16. *
  17. * Created by zhenqin.
  18. * User: zhenqin
  19. * Date: 2026/1/7
  20. * Time: 12:53
  21. * Vendor: yiidata.com
  22. *
  23. * </pre>
  24. *
  25. * @author zhenqin
  26. */
  27. class Redised(props: Properties) extends Closeable with Logging {
  28. /**
  29. * Redis 的 Java Client
  30. */
  31. private val redisPool: JedisPool = initJedis(props)
  32. /**
  33. * Redis DB
  34. */
  35. val db = props.getProperty("db", "0").toInt;
  36. private def initJedis(props: Properties): JedisPool = {
  37. val poolConfig = new GenericObjectPoolConfig[Jedis]();
  38. // 设置连接池的最大空闲连接数等其他属性...// 设置连接池的最大空闲连接数等其他属性...
  39. poolConfig.setMaxTotal(props.getProperty("maxTotal", "100").toInt)
  40. poolConfig.setMaxIdle(Math.max(poolConfig.getMaxTotal, 5))
  41. poolConfig.setMinIdle(3)
  42. poolConfig.setTestOnBorrow(false)
  43. poolConfig.setTestWhileIdle(true)
  44. poolConfig.setTestOnCreate(false)
  45. poolConfig.setMinEvictableIdleTime(Duration.ofSeconds(95))
  46. logger.info("connect redis: " + props.getProperty("host", "localhost") + ":" + props.getProperty("port", "6379"))
  47. // 判断密码,如果有密码
  48. if(StringUtils.isBlank(props.getProperty("password"))) {
  49. return new JedisPool(poolConfig, props.getProperty("host", "localhost"), props.getProperty("port", "6379").toInt,
  50. props.getProperty("timeout", "3000").toInt)
  51. }
  52. new JedisPool(poolConfig, props.getProperty("host", "localhost"), props.getProperty("port", "6379").toInt,
  53. props.getProperty("timeout", "3000").toInt, props.getProperty("password"))
  54. }
  55. @throws[IOException]
  56. def zsetadd(sessionId: String, score: Double, statementId: String): Unit = {
  57. val jedis = redisPool.getResource
  58. try jedis.zadd("z:" + sessionId, score, statementId)
  59. finally if (jedis != null) jedis.close()
  60. }
  61. @throws[IOException]
  62. def zcount(sessionId: String): Long = {
  63. val jedis = redisPool.getResource
  64. try jedis.zcard("z:" + sessionId)
  65. finally if (jedis != null) jedis.close()
  66. }
  67. @throws[IOException]
  68. def zsettopn(sessionId: String, limit: Int): mutable.Set[String] = {
  69. val jedis = redisPool.getResource
  70. try {
  71. jedis.zrevrange("z:" + sessionId, 0, limit).asScala;
  72. }
  73. finally if (jedis != null) jedis.close()
  74. }
  75. @throws[IOException]
  76. def mput(sessionId: String, statementId: String, json: String): Unit = {
  77. val jedis = redisPool.getResource
  78. try jedis.hset(sessionId, statementId, json)
  79. finally if (jedis != null) jedis.close()
  80. }
  81. /**
  82. * 获取单个信息
  83. * @param sessionId
  84. * @param statementId
  85. * @throws
  86. * @return
  87. */
  88. @throws[IOException]
  89. def mget(sessionId: String, statementId: String): String = {
  90. val jedis = redisPool.getResource
  91. try {
  92. jedis.hget(sessionId, statementId);
  93. }
  94. finally if (jedis != null) jedis.close()
  95. }
  96. @throws[IOException]
  97. def mget(sessionId: String, statementIds: Array[String]): Map[String, String] = {
  98. val jedis = redisPool.getResource
  99. val r = scala.collection.mutable.Map[String, String]()
  100. try {
  101. for(id <- statementIds) {
  102. r += (id -> jedis.hget(sessionId, id))
  103. }
  104. return r.toMap[String, String]
  105. }
  106. finally if (jedis != null) jedis.close()
  107. }
  108. @throws[IOException]
  109. def mgetAll(sessionId: String): util.Map[String, String] = {
  110. val jedis = redisPool.getResource
  111. try jedis.hgetAll(sessionId)
  112. finally if (jedis != null) jedis.close()
  113. }
  114. @throws[IOException]
  115. def publish(topic: String, json: String): Unit = {
  116. val jedis = redisPool.getResource
  117. try jedis.publish(topic, json)
  118. finally if (jedis != null) jedis.close()
  119. }
  120. @throws[IOException]
  121. def subscribe(topic: String, jedisPubSub: JedisPubSub): Unit = {
  122. val jedis = redisPool.getResource
  123. try jedis.subscribe(jedisPubSub, topic)
  124. finally if (jedis != null) jedis.close()
  125. }
  126. def clear(): Unit = {
  127. val jedis = redisPool.getResource
  128. try {
  129. jedis.select(db)
  130. jedis.flushDB
  131. } finally if (jedis != null) jedis.close()
  132. }
  133. override def close(): Unit = {
  134. redisPool.close()
  135. }
  136. def shutdown(): Unit = {
  137. try close()
  138. catch {
  139. case e: IOException => logger.error("close redis connection error!", e)
  140. }
  141. }
  142. }