SparkJobOutputStream.java 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package com.primeton.datainsight.task;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. /**
  7. * <p>
  8. * </p>
  9. * Created by zhaopx on 2018/1/12 0012-18:39
  10. */
  11. public class SparkJobOutputStream extends LogOutputStream implements Closeable {
  12. /**
  13. * 从 console log 中获取 appid 的 正则
  14. */
  15. final String pa = "Client: Application report for (application_\\d+_\\d+) \\(state: (ACCEPTED|RUNNING|FINISHED|FAILED|KILLED)\\)";
  16. /**
  17. * Yarn App
  18. */
  19. final Pattern YARN_APPID_PATTERN = Pattern.compile(pa);
  20. /**
  21. * 提交的应用 Yarn 程序ID
  22. */
  23. String appId;
  24. /**
  25. * 程序的状态
  26. */
  27. String state = "UNKNOWN";
  28. public SparkJobOutputStream(String baseDir, String jobId, String taskId) {
  29. super(baseDir, jobId, taskId);
  30. }
  31. @Override
  32. public void write(byte[] b, int off, int len) throws IOException {
  33. String line = new String(b, off, len, charset);
  34. super.write(line);
  35. Matcher matcher = YARN_APPID_PATTERN.matcher(line);
  36. if(matcher.find()){
  37. String appId = matcher.group(1);
  38. String state = matcher.group(2);
  39. setAppId(appId);
  40. setState(state);
  41. return;
  42. }
  43. }
  44. public String getAppId() {
  45. return appId;
  46. }
  47. public String getState() {
  48. return state;
  49. }
  50. public void setAppId(String appId) {
  51. this.appId = appId;
  52. }
  53. public void setState(String state) {
  54. this.state = state;
  55. //JobQueue.runningTask(getAppId(), jobId, taskId, state);
  56. }
  57. }