package com.primeton.dsp.datarelease.data.bdata;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.HBaseException;

import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;

/**
 * Obtains HBase connections after authenticating with Kerberos.
 *
 * <p>Typical usage: construct the service with the data-source properties, call
 * {@link #doAuth()} once to log in and open the underlying {@link Connection}, then use
 * {@link #getConnection()} / {@link #getTable(String)}. Call {@link #close()} when done.
 *
 * <p>Properties read by this class: {@code authBasePath} and {@code authUser}.
 *
 * <pre>
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/4/22
 * Time: 18:02
 * </pre>
 *
 * @author zhaopx
 */
@Slf4j
public class Krb5HBaseConnectionServiceImpl implements HBaseConnectionService, Closeable {

    /**
     * HBase data-source properties (supplies {@code authBasePath} and {@code authUser}).
     * Visibility is left package-private, as in the original, for same-package callers.
     */
    final Properties hbaseResource;

    /**
     * HBase connection; {@code null} until {@link #doAuth()} has completed successfully.
     */
    Connection connection;

    /**
     * Creates the service. No authentication or network access happens here.
     *
     * @param hbaseResource data-source properties used later by {@link #doAuth()}
     */
    public Krb5HBaseConnectionServiceImpl(Properties hbaseResource) {
        this.hbaseResource = hbaseResource;
    }

    /**
     * Performs the Kerberos login and opens the HBase connection.
     *
     * <p>The Hadoop configuration is assembled from the principal's core-site and hdfs-site
     * files (when present), then the hbase-site file (when present) is layered on top.
     *
     * @return always {@code true}; every failure is reported by exception
     * @throws SecurityException if no principal is resolved for {@code authUser}, or if loading
     *                           the configuration, logging in, or connecting fails
     */
    @Override
    public boolean doAuth() {
        // Example principal format: hadoop/cdh-node1@HADOOP.COM
        log.info("hbase 开始 kerberos 认证。");
        AuthPrincipalCreator authPrincipalCreator =
                AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
        // Element type of the principal set is not visible here, so use an unbounded wildcard
        // instead of a raw type.
        Set<?> principals = authPrincipalCreator.listPrincipals();
        log.info("find existed principals: {}", principals);

        String authUser = hbaseResource.getProperty("authUser");
        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(authUser);
        if (kerberosPrincipal == null) {
            // Defensive: the creator's contract is not visible from this file. Fail with the
            // same exception type as other authentication failures rather than a bare NPE.
            throw new SecurityException("未找到 Kerberos 认证用户: " + authUser);
        }

        String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
        String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
        String krbUser = kerberosPrincipal.getPrincipal();

        // Load the core, hdfs and hbase site files in that order.
        Configuration conf = new Configuration();
        try {
            if (kerberosPrincipal.getCoreSite() != null) {
                conf.addResource(kerberosPrincipal.getCoreSite().toURL());
                log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath());
            }

            if (kerberosPrincipal.getHdfsSite() != null) {
                conf.addResource(kerberosPrincipal.getHdfsSite().toURL());
                log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath());
            }
            conf.reloadConfiguration();

            // HBase configuration is derived from the Hadoop one, then hbase-site is added.
            Configuration hbaseConf = HBaseConfiguration.create(conf);
            if (kerberosPrincipal.getHBaseSite() != null) {
                hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL());
                log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath());
            }
            hbaseConf.reloadConfiguration();

            // Kerberos login must happen before the connection is created.
            KerberosUtil.loginKerberos(hbaseConf, krbUser, userKeytab, krb5File);
            connection = ConnectionFactory.createConnection(hbaseConf);
            log.info("hbase kerberos 认证通过。");
            return true;
        } catch (Exception e) {
            throw new SecurityException("HBase Kerberos 认证异常。", e);
        }
    }

    /**
     * Returns an {@link Admin} handle obtained from the current connection.
     *
     * @return an HBase {@link Admin}; the caller is responsible for closing it
     * @throws HBaseException if {@link #doAuth()} has not established a connection, or if the
     *                        admin handle cannot be obtained
     */
    @Override
    public Admin getConnection() throws HBaseException {
        Connection conn = requireConnection();
        try {
            return conn.getAdmin();
        } catch (Exception e) {
            throw new HBaseException("连接 HBase 异常。", e);
        }
    }

    /**
     * Returns a {@link Table} handle for the given table name.
     *
     * @param tableName name of the table, parsed with {@link TableName#valueOf(String)}
     * @return an HBase {@link Table}; the caller is responsible for closing it
     * @throws HBaseException if {@link #doAuth()} has not established a connection, or if the
     *                        table handle cannot be obtained
     */
    @Override
    public Table getTable(String tableName) throws HBaseException {
        Connection conn = requireConnection();
        try {
            return conn.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e);
        }
    }

    /**
     * Closes the underlying HBase connection if one was opened.
     *
     * @throws IOException if closing the connection fails
     */
    @Override
    public void close() throws IOException {
        log.info("关闭 HBase 连接。");
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Returns the current connection, or fails with a clear message when {@link #doAuth()}
     * has not been called (or did not succeed), instead of letting an NPE escape.
     */
    private Connection requireConnection() throws HBaseException {
        Connection conn = connection;
        if (conn == null) {
            throw new HBaseException("HBase 连接尚未建立，请先调用 doAuth() 完成认证。");
        }
        return conn;
    }
}