package com.primeton.damp.bigdata; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import java.io.Closeable; import java.io.IOException; import java.util.Properties; import java.util.Set; /** * * 华为 FI HBASE 认证方式获得连接 * * *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/22
 * Time: 18:02
 *
 * 
* * @author zhaopx */ @Slf4j public class FiHBaseConnectionServiceImpl implements HBaseConnectionService, Closeable { private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client"; private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"; private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop.hadoop.com"; /** * Hive 数据源 */ final Properties hbaseResource; /** * HBase 链接 */ Connection connection; public FiHBaseConnectionServiceImpl(Properties hbaseResource) { this.hbaseResource = hbaseResource; } @Override public boolean doAuth() { //KrbUser = "hadoop/cdh-node1@HADOOP.COM"; AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath")); Set principals = authPrincipalCreator.listPrincipals(); log.info("find existed principals: {}", principals); String authUser = hbaseResource.getProperty("authUser"); AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(authUser); String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath(); String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath(); String krbUser = kerberosPrincipal.getPrincipal(); // 分别加载 core、hdfs、hbase site 文件 Configuration conf = new Configuration(); try { if (kerberosPrincipal.getCoreSite() != null) { conf.addResource(kerberosPrincipal.getCoreSite().toURL()); log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath()); } if (kerberosPrincipal.getHdfsSite() != null) { conf.addResource(kerberosPrincipal.getHdfsSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath()); } conf.reloadConfiguration(); Configuration hbaseConf = HBaseConfiguration.create(conf); if (kerberosPrincipal.getHBaseSite() != null) { hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL()); log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath()); } hbaseConf.reloadConfiguration(); /* * Huawei Fi Hbase,认证 * * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, krbUser, userKeytab); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(krbUser, userKeytab, krb5File, hbaseConf); connection = ConnectionFactory.createConnection(hbaseConf); log.info("fi hbase kerberos 认证通过。"); return true; } catch (Exception e) { throw new SecurityException(e); } } @Override public Admin getConnection() throws HBaseException { try { return connection.getAdmin(); } catch (Exception e) { throw new HBaseException("连接 HBase 异常。", e); } } @Override public Table getTable(String tableName) throws HBaseException { try { return connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e); } } @Override public void close() throws IOException { log.info("关闭 HBase 连接。"); if(connection != null) { connection.close(); } } }