package com.primeton.dsp.datarelease.data.bdata; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Properties; /** *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/20
 * Time: 17:56
 *
 * 
* * @author zhaopx merge */ @Slf4j public class HiveHelper { private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client"; private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"; private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop"; private String hiveclientPropFile = null; private String krb5File = null; private String userName = null; private String userKeytabFile = null; private String url = null; private String zkQuorum = null;// zookeeper节点ip和端口列表 private String auth = null; private String sasl_qop = null; private String zooKeeperNamespace = null; private String serviceDiscoveryMode = null; private String principal = null; private Deque pools = new ArrayDeque(); private List allConnection = new ArrayList(); private final Configuration conf; public HiveHelper(@NonNull Configuration conf, @NonNull String hiveclientPropFile, String userName, String userKeytabFile, String krb5File) throws IOException { this.conf = conf; this.hiveclientPropFile = hiveclientPropFile; this.userName = userName; this.userKeytabFile = userKeytabFile; this.krb5File = krb5File; this.init(); } private void init() throws IOException { Properties clientInfo = null; InputStream fileInputStream = null; try { clientInfo = new Properties(); // "hiveclient.properties"为客户端配置文件,如果使用多实例特性,需要把该文件换成对应实例客户端下的"hiveclient.properties" // "hiveclient.properties"文件位置在对应实例客户端安裝包解压目录下的config目录下 File propertiesFile = new File(this.hiveclientPropFile); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); } catch (Exception e) { throw new IOException(e); } finally { if (fileInputStream != null) { fileInputStream.close(); fileInputStream = null; } } // zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002"; // "xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是24002 zkQuorum = clientInfo.getProperty("zk.quorum"); auth = clientInfo.getProperty("auth"); sasl_qop = clientInfo.getProperty("sasl.qop"); zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); principal = clientInfo.getProperty("principal"); // 拼接JDBC URL StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/"); if ("KERBEROS".equalsIgnoreCase(auth)) { // 设置客户端的keytab和krb5文件路径 LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, this.userName, this.userKeytabFile); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); // 安全模式 this.conf.set("hadoop.security.authentication", "kerberos"); // Zookeeper登录认证 LoginUtil.login(this.userName, this.userKeytabFile, this.krb5File, this.conf); sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=") .append(zooKeeperNamespace).append(";sasl.qop=").append(sasl_qop).append(";auth=").append(auth) .append(";principal=").append(principal).append(";"); } else { // 普通模式 sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=") .append(zooKeeperNamespace).append(";auth=none"); } this.url = sBuilder.toString(); } public Connection getConnection() throws SQLException { return DriverManager.getConnection(url, "", ""); } public synchronized Connection getPoolConnection() throws SQLException{ if(!pools.isEmpty()) { return pools.removeLast(); } Connection conn = getConnection(); allConnection.add(conn); return conn; } public synchronized void returnConnection(Connection conn) { pools.addFirst(conn); } public synchronized void closePoolConnection() { for(Connection conn : allConnection) { close(conn); } } public static void close(Connection conn) { if(conn != null) { try { conn.close(); } catch (SQLException e) { log.error("关闭Hive连接失败.", e); } } } public static void close(Statement statment, ResultSet rs) { if(rs != null) { try { rs.close(); } catch (SQLException e) {} } if(statment != null) { try { statment.close(); } catch (SQLException e) {} } } }