| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- package com.yiidata.amc.jdbc
- import java.math.BigDecimal
- import java.sql.{Types, Timestamp, Date, PreparedStatement}
- import org.apache.spark.sql.Row
/**
 * JDBC ingestion helper: binds one value to a `PreparedStatement` parameter
 * using the setter that matches the column's data type, with a null check.
 *
 * Created by ZhenQin on 2018/2/6 0006-17:17
 * Vendor: yiidata.com
 *
 * @param fieldName name of the field this setter is responsible for
 */
abstract class PstateSetter(fieldName: String) extends Serializable {

  /** Name of the field being written. */
  val field: String = fieldName

  /**
   * Column position derived from the field name: every `$` character is
   * removed and the remainder is parsed as an integer. When the remainder is
   * not a valid integer the position falls back to 0.
   * (Original note: for an RDD the position should start from 0.)
   */
  val index: Int =
    try {
      fieldName.replace("$", "").toInt
    } catch {
      // Not a positional name such as "$3"; use the first column.
      case _: NumberFormatException => 0
    }

  /**
   * Binds a value to the statement.
   *
   * @param ps    the database statement to bind on
   * @param num   position of the parameter, forwarded to the `ps.setXxx` call
   * @param value the data for the current row; the implementations in this
   *              file accept either a Spark `Row` or a bare value
   */
  def setValue(ps: PreparedStatement, num: Int, value: Any): Unit
}
/**
 * Binds an `Int` value (SQL `INTEGER`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class IntPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Int`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.INTEGER)
        else ps.setInt(num, record.getInt(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.INTEGER)
      case scalar =>
        ps.setInt(num, scalar.asInstanceOf[Int])
    }
}
/**
 * Binds a `String` value (SQL `VARCHAR`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class StringPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `String`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.VARCHAR)
        else ps.setString(num, record.getString(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.VARCHAR)
      case scalar =>
        ps.setString(num, scalar.asInstanceOf[String])
    }
}
/**
 * Binds a `Long` value (SQL `BIGINT`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class LongPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Long`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.BIGINT)
        else ps.setLong(num, record.getLong(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.BIGINT)
      case scalar =>
        ps.setLong(num, scalar.asInstanceOf[Long])
    }
}
/**
 * Binds a `java.sql.Date` value (SQL `DATE`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class DatePstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `java.sql.Date`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.DATE)
        else ps.setDate(num, record.getDate(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.DATE)
      case scalar =>
        ps.setDate(num, scalar.asInstanceOf[Date])
    }
}
/**
 * Binds a `java.sql.Timestamp` value (SQL `TIMESTAMP`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class TimestampPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `java.sql.Timestamp`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.TIMESTAMP)
        else ps.setTimestamp(num, record.getTimestamp(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.TIMESTAMP)
      case scalar =>
        ps.setTimestamp(num, scalar.asInstanceOf[Timestamp])
    }
}
/**
 * Binds a `Double` value (SQL `DOUBLE`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class DoublePstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Double`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.DOUBLE)
        else ps.setDouble(num, record.getDouble(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.DOUBLE)
      case scalar =>
        ps.setDouble(num, scalar.asInstanceOf[Double])
    }
}
/**
 * Binds a `Float` value to a statement parameter; nulls are bound with the
 * JDBC type code `Types.FLOAT`.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class FloatPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Float`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.FLOAT)
        else ps.setFloat(num, record.getFloat(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.FLOAT)
      case scalar =>
        ps.setFloat(num, scalar.asInstanceOf[Float])
    }
}
/**
 * Binds a `Byte` value (SQL `TINYINT`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class BytePstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Byte`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.TINYINT)
        else ps.setByte(num, record.getByte(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.TINYINT)
      case scalar =>
        ps.setByte(num, scalar.asInstanceOf[Byte])
    }
}
/**
 * Binds a `Short` value (SQL `SMALLINT`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class ShortPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Short`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.SMALLINT)
        else ps.setShort(num, record.getShort(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.SMALLINT)
      case scalar =>
        ps.setShort(num, scalar.asInstanceOf[Short])
    }
}
/**
 * Binds a `Boolean` value (SQL `BOOLEAN`) to a statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class BoolPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, or a
   *              bare value that is cast to `Boolean`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.BOOLEAN)
        else ps.setBoolean(num, record.getBoolean(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.BOOLEAN)
      case scalar =>
        ps.setBoolean(num, scalar.asInstanceOf[Boolean])
    }
}
/**
 * Binds a high-precision number (integer or fractional, SQL `DECIMAL`) to a
 * statement parameter.
 *
 * @param fieldName name of the column to read when the value is a `Row`
 */
class DecimalPstateSetter(fieldName: String) extends PstateSetter(fieldName) {

  /**
   * Binds parameter `num` of `ps`.
   *
   * @param ps    statement to bind on
   * @param num   parameter position passed to the statement
   * @param value a `Row` (the column named `fieldName` is read), `null`, a
   *              `java.math.BigDecimal`, or a `scala.math.BigDecimal`; any
   *              other value is cast to `java.math.BigDecimal` and therefore
   *              fails with a `ClassCastException`
   */
  override def setValue(ps: PreparedStatement, num: Int, value: Any): Unit =
    value match {
      case record: Row =>
        val pos = record.fieldIndex(fieldName)
        // The column of this row holds a null.
        if (record.isNullAt(pos)) ps.setNull(num, Types.DECIMAL)
        else ps.setBigDecimal(num, record.getDecimal(pos))
      case null =>
        // The bare value itself is null.
        ps.setNull(num, Types.DECIMAL)
      case wrapped: scala.math.BigDecimal =>
        // `BigDecimal` in this file is java.math.BigDecimal, so a Scala
        // BigDecimal would fail the cast below; bind its underlying Java value.
        ps.setBigDecimal(num, wrapped.bigDecimal)
      case scalar =>
        ps.setBigDecimal(num, scalar.asInstanceOf[BigDecimal])
    }
}
|