WorkFlowUpgradeService.java 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package com.primeton.dsp.upgrader.workflow;
  2. import com.primeton.dsp.upgrader.DAOServiceImpl;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.jdbc.core.JdbcTemplate;
  5. import java.util.Arrays;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /**
  8. * 工作流升级, 工作流升级只能在升级 元数据后,执行 step1, metacube 后执行的
  9. * <pre>
  10. *
  11. * Created by zhaopx.
  12. * User: zhaopx
  13. * Date: 2020/6/8
  14. * Time: 15:47
  15. *
  16. * </pre>
  17. *
  18. * @author zhaopx
  19. */
  20. @Slf4j
  21. public class WorkFlowUpgradeService extends DAOServiceImpl {
  22. public WorkFlowUpgradeService(JdbcTemplate jdbcTemplate) {
  23. super(jdbcTemplate);
  24. }
  25. /**
  26. * 开始升级 WorkFlow
  27. */
  28. public void doUpgradeWorkFlow() {
  29. log.info("------------------upgrade workflow---------------------");
  30. //List<Map> upgradeResources = this.queryForMap(getSQL("findUpgradeJoinResources"));
  31. //System.out.println(upgradeResources.size());
  32. try {
  33. // 实体绑定的资源
  34. String upgradeJoinResources = getSQL("upgradeJoinResources");
  35. String[] batchSqls = upgradeJoinResources.split(";\\s*");
  36. int[] ints = new int[batchSqls.length];
  37. int i = 0;
  38. for (String batchSql : batchSqls) {
  39. ints[i] = updateSQL(batchSql);
  40. i++;
  41. }
  42. AtomicInteger sum = new AtomicInteger(0);
  43. Arrays.stream(ints).forEach(it -> sum.addAndGet(it));
  44. log.info("升级 {} 条实体资源。", sum.get());
  45. // 在线作业
  46. String upgradeJoinOnlineItems = getSQL("upgradeJoinOnlineItems");
  47. String[] batchSqlSet = upgradeJoinOnlineItems.split(";\\s*");
  48. int[] ints2 = new int[batchSqls.length];
  49. i = 0;
  50. for (String batchSql : batchSqlSet) {
  51. ints2[i] = updateSQL(batchSql);
  52. i++;
  53. }
  54. AtomicInteger sum2 = new AtomicInteger(0);
  55. Arrays.stream(ints2).forEach(it -> sum2.addAndGet(it));
  56. log.info("升级 {} 条在线作业绑定资源。", sum2.get());
  57. } catch (Exception e) {
  58. log.error("升级 workflow 失败。", e);
  59. throw new IllegalStateException("升级 workflow 失败, Cause: " + e.getMessage(), e);
  60. }
  61. }
  62. }