package com.jiusuo.flume.sink.compress; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.tools.tar.TarEntry; import org.apache.tools.tar.TarOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.zip.*; import java.util.zip.ZipFile; /** * * @author Rayn on 2016/1/22 14:00 * @email liuwei412552703@163.com */ public class ZipHdfsOperator { /** * log */ private static Logger log = LoggerFactory.getLogger(ZipHdfsOperator.class); public ZipHdfsOperator() { } /** * 压缩指定文件集. * * @param compressFilePath 压缩后的zip文件保存的路径 * @param fs File System * @param path 需要被压缩的文件或者文件夹集合 * @return 返回压缩后的文件 * @throws IOException 压缩异常 */ public static Path compressZip(String compressFilePath, FileSystem fs, String... path) throws IOException { return compressZip(new Path(compressFilePath), fs, path); } /** * 压缩指定文件集. * * @param path 被压缩的文件集合 * @param compressFile 压缩后的zip文件保存的路径 * @return 返回压缩后的文件 * @throws IOException 压缩异常 */ public static Path compressZip(Path compressFile, FileSystem fs, String... path) throws IOException { if (!fs.exists(compressFile)) { if (!fs.exists(compressFile.getParent())) { if (!fs.mkdirs(compressFile.getParent())) { StringBuilder exception = new StringBuilder(); exception.append("系统找不到指定的路径: "); exception.append(compressFile.getParent().toString()); exception.append("并且创建: "); exception.append(compressFile.toString()); exception.append("失败!"); throw new IOException(exception.toString()); } } if (!fs.createNewFile(compressFile)) { StringBuilder exception = new StringBuilder(); exception.append("创建文件: "); exception.append(compressFile.toString()); exception.append("失败!"); throw new IOException(exception.toString()); } } ZipOutputStream zos = null; try { zos = new ZipOutputStream(fs.create(compressFile)); for (String fileName : path) { Path compFile = new Path(fileName); if (fs.isFile(compFile)) { compressZip0(zos, fs, compFile, compFile.getName()); } else { compressZip0(zos, fs, compFile, ""); } } // 当压缩完成,关闭流 zos.closeEntry(); } catch (Exception e) { log.warn("", e); } finally { if (zos != null) { zos.close(); } } return compressFile; } /** * 递归压缩 * * @param zos * @param path * @param baseDir */ private static void compressZip0(ZipOutputStream zos, FileSystem fs, Path path, String baseDir) throws IOException { // 压缩文件缓冲区大小 InputStream in = null; if (fs.isFile(path)) { try { // 生成下一个压缩节点 zos.putNextEntry(new ZipEntry(baseDir)); } catch (IOException e1) { log.warn("", e1); } byte[] buffere = new byte[4096]; int length = 0;// 读取的长度 try { in = fs.open(path); while ((length = in.read(buffere)) != -1) { zos.write(buffere, 0, length); } } catch (IOException e) { log.error("", e); } finally { // 当压缩完成,关闭流 if (in != null) { try { in.close(); } catch (IOException e) { log.debug("", e); } } } log.info("压缩文件: " + path + " 成功!"); return; } else { try { zos.putNextEntry(new ZipEntry(baseDir + "/")); } catch (IOException e) { log.warn("", e); } baseDir = baseDir.length() == 0 ? "" : (baseDir + "/"); FileStatus[] statuses = fs.listStatus(path, new PathFilter() { @Override public boolean accept(Path path) { String name = path.getName(); return !name.startsWith(".") && !name.startsWith("_"); } }); // 遍历所有文件,逐个进行压缩 for (FileStatus status : statuses) { compressZip0(zos, fs, status.getPath(), baseDir + status.getPath().getName()); } } } /** * 解压缩指定zip文件名. * * @param zipFile zip文件名 * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建 * @throws IOException 解压缩异常 */ public static void uncompressZip(String zipFile, FileSystem fs, String uncompressPath) throws IOException { uncompressZip(new File(zipFile), fs, new Path(uncompressPath)); } /** * 解压缩指定zip文件. * * @param zipFile zip文件名 * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建 * @throws IOException 解压缩异常 */ public static List uncompressZip(FileSystem fs, File zipFile, Path uncompressPath) throws IOException { ZipFile zip = null;// 创建解压缩文件 List paths = new ArrayList(3); try { zip = new ZipFile(zipFile); Enumeration en = zip.entries(); ZipEntry entry = null; Path path = null; // 遍历每一个文件 while (en.hasMoreElements()) { // 如果压缩包还有下一个文件,则循环 entry = (ZipEntry) en.nextElement(); if (entry.isDirectory()) { // 如果是文件夹,创建文件夹并加速循环 path = new Path(uncompressPath, entry.getName()); fs.mkdirs(path); continue; } // 构建文件对象 path = new Path(uncompressPath, entry.getName()); InputStream in = null; OutputStream out = null; try { in = zip.getInputStream(entry); out = fs.create(path, true); IOUtils.copy(in, out); out.flush(); } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } paths.add(path); log.info("解压文件: {} 成功!" + entry.getName()); } } catch (IOException e) { e.printStackTrace(); } finally { if (zip != null) { try { zip.close(); } catch (Exception e) { e.printStackTrace(); } } } return paths; } /** * 解压缩指定zip文件. * * @param zipFile zip文件名 * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建 * @throws IOException 解压缩异常 */ public static FileInfo uncompressZip(File zipFile, FileSystem fs, Path uncompressPath) throws IOException { ZipFile zip = null;// 创建解压缩文件 FileInfo fileInfo = new FileInfo(); try { zip = new ZipFile(zipFile); String name = zip.getName(); fileInfo.setExtendName(name); } catch (IOException e) { log.error("", e); return fileInfo; } // 如果指定的路径不存在,创建文件夹 if (!fs.exists(uncompressPath) || !fs.isDirectory(uncompressPath)) { if (!fs.mkdirs(uncompressPath)) { StringBuilder exception = new StringBuilder(); exception.append(uncompressPath); exception.append("路径不可到达,并且解压缩"); exception.append(zipFile); exception.append("失败!"); throw new IOException(exception.toString()); } } // 返回 ZIP 文件条目的枚举。 Enumeration en = zip.entries(); ZipEntry entry = null; int fileCount = 0; long totalSize = 0; Path path = null; // 遍历每一个文件 while (en.hasMoreElements()) { // 如果压缩包还有下一个文件,则循环 entry = (ZipEntry) en.nextElement(); if (entry.isDirectory()) { // 如果是文件夹,创建文件夹并加速循环 path = new Path(uncompressPath, entry.getName()); fs.mkdirs(path); continue; } // 构建文件对象 path = new Path(uncompressPath, entry.getName()); log.info(entry.getName()); InputStream in = null; OutputStream out = null; try { in = zip.getInputStream(entry); //总文件大小 totalSize += in.available(); //文件总数 if(!entry.getName().toString().endsWith(".xml") && !entry.getName().toString().endsWith(".avro")){ fileCount += 1; } out = fs.create(path, true); IOUtils.copy(in, out); out.flush(); } catch (IOException e) { log.warn("", e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { log.debug("", e); } } if (out != null) { try { out.close(); } catch (IOException e) { log.debug("", e); } } } log.debug("解压文件:" + entry.getName() + "成功!"); } fileInfo.setFileCount(fileCount); fileInfo.setTotalSize(totalSize); try { zip.close(); } catch (IOException e) { log.debug("", e); } return fileInfo; } /** * 解压缩文件 * @param zipFile * @param uncompressPath * @return * @throws IOException */ public static File uncompressZip(File zipFile, String uncompressPath) throws IOException { ZipFile zip = null ;// 创建解压缩文件 File file = new File(uncompressPath); try { zip = new ZipFile(zipFile); } catch (IOException e) { log.error("", e); return file; } // 如果指定的路径不存在,创建文件夹 if (!file.exists() || !file.isDirectory()) { if (!file.mkdirs()) { StringBuilder exception = new StringBuilder(); exception.append(uncompressPath); exception.append("路径不可到达,并且解压缩"); exception.append(zipFile); exception.append("失败!"); throw new IOException(exception.toString()); } } // 返回 ZIP 文件条目的枚举。 Enumeration en = zip.entries(); ZipEntry entry = null; String childPath = ""; // 遍历每一个文件 while (en.hasMoreElements()) { // 如果压缩包还有下一个文件,则循环 entry = (ZipEntry) en.nextElement(); childPath = uncompressPath+ "/" +entry.getName(); if (entry.isDirectory()) { // 如果是文件夹,创建文件夹并加速循环 File file1 = new File(childPath); file1.mkdirs(); continue; } // 构建文件对象 InputStream in = null; OutputStream out = null; try { in = zip.getInputStream(entry); out = new FileOutputStream(childPath); IOUtils.copy(in, out); out.flush(); } catch (IOException e) { log.warn("", e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { log.debug("", e); } } if (out != null) { try { out.close(); } catch (IOException e) { log.debug("", e); } } } log.debug("解压文件:" + entry.getName() + "成功!"); } try { zip.close(); } catch (IOException e) { log.debug("", e); } final File[] files = file.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return pathname.isFile(); } }); return files[0]; } /** * 递归获得该文件夹下所有文件(不包括目录). * 如果该文件路径指向一个文件,则返回该文件的单个集合, * 如果该文件指向一个目录,则返回该目录下的所有文件 * * @param file 可以是目录,也可以是文件,当是文件时,直接返回该文件的列表 * @return 返回该文件夹下的所有子文件 */ public List getSubFile(File file) { // 文件列表对象 List fileList = new ArrayList(); if (file.isFile()) { // 如果是普通文件,直接把该文件添加到文件列表 fileList.add(file); return fileList; } if (file.isDirectory()) { fileList.add(file); // 如果是目录,则遍历目录下的所有文件 for (File f : file.listFiles()) { // 这里使用的递归,一直到文件目录的最底层,而文件列表里存的,全是文件 fileList.addAll(getSubFile(f)); } } return fileList; } /** * 解压缩指定zip文件. * * @param zipFile * zip文件名 * @param uncompressPath * 解压说的后要保存的文件路径,若没有,系统自动新建 * @throws IOException 解压缩异常 */ public void uncompressZip(File zipFile, File uncompressPath) throws IOException { ZipFile zip = null;// 创建解压缩文件 try { zip = new ZipFile(zipFile); } catch (IOException e) { log.error("", e); return; } // 如果指定的路径不存在,创建文件夹 if (!uncompressPath.exists()) { if (!uncompressPath.mkdirs()) { StringBuilder exception = new StringBuilder(); exception.append(uncompressPath); exception.append("路径不可到达,并且解压缩"); exception.append(zipFile); exception.append("失败!"); throw new IOException(exception.toString()); } } // 返回 ZIP 文件条目的枚举。 Enumeration en = zip.entries(); ZipEntry entry = null; File file = null; // 遍历每一个文件 while (en.hasMoreElements()) { // 如果压缩包还有下一个文件,则循环 entry = (ZipEntry) en.nextElement(); if (entry.isDirectory()) { // 如果是文件夹,创建文件夹并加速循环 file = new File(uncompressPath, entry.getName()); file.mkdirs(); continue; } // 构建文件对象 file = new File(uncompressPath, entry.getName()); if (!file.getParentFile().exists()) { // 如果文件对象的父目录不存在,创建文件夹 if(!file.getParentFile().mkdirs()){ log.debug("can not create dir: " + file.getAbsolutePath()); } } InputStream in = null; FileOutputStream out = null; try { in = zip.getInputStream(entry); out = new FileOutputStream(file); byte[] bytes = new byte[2048]; int size = -1; while((size = in.read(bytes)) != -1){ out.write(bytes, 0, size); } out.flush(); } catch (IOException e) { log.warn("", e); } finally { if(in != null){ try { in.close(); }catch(IOException e) { log.debug("", e); } } if(out != null){ try { out.close(); }catch(IOException e) { log.debug("", e); } } } log.debug("解压文件:" + entry.getName() + "成功!"); } try{ zip.close(); } catch(IOException e) { log.debug("", e); } } /** * 压缩文件成Gzip格式,Linux上可使用 * 压缩文件夹生成后缀名为".gz"的文件并下载 * */ public static void compressTarGz(String folderPath, String targzipFilePath) { File srcPath =new File(folderPath); int length=srcPath.listFiles().length; byte[] buf = new byte[4096]; //设定读入缓冲区尺寸 File[] files = srcPath.listFiles(); try { //建立压缩文件输出流 FileOutputStream fout=new FileOutputStream(targzipFilePath); //建立tar压缩输出流 TarOutputStream tout=new TarOutputStream(fout); for(int i=0;i