123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package com.primeton.dsp.datarelease.data.bdata;
- import lombok.NonNull;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.hadoop.conf.Configuration;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.sql.Statement;
- import java.util.ArrayDeque;
- import java.util.ArrayList;
- import java.util.Deque;
- import java.util.List;
- import java.util.Properties;
- /**
- * <pre>
- *
- * Created by zhaopx.
- * User: zhaopx
- * Date: 2020/4/20
- * Time: 17:56
- *
- * </pre>
- *
- * @author zhaopx merge
- */
- @Slf4j
- public class HiveHelper {
- private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
- private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";
- private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop";
- private String hiveclientPropFile = null;
- private String krb5File = null;
- private String userName = null;
- private String userKeytabFile = null;
- private String url = null;
- private String zkQuorum = null;// zookeeper节点ip和端口列表
- private String auth = null;
- private String sasl_qop = null;
- private String zooKeeperNamespace = null;
- private String serviceDiscoveryMode = null;
- private String principal = null;
- private Deque<Connection> pools = new ArrayDeque<Connection>();
- private List<Connection> allConnection = new ArrayList<Connection>();
- private final Configuration conf;
- public HiveHelper(@NonNull Configuration conf,
- @NonNull String hiveclientPropFile,
- String userName,
- String userKeytabFile,
- String krb5File) throws IOException {
- this.conf = conf;
- this.hiveclientPropFile = hiveclientPropFile;
- this.userName = userName;
- this.userKeytabFile = userKeytabFile;
- this.krb5File = krb5File;
- this.init();
- }
- private void init() throws IOException {
- Properties clientInfo = null;
- InputStream fileInputStream = null;
- try {
- clientInfo = new Properties();
- // "hiveclient.properties"为客户端配置文件,如果使用多实例特性,需要把该文件换成对应实例客户端下的"hiveclient.properties"
- // "hiveclient.properties"文件位置在对应实例客户端安裝包解压目录下的config目录下
- File propertiesFile = new File(this.hiveclientPropFile);
- fileInputStream = new FileInputStream(propertiesFile);
- clientInfo.load(fileInputStream);
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- if (fileInputStream != null) {
- fileInputStream.close();
- fileInputStream = null;
- }
- }
- // zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002";
- // "xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是24002
- zkQuorum = clientInfo.getProperty("zk.quorum");
- auth = clientInfo.getProperty("auth");
- sasl_qop = clientInfo.getProperty("sasl.qop");
- zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
- serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
- principal = clientInfo.getProperty("principal");
- // 拼接JDBC URL
- StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
- if ("KERBEROS".equalsIgnoreCase(auth)) {
- // 设置客户端的keytab和krb5文件路径
- LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, this.userName, this.userKeytabFile);
- LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
- // 安全模式
- this.conf.set("hadoop.security.authentication", "kerberos");
- // Zookeeper登录认证
- LoginUtil.login(this.userName, this.userKeytabFile, this.krb5File, this.conf);
- sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
- .append(zooKeeperNamespace).append(";sasl.qop=").append(sasl_qop).append(";auth=").append(auth)
- .append(";principal=").append(principal).append(";");
- } else {
- // 普通模式
- sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
- .append(zooKeeperNamespace).append(";auth=none");
- }
- this.url = sBuilder.toString();
- }
- public Connection getConnection() throws SQLException {
- return DriverManager.getConnection(url, "", "");
- }
- public synchronized Connection getPoolConnection() throws SQLException{
- if(!pools.isEmpty()) {
- return pools.removeLast();
- }
- Connection conn = getConnection();
- allConnection.add(conn);
- return conn;
- }
- public synchronized void returnConnection(Connection conn) {
- pools.addFirst(conn);
- }
- public synchronized void closePoolConnection() {
- for(Connection conn : allConnection) {
- close(conn);
- }
- }
- public static void close(Connection conn) {
- if(conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- log.error("关闭Hive连接失败.", e);
- }
- }
- }
- public static void close(Statement statment, ResultSet rs) {
- if(rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {}
- }
- if(statment != null) {
- try {
- statment.close();
- } catch (SQLException e) {}
- }
- }
- }
|