|
|
@@ -0,0 +1,352 @@
|
|
|
+package com.primeton.dsp.datarelease.data.service;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.common.cache.Cache;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
+import com.google.common.cache.RemovalListener;
|
|
|
+import com.google.common.cache.RemovalNotification;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
|
+import org.apache.hadoop.hbase.TableName;
|
|
|
+import org.apache.hadoop.hbase.client.*;
|
|
|
+import org.apache.hadoop.hbase.util.Bytes;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.io.Closeable;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+/**
|
|
|
+ * <p>
|
|
|
+ * HBase 操作类工具类。
|
|
|
+ * </p>
|
|
|
+ * <p/>
|
|
|
+ * Created by ZhenQin on 2018/5/17 0017-9:52
|
|
|
+ * Vendor: 9sdata.cn
|
|
|
+ */
|
|
|
+public class CommonHBaseService implements Closeable {
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对象值转为 Byte[] 类型,先转为 文本类型再转为 byte[]
|
|
|
+ */
|
|
|
+ public final static int TEXT_BYTES_TYPE = 0;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * HBase key 和 value 全部为 bin 类型
|
|
|
+ */
|
|
|
+ public final static int BIN_BYTES_TYPE = 1;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Hbase table cached
|
|
|
+ */
|
|
|
+ final static Cache<String, Table> HBASE_TABLE_CACHED = CacheBuilder.newBuilder()
|
|
|
+ .maximumSize(100)
|
|
|
+ .initialCapacity(20)
|
|
|
+ .expireAfterAccess(30, TimeUnit.MINUTES) // 30 分钟不从缓存中读取该表,则移除出缓存
|
|
|
+ .removalListener(new RemovalListener<String, Table>() {
|
|
|
+ @Override
|
|
|
+ public void onRemoval(RemovalNotification<String, Table> notification) {
|
|
|
+ try {
|
|
|
+ notification.getValue().close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .build();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * tablename
|
|
|
+ */
|
|
|
+ final String tableName;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Hbase Table
|
|
|
+ */
|
|
|
+ final Table table;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 选择
|
|
|
+ */
|
|
|
+ private int bytesType = TEXT_BYTES_TYPE;
|
|
|
+
|
|
|
+
|
|
|
+ final static Logger logger = LoggerFactory.getLogger(CommonHBaseService.class);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造方法传入Hbase Table
|
|
|
+ *
|
|
|
+ * @param table
|
|
|
+ */
|
|
|
+ public CommonHBaseService(Table table) {
|
|
|
+ this(table, TEXT_BYTES_TYPE);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造方法传入Hbase Table
|
|
|
+ *
|
|
|
+ * @param table
|
|
|
+ */
|
|
|
+ public CommonHBaseService(Table table, int type) {
|
|
|
+ this.table = table;
|
|
|
+ this.tableName = table.getName().getNameAsString();
|
|
|
+ this.bytesType = type;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public CommonHBaseService(Connection connection, String tableName) throws IOException {
|
|
|
+ Table table = HBASE_TABLE_CACHED.getIfPresent(tableName);
|
|
|
+ if (table == null) {
|
|
|
+ boolean existsTable = connection.getAdmin().tableExists(TableName.valueOf(tableName));
|
|
|
+ //boolean existsTable = false;
|
|
|
+ if (!existsTable) {
|
|
|
+ logger.warn("table: " + tableName + " not exists.");
|
|
|
+ this.tableName = null;
|
|
|
+ //throw new IllegalStateException("table: " + tableName + " not exists.");
|
|
|
+ this.table = null;
|
|
|
+ } else {
|
|
|
+ this.tableName = tableName;
|
|
|
+ this.table = connection.getTable(TableName.valueOf(tableName));
|
|
|
+ HBASE_TABLE_CACHED.put(tableName, this.table);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.tableName = tableName;
|
|
|
+ this.table = table;
|
|
|
+ }
|
|
|
+ this.bytesType = TEXT_BYTES_TYPE;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public CommonHBaseService(Configuration conf, String tableName) throws IOException {
|
|
|
+ this(ConnectionFactory.createConnection(conf), tableName);
|
|
|
+ }
|
|
|
+
|
|
|
+ public int put(JSONObject values, String defaultFamily) throws IOException {
|
|
|
+ String key = values.getString("key");
|
|
|
+ final JSONObject copy = new JSONObject(values.size() - 1);
|
|
|
+ Set<Map.Entry<String, Object>> entries = values.entrySet();
|
|
|
+ for (Map.Entry<String, Object> entry : entries) {
|
|
|
+ if ("key".equals(entry.getKey())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ copy.put(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ return put(key, copy, defaultFamily);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int put(String key, JSONObject values, String defaultFamily) throws IOException {
|
|
|
+ table.put(jsonConvert2Put(key, values, defaultFamily));
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int puts(JSONObject[] values, String defaultFamily) throws IOException {
|
|
|
+ return puts(Arrays.asList(values), defaultFamily);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int puts(Collection<JSONObject> values, String defaultFamily) throws IOException {
|
|
|
+ List<Put> puts = new ArrayList<>(values.size());
|
|
|
+ for (JSONObject value : values) {
|
|
|
+ puts.add(jsonConvert2Put(value.getString("key"), value, defaultFamily));
|
|
|
+ }
|
|
|
+ table.put(puts);
|
|
|
+ return puts.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 把 JSON 转为 HBase 的 put, json 中一定要包含 key 字段
|
|
|
+ *
|
|
|
+ * @param key
|
|
|
+ * @param value
|
|
|
+ * @param defaultFamily
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Put jsonConvert2Put(String key, JSONObject value, String defaultFamily) {
|
|
|
+ if (StringUtils.isBlank(key)) {
|
|
|
+ throw new IllegalArgumentException("no key value to set...");
|
|
|
+ }
|
|
|
+ Put put = new Put(Bytes.toBytes(key));
|
|
|
+ Set<Map.Entry<String, Object>> entries = value.entrySet();
|
|
|
+ for (Map.Entry<String, Object> entry : entries) {
|
|
|
+ String field = entry.getKey();
|
|
|
+ if (StringUtils.isBlank(field) || ":".equals(field)) {
|
|
|
+ throw new IllegalArgumentException("invalid field: " + field);
|
|
|
+ }
|
|
|
+ int indexOf = field.indexOf(":");
|
|
|
+ if (indexOf <= 0) {
|
|
|
+ //a=abc OR :a=abc
|
|
|
+ field = indexOf == 0 ? field.substring(1) : field;
|
|
|
+ put.addColumn(Bytes.toBytes(defaultFamily), Bytes.toBytes(field), getBytes(entry.getValue()));
|
|
|
+ } else {
|
|
|
+ //f:a=abc
|
|
|
+ String f = field.substring(0, indexOf);
|
|
|
+ String x = field.substring(indexOf + 1);
|
|
|
+ put.addColumn(Bytes.toBytes(f), Bytes.toBytes(x), getBytes(entry.getValue()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return put;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对象值转为 byte[] 类型
|
|
|
+ *
|
|
|
+ * @param value
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private byte[] getBytes(Object value) {
|
|
|
+ if (value == null) {
|
|
|
+ return new byte[0];
|
|
|
+ }
|
|
|
+ if (value instanceof String) {
|
|
|
+ return Bytes.toBytes((String) value);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (value instanceof byte[]) {
|
|
|
+ return (byte[]) value;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (bytesType == 0) {
|
|
|
+ return Bytes.toBytes(String.valueOf(value));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (value instanceof Integer) {
|
|
|
+ return Bytes.toBytes((Integer) value);
|
|
|
+ } else if (value instanceof Long) {
|
|
|
+ return Bytes.toBytes((Long) value);
|
|
|
+ } else if (value instanceof Double) {
|
|
|
+ return Bytes.toBytes((Double) value);
|
|
|
+ } else if (value instanceof Float) {
|
|
|
+ return Bytes.toBytes((Float) value);
|
|
|
+ } else if (value instanceof Byte) {
|
|
|
+ return Bytes.toBytes((Byte) value);
|
|
|
+ } else if (value instanceof Boolean) {
|
|
|
+ return Bytes.toBytes((Boolean) value);
|
|
|
+ } else if (value instanceof Short) {
|
|
|
+ return Bytes.toBytes((Short) value);
|
|
|
+ } else if (value instanceof Date) {
|
|
|
+ return Bytes.toBytes(((Date) value).getTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 其他的未知类型,统一设置为字符串
|
|
|
+ return Bytes.toBytes(String.valueOf(value));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * HBaseResult 转换为JSON, HBase rowkey 放在 json的 row 里,必须是字符串形式。
|
|
|
+ *
|
|
|
+ * @param result
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private JSONObject result2JSON(Result result) {
|
|
|
+ JSONObject json = new JSONObject();
|
|
|
+ byte[] row = result.getRow();
|
|
|
+ if (row == null || row.length == 0) {
|
|
|
+ return json;
|
|
|
+ }
|
|
|
+ json.put("key", Bytes.toString(row));
|
|
|
+
|
|
|
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
|
|
|
+ if (map == null || map.isEmpty()) {
|
|
|
+ return json;
|
|
|
+ }
|
|
|
+ for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
|
|
|
+ byte[] familyBytes = entry.getKey();
|
|
|
+ String family = Bytes.toString(familyBytes);
|
|
|
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> values = entry.getValue();
|
|
|
+ for (Map.Entry<byte[], NavigableMap<Long, byte[]>> en : values.entrySet()) {
|
|
|
+ String field = Bytes.toString(en.getKey());
|
|
|
+
|
|
|
+ NavigableMap<Long, byte[]> value = en.getValue();
|
|
|
+ Map.Entry<Long, byte[]> firstEntry = value.firstEntry();
|
|
|
+ if (firstEntry != null) {
|
|
|
+ json.put(family + ":" + field, Bytes.toString(firstEntry.getValue()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return json;
|
|
|
+ }
|
|
|
+
|
|
|
+ public JSONObject get(String rowKey) throws IOException {
|
|
|
+ if (StringUtils.isBlank(this.tableName)) {
|
|
|
+ // table not exists.
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ Result result = table.get(new Get(Bytes.toBytes(rowKey)));
|
|
|
+ if (result == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return result2JSON(result);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public List<JSONObject> get(List<String> rowKeys) throws IOException {
|
|
|
+ if (StringUtils.isBlank(this.tableName)) {
|
|
|
+ // table not exists.
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ List<Get> gets = new ArrayList<>(rowKeys.size());
|
|
|
+ for (String rowKey : rowKeys) {
|
|
|
+ gets.add(new Get(Bytes.toBytes(rowKey)));
|
|
|
+ }
|
|
|
+ Result[] results = table.get(gets);
|
|
|
+ if (results == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<JSONObject> res = new ArrayList<>(rowKeys.size());
|
|
|
+ for (Result result : results) {
|
|
|
+ res.add(result2JSON(result));
|
|
|
+ }
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int delete(String rowkey) throws IOException {
|
|
|
+ table.delete(new Delete(Bytes.toBytes(rowkey)));
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int delete(byte[] rowkey) throws IOException {
|
|
|
+ table.delete(new Delete(rowkey));
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int delete(String... rowkeys) throws IOException {
|
|
|
+ return delete(Arrays.asList(rowkeys));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public int delete(Collection<String> rowkeys) throws IOException {
|
|
|
+ List<Delete> deletes = new ArrayList<>(rowkeys.size());
|
|
|
+ for (String rowkey : rowkeys) {
|
|
|
+ deletes.add(new Delete(Bytes.toBytes(rowkey)));
|
|
|
+ }
|
|
|
+ table.delete(deletes);
|
|
|
+ return deletes.size();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ if(table != null) {
|
|
|
+ table.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setBytesType(int bytesType) {
|
|
|
+ this.bytesType = bytesType;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|