Browse Source

springjdbc-sql template 模板

zhzhenqin 5 năm trước cách đây
mục cha
commit
1d3d8797c4

+ 260 - 0
springjdbc-sql-template/DAOServiceImpl.java

@@ -0,0 +1,260 @@
+package com.primeton.dsp.upgrader;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.jsoup.Jsoup;
+import org.jsoup.nodes.Document;
+import org.jsoup.nodes.Element;
+import org.jsoup.parser.ParseSettings;
+import org.jsoup.parser.Parser;
+import org.jsoup.select.Elements;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * 为了升级元数据,兼容 hibernate 的 API
+ *
+ * <pre>
+ *
+ * Created by zhaopx.
+ * User: zhaopx
+ * Date: 2020/6/3
+ * Time: 11:12
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+@Slf4j
+public class DAOServiceImpl {
+
+
+    /**
+     * SQL #var# 类型的替换
+     */
+    public static final Pattern SQL_VAR_PATTERN = Pattern.compile("(#(\\w|\\.|-|_)+#)");
+
+
+    public final JdbcTemplate jdbcTemplate;
+
+
+    /**
+     * SQL statement 缓存
+     */
+    private final static Map<String, String> sqlCache = new HashMap<>();
+
+
+    public DAOServiceImpl(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+        if (sqlCache.isEmpty()) {
+            // 未初始化,则初始化一次
+            try {
+                init(jdbcTemplate);
+            } catch (Exception e) {
+                throw new IllegalStateException("无法加载 SQL 声明文件。", e);
+            }
+        }
+    }
+
+
+    /**
+     * 注意:静态,同步方法
+     *
+     * @param jdbcTemplate
+     * @throws Exception
+     */
+    private synchronized static void init(JdbcTemplate jdbcTemplate) throws Exception {
+        if (!sqlCache.isEmpty()) {
+            return;
+        }
+        Connection connection = jdbcTemplate.getDataSource().getConnection();
+        try {
+            DatabaseMetaData metaData = connection.getMetaData();
+            String productName = metaData.getDatabaseProductName();
+            log.info("database product name: {}", productName);
+
+            String sqlXmlPath = String.format("META-INF/sqlxml/%s/upgradeSql.xml", Optional.ofNullable(productName).orElse("mysql").toLowerCase());
+            ClassPathResource pathResource = new ClassPathResource(sqlXmlPath);
+            try (InputStream in = pathResource.getInputStream()) {
+                String content = IOUtils.toString(in, StandardCharsets.UTF_8);
+                sqlCache.putAll(readProperties(content));
+            }
+
+            log.info("load sql xml file: {} success.", sqlXmlPath);
+        } finally {
+            try {
+                connection.close();
+            } catch (Exception e) {
+            }
+        }
+
+    }
+
+
+    protected static Map<String, String> readProperties(String xml) {
+        Map<String, String> prop = new HashMap<>();
+
+        // 保持 properties 中的配置是大小写区分的,否则全是小写的
+        Parser parser = Parser.xmlParser();
+
+        // 大小写敏感
+        parser.settings(ParseSettings.preserveCase);
+
+        Document doc = Jsoup.parse(xml, "http://www.primeton.com/dsp", parser);
+        Elements statements = doc.select("statement");
+        if (statements.size() > 0) {
+            for (Element statement : statements) {
+                prop.put(statement.attr("id"), statement.text());
+            }
+        }
+        return prop;
+    }
+
+
+    /**
+     * 获取一个无须替换参数的 SQL
+     *
+     * @param name
+     * @return
+     */
+    public String getSQL(String name) {
+        return getSQL(name, new HashMap());
+    }
+
+
+    /**
+     * 获取 SQL, 替换模板
+     *
+     * @param name
+     * @param params
+     * @return
+     */
+    public String getSQL(String name, Map<String, Object> params) {
+        params = Optional.ofNullable(params).orElse(new HashMap());
+        String sql = sqlCache.get(name);
+        if (params.isEmpty()) {
+            return sql;
+        }
+
+        Map<String, String> varParam = new HashMap<>(params.size());
+        for (Map.Entry<String, Object> entry : params.entrySet()) {
+            varParam.put(entry.getKey(), "?");
+        }
+        return getFullString(sql, varParam);
+    }
+
+
+    /**
+     * 根据 String 的 #field# 按照 map 中的参数替换,组成一个新的 SQL 返回。
+     *
+     * @param sql    支持 #field# 的变量替换
+     * @param params 参数,如果没有参数则会原路返回
+     * @return 返回新的变量替换的 SQL
+     */
+    public static String getFullString(String sql, Map<String, String> params) {
+        Matcher matcher = SQL_VAR_PATTERN.matcher(sql);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String group = matcher.group();
+            String field = StringUtils.trim(group.substring(1, group.length() - 1));
+            String val = params.get(field);
+            if (val == null) {
+                continue;
+            }
+            matcher.appendReplacement(sb, val);
+        }
+        matcher.appendTail(sb);
+
+        return sb.toString();
+    }
+
+    /**
+     * 执行查询 SQL 无参数
+     *
+     * @param sql
+     * @return
+     */
+    public List<Map> queryForMap(String sql) {
+        return queryForMap(sql, new String[]{});
+    }
+
+    /**
+     * 执行一次查询
+     *
+     * @param sql
+     * @param params
+     * @return
+     */
+    public List<Map> queryForMap(String sql, Object[] params) {
+        params = Optional.ofNullable(params).orElse(new Object[]{});
+        return (List) jdbcTemplate.query(sql, new ColumnMapRowMapper(), params);
+    }
+
+
+    /**
+     * 执行一次查询
+     *
+     * @param sql
+     * @param params
+     * @return
+     */
+    public Map queryForObjectMap(String sql, Object[] params) {
+        List<Map> rs = queryForMap(sql, params);
+        return rs.isEmpty() ? Collections.emptyMap() : rs.get(0);
+    }
+
+
+    /**
+     * 执行一个 更新 SQL
+     *
+     * @param sql
+     * @return
+     */
+    public int updateSQL(String sql) {
+        return updateSQL(sql, new String[]{});
+    }
+
+
+    /**
+     * 执行一个 更新 SQL
+     *
+     * @param sql
+     * @param params
+     * @return
+     */
+    public int updateSQL(String sql, Object[] params) {
+        params = Optional.ofNullable(params).orElse(new Object[]{});
+        return jdbcTemplate.update(sql, params);
+    }
+
+
+    /**
+     * 批量执行 SQL
+     *
+     * @param batchSql
+     * @return
+     */
+    public int[] batchUpdateSQL(String... batchSql) throws SQLException {
+        if (batchSql.length == 0) {
+            return new int[]{};
+        }
+
+        return jdbcTemplate.batchUpdate(batchSql);
+    }
+}

