LogOutputStream.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package com.primeton.datainsight.task;
  2. import com.google.common.base.Charsets;
  3. import org.apache.commons.lang.SystemUtils;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.io.BufferedWriter;
  7. import java.io.Closeable;
  8. import java.io.File;
  9. import java.io.FileOutputStream;
  10. import java.io.IOException;
  11. import java.io.OutputStream;
  12. import java.io.OutputStreamWriter;
  13. import java.nio.charset.Charset;
  14. /**
  15. * <p>
  16. * </p>
  17. * Created by zhaopx on 2018/1/12 0012-18:39
  18. */
  19. public class LogOutputStream extends OutputStream implements Closeable {
  20. private static Logger logger = LoggerFactory.getLogger(LogOutputStream.class);
  21. /**
  22. * Job Id
  23. */
  24. final String jobId;
  25. /**
  26. * Task Id
  27. */
  28. final String taskId;
  29. /**
  30. * console 输出日志的编码, Windows 使用GBK
  31. */
  32. final java.nio.charset.Charset charset;
  33. /**
  34. * 日志输出流
  35. */
  36. BufferedWriter logWriter;
  37. /**
  38. * 缓冲大小
  39. */
  40. volatile long bufferSiuze = 0;
  41. /**
  42. * 获取 Task 的目录
  43. */
  44. final String taskDir;
  45. public LogOutputStream(String baseDir, String jobId, String taskId) {
  46. this.jobId = jobId;
  47. this.taskId = taskId;
  48. if(SystemUtils.IS_OS_WINDOWS) {
  49. charset = Charset.forName("GBK");
  50. } else {
  51. charset = Charsets.UTF_8;
  52. }
  53. // TODO Task dir
  54. File tasks = new File(baseDir);
  55. if(!tasks.exists() || tasks.isFile()){
  56. tasks.mkdirs();
  57. }
  58. File jobDir = new File(tasks, jobId);
  59. if(!jobDir.exists() || jobDir.isFile()){
  60. jobDir.mkdirs();
  61. }
  62. // 临时Job
  63. File taskDir = new File(jobDir, taskId);
  64. // 肯定不存在的
  65. if(!taskDir.exists()){
  66. taskDir.mkdirs();
  67. }
  68. this.taskDir = taskDir.getAbsolutePath();
  69. File logfile = new File(taskDir, "stdout");
  70. try {
  71. logWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logfile), "UTF-8"));
  72. } catch (Exception e) {
  73. logger.error("无法打开文件。", e);
  74. }
  75. }
  76. @Override
  77. public void write(int b) throws IOException {
  78. if(b > 0) {
  79. logger.info("unknown byte {}", b);
  80. }
  81. }
  82. /**
  83. * write one line
  84. * @param line one line
  85. * @return
  86. * @throws IOException
  87. */
  88. public void write(String line) throws IOException {
  89. if(logWriter != null) {
  90. try {
  91. logWriter.write(line);
  92. logWriter.flush();
  93. } catch (IOException e) {
  94. logger.error(e.getMessage());
  95. }
  96. }
  97. }
  98. @Override
  99. public void write(byte[] b, int off, int len) throws IOException {
  100. String line = new String(b, off, len, charset);
  101. write(line);
  102. }
  103. @Override
  104. public void flush() throws IOException {
  105. if(logWriter != null) {
  106. logWriter.flush();
  107. }
  108. }
  109. @Override
  110. public void close() throws IOException {
  111. if(logWriter != null) {
  112. logWriter.flush();
  113. logWriter.close();
  114. }
  115. logWriter = null;
  116. }
  117. public String getTaskDir() {
  118. return taskDir;
  119. }
  120. }