Commit 09cdbd5a authored by zhenxin.ma

Data sync logic for the DS layer

Parent 70089c83
package com.data
import java.sql.{PreparedStatement, Timestamp}
import com.DateUtils
import com.mysql.jdbc.Driver
import com.utils.{JDBCUtil, MySQLConfig, MyUtil, SyncDataConfig}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* Sync the MySQL Doctor, Organization and Department tables into the Hive DS layer
* @Author zhenxin.ma
* @Date 2019/11/15 10:02
* @Version 1.0
*/
class SyncDoctorOrganizationDepartment {
}
object SyncDoctorOrganizationDepartment {
def main(args: Array[String]): Unit = {
//1. Write a row into the job record table before the task runs
val insertSQL: String =
s"""
|insert into ${MySQLConfig.HDFS_BASE}.${MySQLConfig.HDFS_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1814,'${SyncDataConfig.DATABASE1}.study_report','0',?,'0',?)""".stripMargin
//Batch number for this sync, formatted as 20190912
val yesterdayTime: String = DateUtils.getYesterdayTime(-1).split(" ")(0)
val scnData: String = yesterdayTime.replace("-","")
//Job start time, formatted as 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayDate
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnData,startTime)
//Get a MySQL connection
val connSql: java.sql.Connection = JDBCUtil.getConnection()
//Insert the record
val flag: Int = JDBCUtil.insertRecord(connSql,insertSQL,insertArr)
//2. Run the Spark job
try {
val conf: SparkConf = new SparkConf().setAppName("SyncReportData")
MyUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
//1.1 First truncate the pica_portal_campaign_doctor and pica_portal_campaign_organization tables
sparkSession.sql(s"TRUNCATE TABLE ${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE1}")
sparkSession.sql(s"TRUNCATE TABLE ${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE2}")
//1.2 Read the Hive pica_portal_campaign_mapping configuration table
val docOrgSQL: String = s"select table_doctor_seq, table_organization_seq " +
s"from ${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE} where created_time < current_date()"
val docOrgDF: DataFrame = sparkSession.sql(docOrgSQL)
val docOrgRDD: RDD[Row] = docOrgDF.rdd
//2. Collect the distinct table_doctor_seq values and import the doctor data
val doctor_seq: Array[Int] = docOrgRDD.map(row => {
(row.getAs[Int]("table_doctor_seq"))
}).distinct().collect()
doctor_seq.foreach(f => {
//Build the MySQL table name
val my_table: String = SyncDataConfig.MYSQL_TABLE1 + f
println(s"--------------load data to pica_ds.pica_portal_campaign_doctor from $my_table ----------------------------------")
val sql:String = SyncDataConfig.Hive_TABLE1_SQL + my_table + " where created_time < current_date()"
MyUtil.loadMysqlToHive(sparkSession,MySQLConfig.URL,my_table,sql)
})
//3. Collect the distinct table_organization_seq values and import the organization data
val organization_seq: Array[Int] = docOrgRDD.map(row => {
(row.getAs[Int]("table_organization_seq"))
}).distinct().collect()
organization_seq.foreach(f => {
//Build the MySQL table name
val my_table: String = SyncDataConfig.MYSQL_TABLE2 + f
println(s"--------------load data to pica_ds.pica_portal_campaign_organization from $my_table ---------------------------")
val sql:String = SyncDataConfig.Hive_TABLE2_SQL + my_table + " where created_time < current_date()"
MyUtil.loadMysqlToHive(sparkSession,MySQLConfig.URL,my_table,sql)
})
//4. The department table
val loadDepSql: String = s"select id,project_id,content,id_type," +
s"delete_flag,created_id,created_time,modified_id,modified_time from ${SyncDataConfig.MYSQL_TABLE3} " +
s" where created_time < current_date()"
val departDF: DataFrame = MyUtil.loadMysqlToHive(sparkSession,MySQLConfig.URL,SyncDataConfig.MYSQL_TABLE3,loadDepSql)
val mapDS = getDataSet(departDF,sparkSession)
//4.1: Register the data as a temp view and load it into the table with an HQL insert
// mapDS.toDF("id","project_id","content","id_type","delete_flag","created_id","created_time","modified_id","modified_time").createOrReplaceTempView(SyncDataConfig.MYSQL_TABLE3)
// println("------------------------------load data to pica_ds.pica_portal_campaign_department------------------------")
// sparkSession.sql(s"insert overwrite table ${SyncDataConfig.DATABASE}.${SyncDataConfig.Hive_TABLE3} select * from ${SyncDataConfig.MYSQL_TABLE3}")
//4.2: Write the Dataset out as parquet files, then LOAD DATA them into the table
//Note: the column names must exactly match the Hive table's column names, otherwise no data can be read after the parquet files are loaded into Hive
val mapDF: DataFrame = mapDS.toDF("id","project_id","content","id_type",
"delete_flag","created_id","created_time","modified_id","modified_time")
mapDF.write.mode(SaveMode.Overwrite).format("parquet")
.save(s"${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE3}")
println("------------------------------load data to pica_ds.pica_portal_campaign_department------------------------")
//Load into the table
sparkSession.sql(s"load data INPATH '${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE3}' " +
s"overwrite into table ${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE3}")
//Update the record table
println("-------------------------Updating the schedule_job_record table--------------------------------------")
//Job finished, update the record table
val updateSQL: String =
s"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,end_time=? where job_id=1814 and start_time='${startTime}'""".stripMargin
val endTime: String = DateUtils.getTodayDate
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1,"1")
upreSta.setString(2,endTime)
//Execute the update
upreSta.executeUpdate()
//Close the connections
JDBCUtil.close(connSql,upreSta)
}catch {
case e:Exception => {
println("-------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,exception=?,end_time=? where job_id=1814 and start_time='${startTime}'""".stripMargin
val errorArr = Array[String]("2",e.getMessage,DateUtils.getTodayDate)
JDBCUtil.insertRecord(connSql,exceptionSQL,errorArr)
connSql.close()
}
}
}
def getDataSet(df: DataFrame,sparkSession: SparkSession):
Dataset[(Long, Long, String, Int, Int, Long, Timestamp, Long, Timestamp)] = {
import sparkSession.implicits._
val mapDS: Dataset[(Long, Long, String, Int, Int, Long, Timestamp, Long, Timestamp)] = df.map(row => {
val list: ListBuffer[(Long, Long, String, Int, Int, Long, Timestamp, Long, Timestamp)] =
ListBuffer[(Long, Long, String, Int, Int, Long, Timestamp, Long, Timestamp)]()
val id: Long = row.getAs[Long]("id")
val project_id: Long = row.getAs[Long]("project_id")
val content: String = row.getAs[String]("content")
val id_type: Int = row.getAs[Int]("id_type")
val delete_flag: Int = row.getAs[Int]("delete_flag")
val created_id: Long = row.getAs[Long]("created_id")
val created_time: Timestamp = row.getAs[Timestamp]("created_time")
val modified_id: Long = row.getAs[Long]("modified_id")
val modified_time: Timestamp = row.getAs[Timestamp]("modified_time")
//Split on "|" when the content contains it
if (content.contains("|")) {
//Note: the pipe character must be escaped in the regex
val strs: Array[String] = content.split("\\|")
for (i <- 0 until strs.length) {
list += ((id, project_id, strs(i), id_type, delete_flag, created_id, created_time, modified_id, modified_time))
}
} else {
list += ((id, project_id, content, id_type, delete_flag, created_id, created_time, modified_id, modified_time))
}
list
}).flatMap(f => f)
mapDS
}
}
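The helpers in com.utils (MyUtil, JDBCUtil and the config objects) are referenced above but are not part of this commit. Below is a minimal, hypothetical sketch of what MyUtil.loadMysqlToHive might look like, assuming it simply reads MySQL through Spark's JDBC data source with the passed query and returns the resulting DataFrame; the driver class and credentials are placeholders, and the real helper presumably also writes the result into the target Hive table.

package com.utils

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch only -- the real MyUtil in this repository may differ.
object MyUtilSketch {
  def loadMysqlToHive(sparkSession: SparkSession, url: String,
                      table: String, sql: String): DataFrame = {
    val props = new Properties()
    props.setProperty("driver", "com.mysql.jdbc.Driver") // placeholder driver class
    props.setProperty("user", "etl_user")                // placeholder credentials
    props.setProperty("password", "******")
    // Push the query down to MySQL as a subquery so only the filtered rows are read
    val df: DataFrame = sparkSession.read.jdbc(url, s"($sql) AS ${table}_src", props)
    df
  }
}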
package com.data
import java.sql.{PreparedStatement, Timestamp}
import com.DateUtils
import com.utils.{JDBCUtil, MySQLConfig, MyUtil, SyncDataConfig}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* Sync the MySQL table pica.portal_project_attachregion into Hive,
* extracting the region fields that satisfy the selection condition
* @Author zhenxin.ma
* @Date 2019/10/21 10:37
* @Version 1.0
*/
class SyncPortalProjectAttachregion {
}
object SyncPortalProjectAttachregion {
def main(args: Array[String]): Unit = {
//1. Write a row into the job record table before the task runs
val insertSQL: String =
s"""
|insert into ${MySQLConfig.HDFS_BASE}.${MySQLConfig.HDFS_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1881,'${SyncDataConfig.DATABASE1}.pica_portal_project_attachregion','0',?,'0',?)""".stripMargin
//Batch number for this sync, formatted as 20190912
val yesterdayTime: String = DateUtils.getYesterdayTime(-1).split(" ")(0)
val scnData: String = yesterdayTime.replace("-","")
//Job start time, formatted as 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayDate
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnData,startTime)
//Get a MySQL connection
val connSql: java.sql.Connection = JDBCUtil.getConnection()
//Insert the record
val flag: Int = JDBCUtil.insertRecord(connSql,insertSQL,insertArr)
//Run the Spark job
try {
val conf: SparkConf = new SparkConf().setAppName("SyncDataTask")
conf.set("spark.serializer", classOf[KryoSerializer].getName)
// Maximum buffer size used by the Kryo serializer
conf.set("spark.kryoserializer.buffer.max", "128m")
// Enable RDD compression
conf.set("spark.rdd.compress", "true")
// Use the snappy compression codec (note: Spark's default is lz4)
conf.set("spark.io.compression.codec", "snappy")
// Block size used by the snappy compression codec
conf.set("spark.io.compression.snappy.blockSize", "64k")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val loadSql: String = s"select * from ${SyncDataConfig.MYSQL_TABLE4} where created_time < current_date()"
val df: DataFrame = MyUtil.loadMysqlToHive(sparkSession,MySQLConfig.URL,SyncDataConfig.MYSQL_TABLE4,loadSql)
//Transform the data
val resultDS = getDataSet(df,sparkSession)
//Rename to the Hive table's column names
val parquetDF: DataFrame = resultDS.toDF(
"id", "project_id", "doctor_id", "content"
, "country_id", "province_id", "city_id", "county_id", "town_id"
, "id_type", "delete_flag", "created_id",
"created_time", "modified_id", "modified_time")
parquetDF.write.mode(SaveMode.Overwrite).format("parquet")
.save(s"${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE4}")
println("------------------------------load data to pica_ds.pica_portal_project_attachregion------------------------")
//Load into the table
sparkSession.sql(s"load data INPATH '${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE4}' " +
s"overwrite into table ${SyncDataConfig.DATABASE1}.${SyncDataConfig.Hive_TABLE4}")
println("-------------------------更新 schedule_job_record表--------------------------------------")
//任务结束,更新 record 配置表
val updateSQL: String =
s"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,end_time=? where job_id=1881 and start_time='${startTime}'""".stripMargin
val endTime: String = DateUtils.getTodayDate
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1,"1")
upreSta.setString(2,endTime)
//Execute the update
upreSta.executeUpdate()
//Close the connections
JDBCUtil.close(connSql,upreSta)
}catch {
case e:Exception => {
println("-------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,exception=?,end_time=? where job_id=1881 and start_time='${startTime}'""".stripMargin
val errorArr = Array[String]("2",e.getMessage,DateUtils.getTodayDate)
JDBCUtil.insertRecord(connSql,exceptionSQL,errorArr)
connSql.close()
}
}
}
def getDataSet(df: DataFrame,sparkSession:SparkSession):
Dataset[(Long, Long, Long, String, String, Long, Long, Long, Long, Int, Int, Long, Timestamp, Long, Timestamp)] = {
import sparkSession.implicits._
val value: Dataset[(Long, Long, Long, String, String, Long, Long, Long, Long, Int, Int, Long, Timestamp, Long, Timestamp)] =
df.map(row => {
val list: ListBuffer[(Long, Long, Long, String, String, Long, Long, Long, Long, Int, Int, Long, Timestamp, Long, Timestamp)] =
ListBuffer[(Long, Long, Long, String, String, Long, Long, Long, Long, Int, Int, Long, Timestamp, Long, Timestamp)]()
val content: String = row.getAs[String]("content")
var country: String = "0"
var province: Long = 0
var city: Long = 0
var county: Long = 0
var town: Long = 0
if (!content.contains(":1")) {
list += ((row.getAs[Long]("id"), row.getAs[Long]("project_id"),
row.getAs[Long]("doctor_id"), content, country, province, city, county, town,
row.getAs[Int]("id_type"), row.getAs[Int]("delete_flag"),
row.getAs[Long]("created_id"), row.getAs[Timestamp]("created_time"),
row.getAs[Long]("modified_id"), row.getAs[Timestamp]("modified_time")))
} else if (content.contains(":1") && content.contains("|")) {
//Note: the pipe character must be escaped in the regex
val strs: Array[String] = content.split("\\|")
for (i <- 0 until strs.length) {
if (strs(i).contains(":")) {
val info: Array[String] = strs(i).split(":")
if ("1".equals(info(1))) {
if (info(0).contains("_")) {
val address: Array[String] = info(0).split("_")
address.length match {
case 2 => {
country = address(0)
province = address(1).toLong
}
case 3 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
}
case 4 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
county = address(3).toLong
}
case 5 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
county = address(3).toLong
town = address(4).toLong
}
case _ => {
}
}
} else {
country = info(0)
}
list += ((row.getAs[Long]("id"), row.getAs[Long]("project_id"),
row.getAs[Long]("doctor_id"),
content, country, province, city, county, town,
row.getAs[Int]("id_type"), row.getAs[Int]("delete_flag"),
row.getAs[Long]("created_id"), row.getAs[Timestamp]("created_time"),
row.getAs[Long]("modified_id"), row.getAs[Timestamp]("modified_time")))
}
}
}
} else {
val info: Array[String] = content.split(":")
if (info(0).contains("_")) {
val address: Array[String] = info(0).split("_")
address.length match {
case 2 => {
country = address(0)
province = address(1).toLong
}
case 3 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
}
case 4 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
county = address(3).toLong
}
case 5 => {
country = address(0)
province = address(1).toLong
city = address(2).toLong
county = address(3).toLong
town = address(4).toLong
}
case _ => {}
}
} else {
country = info(0)
}
list += ((row.getAs[Long]("id"), row.getAs[Long]("project_id"),
row.getAs[Long]("doctor_id"), content, country, province, city, county, town,
row.getAs[Int]("id_type"), row.getAs[Int]("delete_flag"),
row.getAs[Long]("created_id"), row.getAs[Timestamp]("created_time"),
row.getAs[Long]("modified_id"), row.getAs[Timestamp]("modified_time")))
}
list
}).flatMap(f => f)
value
}
}
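For reference, here is a small standalone sketch (using a made-up content value, not real data) that illustrates the parsing rules implemented in getDataSet above: entries are separated by "|", each entry has the form region:flag, only entries flagged ":1" are kept, and the region part is an underscore-separated path of country/province/city/county/town ids.

object AttachregionParseExample {
  def main(args: Array[String]): Unit = {
    // Made-up sample value for illustration only
    val content = "1_21_210:1|1_22_220_2201:0|1_23:1"
    content.split("\\|").foreach { entry =>
      val info: Array[String] = entry.split(":")
      if (info.length == 2 && "1".equals(info(1))) {
        val address: Array[String] = info(0).split("_")
        // Pad missing levels with "0", matching the defaults used in getDataSet
        val levels: Array[String] = address ++ Array.fill(5 - address.length)("0")
        println(s"country=${levels(0)}, province=${levels(1)}, city=${levels(2)}, " +
          s"county=${levels(3)}, town=${levels(4)}")
      }
    }
  }
}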