Krb5HBaseConnectionServiceImpl.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package com.primeton.dsp.datarelease.data.bdata;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.TableName;
  6. import org.apache.hadoop.hbase.client.Admin;
  7. import org.apache.hadoop.hbase.client.Connection;
  8. import org.apache.hadoop.hbase.client.ConnectionFactory;
  9. import org.apache.hadoop.hbase.client.Table;
  10. import org.apache.hadoop.hbase.exceptions.HBaseException;
  11. import java.io.Closeable;
  12. import java.io.IOException;
  13. import java.util.Properties;
  14. import java.util.Set;
  15. /**
  16. *
  17. * Hive Kerberos 认证方式获得连接
  18. *
  19. *
  20. * <pre>
  21. *
  22. * Created by zhaopx.
  23. * User: zhaopx
  24. * Date: 2020/4/22
  25. * Time: 18:02
  26. *
  27. * </pre>
  28. *
  29. * @author zhaopx
  30. */
  31. @Slf4j
  32. public class Krb5HBaseConnectionServiceImpl implements HBaseConnectionService, Closeable {
  33. /**
  34. * Hive 数据源
  35. */
  36. final Properties hbaseResource;
  37. /**
  38. * HBase 链接
  39. */
  40. Connection connection;
  41. public Krb5HBaseConnectionServiceImpl(Properties hbaseResource) {
  42. this.hbaseResource = hbaseResource;
  43. }
  44. @Override
  45. public boolean doAuth() {
  46. //KrbUser = "hadoop/cdh-node1@HADOOP.COM";
  47. log.info("hbase 开始 kerberos 认证。");
  48. AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
  49. Set<String> principals = authPrincipalCreator.listPrincipals();
  50. log.info("find existed principals: {}", principals);
  51. AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getProperty("authUser"));
  52. String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
  53. String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
  54. String krbUser = kerberosPrincipal.getPrincipal();
  55. // 分别加载 core、hdfs、hbase site 文件
  56. Configuration conf = new Configuration();
  57. try {
  58. if (kerberosPrincipal.getCoreSite() != null) {
  59. conf.addResource(kerberosPrincipal.getCoreSite().toURL());
  60. log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath());
  61. }
  62. if (kerberosPrincipal.getHdfsSite() != null) {
  63. conf.addResource(kerberosPrincipal.getHdfsSite().toURL());
  64. log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath());
  65. }
  66. conf.reloadConfiguration();
  67. Configuration hbaseConf = HBaseConfiguration.create(conf);
  68. if (kerberosPrincipal.getHBaseSite() != null) {
  69. hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL());
  70. log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath());
  71. }
  72. hbaseConf.reloadConfiguration();
  73. // Kerberos 认证
  74. KerberosUtil.loginKerberos(hbaseConf, krbUser, userKeytab, krb5File);
  75. connection = ConnectionFactory.createConnection(hbaseConf);
  76. log.info("hbase kerberos 认证通过。");
  77. return true;
  78. } catch (Exception e) {
  79. throw new SecurityException("HBase Kerberos 认证异常。", e);
  80. }
  81. }
  82. @Override
  83. public Admin getConnection() throws HBaseException {
  84. try {
  85. return connection.getAdmin();
  86. } catch (Exception e) {
  87. throw new HBaseException("连接 HBase 异常。", e);
  88. }
  89. }
  90. @Override
  91. public Table getTable(String tableName) throws HBaseException {
  92. try {
  93. return connection.getTable(TableName.valueOf(tableName));
  94. } catch (IOException e) {
  95. throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e);
  96. }
  97. }
  98. @Override
  99. public void close() throws IOException {
  100. log.info("关闭 HBase 连接。");
  101. if(connection != null) {
  102. connection.close();
  103. }
  104. }
  105. }