HiveHelper.java

package com.primeton.dsp.datarelease.data.bdata;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Properties;
/**
 * <pre>
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/20
 * Time: 17:56
 *
 * </pre>
 *
 * @author zhaopx merge
 */
@Slf4j
public class HiveHelper {

    private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
    private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";
    private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop";

    private String hiveclientPropFile = null;
    private String krb5File = null;
    private String userName = null;
    private String userKeytabFile = null;
    private String url = null;
    /** Comma-separated list of ZooKeeper node IPs and ports. */
    private String zkQuorum = null;
    private String auth = null;
    private String sasl_qop = null;
    private String zooKeeperNamespace = null;
    private String serviceDiscoveryMode = null;
    private String principal = null;

    /** Idle connections available for reuse. */
    private Deque<Connection> pools = new ArrayDeque<Connection>();
    /** All connections ever created, so they can be closed together. */
    private List<Connection> allConnection = new ArrayList<Connection>();

    private final Configuration conf;
    public HiveHelper(@NonNull Configuration conf,
                      @NonNull String hiveclientPropFile,
                      String userName,
                      String userKeytabFile,
                      String krb5File) throws IOException {
        this.conf = conf;
        this.hiveclientPropFile = hiveclientPropFile;
        this.userName = userName;
        this.userKeytabFile = userKeytabFile;
        this.krb5File = krb5File;
        this.init();
    }
    private void init() throws IOException {
        Properties clientInfo = null;
        InputStream fileInputStream = null;
        try {
            clientInfo = new Properties();
            // "hiveclient.properties" is the client configuration file. When the multi-instance feature is used,
            // replace it with the "hiveclient.properties" of the corresponding instance's client.
            // The file is located in the config directory of that instance's extracted client installation package.
            File propertiesFile = new File(this.hiveclientPropFile);
            fileInputStream = new FileInputStream(propertiesFile);
            clientInfo.load(fileInputStream);
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            if (fileInputStream != null) {
                fileInputStream.close();
                fileInputStream = null;
            }
        }
        // zkQuorum has the form "xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002,xxx.xxx.xxx.xxx:24002",
        // where "xxx.xxx.xxx.xxx" is the service IP of a ZooKeeper node in the cluster; the default port is 24002.
        zkQuorum = clientInfo.getProperty("zk.quorum");
        auth = clientInfo.getProperty("auth");
        sasl_qop = clientInfo.getProperty("sasl.qop");
        zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
        serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
        principal = clientInfo.getProperty("principal");

        // Assemble the JDBC URL.
        StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
        if ("KERBEROS".equalsIgnoreCase(auth)) {
            // Set the client keytab and krb5 file paths.
            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, this.userName, this.userKeytabFile);
            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
            // Security (Kerberos) mode.
            this.conf.set("hadoop.security.authentication", "kerberos");
            // ZooKeeper login authentication.
            LoginUtil.login(this.userName, this.userKeytabFile, this.krb5File, this.conf);
            sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace).append(";sasl.qop=").append(sasl_qop).append(";auth=").append(auth)
                    .append(";principal=").append(principal).append(";");
        } else {
            // Normal (non-secure) mode.
            sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace).append(";auth=none");
        }
        this.url = sBuilder.toString();
    }
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(url, "", "");
    }

    public synchronized Connection getPoolConnection() throws SQLException {
        // Reuse an idle connection if one is available; otherwise open a new one and track it.
        if (!pools.isEmpty()) {
            return pools.removeLast();
        }
        Connection conn = getConnection();
        allConnection.add(conn);
        return conn;
    }

    public synchronized void returnConnection(Connection conn) {
        pools.addFirst(conn);
    }

    public synchronized void closePoolConnection() {
        for (Connection conn : allConnection) {
            close(conn);
        }
        // Drop the now-closed connections so they cannot be handed out again.
        allConnection.clear();
        pools.clear();
    }
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                log.error("Failed to close Hive connection.", e);
            }
        }
    }
    public static void close(Statement statement, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                // Ignore: the result set is being discarded anyway.
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                // Ignore: the statement is being discarded anyway.
            }
        }
    }
}
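
Below is a minimal usage sketch (not part of the original file) showing how the helper is intended to be driven: construct it with the Hadoop Configuration and the client-side file paths, borrow a pooled connection, run a query, return the connection, and close the pool when done. The file paths, user name, and query are placeholder assumptions and must be replaced with the values from the actual client installation.

package com.primeton.dsp.datarelease.data.bdata;

import org.apache.hadoop.conf.Configuration;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveHelperExample {

    public static void main(String[] args) throws Exception {
        // Placeholder paths and user below; replace with the real client installation values.
        HiveHelper helper = new HiveHelper(
                new Configuration(),
                "/opt/client/Hive/config/hiveclient.properties", // assumed hiveclient.properties location
                "developuser",                                   // assumed Kerberos user
                "/opt/client/user.keytab",                       // assumed keytab path
                "/opt/client/krb5.conf");                        // assumed krb5.conf path

        Connection conn = helper.getPoolConnection();
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = conn.createStatement();
            rs = stmt.executeQuery("SHOW TABLES");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } finally {
            HiveHelper.close(stmt, rs);
            // Hand the connection back for reuse; close everything once the helper is no longer needed.
            helper.returnConnection(conn);
            helper.closePoolConnection();
        }
    }
}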