SimpleDataSource.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package com.primeton.dsp.datarelease.data.bdata;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.io.PrintWriter;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.Method;
  7. import java.lang.reflect.Proxy;
  8. import java.sql.Connection;
  9. import java.sql.SQLException;
  10. import java.sql.SQLFeatureNotSupportedException;
  11. import java.util.ArrayList;
  12. import java.util.Date;
  13. import java.util.List;
  14. import java.util.Properties;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. import java.util.concurrent.ConcurrentMap;
  17. import java.util.concurrent.Semaphore;
  18. import java.util.concurrent.TimeUnit;
  19. /**
  20. *
  21. * @author zhaopx
  22. */
  23. @Slf4j
  24. public class SimpleDataSource implements PooledDataSource {
  25. private ConcurrentMap<Connection, Date> pool = new ConcurrentHashMap<Connection, Date>();
  26. private int maxSize;
  27. private int minSize;
  28. private int waitTime;
  29. private Semaphore semaphore;
  30. private final Properties connProps=new Properties();
  31. private HiveConnectionService connectionService;
  32. public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) {
  33. this.connectionService = connectionService;
  34. connProps.putAll(poolProperties);
  35. maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15"));
  36. minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3"));
  37. waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5"));
  38. initConnections(poolProperties);
  39. }
  40. private void initConnections(Properties poolProperties) {
  41. log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
  42. semaphore = new Semaphore(maxSize, false);
  43. if (minSize > 0 && minSize < maxSize) {
  44. try {
  45. List<Connection> connections = new ArrayList<Connection>();
  46. for (int i = 0; i < minSize; i++) {
  47. connections.add(getConnection());
  48. }
  49. for (Connection conn : connections) {
  50. conn.close();
  51. }
  52. } catch (SQLException e) {
  53. throw new RuntimeException(e);
  54. }
  55. }
  56. }
  57. public void close() throws IOException {
  58. Exception ex = null;
  59. for (Connection conn : pool.keySet()) {
  60. try {
  61. conn.close();
  62. } catch (Exception e) { ex = e; }
  63. }
  64. pool.clear();
  65. if(ex != null) {
  66. throw new IOException(ex);
  67. }
  68. }
  69. private void closeConnection(Connection realConnection) throws SQLException {
  70. synchronized (pool) {
  71. if (pool.size() <= maxSize) {
  72. pool.put(realConnection, new Date());
  73. return;
  74. }
  75. }
  76. try {
  77. realConnection.close();
  78. } finally {
  79. semaphore.release();
  80. }
  81. }
  82. public Connection getConnection() throws SQLException {
  83. return getConnection(null, null);
  84. }
  85. public Connection getConnection(String username, String password) throws SQLException {
  86. synchronized (pool) {
  87. if (!pool.isEmpty()) {
  88. Connection realConn = pool.keySet().iterator().next();
  89. pool.remove(realConn);
  90. realConn.setAutoCommit(true);
  91. return getProxyConnection(realConn);
  92. }
  93. }
  94. try {
  95. if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) {
  96. return getProxyConnection(getRealConnection(username, password));
  97. }else {
  98. throw new RuntimeException("Connection pool is full: "+maxSize);
  99. }
  100. }catch(SQLException e) {
  101. semaphore.release();
  102. throw e;
  103. } catch (InterruptedException e) {
  104. throw new RuntimeException(e);
  105. }
  106. }
  107. private Connection getProxyConnection(final Connection realConnection) {
  108. InvocationHandler handler = new InvocationHandler() {
  109. public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
  110. Object ret = null;
  111. if ("close".equals(method.getName())) {
  112. closeConnection(realConnection);
  113. }else if ("unwrap".equals(method.getName())) {
  114. ret=realConnection;
  115. } else {
  116. ret = method.invoke(realConnection, params);
  117. }
  118. return ret;
  119. }
  120. };
  121. return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
  122. }
  123. protected Connection getRealConnection(String username, String password) throws SQLException {
  124. try {
  125. return connectionService.getConnection();
  126. } catch (Exception e) {
  127. throw new RuntimeException(e);
  128. }
  129. }
  130. public void setProperties(Properties properties){
  131. this.connProps.putAll(properties);
  132. }
  133. public PrintWriter getLogWriter() throws SQLException {
  134. return null;
  135. }
  136. public void setLogWriter(PrintWriter out) throws SQLException {
  137. }
  138. public void setLoginTimeout(int seconds) throws SQLException {
  139. }
  140. public int getLoginTimeout() throws SQLException {
  141. return 0;
  142. }
  143. public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
  144. return null;
  145. }
  146. public <T> T unwrap(Class<T> iface) throws SQLException {
  147. return null;
  148. }
  149. public boolean isWrapperFor(Class<?> iface) throws SQLException {
  150. return false;
  151. }
  152. public void setIdleValidationQuery(int idleInSeconds,String validationQuery){
  153. //do noting
  154. }
  155. public int getMaxSize() {
  156. return maxSize;
  157. }
  158. public int getMinSize() {
  159. return minSize;
  160. }
  161. public int getWaitTime() {
  162. return waitTime;
  163. }
  164. public Properties getConnProps() {
  165. return connProps;
  166. }
  167. }