// SimpleDataSource.java (5.2 KB)
  1. package com.yiidata.dataops.common.bigdata;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.io.PrintWriter;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.Method;
  7. import java.lang.reflect.Proxy;
  8. import java.sql.Connection;
  9. import java.sql.SQLException;
  10. import java.sql.SQLFeatureNotSupportedException;
  11. import java.util.Date;
  12. import java.util.Properties;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.ConcurrentMap;
  15. import java.util.concurrent.Semaphore;
  16. import java.util.concurrent.TimeUnit;
  17. /**
  18. *
  19. * @author zhaopx
  20. */
  21. @Slf4j
  22. public class SimpleDataSource implements PooledDataSource {
  23. private ConcurrentMap<Connection, Date> pool = new ConcurrentHashMap<Connection, Date>();
  24. private int maxSize;
  25. private int minSize;
  26. private int waitTime;
  27. private Semaphore semaphore;
  28. private final Properties connProps=new Properties();
  29. private HiveConnectionService connectionService;
  30. public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) {
  31. this.connectionService = connectionService;
  32. connProps.putAll(poolProperties);
  33. maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15"));
  34. minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3"));
  35. waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5"));
  36. initConnections(poolProperties);
  37. }
  38. private void initConnections(Properties poolProperties) {
  39. log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  40. semaphore = new Semaphore(maxSize, false);
  41. if (minSize > 0 && minSize < maxSize) {
  42. try {
  43. // 尝试获得连接
  44. Connection conn = getConnection();
  45. if(conn!=null){
  46. conn.close();
  47. }
  48. } catch (SQLException e) {
  49. throw new IllegalArgumentException(e);
  50. }
  51. }
  52. }
  53. public void close() throws IOException {
  54. Exception ex = null;
  55. for (Connection conn : pool.keySet()) {
  56. try {
  57. conn.close();
  58. } catch (Exception e) { ex = e; }
  59. }
  60. pool.clear();
  61. if(ex != null) {
  62. throw new IOException(ex);
  63. }
  64. log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  65. }
  66. private void closeConnection(Connection realConnection) throws SQLException {
  67. synchronized (pool) {
  68. if (pool.size() <= maxSize) {
  69. pool.put(realConnection, new Date());
  70. return;
  71. }
  72. }
  73. try {
  74. realConnection.close();
  75. } finally {
  76. semaphore.release();
  77. }
  78. }
  79. public Connection getConnection() throws SQLException {
  80. return getConnection(null, null);
  81. }
  82. public Connection getConnection(String username, String password) throws SQLException {
  83. synchronized (pool) {
  84. if (!pool.isEmpty()) {
  85. Connection realConn = pool.keySet().iterator().next();
  86. pool.remove(realConn);
  87. // hive jdbc 不支持设置 AutoCommit
  88. //realConn.setAutoCommit(true);
  89. return getProxyConnection(realConn);
  90. }
  91. }
  92. try {
  93. if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) {
  94. return getProxyConnection(getRealConnection(username, password));
  95. }else {
  96. throw new IllegalArgumentException("Connection pool is full: "+maxSize);
  97. }
  98. }catch(SQLException e) {
  99. semaphore.release();
  100. throw e;
  101. } catch (InterruptedException e) {
  102. log.error(e.getMessage(),e);
  103. Thread.currentThread().interrupt();
  104. return null;
  105. }
  106. }
  107. private Connection getProxyConnection(final Connection realConnection) {
  108. InvocationHandler handler = new InvocationHandler() {
  109. public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
  110. Object ret = null;
  111. if ("close".equals(method.getName())) {
  112. closeConnection(realConnection);
  113. }else if ("unwrap".equals(method.getName())) {
  114. ret=realConnection;
  115. } else {
  116. ret = method.invoke(realConnection, params);
  117. }
  118. return ret;
  119. }
  120. };
  121. return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
  122. }
  123. protected Connection getRealConnection(String username, String password) throws SQLException {
  124. try {
  125. return connectionService.getConnection();
  126. } catch (Exception e) {
  127. throw new SQLException(e);
  128. }
  129. }
  130. public void setProperties(Properties properties){
  131. this.connProps.putAll(properties);
  132. }
  133. public PrintWriter getLogWriter() throws SQLException {
  134. return null;
  135. }
  136. public void setLogWriter(PrintWriter out) throws SQLException {
  137. throw new UnsupportedOperationException();
  138. }
  139. public void setLoginTimeout(int seconds) throws SQLException {
  140. throw new UnsupportedOperationException();
  141. }
  142. public int getLoginTimeout() throws SQLException {
  143. return 0;
  144. }
  145. public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
  146. return null;
  147. }
  148. public <T> T unwrap(Class<T> iface) throws SQLException {
  149. return null;
  150. }
  151. public boolean isWrapperFor(Class<?> iface) throws SQLException {
  152. return false;
  153. }
  154. public void setIdleValidationQuery(int idleInSeconds,String validationQuery){
  155. //do noting
  156. }
  157. public int getMaxSize() {
  158. return maxSize;
  159. }
  160. public int getMinSize() {
  161. return minSize;
  162. }
  163. public int getWaitTime() {
  164. return waitTime;
  165. }
  166. public Properties getConnProps() {
  167. return connProps;
  168. }
  169. }