package com.primeton.dsp.datarelease.data.service;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
*
* HBase 操作类工具类。
*
*
* Created by ZhenQin on 2018/5/17 0017-9:52
* Vendor: 9sdata.cn
*/
public class CommonHBaseService implements Closeable {
/**
* 对象值转为 Byte[] 类型,先转为 文本类型再转为 byte[]
*/
public final static int TEXT_BYTES_TYPE = 0;
/**
* HBase key 和 value 全部为 bin 类型
*/
public final static int BIN_BYTES_TYPE = 1;
/**
* Hbase table cached
*/
final static Cache HBASE_TABLE_CACHED = CacheBuilder.newBuilder()
.maximumSize(100)
.initialCapacity(20)
.expireAfterAccess(30, TimeUnit.MINUTES) // 30 分钟不从缓存中读取该表,则移除出缓存
.removalListener(new RemovalListener() {
@Override
public void onRemoval(RemovalNotification notification) {
try {
notification.getValue().close();
} catch (IOException e) {
e.printStackTrace();
}
}
})
.build();
/**
* tablename
*/
final String tableName;
/**
* Hbase Table
*/
final Table table;
/**
* 选择
*/
private int bytesType = TEXT_BYTES_TYPE;
final static Logger logger = LoggerFactory.getLogger(CommonHBaseService.class);
/**
* 构造方法传入Hbase Table
*
* @param table
*/
public CommonHBaseService(Table table) {
this(table, TEXT_BYTES_TYPE);
}
/**
* 构造方法传入Hbase Table
*
* @param table
*/
public CommonHBaseService(Table table, int type) {
this.table = table;
this.tableName = table.getName().getNameAsString();
this.bytesType = type;
}
public CommonHBaseService(Connection connection, String tableName) throws IOException {
Table table = HBASE_TABLE_CACHED.getIfPresent(tableName);
if (table == null) {
boolean existsTable = connection.getAdmin().tableExists(TableName.valueOf(tableName));
//boolean existsTable = false;
if (!existsTable) {
logger.warn("table: " + tableName + " not exists.");
this.tableName = null;
//throw new IllegalStateException("table: " + tableName + " not exists.");
this.table = null;
} else {
this.tableName = tableName;
this.table = connection.getTable(TableName.valueOf(tableName));
HBASE_TABLE_CACHED.put(tableName, this.table);
}
} else {
this.tableName = tableName;
this.table = table;
}
this.bytesType = TEXT_BYTES_TYPE;
}
public CommonHBaseService(Configuration conf, String tableName) throws IOException {
this(ConnectionFactory.createConnection(conf), tableName);
}
public int put(JSONObject values, String defaultFamily) throws IOException {
String key = values.getString("key");
final JSONObject copy = new JSONObject(values.size() - 1);
Set> entries = values.entrySet();
for (Map.Entry entry : entries) {
if ("key".equals(entry.getKey())) {
continue;
}
copy.put(entry.getKey(), entry.getValue());
}
return put(key, copy, defaultFamily);
}
public int put(String key, JSONObject values, String defaultFamily) throws IOException {
table.put(jsonConvert2Put(key, values, defaultFamily));
return 1;
}
public int puts(JSONObject[] values, String defaultFamily) throws IOException {
return puts(Arrays.asList(values), defaultFamily);
}
public int puts(Collection values, String defaultFamily) throws IOException {
List puts = new ArrayList<>(values.size());
for (JSONObject value : values) {
puts.add(jsonConvert2Put(value.getString("key"), value, defaultFamily));
}
table.put(puts);
return puts.size();
}
/**
* 把 JSON 转为 HBase 的 put, json 中一定要包含 key 字段
*
* @param key
* @param value
* @param defaultFamily
* @return
*/
private Put jsonConvert2Put(String key, JSONObject value, String defaultFamily) {
if (StringUtils.isBlank(key)) {
throw new IllegalArgumentException("no key value to set...");
}
Put put = new Put(Bytes.toBytes(key));
Set> entries = value.entrySet();
for (Map.Entry entry : entries) {
String field = entry.getKey();
if (StringUtils.isBlank(field) || ":".equals(field)) {
throw new IllegalArgumentException("invalid field: " + field);
}
int indexOf = field.indexOf(":");
if (indexOf <= 0) {
//a=abc OR :a=abc
field = indexOf == 0 ? field.substring(1) : field;
put.addColumn(Bytes.toBytes(defaultFamily), Bytes.toBytes(field), getBytes(entry.getValue()));
} else {
//f:a=abc
String f = field.substring(0, indexOf);
String x = field.substring(indexOf + 1);
put.addColumn(Bytes.toBytes(f), Bytes.toBytes(x), getBytes(entry.getValue()));
}
}
return put;
}
/**
* 对象值转为 byte[] 类型
*
* @param value
* @return
*/
private byte[] getBytes(Object value) {
if (value == null) {
return new byte[0];
}
if (value instanceof String) {
return Bytes.toBytes((String) value);
}
if (value instanceof byte[]) {
return (byte[]) value;
}
if (bytesType == 0) {
return Bytes.toBytes(String.valueOf(value));
}
if (value instanceof Integer) {
return Bytes.toBytes((Integer) value);
} else if (value instanceof Long) {
return Bytes.toBytes((Long) value);
} else if (value instanceof Double) {
return Bytes.toBytes((Double) value);
} else if (value instanceof Float) {
return Bytes.toBytes((Float) value);
} else if (value instanceof Byte) {
return Bytes.toBytes((Byte) value);
} else if (value instanceof Boolean) {
return Bytes.toBytes((Boolean) value);
} else if (value instanceof Short) {
return Bytes.toBytes((Short) value);
} else if (value instanceof Date) {
return Bytes.toBytes(((Date) value).getTime());
}
// 其他的未知类型,统一设置为字符串
return Bytes.toBytes(String.valueOf(value));
}
/**
* HBaseResult 转换为JSON, HBase rowkey 放在 json的 row 里,必须是字符串形式。
*
* @param result
* @return
*/
private JSONObject result2JSON(Result result) {
JSONObject json = new JSONObject();
byte[] row = result.getRow();
if (row == null || row.length == 0) {
return json;
}
json.put("key", Bytes.toString(row));
NavigableMap>> map = result.getMap();
if (map == null || map.isEmpty()) {
return json;
}
for (Map.Entry>> entry : map.entrySet()) {
byte[] familyBytes = entry.getKey();
String family = Bytes.toString(familyBytes);
NavigableMap> values = entry.getValue();
for (Map.Entry> en : values.entrySet()) {
String field = Bytes.toString(en.getKey());
NavigableMap value = en.getValue();
Map.Entry firstEntry = value.firstEntry();
if (firstEntry != null) {
json.put(family + ":" + field, Bytes.toString(firstEntry.getValue()));
}
}
}
return json;
}
public JSONObject get(String rowKey) throws IOException {
if (StringUtils.isBlank(this.tableName)) {
// table not exists.
return null;
}
Result result = table.get(new Get(Bytes.toBytes(rowKey)));
if (result == null) {
return null;
}
return result2JSON(result);
}
public List get(List rowKeys) throws IOException {
if (StringUtils.isBlank(this.tableName)) {
// table not exists.
return null;
}
List gets = new ArrayList<>(rowKeys.size());
for (String rowKey : rowKeys) {
gets.add(new Get(Bytes.toBytes(rowKey)));
}
Result[] results = table.get(gets);
if (results == null) {
return null;
}
List res = new ArrayList<>(rowKeys.size());
for (Result result : results) {
res.add(result2JSON(result));
}
return res;
}
public int delete(String rowkey) throws IOException {
table.delete(new Delete(Bytes.toBytes(rowkey)));
return 1;
}
public int delete(byte[] rowkey) throws IOException {
table.delete(new Delete(rowkey));
return 1;
}
public int delete(String... rowkeys) throws IOException {
return delete(Arrays.asList(rowkeys));
}
public int delete(Collection rowkeys) throws IOException {
List deletes = new ArrayList<>(rowkeys.size());
for (String rowkey : rowkeys) {
deletes.add(new Delete(Bytes.toBytes(rowkey)));
}
table.delete(deletes);
return deletes.size();
}
@Override
public void close() throws IOException {
if(table != null) {
table.close();
}
}
public void setBytesType(int bytesType) {
this.bytesType = bytesType;
}
}