Forráskód Böngészése

添加 feign client 支持,添加zip压缩解压支持

zhzhenqin 4 éve
szülő
commit
e90bb7c0b2

+ 141 - 0
feign_client/DataPublisherFactory.java

@@ -0,0 +1,141 @@
+package com.primeton.dsp.dataservice;
+
+import com.alibaba.fastjson.JSON;
+import com.primeton.dsp.dataservice.service.IDataPublisherService;
+import feign.Feign;
+import feign.Request;
+import feign.RequestTemplate;
+import feign.Response;
+import feign.Util;
+import feign.codec.Decoder;
+import feign.codec.Encoder;
+import org.apache.commons.lang.math.NumberUtils;
+
+import java.io.Closeable;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * 服务发布接口 Factory 实现
+ */
+public class DataPublisherFactory {
+
+	private DataPublisherFactory() {
+	}
+
+
+    /**
+     * 获取一个新服务
+     * @param url
+     * @return
+     */
+	public static IDataPublisherService newInstance(String url) {
+		IDataPublisherService dataPublisherService = Feign.builder()
+                .encoder(feignEncoder())
+                .decoder(feignDecoder())
+                .target(IDataPublisherService.class, url);
+		return dataPublisherService;
+	}
+
+
+    /**
+     * close 服务
+     * @param closeObj
+     */
+	public static void close(Object closeObj) {
+	    if(closeObj instanceof Closeable) {
+	        close((Closeable)closeObj);
+	        return;
+        }
+
+        try {
+            // 如果有无参数的 close  方法
+            Method close = closeObj.getClass().getDeclaredMethod("close");
+            if(close != null) {
+                close.setAccessible(true);
+                close.invoke(closeObj);
+            }
+        } catch (Exception e) {
+        }
+    }
+
+
+
+    public static void close(Closeable closeable) {
+	    try {
+            closeable.close();
+        } catch (Exception e){};
+    }
+
+
+    /**
+     * 解码器实现
+     * @return
+     */
+    public static Decoder feignDecoder() {
+        // JSON
+        return (Response response, Type type) -> {
+            Response.Body body = response.body();
+            if (response.status() == 404)
+                return Util.emptyValueOf(type);
+            if (body == null)
+                return null;
+            if (byte[].class.equals(type)) {
+                return Util.toByteArray(response.body().asInputStream());
+            }
+            if (String.class == type) {
+                return new String(Util.toByteArray(response.body().asInputStream()), StandardCharsets.UTF_8);
+            }
+
+            if(Number.class.isAssignableFrom(type.getClass())) {
+                // 数字类型
+                String bodyStr = new String(Util.toByteArray(response.body().asInputStream()), StandardCharsets.UTF_8);
+                return NumberUtils.createNumber(bodyStr).intValue();
+            }
+
+            // JSON 类型或者为对象类型
+            String bodyStr = new String(Util.toByteArray(response.body().asInputStream()), StandardCharsets.UTF_8);
+            return JSON.parseObject(bodyStr, type);
+        };
+    }
+
+
+    /**
+     * 接口发送时编码器
+     * @return
+     */
+    public static Encoder feignEncoder() {
+        // JSON
+        return (Object object, Type bodyType, RequestTemplate template) -> {
+            if (bodyType == String.class) {
+                template.body((String)object);
+            } else if (bodyType == byte[].class) {
+                template.body(Request.Body.encoded((byte[]) object, StandardCharsets.UTF_8));
+            } else if (object instanceof Number) {
+                // 数字类型
+                template.body(String.valueOf(object));
+            }
+            // 复杂类型,对象类型 转为 JSON 发送
+            template.body(JSON.toJSONString(object));
+        };
+    }
+    
+//	public static void main(String[] args) {
+//        IDataPublisherService publisherService = DataPublisherFactory.newInstance("http://localhost:9091/datarelease");
+//        Map<String, Object> model = new HashMap<>();
+//        model.put("appCode", "Hello");
+//        model.put("id", "adfadsfas");
+//        model.put("code", "CODE");
+//
+//        PubServiceVO model1 = new PubServiceVO();
+//        model1.setSid("abc123");
+//        model1.setPublisherModel(model);
+//        Map<String, Object> res = publisherService.doPubService(model1);
+//        System.out.println(res);
+//
+//        DataPublisherFactory.close(publisherService);
+//    }
+
+}

+ 79 - 0
feign_client/IDataPublisherService.java

