123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692 |
- package com.jiusuo.flume.sink.compress;
- import org.apache.commons.io.IOUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.PathFilter;
- import org.apache.tools.tar.TarEntry;
- import org.apache.tools.tar.TarOutputStream;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.*;
- import java.util.ArrayList;
- import java.util.Enumeration;
- import java.util.List;
- import java.util.zip.*;
- import java.util.zip.ZipFile;
- /**
- *
- * @author Rayn on 2016/1/22 14:00
- * @email liuwei412552703@163.com
- */
- public class ZipHdfsOperator {
- /**
- * log
- */
- private static Logger log = LoggerFactory.getLogger(ZipHdfsOperator.class);
- public ZipHdfsOperator() {
- }
- /**
- * 压缩指定文件集.
- *
- * @param compressFilePath 压缩后的zip文件保存的路径
- * @param fs File System
- * @param path 需要被压缩的文件或者文件夹集合
- * @return 返回压缩后的文件
- * @throws IOException 压缩异常
- */
- public static Path compressZip(String compressFilePath, FileSystem fs, String... path) throws IOException {
- return compressZip(new Path(compressFilePath), fs, path);
- }
- /**
- * 压缩指定文件集.
- *
- * @param path 被压缩的文件集合
- * @param compressFile 压缩后的zip文件保存的路径
- * @return 返回压缩后的文件
- * @throws IOException 压缩异常
- */
- public static Path compressZip(Path compressFile, FileSystem fs, String... path) throws IOException {
- if (!fs.exists(compressFile)) {
- if (!fs.exists(compressFile.getParent())) {
- if (!fs.mkdirs(compressFile.getParent())) {
- StringBuilder exception = new StringBuilder();
- exception.append("系统找不到指定的路径: ");
- exception.append(compressFile.getParent().toString());
- exception.append("并且创建: ");
- exception.append(compressFile.toString());
- exception.append("失败!");
- throw new IOException(exception.toString());
- }
- }
- if (!fs.createNewFile(compressFile)) {
- StringBuilder exception = new StringBuilder();
- exception.append("创建文件: ");
- exception.append(compressFile.toString());
- exception.append("失败!");
- throw new IOException(exception.toString());
- }
- }
- ZipOutputStream zos = null;
- try {
- zos = new ZipOutputStream(fs.create(compressFile));
- for (String fileName : path) {
- Path compFile = new Path(fileName);
- if (fs.isFile(compFile)) {
- compressZip0(zos, fs, compFile, compFile.getName());
- } else {
- compressZip0(zos, fs, compFile, "");
- }
- }
- // 当压缩完成,关闭流
- zos.closeEntry();
- } catch (Exception e) {
- log.warn("", e);
- } finally {
- if (zos != null) {
- zos.close();
- }
- }
- return compressFile;
- }
- /**
- * 递归压缩
- *
- * @param zos
- * @param path
- * @param baseDir
- */
- private static void compressZip0(ZipOutputStream zos, FileSystem fs, Path path, String baseDir) throws IOException {
- // 压缩文件缓冲区大小
- InputStream in = null;
- if (fs.isFile(path)) {
- try {
- // 生成下一个压缩节点
- zos.putNextEntry(new ZipEntry(baseDir));
- } catch (IOException e1) {
- log.warn("", e1);
- }
- byte[] buffere = new byte[4096];
- int length = 0;// 读取的长度
- try {
- in = fs.open(path);
- while ((length = in.read(buffere)) != -1) {
- zos.write(buffere, 0, length);
- }
- } catch (IOException e) {
- log.error("", e);
- } finally {
- // 当压缩完成,关闭流
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- }
- }
- log.info("压缩文件: " + path + " 成功!");
- return;
- } else {
- try {
- zos.putNextEntry(new ZipEntry(baseDir + "/"));
- } catch (IOException e) {
- log.warn("", e);
- }
- baseDir = baseDir.length() == 0 ? "" : (baseDir + "/");
- FileStatus[] statuses = fs.listStatus(path, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- String name = path.getName();
- return !name.startsWith(".") && !name.startsWith("_");
- }
- });
- // 遍历所有文件,逐个进行压缩
- for (FileStatus status : statuses) {
- compressZip0(zos, fs, status.getPath(), baseDir + status.getPath().getName());
- }
- }
- }
- /**
- * 解压缩指定zip文件名.
- *
- * @param zipFile zip文件名
- * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
- * @throws IOException 解压缩异常
- */
- public static void uncompressZip(String zipFile, FileSystem fs, String uncompressPath) throws IOException {
- uncompressZip(new File(zipFile), fs, new Path(uncompressPath));
- }
- /**
- * 解压缩指定zip文件.
- *
- * @param zipFile zip文件名
- * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
- * @throws IOException 解压缩异常
- */
- public static List<Path> uncompressZip(FileSystem fs, File zipFile, Path uncompressPath) throws IOException {
- ZipFile zip = null;// 创建解压缩文件
- List<Path> paths = new ArrayList<Path>(3);
- try {
- zip = new ZipFile(zipFile);
- Enumeration<? extends ZipEntry> en = zip.entries();
- ZipEntry entry = null;
- Path path = null;
- // 遍历每一个文件
- while (en.hasMoreElements()) {
- // 如果压缩包还有下一个文件,则循环
- entry = (ZipEntry) en.nextElement();
- if (entry.isDirectory()) {
- // 如果是文件夹,创建文件夹并加速循环
- path = new Path(uncompressPath, entry.getName());
- fs.mkdirs(path);
- continue;
- }
- // 构建文件对象
- path = new Path(uncompressPath, entry.getName());
- InputStream in = null;
- OutputStream out = null;
- try {
- in = zip.getInputStream(entry);
- out = fs.create(path, true);
- IOUtils.copy(in, out);
- out.flush();
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- paths.add(path);
- log.info("解压文件: {} 成功!" + entry.getName());
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (zip != null) {
- try {
- zip.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- return paths;
- }
- /**
- * 解压缩指定zip文件.
- *
- * @param zipFile zip文件名
- * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
- * @throws IOException 解压缩异常
- */
- public static FileInfo uncompressZip(File zipFile, FileSystem fs, Path uncompressPath) throws IOException {
- ZipFile zip = null;// 创建解压缩文件
- FileInfo fileInfo = new FileInfo();
- try {
- zip = new ZipFile(zipFile);
- String name = zip.getName();
- fileInfo.setExtendName(name);
- } catch (IOException e) {
- log.error("", e);
- return fileInfo;
- }
- // 如果指定的路径不存在,创建文件夹
- if (!fs.exists(uncompressPath) || !fs.isDirectory(uncompressPath)) {
- if (!fs.mkdirs(uncompressPath)) {
- StringBuilder exception = new StringBuilder();
- exception.append(uncompressPath);
- exception.append("路径不可到达,并且解压缩");
- exception.append(zipFile);
- exception.append("失败!");
- throw new IOException(exception.toString());
- }
- }
- // 返回 ZIP 文件条目的枚举。
- Enumeration<? extends ZipEntry> en = zip.entries();
- ZipEntry entry = null;
- int fileCount = 0;
- long totalSize = 0;
- Path path = null;
- // 遍历每一个文件
- while (en.hasMoreElements()) {
- // 如果压缩包还有下一个文件,则循环
- entry = (ZipEntry) en.nextElement();
- if (entry.isDirectory()) {
- // 如果是文件夹,创建文件夹并加速循环
- path = new Path(uncompressPath, entry.getName());
- fs.mkdirs(path);
- continue;
- }
- // 构建文件对象
- path = new Path(uncompressPath, entry.getName());
- log.info(entry.getName());
- InputStream in = null;
- OutputStream out = null;
- try {
- in = zip.getInputStream(entry);
- //总文件大小
- totalSize += in.available();
- //文件总数
- if(!entry.getName().toString().endsWith(".xml") && !entry.getName().toString().endsWith(".avro")){
- fileCount += 1;
- }
- out = fs.create(path, true);
- IOUtils.copy(in, out);
- out.flush();
- } catch (IOException e) {
- log.warn("", e);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- }
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- }
- }
- log.debug("解压文件:" + entry.getName() + "成功!");
- }
- fileInfo.setFileCount(fileCount);
- fileInfo.setTotalSize(totalSize);
- try {
- zip.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- return fileInfo;
- }
- /**
- * 解压缩文件
- * @param zipFile
- * @param uncompressPath
- * @return
- * @throws IOException
- */
- public static File uncompressZip(File zipFile, String uncompressPath) throws IOException {
- ZipFile zip = null ;// 创建解压缩文件
- File file = new File(uncompressPath);
- try {
- zip = new ZipFile(zipFile);
- } catch (IOException e) {
- log.error("", e);
- return file;
- }
- // 如果指定的路径不存在,创建文件夹
- if (!file.exists() || !file.isDirectory()) {
- if (!file.mkdirs()) {
- StringBuilder exception = new StringBuilder();
- exception.append(uncompressPath);
- exception.append("路径不可到达,并且解压缩");
- exception.append(zipFile);
- exception.append("失败!");
- throw new IOException(exception.toString());
- }
- }
- // 返回 ZIP 文件条目的枚举。
- Enumeration<? extends ZipEntry> en = zip.entries();
- ZipEntry entry = null;
- String childPath = "";
- // 遍历每一个文件
- while (en.hasMoreElements()) {
- // 如果压缩包还有下一个文件,则循环
- entry = (ZipEntry) en.nextElement();
- childPath = uncompressPath+ "/" +entry.getName();
- if (entry.isDirectory()) {
- // 如果是文件夹,创建文件夹并加速循环
- File file1 = new File(childPath);
- file1.mkdirs();
- continue;
- }
- // 构建文件对象
- InputStream in = null;
- OutputStream out = null;
- try {
- in = zip.getInputStream(entry);
- out = new FileOutputStream(childPath);
- IOUtils.copy(in, out);
- out.flush();
- } catch (IOException e) {
- log.warn("", e);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- }
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- }
- }
- log.debug("解压文件:" + entry.getName() + "成功!");
- }
- try {
- zip.close();
- } catch (IOException e) {
- log.debug("", e);
- }
- final File[] files = file.listFiles(new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- return pathname.isFile();
- }
- });
- return files[0];
- }
- /**
- * 递归获得该文件夹下所有文件(不包括目录).
- * 如果该文件路径指向一个文件,则返回该文件的单个集合,
- * 如果该文件指向一个目录,则返回该目录下的所有文件
- *
- * @param file 可以是目录,也可以是文件,当是文件时,直接返回该文件的列表
- * @return 返回该文件夹下的所有子文件
- */
- public List<File> getSubFile(File file) {
- // 文件列表对象
- List<File> fileList = new ArrayList<File>();
- if (file.isFile()) {
- // 如果是普通文件,直接把该文件添加到文件列表
- fileList.add(file);
- return fileList;
- }
- if (file.isDirectory()) {
- fileList.add(file);
- // 如果是目录,则遍历目录下的所有文件
- for (File f : file.listFiles()) {
- // 这里使用的递归,一直到文件目录的最底层,而文件列表里存的,全是文件
- fileList.addAll(getSubFile(f));
- }
- }
- return fileList;
- }
- /**
- * 解压缩指定zip文件.
- *
- * @param zipFile
- * zip文件名
- * @param uncompressPath
- * 解压说的后要保存的文件路径,若没有,系统自动新建
- * @throws IOException 解压缩异常
- */
- public void uncompressZip(File zipFile, File uncompressPath)
- throws IOException {
- ZipFile zip = null;// 创建解压缩文件
- try {
- zip = new ZipFile(zipFile);
- } catch (IOException e) {
- log.error("", e);
- return;
- }
- // 如果指定的路径不存在,创建文件夹
- if (!uncompressPath.exists()) {
- if (!uncompressPath.mkdirs()) {
- StringBuilder exception = new StringBuilder();
- exception.append(uncompressPath);
- exception.append("路径不可到达,并且解压缩");
- exception.append(zipFile);
- exception.append("失败!");
- throw new IOException(exception.toString());
- }
- }
- // 返回 ZIP 文件条目的枚举。
- Enumeration<? extends ZipEntry> en = zip.entries();
- ZipEntry entry = null;
- File file = null;
- // 遍历每一个文件
- while (en.hasMoreElements()) {
- // 如果压缩包还有下一个文件,则循环
- entry = (ZipEntry) en.nextElement();
- if (entry.isDirectory()) {
- // 如果是文件夹,创建文件夹并加速循环
- file = new File(uncompressPath, entry.getName());
- file.mkdirs();
- continue;
- }
- // 构建文件对象
- file = new File(uncompressPath, entry.getName());
- if (!file.getParentFile().exists()) {
- // 如果文件对象的父目录不存在,创建文件夹
- if(!file.getParentFile().mkdirs()){
- log.debug("can not create dir: " + file.getAbsolutePath());
- }
- }
- InputStream in = null;
- FileOutputStream out = null;
- try {
- in = zip.getInputStream(entry);
- out = new FileOutputStream(file);
- byte[] bytes = new byte[2048];
- int size = -1;
- while((size = in.read(bytes)) != -1){
- out.write(bytes, 0, size);
- }
- out.flush();
- } catch (IOException e) {
- log.warn("", e);
- } finally {
- if(in != null){
- try {
- in.close();
- }catch(IOException e) {
- log.debug("", e);
- }
- }
- if(out != null){
- try {
- out.close();
- }catch(IOException e) {
- log.debug("", e);
- }
- }
- }
- log.debug("解压文件:" + entry.getName() + "成功!");
- }
- try{
- zip.close();
- } catch(IOException e) {
- log.debug("", e);
- }
- }
- /**
- * 压缩文件成Gzip格式,Linux上可使用
- * 压缩文件夹生成后缀名为".gz"的文件并下载
- * */
- public static void compressTarGz(String folderPath, String targzipFilePath) {
- File srcPath =new File(folderPath);
- int length=srcPath.listFiles().length;
- byte[] buf = new byte[4096]; //设定读入缓冲区尺寸
- File[] files = srcPath.listFiles();
- try
- {
- //建立压缩文件输出流
- FileOutputStream fout=new FileOutputStream(targzipFilePath);
- //建立tar压缩输出流
- TarOutputStream tout=new TarOutputStream(fout);
- for(int i=0;i<length;i++)
- {
- String filename=srcPath.getPath()+File.separator+files[i].getName();
- //打开需压缩文件作为文件输入流
- FileInputStream fin=new FileInputStream(filename); //filename是文件全路径
- TarEntry tarEn=new TarEntry(files[i]); //此处必须使用new TarEntry(File file);
- tarEn.setName(files[i].getName()); //此处需重置名称,默认是带全路径的,否则打包后会带全路径
- tout.putNextEntry(tarEn);
- int num;
- while ((num=fin.read(buf)) != -1)
- {
- tout.write(buf,0,num);
- }
- tout.closeEntry();
- fin.close();
- }
- tout.close();
- fout.close();
- //建立压缩文件输出流
- FileOutputStream gzFile=new FileOutputStream(targzipFilePath+".gz");
- //建立gzip压缩输出流
- GZIPOutputStream gzout=new GZIPOutputStream(gzFile);
- //打开需压缩文件作为文件输入流
- FileInputStream tarin=new FileInputStream(targzipFilePath); //targzipFilePath是文件全路径
- int len;
- while ((len=tarin.read(buf)) != -1)
- {
- gzout.write(buf,0,len);
- }
- gzout.close();
- gzFile.close();
- tarin.close();
- }catch(FileNotFoundException e)
- {
- System.out.println(e);
- }catch(IOException e)
- {
- System.out.println(e);
- }
- }
- /**
- * 递归删除目录下的所有文件及子目录下所有文件
- * @param dir 将要删除的文件目录
- * @return boolean Returns "true" if all deletions were successful.
- * If a deletion fails, the method stops attempting to
- * delete and returns "false".
- */
- private static boolean deleteDir(File dir) {
- if (dir.isDirectory()) {
- String[] children = dir.list();
- //递归删除目录中的子目录下
- for (int i=0; i<children.length; i++) {
- boolean success = deleteDir(new File(dir, children[i]));
- if (!success) {
- return false;
- }
- }
- }
- // 目录此时为空,可以删除
- return dir.delete();
- }
- public static class FileInfo {
- private int fileCount = 0;
- private long totalSize = 0;
- private String extendName = "zip";
- public FileInfo() {
- }
- public FileInfo(int fileCount, long totalSize, String extendName) {
- this.fileCount = fileCount;
- this.totalSize = totalSize;
- this.extendName = extendName;
- }
- public int getFileCount() {
- return fileCount;
- }
- public void setFileCount(int fileCount) {
- this.fileCount = fileCount;
- }
- public long getTotalSize() {
- return totalSize;
- }
- public void setTotalSize(long totalSize) {
- this.totalSize = totalSize;
- }
- public String getExtendName() {
- return extendName;
- }
- public void setExtendName(String extendName) {
- this.extendName = extendName;
- }
- }
- public static void main(String[] args) throws IOException {
- FileSystem fs = FileSystem.get(new Configuration());
- ZipHdfsOperator.compressZip(new Path("/user/zhaopingxi/fenguang-test/fengaung-comp.zip"), fs, "/user/zhaopingxi/comp");
- }
- }
|