+ 72 - 0
springjdbc-sql-template/WorkFlowUpgradeService.java

@@ -0,0 +1,72 @@
+package com.primeton.dsp.upgrader.workflow;
+
+import com.primeton.dsp.upgrader.DAOServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * 工作流升级, 工作流升级只能在升级 元数据后,执行 step1, metacube 后执行的
+ * <pre>
+ *
+ * Created by zhaopx.
+ * User: zhaopx
+ * Date: 2020/6/8
+ * Time: 15:47
+ *
+ * </pre>
+ *
+ * @author zhaopx
+ */
+@Slf4j
+public class WorkFlowUpgradeService extends DAOServiceImpl {
+
+
+    public WorkFlowUpgradeService(JdbcTemplate jdbcTemplate) {
+        super(jdbcTemplate);
+    }
+
+
+    /**
+     * 开始升级 WorkFlow
+     */
+    public void doUpgradeWorkFlow() {
+        log.info("------------------upgrade workflow---------------------");
+        //List<Map> upgradeResources = this.queryForMap(getSQL("findUpgradeJoinResources"));
+        //System.out.println(upgradeResources.size());
+        try {
+            // 实体绑定的资源
+            String upgradeJoinResources = getSQL("upgradeJoinResources");
+            String[] batchSqls = upgradeJoinResources.split(";\\s*");
+
+            int[] ints = new int[batchSqls.length];
+            int i = 0;
+            for (String batchSql : batchSqls) {
+                ints[i] = updateSQL(batchSql);
+                i++;
+            }
+            AtomicInteger sum = new AtomicInteger(0);
+            Arrays.stream(ints).forEach(it -> sum.addAndGet(it));
+            log.info("升级 {} 条实体资源。", sum.get());
+
+            // 在线作业
+            String upgradeJoinOnlineItems = getSQL("upgradeJoinOnlineItems");
+            String[] batchSqlSet = upgradeJoinOnlineItems.split(";\\s*");
+            int[] ints2 = new int[batchSqls.length];
+            i = 0;
+            for (String batchSql : batchSqlSet) {
+                ints2[i] = updateSQL(batchSql);
+                i++;
+            }
+            AtomicInteger sum2 = new AtomicInteger(0);
+            Arrays.stream(ints2).forEach(it -> sum2.addAndGet(it));
+            log.info("升级 {} 条在线作业绑定资源。", sum2.get());
+        } catch (Exception e) {
+            log.error("升级 workflow 失败。", e);
+            throw new IllegalStateException("升级 workflow 失败, Cause: " + e.getMessage(), e);
+        }
+    }
+}