@@ -0,0 +1,79 @@
+package com.primeton.dsp.dataservice.service;
+
+import com.primeton.dsp.dataservice.vomodel.PubServiceVO;
+import com.primeton.dsp.dataservice.vomodel.ServiceParamVO;
+import feign.Headers;
+import feign.Param;
+import feign.RequestLine;
+
+import java.util.Map;
+
+/**
+ * 
+ * 服务发布调用 datarelease 接口
+ * 
+ * 
+ * <pre>
+ *
+ * Created by zhaopx.
+ * User: zhaopx
+ * Date: 2020/3/30
+ * Time: 17:08
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+
+public interface IDataPublisherService {
+
+    /**
+     * 生成 SQL。 应包含 sid,table, conds 几个参数
+     *
+     * @return
+     */
+    @RequestLine("POST /api/pub/saveSingleQuery")
+    @Headers({"Content-Type: application/json", "Accept: application/json"})
+    Map<String, Object> saveSingleQuery(Map<String, Object> singleQuery);
+
+
+    /**
+     * 多表发布服务生成 SQL。 应包含 sid,tables,joins, conds 几个参数
+     *
+     * @return
+     */
+    @RequestLine("POST /api/pub/saveMutilTableQuery")
+    @Headers({"Content-Type: application/json", "Accept: application/json"})
+    Map<String, Object> saveMutilTableQuery(Map<String, Object> mutil);
+
+    /**
+     * 发布服务, 单表和多表
+     *
+     * @param model
+     * @return 返回发布结果, success 为 true 则发布成功
+     */
+    @RequestLine("POST /api/pub/doPub")
+    @Headers({"Content-Type: application/json", "Accept: application/json"})
+    public Map<String, Object> doPubService(PubServiceVO model);
+
+
+
+    /**
+     * 删除 已经发布的服务
+     *
+     * @param id 服务 ID
+     * @return success 为 true 则删除服务成功
+     */
+    @RequestLine("POST /api/pub/remove?id={id}")
+    Map<String, Object> removeDataPub(@Param("id") String id);
+
+
+    /**
+     * 修改服务参数
+     * @param model
+     * @return
+     */
+    @RequestLine("POST /api/pub/saveServiceParams")
+    @Headers({"Content-Type: application/json", "Accept: application/json"})
+    Map<String, Object> saveServiceParams(ServiceParamVO model);
+}

+ 25 - 5
httpclient/RestApiUtils.java

