package com.primeton.damp.fileclient; import com.google.common.collect.Lists; import org.apache.sshd.client.subsystem.sftp.fs.SftpFileSystemProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.DirectoryStream; import java.nio.file.FileSystem; import java.nio.file.FileSystemAlreadyExistsException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import static java.nio.file.StandardOpenOption.APPEND; import static java.nio.file.StandardOpenOption.CREATE; /** * Created by hadoop on 2018/8/9. */ public class SftpClientHandler implements X11ClientHandler { protected static Logger logger = LoggerFactory.getLogger("SftpClientHandler"); private final String sftpHost; private final int sftpPort; private final String ftpUsername; private final String ftpPassword; /** * sftp 文件系统 */ FileSystem fs; public SftpClientHandler(String host, int port, String username, String password, Properties params) { sftpHost = host; sftpPort = port; ftpUsername = username; ftpPassword = password; this.reconnect(params); } /** * 以账号和密码重新连接 * @param params */ @Override public void reconnect(Properties params) { java.net.URI uri = SftpFileSystemProvider.createFileSystemURI(this.sftpHost, sftpPort, ftpUsername, ftpPassword); try { fs = FileSystems.newFileSystem(uri, (Map) params); } catch (FileSystemAlreadyExistsException e){ logger.warn("exists file system sftp..."); fs = FileSystems.getFileSystem(uri); } catch (IOException e) { logger.error("ftp connect is error.", e); throw new IllegalStateException("can not connect sftp: " + sftpHost+":"+ sftpPort); } } /** * 写文件流 * @param file * @return * @throws IOException */ @Override public OutputStream writeFile(String file, boolean overwrite) throws IOException { Path path1 = fs.getPath(file); if(overwrite) { return Files.newOutputStream(path1, CREATE); } else { return Files.newOutputStream(path1, APPEND); } } /** * 读取文件流 * @param file * @return * @throws IOException */ @Override public InputStream readFile(String file) throws IOException { Path path1 = fs.getPath(file); return Files.newInputStream(path1, StandardOpenOption.READ); } @Override public boolean rename(String file, String newFileName) { try { Path path1 = fs.getPath(file); Path path = fs.getPath(path1.getParent().toString(), newFileName); Files.move(path1, path); logger.info("rename {} to {}", path1.toString(), path.toString()); return true; } catch (Exception e) { logger.error("Error in rename ftp file.", e); return false; } } /** * 删除文件, 返回 true 则删除成功 * @param path */ public boolean deleteFile(String path) { final Path fsPath = fs.getPath(path); try { if (Files.exists(fsPath) && Files.isDirectory(fsPath)) { // 删除文件夹,连带文件夹一起删除,包括其下的子文件 Files.list(fsPath).forEach(it-> deleteFile(it.toString())); Files.delete(fsPath); logger.info("delete dir: {}", path); } else { // 删除文件 Files.delete(fsPath); logger.info("delete file: {}", path); } } catch (Exception e) { throw new RuntimeException(e); } return true; } /** * 判断目录是否存在 * @param path * @return */ @Override public boolean existsDir(String path) { try { Path path1 = fs.getPath(path); return Files.exists(path1) && Files.isDirectory(path1); } catch (Exception e) { } return false; } /** * 判断目录是否存在 * @param path * @return */ @Override public boolean existsFile(String path) { try { Path path1 = fs.getPath(path); return Files.exists(path1) && Files.isRegularFile(path1); } catch (Exception e) { } return false; } /** * 判断目录或者文件是否存在 * @param path * @return */ @Override public boolean exists(String path) { try { Path path1 = fs.getPath(path); return Files.exists(path1); } catch (Exception e) { } return false; } /** * 创建多层目录 * @param path * @return */ @Override public boolean mkdirs(String path) { Path path1 = fs.getPath(path); Path parentPath = path1.getParent(); if(parentPath == null) { // parentPath 等于 null 则说明是文件系统的根目录了。 return true; } // 父级目录已经存在,只创建当前目录 if(existsDir(parentPath.toString())){ return mkdir(path1.toString()); // 创建当前文件夹 } else { // 父级目录不存在,则先创建父级目录,再创建当前目录 if(mkdirs(parentPath.toString())) { // 创建父级目录 return mkdir(path1.toString()); // 创建当前文件夹 } } // 最后创建文件夹肯定是不成功的。 return existsDir(path); } /** * 创建目录, 返回 true 则创建目录成功 * @param dirName 一级或者多级目录 */ @Override public boolean mkdir(String dirName) { try { Path path1 = fs.getPath(dirName); Files.createDirectory(path1); return true; } catch (IOException e) { logger.info("mkdir " + dirName + " fail.", e); } return false; } /** * 获取一个目录下或者文件的子文件 * @param ftpPath * @return */ @Override public List getChildren(String ftpPath) { Path remotePath = fs.getPath(ftpPath); try { DirectoryStream paths = Files.newDirectoryStream(remotePath); return Lists.newArrayList(paths.iterator()); } catch (IOException e) { logger.info("Error in scan sftpfile."); } return Collections.emptyList(); } @Override public void close() throws IOException { logger.warn("close the file system"); fs.close(); } }