+ 94 - 0
springjdbc-sql-template/upgradeSql.xml

@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<ns:hibernate xmlns:ns="http://www.primedata.com/xml/ns/j2ee/hibernate"
+			  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+			  xsi:schemaLocation="http://www.primedata.com/xml/ns/j2ee/hibernate LTHibernateSchema.xsd">
+
+	<statement id="findDataSourcePassword">
+		SELECT DATASOURCE_ID,PARAM_CODE,PARAM_VALUE FROM T_HARVEST_DATASOURCE_PARAM WHERE PARAM_CODE=#password#
+	</statement>
+	<statement id="updateDataSourcePassword">
+		UPDATE T_HARVEST_DATASOURCE_PARAM SET PARAM_VALUE=#paramValue# WHERE DATASOURCE_ID=#dataSourceId# AND
+		PARAM_CODE='password'
+	</statement>
+
+	<statement id="findSingleServ">
+		SELECT a.ENTITY_ID, a.ENTITY_CODE, a.ENTITY_NAME, a.PARENT_CATEGORY_ID, a.ENTITY_TYPE,
+		a.PATH, a.SERVICE_CODE, b.SERVICE_NAME, b.SERVICE_URI
+		from dsp_business_entity a join sam_service b on(a.SERVICE_CODE = b.SERVICE_CODE)
+		where a.ENTITY_TYPE = 'T' and a.DB_TABLE_COUNT_RELATION = 1 and a.SERVICE_CODE is not null
+	</statement>
+
+	<statement id="findMutilTableServ">
+		SELECT a.ENTITY_ID, a.ENTITY_CODE, a.ENTITY_NAME, a.PARENT_CATEGORY_ID, a.ENTITY_TYPE,
+		a.PATH, a.SERVICE_CODE, b.SERVICE_NAME, b.SERVICE_URI
+		from dsp_business_entity a join sam_service b on(a.SERVICE_CODE = b.SERVICE_CODE)
+		where a.ENTITY_TYPE = 'RS' and a.DB_TABLE_COUNT_RELATION > 1 and a.SERVICE_CODE is not null
+	</statement>
+
+	<statement id="findDBEntity">
+		SELECT ELEMENT_ID, PARENT_ENTITY_ID, PARENT_RESOURCE_ID,RESOURCE_TYPE,ELEMENT_NAME,ELEMENT_DESC,
+		d.DATASOURCE_TYPE_CODE, d.DATASOURCE_NAME, d.DATA_PATH
+		from dsp_business_element e join t_harvest_datasource d on(e.PARENT_RESOURCE_ID = d.DATASOURCE_ID)
+		where e.PARENT_ENTITY_ID = ? and e.RESOURCE_TYPE != '3'
+	</statement>
+
+	<statement id="findResourceInstance">
+		SELECT INSTANCE_ID, INSTANCE_CODE, INSTANCE_NAME,
+		CLASSIFIER_ID, PARENT_ID, STRING_8 FROM T_MD_INSTANCE WHERE PARENT_ID = ? AND CLASSIFIER_ID = 'Column'
+	</statement>
+
+	<statement id="findSingleServiceInfo">
+		SELECT a.ID as SERVICEID, a.`NAME`, a.`CODE`,
+		a.URI, a.TYPE, a.queryField, b.ID as SIID, b.MODELID, c.DSID, c.TABLENAME
+		FROM DSP_SERVICEINFO a JOIN dsp_si_table b on(a.ID = b.SERVICEID)
+		JOIN dsp_modelinfo c on(b.MODELID = c.ID)
+		where a.TYPE = 'T' and a.CODE = ?
+	</statement>
+
+	<statement id="findServiceInfo">
+		SELECT a.ID as SERVICEID, a.`NAME`, a.`CODE`,
+		a.URI, a.TYPE, a.queryField, b.ID as SIID, b.ID, b.DSID, b.RSSQL
+		FROM DSP_SERVICEINFO a JOIN dsp_si_rs b on(a.ID = b.SERVICEID)
+		where a.TYPE = 'RS' and a.CODE = ?
+	</statement>
+
+	<statement id="findUpgradeJoinResources">
+		select b.ELEMENT_ID, b.PARENT_ENTITY_ID, b.PARENT_RESOURCE_ID, b.RESOURCE_TYPE, b.ELEMENT_NAME, b.ELEMENT_DESC,
+		c.NEW_INSTANCE_ID, c.INSTANCE_NAME
+		from dsp_business_element b join t_md_upgrade c on(b.ELEMENT_ID = c.old_instance_id)
+		where b.RESOURCE_TYPE != '3'
+	</statement>
+
+	<statement id="upgradeJoinResources">
+		alter table dsp_business_element
+		add column NELEMENT_ID varchar(32);
+
+		UPDATE dsp_business_element EL
+		JOIN T_MD_UPGRADE MG ON (
+		EL.ELEMENT_ID = MG.OLD_INSTANCE_ID
+		)
+		SET EL.NELEMENT_ID = MG.NEW_INSTANCE_ID
+		where EL.RESOURCE_TYPE != '3';
+
+		UPDATE dsp_business_element SET ELEMENT_ID = NELEMENT_ID WHERE NELEMENT_ID IS NOT NULL;
+
+		alter table dsp_business_element
+		drop column NELEMENT_ID;
+	</statement>
+
+	<statement id="upgradeJoinOnlineItems">
+		alter table di_online_dataitem
+		add column NRESOURCE_ITEM_ID varchar(32);
+
+		UPDATE di_online_dataitem EL
+		JOIN T_MD_UPGRADE MG ON (
+		EL.RESOURCE_ITEM_ID = MG.OLD_INSTANCE_ID
+		)
+		SET EL.NRESOURCE_ITEM_ID = MG.NEW_INSTANCE_ID;
+
+		UPDATE di_online_dataitem SET RESOURCE_ITEM_ID = NRESOURCE_ITEM_ID WHERE NRESOURCE_ITEM_ID IS NOT NULL;
+
+		alter table di_online_dataitem
+		drop column NRESOURCE_ITEM_ID;
+	</statement>
+</ns:hibernate>