123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package com.primeton.dsp.datarelease.data.service;
- import com.alibaba.fastjson.JSONObject;
- import com.google.common.cache.Cache;
- import com.google.common.cache.CacheBuilder;
- import com.google.common.cache.RemovalListener;
- import com.google.common.cache.RemovalNotification;
- import org.apache.commons.lang.StringUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.Closeable;
- import java.io.IOException;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- /**
- * <p>
- * HBase 操作类工具类。
- * </p>
- * <p/>
- * Created by ZhenQin on 2018/5/17 0017-9:52
- * Vendor: 9sdata.cn
- */
- public class CommonHBaseService implements Closeable {
- /**
- * 对象值转为 Byte[] 类型,先转为 文本类型再转为 byte[]
- */
- public final static int TEXT_BYTES_TYPE = 0;
- /**
- * HBase key 和 value 全部为 bin 类型
- */
- public final static int BIN_BYTES_TYPE = 1;
- /**
- * Hbase table cached
- */
- final static Cache<String, Table> HBASE_TABLE_CACHED = CacheBuilder.newBuilder()
- .maximumSize(100)
- .initialCapacity(20)
- .expireAfterAccess(30, TimeUnit.MINUTES) // 30 分钟不从缓存中读取该表,则移除出缓存
- .removalListener(new RemovalListener<String, Table>() {
- @Override
- public void onRemoval(RemovalNotification<String, Table> notification) {
- try {
- notification.getValue().close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- })
- .build();
- /**
- * tablename
- */
- final String tableName;
- /**
- * Hbase Table
- */
- final Table table;
- /**
- * 选择
- */
- private int bytesType = TEXT_BYTES_TYPE;
- final static Logger logger = LoggerFactory.getLogger(CommonHBaseService.class);
- /**
- * 构造方法传入Hbase Table
- *
- * @param table
- */
- public CommonHBaseService(Table table) {
- this(table, TEXT_BYTES_TYPE);
- }
- /**
- * 构造方法传入Hbase Table
- *
- * @param table
- */
- public CommonHBaseService(Table table, int type) {
- this.table = table;
- this.tableName = table.getName().getNameAsString();
- this.bytesType = type;
- }
- public CommonHBaseService(Connection connection, String tableName) throws IOException {
- Table table = HBASE_TABLE_CACHED.getIfPresent(tableName);
- if (table == null) {
- boolean existsTable = connection.getAdmin().tableExists(TableName.valueOf(tableName));
- //boolean existsTable = false;
- if (!existsTable) {
- logger.warn("table: " + tableName + " not exists.");
- this.tableName = null;
- //throw new IllegalStateException("table: " + tableName + " not exists.");
- this.table = null;
- } else {
- this.tableName = tableName;
- this.table = connection.getTable(TableName.valueOf(tableName));
- HBASE_TABLE_CACHED.put(tableName, this.table);
- }
- } else {
- this.tableName = tableName;
- this.table = table;
- }
- this.bytesType = TEXT_BYTES_TYPE;
- }
- public CommonHBaseService(Configuration conf, String tableName) throws IOException {
- this(ConnectionFactory.createConnection(conf), tableName);
- }
- public int put(JSONObject values, String defaultFamily) throws IOException {
- String key = values.getString("key");
- final JSONObject copy = new JSONObject(values.size() - 1);
- Set<Map.Entry<String, Object>> entries = values.entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- if ("key".equals(entry.getKey())) {
- continue;
- }
- copy.put(entry.getKey(), entry.getValue());
- }
- return put(key, copy, defaultFamily);
- }
- public int put(String key, JSONObject values, String defaultFamily) throws IOException {
- table.put(jsonConvert2Put(key, values, defaultFamily));
- return 1;
- }
- public int puts(JSONObject[] values, String defaultFamily) throws IOException {
- return puts(Arrays.asList(values), defaultFamily);
- }
- public int puts(Collection<JSONObject> values, String defaultFamily) throws IOException {
- List<Put> puts = new ArrayList<>(values.size());
- for (JSONObject value : values) {
- puts.add(jsonConvert2Put(value.getString("key"), value, defaultFamily));
- }
- table.put(puts);
- return puts.size();
- }
- /**
- * 把 JSON 转为 HBase 的 put, json 中一定要包含 key 字段
- *
- * @param key
- * @param value
- * @param defaultFamily
- * @return
- */
- private Put jsonConvert2Put(String key, JSONObject value, String defaultFamily) {
- if (StringUtils.isBlank(key)) {
- throw new IllegalArgumentException("no key value to set...");
- }
- Put put = new Put(Bytes.toBytes(key));
- Set<Map.Entry<String, Object>> entries = value.entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- String field = entry.getKey();
- if (StringUtils.isBlank(field) || ":".equals(field)) {
- throw new IllegalArgumentException("invalid field: " + field);
- }
- int indexOf = field.indexOf(":");
- if (indexOf <= 0) {
- //a=abc OR :a=abc
- field = indexOf == 0 ? field.substring(1) : field;
- put.addColumn(Bytes.toBytes(defaultFamily), Bytes.toBytes(field), getBytes(entry.getValue()));
- } else {
- //f:a=abc
- String f = field.substring(0, indexOf);
- String x = field.substring(indexOf + 1);
- put.addColumn(Bytes.toBytes(f), Bytes.toBytes(x), getBytes(entry.getValue()));
- }
- }
- return put;
- }
- /**
- * 对象值转为 byte[] 类型
- *
- * @param value
- * @return
- */
- private byte[] getBytes(Object value) {
- if (value == null) {
- return new byte[0];
- }
- if (value instanceof String) {
- return Bytes.toBytes((String) value);
- }
- if (value instanceof byte[]) {
- return (byte[]) value;
- }
- if (bytesType == 0) {
- return Bytes.toBytes(String.valueOf(value));
- }
- if (value instanceof Integer) {
- return Bytes.toBytes((Integer) value);
- } else if (value instanceof Long) {
- return Bytes.toBytes((Long) value);
- } else if (value instanceof Double) {
- return Bytes.toBytes((Double) value);
- } else if (value instanceof Float) {
- return Bytes.toBytes((Float) value);
- } else if (value instanceof Byte) {
- return Bytes.toBytes((Byte) value);
- } else if (value instanceof Boolean) {
- return Bytes.toBytes((Boolean) value);
- } else if (value instanceof Short) {
- return Bytes.toBytes((Short) value);
- } else if (value instanceof Date) {
- return Bytes.toBytes(((Date) value).getTime());
- }
- // 其他的未知类型,统一设置为字符串
- return Bytes.toBytes(String.valueOf(value));
- }
- /**
- * HBaseResult 转换为JSON, HBase rowkey 放在 json的 row 里,必须是字符串形式。
- *
- * @param result
- * @return
- */
- private JSONObject result2JSON(Result result) {
- JSONObject json = new JSONObject();
- byte[] row = result.getRow();
- if (row == null || row.length == 0) {
- return json;
- }
- json.put("key", Bytes.toString(row));
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
- if (map == null || map.isEmpty()) {
- return json;
- }
- for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
- byte[] familyBytes = entry.getKey();
- String family = Bytes.toString(familyBytes);
- NavigableMap<byte[], NavigableMap<Long, byte[]>> values = entry.getValue();
- for (Map.Entry<byte[], NavigableMap<Long, byte[]>> en : values.entrySet()) {
- String field = Bytes.toString(en.getKey());
- NavigableMap<Long, byte[]> value = en.getValue();
- Map.Entry<Long, byte[]> firstEntry = value.firstEntry();
- if (firstEntry != null) {
- json.put(family + ":" + field, Bytes.toString(firstEntry.getValue()));
- }
- }
- }
- return json;
- }
- public JSONObject get(String rowKey) throws IOException {
- if (StringUtils.isBlank(this.tableName)) {
- // table not exists.
- return null;
- }
- Result result = table.get(new Get(Bytes.toBytes(rowKey)));
- if (result == null) {
- return null;
- }
- return result2JSON(result);
- }
- public List<JSONObject> get(List<String> rowKeys) throws IOException {
- if (StringUtils.isBlank(this.tableName)) {
- // table not exists.
- return null;
- }
- List<Get> gets = new ArrayList<>(rowKeys.size());
- for (String rowKey : rowKeys) {
- gets.add(new Get(Bytes.toBytes(rowKey)));
- }
- Result[] results = table.get(gets);
- if (results == null) {
- return null;
- }
- List<JSONObject> res = new ArrayList<>(rowKeys.size());
- for (Result result : results) {
- res.add(result2JSON(result));
- }
- return res;
- }
- public int delete(String rowkey) throws IOException {
- table.delete(new Delete(Bytes.toBytes(rowkey)));
- return 1;
- }
- public int delete(byte[] rowkey) throws IOException {
- table.delete(new Delete(rowkey));
- return 1;
- }
- public int delete(String... rowkeys) throws IOException {
- return delete(Arrays.asList(rowkeys));
- }
- public int delete(Collection<String> rowkeys) throws IOException {
- List<Delete> deletes = new ArrayList<>(rowkeys.size());
- for (String rowkey : rowkeys) {
- deletes.add(new Delete(Bytes.toBytes(rowkey)));
- }
- table.delete(deletes);
- return deletes.size();
- }
- @Override
- public void close() throws IOException {
- if(table != null) {
- table.close();
- }
- }
- public void setBytesType(int bytesType) {
- this.bytesType = bytesType;
- }
- }
|