Explorar o código

添加MQService支持,添加Leveldb缓存支持

zhzhenqin %!s(int64=4) %!d(string=hai) anos
pai
achega
bdad60b710

+ 95 - 0
common-utils/JwtUtils.java

@@ -0,0 +1,95 @@
+/**
+ * YIDATA集成平台
+ *
+ * http://yiidata.com
+ *
+ *
+ */
+
+package com.yiidata.amc.hub.config;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * jwt工具类
+ *
+ * @author Mark sunlightcs@gmail.com
+ */
+@ConfigurationProperties(prefix = "amc.jwt")
+@Component
+public class JwtUtils {
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    private String secret;
+    private long expire;
+    private String header;
+
+    /**
+     * 生成jwt token
+     */
+    public String generateToken(long userId) {
+        Date nowDate = new Date();
+        //过期时间
+        Date expireDate = new Date(nowDate.getTime() + expire * 1000);
+
+        return Jwts.builder()
+                .setHeaderParam("typ", "JWT")
+                .setSubject(userId+"")
+                .setIssuedAt(nowDate)
+                .setExpiration(expireDate)
+                .signWith(SignatureAlgorithm.HS512, secret)
+                .compact();
+    }
+
+    public Claims getClaimByToken(String token) {
+        try {
+            return Jwts.parser()
+                    .setSigningKey(secret)
+                    .parseClaimsJws(token)
+                    .getBody();
+        }catch (Exception e){
+            logger.debug("validate is token error ", e);
+            return null;
+        }
+    }
+
+    /**
+     * token是否过期
+     * @return  true:过期
+     */
+    public boolean isTokenExpired(Date expiration) {
+        return expiration.before(new Date());
+    }
+
+    public String getSecret() {
+        return secret;
+    }
+
+    public void setSecret(String secret) {
+        this.secret = secret;
+    }
+
+    public long getExpire() {
+        return expire;
+    }
+
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+
+    public String getHeader() {
+        return header;
+    }
+
+    public void setHeader(String header) {
+        this.header = header;
+    }
+}

+ 2 - 1
common-utils/README.md

@@ -1,4 +1,5 @@
 功能说明:
 
 - ID 生成器:SnowflakeIdUtils
-- 网卡地址获取:NetworkInterfaceManager
+- 网卡地址获取:NetworkInterfaceManager
+- 基于Token的加密解密:JwtUtils

+ 80 - 0
embaded_activemq/MQBrockerService.java

