package com.primeton.dsp.datarelease.data.service; import com.alibaba.fastjson.JSONObject; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; /** *

* HBase 操作类工具类。 *

*

* Created by ZhenQin on 2018/5/17 0017-9:52
* Vendor: 9sdata.cn
*
* Helper around a single HBase {@link Table} that converts between fastjson
* {@link JSONObject} documents and HBase {@link Put}/{@link Result} objects.
* Tables opened through the {@code Connection}-based constructor are kept in a
* static, name-keyed cache shared by all instances of this class.
*/
public class CommonHBaseService implements Closeable {

    /**
     * Values that are neither {@code String} nor {@code byte[]} are rendered
     * with {@link String#valueOf(Object)} and then encoded to bytes.
     */
    public final static int TEXT_BYTES_TYPE = 0;

    /**
     * Values of the common boxed primitive types and {@link Date} are encoded
     * with the matching {@code Bytes.toBytes} overload instead of as text.
     */
    public final static int BIN_BYTES_TYPE = 1;

    // Declared before the cache so that it is initialised before the removal
    // listener below can ever run.
    final static Logger logger = LoggerFactory.getLogger(CommonHBaseService.class);

    /**
     * Shared cache of opened tables, keyed by table name. An entry is dropped
     * after 30 minutes without being read, and the evicted table is closed.
     *
     * NOTE(review): the key is the table name only, so two connections that
     * point at different clusters but use the same table name would share one
     * entry - confirm this class is only used against a single cluster.
     */
    final static Cache<String, Table> HBASE_TABLE_CACHED = CacheBuilder.newBuilder()
            .maximumSize(100)
            .initialCapacity(20)
            .expireAfterAccess(30, TimeUnit.MINUTES)
            .removalListener(new RemovalListener<String, Table>() {
                @Override
                public void onRemoval(RemovalNotification<String, Table> notification) {
                    Table evicted = notification.getValue();
                    if (evicted == null) {
                        return;
                    }
                    try {
                        evicted.close();
                    } catch (IOException e) {
                        // Best effort: a failed close must not break cache maintenance.
                        logger.warn("close evicted table: {} failed.", notification.getKey(), e);
                    }
                }
            })
            .build();

    /**
     * Name of the backing table; {@code null} when the table did not exist at
     * construction time.
     */
    final String tableName;

    /**
     * Backing HBase table; {@code null} when the table did not exist at
     * construction time.
     */
    final Table table;

    /**
     * Selected value encoding, one of {@link #TEXT_BYTES_TYPE} or
     * {@link #BIN_BYTES_TYPE}.
     */
    private int bytesType = TEXT_BYTES_TYPE;

    /**
     * Wraps an already opened table using {@link #TEXT_BYTES_TYPE} encoding.
     *
     * @param table the HBase table to operate on
     */
    public CommonHBaseService(Table table) {
        this(table, TEXT_BYTES_TYPE);
    }

    /**
     * Wraps an already opened table. The table is not placed in the shared cache.
     *
     * @param table the HBase table to operate on
     * @param type  value encoding, {@link #TEXT_BYTES_TYPE} or {@link #BIN_BYTES_TYPE}
     */
    public CommonHBaseService(Table table, int type) {
        this.table = table;
        this.tableName = table.getName().getNameAsString();
        this.bytesType = type;
    }

    /**
     * Looks the table up in the shared cache, opening and caching it when it
     * is not cached yet. When the table does not exist a warning is logged and
     * both {@code tableName} and {@code table} are left {@code null}; the
     * {@code get} methods then return {@code null} and the write/delete
     * methods throw {@link IllegalStateException}.
     *
     * @param connection HBase connection used to check for and open the table
     * @param tableName  name of the table
     * @throws IOException if the existence check or opening the table fails
     */
    public CommonHBaseService(Connection connection, String tableName) throws IOException {
        Table cached = HBASE_TABLE_CACHED.getIfPresent(tableName);
        if (cached != null) {
            this.tableName = tableName;
            this.table = cached;
        } else {
            TableName name = TableName.valueOf(tableName);
            boolean existsTable;
            // Admin is Closeable; release it as soon as the check is done.
            try (Admin admin = connection.getAdmin()) {
                existsTable = admin.tableExists(name);
            }
            if (existsTable) {
                this.tableName = tableName;
                this.table = connection.getTable(name);
                HBASE_TABLE_CACHED.put(tableName, this.table);
            } else {
                logger.warn("table: {} not exists.", tableName);
                this.tableName = null;
                this.table = null;
            }
        }
        this.bytesType = TEXT_BYTES_TYPE;
    }

