123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package com.primeton.dsp.upgrader.workflow;
- import com.primeton.dsp.upgrader.DAOServiceImpl;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.jdbc.core.JdbcTemplate;
- import java.util.Arrays;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * 工作流升级, 工作流升级只能在升级 元数据后,执行 step1, metacube 后执行的
- * <pre>
- *
- * Created by zhaopx.
- * User: zhaopx
- * Date: 2020/6/8
- * Time: 15:47
- *
- * </pre>
- *
- * @author zhaopx
- */
- @Slf4j
- public class WorkFlowUpgradeService extends DAOServiceImpl {
- public WorkFlowUpgradeService(JdbcTemplate jdbcTemplate) {
- super(jdbcTemplate);
- }
- /**
- * 开始升级 WorkFlow
- */
- public void doUpgradeWorkFlow() {
- log.info("------------------upgrade workflow---------------------");
- //List<Map> upgradeResources = this.queryForMap(getSQL("findUpgradeJoinResources"));
- //System.out.println(upgradeResources.size());
- try {
- // 实体绑定的资源
- String upgradeJoinResources = getSQL("upgradeJoinResources");
- String[] batchSqls = upgradeJoinResources.split(";\\s*");
- int[] ints = new int[batchSqls.length];
- int i = 0;
- for (String batchSql : batchSqls) {
- ints[i] = updateSQL(batchSql);
- i++;
- }
- AtomicInteger sum = new AtomicInteger(0);
- Arrays.stream(ints).forEach(it -> sum.addAndGet(it));
- log.info("升级 {} 条实体资源。", sum.get());
- // 在线作业
- String upgradeJoinOnlineItems = getSQL("upgradeJoinOnlineItems");
- String[] batchSqlSet = upgradeJoinOnlineItems.split(";\\s*");
- int[] ints2 = new int[batchSqls.length];
- i = 0;
- for (String batchSql : batchSqlSet) {
- ints2[i] = updateSQL(batchSql);
- i++;
- }
- AtomicInteger sum2 = new AtomicInteger(0);
- Arrays.stream(ints2).forEach(it -> sum2.addAndGet(it));
- log.info("升级 {} 条在线作业绑定资源。", sum2.get());
- } catch (Exception e) {
- log.error("升级 workflow 失败。", e);
- throw new IllegalStateException("升级 workflow 失败, Cause: " + e.getMessage(), e);
- }
- }
- }
|