123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package com.primeton.datainsight.task;
- import java.io.Closeable;
- import java.io.IOException;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- /**
- * <p>
- * </p>
- * Created by zhaopx on 2018/1/12 0012-18:39
- */
- public class SparkJobOutputStream extends LogOutputStream implements Closeable {
- /**
- * 从 console log 中获取 appid 的 正则
- */
- final String pa = "Client: Application report for (application_\\d+_\\d+) \\(state: (ACCEPTED|RUNNING|FINISHED|FAILED|KILLED)\\)";
- /**
- * Yarn App
- */
- final Pattern YARN_APPID_PATTERN = Pattern.compile(pa);
- /**
- * 提交的应用 Yarn 程序ID
- */
- String appId;
- /**
- * 程序的状态
- */
- String state = "UNKNOWN";
- public SparkJobOutputStream(String baseDir, String jobId, String taskId) {
- super(baseDir, jobId, taskId);
- }
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- String line = new String(b, off, len, charset);
- super.write(line);
- Matcher matcher = YARN_APPID_PATTERN.matcher(line);
- if(matcher.find()){
- String appId = matcher.group(1);
- String state = matcher.group(2);
- setAppId(appId);
- setState(state);
- return;
- }
- }
- public String getAppId() {
- return appId;
- }
- public String getState() {
- return state;
- }
- public void setAppId(String appId) {
- this.appId = appId;
- }
- public void setState(String state) {
- this.state = state;
- //JobQueue.runningTask(getAppId(), jobId, taskId, state);
- }
- }
|