ZipHdfsOperator.java 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. package com.jiusuo.flume.sink.compress;
  2. import org.apache.commons.io.IOUtils;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FileStatus;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.fs.PathFilter;
  8. import org.apache.tools.tar.TarEntry;
  9. import org.apache.tools.tar.TarOutputStream;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.io.*;
  13. import java.util.ArrayList;
  14. import java.util.Enumeration;
  15. import java.util.List;
  16. import java.util.zip.*;
  17. import java.util.zip.ZipFile;
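  /**
   * Zip and tar.gz compression / extraction helpers that operate on a Hadoop
   * {@link FileSystem} as well as on the local file system.
   *
   * @author Rayn on 2016/1/22 14:00
   * @email liuwei412552703@163.com
   */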
  23. public class ZipHdfsOperator {
  24. /**
  25. * log
  26. */
  27. private static Logger log = LoggerFactory.getLogger(ZipHdfsOperator.class);
  28. public ZipHdfsOperator() {
  29. }
  30. /**
  31. * 压缩指定文件集.
  32. *
  33. * @param compressFilePath 压缩后的zip文件保存的路径
  34. * @param fs File System
  35. * @param path 需要被压缩的文件或者文件夹集合
  36. * @return 返回压缩后的文件
  37. * @throws IOException 压缩异常
  38. */
  39. public static Path compressZip(String compressFilePath, FileSystem fs, String... path) throws IOException {
  40. return compressZip(new Path(compressFilePath), fs, path);
  41. }
  42. /**
  43. * 压缩指定文件集.
  44. *
  45. * @param path 被压缩的文件集合
  46. * @param compressFile 压缩后的zip文件保存的路径
  47. * @return 返回压缩后的文件
  48. * @throws IOException 压缩异常
  49. */
  50. public static Path compressZip(Path compressFile, FileSystem fs, String... path) throws IOException {
  51. if (!fs.exists(compressFile)) {
  52. if (!fs.exists(compressFile.getParent())) {
  53. if (!fs.mkdirs(compressFile.getParent())) {
  54. StringBuilder exception = new StringBuilder();
  55. exception.append("系统找不到指定的路径: ");
  56. exception.append(compressFile.getParent().toString());
  57. exception.append("并且创建: ");
  58. exception.append(compressFile.toString());
  59. exception.append("失败!");
  60. throw new IOException(exception.toString());
  61. }
  62. }
  63. if (!fs.createNewFile(compressFile)) {
  64. StringBuilder exception = new StringBuilder();
  65. exception.append("创建文件: ");
  66. exception.append(compressFile.toString());
  67. exception.append("失败!");
  68. throw new IOException(exception.toString());
  69. }
  70. }
  71. ZipOutputStream zos = null;
  72. try {
  73. zos = new ZipOutputStream(fs.create(compressFile));
  74. for (String fileName : path) {
  75. Path compFile = new Path(fileName);
  76. if (fs.isFile(compFile)) {
  77. compressZip0(zos, fs, compFile, compFile.getName());
  78. } else {
  79. compressZip0(zos, fs, compFile, "");
  80. }
  81. }
  82. // 当压缩完成,关闭流
  83. zos.closeEntry();
  84. } catch (Exception e) {
  85. log.warn("", e);
  86. } finally {
  87. if (zos != null) {
  88. zos.close();
  89. }
  90. }
  91. return compressFile;
  92. }
  93. /**
  94. * 递归压缩
  95. *
  96. * @param zos
  97. * @param path
  98. * @param baseDir
  99. */
  100. private static void compressZip0(ZipOutputStream zos, FileSystem fs, Path path, String baseDir) throws IOException {
  101. // 压缩文件缓冲区大小
  102. InputStream in = null;
  103. if (fs.isFile(path)) {
  104. try {
  105. // 生成下一个压缩节点
  106. zos.putNextEntry(new ZipEntry(baseDir));
  107. } catch (IOException e1) {
  108. log.warn("", e1);
  109. }
  110. byte[] buffere = new byte[4096];
  111. int length = 0;// 读取的长度
  112. try {
  113. in = fs.open(path);
  114. while ((length = in.read(buffere)) != -1) {
  115. zos.write(buffere, 0, length);
  116. }
  117. } catch (IOException e) {
  118. log.error("", e);
  119. } finally {
  120. // 当压缩完成,关闭流
  121. if (in != null) {
  122. try {
  123. in.close();
  124. } catch (IOException e) {
  125. log.debug("", e);
  126. }
  127. }
  128. }
  129. log.info("压缩文件: " + path + " 成功!");
  130. return;
  131. } else {
  132. try {
  133. zos.putNextEntry(new ZipEntry(baseDir + "/"));
  134. } catch (IOException e) {
  135. log.warn("", e);
  136. }
  137. baseDir = baseDir.length() == 0 ? "" : (baseDir + "/");
  138. FileStatus[] statuses = fs.listStatus(path, new PathFilter() {
  139. @Override
  140. public boolean accept(Path path) {
  141. String name = path.getName();
  142. return !name.startsWith(".") && !name.startsWith("_");
  143. }
  144. });
  145. // 遍历所有文件,逐个进行压缩
  146. for (FileStatus status : statuses) {
  147. compressZip0(zos, fs, status.getPath(), baseDir + status.getPath().getName());
  148. }
  149. }
  150. }
  151. /**
  152. * 解压缩指定zip文件名.
  153. *
  154. * @param zipFile zip文件名
  155. * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
  156. * @throws IOException 解压缩异常
  157. */
  158. public static void uncompressZip(String zipFile, FileSystem fs, String uncompressPath) throws IOException {
  159. uncompressZip(new File(zipFile), fs, new Path(uncompressPath));
  160. }
  161. /**
  162. * 解压缩指定zip文件.
  163. *
  164. * @param zipFile zip文件名
  165. * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
  166. * @throws IOException 解压缩异常
  167. */
  168. public static List<Path> uncompressZip(FileSystem fs, File zipFile, Path uncompressPath) throws IOException {
  169. ZipFile zip = null;// 创建解压缩文件
  170. List<Path> paths = new ArrayList<Path>(3);
  171. try {
  172. zip = new ZipFile(zipFile);
  173. Enumeration<? extends ZipEntry> en = zip.entries();
  174. ZipEntry entry = null;
  175. Path path = null;
  176. // 遍历每一个文件
  177. while (en.hasMoreElements()) {
  178. // 如果压缩包还有下一个文件,则循环
  179. entry = (ZipEntry) en.nextElement();
  180. if (entry.isDirectory()) {
  181. // 如果是文件夹,创建文件夹并加速循环
  182. path = new Path(uncompressPath, entry.getName());
  183. fs.mkdirs(path);
  184. continue;
  185. }
  186. // 构建文件对象
  187. path = new Path(uncompressPath, entry.getName());
  188. InputStream in = null;
  189. OutputStream out = null;
  190. try {
  191. in = zip.getInputStream(entry);
  192. out = fs.create(path, true);
  193. IOUtils.copy(in, out);
  194. out.flush();
  195. } catch (IOException e) {
  196. e.printStackTrace();
  197. } finally {
  198. if (in != null) {
  199. try {
  200. in.close();
  201. } catch (IOException e) {
  202. e.printStackTrace();
  203. }
  204. }
  205. if (out != null) {
  206. try {
  207. out.close();
  208. } catch (IOException e) {
  209. e.printStackTrace();
  210. }
  211. }
  212. }
  213. paths.add(path);
  214. log.info("解压文件: {} 成功!" + entry.getName());
  215. }
  216. } catch (IOException e) {
  217. e.printStackTrace();
  218. } finally {
  219. if (zip != null) {
  220. try {
  221. zip.close();
  222. } catch (Exception e) {
  223. e.printStackTrace();
  224. }
  225. }
  226. }
  227. return paths;
  228. }
  229. /**
  230. * 解压缩指定zip文件.
  231. *
  232. * @param zipFile zip文件名
  233. * @param uncompressPath 解压说的后要保存的文件路径,若没有,系统自动新建
  234. * @throws IOException 解压缩异常
  235. */
  236. public static FileInfo uncompressZip(File zipFile, FileSystem fs, Path uncompressPath) throws IOException {
  237. ZipFile zip = null;// 创建解压缩文件
  238. FileInfo fileInfo = new FileInfo();
  239. try {
  240. zip = new ZipFile(zipFile);
  241. String name = zip.getName();
  242. fileInfo.setExtendName(name);
  243. } catch (IOException e) {
  244. log.error("", e);
  245. return fileInfo;
  246. }
  247. // 如果指定的路径不存在,创建文件夹
  248. if (!fs.exists(uncompressPath) || !fs.isDirectory(uncompressPath)) {
  249. if (!fs.mkdirs(uncompressPath)) {
  250. StringBuilder exception = new StringBuilder();
  251. exception.append(uncompressPath);
  252. exception.append("路径不可到达,并且解压缩");
  253. exception.append(zipFile);
  254. exception.append("失败!");
  255. throw new IOException(exception.toString());
  256. }
  257. }
  258. // 返回 ZIP 文件条目的枚举。
  259. Enumeration<? extends ZipEntry> en = zip.entries();
  260. ZipEntry entry = null;
  261. int fileCount = 0;
  262. long totalSize = 0;
  263. Path path = null;
  264. // 遍历每一个文件
  265. while (en.hasMoreElements()) {
  266. // 如果压缩包还有下一个文件,则循环
  267. entry = (ZipEntry) en.nextElement();
  268. if (entry.isDirectory()) {
  269. // 如果是文件夹,创建文件夹并加速循环
  270. path = new Path(uncompressPath, entry.getName());
  271. fs.mkdirs(path);
  272. continue;
  273. }
  274. // 构建文件对象
  275. path = new Path(uncompressPath, entry.getName());
  276. log.info(entry.getName());
  277. InputStream in = null;
  278. OutputStream out = null;
  279. try {
  280. in = zip.getInputStream(entry);
  281. //总文件大小
  282. totalSize += in.available();
  283. //文件总数
  284. if(!entry.getName().toString().endsWith(".xml") && !entry.getName().toString().endsWith(".avro")){
  285. fileCount += 1;
  286. }
  287. out = fs.create(path, true);
  288. IOUtils.copy(in, out);
  289. out.flush();
  290. } catch (IOException e) {
  291. log.warn("", e);
  292. } finally {
  293. if (in != null) {
  294. try {
  295. in.close();
  296. } catch (IOException e) {
  297. log.debug("", e);
  298. }
  299. }
  300. if (out != null) {
  301. try {
  302. out.close();
  303. } catch (IOException e) {
  304. log.debug("", e);
  305. }
  306. }
  307. }
  308. log.debug("解压文件:" + entry.getName() + "成功!");
  309. }
  310. fileInfo.setFileCount(fileCount);
  311. fileInfo.setTotalSize(totalSize);
  312. try {
  313. zip.close();
  314. } catch (IOException e) {
  315. log.debug("", e);
  316. }
  317. return fileInfo;
  318. }
  319. /**
  320. * 解压缩文件
  321. * @param zipFile
  322. * @param uncompressPath
  323. * @return
  324. * @throws IOException
  325. */
  326. public static File uncompressZip(File zipFile, String uncompressPath) throws IOException {
  327. ZipFile zip = null ;// 创建解压缩文件
  328. File file = new File(uncompressPath);
  329. try {
  330. zip = new ZipFile(zipFile);
  331. } catch (IOException e) {
  332. log.error("", e);
  333. return file;
  334. }
  335. // 如果指定的路径不存在,创建文件夹
  336. if (!file.exists() || !file.isDirectory()) {
  337. if (!file.mkdirs()) {
  338. StringBuilder exception = new StringBuilder();
  339. exception.append(uncompressPath);
  340. exception.append("路径不可到达,并且解压缩");
  341. exception.append(zipFile);
  342. exception.append("失败!");
  343. throw new IOException(exception.toString());
  344. }
  345. }
  346. // 返回 ZIP 文件条目的枚举。
  347. Enumeration<? extends ZipEntry> en = zip.entries();
  348. ZipEntry entry = null;
  349. String childPath = "";
  350. // 遍历每一个文件
  351. while (en.hasMoreElements()) {
  352. // 如果压缩包还有下一个文件,则循环
  353. entry = (ZipEntry) en.nextElement();
  354. childPath = uncompressPath+ "/" +entry.getName();
  355. if (entry.isDirectory()) {
  356. // 如果是文件夹,创建文件夹并加速循环
  357. File file1 = new File(childPath);
  358. file1.mkdirs();
  359. continue;
  360. }
  361. // 构建文件对象
  362. InputStream in = null;
  363. OutputStream out = null;
  364. try {
  365. in = zip.getInputStream(entry);
  366. out = new FileOutputStream(childPath);
  367. IOUtils.copy(in, out);
  368. out.flush();
  369. } catch (IOException e) {
  370. log.warn("", e);
  371. } finally {
  372. if (in != null) {
  373. try {
  374. in.close();
  375. } catch (IOException e) {
  376. log.debug("", e);
  377. }
  378. }
  379. if (out != null) {
  380. try {
  381. out.close();
  382. } catch (IOException e) {
  383. log.debug("", e);
  384. }
  385. }
  386. }
  387. log.debug("解压文件:" + entry.getName() + "成功!");
  388. }
  389. try {
  390. zip.close();
  391. } catch (IOException e) {
  392. log.debug("", e);
  393. }
  394. final File[] files = file.listFiles(new FileFilter() {
  395. @Override
  396. public boolean accept(File pathname) {
  397. return pathname.isFile();
  398. }
  399. });
  400. return files[0];
  401. }
  402. /**
  403. * 递归获得该文件夹下所有文件(不包括目录).
  404. * 如果该文件路径指向一个文件,则返回该文件的单个集合,
  405. * 如果该文件指向一个目录,则返回该目录下的所有文件
  406. *
  407. * @param file 可以是目录,也可以是文件,当是文件时,直接返回该文件的列表
  408. * @return 返回该文件夹下的所有子文件
  409. */
  410. public List<File> getSubFile(File file) {
  411. // 文件列表对象
  412. List<File> fileList = new ArrayList<File>();
  413. if (file.isFile()) {
  414. // 如果是普通文件,直接把该文件添加到文件列表
  415. fileList.add(file);
  416. return fileList;
  417. }
  418. if (file.isDirectory()) {
  419. fileList.add(file);
  420. // 如果是目录,则遍历目录下的所有文件
  421. for (File f : file.listFiles()) {
  422. // 这里使用的递归,一直到文件目录的最底层,而文件列表里存的,全是文件
  423. fileList.addAll(getSubFile(f));
  424. }
  425. }
  426. return fileList;
  427. }
  428. /**
  429. * 解压缩指定zip文件.
  430. *
  431. * @param zipFile
  432. * zip文件名
  433. * @param uncompressPath
  434. * 解压说的后要保存的文件路径,若没有,系统自动新建
  435. * @throws IOException 解压缩异常
  436. */
  437. public void uncompressZip(File zipFile, File uncompressPath)
  438. throws IOException {
  439. ZipFile zip = null;// 创建解压缩文件
  440. try {
  441. zip = new ZipFile(zipFile);
  442. } catch (IOException e) {
  443. log.error("", e);
  444. return;
  445. }
  446. // 如果指定的路径不存在,创建文件夹
  447. if (!uncompressPath.exists()) {
  448. if (!uncompressPath.mkdirs()) {
  449. StringBuilder exception = new StringBuilder();
  450. exception.append(uncompressPath);
  451. exception.append("路径不可到达,并且解压缩");
  452. exception.append(zipFile);
  453. exception.append("失败!");
  454. throw new IOException(exception.toString());
  455. }
  456. }
  457. // 返回 ZIP 文件条目的枚举。
  458. Enumeration<? extends ZipEntry> en = zip.entries();
  459. ZipEntry entry = null;
  460. File file = null;
  461. // 遍历每一个文件
  462. while (en.hasMoreElements()) {
  463. // 如果压缩包还有下一个文件,则循环
  464. entry = (ZipEntry) en.nextElement();
  465. if (entry.isDirectory()) {
  466. // 如果是文件夹,创建文件夹并加速循环
  467. file = new File(uncompressPath, entry.getName());
  468. file.mkdirs();
  469. continue;
  470. }
  471. // 构建文件对象
  472. file = new File(uncompressPath, entry.getName());
  473. if (!file.getParentFile().exists()) {
  474. // 如果文件对象的父目录不存在,创建文件夹
  475. if(!file.getParentFile().mkdirs()){
  476. log.debug("can not create dir: " + file.getAbsolutePath());
  477. }
  478. }
  479. InputStream in = null;
  480. FileOutputStream out = null;
  481. try {
  482. in = zip.getInputStream(entry);
  483. out = new FileOutputStream(file);
  484. byte[] bytes = new byte[2048];
  485. int size = -1;
  486. while((size = in.read(bytes)) != -1){
  487. out.write(bytes, 0, size);
  488. }
  489. out.flush();
  490. } catch (IOException e) {
  491. log.warn("", e);
  492. } finally {
  493. if(in != null){
  494. try {
  495. in.close();
  496. }catch(IOException e) {
  497. log.debug("", e);
  498. }
  499. }
  500. if(out != null){
  501. try {
  502. out.close();
  503. }catch(IOException e) {
  504. log.debug("", e);
  505. }
  506. }
  507. }
  508. log.debug("解压文件:" + entry.getName() + "成功!");
  509. }
  510. try{
  511. zip.close();
  512. } catch(IOException e) {
  513. log.debug("", e);
  514. }
  515. }
  516. /**
  517. * 压缩文件成Gzip格式,Linux上可使用
  518. * 压缩文件夹生成后缀名为".gz"的文件并下载
  519. * */
  520. public static void compressTarGz(String folderPath, String targzipFilePath) {
  521. File srcPath =new File(folderPath);
  522. int length=srcPath.listFiles().length;
  523. byte[] buf = new byte[4096]; //设定读入缓冲区尺寸
  524. File[] files = srcPath.listFiles();
  525. try
  526. {
  527. //建立压缩文件输出流
  528. FileOutputStream fout=new FileOutputStream(targzipFilePath);
  529. //建立tar压缩输出流
  530. TarOutputStream tout=new TarOutputStream(fout);
  531. for(int i=0;i<length;i++)
  532. {
  533. String filename=srcPath.getPath()+File.separator+files[i].getName();
  534. //打开需压缩文件作为文件输入流
  535. FileInputStream fin=new FileInputStream(filename); //filename是文件全路径
  536. TarEntry tarEn=new TarEntry(files[i]); //此处必须使用new TarEntry(File file);
  537. tarEn.setName(files[i].getName()); //此处需重置名称,默认是带全路径的,否则打包后会带全路径
  538. tout.putNextEntry(tarEn);
  539. int num;
  540. while ((num=fin.read(buf)) != -1)
  541. {
  542. tout.write(buf,0,num);
  543. }
  544. tout.closeEntry();
  545. fin.close();
  546. }
  547. tout.close();
  548. fout.close();
  549. //建立压缩文件输出流
  550. FileOutputStream gzFile=new FileOutputStream(targzipFilePath+".gz");
  551. //建立gzip压缩输出流
  552. GZIPOutputStream gzout=new GZIPOutputStream(gzFile);
  553. //打开需压缩文件作为文件输入流
  554. FileInputStream tarin=new FileInputStream(targzipFilePath); //targzipFilePath是文件全路径
  555. int len;
  556. while ((len=tarin.read(buf)) != -1)
  557. {
  558. gzout.write(buf,0,len);
  559. }
  560. gzout.close();
  561. gzFile.close();
  562. tarin.close();
  563. }catch(FileNotFoundException e)
  564. {
  565. System.out.println(e);
  566. }catch(IOException e)
  567. {
  568. System.out.println(e);
  569. }
  570. }
  571. /**
  572. * 递归删除目录下的所有文件及子目录下所有文件
  573. * @param dir 将要删除的文件目录
  574. * @return boolean Returns "true" if all deletions were successful.
  575. * If a deletion fails, the method stops attempting to
  576. * delete and returns "false".
  577. */
  578. private static boolean deleteDir(File dir) {
  579. if (dir.isDirectory()) {
  580. String[] children = dir.list();
  581. //递归删除目录中的子目录下
  582. for (int i=0; i<children.length; i++) {
  583. boolean success = deleteDir(new File(dir, children[i]));
  584. if (!success) {
  585. return false;
  586. }
  587. }
  588. }
  589. // 目录此时为空,可以删除
  590. return dir.delete();
  591. }
  592. public static class FileInfo {
  593. private int fileCount = 0;
  594. private long totalSize = 0;
  595. private String extendName = "zip";
  596. public FileInfo() {
  597. }
  598. public FileInfo(int fileCount, long totalSize, String extendName) {
  599. this.fileCount = fileCount;
  600. this.totalSize = totalSize;
  601. this.extendName = extendName;
  602. }
  603. public int getFileCount() {
  604. return fileCount;
  605. }
  606. public void setFileCount(int fileCount) {
  607. this.fileCount = fileCount;
  608. }
  609. public long getTotalSize() {
  610. return totalSize;
  611. }
  612. public void setTotalSize(long totalSize) {
  613. this.totalSize = totalSize;
  614. }
  615. public String getExtendName() {
  616. return extendName;
  617. }
  618. public void setExtendName(String extendName) {
  619. this.extendName = extendName;
  620. }
  621. }
  622. public static void main(String[] args) throws IOException {
  623. FileSystem fs = FileSystem.get(new Configuration());
  624. ZipHdfsOperator.compressZip(new Path("/user/zhaopingxi/fenguang-test/fengaung-comp.zip"), fs, "/user/zhaopingxi/comp");
  625. }
  626. }