package com.primeton.damp.bigdata; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.io.PrintWriter; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * * @author zhaopx */ @Slf4j public class SimpleDataSource implements PooledDataSource { private ConcurrentMap pool = new ConcurrentHashMap(); private int maxSize; private int minSize; private int waitTime; private Semaphore semaphore; private final Properties connProps=new Properties(); private HiveConnectionService connectionService; public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) { this.connectionService = connectionService; connProps.putAll(poolProperties); maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15")); minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3")); waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5")); initConnections(poolProperties); } private void initConnections(Properties poolProperties) { log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}"); semaphore = new Semaphore(maxSize, false); if (minSize > 0 && minSize < maxSize) { try { // 尝试获得连接 Connection conn = getConnection(); conn.close(); } catch (SQLException e) { throw new RuntimeException(e); } } } public void close() throws IOException { Exception ex = null; for (Connection conn : pool.keySet()) { try { conn.close(); } catch (Exception e) { ex = e; } } pool.clear(); if(ex != null) { throw new IOException(ex); } log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}"); } private void closeConnection(Connection realConnection) throws SQLException { synchronized (pool) { if (pool.size() <= maxSize) { pool.put(realConnection, new Date()); return; } } try { realConnection.close(); } finally { semaphore.release(); } } public Connection getConnection() throws SQLException { return getConnection(null, null); } public Connection getConnection(String username, String password) throws SQLException { synchronized (pool) { if (!pool.isEmpty()) { Connection realConn = pool.keySet().iterator().next(); pool.remove(realConn); // hive jdbc 不支持设置 AutoCommit //realConn.setAutoCommit(true); return getProxyConnection(realConn); } } try { if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) { return getProxyConnection(getRealConnection(username, password)); }else { throw new RuntimeException("Connection pool is full: "+maxSize); } }catch(SQLException e) { semaphore.release(); throw e; } catch (InterruptedException e) { throw new RuntimeException(e); } } private Connection getProxyConnection(final Connection realConnection) { InvocationHandler handler = new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] params) throws Exception { Object ret = null; if ("close".equals(method.getName())) { closeConnection(realConnection); }else if ("unwrap".equals(method.getName())) { ret=realConnection; } else { ret = method.invoke(realConnection, params); } return ret; } }; return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler); } protected Connection getRealConnection(String username, String password) throws SQLException { try { return connectionService.getConnection(); } catch (Exception e) { throw new SQLException(e); } } public void setProperties(Properties properties){ this.connProps.putAll(properties); } public PrintWriter getLogWriter() throws SQLException { return null; } public void setLogWriter(PrintWriter out) throws SQLException { } public void setLoginTimeout(int seconds) throws SQLException { } public int getLoginTimeout() throws SQLException { return 0; } public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { return null; } public T unwrap(Class iface) throws SQLException { return null; } public boolean isWrapperFor(Class iface) throws SQLException { return false; } public void setIdleValidationQuery(int idleInSeconds,String validationQuery){ //do noting } public int getMaxSize() { return maxSize; } public int getMinSize() { return minSize; } public int getWaitTime() { return waitTime; } public Properties getConnProps() { return connProps; } }