@@ -7,14 +7,18 @@ import com.alibaba.fastjson.JSONObject;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
+import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -58,7 +62,7 @@ public class RestApiUtils {
     /**
      * http client
      */
-    static final HttpClient httpClient;
+    static final CloseableHttpClient httpClient;
 
     static {
         try {
@@ -71,6 +75,7 @@ public class RestApiUtils {
             PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
             httpClient = HttpClients.custom()
                     .setConnectionManager(cm)
+                    .setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(180000).build())
                     .setRetryHandler(new DefaultHttpRequestRetryHandler(3, false))
                     .build();
         } catch (Exception e) {
@@ -166,8 +171,9 @@ public class RestApiUtils {
         StringEntity entity = new StringEntity(jsonStr, "UTF-8");
         entity.setContentType("application/json");
         post.setEntity(entity);
+        CloseableHttpResponse resp = null;
         try {
-            HttpResponse resp = httpClient.execute(post);
+            resp = httpClient.execute(post);
             log.info("execute url {} return code: {}", url, resp.getStatusLine().getStatusCode());
             HttpEntity entity1 = resp.getEntity();
             String result = EntityUtils.toString(entity1);
@@ -185,7 +191,9 @@ public class RestApiUtils {
             jsonObject.put("status_code", resp.getStatusLine().getStatusCode());
             return jsonObject;
         } finally {
-            post.releaseConnection();
+            if(resp != null) {
+                resp.close();
+            }
         }
     }
 
@@ -242,8 +250,9 @@ public class RestApiUtils {
             }
             get.setParams(params);
         }
+        CloseableHttpResponse resp = null;
         try {
-            HttpResponse resp = httpClient.execute(get);
+            resp = httpClient.execute(get);
             log.info("execute url {} return code: {}", url, resp.getStatusLine().getStatusCode());
             HttpEntity entity = resp.getEntity();
             String result = EntityUtils.toString(entity);
@@ -261,7 +270,18 @@ public class RestApiUtils {
             jsonObject.put("status_code", resp.getStatusLine().getStatusCode());
             return jsonObject;
         } finally {
-            get.releaseConnection();
+            if(resp != null) {
+                resp.close();
+            }
         }
     }
+
+
+    /**
+     * 关闭 RPC 调用
+     * @throws IOException
+     */
+    public void shutdown() throws IOException {
+        httpClient.close();
+    }
 }

+ 4 - 0
httpservlet-support/MetaCubeArthNewSupport.java

@@ -93,7 +93,9 @@ public class MetaCubeArthNewSupport extends HttpServlet {
         String header = StringUtils.trimToNull(request.getHeader("Content-Type"));
         MediaType mediaType = MediaType.parse(Optional.ofNullable(header).orElse("text/html"));
         if("application".equalsIgnoreCase(mediaType.type()) || "json".equalsIgnoreCase(mediaType.subtype())) {
+            reader.mark(0);
             String json = IOUtils.toString(reader);
+            reader.reset();
             if(StringUtils.isBlank(json)) {
                 json = "{}";
             }
@@ -113,7 +115,9 @@ public class MetaCubeArthNewSupport extends HttpServlet {
         String header = StringUtils.trimToNull(request.getHeader("Content-Type"));
         MediaType mediaType = MediaType.parse(Optional.ofNullable(header).orElse("text/html"));
         if("application".equalsIgnoreCase(mediaType.type()) || "json".equalsIgnoreCase(mediaType.subtype())) {
+            reader.mark(0);
             String json = IOUtils.toString(reader);
+            reader.reset();
             if(StringUtils.isBlank(json)) {
                 json = "{}";
             }

+ 277 - 0
zip/ZipFileOperator.java

@@ -0,0 +1,277 @@
+package com.primeton.damp.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+public class ZipFileOperator {
+
+    /**
+     * 日志记录
+     *
+     */
+    private static Logger log = LoggerFactory.getLogger(ZipFileOperator.class);
+
+
+    /**
+     * 压缩指定文件集.
+     * @param compressFilePath 压缩后的zip文件保存的路径
+     * @param filesPath 需要被压缩的文件或者文件夹集合
+     * @return 返回压缩后的文件
+     * @throws IOException  压缩异常
+     */
+    public File compressZip(String compressFilePath, String... filesPath) throws IOException {
+        return compressZip(new File(compressFilePath), filesPath);
+    }
+
+
+    /**
+     * 压缩指定文件集.
+     *
+     * @param files
+     *            被压缩的文件集合
+     * @param compressFile
+     *            压缩后的zip文件保存的路径
+     * @return 返回压缩后的文件
+     * @throws IOException
+     *             压缩异常
+     */
+    public File compressZip(File compressFile, String... files)
+            throws IOException {
+        if (!compressFile.exists()) {
+            if (!compressFile.getParentFile().exists()) {
+                if (!compressFile.getParentFile().mkdirs()) {
+                    StringBuilder exception = new StringBuilder();
+                    exception.append("系统找不到指定的路径: ");
+                    exception.append(compressFile.getParentFile().getAbsolutePath());
+                    exception.append("并且创建: ");
+                    exception.append(compressFile.getAbsolutePath());
+                    exception.append("失败!");
+                    throw new IOException(exception.toString());
+                }
+            }
+            if (!compressFile.createNewFile()) {
+                StringBuilder exception = new StringBuilder();
+                exception.append("创建文件: ");
+                exception.append(compressFile.getAbsolutePath());
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        ZipOutputStream zos = null;
+        try {
+            zos = new ZipOutputStream(
+                    new FileOutputStream(compressFile));
+            for (String fileName : files) {
+                File compFile = new File(fileName);
+                if(compFile.exists()){
+                    compressZip0(zos, compFile, compFile.getName());
+                }
+            }
+            // 当压缩完成,关闭流
+            zos.closeEntry();
+        } catch(Exception e) {
+            log.warn("压缩文件异常。", e);
+        } finally {
+            if(zos != null){
+                zos.close();
+            }
+        }
+        return compressFile;
+    }
+
+    /**
+     * 递归压缩
+     * @param zos
+     * @param file
+     * @param baseDir
+     */
+    private void compressZip0(ZipOutputStream zos, File file, String baseDir){
+        // 压缩文件缓冲区大小
+        FileInputStream in = null;
+        if(file.isFile()){
+            try {
+                // 生成下一个压缩节点
+                zos.putNextEntry(new ZipEntry(baseDir));
+            } catch (IOException e1) {
+                log.warn("压缩文件异常。", e1);
+            }
+            byte[] buffere = new byte[4096];
+            int length = 0;// 读取的长度
+            try {
+                in = new FileInputStream(file);
+                while ((length = in.read(buffere)) != -1) {
+                    zos.write(buffere, 0, length);
+                }
+            } catch (IOException e) {
+                log.warn("压缩文件异常。", e);
+            } finally {
+                // 当压缩完成,关闭流
+                if(in != null){
+                    try{
+                        in.close();
+                    } catch(IOException e){
+
+                    }
+                }
+            }
+            log.debug("压缩文件:  " + file.getAbsolutePath() + "  成功!");
+            return ;
+        } else {
+            try {
+                zos.putNextEntry(new ZipEntry(baseDir + "/"));
+            } catch (IOException e) {
+                log.warn("压缩文件异常。", e);
+            }
+
+            baseDir = baseDir.length() == 0 ? "" : (baseDir + "/");
+            File[] files = file.listFiles();
+            // 遍历所有文件,逐个进行压缩
+            for(File f : files){
+                compressZip0(zos, f, baseDir + f.getName());
+            }
+        }
+    }
+
+
+
+    /**
+     * 解压缩指定zip文件名.
+     * @param zipFile zip文件名
+     * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public void uncompressZip(String zipFile, String uncompressPath)
+            throws IOException {
+        uncompressZip(new File(zipFile), new File(uncompressPath));
+    }
+
+
+    /**
+     * 解压缩指定zip文件.
+     *
+     * @param zipFile
+     *            zip文件名
+     * @param uncompressPath
+     *            解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public void uncompressZip(File zipFile, File uncompressPath)
+            throws IOException {
+        ZipFile zip = null;// 创建解压缩文件
+        try {
+            zip = new ZipFile(zipFile);
+        } catch (IOException e) {
+            log.warn("解压文件异常。", e);
+            return;
+        }
+        // 如果指定的路径不存在,创建文件夹
+        if (!uncompressPath.exists()) {
+            if (!uncompressPath.mkdirs()) {
+                StringBuilder exception = new StringBuilder();
+                exception.append(uncompressPath);
+                exception.append("路径不可到达,并且解压缩");
+                exception.append(zipFile);
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        // 返回 ZIP 文件条目的枚举。
+        Enumeration<? extends ZipEntry> en = zip.entries();
+        ZipEntry entry = null;
+
+        File file = null;
+        // 遍历每一个文件
+        while (en.hasMoreElements()) {
+            // 如果压缩包还有下一个文件,则循环
+            entry = (ZipEntry) en.nextElement();
+            if (entry.isDirectory()) {
+                // 如果是文件夹,创建文件夹并加速循环
+                file = new File(uncompressPath, entry.getName());
+                file.mkdirs();
+                continue;
+            }
+            // 构建文件对象
+            file = new File(uncompressPath, entry.getName());
+            if (!file.getParentFile().exists()) {
+                // 如果文件对象的父目录不存在,创建文件夹
+                if(!file.getParentFile().mkdirs()){
+                    log.debug("can not create dir: " + file.getAbsolutePath());
+                }
+            }
+
+            InputStream in = null;
+            FileOutputStream out = null;
+            try {
+                in = zip.getInputStream(entry);
+                out = new FileOutputStream(file);
+                byte[] bytes = new byte[2048];
+                int size = -1;
+                while((size = in.read(bytes)) != -1){
+                     out.write(bytes, 0, size);
+                }
+                out.flush();
+            } catch (IOException e) {
+                log.warn("解压文件异常。", e);
+            } finally {
+                if(in != null){
+                    try {
+                        in.close();
+                    }catch(IOException e) {
+
+                    }
+                }
+                if(out != null){
+                    try {
+                        out.close();
+                    }catch(IOException e) {
+                    }
+                }
+            }
+            log.debug("解压文件:" + entry.getName() + "成功!");
+        }
+        try{
+            zip.close();
+        } catch(IOException e) {
+            log.warn("关闭文件异常。", e);
+        }
+    }
+
+
+
+    /**
+     * 递归获得该文件夹下所有文件(不包括目录).
+     * 如果该文件路径指向一个文件,则返回该文件的单个集合,
+     * 如果该文件指向一个目录,则返回该目录下的所有文件
+     *
+     * @param file
+     *            可以是目录,也可以是文件,当是文件时,直接返回该文件的列表
+     * @return 返回该文件夹下的所有子文件
+     */
+    public List<File> getSubFile(File file) {
+        // 文件列表对象
+        List<File> fileList = new ArrayList<File>();
+        if (file.isFile()) {
+            // 如果是普通文件,直接把该文件添加到文件列表
+            fileList.add(file);
+            return fileList;
+        }
+        if (file.isDirectory()) {
+            fileList.add(file);
+            // 如果是目录,则遍历目录下的所有文件
+            for (File f : file.listFiles()) {
+                // 这里使用的递归,一直到文件目录的最底层,而文件列表里存的,全是文件
+                fileList.addAll(getSubFile(f));
+            }
+        }
+        return fileList;
+    }
+
+}

+ 692 - 0
zip/ZipHdfsOperator.java

@@ -0,0 +1,692 @@
+package com.jiusuo.flume.sink.compress;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tools.tar.TarEntry;
+import org.apache.tools.tar.TarOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.zip.*;
+import java.util.zip.ZipFile;
+
+/**
+ *
+ * @author Rayn on 2016/1/22 14:00
+ * @email liuwei412552703@163.com
+ */
+public class ZipHdfsOperator {
+
+    /**
+     * log
+     */
+    private static Logger log = LoggerFactory.getLogger(ZipHdfsOperator.class);
+
+
+    public ZipHdfsOperator() {
+
+    }
+
+    /**
+     * 压缩指定文件集.
+     *
+     * @param compressFilePath 压缩后的zip文件保存的路径
+     * @param fs File System
+     * @param path        需要被压缩的文件或者文件夹集合
+     * @return 返回压缩后的文件
+     * @throws IOException 压缩异常
+     */
+    public static Path compressZip(String compressFilePath, FileSystem fs, String... path) throws IOException {
+        return compressZip(new Path(compressFilePath), fs, path);
+    }
+
+
+    /**
+     * 压缩指定文件集.
+     *
+     * @param path        被压缩的文件集合
+     * @param compressFile 压缩后的zip文件保存的路径
+     * @return 返回压缩后的文件
+     * @throws IOException 压缩异常
+     */
+    public static Path compressZip(Path compressFile, FileSystem fs, String... path) throws IOException {
+        if (!fs.exists(compressFile)) {
+            if (!fs.exists(compressFile.getParent())) {
+                if (!fs.mkdirs(compressFile.getParent())) {
+                    StringBuilder exception = new StringBuilder();
+                    exception.append("系统找不到指定的路径: ");
+                    exception.append(compressFile.getParent().toString());
+                    exception.append("并且创建: ");
+                    exception.append(compressFile.toString());
+                    exception.append("失败!");
+                    throw new IOException(exception.toString());
+                }
+            }
+            if (!fs.createNewFile(compressFile)) {
+                StringBuilder exception = new StringBuilder();
+                exception.append("创建文件: ");
+                exception.append(compressFile.toString());
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        ZipOutputStream zos = null;
+        try {
+            zos = new ZipOutputStream(fs.create(compressFile));
+            for (String fileName : path) {
+                Path compFile = new Path(fileName);
+                if (fs.isFile(compFile)) {
+                    compressZip0(zos, fs, compFile, compFile.getName());
+                } else {
+                    compressZip0(zos, fs, compFile, "");
+                }
+            }
+
+            // 当压缩完成,关闭流
+            zos.closeEntry();
+        } catch (Exception e) {
+            log.warn("", e);
+        } finally {
+            if (zos != null) {
+                zos.close();
+            }
+        }
+        return compressFile;
+    }
+
+    /**
+     * 递归压缩
+     *
+     * @param zos
+     * @param path
+     * @param baseDir
+     */
+    private static void compressZip0(ZipOutputStream zos,  FileSystem fs, Path path, String baseDir) throws IOException {
+        // 压缩文件缓冲区大小
+        InputStream in = null;
+        if (fs.isFile(path)) {
+            try {
+                // 生成下一个压缩节点
+                zos.putNextEntry(new ZipEntry(baseDir));
+            } catch (IOException e1) {
+                log.warn("", e1);
+            }
+            byte[] buffere = new byte[4096];
+            int length = 0;// 读取的长度
+            try {
+                in = fs.open(path);
+                while ((length = in.read(buffere)) != -1) {
+                    zos.write(buffere, 0, length);
+                }
+            } catch (IOException e) {
+                log.error("", e);
+            } finally {
+                // 当压缩完成,关闭流
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException e) {
+                        log.debug("", e);
+                    }
+                }
+            }
+            log.info("压缩文件:  " + path + "  成功!");
+            return;
+        } else {
+            try {
+                zos.putNextEntry(new ZipEntry(baseDir + "/"));
+            } catch (IOException e) {
+                log.warn("", e);
+            }
+
+            baseDir = baseDir.length() == 0 ? "" : (baseDir + "/");
+
+            FileStatus[] statuses = fs.listStatus(path, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    String name = path.getName();
+                    return !name.startsWith(".") && !name.startsWith("_");
+                }
+            });
+            // 遍历所有文件,逐个进行压缩
+            for (FileStatus status : statuses) {
+                compressZip0(zos, fs, status.getPath(), baseDir + status.getPath().getName());
+            }
+        }
+    }
+
+
+    /**
+     * 解压缩指定zip文件名.
+     *
+     * @param zipFile        zip文件名
+     * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public static void uncompressZip(String zipFile, FileSystem fs, String uncompressPath) throws IOException {
+        uncompressZip(new File(zipFile), fs, new Path(uncompressPath));
+    }
+
+
+    /**
+     * 解压缩指定zip文件.
+     *
+     * @param zipFile        zip文件名
+     * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public static List<Path> uncompressZip(FileSystem fs, File zipFile, Path uncompressPath) throws IOException {
+        ZipFile zip = null;// 创建解压缩文件
+        List<Path> paths = new ArrayList<Path>(3);
+        try {
+            zip = new ZipFile(zipFile);
+            Enumeration<? extends ZipEntry> en = zip.entries();
+            ZipEntry entry = null;
+
+            Path path = null;
+
+            // 遍历每一个文件
+            while (en.hasMoreElements()) {
+                // 如果压缩包还有下一个文件,则循环
+                entry = (ZipEntry) en.nextElement();
+                if (entry.isDirectory()) {
+                    // 如果是文件夹,创建文件夹并加速循环
+                    path = new Path(uncompressPath, entry.getName());
+                    fs.mkdirs(path);
+                    continue;
+                }
+                // 构建文件对象
+                path = new Path(uncompressPath, entry.getName());
+
+                InputStream in = null;
+                OutputStream out = null;
+                try {
+                    in = zip.getInputStream(entry);
+
+                    out = fs.create(path, true);
+                    IOUtils.copy(in, out);
+                    out.flush();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } finally {
+                    if (in != null) {
+                        try {
+                            in.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    if (out != null) {
+                        try {
+                            out.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+
+                paths.add(path);
+                log.info("解压文件: {} 成功!" + entry.getName());
+            }
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            if (zip != null) {
+                try {
+                    zip.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        return paths;
+    }
+
+    /**
+     * 解压缩指定zip文件.
+     *
+     * @param zipFile        zip文件名
+     * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public static FileInfo uncompressZip(File zipFile, FileSystem fs, Path uncompressPath) throws IOException {
+        ZipFile zip = null;// 创建解压缩文件
+
+        FileInfo fileInfo = new FileInfo();
+        try {
+            zip = new ZipFile(zipFile);
+            String name = zip.getName();
+            fileInfo.setExtendName(name);
+        } catch (IOException e) {
+            log.error("", e);
+            return fileInfo;
+        }
+        // 如果指定的路径不存在,创建文件夹
+        if (!fs.exists(uncompressPath) || !fs.isDirectory(uncompressPath)) {
+            if (!fs.mkdirs(uncompressPath)) {
+                StringBuilder exception = new StringBuilder();
+                exception.append(uncompressPath);
+                exception.append("路径不可到达,并且解压缩");
+                exception.append(zipFile);
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        // 返回 ZIP 文件条目的枚举。
+        Enumeration<? extends ZipEntry> en = zip.entries();
+        ZipEntry entry = null;
+
+        int fileCount = 0;
+        long totalSize = 0;
+
+        Path path = null;
+        // 遍历每一个文件
+        while (en.hasMoreElements()) {
+            // 如果压缩包还有下一个文件,则循环
+            entry = (ZipEntry) en.nextElement();
+            if (entry.isDirectory()) {
+                // 如果是文件夹,创建文件夹并加速循环
+                path = new Path(uncompressPath, entry.getName());
+                fs.mkdirs(path);
+                continue;
+            }
+            // 构建文件对象
+            path = new Path(uncompressPath, entry.getName());
+
+            log.info(entry.getName());
+
+            InputStream in = null;
+            OutputStream out = null;
+            try {
+                in = zip.getInputStream(entry);
+
+                //总文件大小
+                totalSize += in.available();
+                //文件总数
+                if(!entry.getName().toString().endsWith(".xml") && !entry.getName().toString().endsWith(".avro")){
+                    fileCount += 1;
+                }
+
+
+                out = fs.create(path, true);
+                IOUtils.copy(in, out);
+                out.flush();
+            } catch (IOException e) {
+                log.warn("", e);
+            } finally {
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException e) {
+                        log.debug("", e);
+                    }
+                }
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        log.debug("", e);
+                    }
+                }
+            }
+            log.debug("解压文件:" + entry.getName() + "成功!");
+        }
+
+        fileInfo.setFileCount(fileCount);
+        fileInfo.setTotalSize(totalSize);
+
+        try {
+            zip.close();
+        } catch (IOException e) {
+            log.debug("", e);
+        }
+
+        return fileInfo;
+    }
+
+    /**
+     * 解压缩文件
+     * @param zipFile
+     * @param uncompressPath
+     * @return
+     * @throws IOException
+     */
+    public static File uncompressZip(File zipFile, String uncompressPath) throws IOException {
+        ZipFile zip = null ;// 创建解压缩文件
+        File file = new File(uncompressPath);
+        try {
+            zip = new ZipFile(zipFile);
+        } catch (IOException e) {
+            log.error("", e);
+            return file;
+        }
+        // 如果指定的路径不存在,创建文件夹
+        if (!file.exists() || !file.isDirectory()) {
+            if (!file.mkdirs()) {
+                StringBuilder exception = new StringBuilder();
+                exception.append(uncompressPath);
+                exception.append("路径不可到达,并且解压缩");
+                exception.append(zipFile);
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        // 返回 ZIP 文件条目的枚举。
+        Enumeration<? extends ZipEntry> en = zip.entries();
+        ZipEntry entry = null;
+
+        String childPath = "";
+        // 遍历每一个文件
+        while (en.hasMoreElements()) {
+            // 如果压缩包还有下一个文件,则循环
+            entry = (ZipEntry) en.nextElement();
+            childPath = uncompressPath+ "/" +entry.getName();
+
+            if (entry.isDirectory()) {
+                // 如果是文件夹,创建文件夹并加速循环
+                File file1 = new File(childPath);
+                file1.mkdirs();
+                continue;
+            }
+            // 构建文件对象
+
+            InputStream in = null;
+            OutputStream out = null;
+            try {
+                in = zip.getInputStream(entry);
+                out = new FileOutputStream(childPath);
+                IOUtils.copy(in, out);
+                out.flush();
+            } catch (IOException e) {
+                log.warn("", e);
+            } finally {
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException e) {
+                        log.debug("", e);
+                    }
+                }
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        log.debug("", e);
+                    }
+                }
+            }
+            log.debug("解压文件:" + entry.getName() + "成功!");
+        }
+        try {
+            zip.close();
+        } catch (IOException e) {
+            log.debug("", e);
+        }
+        final File[] files = file.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                return pathname.isFile();
+            }
+        });
+        return files[0];
+    }
+
+
+    /**
+     * 递归获得该文件夹下所有文件(不包括目录).
+     * 如果该文件路径指向一个文件,则返回该文件的单个集合,
+     * 如果该文件指向一个目录,则返回该目录下的所有文件
+     *
+     * @param file 可以是目录,也可以是文件,当是文件时,直接返回该文件的列表
+     * @return 返回该文件夹下的所有子文件
+     */
+    public List<File> getSubFile(File file) {
+        // 文件列表对象
+        List<File> fileList = new ArrayList<File>();
+        if (file.isFile()) {
+            // 如果是普通文件,直接把该文件添加到文件列表
+            fileList.add(file);
+            return fileList;
+        }
+        if (file.isDirectory()) {
+            fileList.add(file);
+            // 如果是目录,则遍历目录下的所有文件
+            for (File f : file.listFiles()) {
+                // 这里使用的递归,一直到文件目录的最底层,而文件列表里存的,全是文件
+                fileList.addAll(getSubFile(f));
+            }
+        }
+        return fileList;
+    }
+
+    /**
+     * 解压缩指定zip文件.
+     *
+     * @param zipFile
+     *            zip文件名
+     * @param uncompressPath
+     *            解压说的后要保存的文件路径,若没有,系统自动新建
+     * @throws IOException 解压缩异常
+     */
+    public void uncompressZip(File zipFile, File uncompressPath)
+            throws IOException {
+        ZipFile zip = null;// 创建解压缩文件
+        try {
+            zip = new ZipFile(zipFile);
+        } catch (IOException e) {
+            log.error("", e);
+            return;
+        }
+        // 如果指定的路径不存在,创建文件夹
+        if (!uncompressPath.exists()) {
+            if (!uncompressPath.mkdirs()) {
+                StringBuilder exception = new StringBuilder();
+                exception.append(uncompressPath);
+                exception.append("路径不可到达,并且解压缩");
+                exception.append(zipFile);
+                exception.append("失败!");
+                throw new IOException(exception.toString());
+            }
+        }
+        // 返回 ZIP 文件条目的枚举。
+        Enumeration<? extends ZipEntry> en = zip.entries();
+        ZipEntry entry = null;
+
+        File file = null;
+        // 遍历每一个文件
+        while (en.hasMoreElements()) {
+            // 如果压缩包还有下一个文件,则循环
+            entry = (ZipEntry) en.nextElement();
+            if (entry.isDirectory()) {
+                // 如果是文件夹,创建文件夹并加速循环
+                file = new File(uncompressPath, entry.getName());
+                file.mkdirs();
+                continue;
+            }
+            // 构建文件对象
+            file = new File(uncompressPath, entry.getName());
+            if (!file.getParentFile().exists()) {
+                // 如果文件对象的父目录不存在,创建文件夹
+                if(!file.getParentFile().mkdirs()){
+                    log.debug("can not create dir: " + file.getAbsolutePath());
+                }
+            }
+
+            InputStream in = null;
+            FileOutputStream out = null;
+            try {
+                in = zip.getInputStream(entry);
+                out = new FileOutputStream(file);
+                byte[] bytes = new byte[2048];
+                int size = -1;
+                while((size = in.read(bytes)) != -1){
+                    out.write(bytes, 0, size);
+                }
+                out.flush();
+            } catch (IOException e) {
+                log.warn("", e);
+            } finally {
+                if(in != null){
+                    try {
+                        in.close();
+                    }catch(IOException e) {
+                        log.debug("", e);
+                    }
+                }
+                if(out != null){
+                    try {
+                        out.close();
+                    }catch(IOException e) {
+                        log.debug("", e);
+                    }
+                }
+            }
+            log.debug("解压文件:" + entry.getName() + "成功!");
+        }
+        try{
+            zip.close();
+        } catch(IOException e) {
+            log.debug("", e);
+        }
+    }
+
+    /**
+     * 压缩文件成Gzip格式,Linux上可使用
+     * 压缩文件夹生成后缀名为".gz"的文件并下载
+
+     * */
+    public static void compressTarGz(String folderPath, String  targzipFilePath) {
+        File srcPath =new File(folderPath);
+        int length=srcPath.listFiles().length;
+        byte[] buf = new byte[4096]; //设定读入缓冲区尺寸
+        File[] files   =   srcPath.listFiles();
+        try
+        {
+            //建立压缩文件输出流
+            FileOutputStream fout=new FileOutputStream(targzipFilePath);
+            //建立tar压缩输出流
+            TarOutputStream tout=new TarOutputStream(fout);
+            for(int i=0;i<length;i++)
+            {
+                String filename=srcPath.getPath()+File.separator+files[i].getName();
+                //打开需压缩文件作为文件输入流
+                FileInputStream fin=new FileInputStream(filename);   //filename是文件全路径
+                TarEntry tarEn=new TarEntry(files[i]); //此处必须使用new TarEntry(File file);
+                tarEn.setName(files[i].getName());  //此处需重置名称,默认是带全路径的,否则打包后会带全路径
+                tout.putNextEntry(tarEn);
+                int num;
+                while ((num=fin.read(buf)) != -1)
+                {
+                    tout.write(buf,0,num);
+                }
+                tout.closeEntry();
+                fin.close();
+            }
+            tout.close();
+            fout.close();
+
+            //建立压缩文件输出流
+            FileOutputStream gzFile=new FileOutputStream(targzipFilePath+".gz");
+            //建立gzip压缩输出流
+            GZIPOutputStream gzout=new GZIPOutputStream(gzFile);
+            //打开需压缩文件作为文件输入流
+            FileInputStream tarin=new FileInputStream(targzipFilePath);   //targzipFilePath是文件全路径
+            int len;
+            while ((len=tarin.read(buf)) != -1)
+            {
+                gzout.write(buf,0,len);
+            }
+            gzout.close();
+            gzFile.close();
+            tarin.close();
+        }catch(FileNotFoundException e)
+        {
+            System.out.println(e);
+        }catch(IOException e)
+        {
+            System.out.println(e);
+        }
+    }
+
+
+    /**
+     * 递归删除目录下的所有文件及子目录下所有文件
+     * @param dir 将要删除的文件目录
+     * @return boolean Returns "true" if all deletions were successful.
+     *                 If a deletion fails, the method stops attempting to
+     *                 delete and returns "false".
+     */
+    private static boolean deleteDir(File dir) {
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            //递归删除目录中的子目录下
+            for (int i=0; i<children.length; i++) {
+                boolean success = deleteDir(new File(dir, children[i]));
+                if (!success) {
+                    return false;
+                }
+            }
+        }
+        // 目录此时为空,可以删除
+        return dir.delete();
+    }
+
+    public static class FileInfo {
+
+        private int fileCount = 0;
+
+        private long totalSize = 0;
+
+        private String extendName = "zip";
+
+        public FileInfo() {
+        }
+
+        public FileInfo(int fileCount, long totalSize, String extendName) {
+            this.fileCount = fileCount;
+            this.totalSize = totalSize;
+            this.extendName = extendName;
+        }
+
+        public int getFileCount() {
+            return fileCount;
+        }
+
+        public void setFileCount(int fileCount) {
+            this.fileCount = fileCount;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        public void setTotalSize(long totalSize) {
+            this.totalSize = totalSize;
+        }
+
+        public String getExtendName() {
+            return extendName;
+        }
+
+        public void setExtendName(String extendName) {
+            this.extendName = extendName;
+        }
+    }
+
+
+    public static void main(String[] args) throws IOException {
+        FileSystem fs = FileSystem.get(new Configuration());
+        ZipHdfsOperator.compressZip(new Path("/user/zhaopingxi/fenguang-test/fengaung-comp.zip"), fs, "/user/zhaopingxi/comp");
+    }
+
+}