Procházet zdrojové kódy

add serde and file client

zhzhenqin před 4 roky
rodič
revize
7f4e60c829
43 změnil soubory, kde provedl 2855 přidání a 193 odebrání
  1. 5 5
      hadoop-auth/AuthPrincipalCreator.java
  2. 13 9
      hadoop-auth/FIHiveConnectionServiceImpl.java
  3. 18 16
      hadoop-auth/FiHBaseConnectionServiceImpl.java
  4. 9 6
      hadoop-auth/HBaseConnectionFactory.java
  5. 6 6
      hadoop-auth/HiveConnectionFactory.java
  6. 2 0
      hadoop-auth/HiveHelper.java
  7. 2 0
      hadoop-auth/KerberosUtil.java
  8. 6 5
      hadoop-auth/Krb5HBaseConnectionServiceImpl.java
  9. 21 10
      hadoop-auth/Krb5HiveConnectionServiceImpl.java
  10. 4 17
      hadoop-auth/PooledDataSource.java
  11. 10 14
      hadoop-auth/SimpleDataSource.java
  12. 12 24
      hadoop-auth/SimpleHBaseConnectionServiceImpl.java
  13. 14 11
      hadoop-auth/SimpleHiveConnectionServiceImpl.java
  14. 35 0
      hadoop-principal/HadoopPrincipal.java
  15. 87 0
      hadoop-principal/HadoopPrincipalFactory.java
  16. 86 0
      hadoop-principal/KerborsHadoopPrincipal.java
  17. 42 0
      hadoop-principal/NonHadoopPrincipal.java
  18. 77 36
      java-sftp-ftp/FtpClientHandler.java
  19. 162 0
      java-sftp-ftp/HdfsClientHandler.java
  20. 159 0
      java-sftp-ftp/JschSftpClientHandler.java
  21. 46 24
      java-sftp-ftp/LocalFsClientHandler.java
  22. 44 10
      java-sftp-ftp/SftpClientHandler.java
  23. 32 0
      java-sftp-ftp/X11ClientHandler.java
  24. 175 0
      maven-pom-ant/damp-pom-targz.xml
  25. 451 0
      rsa-key/RSAUtils.java
  26. 74 0
      serializer/CompressProtobufKryoSerializer.java
  27. 41 0
      serializer/CompressSerializer.java
  28. 77 0
      serializer/ExternalSerializer.java
  29. 64 0
      serializer/GZipSerializer.java
  30. 93 0
      serializer/JSONFixSerializer.java
  31. 83 0
      serializer/JSONSerializer.java
  32. 91 0
      serializer/JacksonSerializer.java
  33. 65 0
      serializer/JdkSerializer.java
  34. 99 0
      serializer/KryoSerializer.java
  35. 83 0
      serializer/ProtobufClassSerializer.java
  36. 49 0
      serializer/ProtobufKryoSerializer.java
  37. 116 0
      serializer/ProtobufSerializer.java
  38. 45 0
      serializer/Serializer.java
  39. 43 0
      serializer/SnappySerializer.java
  40. 124 0
      serializer/StringSerializer.java
  41. 47 0
      serializer/WritableKryoSerializer.java
  42. 79 0
      serializer/WritableSerializer.java
  43. 64 0
      serializer/ZipSerializer.java

+ 5 - 5
hadoop-auth/AuthPrincipalCreator.java

