package com.primeton.damp.bigdata;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
/**
*
* 华为 FI HBASE 认证方式获得连接
*
*
*
*
* Created by zhaopx.
* User: zhaopx
* Date: 2020/4/22
* Time: 18:02
*
*
*
* @author zhaopx
*/
@Slf4j
public class FiHBaseConnectionServiceImpl implements HBaseConnectionService, Closeable {
private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";
private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop.hadoop.com";
/**
* Hive 数据源
*/
final Properties hbaseResource;
/**
* HBase 链接
*/
Connection connection;
public FiHBaseConnectionServiceImpl(Properties hbaseResource) {
this.hbaseResource = hbaseResource;
}
@Override
public boolean doAuth() {
//KrbUser = "hadoop/cdh-node1@HADOOP.COM";
AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
Set principals = authPrincipalCreator.listPrincipals();
log.info("find existed principals: {}", principals);
String authUser = hbaseResource.getProperty("authUser");
AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(authUser);
String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
String krbUser = kerberosPrincipal.getPrincipal();
// 分别加载 core、hdfs、hbase site 文件
Configuration conf = new Configuration();
try {
if (kerberosPrincipal.getCoreSite() != null) {
conf.addResource(kerberosPrincipal.getCoreSite().toURL());
log.info("add config: {}", kerberosPrincipal.getCoreSite().getAbsolutePath());
}
if (kerberosPrincipal.getHdfsSite() != null) {
conf.addResource(kerberosPrincipal.getHdfsSite().toURL());
log.info("add config: {}", kerberosPrincipal.getHdfsSite().getAbsolutePath());
}
conf.reloadConfiguration();
Configuration hbaseConf = HBaseConfiguration.create(conf);
if (kerberosPrincipal.getHBaseSite() != null) {
hbaseConf.addResource(kerberosPrincipal.getHBaseSite().toURL());
log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath());
}
hbaseConf.reloadConfiguration();
/*
* Huawei Fi Hbase,认证
*
* if need to connect zk, please provide jaas info about zk. of course,
* you can do it as below:
* System.setProperty("java.security.auth.login.config", confDirPath +
* "jaas.conf"); but the demo can help you more : Note: if this process
* will connect more than one zk cluster, the demo may be not proper. you
* can contact us for more help
*/
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, krbUser, userKeytab);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,
ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
LoginUtil.login(krbUser, userKeytab, krb5File, hbaseConf);
connection = ConnectionFactory.createConnection(hbaseConf);
log.info("fi hbase kerberos 认证通过。");
return true;
} catch (Exception e) {
throw new SecurityException(e);
}
}
@Override
public Admin getConnection() throws HBaseException {
try {
return connection.getAdmin();
} catch (Exception e) {
throw new HBaseException("连接 HBase 异常。", e);
}
}
@Override
public Table getTable(String tableName) throws HBaseException {
try {
return connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
throw new HBaseException("无法获取Hbase " + tableName + " 表链接。", e);
}
}
@Override
public void close() throws IOException {
log.info("关闭 HBase 连接。");
if(connection != null) {
connection.close();
}
}
}