PstateSetter.scala 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package com.yiidata.amc.jdbc
  2. import java.math.BigDecimal
  3. import java.sql.{Types, Timestamp, Date, PreparedStatement}
  4. import org.apache.spark.sql.Row
  5. /**
  6. *
  7. * <p>
  8. * JDBC 入库,数据类型匹配的 statement set value,有空值检查
  9. * </p>
  10. *
  11. * Created by ZhenQin on 2018/2/6 0006-17:17
  12. * Vendor: yiidata.com
  13. *
  14. */
  15. abstract class PstateSetter(fieldName:String) extends Serializable {
  16. /**
  17. * 修改的字段名称
  18. */
  19. val field:String = fieldName
  20. /**
  21. * 该列表属于第几列,RDD 该从 0 开始
  22. */
  23. val index = try {
  24. Integer.parseInt(fieldName.replaceAll("\\$", ""))
  25. } catch {
  26. case e:NumberFormatException => 0
  27. }
  28. /**
  29. * 设置值
  30. *
  31. * @param ps 数据库statement
  32. * @param num 第几个参数
  33. * @param value 当前行数据
  34. */
  35. def setValue(ps:PreparedStatement, num:Int, value: Any): Unit
  36. }
  37. class IntPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  38. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  39. if(value.isInstanceOf[Row]){
  40. val row = value.asInstanceOf[Row]
  41. val i = row.fieldIndex(fieldName)
  42. // 该行数据的该列是 null 值
  43. if(row.isNullAt(i)){
  44. ps.setNull(num, Types.INTEGER)
  45. } else {
  46. ps.setInt(num, row.getInt(i))
  47. }
  48. return
  49. }
  50. // 值是 null 的
  51. if(value == null){
  52. ps.setNull(num, Types.INTEGER)
  53. } else {
  54. ps.setInt(num, value.asInstanceOf[Int])
  55. }
  56. }
  57. }
  58. class StringPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  59. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  60. if(value.isInstanceOf[Row]){
  61. val row = value.asInstanceOf[Row]
  62. val i = row.fieldIndex(fieldName)
  63. // 该行数据的该列是 null 值
  64. if(row.isNullAt(i)){
  65. ps.setNull(num, Types.VARCHAR)
  66. } else {
  67. ps.setString(num, row.getString(i))
  68. }
  69. return
  70. }
  71. // 值是 null 的
  72. if(value == null){
  73. ps.setNull(num, Types.VARCHAR)
  74. } else {
  75. ps.setString(num, value.asInstanceOf[String])
  76. }
  77. }
  78. }
  79. class LongPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  80. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  81. if(value.isInstanceOf[Row]){
  82. val row = value.asInstanceOf[Row]
  83. val i = row.fieldIndex(fieldName)
  84. // 该行数据的该列是 null 值
  85. if(row.isNullAt(i)){
  86. ps.setNull(num, Types.BIGINT)
  87. } else {
  88. ps.setLong(num, row.getLong(i))
  89. }
  90. return
  91. }
  92. // 值是 null 的
  93. if(value == null){
  94. ps.setNull(num, Types.BIGINT)
  95. } else {
  96. ps.setLong(num, value.asInstanceOf[Long])
  97. }
  98. }
  99. }
  100. class DatePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  101. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  102. if(value.isInstanceOf[Row]){
  103. val row = value.asInstanceOf[Row]
  104. val i = row.fieldIndex(fieldName)
  105. // 该行数据的该列是 null 值
  106. if(row.isNullAt(i)){
  107. ps.setNull(num, Types.DATE)
  108. } else {
  109. ps.setDate(num, row.getDate(i))
  110. }
  111. return
  112. }
  113. // 值是 null 的
  114. if(value == null){
  115. ps.setNull(num, Types.DATE)
  116. } else {
  117. ps.setDate(num, value.asInstanceOf[Date])
  118. }
  119. }
  120. }
  121. class TimestampPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  122. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  123. if(value.isInstanceOf[Row]){
  124. val row = value.asInstanceOf[Row]
  125. val i = row.fieldIndex(fieldName)
  126. // 该行数据的该列是 null 值
  127. if(row.isNullAt(i)){
  128. ps.setNull(num, Types.TIMESTAMP)
  129. } else {
  130. ps.setTimestamp(num, row.getTimestamp(i))
  131. }
  132. return
  133. }
  134. // 值是 null 的
  135. if(value == null){
  136. ps.setNull(num, Types.TIMESTAMP)
  137. } else {
  138. ps.setTimestamp(num, value.asInstanceOf[Timestamp])
  139. }
  140. }
  141. }
  142. class DoublePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  143. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  144. if(value.isInstanceOf[Row]){
  145. val row = value.asInstanceOf[Row]
  146. val i = row.fieldIndex(fieldName)
  147. // 该行数据的该列是 null 值
  148. if(row.isNullAt(i)){
  149. ps.setNull(num, Types.DOUBLE)
  150. } else {
  151. ps.setDouble(num, row.getDouble(i))
  152. }
  153. return
  154. }
  155. // 值是 null 的
  156. if(value == null){
  157. ps.setNull(num, Types.DOUBLE)
  158. } else {
  159. ps.setDouble(num, value.asInstanceOf[Double])
  160. }
  161. }
  162. }
  163. class FloatPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  164. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  165. if(value.isInstanceOf[Row]){
  166. val row = value.asInstanceOf[Row]
  167. val i = row.fieldIndex(fieldName)
  168. // 该行数据的该列是 null 值
  169. if(row.isNullAt(i)){
  170. ps.setNull(num, Types.FLOAT)
  171. } else {
  172. ps.setFloat(num, row.getFloat(i))
  173. }
  174. return
  175. }
  176. // 值是 null 的
  177. if(value == null){
  178. ps.setNull(num, Types.FLOAT)
  179. } else {
  180. ps.setFloat(num, value.asInstanceOf[Float])
  181. }
  182. }
  183. }
  184. class BytePstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  185. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  186. if(value.isInstanceOf[Row]){
  187. val row = value.asInstanceOf[Row]
  188. val i = row.fieldIndex(fieldName)
  189. // 该行数据的该列是 null 值
  190. if(row.isNullAt(i)){
  191. ps.setNull(num, Types.TINYINT)
  192. } else {
  193. ps.setByte(num, row.getByte(i))
  194. }
  195. return
  196. }
  197. // 值是 null 的
  198. if(value == null){
  199. ps.setNull(num, Types.TINYINT)
  200. } else {
  201. ps.setByte(num, value.asInstanceOf[Byte])
  202. }
  203. }
  204. }
  205. class ShortPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  206. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  207. if(value.isInstanceOf[Row]){
  208. val row = value.asInstanceOf[Row]
  209. val i = row.fieldIndex(fieldName)
  210. // 该行数据的该列是 null 值
  211. if(row.isNullAt(i)){
  212. ps.setNull(num, Types.SMALLINT)
  213. } else {
  214. ps.setShort(num, row.getShort(i))
  215. }
  216. return
  217. }
  218. // 值是 null 的
  219. if(value == null){
  220. ps.setNull(num, Types.SMALLINT)
  221. } else {
  222. ps.setShort(num, value.asInstanceOf[Short])
  223. }
  224. }
  225. }
  226. class BoolPstateSetter(fieldName:String) extends PstateSetter(fieldName) {
  227. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  228. if(value.isInstanceOf[Row]){
  229. val row = value.asInstanceOf[Row]
  230. val i = row.fieldIndex(fieldName)
  231. // 该行数据的该列是 null 值
  232. if(row.isNullAt(i)){
  233. ps.setNull(num, Types.BOOLEAN)
  234. } else {
  235. ps.setBoolean(num, row.getBoolean(i))
  236. }
  237. return
  238. }
  239. // 值是 null 的
  240. if(value == null){
  241. ps.setNull(num, Types.BOOLEAN)
  242. } else {
  243. ps.setBoolean(num, value.asInstanceOf[Boolean])
  244. }
  245. }
  246. }
  247. /**
  248. * 高精度整数,浮点数
  249. * @param fieldName
  250. */
  251. class DecimalPstateSetter (fieldName:String) extends PstateSetter(fieldName) {
  252. override def setValue(ps:PreparedStatement, num:Int, value: Any): Unit = {
  253. if(value.isInstanceOf[Row]){
  254. val row = value.asInstanceOf[Row]
  255. val i = row.fieldIndex(fieldName)
  256. // 该行数据的该列是 null 值
  257. if(row.isNullAt(i)){
  258. ps.setNull(num, Types.DECIMAL)
  259. } else {
  260. ps.setBigDecimal(num, row.getDecimal(i))
  261. }
  262. return
  263. }
  264. // 值是 null 的
  265. if(value == null){
  266. ps.setNull(num, Types.DECIMAL)
  267. } else {
  268. ps.setBigDecimal(num, value.asInstanceOf[BigDecimal])
  269. }
  270. }
  271. }