package com.primeton.dsp.datarelease.data.bdata;

import java.io.File;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

import lombok.extern.slf4j.Slf4j;
/**
 * Obtains Hive JDBC connections authenticated via Kerberos
 * (Huawei FusionInsight flavored cluster).
 *
 * <pre>
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/22
 * Time: 18:02
 * </pre>
 *
 * @author zhaopx
 */
  25. @Slf4j
  26. public class FIHiveConnectionServiceImpl implements HiveConnectionService {
  27. /**
  28. * Hive 数据源
  29. */
  30. final Properties hiveResource;
  31. /**
  32. * 认证文件所在的基础目录
  33. */
  34. final String authBasePath;
  35. private HiveHelper hiveHelper;
  36. public FIHiveConnectionServiceImpl(Properties params) {
  37. this.hiveResource = params;
  38. this.authBasePath = params.getProperty("authBasePath");
  39. }
  40. @Override
  41. public boolean doAuth() {
  42. AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(authBasePath);
  43. Set<String> principals = authPrincipalCreator.listPrincipals();
  44. log.info("find existed principals: {}", principals);
  45. AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hiveResource.getProperty("hiveDbUser"));
  46. String userKeytabFile = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
  47. String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
  48. String krbUser = kerberosPrincipal.getPrincipal();
  49. String hiveclientPropFile = kerberosPrincipal.getHiveClientFile().getAbsolutePath();
  50. // 分别加载 core、hdfs、hive site 文件
  51. Configuration conf = new Configuration();
  52. try {
  53. if (kerberosPrincipal.getCoreSite() != null) {
  54. conf.addResource(kerberosPrincipal.getCoreSite().toURL());
  55. log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath());
  56. }
  57. if (kerberosPrincipal.getHdfsSite() != null) {
  58. conf.addResource(kerberosPrincipal.getHdfsSite().toURL());
  59. log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath());
  60. }
  61. if (kerberosPrincipal.getHiveSite() != null) {
  62. conf.addResource(kerberosPrincipal.getHiveSite().toURL());
  63. log.info("add config: {}", kerberosPrincipal.getHiveSite().getAbsolutePath());
  64. }
  65. } catch (Exception e) {
  66. throw new IllegalStateException(e);
  67. }
  68. try {
  69. this.hiveHelper = new HiveHelper(conf, hiveclientPropFile, krbUser, userKeytabFile, krb5File);
  70. log.info("hive fusioninsight 认证通过。");
  71. return true;
  72. } catch (Exception e) {
  73. throw new SecurityException("FI 认证失败。", e);
  74. }
  75. }
  76. @Override
  77. public Connection getConnection() throws SQLException {
  78. try {
  79. Class.forName("org.apache.hive.jdbc.HiveDriver");
  80. } catch (ClassNotFoundException e) {
  81. throw new SQLException("找不到Hive驱动:org.apache.hive.jdbc.HiveDriver.", e);
  82. }
  83. return hiveHelper.getPoolConnection();
  84. }
  85. }