@@ -50,7 +50,7 @@ public final class AuthPrincipalCreator {
      * 采用 EOS 8 的外置目录
      * @return
      */
-    public static AuthPrincipalCreator useDataReleaseConf(String basePathFirst) {
+    public static AuthPrincipalCreator useExtractorConf(String basePathFirst) {
         // 采用 传来的 地址
         String externalDir = basePathFirst;
         if(StringUtils.isNotBlank(basePathFirst) && new File(basePathFirst).exists()) {
@@ -58,16 +58,16 @@ public final class AuthPrincipalCreator {
             return new AuthPrincipalCreator(externalDir);
         }
 
-        // 不存在则使用 datarelease_home 下的 conf/principal 的地址
-        externalDir = System.getenv("DATARELEASE_HOME");
+        // 不存在则使用extractor_home 下的 conf/principal 的地址
+        externalDir = System.getenv("EXTRACTOR_HOME");
         if(StringUtils.isBlank(externalDir)) {
-            externalDir = System.getProperty("datarelease.home");
+            externalDir = System.getProperty("extractor.home");
         }
 
         if(StringUtils.isBlank(externalDir)) {
             externalDir = "./";
         }
-        String principalBasePath = new File(externalDir, "conf/principal").getAbsolutePath();
+        String principalBasePath = new File(externalDir, "config/principal").getAbsolutePath();
         log.info("use principal dir: {}", principalBasePath);
         return new AuthPrincipalCreator(principalBasePath);
     }

+ 13 - 9
hadoop-auth/FIHiveConnectionServiceImpl.java

@@ -1,13 +1,13 @@
 package com.primeton.dsp.datarelease.data.bdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHiveResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -33,26 +33,30 @@ public class FIHiveConnectionServiceImpl implements HiveConnectionService {
     /**
      * Hive 数据源
      */
-    final DspHiveResource hiveResource;
-
+    final Properties hiveResource;
 
 
+    /**
+     * 认证文件所在的基础目录
+     */
+    final String authBasePath;
 
-    private HiveHelper hiveHelper;
 
 
+    private HiveHelper hiveHelper;
 
-    public FIHiveConnectionServiceImpl(DspHiveResource hiveResource) {
-        this.hiveResource = hiveResource;
+    public FIHiveConnectionServiceImpl(Properties params) {
+        this.hiveResource = params;
+        this.authBasePath = params.getProperty("authBasePath");
     }
 
+
     @Override
     public boolean doAuth() {
-        // 认证传过来
-        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hiveResource.getAuthBasePath());
+        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(authBasePath);
         Set<String> principals = authPrincipalCreator.listPrincipals();
         log.info("find existed principals: {}", principals);
-        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hiveResource.getHiveDbUser());
+        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hiveResource.getProperty("hiveDbUser"));
 
         String userKeytabFile = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
         String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();

+ 18 - 16
hadoop-auth/FiHBaseConnectionServiceImpl.java

@@ -1,7 +1,6 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHbaseResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -14,6 +13,7 @@ import org.apache.hadoop.hbase.exceptions.HBaseException;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -45,7 +45,7 @@ public class FiHBaseConnectionServiceImpl implements HBaseConnectionService, Clo
     /**
      * Hive 数据源
      */
-    final DspHbaseResource hbaseResource;
+    final Properties hbaseResource;
 
     /**
      * HBase 链接
@@ -53,17 +53,19 @@ public class FiHBaseConnectionServiceImpl implements HBaseConnectionService, Clo
     Connection connection;
 
 
-    public FiHBaseConnectionServiceImpl(DspHbaseResource hbaseResource) {
+    public FiHBaseConnectionServiceImpl(Properties hbaseResource) {
         this.hbaseResource = hbaseResource;
     }
 
     @Override
     public boolean doAuth() {
         //KrbUser = "hadoop/cdh-node1@HADOOP.COM";
-        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hbaseResource.getAuthBasePath());
+        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
         Set<String> principals = authPrincipalCreator.listPrincipals();
         log.info("find existed principals: {}", principals);
-        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getAuthUser());
+
+        String authUser = hbaseResource.getProperty("authUser");
+        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(authUser);
 
         String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
         String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
@@ -90,16 +92,16 @@ public class FiHBaseConnectionServiceImpl implements HBaseConnectionService, Clo
                 log.info("add config: {}", kerberosPrincipal.getHBaseSite().getAbsolutePath());
             }
             hbaseConf.reloadConfiguration();
-            /*
-             * Huawei Fi Hbase,认证
-             *
-             * if need to connect zk, please provide jaas info about zk. of course,
-             * you can do it as below:
-             * System.setProperty("java.security.auth.login.config", confDirPath +
-             * "jaas.conf"); but the demo can help you more : Note: if this process
-             * will connect more than one zk cluster, the demo may be not proper. you
-             * can contact us for more help
-             */
+        /*
+         * Huawei Fi Hbase,认证
+         *
+         * if need to connect zk, please provide jaas info about zk. of course,
+         * you can do it as below:
+         * System.setProperty("java.security.auth.login.config", confDirPath +
+         * "jaas.conf"); but the demo can help you more : Note: if this process
+         * will connect more than one zk cluster, the demo may be not proper. you
+         * can contact us for more help
+         */
 
             LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, krbUser, userKeytab);
             LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,

+ 9 - 6
hadoop-auth/HBaseConnectionFactory.java

@@ -1,9 +1,11 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
-import com.primeton.dsp.datarelease.server.model.DspHbaseResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 
+import java.util.Properties;
+import java.util.Set;
+
 /**
  *
  * 根据参数不同,生成不动的测试连接实例
@@ -27,12 +29,13 @@ public class HBaseConnectionFactory {
      * 生成不同的测试实例
      * @return
      */
-    public static HBaseConnectionService getHBaseInstance(DspHbaseResource hbaseResource) {
-        String authUser = hbaseResource.getAuthUser();
-        if(StringUtils.isBlank(authUser) || "noauth".equalsIgnoreCase(hbaseResource.getAuthType())) {
+    public static HBaseConnectionService getHBaseInstance(Properties hbaseResource) {
+        String authUser = hbaseResource.getProperty("authUser");
+        String authType = hbaseResource.getProperty("authType");
+        if("noauth".equalsIgnoreCase(authType) || StringUtils.isBlank(authUser) || "default".equalsIgnoreCase(authUser)) {
             // 无需认证
             return new SimpleHBaseConnectionServiceImpl(hbaseResource);
-        } else if("kerberos".equalsIgnoreCase(hbaseResource.getAuthType())){
+        } else if("kerberos".equalsIgnoreCase(authType)){
             // kerberos 认证
             return new Krb5HBaseConnectionServiceImpl(hbaseResource);
         } else {

+ 6 - 6
hadoop-auth/HiveConnectionFactory.java

@@ -1,9 +1,10 @@
 package com.primeton.dsp.datarelease.data.bdata;
 
-import com.primeton.dsp.datarelease.server.model.DspHiveResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 
+import java.util.Properties;
+
 /**
  *
  * 根据参数不同,生成不动的测试连接实例
@@ -27,11 +28,10 @@ public class HiveConnectionFactory {
      * 生成不同的测试实例
      * @return
      */
-    public static HiveConnectionService getHiveInstance(DspHiveResource hiveResource) {
-	    String type = hiveResource.getCollectionType();
-        String authType = hiveResource.getAuthType();
-        String hiveDbUser = hiveResource.getHiveDbUser();
-        if(StringUtils.isBlank(hiveDbUser) || "noauth".equalsIgnoreCase(authType)) {
+    public static HiveConnectionService getHiveInstance(Properties hiveResource) {
+        String authType = hiveResource.getProperty("authType");
+        String hiveDbUser = hiveResource.getProperty("hiveDbUser");
+		if("noauth".equalsIgnoreCase(authType)) {
             // 默认无认证的方式测试
             return new SimpleHiveConnectionServiceImpl(hiveResource);
         } else if ("kerberos".equalsIgnoreCase(authType) && StringUtils.isNotBlank(hiveDbUser)) {

+ 2 - 0
hadoop-auth/HiveHelper.java

@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Properties;
 
 
+
+
 /**
  * <pre>
  *

+ 2 - 0
hadoop-auth/KerberosUtil.java

@@ -26,6 +26,8 @@ public class KerberosUtil {
      */
     public static void loginKerberos(Configuration conf, String principal, String userKeytabFile, String krb5File) {
         System.setProperty("java.security.krb5.conf", krb5File);
+        conf.setBoolean("hadoop.security.authorization", true);
+        conf.set("hadoop.security.authentication", "kerberos");
         try {
             UserGroupInformation.setConfiguration(conf);
             UserGroupInformation.loginUserFromKeytab(principal, userKeytabFile);

+ 6 - 5
hadoop-auth/Krb5HBaseConnectionServiceImpl.java

@@ -1,7 +1,6 @@
 package com.primeton.dsp.datarelease.data.bdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHbaseResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -14,6 +13,7 @@ import org.apache.hadoop.hbase.exceptions.HBaseException;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -39,7 +39,7 @@ public class Krb5HBaseConnectionServiceImpl implements HBaseConnectionService, C
     /**
      * Hive 数据源
      */
-    final DspHbaseResource hbaseResource;
+    final Properties hbaseResource;
 
     /**
      * HBase 链接
@@ -47,17 +47,18 @@ public class Krb5HBaseConnectionServiceImpl implements HBaseConnectionService, C
     Connection connection;
 
 
-    public Krb5HBaseConnectionServiceImpl(DspHbaseResource hbaseResource) {
+    public Krb5HBaseConnectionServiceImpl(Properties hbaseResource) {
         this.hbaseResource = hbaseResource;
     }
 
     @Override
     public boolean doAuth() {
         //KrbUser = "hadoop/cdh-node1@HADOOP.COM";
-        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hbaseResource.getAuthBasePath());
+        log.info("hbase 开始 kerberos 认证。");
+        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
         Set<String> principals = authPrincipalCreator.listPrincipals();
         log.info("find existed principals: {}", principals);
-        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getAuthUser());
+        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getProperty("authUser"));
 
         String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
         String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();

+ 21 - 10
hadoop-auth/Krb5HiveConnectionServiceImpl.java

@@ -1,7 +1,6 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHiveResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -9,6 +8,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -34,30 +34,41 @@ public class Krb5HiveConnectionServiceImpl implements HiveConnectionService {
     /**
      * Hive 数据源
      */
-    final DspHiveResource hiveResource;
+    final Properties params;
 
 
 
+    /**
+     * 认证文件所在的基础目录
+     */
+    final String authBasePath;
+
+
     String hiveUrl;
 
-    public Krb5HiveConnectionServiceImpl(DspHiveResource hiveResource) {
-        this.hiveResource = hiveResource;
+
+    public Krb5HiveConnectionServiceImpl(Properties params) {
+        this.params = params;
+        this.authBasePath = params.getProperty("authBasePath");
     }
 
+
     @Override
     public boolean doAuth() {
         //KrbUser = "hadoop/cdh-node1@HADOOP.COM";
-        // 认证传过来
-        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hiveResource.getAuthBasePath());
+        log.info("hive 开始 kerberos 认证。");
+        AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(authBasePath);
         Set<String> principals = authPrincipalCreator.listPrincipals();
         log.info("find existed principals: {}", principals);
-        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hiveResource.getHiveDbUser());
+        AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(params.getProperty("hiveDbUser"));
 
         String userKeytab = kerberosPrincipal.getUserKeytabFile().getAbsolutePath();
         String krb5File = kerberosPrincipal.getKrb5File().getAbsolutePath();
         String krbUser = kerberosPrincipal.getPrincipal();
-        StringBuffer buffer = new StringBuffer(hiveResource.getHiveUrl());
-        buffer.append(";principal=").append(krbUser);
+        StringBuffer buffer = new StringBuffer(params.getProperty("hiveUrl"));
+        if(!buffer.toString().contains(";principal=")) {
+            buffer.append(";principal=").append(krbUser);
+        }
         hiveUrl = buffer.toString();
         log.info("HIVE_URL : " + hiveUrl);
 

+ 4 - 17
hadoop-auth/PooledDataSource.java

@@ -1,20 +1,4 @@
-/*******************************************************************************************
- *	Copyright (c) 2016, zzg.zhou(11039850@qq.com)
- * 
- *  Monalisa is free software: you can redistribute it and/or modify
- *	it under the terms of the GNU Lesser General Public License as published by
- *	the Free Software Foundation, either version 3 of the License, or
- *	(at your option) any later version.
-
- *	This program is distributed in the hope that it will be useful,
- *	but WITHOUT ANY WARRANTY; without even the implied warranty of
- *	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *	GNU Lesser General Public License for more details.
-
- *	You should have received a copy of the GNU Lesser General Public License
- *	along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *******************************************************************************************/
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 
 import java.io.Closeable;
@@ -23,6 +7,9 @@ import java.util.Properties;
 import javax.sql.DataSource;
 
 /**
+ *
+ * Hive DAtaSource 实现
+ *
  *  @author zhaopx
  */
 public interface PooledDataSource extends DataSource, Closeable {

+ 10 - 14
hadoop-auth/SimpleDataSource.java

@@ -1,4 +1,4 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -52,19 +52,13 @@ public class SimpleDataSource implements PooledDataSource {
 	}
 
 	private void initConnections(Properties poolProperties) {
-
 		log.info("Initializing simple data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
 		semaphore = new Semaphore(maxSize, false);
-
 		if (minSize > 0 && minSize < maxSize) {
 			try {
-				List<Connection> connections = new ArrayList<Connection>();
-				for (int i = 0; i < minSize; i++) {
-					connections.add(getConnection());
-				}
-				for (Connection conn : connections) {
-					conn.close();
-				}
+				// 尝试获得连接
+				Connection conn = getConnection();
+				conn.close();
 			} catch (SQLException e) {
 				throw new RuntimeException(e);
 			}
@@ -82,6 +76,7 @@ public class SimpleDataSource implements PooledDataSource {
 		if(ex != null) {
 			throw new IOException(ex);
 		}
+		log.info("closed data source{ pool.max = " + maxSize + ", pool.min = " + minSize + "}");
 	}
 	
 	private void closeConnection(Connection realConnection) throws SQLException {
@@ -108,9 +103,10 @@ public class SimpleDataSource implements PooledDataSource {
 			if (!pool.isEmpty()) {
 				Connection realConn = pool.keySet().iterator().next();
 				pool.remove(realConn);
-				  
-				realConn.setAutoCommit(true);
-				 
+
+				// hive jdbc 不支持设置  AutoCommit
+				//realConn.setAutoCommit(true);
+
 				return getProxyConnection(realConn);
 			}
 		}
@@ -152,7 +148,7 @@ public class SimpleDataSource implements PooledDataSource {
 		try {
 			return connectionService.getConnection();
 		} catch (Exception e) {
-			throw new RuntimeException(e);
+			throw new SQLException(e);
 		}
 	}
 

+ 12 - 24
hadoop-auth/SimpleHBaseConnectionServiceImpl.java

@@ -1,7 +1,6 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHbaseResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -14,8 +13,9 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
-import java.util.Set;
+import java.util.Properties;
 
 /**
  *
@@ -40,7 +40,7 @@ public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService,
     /**
      * Hive 数据源
      */
-    final DspHbaseResource hbaseResource;
+    final Properties hbaseResource;
 
 
     /**
@@ -49,13 +49,14 @@ public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService,
     final Connection connection;
 
 
-    public SimpleHBaseConnectionServiceImpl(DspHbaseResource hbaseResource) {
+    public SimpleHBaseConnectionServiceImpl(Properties hbaseResource) {
         this.hbaseResource = hbaseResource;
 
+        String authBasePath = hbaseResource.getProperty("authBasePath");
         Configuration hbaseConf = null;
-        if(StringUtils.isNotBlank(hbaseResource.getAuthUser())) {
-            AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useDataReleaseConf(hbaseResource.getAuthBasePath());
-            AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getAuthUser());
+        if(StringUtils.isNotBlank(authBasePath)) {
+            AuthPrincipalCreator authPrincipalCreator = AuthPrincipalCreator.useExtractorConf(hbaseResource.getProperty("authBasePath"));
+            AuthPrincipal kerberosPrincipal = authPrincipalCreator.getKerberosPrincipal(hbaseResource.getProperty("authUser"));
 
             // 分别加载 core、hdfs、hbase site 文件
             Configuration conf = new Configuration();
@@ -81,11 +82,11 @@ public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService,
             } catch (Exception e) {
                 throw new IllegalStateException(e);
             }
-        } else {
-            hbaseConf = getConf();
         }
-
         try {
+            // 连接时,只尝试 5 次
+            hbaseConf.setInt("zookeeper.recovery.retry", 3);
+            hbaseConf.setInt("hbase.client.retries.number", 2);
             this.connection = ConnectionFactory.createConnection(hbaseConf);
         } catch (Exception e) {
             throw new IllegalStateException(e);
@@ -93,19 +94,6 @@ public class SimpleHBaseConnectionServiceImpl implements HBaseConnectionService,
     }
 
 
-    public Configuration getConf() {
-        Configuration conf = HBaseConfiguration.create(new Configuration()); //配置类
-        if(StringUtils.isNotBlank(hbaseResource.getHbaseMaster())) conf.set("hbase.master", hbaseResource.getHbaseMaster());
-        if(StringUtils.isNotBlank(hbaseResource.getHbaseZookeeperQuorum())) conf.set("hbase.zookeeper.quorum", hbaseResource.getHbaseZookeeperQuorum());
-        if(StringUtils.isNotBlank(hbaseResource.getZookeeperClientPort())) conf.set("hbase.zookeeper.property.clientPort", hbaseResource.getZookeeperClientPort());
-        if(StringUtils.isNotBlank(hbaseResource.getZnode())) conf.set("zookeeper.znode.parent", hbaseResource.getZnode());
-
-        conf.setInt("hbase.client.retries.number", 2);
-
-        return conf;
-
-    }
-
     @Override
     public boolean doAuth() {
         log.info("hbase 无需认证,通过。");

+ 14 - 11
hadoop-auth/SimpleHiveConnectionServiceImpl.java

@@ -1,13 +1,13 @@
-package com.primeton.dsp.datarelease.data.bdata;
+package com.primeton.damp.bigdata;
 
 
-import com.primeton.dsp.datarelease.server.model.DspHiveResource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.Properties;
 
 /**
  *
@@ -32,22 +32,22 @@ public class SimpleHiveConnectionServiceImpl implements HiveConnectionService {
     /**
      * Hive 数据源
      */
-    final DspHiveResource hiveResource;
+    final Properties params;;
 
 
-    public SimpleHiveConnectionServiceImpl(DspHiveResource hiveResource) {
-        this.hiveResource = hiveResource;
+    public SimpleHiveConnectionServiceImpl(Properties params) {
+        this.params = params;
     }
 
     @Override
     public Connection getConnection() throws SQLException {
         String hUrl = "jdbc:hive2://ip:port/default";
-        if(StringUtils.isNotBlank(hiveResource.getHiveUrl())) {
-            hUrl = hiveResource.getHiveUrl();
+        if(StringUtils.isNotBlank(params.getProperty("hiveUrl"))) {
+            hUrl = params.getProperty("hiveUrl");
         } else {
-            hUrl = hUrl.replace("ip", hiveResource.getHiveIp());
-            hUrl = hUrl.replace("port", hiveResource.getHivePort()+"");
-            hUrl = hUrl.replace("default", hiveResource.getHiveDbName());
+            hUrl = hUrl.replace("ip", params.getProperty("hiveIp"));
+            hUrl = hUrl.replace("port", params.getProperty("hivePort"));
+            hUrl = hUrl.replace("default", params.getProperty("hiveDbName"));
         }
 
         log.info("测试连接:{}", hUrl);
@@ -56,6 +56,9 @@ public class SimpleHiveConnectionServiceImpl implements HiveConnectionService {
         } catch (ClassNotFoundException e) {
             throw new SQLException("找不到Hive驱动:org.apache.hive.jdbc.HiveDriver.", e);
         }
-        return DriverManager.getConnection(hUrl, hiveResource.getHiveDbUser() , hiveResource.getHivePassword());
+        String hiveDbUser = params.getProperty("hiveDbUser");
+        String hivePassword = params.getProperty("hivePassword");
+        log.info("extract hive url: {} use user: {}:******", hUrl, hiveDbUser);
+        return DriverManager.getConnection(hUrl, hiveDbUser, hivePassword);
     }
 }

+ 35 - 0
hadoop-principal/HadoopPrincipal.java

@@ -0,0 +1,35 @@
+package com.yiidata.amc.common.principal;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2018/4/23
+ * Time: 10:15
+ * Vendor: yiidata.com
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public abstract class HadoopPrincipal {
+
+
+
+    public Configuration getConf(String... res){
+        return getPrincipalConf(new Configuration(), res);
+    }
+
+
+    /**
+     * 获取认证的配置
+     * @param conf Hadoop Conf
+     * @param res 多个 site xml 信息
+     * @return
+     */
+    public abstract Configuration getPrincipalConf(Configuration conf, String... res);
+}

+ 87 - 0
hadoop-principal/HadoopPrincipalFactory.java

@@ -0,0 +1,87 @@
+package com.yiidata.amc.common.principal;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ *
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2018/4/23
+ * Time: 10:43
+ * Vendor: yiidata.com
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class HadoopPrincipalFactory {
+
+
+    /**
+     * Hadoop 的认证方式
+     */
+    public static final String HADOOP_SECUROTY_AUTH = "hadoop.security.auth";
+
+
+    /**
+     * 无认证的值
+     */
+    public static final String HADOOP_SECUROTY_AUTH_NONE = "none";
+
+
+
+
+    private static Logger LOG = LoggerFactory.getLogger(HadoopPrincipal.class);
+
+
+
+    /**
+     * 获取 加密模式
+     * @param userPrincipal
+     * @param userKeytabPath
+     * @param krb5ConfPath
+     * @return
+     */
+    public static HadoopPrincipal getHadoopPrincipal(
+            final String userPrincipal,
+            final String userKeytabPath,
+            final String krb5ConfPath) {
+        if(StringUtils.isBlank(userPrincipal)) {
+            LOG.info("use HadoopPrincipal class: " + NonHadoopPrincipal.class.getName());
+            return new NonHadoopPrincipal();
+        }
+        LOG.info("use HadoopPrincipal class: " + KerborsHadoopPrincipal.class.getName());
+        return new KerborsHadoopPrincipal(userPrincipal, userKeytabPath, krb5ConfPath);
+    }
+
+
+
+
+
+    /**
+     * 获取 加密模式
+     * @param conf Hadoop Conf
+     * @return
+     */
+    public static HadoopPrincipal getHadoopPrincipal(Configuration conf) {
+        String auth = conf.get(HADOOP_SECUROTY_AUTH, HADOOP_SECUROTY_AUTH_NONE);
+        if(StringUtils.isBlank(auth) || HADOOP_SECUROTY_AUTH_NONE.equals(auth)) {
+            LOG.info("use HadoopPrincipal class: " + NonHadoopPrincipal.class.getName());
+            return new NonHadoopPrincipal();
+        }
+
+        LOG.info("use HadoopPrincipal class: " + KerborsHadoopPrincipal.class.getName());
+        final String userPrincipal = conf.get("hadoop.security.auth.principal");
+        final String userKeytabPath = conf.get("hadoop.security.auth.keytab");
+        final String krb5ConfPath = conf.get("hadoop.security.auth.krb5Path");
+        return new KerborsHadoopPrincipal(userPrincipal, userKeytabPath, krb5ConfPath);
+    }
+}

+ 86 - 0
hadoop-principal/KerborsHadoopPrincipal.java

@@ -0,0 +1,86 @@
+package com.yiidata.amc.common.principal;
+
+import com.yiidata.amc.common.utils.LoginUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ *
+ * 无加密的
+ *
+ *
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2018/4/23
+ * Time: 10:15
+ * Vendor: yiidata.com
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class KerborsHadoopPrincipal extends HadoopPrincipal {
+
+    public final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
+
+    public final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.api.principal";
+
+    public final String zkServerPrincipal = "zookeeper/hadoop.hadoop.com";
+
+
+    final String userPrincipal;
+    final String userKeytabPath;
+    final String krb5ConfPath;
+
+
+    final static Logger LOG = LoggerFactory.getLogger(KerborsHadoopPrincipal.class);
+
+
+    public KerborsHadoopPrincipal(String userPrincipal, String userKeytabPath, String krb5ConfPath) {
+        this.userPrincipal = userPrincipal;
+        this.userKeytabPath = userKeytabPath;
+        this.krb5ConfPath = krb5ConfPath;
+    }
+
+    /**
+     * 无加密的 Conf
+     * @param conf 配置
+     * @return 返回加密的 Hadoop Conf
+     */
+    @Override
+    public Configuration getPrincipalConf(Configuration conf, String... res) {
+        for (String re : res) {
+            if(StringUtils.isBlank(re)) {
+                continue;
+            }
+            LOG.info("add {} conf resource to hadoop config.", re);
+            conf.addResource(re);
+        }
+        try {
+            LOG.info("try to setup hadoop kerberos auth.");
+            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeytabPath);
+            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, zkServerPrincipal);
+            LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, conf);
+            LOG.info("setup hadoop kerberos auth success.");
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return conf;
+    }
+
+
+    public static void main(String[] args) {
+        HadoopPrincipal hadoopPrincipal = new KerborsHadoopPrincipal("user_shisuo",
+                "F:/Deploy/sichuan-update/user.keytab",
+                "F:/Deploy/sichuan-update/krb5.conf");
+        Configuration conf = hadoopPrincipal.getConf();
+        System.out.println(conf);
+    }
+}

+ 42 - 0
hadoop-principal/NonHadoopPrincipal.java

@@ -0,0 +1,42 @@
+package com.yiidata.amc.common.principal;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ * Non-Kerberos (unauthenticated) Hadoop principal.
+ *
+ *
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2018/4/23
+ * Time: 10:15
+ * Vendor: yiidata.com
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class NonHadoopPrincipal extends HadoopPrincipal {
+
+
+    /**
+     * Builds a plain (no authentication) Hadoop Configuration.
+     * @param conf base configuration to enrich
+     * @param res extra resource files added to conf; blank entries are skipped
+     * @return the same Configuration instance, with the resources added
+     */
+    @Override
+    public Configuration getPrincipalConf(Configuration conf, String... res) {
+        for (String re : res) {
+            if(StringUtils.isBlank(re)) {
+                continue;
+            }
+            conf.addResource(re);
+        }
+        return conf;
+    }
+}

+ 77 - 36
java-sftp-ftp/FtpClientHandler.java

@@ -1,5 +1,6 @@
 package com.primeton.damp.fileclient;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
 import org.apache.commons.net.ftp.FTPReply;
@@ -19,7 +20,7 @@ import java.util.stream.Collectors;
 
 
 /**
- * Created by hadoop on 2018/8/9.
+ * Created by zhaopx on 2018/8/9.
  */
 public class FtpClientHandler implements X11ClientHandler {
 
@@ -78,12 +79,11 @@ public class FtpClientHandler implements X11ClientHandler {
      * @return
      * @throws IOException
      */
+    @Override
     public InputStream readFile(String file) throws IOException {
         return ftpClient.retrieveFileStream(file);
     }
 
-
-
     /**
      * 读取文件流
      * @param file
@@ -99,9 +99,15 @@ public class FtpClientHandler implements X11ClientHandler {
         }
     }
 
-    public boolean renameFile(String file, String newFileName) {
+    @Override
+    public boolean rename(String file, String newFileName) {
         try {
-            return ftpClient.rename(file, newFileName);
+            final Path path1 = Paths.get(file);
+            Path fileP = path1.getParent();
+            if(fileP == null) {
+                fileP = Paths.get("/");
+            }
+            return ftpClient.rename(file, Paths.get(fileP.toString(), newFileName).toString());
         } catch (IOException e) {
             logger.error("Error in rename ftp file.", e);
             return false;
@@ -112,16 +118,49 @@ public class FtpClientHandler implements X11ClientHandler {
      * 删除文件, 返回 true 则删除成功
      * @param fileName
      */
+    @Override
     public boolean deleteFile(String fileName) {
+        if("/".equals(fileName)) {
+            return false;
+        }
+        final Path path1 = Paths.get(fileName);
+        Path fileP = path1.getParent();
+        if(fileP == null) {
+            fileP = Paths.get("/");
+        }
         try {
-            return ftpClient.deleteFile(fileName);
+            final FTPFile[] ftpFiles = ftpClient.listFiles(fileP.toString());
+            for (FTPFile ftpFile : ftpFiles) {
+                if (StringUtils.equals(path1.getFileName().toString(), ftpFile.getName())) {
+                    deleteFile(fileP.toString(), ftpFile);
+                    return true;
+                }
+            }
         } catch (IOException e) {
-            logger.info("Delete file fail.", e);
+            throw new RuntimeException(e);
         }
         return false;
     }
 
 
+    /**
+     * Recursively deletes an FTP file or directory.
+     * @param basePath absolute FTP path of the directory containing {@code file}
+     * @param file directory entry to delete
+     * @throws IOException on FTP protocol or I/O failure
+     */
+    private void deleteFile(String basePath, FTPFile file) throws IOException {
+        String prefixPath = basePath.endsWith("/") ? basePath : basePath + "/";
+        String fullPath = prefixPath + file.getName();
+        if(file.isDirectory()) {
+            // List children by the absolute path, not the bare name: a bare
+            // name is only resolvable when the server cwd happens to match,
+            // so nested directories were never listed (and never emptied).
+            FTPFile[] ftpFiles = ftpClient.listFiles(fullPath);
+            for (FTPFile ftpFile : ftpFiles) {
+                deleteFile(fullPath, ftpFile);
+            }
+            ftpClient.removeDirectory(fullPath);
+        } else {
+            ftpClient.deleteFile(fullPath);
+        }
+    }
+
+
 
     /**
      * 判断目录是否存在
@@ -130,9 +169,21 @@ public class FtpClientHandler implements X11ClientHandler {
      */
     @Override
     public boolean existsDir(String path) {
+        if("/".equals(path)) {
+            return true;
+        }
         try {
-            FTPFile mdtmFile = ftpClient.mlistFile(path);
-            return mdtmFile != null && mdtmFile.isDirectory();
+            final Path path1 = Paths.get(path);
+            Path fileP = path1.getParent();
+            if(fileP == null) {
+                fileP = Paths.get("/");
+            }
+            final FTPFile[] ftpFiles = ftpClient.listFiles(fileP.toString());
+            for (FTPFile ftpFile : ftpFiles) {
+                if(StringUtils.equals(path1.getFileName().toString(), ftpFile.getName())) {
+                    return ftpFile.isDirectory();
+                }
+            }
         } catch (IOException e) {
             logger.error(e.getMessage());
         }
@@ -148,11 +199,23 @@ public class FtpClientHandler implements X11ClientHandler {
      */
     @Override
     public boolean existsFile(String path) {
+        if("/".equals(path)) {
+            return false;
+        }
         try {
-            FTPFile mdtmFile = ftpClient.mlistFile(path);
-            return mdtmFile != null && mdtmFile.isFile();
+            final Path path1 = Paths.get(path);
+            Path fileP = path1.getParent();
+            if(fileP == null) {
+                fileP = Paths.get("/");
+            }
+            final FTPFile[] ftpFiles = ftpClient.listFiles(fileP.toString());
+            for (FTPFile ftpFile : ftpFiles) {
+                if(StringUtils.equals(path1.getFileName().toString(), ftpFile.getName())) {
+                    return ftpFile.isFile();
+                }
+            }
         } catch (IOException e) {
-        	logger.error(e.getMessage());
+            logger.error(e.getMessage());
         }
         return false;
     }
@@ -205,9 +268,8 @@ public class FtpClientHandler implements X11ClientHandler {
         try {
             return ftpClient.makeDirectory(dirName);
         } catch (IOException e) {
-            logger.info("mkdir " + dirName + " fail.", e);
+            throw new RuntimeException("mkdir " + dirName + " fail.", e);
         }
-        return false;
     }
 
     /**
@@ -242,28 +304,7 @@ public class FtpClientHandler implements X11ClientHandler {
         String username = "admin";
         String password = "primeton000000";
         FtpClientHandler handler = new FtpClientHandler(host, port, username, password, map);
-//        handler.changeWorkingDirectory("/sms");
-        List<Path> result = handler.getChildren("/DAMP");
-        for (Path ftpFile : result) {
-            System.out.println(ftpFile.toString());
-        }
-
-        /*
-        OutputStream out = handler.writeFile("/hello.txt", false);
-        IOUtils.copy(new FileInputStream("pom.xml"), out);
-        out.flush();
-        out.close();
-        */
-
-        System.out.println(handler.exists("/"));
-        System.out.println(handler.exists("/hello.txt"));
-        System.out.println(handler.exists("/example_data_1225"));
-        System.out.println(handler.exists("/world/example_data_1225"));
-
-        if(!handler.existsDir("/world/example_data_1225")) {
-            System.out.println(handler.mkdirs("/world/example_data_1225"));
-        }
-        System.out.println(handler.exists("/world/example_data_1225"));
+        System.out.println(handler.deleteFile("/test"));
 
     }
 

+ 162 - 0
java-sftp-ftp/HdfsClientHandler.java

@@ -0,0 +1,162 @@
+package com.primeton.damp.fileclient;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * <pre>
+ *
+ * Created by zhaopx.
+ * User: zhaopx
+ * Date: 2021/4/1
+ * Time: 17:55
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+public class HdfsClientHandler implements X11ClientHandler {
+
+    protected static Logger logger = LoggerFactory.getLogger("HdfsClientHandler");
+
+
+    /**
+     * 执行路径的基础路径
+     */
+    private final String basePath;
+
+
+    /**
+     * hadoop 文件系统
+     */
+    final FileSystem fs;
+
+
+    public HdfsClientHandler(FileSystem fs) {
+        this(fs, System.getProperty("user.name"));
+    }
+
+    public HdfsClientHandler(FileSystem fs, String basePath) {
+        this.fs = fs;
+        this.basePath = basePath;
+    }
+
+    @Override
+    public void reconnect(Properties params) throws IOException {
+
+    }
+
+    @Override
+    public OutputStream writeFile(String file, boolean overwrite) throws IOException {
+        if(overwrite) {
+            return fs.create(new org.apache.hadoop.fs.Path(file), true);
+        }
+        return fs.create(new org.apache.hadoop.fs.Path(file), false);
+    }
+
+    @Override
+    public InputStream readFile(String file) throws IOException {
+        return fs.open(new org.apache.hadoop.fs.Path(file));
+    }
+
+    @Override
+    public boolean rename(String file, String newFileName) {
+        final org.apache.hadoop.fs.Path oldFile = new org.apache.hadoop.fs.Path(file);
+        final org.apache.hadoop.fs.Path newFile = new org.apache.hadoop.fs.Path(oldFile.getParent(), newFileName);
+        try {
+            return fs.rename(oldFile, newFile);
+        } catch (IOException e) {
+            throw new RuntimeException("mkdirs " + file + " error!", e);
+        }
+    }
+
+    @Override
+    public boolean deleteFile(String path) {
+        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(path);
+        try {
+            if (!fs.exists(file)) {
+                return false;
+            }
+            return fs.delete(file, true);
+        } catch (Exception e) {
+            throw new RuntimeException("delete " + path + " error!", e);
+        }
+    }
+
+    @Override
+    public boolean mkdir(String path) {
+        return mkdirs(path);
+    }
+
+    @Override
+    public boolean mkdirs(String path) {
+        try {
+            return fs.mkdirs(new org.apache.hadoop.fs.Path(path));
+        } catch (IOException e) {
+            throw new RuntimeException("mkdirs " + path + " error!", e);
+        }
+    }
+
+    @Override
+    public List<Path> getChildren(String ftpPath) {
+        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(ftpPath);
+        try {
+            if(fs.exists(file) && fs.getFileStatus(file).isDirectory()) {
+                return Arrays.stream(fs.listStatus(file)).map(it-> Paths.get(it.toString())).collect(Collectors.toList());
+            }
+        } catch (Exception e) {
+            logger.error("getChildren " + ftpPath + " error!", e);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean exists(String path) {
+        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(path);
+        try {
+            return fs.exists(file);
+        } catch (Exception e) {
+            logger.error("exists " + path + " error!", e);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean existsDir(String path) {
+        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(path);
+        try {
+            return fs.exists(file) && fs.getFileStatus(file).isDirectory();
+        } catch (Exception e) {
+            logger.error("existsDir " + path + " error!", e);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean existsFile(String path) {
+        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(path);
+        try {
+            return fs.exists(file) && fs.getFileStatus(file).isFile();
+        } catch (Exception e) {
+            logger.error("existsFile " + path + " error!", e);
+        }
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}

+ 159 - 0
java-sftp-ftp/JschSftpClientHandler.java

@@ -0,0 +1,159 @@
+package com.primeton.damp.fileclient;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.Session;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * <pre>
+ *
+ * Created by zhaopx.
+ * User: zhaopx
+ * Date: 2021/3/31
+ * Time: 10:54
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+public class JschSftpClientHandler implements X11ClientHandler {
+
+
+    protected static Logger log = LoggerFactory.getLogger("JschSftpClientHandler");
+
+
+    private final String sftpHost;
+    private int sftpPort = 22;
+    private final String ftpUsername;
+    private String ftpPassword;
+    private String privateKey = "/Users/zhenqin/.ssh/id_rsa";
+
+
+    /** SSH session; retained so {@link #close()} can disconnect it (was leaked). */
+    private Session session;
+
+    private ChannelSftp channelSftp;
+
+    public JschSftpClientHandler(String sftpHost, int sftpPort, String ftpUsername, String ftpPassword) {
+        this.sftpHost = sftpHost;
+        this.sftpPort = sftpPort;
+        this.ftpUsername = ftpUsername;
+        this.ftpPassword = ftpPassword;
+    }
+
+    /**
+     * Opens a new SSH session and SFTP channel.
+     * @param params unused; present to satisfy the interface
+     * @throws IOException when the session or channel cannot be established
+     */
+    @Override
+    public void reconnect(Properties params) throws IOException {
+        JSch jsch = new JSch();
+        try {
+            session = jsch.getSession(ftpUsername, sftpHost, sftpPort);
+            session.setPassword(ftpPassword);
+            session.setConfig("StrictHostKeyChecking", "no");
+            session.connect(CONNECT_TIMEOUT);
+            // Open the SFTP channel on top of the authenticated session.
+            Channel channel = session.openChannel("sftp");
+            channel.connect(CONNECT_TIMEOUT);
+            log.info("Channel created to " + sftpHost + ".");
+            channelSftp = (ChannelSftp) channel;
+        }  catch (Exception e) {
+            if(e instanceof IOException) {
+                throw (IOException)e;
+            }
+            throw new IOException(e);
+        }
+    }
+
+    // NOTE(review): the operations below are unimplemented stubs.
+
+    @Override
+    public OutputStream writeFile(String file, boolean overwrite) throws IOException {
+        return null;
+    }
+
+    @Override
+    public InputStream readFile(String file) throws IOException {
+        return null;
+    }
+
+    @Override
+    public boolean rename(String file, String newFileName) {
+        return false;
+    }
+
+    @Override
+    public boolean deleteFile(String path) {
+        return false;
+    }
+
+    @Override
+    public boolean mkdir(String path) {
+        return false;
+    }
+
+    @Override
+    public boolean mkdirs(String path) {
+        return false;
+    }
+
+    @Override
+    public List<Path> getChildren(String ftpPath) {
+        // Return an empty list rather than null so callers can iterate safely
+        // (the original null made this class's own main() throw an NPE).
+        return java.util.Collections.emptyList();
+    }
+
+    @Override
+    public boolean exists(String path) {
+        return false;
+    }
+
+    @Override
+    public boolean existsDir(String path) {
+        return false;
+    }
+
+    @Override
+    public boolean existsFile(String path) {
+        return false;
+    }
+
+    /** Disconnects the SFTP channel and the underlying SSH session. */
+    @Override
+    public void close() throws IOException {
+        if(channelSftp != null) {
+            channelSftp.disconnect();
+        }
+        if(session != null) {
+            // Previously never disconnected, leaking the SSH connection.
+            session.disconnect();
+        }
+    }
+
+
+    public static void main(String[] args) throws IOException {
+        Properties map = new Properties();
+        String host = "192.168.30.143";
+        int port = 22;
+        String username = "root";
+        String password = "admin-123456";
+
+        try(JschSftpClientHandler handler = new JschSftpClientHandler(host, port, username, password)) {
+            handler.reconnect(map);
+            List<Path> result = handler.getChildren("/root/software/damp");
+            for (Path path : result) {
+                System.out.println(path.toString());
+            }
+
+        }
+    }
+}

+ 46 - 24
java-sftp-ftp/LocalFsClientHandler.java

@@ -1,10 +1,12 @@
 package com.primeton.damp.fileclient;
 
-import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
@@ -33,6 +35,10 @@ import java.util.stream.Collectors;
 public class LocalFsClientHandler implements X11ClientHandler {
 
 
+
+    protected static Logger logger = LoggerFactory.getLogger("LocalFsClientHandler");
+
+
     /**
      * 执行路径的基础路径
      */
@@ -60,6 +66,44 @@ public class LocalFsClientHandler implements X11ClientHandler {
         return new FileOutputStream(fsPath.toFile(), !overwrite);
     }
 
+    @Override
+    public InputStream readFile(String file) throws IOException {
+        return Files.newInputStream(fs.getPath(basePath, file));
+    }
+
+
+    @Override
+    public boolean rename(String file, String newFileName) {
+        try {
+            // Resolve against basePath, consistent with readFile/deleteFile
+            // (the original resolved the raw path, bypassing basePath).
+            Path source = fs.getPath(basePath, file);
+            Path target = source.resolveSibling(newFileName);
+            Files.move(source, target);
+            logger.info("rename {} to {}", source.toString(), target.toString());
+            return true;
+        } catch (Exception e) {
+            // Message previously said "ftp file" in the local-FS handler.
+            logger.error("Error in rename local file.", e);
+            return false;
+        }
+    }
+
+
+    @Override
+    public boolean deleteFile(String path) {
+        final Path fsPath = fs.getPath(basePath, path);
+        if (!Files.exists(fsPath)) {
+            // Nothing to delete; report failure instead of raising a wrapped
+            // NoSuchFileException as the original did.
+            return false;
+        }
+        try {
+            if (Files.isDirectory(fsPath)) {
+                // Recursively removes the directory and all of its children.
+                FileUtils.deleteDirectory(fsPath.toFile());
+            } else {
+                Files.delete(fsPath);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return true;
+    }
+
     @Override
     public boolean mkdir(String path) {
         final Path fsPath = fs.getPath(basePath, path);
@@ -112,26 +156,4 @@ public class LocalFsClientHandler implements X11ClientHandler {
     public void close() throws IOException {
 
     }
-
-
-    //test
-    public static void main(String[] args) throws IOException {
-        String basePath = "/Volumes/Media/Primeton/CODE/damp/damp-server/manual";
-        LocalFsClientHandler handler = new LocalFsClientHandler(basePath);
-        List<Path> result = handler.getChildren("/");
-        for (Path ftpFile : result) {
-            System.out.println(ftpFile.toString());
-        }
-
-        System.out.println(handler.exists("/"));
-        System.out.println(handler.exists("/hello.txt"));
-        System.out.println(handler.exists("/example_data_1225"));
-        System.out.println(handler.exists("/world/example_data_1225"));
-
-        if(!handler.existsDir("/world/example_data_1225")) {
-            System.out.println(handler.mkdirs("/world/example_data_1225"));
-        }
-        System.out.println(handler.exists("/world/example_data_1225"));
-
-    }
 }

+ 44 - 10
java-sftp-ftp/SftpClientHandler.java

@@ -6,9 +6,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.file.*;
-import java.util.*;
+import java.nio.file.DirectoryStream;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystemAlreadyExistsException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 import static java.nio.file.StandardOpenOption.APPEND;
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -69,7 +79,7 @@ public class SftpClientHandler implements X11ClientHandler {
 
 
     /**
-     * 读取文件流
+     * 文件流
      * @param file
      * @return
      * @throws IOException
@@ -85,12 +95,27 @@ public class SftpClientHandler implements X11ClientHandler {
     }
 
 
+    /**
+     * 读取文件流
+     * @param file
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public InputStream readFile(String file) throws IOException {
+        Path path1 = fs.getPath(file);
+        return Files.newInputStream(path1, StandardOpenOption.READ);
+    }
+
+
 
-    public boolean renameFile(String file, String newFileName) {
+    @Override
+    public boolean rename(String file, String newFileName) {
         try {
             Path path1 = fs.getPath(file);
-            Path path = Paths.get(path1.getParent().toString(), newFileName);
-            Files.move(path, path1);
+            Path path = fs.getPath(path1.getParent().toString(), newFileName);
+            Files.move(path1, path);
+            logger.info("rename {} to {}", path1.toString(), path.toString());
             return true;
         } catch (Exception e) {
             logger.error("Error in rename ftp file.", e);
@@ -104,13 +129,22 @@ public class SftpClientHandler implements X11ClientHandler {
      * @param path
      */
     public boolean deleteFile(String path) {
+        final Path fsPath = fs.getPath(path);
         try {
-            Path path1 = fs.getPath(path);
-            Files.delete(path1);
+            if (Files.exists(fsPath) && Files.isDirectory(fsPath)) {
+                // 删除文件夹,连带文件夹一起删除,包括其下的子文件
+                Files.list(fsPath).forEach(it-> deleteFile(it.toString()));
+                Files.delete(fsPath);
+                logger.info("delete dir: {}", path);
+            } else {
+                // 删除文件
+                Files.delete(fsPath);
+                logger.info("delete file: {}", path);
+            }
         } catch (Exception e) {
-            logger.info("Delete file fail.", e);
+            throw new RuntimeException(e);
         }
-        return false;
+        return true;
     }
 
 

+ 32 - 0
java-sftp-ftp/X11ClientHandler.java

@@ -2,6 +2,7 @@ package com.primeton.damp.fileclient;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
 import java.util.List;
@@ -18,6 +19,12 @@ import java.util.Properties;
 public interface X11ClientHandler extends Closeable {
 
 
+    /**
+     * Connection timeout in milliseconds.
+     */
+    public final static int CONNECT_TIMEOUT = 30_000;
+
+
     /**
      * 根据 配置参数,重新连接 ftp
      * @param params
@@ -36,6 +43,31 @@ public interface X11ClientHandler extends Closeable {
     public OutputStream writeFile(String file, boolean overwrite) throws IOException;
 
 
+    /**
+     * Opens the given file for reading.
+     * @param file path of the file to read
+     * @return stream over the file contents; the caller must close it
+     * @throws IOException on I/O failure
+     */
+    public InputStream readFile(String file) throws IOException;
+
+
+    /**
+     * Renames a file or directory. Always renames the last path element,
+     * keeping the same parent.
+     * @param file existing path
+     * @param newFileName new simple name within the same parent
+     * @return true when the rename succeeded
+     */
+    public boolean rename(String file, String newFileName);
+
+
+    /**
+     * Deletes a file or directory (directories recursively).
+     * @param path path to delete
+     * @return true when the deletion succeeded
+     */
+    public boolean deleteFile(String path);
+
+
     /**
      * 创建目录,, 如 Linux: mkdir /dir/path, 如 /dir 不存在则报错
      * @param path

+ 175 - 0
maven-pom-ant/damp-pom-targz.xml

@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.primeton.damp</groupId>
+        <artifactId>damp-server</artifactId>
+        <version>7.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>boot</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.primeton.damp</groupId>
+            <artifactId>core</artifactId>
+            <version>${damp.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>springfox-swagger2</artifactId>
+                    <groupId>io.springfox</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>springfox-swagger-ui</artifactId>
+                    <groupId>io.springfox</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.primeton.damp</groupId>
+            <artifactId>damp</artifactId>
+            <version>${damp.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mybatis.generator</groupId>
+            <artifactId>mybatis-generator-core</artifactId>
+            <version>${mybatis-generator.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>tk.mybatis</groupId>
+            <artifactId>mapper-generator</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jasig.cas.client</groupId>
+            <artifactId>cas-client-core</artifactId>
+            <version>3.5.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>dampServer-${project.version}</finalName>
+        <plugins>
+            <!-- 打包插件 -->
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <includeSystemScope>true</includeSystemScope>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                        <configuration>
+                            <mainClass>com.primeton.damp.Application</mainClass>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${java.version}</source>
+                    <!-- 指定JDK编译版本 -->
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>${project.basedir}/src/main/assembly/assembly.xml</descriptor>
+                    </descriptors>
+                    <finalName>Primeton_DQMS${project.version}_Server</finalName>
+                    <archive>
+                        <manifest>
+                            <mainClass/>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>compile</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <exportAntProperties>true</exportAntProperties>
+                            <propertyPrefix>damp</propertyPrefix>
+                            <tasks>
+                                <echo message="rm dist directory" />
+                                <!-- 删除指定目录下的所有文件和目录,包括指定目录-->
+                                <delete includeEmptyDirs="true">
+                                    <fileset dir="${project.build.directory}/dist"/>
+                                </delete>
+
+                                <echo message="mkdir dist directory" />
+                                <mkdir dir="${project.build.directory}/dist"/>
+
+                                <echo message="copy maven dependencies" />
+                                <exec  executable="mvn">
+                                    <arg line=" dependency:copy-dependencies "/>
+                                </exec>
+
+                                <tar longfile="gnu"
+                                     destfile="${project.build.directory}/dist/${project.build.finalName}-bin.tar.gz" compression="gzip">
+                                    <tarfileset dir="${project.build.directory}/dependency" filemode="755" dirmode="755" prefix="${project.build.finalName}/lib">
+                                        <include name="*.jar"/>
+                                    </tarfileset>
+                                    <tarfileset dir="${project.basedir}/src/main/scripts" filemode="755" dirmode="755" prefix="${project.build.finalName}" >
+                                        <include name="*.sh"/>
+                                    </tarfileset>
+                                    <tarfileset dir="${project.basedir}/src/main/scripts" filemode="755" dirmode="755" prefix="${project.build.finalName}">
+                                        <include name="*.bat"/>
+                                    </tarfileset>
+                                    <tarfileset dir="${project.basedir}/src/main/resources" filemode="755" dirmode="755" prefix="${project.build.finalName}/conf">
+                                    </tarfileset>
+                                    <tarfileset dir="${project.basedir}/../manual/resourceTemplate" filemode="755" dirmode="755" prefix="${project.build.finalName}/resourceTemplate">
+                                    </tarfileset>
+                                    <tarfileset dir="${project.basedir}/../manual/dataCheck" filemode="755" dirmode="755" prefix="${project.build.finalName}/dataCheck">
+                                    </tarfileset>
+                                </tar>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>

+ 451 - 0
rsa-key/RSAUtils.java

@@ -0,0 +1,451 @@
+package com.sdyc.datahub.api.utils;
+
+/**
+ * <pre>
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 17/1/10
+ * Time: 14:41
+ * Vendor: NowledgeData
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMReader;
+
+import javax.crypto.Cipher;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.nio.ByteBuffer;
+import java.security.*;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.interfaces.RSAPublicKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * RSA公钥/私钥/签名工具包
+ * <p>
+ * 字符串格式的密钥在未在特殊说明情况下都为BASE64编码格式<br/>
+ * 由于非对称加密速度极其缓慢,一般文件不使用它来加密而是使用对称加密,<br/>
+ * 非对称加密算法可以用来对对称加密的密钥加密,这样保证密钥的安全也就保证了数据的安全
+ * </p>
+ *
+ */
+public class RSAUtils {
+
+
+    /**
+     * 加密算法RSA
+     */
+    public static final String KEY_ALGORITHM = "RSA";
+
+    /**
+     * 签名算法
+     */
+    public static final String SIGNATURE_ALGORITHM = "MD5withRSA";
+
+    /**
+     * 获取公钥的key
+     */
+    public static final String PUBLIC_KEY = "LocatorPublicKey";
+
+    /**
+     * 获取私钥的key
+     */
+    public static final String PRIVATE_KEY = "LocatorPrivateKey";
+
+    /**
+     * RSA最大加密明文大小
+     */
+    public static final int MAX_ENCRYPT_BLOCK = 1024;
+
+    /**
+     * RSA最大解密密文大小
+     */
+    public static final int MAX_DECRYPT_BLOCK = 128;
+
+
+    static {
+        Security.addProvider(new BouncyCastleProvider());
+    }
+
+
+
+    static class CipherXX {
+        final Cipher decryptCipher;
+        final Cipher encryptCihper;
+
+        CipherXX(Key key) {
+            try {
+                KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);
+                if(key instanceof PublicKey) {
+                    X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(key.getEncoded());
+                    Key publicK = keyFactory.generatePublic(x509KeySpec);
+                    decryptCipher = Cipher.getInstance(keyFactory.getAlgorithm());
+                    decryptCipher.init(Cipher.DECRYPT_MODE, publicK);
+
+                    // 对数据加密
+                    encryptCihper = Cipher.getInstance(keyFactory.getAlgorithm());
+                    encryptCihper.init(Cipher.ENCRYPT_MODE, publicK);
+                } else {
+                    PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(key.getEncoded());
+                    Key privateK = keyFactory.generatePrivate(pkcs8KeySpec);
+
+                    decryptCipher = Cipher.getInstance(keyFactory.getAlgorithm());
+                    decryptCipher.init(Cipher.DECRYPT_MODE, privateK);
+
+                    encryptCihper = Cipher.getInstance(keyFactory.getAlgorithm());
+                    encryptCihper.init(Cipher.ENCRYPT_MODE, privateK);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+
+        public Cipher getDecryptCipher() {
+            return decryptCipher;
+        }
+
+        public Cipher getEncryptCihper() {
+            return encryptCihper;
+        }
+    }
+
+
+    /**
+     * Thread Local
+     */
+    protected final static ThreadLocal<Map> THREAD_LOCAL = new ThreadLocal<Map>();
+
+
+    private RSAUtils() {
+
+    }
+
+
+
+    private static Cipher getCipher(Key key, int mode) throws Exception {
+        Map<Key, CipherXX> KEY_CIPHER_MAP = THREAD_LOCAL.get();
+        boolean put = false;
+        if(KEY_CIPHER_MAP == null) {
+            KEY_CIPHER_MAP = new HashMap<Key, CipherXX>(3);
+            put = true;
+        }
+
+        CipherXX cipher = KEY_CIPHER_MAP.get(key);
+        if(cipher == null) {
+            cipher = new CipherXX(key);
+            KEY_CIPHER_MAP.put(key, cipher);
+        }
+
+        if(put) THREAD_LOCAL.set(KEY_CIPHER_MAP);
+        return mode == Cipher.ENCRYPT_MODE ? cipher.getEncryptCihper() : cipher.getDecryptCipher();
+    }
+
+
+
+    /**
+     * <p>
+     * 生成密钥对(公钥和私钥)
+     * </p>
+     *
+     * @return
+     * @throws Exception
+     */
+    public static Map<String, Object> genKeyPair() throws Exception {
+        KeyPairGenerator keyPairGen = KeyPairGenerator.getInstance(KEY_ALGORITHM);
+        keyPairGen.initialize(1024);
+        KeyPair keyPair = keyPairGen.generateKeyPair();
+        PublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
+        PrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
+        Map<String, Object> keyMap = new HashMap<String, Object>(2);
+        keyMap.put(PUBLIC_KEY, publicKey);
+        keyMap.put(PRIVATE_KEY, privateKey);
+        return keyMap;
+    }
+
+
+
+    /**
+     * <p>
+     * 生成密钥对(公钥和私钥)
+     * </p>
+     *
+     * @return
+     * @throws Exception
+     */
+    public static Map<String, Object> genFileKeyPair(File privateKeyFile) throws Exception {
+        PEMReader reader = null;
+        try {
+            reader = new PEMReader(new FileReader(privateKeyFile), null);
+            KeyPair keyPair = (KeyPair) reader.readObject();
+
+            PublicKey publicKey = keyPair.getPublic();
+            PrivateKey privateKey = keyPair.getPrivate();
+
+            Map<String, Object> keyMap = new HashMap<String, Object>(2);
+            keyMap.put(PUBLIC_KEY, publicKey);
+            keyMap.put(PRIVATE_KEY, privateKey);
+            return keyMap;
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+    }
+
+
+
+    /**
+     * <p>
+     * 生成密钥对(公钥和私钥)
+     * </p>
+     *
+     * @return
+     * @throws Exception
+     */
+    public static PublicKey genPublicKey(File publicKeyFile) throws Exception {
+        PEMReader reader =  null;
+        try {
+            reader =  new PEMReader(new FileReader(publicKeyFile), null);
+            return  (PublicKey) reader.readObject();
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+    }
+
+
+
+
+    /**
+     * <p>
+     * 用私钥对信息生成数字签名
+     * </p>
+     *
+     * @param data 已加密数据
+     * @param privateKey 私钥(BASE64编码)
+     *
+     * @return
+     * @throws Exception
+     */
+    public static String sign(byte[] data, PrivateKey privateKey) throws Exception {
+        PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(privateKey.getEncoded());
+        KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);
+        PrivateKey privateK = keyFactory.generatePrivate(pkcs8KeySpec);
+        Signature signature = Signature.getInstance(SIGNATURE_ALGORITHM);
+        signature.initSign(privateK);
+        signature.update(data);
+        return Base64.encodeBase64String(signature.sign());
+    }
+
+    /**
+     * <p>
+     * 校验数字签名
+     * </p>
+     *
+     * @param data 已加密数据
+     * @param publicKey 公钥(BASE64编码)
+     * @param sign 数字签名
+     *
+     * @return
+     * @throws Exception
+     *
+     */
+    public static boolean verify(byte[] data, PublicKey publicKey, String sign)
+            throws Exception {
+        X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKey.getEncoded());
+        KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);
+        PublicKey publicK = keyFactory.generatePublic(keySpec);
+        Signature signature = Signature.getInstance(SIGNATURE_ALGORITHM);
+        signature.initVerify(publicK);
+        signature.update(data);
+        return signature.verify(Base64.decodeBase64(sign));
+    }
+
+    /**
+     * <P>
+     * 私钥解密
+     * </p>
+     *
+     * @param encryptedData 已加密数据
+     * @param privateKey 私钥(BASE64编码)
+     * @return
+     * @throws Exception
+     */
+    public static byte[] decryptByPrivateKey(byte[] encryptedData, PrivateKey privateKey)
+            throws Exception {
+        Cipher cipher = getCipher(privateKey, Cipher.DECRYPT_MODE);
+        int inputLen = encryptedData.length;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // 对数据分段解密
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_DECRYPT_BLOCK) {
+                cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);
+            }
+            out.write(cache, 0, cache.length);
+            i++;
+            offSet = i * MAX_DECRYPT_BLOCK;
+        }
+        byte[] decryptedData = out.toByteArray();
+        out.close();
+        return decryptedData;
+    }
+
+    /**
+     * <p>
+     * 公钥解密
+     * </p>
+     *
+     * @param encryptedData 已加密数据
+     * @param publicKey 公钥(BASE64编码)
+     * @return
+     * @throws Exception
+     */
+    public static byte[] decryptByPublicKey(byte[] encryptedData, PublicKey publicKey)
+            throws Exception {
+        Cipher cipher = getCipher(publicKey, Cipher.DECRYPT_MODE);
+
+        int inputLen = encryptedData.length;
+        ByteBuffer buffer = ByteBuffer.allocate(MAX_ENCRYPT_BLOCK);
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // 对数据分段解密
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_DECRYPT_BLOCK) {
+                cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);
+            }
+            buffer.put(cache, 0, cache.length);
+            i++;
+            offSet = i * MAX_DECRYPT_BLOCK;
+        }
+        buffer.flip();
+        byte[] dst = new byte[buffer.limit()];
+        buffer.get(dst, 0, buffer.limit());
+        return dst;
+    }
+
+    /**
+     * <p>
+     * 公钥加密
+     * </p>
+     *
+     * @param data 源数据
+     * @param publicKey 公钥(BASE64编码)
+     * @return
+     * @throws Exception
+     */
+    public static byte[] encryptByPublicKey(byte[] data, PublicKey publicKey)
+            throws Exception {
+        Cipher cipher = getCipher(publicKey, Cipher.ENCRYPT_MODE);
+        int inputLen = data.length;
+        ByteBuffer buffer = ByteBuffer.allocate(MAX_ENCRYPT_BLOCK);
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // 对数据分段加密
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_ENCRYPT_BLOCK) {
+                cache = cipher.doFinal(data, offSet, MAX_ENCRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(data, offSet, inputLen - offSet);
+            }
+            buffer.put(cache);
+            i++;
+            offSet = i * MAX_ENCRYPT_BLOCK;
+        }
+        buffer.flip();
+        byte[] dst = new byte[buffer.limit()];
+        buffer.get(dst, 0, buffer.limit());
+        return dst;
+    }
+
+    /**
+     * <p>
+     * 私钥加密
+     * </p>
+     *
+     * @param data 源数据
+     * @param privateKey 私钥(BASE64编码)
+     * @return
+     * @throws Exception
+     */
+    public static byte[] encryptByPrivateKey(byte[] data, PrivateKey privateKey)
+            throws Exception {
+        Cipher cipher = getCipher(privateKey, Cipher.ENCRYPT_MODE);
+        int inputLen = data.length;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int offSet = 0;
+        byte[] cache;
+        int i = 0;
+        // 对数据分段加密
+        while (inputLen - offSet > 0) {
+            if (inputLen - offSet > MAX_ENCRYPT_BLOCK) {
+                cache = cipher.doFinal(data, offSet, MAX_ENCRYPT_BLOCK);
+            } else {
+                cache = cipher.doFinal(data, offSet, inputLen - offSet);
+            }
+            out.write(cache, 0, cache.length);
+            i++;
+            offSet = i * MAX_ENCRYPT_BLOCK;
+        }
+        byte[] encryptedData = out.toByteArray();
+        out.close();
+        return encryptedData;
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        String str = "srcbabababababab";
+        File privateKeyFile = new File("/Volumes/Media/Workspace/美亚DataHub/id_rsa.keystore");
+        File publicKeyFile = new File("/Volumes/Media/Workspace/美亚DataHub/id_rsa.keystore.pub");
+
+        Map<String, Object> keyPair = genFileKeyPair(privateKeyFile);
+        PrivateKey privateKey = (PrivateKey) keyPair.get(PRIVATE_KEY);
+        PublicKey publicKeyX = (PublicKey) keyPair.get(PUBLIC_KEY);
+
+        PublicKey publicKey = genPublicKey(publicKeyFile);
+
+        System.out.println("====== encoding by private key, decoding by public key======");
+        byte[] x = encryptByPrivateKey(str.getBytes(), privateKey);
+        System.out.println("encode: " + Base64.encodeBase64String(x));
+        System.out.println("decode: " + new String(decryptByPublicKey(x, publicKeyX)));
+        System.out.println("decode: " + new String(decryptByPublicKey(x, publicKey)));
+
+
+        System.out.println("====== encoding by public key, decoding by private key======");
+        x = encryptByPublicKey(str.getBytes(), publicKey);
+        System.out.println("encode: " + Base64.encodeBase64String(x));
+        System.out.println("decode: " + new String(decryptByPrivateKey(x, privateKey)));
+
+        System.out.println("====== encoding by public key, decoding by private key======");
+        x = encryptByPublicKey(str.getBytes(), publicKey);
+        System.out.println("encode: " + Base64.encodeBase64String(x));
+        System.out.println("encode: " + Base64.encodeBase64String(x));
+        System.out.println("encode: " + Base64.encodeBase64String(x));
+        System.out.println("decode: " + new String(decryptByPrivateKey(x, privateKey)));
+
+        System.out.println("======================");
+
+    }
+
+}

+ 74 - 0
serializer/CompressProtobufKryoSerializer.java

@@ -0,0 +1,74 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.protobuf.GeneratedMessage;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14/12/10
+ * Time: 09:55
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class CompressProtobufKryoSerializer extends com.esotericsoftware.kryo.Serializer<GeneratedMessage> {
+
+
+
+    /**
+     * 内部缓存
+     */
+    protected final static Map<Class<?>, Method> CLASS_METHOD_MAP = new HashMap<Class<?>, Method>(3);
+
+
+    public CompressProtobufKryoSerializer() {
+    }
+
+
+    @Override
+    public void write(Kryo kryo, Output output, GeneratedMessage object) {
+        byte[] bytes = object.toByteArray();
+        output.writeInt(bytes.length, false);
+        output.write(bytes);
+    }
+
+
+    @Override
+    public GeneratedMessage read(Kryo kryo, Input input, Class type) {
+        int i = input.readInt(false);
+        byte[] bytes = new byte[i];
+        input.read(bytes);
+        try {
+            return reflact(type, bytes);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+
+    protected static GeneratedMessage reflact(Class aClass, byte[] args)
+            throws ClassNotFoundException, NoSuchMethodException,
+            InvocationTargetException, IllegalAccessException {
+        Method method = CLASS_METHOD_MAP.get(aClass);
+        if (method == null) {
+            method = aClass.getMethod("parseFrom", byte[].class);
+            CLASS_METHOD_MAP.put(aClass, method);
+        }
+        return (GeneratedMessage) method.invoke(null, args);
+    }
+
+}

+ 41 - 0
serializer/CompressSerializer.java

@@ -0,0 +1,41 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 15/2/27
+ * Time: 10:58
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public abstract class CompressSerializer implements Serializer<byte[]> {
+
+
+    public CompressSerializer() {
+    }
+
+
+    @Override
+    public byte[] serialize(byte[] bytes) {
+        return compress(bytes);
+    }
+
+    @Override
+    public byte[] deserialize(byte[] bytes) {
+        return uncompress(bytes);
+    }
+
+
+
+    public abstract byte[] compress(byte[] bytes);
+
+
+
+
+    public abstract byte[] uncompress(byte[] bytes);
+}

+ 77 - 0
serializer/ExternalSerializer.java

@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+
+import java.io.*;
+import java.io.Externalizable;
+
+/**
+ * Java Serialization Redis serializer.
+ * Delegates to the default (Java based) serializer input Spring 3.
+ *
+ * @author ZhenQin
+ */
+public class ExternalSerializer implements Serializer<Externalizable> {
+
+
+    /**
+     * 序列化表格
+     */
+    private static final long serialVersionUID = 1L;
+
+
+    protected final Class<Externalizable> objectClass;
+
+
+
+    public ExternalSerializer(Class<Externalizable> t) {
+        this.objectClass = t;
+    }
+
+
+    @Override
+    public Externalizable deserialize(byte[] bytes) {
+        try {
+            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
+            Externalizable object = objectClass.newInstance();
+            object.readExternal(in);
+            in.close();
+            return object;
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot deserialize", ex);
+        }
+    }
+
+    @Override
+    public byte[] serialize(Externalizable object) {
+        if (object == null) {
+            return new byte[0];
+        }
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            ObjectOutputStream outputStream = new ObjectOutputStream(out);
+
+            object.writeExternal(outputStream);
+
+            outputStream.flush();
+            outputStream.close();
+            return out.toByteArray();
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot serialize", ex);
+        }
+    }
+}

+ 64 - 0
serializer/GZipSerializer.java

@@ -0,0 +1,64 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * <pre>
+ * Created with IntelliJ IDEA.
+ * User: lwj
+ * Date: 2015/2/27
+ * Time: 10:48
+ * To change this template use File | Settings | File Templates.
+ * </pre>
+ *
+ * @author lwj
+ */
+public class GZipSerializer extends CompressSerializer {
+
+
+    public GZipSerializer() {
+    }
+
+    @Override
+    public byte[] compress(byte[] bytes) {
+        CompressorInputStream inputStream = null;
+        try {
+            inputStream = new CompressorStreamFactory().createCompressorInputStream(
+                    CompressorStreamFactory.GZIP, new ByteArrayInputStream(bytes));
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            IOUtils.copy(inputStream, out);
+            inputStream.close();
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } catch (CompressorException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+
+    @Override
+    public byte[] uncompress(byte[] bytes) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            CompressorOutputStream outputStream = new CompressorStreamFactory().createCompressorOutputStream(
+                    CompressorStreamFactory.GZIP, out);
+            IOUtils.copy(new ByteArrayInputStream(bytes), outputStream);
+            outputStream.close();
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } catch (CompressorException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}

+ 93 - 0
serializer/JSONFixSerializer.java

@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.primitives.Ints;
+
+
+/**
+ * Java Serialization Redis strserializer.
+ * Delegates to the default (Java based) strserializer in Spring 3.
+ * <p/>
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 13-11-13
+ * Time: 上午8:58
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class JSONFixSerializer<T> implements Serializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final StringSerializer serializer;
+
+    public JSONFixSerializer() {
+        serializer = new StringSerializer();
+    }
+
+
+    public JSONFixSerializer(String charset) {
+        this.serializer = new StringSerializer(charset);
+    }
+
+
+    public JSONFixSerializer(StringSerializer serializer) {
+        this.serializer = serializer;
+    }
+
+
+    @Override
+    public byte[] serialize(T o) {
+        String clazz = o.getClass().getName();
+        byte[] classString = clazz.getBytes(serializer.getCharset());
+
+        int length = classString.length;
+        byte[] head = Ints.toByteArray(length);
+
+        byte[] body = serializer.serialize(JSON.toJSONString(o));
+        byte[] bytes = new byte[head.length + length + body.length];
+        System.arraycopy(head, 0, bytes, 0, head.length);
+        System.arraycopy(classString, 0, bytes, head.length, classString.length);
+        System.arraycopy(body, 0, bytes, head.length + length, body.length);
+        return bytes;
+    }
+
+
+    @Override
+    public T deserialize(byte[] bytes) {
+        byte[] head = new byte[4];
+        System.arraycopy(bytes, 0, head, 0, head.length);
+
+        int length = Ints.fromBytes(bytes[0], bytes[1], bytes[2], bytes[3]);
+
+        String classString = new String(bytes, 4, length);
+
+        try {
+            return (T) JSON.toJavaObject(
+                    JSON.parseObject(serializer.deserialize(bytes, 4 + length, bytes.length - 4 - length)),
+                    Class.forName(classString));
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}

+ 83 - 0
serializer/JSONSerializer.java

@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.alibaba.fastjson.JSON;
+
+
+/**
+ * Java Serialization Redis strserializer.
+ * Delegates to the default (Java based) strserializer in Spring 3.
+ *
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 13-11-13
+ * Time: 上午8:58
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class JSONSerializer<T> implements Serializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * String byte
+     */
+    private StringSerializer serializer = new StringSerializer();
+
+
+    /**
+     * 类对象, 用于Object json to Object 的转换
+     */
+    private Class<? extends T> reverseClazz;
+
+
+    public JSONSerializer(Class<? extends T> clazz) {
+        this.reverseClazz = clazz;
+    }
+
+
+
+    public JSONSerializer(String classString) {
+        try {
+            this.reverseClazz = (Class)Class.forName(classString);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+
+    @Override
+    public byte[] serialize(T o) {
+        return serializer.serialize(JSON.toJSONString(o));
+    }
+
+    @Override
+    public T deserialize(byte[] bytes) {
+        return JSON.toJavaObject(
+                JSON.parseObject(serializer.deserialize(bytes)), reverseClazz);
+    }
+
+
+    public Class<? extends T> getReverseClazz() {
+        return reverseClazz;
+    }
+}

+ 91 - 0
serializer/JacksonSerializer.java

@@ -0,0 +1,91 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+import java.io.IOException;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14/11/14
+ * Time: 13:08
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class JacksonSerializer<T> implements Serializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * String byte
+     */
+    private StringSerializer serializer = new StringSerializer();
+
+
+    /**
+     * 类对象, 用于Object json to Object 的转换
+     */
+    private Class<? extends T> reverseClazz;
+
+
+    private ObjectMapper objectMapper;
+
+
+
+    public JacksonSerializer(String reverseClazz) {
+        try {
+            this.reverseClazz = (Class<T>) Class.forName(reverseClazz);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(e);
+        }
+
+        objectMapper = new ObjectMapper();
+        objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+        objectMapper.configure(SerializationConfig.Feature.WRITE_CHAR_ARRAYS_AS_JSON_ARRAYS, true);
+    }
+
+
+
+    public JacksonSerializer(Class<? extends T> clazz) {
+        this.reverseClazz = clazz;
+        objectMapper = new ObjectMapper();
+        objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+        objectMapper.configure(SerializationConfig.Feature.WRITE_CHAR_ARRAYS_AS_JSON_ARRAYS, true);
+    }
+
+
+    @Override
+    public byte[] serialize(T t) {
+        try {
+            String s = objectMapper.writeValueAsString(t);
+            return serializer.serialize(s);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public T deserialize(byte[] bytes) {
+        try {
+            return objectMapper.readValue(serializer.deserialize(bytes), reverseClazz);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+
+    public Class<? extends T> getReverseClazz() {
+        return reverseClazz;
+    }
+
+    public ObjectMapper getObjectMapper() {
+        return objectMapper;
+    }
+
+}

+ 65 - 0
serializer/JdkSerializer.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+
+import java.io.*;
+
+/**
+ * Java Serialization Redis serializer.
+ * Delegates to the default (Java based) serializer input Spring 3.
+ *
+ * @author Mark Pollack
+ * @author Costin Leau
+ */
+public class JdkSerializer implements Serializer<Serializable> {
+
+    private static final long serialVersionUID = 1L;
+
+    public JdkSerializer() {
+
+    }
+
+
+    @Override
+    public Serializable deserialize(byte[] bytes) {
+        try {
+            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
+            Serializable o = (Serializable)in.readObject();
+            in.close();
+            return o;
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot deserialize", ex);
+        }
+    }
+
+    @Override
+    public byte[] serialize(Serializable object) {
+        if (object == null) {
+            return new byte[0];
+        }
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            ObjectOutputStream outputStream = new ObjectOutputStream(out);
+            outputStream.writeObject(object);
+            outputStream.flush();
+            outputStream.close();
+            return out.toByteArray();
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot serialize", ex);
+        }
+    }
+}

+ 99 - 0
serializer/KryoSerializer.java

@@ -0,0 +1,99 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.GeneratedMessage;
+import com.sdyc.ndmp.protobuf.dubbo.support.KryoFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14/12/10
+ * Time: 09:27
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class KryoSerializer<T> implements Serializer<T> {
+
+
+    protected final Kryo kryo;
+
+
+    protected final Class<T> objectClass;
+
+
+    public KryoSerializer(Class<T> objectClass) {
+        this(objectClass,
+                ImmutableMap.<Class<?>, com.esotericsoftware.kryo.Serializer>of(
+                GeneratedMessage.class, new ProtobufKryoSerializer()));
+    }
+
+
+
+    public KryoSerializer(Class<T> objectClass, Map<Class<?>, com.esotericsoftware.kryo.Serializer> classMap) {
+        kryo = new Kryo();
+        for (Map.Entry<Class<?>, com.esotericsoftware.kryo.Serializer> entry : classMap.entrySet()) {
+            kryo.register(entry.getKey(), entry.getValue());
+        }
+        this.objectClass = objectClass;
+    }
+
+
+
+    protected KryoSerializer(Kryo kryo, Class<T> objectClass) {
+        this.kryo = kryo;
+        this.objectClass = objectClass;
+    }
+
+
+
+    public static <S> KryoSerializer<S> getKryoSerializer(Class<S> objectClass) {
+        return new KryoSerializer<S>(KryoFactory.newInstance(), objectClass);
+    }
+
+
+
+
+    public static <S> KryoSerializer<S> getKryoSerializer(KryoFactory.Call call, Class<S> objectClass) {
+        return new KryoSerializer<S>(KryoFactory.newInstance(call), objectClass);
+    }
+
+
+
+    @Override
+    public byte[] serialize(T o) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Output output = new Output(out);
+        kryo.writeObject(output, o);
+        output.flush();
+        return out.toByteArray();
+    }
+
+
+
+    @Override
+    public T deserialize(byte[] bytes) {
+        return deserialize(bytes, this.objectClass);
+    }
+
+
+
+    public T deserialize(byte[] bytes, Class<T> clazz) {
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        Input input = new Input(in);
+        return kryo.readObject(input, clazz);
+    }
+
+}

+ 83 - 0
serializer/ProtobufClassSerializer.java

@@ -0,0 +1,83 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.google.protobuf.GeneratedMessage;
+
+import java.lang.reflect.Method;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14-7-28
+ * Time: 下午3:36
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class ProtobufClassSerializer implements Serializer<GeneratedMessage> {
+
+    /**
+     * 字符编码
+     */
+    protected String charset = "UTF-8";
+
+
+
+
+    protected final Class<? extends GeneratedMessage> objectClass;
+
+
+
+    protected Method parseMethod;
+
+
+    public ProtobufClassSerializer(Class<? extends GeneratedMessage> objectClass) {
+        this(objectClass, "UTF-8");
+    }
+
+
+    public ProtobufClassSerializer(Class<? extends GeneratedMessage> objectClass, String charset) {
+        this.charset = charset;
+        this.objectClass = objectClass;
+    }
+
+    @Override
+    public byte[] serialize(GeneratedMessage o) {
+        if (o == null) {
+            return null;
+        }
+
+        return o.toByteArray();
+    }
+
+    @Override
+    public GeneratedMessage deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        // 检查一下Netty Protobuf解码器是怎么写的
+        try {
+            if (parseMethod == null) {
+                parseMethod = objectClass.getMethod("parseFrom", byte[].class);
+            }
+            return (GeneratedMessage) parseMethod.invoke(null, bytes);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalStateException("class: " + objectClass.getName() + " has not method: parseFrom(byte[]).", e);
+        } catch (Exception e) {
+            throw new IllegalStateException("reflection error!", e);
+        }
+    }
+
+
+
+    public String getCharset() {
+        return charset;
+    }
+
+    public void setCharset(String charset) {
+        this.charset = charset;
+    }
+}

+ 49 - 0
serializer/ProtobufKryoSerializer.java

@@ -0,0 +1,49 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.esotericsoftware.kryo.*;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.protobuf.GeneratedMessage;
+
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14/12/10
+ * Time: 09:55
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class ProtobufKryoSerializer extends com.esotericsoftware.kryo.Serializer<GeneratedMessage> {
+
+
+    protected final ProtobufSerializer protobufSerializer;
+
+
+    public ProtobufKryoSerializer() {
+        protobufSerializer = new ProtobufSerializer();
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, GeneratedMessage object) {
+        byte[] bytes = protobufSerializer.serialize(object);
+        output.writeInt(bytes.length, false);
+        output.write(bytes);
+    }
+
+
+    @Override
+    public GeneratedMessage read(Kryo kryo, Input input, Class type) {
+        int i = input.readInt(false);
+        byte[] bytes = new byte[i];
+        input.read(bytes);
+        return protobufSerializer.deserialize(bytes);
+    }
+
+
+}

+ 116 - 0
serializer/ProtobufSerializer.java

@@ -0,0 +1,116 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.google.protobuf.GeneratedMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 14-7-28
+ * Time: 下午3:36
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class ProtobufSerializer implements Serializer<GeneratedMessage> {
+
+    private static final long serialVersionUID = 1L;
+    /**
+     * 字符编码
+     */
+    protected String charset = "UTF-8";
+
+
+    /**
+     * 内部缓存
+     */
+    protected final static Map<Class<?>, Method>
+            CLASS_METHOD_MAP = new HashMap<Class<?>, Method>(3);
+
+
+    public ProtobufSerializer() {
+
+    }
+
+
+    public ProtobufSerializer(String charset) {
+        this.charset = charset;
+    }
+
+    @Override
+    public byte[] serialize(GeneratedMessage o) {
+        if (o == null) {
+            return null;
+        }
+
+        byte[] bytes = o.toByteArray();
+
+        String clazz = o.getClass().getName();
+        byte[] bytes1 = clazz.getBytes(Charset.forName(charset));
+
+        ByteBuf channelBuffer = Unpooled.buffer(4 + bytes.length + bytes1.length);
+        channelBuffer.writeInt(bytes1.length);
+        channelBuffer.writeBytes(bytes1);
+        channelBuffer.writeBytes(bytes);
+        return channelBuffer.array();
+    }
+
+    @Override
+    public GeneratedMessage deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+
+        ByteBuf channelBuffer = Unpooled.wrappedBuffer(bytes);
+
+        channelBuffer.resetReaderIndex();
+        int classNameLength = channelBuffer.readInt();
+
+        ByteBuf classBuf = channelBuffer.readBytes(classNameLength);
+        String clazzName = classBuf.toString(Charset.forName(charset));
+
+        ByteBuf objectBuf = channelBuffer.readBytes(channelBuffer.readableBytes());
+
+        // 检查一下Netty Protobuf解码器是怎么写的
+        try {
+            return reflact(clazzName, objectBuf.array());
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("class: " + clazzName + " not found!", e);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalStateException("class: " + clazzName + " has not method: parseFrom(byte[]).", e);
+        } catch (Exception e) {
+            throw new IllegalStateException("reflection error!", e);
+        }
+    }
+
+
+    protected static GeneratedMessage reflact(String clazz, byte[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Class<?> aClass = Class.forName(clazz);
+        Method method = CLASS_METHOD_MAP.get(aClass);
+        if (method == null) {
+            method = aClass.getMethod("parseFrom", byte[].class);
+            CLASS_METHOD_MAP.put(aClass, method);
+        }
+        return (GeneratedMessage) method.invoke(null, args);
+    }
+
+
+    public String getCharset() {
+        return charset;
+    }
+
+    public void setCharset(String charset) {
+        this.charset = charset;
+    }
+}

+ 45 - 0
serializer/Serializer.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface serialization and deserialization of Objects to byte arrays (binary data).
+ * <p/>
+ * It is recommended that implementations are designed to handle null objects/empty arrays on serialization and deserialization side.
+ * Implementations should document whether null round-trips to null or to an empty value.
+ *
+ * @author ZhenQin
+ */
/**
 * Basic contract for converting objects to byte arrays (binary data) and back.
 *
 * <p>Implementations are encouraged to tolerate {@code null} objects and empty
 * arrays on both the serialization and deserialization side.</p>
 *
 * @param <T> type handled by the implementation
 * @author ZhenQin
 */
public interface Serializer<T> extends Serializable {

    /**
     * Serialize the given object to binary data.
     *
     * @param t object to serialize
     * @return the equivalent binary data
     */
    byte[] serialize(T t);

    /**
     * Deserialize an object from the given binary data.
     *
     * @param bytes object binary representation
     * @return the equivalent object instance
     */
    T deserialize(byte[] bytes);
}

+ 43 - 0
serializer/SnappySerializer.java

@@ -0,0 +1,43 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+
+/**
+ * <pre>
+ * Created with IntelliJ IDEA.
+ * User: lwj
+ * Date: 2015/2/27
+ * Time: 10:48
+ * To change this template use File | Settings | File Templates.
+ * </pre>
+ *
+ * @author lwj
+ */
+public class SnappySerializer extends CompressSerializer {
+
+
+    public SnappySerializer() {
+    }
+
+    @Override
+    public byte[] compress(byte[] bytes) {
+        try {
+            return Snappy.compress(bytes);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+
+    @Override
+    public byte[] uncompress(byte[] bytes) {
+        try {
+            return Snappy.uncompress(bytes);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}

+ 124 - 0
serializer/StringSerializer.java

@@ -0,0 +1,124 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: ZhenQin
+ * Date: 13-10-8
+ * Time: 上午9:57
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author ZhenQin
+ */
+public class StringSerializer implements Serializer<String> {
+
+    private static final long serialVersionUID = 1L;
+    /**
+     * 默认支持的编码类型
+     */
+    private Charset charset = Charset.forName("UTF-8");
+
+
+    /**
+     * 默认是UTF-8的数据编码类型
+     */
+    public StringSerializer() {
+
+    }
+
+
+    /**
+     * 自己指定数据编码类型
+     * @param charset
+     */
+    public StringSerializer(Charset charset) {
+        this.charset = charset;
+    }
+
+
+    /**
+     * 自己指定数据编码类型
+     * @param charset
+     */
+    public StringSerializer(String charset) {
+        this.charset = Charset.forName(charset);
+    }
+
+    /**
+     * Serialize the given object to binary data.
+     *
+     * @param s object to serialize
+     * @return the equivalent binary data
+     */
+    @Override
+    public byte[] serialize(String s) {
+        return s.getBytes(charset);
+    }
+
+
+
+
+    public String deserialize(byte[] bytes, int offset, int length) {
+        return new String(bytes, offset, length, charset);
+    }
+
+
+
+    /**
+     * Deserialize an object from the given binary data.
+     *
+     * @param bytes object binary representation
+     * @return the equivalent object instance
+     */
+    @Override
+    public String deserialize(byte[] bytes) {
+        return new String(bytes, charset);
+    }
+
+
+    public Charset getCharset() {
+        return charset;
+    }
+
+    public void setCharset(Charset charset) {
+        this.charset = charset;
+    }
+
+    /**
+     * 把非字符串化的字符串转换成 String
+     * @param bytes
+     * @return
+     */
+    public static String toString(byte[] bytes) {
+        StringBuffer strBuf = new StringBuffer();
+
+        for (int i = 0; i < bytes.length; i++) {
+            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int) 'a')));
+            strBuf.append((char) (((bytes[i]) & 0xF) + ((int) 'a')));
+        }
+
+        return strBuf.toString();
+    }
+
+
+    /**
+     * 把非字符串化的字符串转换成 byte
+     * @param str
+     * @return
+     */
+    public static byte[] toBytes(String str) {
+        byte[] bytes = new byte[str.length() / 2];
+        for (int i = 0; i < str.length(); i += 2) {
+            char c = str.charAt(i);
+            bytes[i / 2] = (byte) ((c - 'a') << 4);
+            c = str.charAt(i + 1);
+            bytes[i / 2] += (c - 'a');
+        }
+        return bytes;
+    }
+}

+ 47 - 0
serializer/WritableKryoSerializer.java

@@ -0,0 +1,47 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * <pre>
+ *
+ * Created by IntelliJ IDEA.
+ * User: zhenqin
+ * Date: 15/1/23
+ * Time: 10:02
+ * To change this template use File | Settings | File Templates.
+ *
+ * </pre>
+ *
+ * @author zhenqin
+ */
+public class WritableKryoSerializer extends com.esotericsoftware.kryo.Serializer<Writable> {
+
+
+
+    @Override
+    public void write(Kryo kryo, Output output, Writable object) {
+        try {
+            object.write(new DataOutputStream(output));
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot deserialize", ex);
+        }
+    }
+
+    @Override
+    public Writable read(Kryo kryo, Input input, Class<Writable> type) {
+        try {
+            Writable object = type.newInstance();
+            object.readFields(new DataInputStream(input));
+            return object;
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot serialize", ex);
+        }
+    }
+}

+ 79 - 0
serializer/WritableSerializer.java

@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sdyc.ndmp.protobuf.serializer;
+
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+
+/**
+ * Serializer for Hadoop Writable instances.
+ * Delegates to the Writable's own write/readFields wire format.
+ *
+ * @author ZhenQin
+ */
+public class WritableSerializer implements Serializer<Writable> {
+
+
+
+    /**
+     * 序列化表格
+     */
+    private static final long serialVersionUID = 1L;
+
+
+    protected final Class<Writable> objectClass;
+
+
+
+    public WritableSerializer(Class<Writable> t) {
+        this.objectClass = t;
+    }
+
+
+    @Override
+    public Writable deserialize(byte[] bytes) {
+        try {
+            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+            Writable object = objectClass.newInstance();
+            object.readFields(in);
+            in.close();
+            return object;
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot deserialize", ex);
+        }
+    }
+
+    @Override
+    public byte[] serialize(Writable object) {
+        if (object == null) {
+            return new byte[0];
+        }
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            DataOutputStream outputStream = new DataOutputStream(out);
+
+            object.write(outputStream);
+
+            outputStream.flush();
+            outputStream.close();
+            return out.toByteArray();
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot serialize", ex);
+        }
+    }
+}

+ 64 - 0
serializer/ZipSerializer.java

@@ -0,0 +1,64 @@
+package com.sdyc.ndmp.protobuf.serializer;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * <pre>
+ * Created with IntelliJ IDEA.
+ * User: lwj
+ * Date: 2015/2/27
+ * Time: 10:48
+ * To change this template use File | Settings | File Templates.
+ * </pre>
+ *
+ * @author lwj
+ */
+public class ZipSerializer extends CompressSerializer {
+
+
+    public ZipSerializer() {
+    }
+
+    @Override
+    public byte[] compress(byte[] bytes) {
+        CompressorInputStream inputStream = null;
+        try {
+            inputStream = new CompressorStreamFactory().createCompressorInputStream(
+                    CompressorStreamFactory.BZIP2, new ByteArrayInputStream(bytes));
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            IOUtils.copy(inputStream, out);
+            inputStream.close();
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } catch (CompressorException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+
+    @Override
+    public byte[] uncompress(byte[] bytes) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            CompressorOutputStream outputStream = new CompressorStreamFactory().createCompressorOutputStream(
+                    CompressorStreamFactory.BZIP2, out);
+            IOUtils.copy(new ByteArrayInputStream(bytes), outputStream);
+            outputStream.close();
+            return out.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } catch (CompressorException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}