@@ -0,0 +1,80 @@
+package com.yiidata.amc.server.service;
+
+import com.yiidata.amc.api.utils.ServerConfig;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.io.File;
+
+/**
+ * <p>
+ * commons of this class
+ * </p>
+ * <p>
+ * Created by ZhenQin on 2018/3/27 0027-10:58
+ * Vendor: yiidata.com
+ */
+public class MQBrockerService {
+
+
+    final String mqBrokerUrl;
+
+
+    final BrokerService brocker;
+
+
+    @Inject
+    public MQBrockerService(@Named("brokerUrl") String mqBrokerUrl, ServerConfig serverConfig) {
+        this.mqBrokerUrl = mqBrokerUrl;
+
+        try {
+            brocker = new BrokerService();
+            brocker.addConnector(mqBrokerUrl);
+            // 消息持久化
+            brocker.setPersistent("true".equals(serverConfig.getProperty("mq.persistent", "false")));
+            brocker.setUseJmx(false);
+
+            PolicyMap policy = new PolicyMap();
+            PolicyEntry entry = new PolicyEntry();
+            entry.setAdvisoryForConsumed(true);
+
+            policy.put(new ActiveMQQueue(">"), entry);
+            brocker.setDestinationPolicy(policy);
+
+            // 持久化
+            if(brocker.isPersistent()) {
+                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
+                File dir = new File(ServerConfig.getAppHome(), "KahaDB");
+                if(!dir.exists()) {
+                    dir.mkdirs();
+                }
+                adaptor.setDirectory(dir);
+                brocker.setPersistenceAdapter(adaptor);
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void start() {
+        try {
+            brocker.start();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+    public void stop() {
+        try {
+            brocker.stop();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 12 - 0
java-commons-cache/CacheFactory.java

@@ -81,6 +81,9 @@ public class CacheFactory implements FactoryBean<ICache>, InitializingBean, Disp
         if (StringUtils.isBlank(cacheType) || "mem".equalsIgnoreCase(cacheType) || "memory".equalsIgnoreCase(cacheType)) {
             logger.info("use guava cache.");
             this.cache = new GuavaMemCached();
+        } else if ("leveldb".equalsIgnoreCase(cacheType)) {
+            logger.info("use leveldb.");
+            this.cache = initLevelDb();
         } else if ("ehcache".equalsIgnoreCase(cacheType)) {
             logger.info("use ehcache.");
             this.cache = initEhcache();
@@ -111,6 +114,15 @@ public class CacheFactory implements FactoryBean<ICache>, InitializingBean, Disp
     private ICache initRedisCache(String host, int port, String password, int db) {
         return new Redised(host, port, password, db);
     }
+	
+	
+    /**
+     * 初始化 LevelDB 客户端
+     * @return
+     */
+    private ICache initLevelDb() {
+        return new LevelDB();
+    }
 
     /**
      * 初始化 EHCache 客户端

+ 130 - 0
java-commons-cache/LevelDB.java

@@ -0,0 +1,130 @@
+package com.yiidata.flume.cache;
+
+import com.yiidata.flume.cache.serde.Serializer;
+import com.yiidata.flume.cache.serde.StringSerializer;
+import com.yiidata.flume.support.FlumeConfig;
+import lombok.NonNull;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.impl.Iq80DBFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ *
+ * Level DB Cache 实现
+ *
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2021/7/20
+ * Time: 14:46
+ * Vendor: yiidata.com
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class LevelDB implements ICache<String>, Closeable {
+
+
+    /**
+     * level db
+     */
+    final DB db;
+
+
+
+    /**
+     * KEY 序列化
+     */
+    private final Serializer<String> KEY_SERDE = new StringSerializer();
+
+
+    public LevelDB() {
+        final File levelDbPath = new File(FlumeConfig.getAppHome(), "db");
+        if(!levelDbPath.exists()) {
+            levelDbPath.mkdirs();
+        }
+
+        try {
+            final Options options = new Options();
+            options.createIfMissing(true).errorIfExists(false);
+            this.db = new Iq80DBFactory().open(levelDbPath, options);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void add(@NonNull String key, @NonNull String value) {
+        db.put(KEY_SERDE.serialize(key), KEY_SERDE.serialize(value));
+    }
+
+    @Override
+    public void add(String key, int exp, String value) {
+        add(key, value);
+    }
+
+    @Override
+    public String get(@NonNull String key) {
+        return KEY_SERDE.deserialize(db.get(KEY_SERDE.serialize(key)));
+    }
+
+    @Override
+    public String remove(@NonNull String key) {
+        db.delete(KEY_SERDE.serialize(key));
+        return null;
+    }
+
+    @Override
+    public int removeByPrefix(String prefix) {
+        return 0;
+    }
+
+    @Override
+    public void clear() {
+
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            close();
+        } catch (IOException ignore) {}
+    }
+
+    @Override
+    public void close() throws IOException {
+        db.close();
+    }
+
+    public static void main(String[] args) {
+        final LevelDB levelDB = new LevelDB();
+        /*
+        for (int i = 0; i < 100000; i++) {
+            levelDB.add("hello" + i, "world, " + i);
+        }*/
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < 100; i++) {
+            final String va = levelDB.get("hello" + i);
+            System.out.println(va);
+        }
+        System.out.println(System.currentTimeMillis() - start);
+        /*
+        start = System.currentTimeMillis();
+        for (int i = 0; i < 100000; i++) {
+            levelDB.get("hello" + i);
+        }
+        System.out.println(System.currentTimeMillis() - start);*/
+        levelDB.shutdown();
+    }
+}