package com.primeton.dsp.datarelease.data.bdata;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Properties;
/**
 * Helper for obtaining Hive JDBC connections through ZooKeeper service discovery,
 * with optional Kerberos authentication and a minimal built-in connection pool.
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/20
 * Time: 17:56
 *
 * @author zhaopx merge
 */
@Slf4j
public class HiveHelper {
private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";
private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop";
private String hiveclientPropFile = null;
private String krb5File = null;
private String userName = null;
private String userKeytabFile = null;
private String url = null;
    private String zkQuorum = null; // list of ZooKeeper node IPs and ports
private String auth = null;
private String sasl_qop = null;
private String zooKeeperNamespace = null;
private String serviceDiscoveryMode = null;
private String principal = null;
    private final Deque<Connection> pools = new ArrayDeque<>();
    private final List<Connection> allConnection = new ArrayList<>();
private final Configuration conf;
public HiveHelper(@NonNull Configuration conf,
@NonNull String hiveclientPropFile,
String userName,
String userKeytabFile,
String krb5File) throws IOException {
this.conf = conf;
this.hiveclientPropFile = hiveclientPropFile;
this.userName = userName;
this.userKeytabFile = userKeytabFile;
this.krb5File = krb5File;
this.init();
}
    private void init() throws IOException {
        Properties clientInfo = new Properties();
        // "hiveclient.properties" is the client configuration file. If the multi-instance
        // feature is used, replace it with the "hiveclient.properties" of the corresponding
        // instance client, found in the config directory of that instance client's
        // unpacked installation package.
        try (InputStream fileInputStream = new FileInputStream(new File(this.hiveclientPropFile))) {
            clientInfo.load(fileInputStream);
        } catch (Exception e) {
            throw new IOException(e);
        }
        // zkQuorum has the form "xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002":
        // each "xxx.xxx.xxx.xxx" is the service IP of a ZooKeeper node in the cluster; the default port is 24002
zkQuorum = clientInfo.getProperty("zk.quorum");
auth = clientInfo.getProperty("auth");
sasl_qop = clientInfo.getProperty("sasl.qop");
zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
principal = clientInfo.getProperty("principal");
        // Assemble the JDBC URL
StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
if ("KERBEROS".equalsIgnoreCase(auth)) {
            // Set the client keytab and krb5 file paths
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, this.userName, this.userKeytabFile);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
            // Secure (Kerberos) mode
this.conf.set("hadoop.security.authentication", "kerberos");
            // ZooKeeper login authentication
LoginUtil.login(this.userName, this.userKeytabFile, this.krb5File, this.conf);
sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
.append(zooKeeperNamespace).append(";sasl.qop=").append(sasl_qop).append(";auth=").append(auth)
.append(";principal=").append(principal).append(";");
} else {
            // Normal (non-secure) mode
sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
.append(zooKeeperNamespace).append(";auth=none");
}
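        // The assembled URL has a shape like the following (the values are
        // illustrative; the real ones come from hiveclient.properties):
        // jdbc:hive2://192.168.0.1:24002,192.168.0.2:24002,192.168.0.3:24002/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM;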
this.url = sBuilder.toString();
}
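    /**
     * Opens a new Hive connection against the assembled URL. User name and
     * password are passed as empty strings because authentication is handled
     * by Kerberos (or disabled entirely in normal mode).
     */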
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(url, "", "");
}
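    /**
     * Borrows a connection from the pool, creating a new one when the pool is
     * empty. Every connection ever created is also tracked in allConnection so
     * that closePoolConnection() can release all of them.
     */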
public synchronized Connection getPoolConnection() throws SQLException{
if(!pools.isEmpty()) {
return pools.removeLast();
}
Connection conn = getConnection();
allConnection.add(conn);
return conn;
}
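    /**
     * Hands a borrowed connection back to the pool so it can be reused.
     */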
public synchronized void returnConnection(Connection conn) {
pools.addFirst(conn);
}
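    /**
     * Illustrative sketch, not part of the original API: shows the intended
     * borrow/return cycle of the pool, with try-with-resources guaranteeing
     * that the Statement and ResultSet are released. The method name and the
     * "first column as String" convention are assumptions made for this example.
     */
    public List<String> queryFirstColumn(String sql) throws SQLException {
        List<String> values = new ArrayList<>();
        Connection conn = getPoolConnection();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                values.add(rs.getString(1));
            }
        } finally {
            // Return the connection to the pool instead of closing it.
            returnConnection(conn);
        }
        return values;
    }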
    public synchronized void closePoolConnection() {
        for (Connection conn : allConnection) {
            close(conn);
        }
        // Drop the stale references so closed connections cannot be handed out again.
        pools.clear();
        allConnection.clear();
    }
public static void close(Connection conn) {
if(conn != null) {
try {
conn.close();
} catch (SQLException e) {
                log.error("Failed to close Hive connection.", e);
}
}
}
    public static void close(Statement statement, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                log.warn("Failed to close ResultSet.", e);
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                log.warn("Failed to close Statement.", e);
            }
        }
    }
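    /**
     * Minimal usage sketch. The file paths, the user name, and the "show tables"
     * statement below are placeholders chosen for illustration, not values
     * shipped with this class.
     */
    public static void main(String[] args) throws Exception {
        HiveHelper helper = new HiveHelper(
                new Configuration(),
                "/opt/client/Hive/config/hiveclient.properties", // assumed path
                "developuser",                                   // assumed Kerberos user
                "/opt/client/user.keytab",                       // assumed keytab path
                "/opt/client/krb5.conf");                        // assumed krb5 path
        Connection conn = helper.getPoolConnection();
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = conn.createStatement();
            rs = stmt.executeQuery("show tables");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } finally {
            close(stmt, rs);
            helper.returnConnection(conn);
            helper.closePoolConnection();
        }
    }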
}