123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- package com.primeton.damp.bigdata;
- import lombok.extern.slf4j.Slf4j;
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import java.sql.Connection;
- import java.sql.SQLException;
- import java.sql.SQLFeatureNotSupportedException;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Properties;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * @author zhaopx
- */
- @Slf4j
- public class SimpleDataSource implements PooledDataSource {
- private ConcurrentMap<Connection, Date> pool = new ConcurrentHashMap<Connection, Date>();
-
- private int maxSize;
- private int minSize;
- private int waitTime;
-
- private Semaphore semaphore;
- private final Properties connProps=new Properties();
- private HiveConnectionService connectionService;
-
- public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) {
- this.connectionService = connectionService;
- connProps.putAll(poolProperties);
- maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15"));
- minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3"));
- waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5"));
- initConnections(poolProperties);
- }
- private void initConnections(Properties poolProperties) {
- log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
- semaphore = new Semaphore(maxSize, false);
- if (minSize > 0 && minSize < maxSize) {
- try {
- // 尝试获得连接
- Connection conn = getConnection();
- conn.close();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
- }
- public void close() throws IOException {
- Exception ex = null;
- for (Connection conn : pool.keySet()) {
- try {
- conn.close();
- } catch (Exception e) { ex = e; }
- }
- pool.clear();
- if(ex != null) {
- throw new IOException(ex);
- }
- log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
- }
-
- private void closeConnection(Connection realConnection) throws SQLException {
- synchronized (pool) {
- if (pool.size() <= maxSize) {
- pool.put(realConnection, new Date());
- return;
- }
- }
-
- try {
- realConnection.close();
- } finally {
- semaphore.release();
- }
- }
- public Connection getConnection() throws SQLException {
- return getConnection(null, null);
- }
- public Connection getConnection(String username, String password) throws SQLException {
- synchronized (pool) {
- if (!pool.isEmpty()) {
- Connection realConn = pool.keySet().iterator().next();
- pool.remove(realConn);
- // hive jdbc 不支持设置 AutoCommit
- //realConn.setAutoCommit(true);
- return getProxyConnection(realConn);
- }
- }
-
- try {
- if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) {
- return getProxyConnection(getRealConnection(username, password));
- }else {
- throw new RuntimeException("Connection pool is full: "+maxSize);
- }
- }catch(SQLException e) {
- semaphore.release();
- throw e;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private Connection getProxyConnection(final Connection realConnection) {
- InvocationHandler handler = new InvocationHandler() {
- public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
- Object ret = null;
- if ("close".equals(method.getName())) {
- closeConnection(realConnection);
- }else if ("unwrap".equals(method.getName())) {
- ret=realConnection;
- } else {
- ret = method.invoke(realConnection, params);
- }
- return ret;
- }
- };
- return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
- }
-
- protected Connection getRealConnection(String username, String password) throws SQLException {
- try {
- return connectionService.getConnection();
- } catch (Exception e) {
- throw new SQLException(e);
- }
- }
- public void setProperties(Properties properties){
- this.connProps.putAll(properties);
- }
-
- public PrintWriter getLogWriter() throws SQLException {
- return null;
- }
- public void setLogWriter(PrintWriter out) throws SQLException {
- }
- public void setLoginTimeout(int seconds) throws SQLException {
- }
- public int getLoginTimeout() throws SQLException {
- return 0;
- }
- public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
- return null;
- }
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return null;
- }
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return false;
- }
- public void setIdleValidationQuery(int idleInSeconds,String validationQuery){
- //do noting
- }
- public int getMaxSize() {
- return maxSize;
- }
- public int getMinSize() {
- return minSize;
- }
- public int getWaitTime() {
- return waitTime;
- }
- public Properties getConnProps() {
- return connProps;
- }
- }
|