package com.primeton.damp.bigdata; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Properties; /** * * HBase 无认证,连接 ZK 获得连接 * * *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/21
 * Time: 18:02
 *
 * 
* * @author zhaopx */ @Slf4j public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService, Closeable { /** * Hive 数据源 */ final Properties hbaseResource; /** * HBase 链接 */ final Connection connection; public SimpleHBaseConnectionServiceImpl(Properties hbaseResource) { this.hbaseResource = hbaseResource; String authBasePath = hbaseResource.getProperty("authBasePath"); Configuration hbaseConf = null; if(StringUtils.isNotBlank(authBasePath)) { AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath")); AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getProperty("authUser")); // 分别加载 core、hdfs、hbase site 文件 Configuration conf = new Configuration(); try { if (kerberosPrincipal.getCoreSite() != null) { conf.addResource(kerberosPrincipal.getCoreSite().toURL()); log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath()); } if (kerberosPrincipal.getHdfsSite() != null) { conf.addResource(kerberosPrincipal.getHdfsSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath()); } conf.reloadConfiguration(); hbaseConf = HBaseConfiguration.create(conf); if (kerberosPrincipal.getHBaseSite() != null) { hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath()); } hbaseConf.reloadConfiguration(); } catch (Exception e) { throw new IllegalStateException(e); } } try { // 连接时,只尝试 5 次 hbaseConf.setInt("zookeeper.recovery.retry", 3); hbaseConf.setInt("hbase.client.retries.number", 2); this.connection = ConnectionFactory.createConnection(hbaseConf); } catch (Exception e) { throw new IllegalStateException(e); } } @Override public boolean doAuth() { log.info("hbase 无需认证,通过。"); return true; } @Override public Admin getConnection() throws HBaseException { try { return connection.getAdmin(); } catch (Exception e) { throw new HBaseException("连接 HBase 异常。", e); } } @Override public Table getTable(String tableName) throws HBaseException { try { return connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e); } } @Override public void close() throws IOException { log.info("关闭 HBase 连接。"); if(connection != null) { connection.close(); } } }