    /**
     * Creates a new connection from {@code conf} and delegates to
     * {@link #CommonHBaseService(Connection, String)}.
     *
     * NOTE(review): the connection created here is not retained, and
     * {@link #close()} only closes the table, so this class never closes it.
     * Prefer the {@code Connection}-based constructor with a connection whose
     * lifecycle the caller manages.
     *
     * @param conf      HBase configuration
     * @param tableName name of the table
     * @throws IOException if the connection or the table cannot be opened
     */
    public CommonHBaseService(Configuration conf, String tableName) throws IOException {
        this(ConnectionFactory.createConnection(conf), tableName);
    }

    /**
     * Writes one row. The row key is read from the {@code "key"} entry of
     * {@code values}; every other entry becomes a column.
     *
     * @param values        row content including the {@code "key"} entry
     * @param defaultFamily column family for fields without a {@code family:} prefix
     * @return always 1
     * @throws IOException              if the write fails
     * @throws IllegalArgumentException if the key is blank or a field name is invalid
     * @throws IllegalStateException    if the table did not exist at construction time
     */
    public int put(JSONObject values, String defaultFamily) throws IOException {
        String key = values.getString("key");
        // Clamp the capacity: an empty input would otherwise request -1.
        final JSONObject copy = new JSONObject(Math.max(values.size() - 1, 0));
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            if ("key".equals(entry.getKey())) {
                continue;
            }
            copy.put(entry.getKey(), entry.getValue());
        }
        return put(key, copy, defaultFamily);
    }

    /**
     * Writes one row with an explicit row key; every entry of {@code values}
     * becomes a column.
     *
     * @param key           row key
     * @param values        columns to write
     * @param defaultFamily column family for fields without a {@code family:} prefix
     * @return always 1
     * @throws IOException              if the write fails
     * @throws IllegalArgumentException if the key is blank or a field name is invalid
     * @throws IllegalStateException    if the table did not exist at construction time
     */
    public int put(String key, JSONObject values, String defaultFamily) throws IOException {
        // Convert first so that argument errors are reported before the table check.
        Put put = jsonConvert2Put(key, values, defaultFamily);
        requireTable().put(put);
        return 1;
    }

    /**
     * Array variant of {@link #puts(Collection, String)}.
     *
     * @param values        rows to write, each with a {@code "key"} entry
     * @param defaultFamily column family for fields without a {@code family:} prefix
     * @return number of rows submitted
     * @throws IOException if the write fails
     */
    public int puts(JSONObject[] values, String defaultFamily) throws IOException {
        return puts(Arrays.asList(values), defaultFamily);
    }

    /**
     * Writes several rows in one call. Each row key is read from the
     * {@code "key"} entry of the corresponding object.
     *
     * NOTE(review): unlike {@link #put(JSONObject, String)}, the {@code "key"}
     * entry is not stripped here, so it is also written as a column of the
     * default family. Behaviour is kept as-is; confirm whether this is intended.
     *
     * @param values        rows to write, each with a {@code "key"} entry
     * @param defaultFamily column family for fields without a {@code family:} prefix
     * @return number of rows submitted
     * @throws IOException              if the write fails
     * @throws IllegalArgumentException if a key is blank or a field name is invalid
     * @throws IllegalStateException    if the table did not exist at construction time
     */
    public int puts(Collection<JSONObject> values, String defaultFamily) throws IOException {
        List<Put> puts = new ArrayList<>(values.size());
        for (JSONObject value : values) {
            puts.add(jsonConvert2Put(value.getString("key"), value, defaultFamily));
        }
        requireTable().put(puts);
        return puts.size();
    }

    /**
     * Converts a JSON object into an HBase {@link Put}. Field names of the
     * form {@code family:qualifier} select the family explicitly; names
     * without a colon, or with a leading colon, go to {@code defaultFamily}.
     *
     * @param key           row key, must not be blank
     * @param value         columns to write
     * @param defaultFamily column family for fields without a {@code family:} prefix
     * @return the populated put
     * @throws IllegalArgumentException if the key is blank, or a field name is blank or just ":"
     */
    private Put jsonConvert2Put(String key, JSONObject value, String defaultFamily) {
        if (StringUtils.isBlank(key)) {
            throw new IllegalArgumentException("no key value to set...");
        }
        Put put = new Put(Bytes.toBytes(key));
        for (Map.Entry<String, Object> entry : value.entrySet()) {
            String field = entry.getKey();
            if (StringUtils.isBlank(field) || ":".equals(field)) {
                throw new IllegalArgumentException("invalid field: " + field);
            }
            int colon = field.indexOf(":");
            String family;
            String qualifier;
            if (colon <= 0) {
                // "a" or ":a" -> default family, qualifier "a"
                family = defaultFamily;
                qualifier = colon == 0 ? field.substring(1) : field;
            } else {
                // "f:a" -> family "f", qualifier "a"
                family = field.substring(0, colon);
                qualifier = field.substring(colon + 1);
            }
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), getBytes(entry.getValue()));
        }
        return put;
    }

    /**
     * Encodes a value to bytes according to the configured bytes type.
     * {@code null} becomes an empty array, strings are encoded with
     * {@code Bytes.toBytes(String)} and byte arrays are passed through, in
     * both modes.
     *
     * @param value the value to encode, may be {@code null}
     * @return the encoded bytes, never {@code null}
     */
    private byte[] getBytes(Object value) {
        if (value == null) {
            return new byte[0];
        }
        if (value instanceof String) {
            return Bytes.toBytes((String) value);
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (bytesType == TEXT_BYTES_TYPE) {
            return Bytes.toBytes(String.valueOf(value));
        }

        if (value instanceof Integer) {
            return Bytes.toBytes((Integer) value);
        } else if (value instanceof Long) {
            return Bytes.toBytes((Long) value);
        } else if (value instanceof Double) {
            return Bytes.toBytes((Double) value);
        } else if (value instanceof Float) {
            return Bytes.toBytes((Float) value);
        } else if (value instanceof Byte) {
            // NOTE(review): presumably resolves to a wider overload such as
            // toBytes(short), which would yield more than one byte - confirm.
            return Bytes.toBytes((Byte) value);
        } else if (value instanceof Boolean) {
            return Bytes.toBytes((Boolean) value);
        } else if (value instanceof Short) {
            return Bytes.toBytes((Short) value);
        } else if (value instanceof Date) {
            return Bytes.toBytes(((Date) value).getTime());
        }

        // Any other type falls back to its string form.
        return Bytes.toBytes(String.valueOf(value));
    }

    /**
     * Converts an HBase {@link Result} to JSON. The row key is stored as a
     * string under {@code "key"}; each cell is stored under
     * {@code "family:qualifier"} with its value decoded as a string. Only the
     * first entry of each cell's version map is used (presumably the newest
     * version - confirm against the HBase {@code Result.getMap} contract).
     *
     * @param result the HBase result
     * @return the JSON form; empty when the result has no row
     */
    private JSONObject result2JSON(Result result) {
        JSONObject json = new JSONObject();
        byte[] row = result.getRow();
        if (row == null || row.length == 0) {
            return json;
        }
        json.put("key", Bytes.toString(row));

        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
        if (map == null || map.isEmpty()) {
            return json;
        }
        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
            String family = Bytes.toString(entry.getKey());
            NavigableMap<byte[], NavigableMap<Long, byte[]>> values = entry.getValue();
            for (Map.Entry<byte[], NavigableMap<Long, byte[]>> en : values.entrySet()) {
                String field = Bytes.toString(en.getKey());
                Map.Entry<Long, byte[]> firstEntry = en.getValue().firstEntry();
                if (firstEntry != null) {
                    json.put(family + ":" + field, Bytes.toString(firstEntry.getValue()));
                }
            }
        }
        return json;
    }

    /**
     * Reads one row.
     *
     * @param rowKey row key
     * @return the row as JSON, or {@code null} when the table did not exist at
     *         construction time or the client returned no result
     * @throws IOException if the read fails
     */
    public JSONObject get(String rowKey) throws IOException {
        if (StringUtils.isBlank(this.tableName)) {
            // table not exists.
            return null;
        }
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        if (result == null) {
            return null;
        }
        return result2JSON(result);
    }

    /**
     * Reads several rows in one call.
     *
     * @param rowKeys row keys
     * @return one JSON object per returned result, or {@code null} when the
     *         table did not exist at construction time or the client returned
     *         no result array
     * @throws IOException if the read fails
     */
    public List<JSONObject> get(List<String> rowKeys) throws IOException {
        if (StringUtils.isBlank(this.tableName)) {
            // table not exists.
            return null;
        }
        List<Get> gets = new ArrayList<>(rowKeys.size());
        for (String rowKey : rowKeys) {
            gets.add(new Get(Bytes.toBytes(rowKey)));
        }
        Result[] results = table.get(gets);
        if (results == null) {
            return null;
        }
        List<JSONObject> res = new ArrayList<>(rowKeys.size());
        for (Result result : results) {
            res.add(result2JSON(result));
        }
        return res;
    }

    /**
     * Deletes one row.
     *
     * @param rowkey row key as a string
     * @return always 1
     * @throws IOException           if the delete fails
     * @throws IllegalStateException if the table did not exist at construction time
     */
    public int delete(String rowkey) throws IOException {
        requireTable().delete(new Delete(Bytes.toBytes(rowkey)));
        return 1;
    }

    /**
     * Deletes one row.
     *
     * @param rowkey row key as raw bytes
     * @return always 1
     * @throws IOException           if the delete fails
     * @throws IllegalStateException if the table did not exist at construction time
     */
    public int delete(byte[] rowkey) throws IOException {
        requireTable().delete(new Delete(rowkey));
        return 1;
    }

    /**
     * Varargs variant of {@link #delete(Collection)}.
     *
     * @param rowkeys row keys
     * @return number of deletes submitted
     * @throws IOException if the delete fails
     */
    public int delete(String... rowkeys) throws IOException {
        return delete(Arrays.asList(rowkeys));
    }

    /**
     * Deletes several rows in one call.
     *
     * @param rowkeys row keys
     * @return number of deletes submitted
     * @throws IOException           if the delete fails
     * @throws IllegalStateException if the table did not exist at construction time
     */
    public int delete(Collection<String> rowkeys) throws IOException {
        List<Delete> deletes = new ArrayList<>(rowkeys.size());
        for (String rowkey : rowkeys) {
            deletes.add(new Delete(Bytes.toBytes(rowkey)));
        }
        requireTable().delete(deletes);
        return deletes.size();
    }

    /**
     * Closes the backing table, if any. When that table is the instance held
     * in the shared cache under this table name, the cache entry is evicted as
     * well so that later constructions do not receive a closed table.
     *
     * @throws IOException if closing the table fails
     */
    @Override
    public void close() throws IOException {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } finally {
            // Eviction runs the removal listener, which closes the table again;
            // this relies on Closeable#close being a no-op when already closed.
            if (tableName != null && HBASE_TABLE_CACHED.getIfPresent(tableName) == table) {
                HBASE_TABLE_CACHED.invalidate(tableName);
            }
        }
    }

    /**
     * Changes the value encoding used by subsequent writes.
     *
     * @param bytesType {@link #TEXT_BYTES_TYPE} or {@link #BIN_BYTES_TYPE}
     */
    public void setBytesType(int bytesType) {
        this.bytesType = bytesType;
    }

    /**
     * Returns the backing table, or fails when the table did not exist at
     * construction time.
     */
    private Table requireTable() {
        if (table == null) {
            throw new IllegalStateException("table not exists.");
        }
        return table;
    }
}