package com.primeton.dsp.datarelease.data.bdata; import com.primeton.dsp.datarelease.server.model.DspHbaseResource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import java.io.Closeable; import java.io.IOException; import java.util.Set; /** * * HBase 无认证,连接 ZK 获得连接 * * *
* * Created by zhaopx. * User: zhaopx * Date: 2020/4/21 * Time: 18:02 * ** * @author zhaopx */ @Slf4j public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService, Closeable { /** * Hive 数据源 */ final DspHbaseResource hbaseResource; /** * HBase 链接 */ final Connection connection; public SimpleHBaseConnectionServiceImpl(DspHbaseResource hbaseResource) { this.hbaseResource = hbaseResource; Configuration hbaseConf = null; if(StringUtils.isNotBlank(hbaseResource.getAuthUser())) { AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hbaseResource.getAuthBasePath()); AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getAuthUser()); // 分别加载 core、hdfs、hbase site 文件 Configuration conf = new Configuration(); try { if (kerberosPrincipal.getCoreSite() != null) { conf.addResource(kerberosPrincipal.getCoreSite().toURL()); log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath()); } if (kerberosPrincipal.getHdfsSite() != null) { conf.addResource(kerberosPrincipal.getHdfsSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath()); } conf.reloadConfiguration(); hbaseConf = HBaseConfiguration.create(conf); if (kerberosPrincipal.getHBaseSite() != null) { hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath()); } hbaseConf.reloadConfiguration(); } catch (Exception e) { throw new IllegalStateException(e); } } else { hbaseConf = getConf(); } try { this.connection = ConnectionFactory.createConnection(hbaseConf); } catch (Exception e) { throw new IllegalStateException(e); } } public Configuration getConf() { Configuration conf = HBaseConfiguration.create(new Configuration()); //配置类 if(StringUtils.isNotBlank(hbaseResource.getHbaseMaster())) conf.set("hbase.master", hbaseResource.getHbaseMaster()); if(StringUtils.isNotBlank(hbaseResource.getHbaseZookeeperQuorum())) conf.set("hbase.zookeeper.quorum", hbaseResource.getHbaseZookeeperQuorum()); if(StringUtils.isNotBlank(hbaseResource.getZookeeperClientPort())) conf.set("hbase.zookeeper.property.clientPort", hbaseResource.getZookeeperClientPort()); if(StringUtils.isNotBlank(hbaseResource.getZnode())) conf.set("zookeeper.znode.parent", hbaseResource.getZnode()); conf.setInt("hbase.client.retries.number", 2); return conf; } @Override public boolean doAuth() { log.info("hbase 无需认证,通过。"); return true; } @Override public Admin getConnection() throws HBaseException { try { return connection.getAdmin(); } catch (Exception e) { throw new HBaseException("连接 HBase 异常。", e); } } @Override public Table getTable(String tableName) throws HBaseException { try { return connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e); } } @Override public void close() throws IOException { log.info("关闭 HBase 连接。"); if(connection != null) { connection.close(); } } }