SimpleHBaseConnectionServiceImpl.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package com.primeton.damp.bigdata;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.TableName;
  7. import org.apache.hadoop.hbase.client.Admin;
  8. import org.apache.hadoop.hbase.client.Connection;
  9. import org.apache.hadoop.hbase.client.ConnectionFactory;
  10. import org.apache.hadoop.hbase.client.Table;
  11. import org.apache.hadoop.hbase.exceptions.HBaseException;
  12. import java.io.Closeable;
  13. import java.io.File;
  14. import java.io.IOException;
  15. import java.util.Properties;
  16. /**
  17. *
  18. * HBase 无认证,连接 ZK 获得连接
  19. *
  20. *
  21. * <pre>
  22. *
  23. * Created by zhaopx.
  24. * User: zhaopx
  25. * Date: 2020/4/21
  26. * Time: 18:02
  27. *
  28. * </pre>
  29. *
  30. * @author zhaopx
  31. */
  32. @Slf4j
  33. public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService, Closeable {
  34. /**
  35. * Hive 数据源
  36. */
  37. final Properties hbaseResource;
  38. /**
  39. * HBase 链接
  40. */
  41. final Connection connection;
  42. public SimpleHBaseConnectionServiceImpl(Properties hbaseResource) {
  43. this.hbaseResource = hbaseResource;
  44. String authBasePath = hbaseResource.getProperty("authBasePath");
  45. Configuration hbaseConf = null;
  46. if(StringUtils.isNotBlank(authBasePath)) {
  47. AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
  48. AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getProperty("authUser"));
  49. // 分别加载 core、hdfs、hbase site 文件
  50. Configuration conf = new Configuration();
  51. try {
  52. if (kerberosPrincipal.getCoreSite() != null) {
  53. conf.addResource(kerberosPrincipal.getCoreSite().toURL());
  54. log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath());
  55. }
  56. if (kerberosPrincipal.getHdfsSite() != null) {
  57. conf.addResource(kerberosPrincipal.getHdfsSite().toURL());
  58. log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath());
  59. }
  60. conf.reloadConfiguration();
  61. hbaseConf = HBaseConfiguration.create(conf);
  62. if (kerberosPrincipal.getHBaseSite() != null) {
  63. hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL());
  64. log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath());
  65. }
  66. hbaseConf.reloadConfiguration();
  67. } catch (Exception e) {
  68. throw new IllegalStateException(e);
  69. }
  70. }
  71. try {
  72. // 连接时,只尝试 5 次
  73. hbaseConf.setInt("zookeeper.recovery.retry", 3);
  74. hbaseConf.setInt("hbase.client.retries.number", 2);
  75. this.connection = ConnectionFactory.createConnection(hbaseConf);
  76. } catch (Exception e) {
  77. throw new IllegalStateException(e);
  78. }
  79. }
  80. @Override
  81. public boolean doAuth() {
  82. log.info("hbase 无需认证,通过。");
  83. return true;
  84. }
  85. @Override
  86. public Admin getConnection() throws HBaseException {
  87. try {
  88. return connection.getAdmin();
  89. } catch (Exception e) {
  90. throw new HBaseException("连接 HBase 异常。", e);
  91. }
  92. }
  93. @Override
  94. public Table getTable(String tableName) throws HBaseException {
  95. try {
  96. return connection.getTable(TableName.valueOf(tableName));
  97. } catch (IOException e) {
  98. throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e);
  99. }
  100. }
  101. @Override
  102. public void close() throws IOException {
  103. log.info("关闭 HBase 连接。");
  104. if(connection != null) {
  105. connection.close();
  106. }
  107. }
  108. }