Commit 5f3d2383 authored by zhenxin.ma

Import Hive table data into MySQL

Parent a3b45e90
package com.report
import java.io.FileInputStream
import java.sql.{Connection, Date, DriverManager, ResultSet, Timestamp}
import java.util.Properties
import com.utils.MyUtil
import com.utils.MyUtil.getProPerties
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType}
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode, SparkSession}
/**
 * Sync the final pica_project-layer data of the learning report to MySQL
 * @Author zhenxin.ma
 * @Date 2019/11/22 16:18
 * @Version 1.0
 */
class SyncDataToMySQL {
}
object SyncDataToMySQL {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SyncDataToMySQL")
conf.set("spark.serializer", classOf[KryoSerializer].getName)
// Enable RDD compression
conf.set("spark.rdd.compress", "true")
// Use snappy as the compression codec (the default is lz4); snappy compresses
// well and is fast, but uses somewhat more memory
conf.set("spark.io.compression.codec", "snappy")
// Size of the in-memory buffer used by the snappy codec
conf.set("spark.io.compression.snappy.blockSize", "64k")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val times: Int = args(0).toInt
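// args(0) is the run sequence number written to the "times" column below.
// A hypothetical submit command (jar name assumed for illustration):
//   spark-submit --class com.report.SyncDataToMySQL sync-data-to-mysql.jar 1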
val sql =
s"""
|select a.stat_date as log_date,
|${times} as times,
|a.project_id,
|COALESCE(a.project_name,'') project_name,
|a.doctor_id,
|COALESCE(a.doctor_name,'') doctor_name,
|cast(a.doctor_role as int) doctor_role,
|COALESCE(a.province_id,0) province_id,
|COALESCE(a.province_name,'') province_name,
|COALESCE(a.city_id,0) city_id,
|COALESCE(a.city_name,'') city_name,
|COALESCE(a.county_id,0) county_id,
|COALESCE(a.county_name,'') county_name,
|case when a.town_id is null OR a.town_id=-1 then 0 else a.town_id END town_id,
|COALESCE(a.town_name,'') town_name,
|COALESCE(a.hospital_id,0) hospital_id,
|COALESCE(a.hospital_name,'') hospital_name,
|COALESCE(a.department_id,0) department_id,
|COALESCE(a.department_name,'') department_name,
|COALESCE(a.education_id,0) education_id,
|COALESCE(a.education_name,'') education_name,
|COALESCE(a.title_id,0) title_id,
|COALESCE(a.title_name,'') title_name,
|COALESCE(a.sex,0) sex,
|COALESCE(a.age,0) age,
|1 as delete_flag,
|0 as created_id,
|from_unixtime(UNIX_TIMESTAMP()) as created_time,
|0 as modified_id,
|from_unixtime(UNIX_TIMESTAMP()) as modified_time
|from pica_project_uat.lr_project_doctor_mapping a
""".stripMargin
val dataFrame: DataFrame = sparkSession.sql(sql)
dataFrame.printSchema()
println("---------------数据量为------------" + dataFrame.count())
val tableName="r_project_doctor_mapping"
val path="/mysqlConfig.properties"
MyUtil.saveASMysqlTable(dataFrame,tableName,SaveMode.Append,path)
print("-------------------END---------------------------------")
}
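// NOTE: MyUtil.saveASMysqlTable and MyUtil.getProPerties live in an external
// utils module not shown in this commit. A minimal sketch of what
// saveASMysqlTable might do, assuming it simply wraps Spark's built-in JDBC
// writer and reads the same property keys that getConnection() below uses:
//
//   def saveASMysqlTable(df: DataFrame, tableName: String, mode: SaveMode, path: String): Unit = {
//     val props = getProPerties(path)
//     val jdbcProps = new Properties()
//     jdbcProps.setProperty("user", props.getProperty("mysql.username"))
//     jdbcProps.setProperty("password", props.getProperty("mysql.password"))
//     jdbcProps.setProperty("driver", props.getProperty("mysql.driver"))
//     df.write.mode(mode).jdbc(props.getProperty("mysql.url"), tableName, jdbcProps)
//   }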
/**
 * @Description Save a DataFrame to MySQL via JDBC
 * @param tableName target table name
 * @param resultDataFrame the DataFrame to save
 * @return void
 **/
def saveDFtoMysql(tableName: String, resultDataFrame: DataFrame): Unit = {
val colNumbers: Int = resultDataFrame.columns.length
println("----------columns is " + colNumbers)
val sql = getInsertSql(tableName, colNumbers)
println("----------sql is --------------- " + sql)
val columnDataTypes: Array[DataType] = resultDataFrame.schema.fields.map(_.dataType)
resultDataFrame.foreachPartition(partitionRecords => {
// One connection and prepared statement per partition, opened on the executor
val conn: Connection = getConnection()
val prepareStatement = conn.prepareStatement(sql)
// Target table's column metadata, used to resolve SQL types for null values;
// pass a real null as the catalog, not the string "null"
val metaData: ResultSet = conn.getMetaData.getColumns(null, "%", tableName, "%")
try {
// Disable auto-commit so the whole batch commits in a single transaction
conn.setAutoCommit(false)
partitionRecords.foreach(record => {
for (i <- 1 to colNumbers) {
// value of the i-th column
val value = record.get(i - 1)
// and its Spark SQL data type
val dataType = columnDataTypes(i - 1)
if (value != null) {
// Bind the parameter with the JDBC setter matching the Spark type
dataType match {
case _: ByteType => prepareStatement.setInt(i, record.getAs[Byte](i - 1).toInt)
case _: ShortType => prepareStatement.setInt(i, record.getAs[Short](i - 1).toInt)
case _: IntegerType => prepareStatement.setInt(i, record.getAs[Int](i - 1))
case _: LongType => prepareStatement.setLong(i, record.getAs[Long](i - 1))
case _: BooleanType => prepareStatement.setBoolean(i, record.getAs[Boolean](i - 1))
case _: FloatType => prepareStatement.setFloat(i, record.getAs[Float](i - 1))
case _: DoubleType => prepareStatement.setDouble(i, record.getAs[Double](i - 1))
case _: StringType => prepareStatement.setString(i, record.getAs[String](i - 1))
case _: TimestampType => prepareStatement.setTimestamp(i, record.getAs[Timestamp](i - 1))
case _: DateType => prepareStatement.setDate(i, record.getAs[Date](i - 1))
case _ => throw new RuntimeException(s"unsupported data type ${dataType} !!!")
}
} else {
// Parameter i maps to table column i + 1, because the INSERT supplies a
// literal null for the auto-increment id column (see getInsertSql)
metaData.absolute(i + 1)
prepareStatement.setNull(i, metaData.getInt("DATA_TYPE"))
}
}
// Queue this row into the current batch
prepareStatement.addBatch()
})
prepareStatement.executeBatch()
conn.commit()
} catch {
case e: Exception => println(s"@@saveDFtoDB ${e.getMessage}")
} finally {
prepareStatement.close()
conn.close()
}
})
}
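// Usage sketch: saveDFtoMysql expects the target table's first column to be an
// auto-increment id (see getInsertSql below), e.g.:
//   saveDFtoMysql("r_project_doctor_mapping", dataFrame)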
/**
 * @Description Build the INSERT statement
 * @param tableName target table name
 * @param colNumbers number of columns
 * @return java.lang.String
 **/
def getInsertSql(tableName: String, colNumbers: Int): String = {
var sqlStr = "insert into " + tableName + " values(null,"
for (i <- 1 to colNumbers) {
sqlStr += "?"
if (i != colNumbers) {
sqlStr += ","
}
}
sqlStr += ")"
sqlStr
}
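// For example, getInsertSql("r_project_doctor_mapping", 3) returns
//   insert into r_project_doctor_mapping values(null,?,?,?)
// where the leading null lets MySQL fill the auto-increment primary key.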
/**
 * @Description Get a MySQL connection
 * @return java.sql.Connection
 **/
def getConnection(): Connection = {
val properties: Properties = getProPerties("/mysqlConfig.properties")
var con: Connection = null
try {
Class.forName(properties.getProperty("mysql.driver"))
con = DriverManager.getConnection(properties.getProperty("mysql.url"), properties.getProperty("mysql.username"), properties.getProperty("mysql.password"))
println("--------------------get connection is success----------------------------")
} catch {
case e: Exception => println("-----------MySQL connection failed, msg = " + e.getMessage)
}
con
}
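// Expected layout of /mysqlConfig.properties on the classpath. The key names
// come from the getProperty calls above; the values are placeholders:
//   mysql.driver=com.mysql.jdbc.Driver
//   mysql.url=jdbc:mysql://<host>:3306/<database>?useUnicode=true&characterEncoding=utf8
//   mysql.username=<user>
//   mysql.password=<password>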
}