|
@@ -1,6 +1,9 @@
|
|
|
-package com.yiidata.dataops.common.bigdata;
|
|
|
|
|
|
|
+package org.apache.kyuubi.engine.jdbc.session;
|
|
|
|
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
+import org.apache.kyuubi.config.KyuubiConf;
|
|
|
|
|
+import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
@@ -12,64 +15,77 @@ import java.sql.SQLException;
|
|
|
import java.sql.SQLFeatureNotSupportedException;
|
|
import java.sql.SQLFeatureNotSupportedException;
|
|
|
import java.util.Date;
|
|
import java.util.Date;
|
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- *
|
|
|
|
|
|
|
+ * 自实现的基于 JdbcConnectionProvider 的数据源 连接池
|
|
|
* @author zhaopx
|
|
* @author zhaopx
|
|
|
*/
|
|
*/
|
|
|
|
|
|
|
|
-@Slf4j
|
|
|
|
|
-public class SimpleDataSource implements PooledDataSource {
|
|
|
|
|
|
|
+public class SimpleDataSource implements PooledDataSource {
|
|
|
|
|
|
|
|
- private ConcurrentMap<Connection, Date> pool = new ConcurrentHashMap<Connection, Date>();
|
|
|
|
|
-
|
|
|
|
|
|
|
+ final Logger log = LoggerFactory.getLogger(SimpleDataSource.class);
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 缓存的池子
|
|
|
|
|
+ */
|
|
|
|
|
+ private final ConcurrentMap<JdbcConnectionWrapper, Date> pool = new ConcurrentHashMap<JdbcConnectionWrapper, Date>();
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 5 个连接,最大
|
|
|
|
|
+ */
|
|
|
|
|
+ private int maxSize = 10;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 最小连接数
|
|
|
|
|
+ */
|
|
|
|
|
+ private int minSize = 1;
|
|
|
|
|
|
|
|
- private int maxSize;
|
|
|
|
|
- private int minSize;
|
|
|
|
|
- private int waitTime;
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 连接等待时间
|
|
|
|
|
+ */
|
|
|
|
|
+ private int waitTime = 30000;
|
|
|
|
|
|
|
|
private Semaphore semaphore;
|
|
private Semaphore semaphore;
|
|
|
|
|
|
|
|
- private final Properties connProps=new Properties();
|
|
|
|
|
|
|
+ private final Properties connProps = new Properties();
|
|
|
|
|
|
|
|
|
|
|
|
|
- private HiveConnectionService connectionService;
|
|
|
|
|
|
|
+ private JdbcConnectionProvider jdbcConnectionProvider;
|
|
|
|
|
+ private KyuubiConf kyuubiConf;
|
|
|
|
|
|
|
|
- public SimpleDataSource(HiveConnectionService connectionService, Properties poolProperties) {
|
|
|
|
|
- this.connectionService = connectionService;
|
|
|
|
|
- connProps.putAll(poolProperties);
|
|
|
|
|
-
|
|
|
|
|
- maxSize = Integer.parseInt(poolProperties.getProperty("pool.max", "15"));
|
|
|
|
|
- minSize = Integer.parseInt(poolProperties.getProperty("pool.min", "3"));
|
|
|
|
|
- waitTime = Integer.parseInt(poolProperties.getProperty("pool.waitTime", "5"));
|
|
|
|
|
- initConnections(poolProperties);
|
|
|
|
|
|
|
+ public SimpleDataSource(JdbcConnectionProvider jdbcConnectionProvider, KyuubiConf kyuubiConf) {
|
|
|
|
|
+ this.jdbcConnectionProvider = jdbcConnectionProvider;
|
|
|
|
|
+ this.kyuubiConf = kyuubiConf;
|
|
|
|
|
+ this.maxSize = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.maxSize").getOrElse(()-> "10")), 10);
|
|
|
|
|
+ this.minSize = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.minSize").getOrElse(()-> "1")), 1);
|
|
|
|
|
+ this.waitTime = Math.max(Integer.parseInt(kyuubiConf.getOption("kyuubi.engine.jdbc.pool.maxWait").getOrElse(()-> "30000")), 1000);
|
|
|
|
|
+ initConnections();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private void initConnections(Properties poolProperties) {
|
|
|
|
|
|
|
+ private void initConnections() {
|
|
|
log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
|
|
log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
|
|
|
semaphore = new Semaphore(maxSize, false);
|
|
semaphore = new Semaphore(maxSize, false);
|
|
|
if (minSize > 0 && minSize < maxSize) {
|
|
if (minSize > 0 && minSize < maxSize) {
|
|
|
try {
|
|
try {
|
|
|
// 尝试获得连接
|
|
// 尝试获得连接
|
|
|
- Connection conn = getConnection();
|
|
|
|
|
- if(conn!=null){
|
|
|
|
|
- conn.close();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ Connection conn = getRealConnection(null, null);
|
|
|
|
|
+ conn.close();
|
|
|
} catch (SQLException e) {
|
|
} catch (SQLException e) {
|
|
|
- throw new IllegalArgumentException(e);
|
|
|
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void close() throws IOException {
|
|
public void close() throws IOException {
|
|
|
Exception ex = null;
|
|
Exception ex = null;
|
|
|
- for (Connection conn : pool.keySet()) {
|
|
|
|
|
|
|
+ for (JdbcConnectionWrapper conn : pool.keySet()) {
|
|
|
try {
|
|
try {
|
|
|
- conn.close();
|
|
|
|
|
|
|
+ conn.directClose();
|
|
|
} catch (Exception e) { ex = e; }
|
|
} catch (Exception e) { ex = e; }
|
|
|
}
|
|
}
|
|
|
pool.clear();
|
|
pool.clear();
|
|
@@ -78,15 +94,23 @@ public class SimpleDataSource implements PooledDataSource {
|
|
|
}
|
|
}
|
|
|
log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
|
|
log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- private void closeConnection(Connection realConnection) throws SQLException {
|
|
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 关闭连接,这里是软关闭
|
|
|
|
|
+ * @param realConnection
|
|
|
|
|
+ * @throws SQLException
|
|
|
|
|
+ */
|
|
|
|
|
+ private void closeConnection(Connection realConnection, Connection proxyConnection) throws SQLException {
|
|
|
synchronized (pool) {
|
|
synchronized (pool) {
|
|
|
- if (pool.size() <= maxSize) {
|
|
|
|
|
- pool.put(realConnection, new Date());
|
|
|
|
|
|
|
+ if (pool.size() <= maxSize && realConnection instanceof JdbcConnectionWrapper && ((JdbcConnectionWrapper)realConnection).isValidFlag()) {
|
|
|
|
|
+ // 正常的连接不关闭,放到池中
|
|
|
|
|
+ pool.put((JdbcConnectionWrapper)realConnection, new Date());
|
|
|
|
|
+ return;
|
|
|
|
|
+ } else if(pool.size() <= maxSize && !(realConnection instanceof JdbcConnectionWrapper)) {
|
|
|
|
|
+ pool.put(new JdbcConnectionWrapper(proxyConnection, realConnection), new Date());
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
try {
|
|
try {
|
|
|
realConnection.close();
|
|
realConnection.close();
|
|
|
} finally {
|
|
} finally {
|
|
@@ -94,6 +118,35 @@ public class SimpleDataSource implements PooledDataSource {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 关闭连接,这里是软关闭
|
|
|
|
|
+ * @param realConnection
|
|
|
|
|
+ * @throws SQLException
|
|
|
|
|
+ */
|
|
|
|
|
+ public void closeConnectionAndRemove(Connection realConnection) throws SQLException {
|
|
|
|
|
+ if(realConnection == null) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ synchronized (pool) {
|
|
|
|
|
+ // 从缓存移除
|
|
|
|
|
+ if(realConnection instanceof JdbcConnectionWrapper) {
|
|
|
|
|
+ pool.remove((JdbcConnectionWrapper) realConnection);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ pool.remove(realConnection);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ if(realConnection instanceof JdbcConnectionWrapper) {
|
|
|
|
|
+ ((JdbcConnectionWrapper)realConnection).directClose();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ realConnection.close();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception ignore) {
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ semaphore.release();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
public Connection getConnection() throws SQLException {
|
|
public Connection getConnection() throws SQLException {
|
|
|
return getConnection(null, null);
|
|
return getConnection(null, null);
|
|
|
}
|
|
}
|
|
@@ -101,54 +154,60 @@ public class SimpleDataSource implements PooledDataSource {
|
|
|
public Connection getConnection(String username, String password) throws SQLException {
|
|
public Connection getConnection(String username, String password) throws SQLException {
|
|
|
synchronized (pool) {
|
|
synchronized (pool) {
|
|
|
if (!pool.isEmpty()) {
|
|
if (!pool.isEmpty()) {
|
|
|
- Connection realConn = pool.keySet().iterator().next();
|
|
|
|
|
|
|
+ JdbcConnectionWrapper realConn = pool.keySet().iterator().next();
|
|
|
pool.remove(realConn);
|
|
pool.remove(realConn);
|
|
|
|
|
+ if(realConn.isValidFlag()) {
|
|
|
|
|
+ return realConn;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
// hive jdbc 不支持设置 AutoCommit
|
|
// hive jdbc 不支持设置 AutoCommit
|
|
|
//realConn.setAutoCommit(true);
|
|
//realConn.setAutoCommit(true);
|
|
|
-
|
|
|
|
|
return getProxyConnection(realConn);
|
|
return getProxyConnection(realConn);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
try {
|
|
try {
|
|
|
- if(semaphore.tryAcquire(waitTime,TimeUnit.SECONDS)) {
|
|
|
|
|
|
|
+ if (semaphore.tryAcquire(waitTime, TimeUnit.MILLISECONDS)) {
|
|
|
return getProxyConnection(getRealConnection(username, password));
|
|
return getProxyConnection(getRealConnection(username, password));
|
|
|
- }else {
|
|
|
|
|
- throw new IllegalArgumentException("Connection pool is full: "+maxSize);
|
|
|
|
|
|
|
+ } else {
|
|
|
|
|
+ throw new RuntimeException("Connection pool is full: " + maxSize);
|
|
|
}
|
|
}
|
|
|
- }catch(SQLException e) {
|
|
|
|
|
|
|
+ } catch (SQLException e) {
|
|
|
semaphore.release();
|
|
semaphore.release();
|
|
|
throw e;
|
|
throw e;
|
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
|
- log.error(e.getMessage(),e);
|
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
|
- return null;
|
|
|
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private Connection getProxyConnection(final Connection realConnection) {
|
|
|
|
|
|
|
+ private Connection getProxyConnection(final Connection realConnection) {
|
|
|
InvocationHandler handler = new InvocationHandler() {
|
|
InvocationHandler handler = new InvocationHandler() {
|
|
|
public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
|
|
public Object invoke(Object proxy, Method method, Object[] params) throws Exception {
|
|
|
Object ret = null;
|
|
Object ret = null;
|
|
|
if ("close".equals(method.getName())) {
|
|
if ("close".equals(method.getName())) {
|
|
|
- closeConnection(realConnection);
|
|
|
|
|
- }else if ("unwrap".equals(method.getName())) {
|
|
|
|
|
- ret=realConnection;
|
|
|
|
|
|
|
+ closeConnection(realConnection, (Connection)proxy);
|
|
|
|
|
+ } else if ("directClose".equals(method.getName())) {
|
|
|
|
|
+ // 实际的关闭
|
|
|
|
|
+ try {
|
|
|
|
|
+ realConnection.close();
|
|
|
|
|
+ } catch (Exception ignore) {}
|
|
|
|
|
+ ret = Void.TYPE.newInstance();
|
|
|
|
|
+ } else if ("unwrap".equals(method.getName())) {
|
|
|
|
|
+ ret = realConnection;
|
|
|
} else {
|
|
} else {
|
|
|
ret = method.invoke(realConnection, params);
|
|
ret = method.invoke(realConnection, params);
|
|
|
}
|
|
}
|
|
|
return ret;
|
|
return ret;
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
- return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
|
|
|
|
|
|
|
+ return new JdbcConnectionWrapper((JdbcConnection) Proxy.newProxyInstance(JdbcConnection.class.getClassLoader(), new Class[] { JdbcConnection.class }, handler), realConnection);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
|
|
|
- protected Connection getRealConnection(String username, String password) throws SQLException {
|
|
|
|
|
|
|
+
|
|
|
|
|
+ public Connection getRealConnection(String username, String password) throws SQLException {
|
|
|
try {
|
|
try {
|
|
|
- return connectionService.getConnection();
|
|
|
|
|
|
|
+ return jdbcConnectionProvider.getConnection(kyuubiConf);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
throw new SQLException(e);
|
|
throw new SQLException(e);
|
|
|
}
|
|
}
|
|
@@ -163,11 +222,10 @@ public class SimpleDataSource implements PooledDataSource {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void setLogWriter(PrintWriter out) throws SQLException {
|
|
public void setLogWriter(PrintWriter out) throws SQLException {
|
|
|
- throw new UnsupportedOperationException();
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void setLoginTimeout(int seconds) throws SQLException {
|
|
public void setLoginTimeout(int seconds) throws SQLException {
|
|
|
- throw new UnsupportedOperationException();
|
|
|
|
|
|
|
+
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public int getLoginTimeout() throws SQLException {
|
|
public int getLoginTimeout() throws SQLException {
|
|
@@ -206,4 +264,12 @@ public class SimpleDataSource implements PooledDataSource {
|
|
|
public Properties getConnProps() {
|
|
public Properties getConnProps() {
|
|
|
return connProps;
|
|
return connProps;
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 返回内置的绑定
|
|
|
|
|
+ * @return
|
|
|
|
|
+ */
|
|
|
|
|
+ public Set<JdbcConnectionWrapper> getPoolSet() {
|
|
|
|
|
+ return pool.keySet();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|