123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package com.primeton.datainsight.task;
- import com.google.common.base.Charsets;
- import org.apache.commons.lang.SystemUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.BufferedWriter;
- import java.io.Closeable;
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.io.OutputStreamWriter;
- import java.nio.charset.Charset;
- /**
- * <p>
- * </p>
- * Created by zhaopx on 2018/1/12 0012-18:39
- */
- public class LogOutputStream extends OutputStream implements Closeable {
- private static Logger logger = LoggerFactory.getLogger(LogOutputStream.class);
- /**
- * Job Id
- */
- final String jobId;
- /**
- * Task Id
- */
- final String taskId;
- /**
- * console 输出日志的编码, Windows 使用GBK
- */
- final java.nio.charset.Charset charset;
- /**
- * 日志输出流
- */
- BufferedWriter logWriter;
- /**
- * 缓冲大小
- */
- volatile long bufferSiuze = 0;
- /**
- * 获取 Task 的目录
- */
- final String taskDir;
- public LogOutputStream(String baseDir, String jobId, String taskId) {
- this.jobId = jobId;
- this.taskId = taskId;
- if(SystemUtils.IS_OS_WINDOWS) {
- charset = Charset.forName("GBK");
- } else {
- charset = Charsets.UTF_8;
- }
- // TODO Task dir
- File tasks = new File(baseDir);
- if(!tasks.exists() || tasks.isFile()){
- tasks.mkdirs();
- }
- File jobDir = new File(tasks, jobId);
- if(!jobDir.exists() || jobDir.isFile()){
- jobDir.mkdirs();
- }
- // 临时Job
- File taskDir = new File(jobDir, taskId);
- // 肯定不存在的
- if(!taskDir.exists()){
- taskDir.mkdirs();
- }
- this.taskDir = taskDir.getAbsolutePath();
- File logfile = new File(taskDir, "stdout");
- try {
- logWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logfile), "UTF-8"));
- } catch (Exception e) {
- logger.error("无法打开文件。", e);
- }
- }
- @Override
- public void write(int b) throws IOException {
- if(b > 0) {
- logger.info("unknown byte {}", b);
- }
- }
- /**
- * write one line
- * @param line one line
- * @return
- * @throws IOException
- */
- public void write(String line) throws IOException {
- if(logWriter != null) {
- try {
- logWriter.write(line);
- logWriter.flush();
- } catch (IOException e) {
- logger.error(e.getMessage());
- }
- }
- }
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- String line = new String(b, off, len, charset);
- write(line);
- }
- @Override
- public void flush() throws IOException {
- if(logWriter != null) {
- logWriter.flush();
- }
- }
- @Override
- public void close() throws IOException {
- if(logWriter != null) {
- logWriter.flush();
- logWriter.close();
- }
- logWriter = null;
- }
- public String getTaskDir() {
- return taskDir;
- }
- }
|