||
- package com.yiidata.amc.jdbc
- import java.math.BigDecimal
- import java.sql.{Types, Timestamp, Date, PreparedStatement}
- import org.apache.spark.sql.Row
- /**
- *
- * <p>
- * JDBC 入库,数据类型匹配的 statement set value,有空值检查
- * </p>
- *
- * Created by ZhenQin on 2018/2/6 0006-17:17
- * Vendor: yiidata.com
- *
- */
- abstract class PstateSetter(fieldName:String) extends Serializable {
- /**
- * 修改的字段名称
- */
- val field:String = fieldName
- /**
- * 该列表属于第几列,RDD 该从 0 开始
- */
- val index = try {
- Integer.parseInt(fieldName.replaceAll("\\$", ""))
- } catch {
- case e:NumberFormatException => 0
- }
- /**
- * 设置值
- *
- * @param ps 数据库statement
- * @param num 第几个参数
- * @param value 当前行数据
- */
- def setValue(ps:PreparedStatement, num:Int, value: Any): Unit
- }
- class IntPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.INTEGER)
- } else {
- ps.setInt(num, row.getInt(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.INTEGER)
- } else {
- ps.setInt(num, value.asInstanceOf[Int])
- }
- }
- }
- class StringPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.VARCHAR)
- } else {
- ps.setString(num, row.getString(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.VARCHAR)
- } else {
- ps.setString(num, value.asInstanceOf[String])
- }
- }
- }
- class LongPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.BIGINT)
- } else {
- ps.setLong(num, row.getLong(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.BIGINT)
- } else {
- ps.setLong(num, value.asInstanceOf[Long])
- }
- }
- }
- class DatePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.DATE)
- } else {
- ps.setDate(num, row.getDate(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.DATE)
- } else {
- ps.setDate(num, value.asInstanceOf[Date])
- }
- }
- }
- class TimestampPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.TIMESTAMP)
- } else {
- ps.setTimestamp(num, row.getTimestamp(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.TIMESTAMP)
- } else {
- ps.setTimestamp(num, value.asInstanceOf[Timestamp])
- }
- }
- }
- class DoublePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.DOUBLE)
- } else {
- ps.setDouble(num, row.getDouble(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.DOUBLE)
- } else {
- ps.setDouble(num, value.asInstanceOf[Double])
- }
- }
- }
- class FloatPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.FLOAT)
- } else {
- ps.setFloat(num, row.getFloat(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.FLOAT)
- } else {
- ps.setFloat(num, value.asInstanceOf[Float])
- }
- }
- }
- class BytePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.TINYINT)
- } else {
- ps.setByte(num, row.getByte(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.TINYINT)
- } else {
- ps.setByte(num, value.asInstanceOf[Byte])
- }
- }
- }
- class ShortPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.SMALLINT)
- } else {
- ps.setShort(num, row.getShort(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.SMALLINT)
- } else {
- ps.setShort(num, value.asInstanceOf[Short])
- }
- }
- }
- class BoolPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.BOOLEAN)
- } else {
- ps.setBoolean(num, row.getBoolean(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.BOOLEAN)
- } else {
- ps.setBoolean(num, value.asInstanceOf[Boolean])
- }
- }
- }
- /**
- * 高精度整数,浮点数
- * @param fieldName
- */
- class DecimalPstateSetter (fieldName:String) extends PstateSetter(fieldName) {
- override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
- if(value.isInstanceOf[Row]){
- val row = value.asInstanceOf[Row]
- val i = row.fieldIndex(fieldName)
- // 该行数据的该列是 null 值
- if(row.isNullAt(i)){
- ps.setNull(num, Types.DECIMAL)
- } else {
- ps.setBigDecimal(num, row.getDecimal(i))
- }
- return
- }
- // 值是 null 的
- if(value == null){
- ps.setNull(num, Types.DECIMAL)
- } else {
- ps.setBigDecimal(num, value.asInstanceOf[BigDecimal])
- }
- }
- }
|