SimpleDataSource.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package com.primeton.damp.bigdata;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.io.PrintWriter;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.Method;
  7. import java.lang.reflect.Proxy;
  8. import java.sql.Connection;
  9. import java.sql.SQLException;
  10. import java.sql.SQLFeatureNotSupportedException;
  11. import java.util.ArrayList;
  12. import java.util.Date;
  13. import java.util.List;
  14. import java.util.Properties;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. import java.util.concurrent.ConcurrentMap;
  17. import java.util.concurrent.Semaphore;
  18. import java.util.concurrent.TimeUnit;
  19. /**
  20. *
  21. * @author zhaopx
  22. */
  23. @Slf4j
  24. public class SimpleDataSource implements PooledDataSource {
  25. private ConcurrentMap<Connection, Date> pool = new ConcurrentHashMap<Connection, Date>();
  26. private int maxSize;
  27. private int minSize;
  28. private int waitTime;
  29. private Semaphore semaphore;
  30. private final Properties connProps=new Properties();
  31. private HiveConnectionService connectionService;
  32. public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) {
  33. this.connectionService = connectionService;
  34. connProps.putAll(poolProperties);
  35. maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15"));
  36. minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3"));
  37. waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5"));
  38. initConnections(poolProperties);
  39. }
  40. private void initConnections(Properties poolProperties) {
  41. log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  42. semaphore = new Semaphore(maxSize, false);
  43. if (minSize > 0 && minSize < maxSize) {
  44. try {
  45. // 尝试获得连接
  46. Connection conn = getConnection();
  47. conn.close();
  48. } catch (SQLException e) {
  49. throw new RuntimeException(e);
  50. }
  51. }
  52. }
  53. public void close() throws IOException {
  54. Exception ex = null;
  55. for (Connection conn : pool.keySet()) {
  56. try {
  57. conn.close();
  58. } catch (Exception e) { ex = e; }
  59. }
  60. pool.clear();
  61. if(ex != null) {
  62. throw new IOException(ex);
  63. }
  64. log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  65. }
  66. private void closeConnection(Connection realConnection) throws SQLException {
  67. synchronized (pool) {
  68. if (pool.size() <= maxSize) {
  69. pool.put(realConnection, new Date());
  70. return;
  71. }
  72. }
  73. try {
  74. realConnection.close();
  75. } finally {
  76. semaphore.release();
  77. }
  78. }
  79. public Connection getConnection() throws SQLException {
  80. return getConnection(null, null);
  81. }
  82. public Connection getConnection(String username, String password) throws SQLException {
  83. synchronized (pool) {
  84. if (!pool.isEmpty()) {
  85. Connection realConn = pool.keySet().iterator().next();
  86. pool.remove(realConn);
  87. // hive jdbc 不支持设置 AutoCommit
  88. //realConn.setAutoCommit(true);
  89. return getProxyConnection(realConn);
  90. }
  91. }
  92. try {
  93. if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) {
  94. return getProxyConnection(getRealConnection(username, password));
  95. }else {
  96. throw new RuntimeException("Connection pool is full: "+maxSize);
  97. }
  98. }catch(SQLException e) {
  99. semaphore.release();
  100. throw e;
  101. } catch (InterruptedException e) {
  102. throw new RuntimeException(e);
  103. }
  104. }
  105. private Connection getProxyConnection(final Connection realConnection) {
  106. InvocationHandler handler = new InvocationHandler() {
  107. public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
  108. Object ret = null;
  109. if ("close".equals(method.getName())) {
  110. closeConnection(realConnection);
  111. }else if ("unwrap".equals(method.getName())) {
  112. ret=realConnection;
  113. } else {
  114. ret = method.invoke(realConnection, params);
  115. }
  116. return ret;
  117. }
  118. };
  119. return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
  120. }
  121. protected Connection getRealConnection(String username, String password) throws SQLException {
  122. try {
  123. return connectionService.getConnection();
  124. } catch (Exception e) {
  125. throw new SQLException(e);
  126. }
  127. }
  128. public void setProperties(Properties properties){
  129. this.connProps.putAll(properties);
  130. }
  131. public PrintWriter getLogWriter() throws SQLException {
  132. return null;
  133. }
  134. public void setLogWriter(PrintWriter out) throws SQLException {
  135. }
  136. public void setLoginTimeout(int seconds) throws SQLException {
  137. }
  138. public int getLoginTimeout() throws SQLException {
  139. return 0;
  140. }
  141. public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
  142. return null;
  143. }
  144. public <T> T unwrap(Class<T> iface) throws SQLException {
  145. return null;
  146. }
  147. public boolean isWrapperFor(Class<?> iface) throws SQLException {
  148. return false;
  149. }
  150. public void setIdleValidationQuery(int idleInSeconds,String validationQuery){
  151. //do noting
  152. }
  153. public int getMaxSize() {
  154. return maxSize;
  155. }
  156. public int getMinSize() {
  157. return minSize;
  158. }
  159. public int getWaitTime() {
  160. return waitTime;
  161. }
  162. public Properties getConnProps() {
  163. return connProps;
  164. }
  165. }