package com.yiidata.parcel.utils; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.tools.tar.TarEntry; import org.apache.tools.tar.TarInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; /** * Created by sssd on 2017/8/18. */ public abstract class UncompressFactory { protected static Logger logger = LoggerFactory.getLogger(UncompressFactory.class); private UncompressFactory() { } /** * 定义解压抽象方法 * @param hdfsinput hdfs 输入路径 * @param fs hadaoop confit * @param uncompressPath 解压后输出路径 * @return */ public abstract List uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath); /** * 根据文件扩展名,返回对应的解压方法。 * @param sub * @return */ public static UncompressFactory getUncompressFactory(String sub) { if("zip".equals(sub)) { return new ZipUncompress(); } else if("tar".equals(sub)) { return new TarUncompress(); } else if("gzip".equals(sub)) { return new TgzUncompress(); } else if("tgz".equals(sub)) { return new TgzUncompress(); } else if("gz".equals(sub)) { return new TgzUncompress(); } return null; } /** * .zip解压 */ static class ZipUncompress extends UncompressFactory { @Override public List uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) { try { return uncompressZip(fs, hdfsinput, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径 } catch (IOException e) { throw new IllegalStateException(e); } } } /** * .tar 解压 */ static class TarUncompress extends UncompressFactory { @Override public List uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) { try { return uncompressTar(fs, hdfsinput, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径 } catch (IOException e) { throw new IllegalStateException(e); } } } /** * .tgz 解压 */ static class TgzUncompress extends UncompressFactory { @Override public List uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) { try { Path temp = uncompressTgz(fs, hdfsinput); return uncompressTar(fs, temp, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径 } catch (IOException e) { throw new IllegalStateException(e); } } } /** * zip解压 * @param fs * @param input * @param uncompressPath * @return * @throws IOException */ public static List uncompressZip(FileSystem fs, Path input, Path uncompressPath) throws IOException { OutputStream out = null; List paths = new ArrayList(); ZipInputStream zipInputStream = null; try { if (!Files.exists(input)) { throw new IllegalArgumentException(input.toString() + " does not exist"); } zipInputStream = new ZipInputStream(Files.newInputStream(input)); ZipEntry zipEntry = null; Path path = null; while ((zipEntry = zipInputStream.getNextEntry()) != null ) { String entryName = zipEntry.getName(); if (zipEntry.isDirectory()) { // 如果是文件夹,创建文件夹并加速循环 path = Paths.get(uncompressPath.toString(), entryName); Files.createDirectories(path); continue; } path = Paths.get(uncompressPath.toString(), entryName); out = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW); IOUtils.copy(zipInputStream, out); out.flush(); zipInputStream.closeEntry(); paths.add(path.toString()); logger.info("解压文件: {} 成功!", entryName); } } catch (Exception e) { e.printStackTrace(); } finally { if(out != null){ try { out.close(); } catch (IOException e) { e.printStackTrace(); } } if(zipInputStream != null){ try { zipInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return paths; } /** * .Tar 解压 * @param fs * @param input * @param uncompressPath * @return * @throws IOException */ public static List uncompressTar(FileSystem fs, Path input, Path uncompressPath) throws IOException { OutputStream out = null; List paths = new ArrayList(); TarInputStream tarInputStream = null; if (!Files.exists(input)) { throw new IllegalArgumentException(input.toString() + " does not exist"); } try { tarInputStream = new TarInputStream(Files.newInputStream(input)); TarEntry entry = null; Path path = null; while ( ( entry = tarInputStream.getNextEntry()) != null ){ String entryName = entry.getName(); if(entry.isDirectory()){ path = Paths.get(uncompressPath.toString(), entryName); Files.createDirectories(path); continue; } path = Paths.get(uncompressPath.toString(), entryName); out = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW); IOUtils.copy(tarInputStream, out); out.flush(); paths.add(path.toString()); logger.info("解压文件: {} 成功!", entryName); } } catch (IOException e) { e.printStackTrace(); } finally { if ( out != null){ try { out.close(); } catch (IOException e) { e.printStackTrace(); } } if(tarInputStream != null){ try { tarInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return paths; } /** * .tgz 解压 * @param fs * @param input * @return * @throws IOException */ public static Path uncompressTgz(FileSystem fs, Path input) throws IOException { int buffersize = 2048; OutputStream out = null; GzipCompressorInputStream gzin = null; InputStream hdfsinput = null; String temppath = null; Path outPath = null; if (!Files.exists(input)) { throw new IllegalArgumentException(input.toString() + " does not exist"); } int i = input.toString().lastIndexOf("/"); if ( i > 0){ temppath = StringUtils.trimToNull(input.toString().substring(0,i)); } try { hdfsinput = Files.newInputStream(input); BufferedInputStream in = new BufferedInputStream(hdfsinput); outPath = Paths.get(temppath, "tmp-" + System.currentTimeMillis() + ".tar"); if( Files.exists(outPath)){ Files.delete(outPath); } out = Files.newOutputStream(outPath); gzin = new GzipCompressorInputStream(in); final byte[] buffer = new byte[buffersize]; int n = 0; while (-1 != (n = gzin.read(buffer))) { out.write(buffer, 0, n); } logger.info("临时文件的保存路径为:" + outPath.toString()); } catch (IOException e) { e.printStackTrace(); } finally { if(hdfsinput != null){ try { hdfsinput.close(); } catch (IOException e) { e.printStackTrace(); } } if(out!= null){ try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } return outPath; } }