package com.primeton.damp.core.dataset;
import com.primeton.damp.config.DataShardConfig;
import com.primeton.damp.fileclient.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
*
*
* Created by zhaopx.
* User: zhaopx
* Date: 2021/3/31
* Time: 13:17
*
*
*
* @author zhaopx
*/
@Component
@Slf4j
public class X11DatasetHandler implements FactoryBean {
/**
* DataShard 配置
*/
@Autowired
DataShardConfig dataShardConfig;
@Autowired
FileSystem fs;
@Override
public X11ClientHandler getObject() throws Exception {
if(StringUtils.isBlank(dataShardConfig.getType()) || "local".equalsIgnoreCase(dataShardConfig.getType())) {
return new LocalFsClientHandler(dataShardConfig.getBasePath());
} else if("hdfs".equalsIgnoreCase(dataShardConfig.getType())) {
return new HdfsClientHandler(fs);
} else if("sftp".equalsIgnoreCase(dataShardConfig.getType())) {
return new SftpClientHandler((String) dataShardConfig.getSftp().get("host"),
(Integer) dataShardConfig.getSftp().get("port"),
(String) dataShardConfig.getSftp().get("username"),
(String) dataShardConfig.getSftp().get("password"),
new Properties());
} else if("ftp".equalsIgnoreCase(dataShardConfig.getType())) {
return new FtpClientHandler((String) dataShardConfig.getFtp().get("host"),
(Integer) dataShardConfig.getFtp().get("port"),
(String) dataShardConfig.getFtp().get("username"),
(String) dataShardConfig.getFtp().get("password"),
new Properties());
} else {
throw new IllegalArgumentException("Illegal config datashard.type: " + dataShardConfig.getType());
}
}
@Override
public Class> getObjectType() {
return X11ClientHandler.class;
}
}