package com.primeton.damp.fileclient; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; /** *
* {@link X11ClientHandler} implementation that delegates file operations to a
 * Hadoop {@link FileSystem}.
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2021/4/1
 * Time: 17:55
 *
 * @author zhaopx
 */
public class HdfsClientHandler implements X11ClientHandler {

    protected static Logger logger = LoggerFactory.getLogger("HdfsClientHandler");

    /**
     * Base path for the paths this handler operates on.
     */
    private final String basePath;

    /**
     * Hadoop file system that every operation is delegated to.
     */
    final FileSystem fs;

    /**
     * Creates a handler whose base path is the value of the {@code user.name}
     * system property.
     *
     * @param fs Hadoop file system to operate on
     */
    public HdfsClientHandler(FileSystem fs) {
        this(fs, System.getProperty("user.name"));
    }

    /**
     * Creates a handler with an explicit base path.
     *
     * @param fs       Hadoop file system to operate on
     * @param basePath base path for this handler
     */
    public HdfsClientHandler(FileSystem fs, String basePath) {
        this.fs = fs;
        this.basePath = basePath;
    }

    /**
     * No-op: this implementation performs no reconnect logic.
     *
     * @param params connection parameters (ignored)
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void reconnect(Properties params) throws IOException {
    }

    /**
     * Opens an output stream for writing {@code file}.
     *
     * @param file      path of the file to create
     * @param overwrite passed straight through to {@code FileSystem#create(Path, boolean)}
     * @return stream writing to the created file
     * @throws IOException if the file system fails to create the file
     */
    @Override
    public OutputStream writeFile(String file, boolean overwrite) throws IOException {
        // The flag maps one-to-one onto FileSystem#create, so no branching is needed.
        return fs.create(new org.apache.hadoop.fs.Path(file), overwrite);
    }

    /**
     * Opens {@code file} for reading.
     *
     * @param file path of the file to open
     * @return stream reading from the file
     * @throws IOException if the file system fails to open the file
     */
    @Override
    public InputStream readFile(String file) throws IOException {
        return fs.open(new org.apache.hadoop.fs.Path(file));
    }

    /**
     * Renames {@code file} to {@code newFileName}, resolved against the parent
     * of {@code file}.
     *
     * @param file        path of the existing file
     * @param newFileName new name, resolved against the parent of {@code file}
     * @return the result reported by {@code FileSystem#rename}
     * @throws RuntimeException wrapping the {@link IOException} if the rename fails
     */
    @Override
    public boolean rename(String file, String newFileName) {
        final org.apache.hadoop.fs.Path oldFile = new org.apache.hadoop.fs.Path(file);
        final org.apache.hadoop.fs.Path newFile =
                new org.apache.hadoop.fs.Path(oldFile.getParent(), newFileName);
        try {
            return fs.rename(oldFile, newFile);
        } catch (IOException e) {
            // The message previously said "mkdirs", which mislabelled the failed operation.
            throw new RuntimeException("rename " + file + " error!", e);
        }
    }

    /**
     * Deletes {@code path}, passing {@code true} as the recursive flag.
     *
     * @param path path to delete
     * @return {@code false} if the path does not exist, otherwise the result
     *         reported by {@code FileSystem#delete}
     * @throws RuntimeException wrapping any exception raised while checking or deleting
     */
    @Override
    public boolean deleteFile(String path) {
        final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(path);
        try {
            if (!fs.exists(file)) {
                return false;
            }
            return fs.delete(file, true);
        } catch (Exception e) {
            throw new RuntimeException("delete " + path + " error!", e);
        }
    }

    /**
     * Creates the directory {@code path}; delegates to {@link #mkdirs(String)}.
     *
     * @param path directory to create
     * @return the result of {@link #mkdirs(String)}
     */
    @Override
    public boolean mkdir(String path) {
        return mkdirs(path);
    }

    /**
     * Creates the directory {@code path} through {@code FileSystem#mkdirs}.
     *
     * @param path directory to create
     * @return the result reported by {@code FileSystem#mkdirs}
     * @throws RuntimeException wrapping the {@link IOException} if creation fails
     */
    @Override
    public boolean mkdirs(String path) {
        try {
            return fs.mkdirs(new org.apache.hadoop.fs.Path(path));
        } catch (IOException e) {
            throw new RuntimeException("mkdirs " + path + " error!", e);
        }
    }

    @Override
    public List