// SimpleDataSource.java
package org.apache.kyuubi.engine.jdbc.session;

import org.apache.kyuubi.config.KyuubiConf;
import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
  21. /**
  22. * 自实现的基于 JdbcConnectionProvider 的数据源 连接池
  23. * @author zhaopx
  24. */
  25. public class SimpleDataSource implements PooledDataSource {
  26. final Logger log = LoggerFactory.getLogger(SimpleDataSource.class);
  27. /**
  28. * 缓存的池子
  29. */
  30. private final ConcurrentMap<JdbcConnectionWrapper, Date> pool = new ConcurrentHashMap<JdbcConnectionWrapper, Date>();
  31. /**
  32. * 5 个连接,最大
  33. */
  34. private int maxSize = 10;
  35. /**
  36. * 最小连接数
  37. */
  38. private int minSize = 1;
  39. /**
  40. * 连接等待时间
  41. */
  42. private int waitTime = 30000;
  43. private Semaphore semaphore;
  44. private final Properties connProps = new Properties();
  45. private JdbcConnectionProvider jdbcConnectionProvider;
  46. private KyuubiConf kyuubiConf;
  47. public SimpleDataSource(JdbcConnectionProvider jdbcConnectionProvider, KyuubiConf kyuubiConf) {
  48. this.jdbcConnectionProvider = jdbcConnectionProvider;
  49. this.kyuubiConf = kyuubiConf;
  50. this.maxSize = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.maxSize").getOrElse(()-> "10")), 10);
  51. this.minSize = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.minSize").getOrElse(()-> "1")), 1);
  52. this.waitTime = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.maxWait").getOrElse(()-> "30000")), 1000);
  53. initConnections();
  54. }
  55. private void initConnections() {
  56. log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  57. semaphore = new Semaphore(maxSize, false);
  58. if (minSize > 0 && minSize < maxSize) {
  59. try {
  60. // 尝试获得连接
  61. Connection conn = getRealConnection(null, null);
  62. conn.close();
  63. } catch (SQLException e) {
  64. throw new RuntimeException(e);
  65. }
  66. }
  67. }
  68. public void close() throws IOException {
  69. Exception ex = null;
  70. for (JdbcConnectionWrapper conn : pool.keySet()) {
  71. try {
  72. conn.directClose();
  73. } catch (Exception e) { ex = e; }
  74. }
  75. pool.clear();
  76. if(ex != null) {
  77. throw new IOException(ex);
  78. }
  79. log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  80. }
  81. /**
  82. * 关闭连接,这里是软关闭
  83. * @param realConnection
  84. * @throws SQLException
  85. */
  86. private void closeConnection(Connection realConnection, Connection proxyConnection) throws SQLException {
  87. synchronized (pool) {
  88. if (pool.size() <= maxSize && realConnection instanceof JdbcConnectionWrapper && ((JdbcConnectionWrapper)realConnection).isValidFlag()) {
  89. // 正常的连接不关闭,放到池中
  90. pool.put((JdbcConnectionWrapper)realConnection, new Date());
  91. return;
  92. } else if(pool.size() <= maxSize && !(realConnection instanceof JdbcConnectionWrapper)) {
  93. pool.put(new JdbcConnectionWrapper(proxyConnection, realConnection), new Date());
  94. return;
  95. }
  96. }
  97. try {
  98. realConnection.close();
  99. } finally {
  100. semaphore.release();
  101. }
  102. }
  103. /**
  104. * 关闭连接,这里是软关闭
  105. * @param realConnection
  106. * @throws SQLException
  107. */
  108. public void closeConnectionAndRemove(Connection realConnection) throws SQLException {
  109. if(realConnection == null) {
  110. return;
  111. }
  112. synchronized (pool) {
  113. // 从缓存移除
  114. if(realConnection instanceof JdbcConnectionWrapper) {
  115. pool.remove((JdbcConnectionWrapper) realConnection);
  116. } else {
  117. pool.remove(realConnection);
  118. }
  119. }
  120. try {
  121. if(realConnection instanceof JdbcConnectionWrapper) {
  122. ((JdbcConnectionWrapper)realConnection).directClose();
  123. } else {
  124. realConnection.close();
  125. }
  126. } catch (Exception ignore) {
  127. } finally {
  128. semaphore.release();
  129. }
  130. }
  131. public Connection getConnection() throws SQLException {
  132. return getConnection(null, null);
  133. }
  134. public Connection getConnection(String username, String password) throws SQLException {
  135. synchronized (pool) {
  136. if (!pool.isEmpty()) {
  137. JdbcConnectionWrapper realConn = pool.keySet().iterator().next();
  138. pool.remove(realConn);
  139. if(realConn.isValidFlag()) {
  140. return realConn;
  141. }
  142. // hive jdbc 不支持设置 AutoCommit
  143. //realConn.setAutoCommit(true);
  144. return getProxyConnection(realConn);
  145. }
  146. }
  147. try {
  148. if (semaphore.tryAcquire(waitTime, TimeUnit.MILLISECONDS)) {
  149. return getProxyConnection(getRealConnection(username, password));
  150. } else {
  151. throw new RuntimeException("Connection pool is full: " + maxSize);
  152. }
  153. } catch (SQLException e) {
  154. semaphore.release();
  155. throw e;
  156. } catch (InterruptedException e) {
  157. throw new RuntimeException(e);
  158. }
  159. }
  160. private Connection getProxyConnection(final Connection realConnection) {
  161. InvocationHandler handler = new InvocationHandler() {
  162. public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
  163. Object ret = null;
  164. if ("close".equals(method.getName())) {
  165. closeConnection(realConnection, (Connection)proxy);
  166. } else if ("directClose".equals(method.getName())) {
  167. // 实际的关闭
  168. try {
  169. realConnection.close();
  170. } catch (Exception ignore) {}
  171. ret = Void.TYPE.newInstance();
  172. } else if ("unwrap".equals(method.getName())) {
  173. ret = realConnection;
  174. } else {
  175. ret = method.invoke(realConnection, params);
  176. }
  177. return ret;
  178. }
  179. };
  180. return new JdbcConnectionWrapper((JdbcConnection) Proxy.newProxyInstance(JdbcConnection.class.getClassLoader(), new Class[] { JdbcConnection.class }, handler), realConnection);
  181. }
  182. public Connection getRealConnection(String username, String password) throws SQLException {
  183. try {
  184. return jdbcConnectionProvider.getConnection(kyuubiConf);
  185. } catch (Exception e) {
  186. throw new SQLException(e);
  187. }
  188. }
  189. public void setProperties(Properties properties){
  190. this.connProps.putAll(properties);
  191. }
  192. public PrintWriter getLogWriter() throws SQLException {
  193. return null;
  194. }
  195. public void setLogWriter(PrintWriter out) throws SQLException {
  196. }
  197. public void setLoginTimeout(int seconds) throws SQLException {
  198. }
  199. public int getLoginTimeout() throws SQLException {
  200. return 0;
  201. }
  202. public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
  203. return null;
  204. }
  205. public <T> T unwrap(Class<T> iface) throws SQLException {
  206. return null;
  207. }
  208. public boolean isWrapperFor(Class<?> iface) throws SQLException {
  209. return false;
  210. }
  211. public void setIdleValidationQuery(int idleInSeconds,String validationQuery){
  212. //do noting
  213. }
  214. public int getMaxSize() {
  215. return maxSize;
  216. }
  217. public int getMinSize() {
  218. return minSize;
  219. }
  220. public int getWaitTime() {
  221. return waitTime;
  222. }
  223. public Properties getConnProps() {
  224. return connProps;
  225. }
  226. /**
  227. * 返回内置的绑定
  228. * @return
  229. */
  230. public Set<JdbcConnectionWrapper> getPoolSet() {
  231. return pool.keySet();
  232. }
  233. }