package com.primeton.datainsight.task; import java.io.Closeable; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; /** *

*

* Created by zhaopx on 2018/1/12 0012-18:39
 *
 * <p>Log output stream that forwards every written chunk to the superclass and additionally
 * scans the text for lines of the form
 * {@code Client: Application report for application_N_N (state: STATE)}, remembering the
 * application id and state of the most recent report seen.
 */
public class SparkJobOutputStream extends LogOutputStream implements Closeable {

    /**
     * Regular expression used to extract the application id (group 1) and the application
     * state (group 2) from the console log.
     *
     * <p>NOTE(review): only ACCEPTED, RUNNING, FINISHED, FAILED and KILLED are matched; reports
     * carrying any other state are ignored — confirm this is intentional.
     */
    static final String pa = "Client: Application report for (application_\\d+_\\d+) \\(state: (ACCEPTED|RUNNING|FINISHED|FAILED|KILLED)\\)";

    /**
     * Compiled form of {@link #pa}. Compiled once for the class instead of once per stream
     * instance, since {@link Pattern} is immutable and safe to share.
     */
    static final Pattern YARN_APPID_PATTERN = Pattern.compile(pa);

    /**
     * Yarn application id of the submitted application; {@code null} until a report is seen.
     */
    String appId;

    /**
     * Last reported application state; {@code "UNKNOWN"} until a report is seen.
     */
    String state = "UNKNOWN";

    /**
     * Creates the stream; all arguments are passed unchanged to the superclass constructor.
     *
     * @param baseDir forwarded to {@code LogOutputStream}
     * @param jobId   forwarded to {@code LogOutputStream}
     * @param taskId  forwarded to {@code LogOutputStream}
     */
    public SparkJobOutputStream(String baseDir, String jobId, String taskId) {
        super(baseDir, jobId, taskId);
    }

    /**
     * Decodes the given byte range with the inherited {@code charset}, hands the resulting text
     * to {@code super.write(String)}, and then records every application report found in it.
     *
     * <p>A single chunk may contain more than one report (for example several log lines flushed
     * together). Reports are applied in the order they appear, so the stored id and state always
     * reflect the last report in the chunk rather than the first.
     *
     * @param b   buffer holding the bytes to write
     * @param off offset of the first byte to use
     * @param len number of bytes to use
     * @throws IOException if the superclass write fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        String text = new String(b, off, len, charset);
        super.write(text);

        Matcher reports = YARN_APPID_PATTERN.matcher(text);
        while (reports.find()) {
            setAppId(reports.group(1));
            setState(reports.group(2));
        }
    }

    /**
     * @return the application id from the most recent report, or {@code null} if none was seen
     */
    public String getAppId() {
        return appId;
    }

    /**
     * @return the state from the most recent report, or {@code "UNKNOWN"} if none was seen
     */
    public String getState() {
        return state;
    }

    /**
     * @param appId the application id to store
     */
    public void setAppId(String appId) {
        this.appId = appId;
    }

    /**
     * @param state the application state to store
     */
    public void setState(String state) {
        this.state = state;
        //JobQueue.runningTask(getAppId(), jobId, taskId, state);
    }
}