package com.primeton.dsp.upgrader.workflow; import com.primeton.dsp.upgrader.DAOServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; /** * 工作流升级, 工作流升级只能在升级 元数据后,执行 step1, metacube 后执行的 *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/6/8
 * Time: 15:47
 *
 * 
* * @author zhaopx */ @Slf4j public class WorkFlowUpgradeService extends DAOServiceImpl { public WorkFlowUpgradeService(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } /** * 开始升级 WorkFlow */ public void doUpgradeWorkFlow() { log.info("------------------upgrade workflow---------------------"); //List upgradeResources = this.queryForMap(getSQL("findUpgradeJoinResources")); //System.out.println(upgradeResources.size()); try { // 实体绑定的资源 String upgradeJoinResources = getSQL("upgradeJoinResources"); String[] batchSqls = upgradeJoinResources.split(";\\s*"); int[] ints = new int[batchSqls.length]; int i = 0; for (String batchSql : batchSqls) { ints[i] = updateSQL(batchSql); i++; } AtomicInteger sum = new AtomicInteger(0); Arrays.stream(ints).forEach(it -> sum.addAndGet(it)); log.info("升级 {} 条实体资源。", sum.get()); // 在线作业 String upgradeJoinOnlineItems = getSQL("upgradeJoinOnlineItems"); String[] batchSqlSet = upgradeJoinOnlineItems.split(";\\s*"); int[] ints2 = new int[batchSqls.length]; i = 0; for (String batchSql : batchSqlSet) { ints2[i] = updateSQL(batchSql); i++; } AtomicInteger sum2 = new AtomicInteger(0); Arrays.stream(ints2).forEach(it -> sum2.addAndGet(it)); log.info("升级 {} 条在线作业绑定资源。", sum2.get()); } catch (Exception e) { log.error("升级 workflow 失败。", e); throw new IllegalStateException("升级 workflow 失败, Cause: " + e.getMessage(), e); } } }