123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package com.yiidata.parcel.utils;
- import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang.StringUtils;
- import org.apache.tools.tar.TarEntry;
- import org.apache.tools.tar.TarInputStream;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.BufferedInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.nio.file.FileSystem;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.nio.file.StandardOpenOption;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.zip.ZipEntry;
- import java.util.zip.ZipInputStream;
- /**
- * Created by sssd on 2017/8/18.
- */
- public abstract class UncompressFactory {
- protected static Logger logger = LoggerFactory.getLogger(UncompressFactory.class);
- private UncompressFactory() {
- }
- /**
- * 定义解压抽象方法
- * @param hdfsinput hdfs 输入路径
- * @param fs hadaoop confit
- * @param uncompressPath 解压后输出路径
- * @return
- */
- public abstract List<String> uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath);
- /**
- * 根据文件扩展名,返回对应的解压方法。
- * @param sub
- * @return
- */
- public static UncompressFactory getUncompressFactory(String sub) {
- if("zip".equals(sub)) {
- return new ZipUncompress();
- } else if("tar".equals(sub)) {
- return new TarUncompress();
- } else if("gzip".equals(sub)) {
- return new TgzUncompress();
- } else if("tgz".equals(sub)) {
- return new TgzUncompress();
- } else if("gz".equals(sub)) {
- return new TgzUncompress();
- }
- return null;
- }
- /**
- * .zip解压
- */
- static class ZipUncompress extends UncompressFactory {
- @Override
- public List<String> uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) {
- try {
- return uncompressZip(fs, hdfsinput, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- }
- /**
- * .tar 解压
- */
- static class TarUncompress extends UncompressFactory {
- @Override
- public List<String> uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) {
- try {
- return uncompressTar(fs, hdfsinput, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- }
- /**
- * .tgz 解压
- */
- static class TgzUncompress extends UncompressFactory {
- @Override
- public List<String> uncompress(Path hdfsinput, FileSystem fs, Path uncompressPath) {
- try {
- Path temp = uncompressTgz(fs, hdfsinput);
- return uncompressTar(fs, temp, uncompressPath); // hadoop, hdfs输入路径, 解压保存路径
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- }
- /**
- * zip解压
- * @param fs
- * @param input
- * @param uncompressPath
- * @return
- * @throws IOException
- */
- public static List<String> uncompressZip(FileSystem fs, Path input, Path uncompressPath) throws IOException {
- OutputStream out = null;
- List<String> paths = new ArrayList<String>();
- ZipInputStream zipInputStream = null;
- try {
- if (!Files.exists(input)) {
- throw new IllegalArgumentException(input.toString() + " does not exist");
- }
- zipInputStream = new ZipInputStream(Files.newInputStream(input));
- ZipEntry zipEntry = null;
- Path path = null;
- while ((zipEntry = zipInputStream.getNextEntry()) != null ) {
- String entryName = zipEntry.getName();
- if (zipEntry.isDirectory()) {
- // 如果是文件夹,创建文件夹并加速循环
- path = Paths.get(uncompressPath.toString(), entryName);
- Files.createDirectories(path);
- continue;
- }
- path = Paths.get(uncompressPath.toString(), entryName);
- out = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW);
- IOUtils.copy(zipInputStream, out);
- out.flush();
- zipInputStream.closeEntry();
- paths.add(path.toString());
- logger.info("解压文件: {} 成功!", entryName);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(out != null){
- try {
- out.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if(zipInputStream != null){
- try {
- zipInputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return paths;
- }
- /**
- * .Tar 解压
- * @param fs
- * @param input
- * @param uncompressPath
- * @return
- * @throws IOException
- */
- public static List<String> uncompressTar(FileSystem fs, Path input, Path uncompressPath) throws IOException {
- OutputStream out = null;
- List<String> paths = new ArrayList<String>();
- TarInputStream tarInputStream = null;
- if (!Files.exists(input)) {
- throw new IllegalArgumentException(input.toString() + " does not exist");
- }
- try {
- tarInputStream = new TarInputStream(Files.newInputStream(input));
- TarEntry entry = null;
- Path path = null;
- while ( ( entry = tarInputStream.getNextEntry()) != null ){
- String entryName = entry.getName();
- if(entry.isDirectory()){
- path = Paths.get(uncompressPath.toString(), entryName);
- Files.createDirectories(path);
- continue;
- }
- path = Paths.get(uncompressPath.toString(), entryName);
- out = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW);
- IOUtils.copy(tarInputStream, out);
- out.flush();
- paths.add(path.toString());
- logger.info("解压文件: {} 成功!", entryName);
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if ( out != null){
- try {
- out.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if(tarInputStream != null){
- try {
- tarInputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return paths;
- }
- /**
- * .tgz 解压
- * @param fs
- * @param input
- * @return
- * @throws IOException
- */
- public static Path uncompressTgz(FileSystem fs, Path input) throws IOException {
- int buffersize = 2048;
- OutputStream out = null;
- GzipCompressorInputStream gzin = null;
- InputStream hdfsinput = null;
- String temppath = null;
- Path outPath = null;
- if (!Files.exists(input)) {
- throw new IllegalArgumentException(input.toString() + " does not exist");
- }
- int i = input.toString().lastIndexOf("/");
- if ( i > 0){
- temppath = StringUtils.trimToNull(input.toString().substring(0,i));
- }
- try {
- hdfsinput = Files.newInputStream(input);
- BufferedInputStream in = new BufferedInputStream(hdfsinput);
- outPath = Paths.get(temppath, "tmp-" + System.currentTimeMillis() + ".tar");
- if( Files.exists(outPath)){
- Files.delete(outPath);
- }
- out = Files.newOutputStream(outPath);
- gzin = new GzipCompressorInputStream(in);
- final byte[] buffer = new byte[buffersize];
- int n = 0;
- while (-1 != (n = gzin.read(buffer))) {
- out.write(buffer, 0, n);
- }
- logger.info("临时文件的保存路径为:" + outPath.toString());
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if(hdfsinput != null){
- try {
- hdfsinput.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if(out!= null){
- try {
- out.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return outPath;
- }
- }
|