Commit 70089c83 authored by zhenxin.ma

Generation logic for the project-layer table used in region back-derivation

Parent 0ea74495
package com.data
import java.sql.PreparedStatement
import com.DateUtils
import com.utils.{JDBCUtil, MySQLConfig, MyUtil, SyncDataConfig}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import scala.collection.mutable.ListBuffer
/**
 * Generates the pica_project.attach_region_result intermediate table used in region back-derivation
 * @Author zhenxin.ma
 * @Date 2019/11/6 15:07
 * @Version 1.0
 */
class SyncAttachRegionResult {
}
object SyncAttachRegionResult {
  def main(args: Array[String]): Unit = {
    //1. Insert a record into the job record table before running the task
    val insertSQL: String =
      s"""
         |insert into ${MySQLConfig.HDFS_BASE}.${MySQLConfig.HDFS_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
         |values(1892,'pica_project.attach_region_result','4',?,'0',?)""".stripMargin
    //Batch number for this sync run, formatted like 20190912
    val yesterdayTime: String = DateUtils.getYesterdayTime(-1).split(" ")(0)
    val scnData: String = yesterdayTime.replace("-", "")
    //Task start time, formatted like 2019-09-12 14:03:30
    val startTime: String = DateUtils.getTodayDate
    //Parameters for the SQL placeholders
    val insertArr: Array[String] = Array[String](scnData, startTime)
    //Get a MySQL connection
    val connSql: java.sql.Connection = JDBCUtil.getConnection()
    //Insert the record
    val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
    try {
      val conf: SparkConf = new SparkConf().setAppName("SyncAttachRegionResult")
      MyUtil.setConfigure(conf)
      val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      val df1: DataFrame = sparkSession.sql(SyncDataConfig.REGION_SQL1)
      val df2: DataFrame = sparkSession.sql(SyncDataConfig.REGION_SQL2)
      import sparkSession.implicits._
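      // df2 is the project-to-attachable-region mapping; it is collected to the driver and
      // broadcast so each executor can match against it locally (this assumes the mapping is
      // small enough to fit in memory).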
      val ppact: Array[(Long, Long, Long, Long, Long)] = df2.map(row => {
        (row.getAs[Long]("project_id"), row.getAs[Long]("province_id"),
          row.getAs[Long]("city_id"), row.getAs[Long]("county_id"),
          row.getAs[Long]("town_id"))
      }).collect()
      //Wrap the collected mapping in a broadcast variable
      val broadcast: Broadcast[Array[(Long, Long, Long, Long, Long)]] =
        sparkSession.sparkContext.broadcast(ppact)
      val reDS: Dataset[(Long, Long, Long, Long, Long, Long)] = df1.mapPartitions(it => {
        //Holds the final result; each tuple becomes one output row
        val tuples: ListBuffer[(Long, Long, Long, Long, Long, Long)] = ListBuffer[(Long, Long, Long, Long, Long, Long)]()
        val list: List[Row] = it.toList
        list.foreach(row => {
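          // count records how many region levels (province/city/county/town) are filled in on
          // this doctor row; the code assumes the levels are populated top-down, so e.g. count == 2
          // means province and city are known while county and town are not.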
          var count: Int = 0
          //Note: the field types need converting here
          val project_id: Long = row.getAs[Int]("project_id").toLong
          val doctor_id: Long = row.getAs[Int]("doctor_id").toLong
          val province_id: Long = row.getAs[Long]("province_id")
          val city_id: Long = row.getAs[Long]("city_id")
          val county_id: Long = row.getAs[Long]("county_id")
          val town_id: Long = row.getAs[Long]("town_id")
          if (province_id != 0) {
            count = count + 1
          }
          if (city_id != 0) {
            count = count + 1
          }
          if (county_id != 0) {
            count = count + 1
          }
          if (town_id != 0) {
            count = count + 1
          }
          //Look up matches in the broadcast variable
          val broad: Array[(Long, Long, Long, Long, Long)] = broadcast.value
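          // Match this doctor row against the project's attachable regions at the deepest level
          // the doctor has filled in (project only, province, city, county or town). The emitted
          // row takes all region levels from the matched broadcast tuple, which back-derives the
          // levels missing on the doctor row.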
          broad.foreach(tuple => {
            if (count == 0 && project_id == tuple._1) {
              tuples += ((project_id, doctor_id, tuple._2, tuple._3, tuple._4, tuple._5))
            } else if (count == 1 && project_id == tuple._1 && province_id == tuple._2) {
              tuples += ((project_id, doctor_id, tuple._2, tuple._3, tuple._4, tuple._5))
            } else if (count == 2 && project_id == tuple._1 && city_id == tuple._3) {
              tuples += ((project_id, doctor_id, tuple._2, tuple._3, tuple._4, tuple._5))
            } else if (count == 3 && project_id == tuple._1 && county_id == tuple._4) {
              tuples += ((project_id, doctor_id, tuple._2, tuple._3, tuple._4, tuple._5))
            } else if (count == 4 && project_id == tuple._1 && town_id == tuple._5) {
              tuples += ((project_id, doctor_id, tuple._2, tuple._3, tuple._4, tuple._5))
            }
          })
        })
        tuples.iterator
      })
      //Give the result columns their field names
      val reDF: DataFrame = reDS.toDF("project_id", "doctor_id", "province_id", "city_id", "county_id", "town_id")
      //Write the result out as parquet files
      reDF.write.mode(SaveMode.Overwrite).format("parquet")
        .save(s"${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}")
      //Load the files into the staging table
      sparkSession.sql(s"load data INPATH '${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}' " +
        s"overwrite into table ${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}")
      //Update the record table
      println("-------------------------Updating the schedule_job_record table--------------------------------------")
      //The task finished; update the record configuration table
      val updateSQL: String =
        s"""
           |update ${MySQLConfig.HDFS_TABLE} set status=?,end_time=? where job_id=1892 and start_time='${startTime}'""".stripMargin
      val endTime: String = DateUtils.getTodayDate
      val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
      upreSta.setString(1, "1")
      upreSta.setString(2, endTime)
      //Execute the update
      upreSta.executeUpdate()
      //Close the connections
      JDBCUtil.close(connSql, upreSta)
    } catch {
      case e: Exception => {
        println("-------------------------Task exception---------------------------------------------------")
        val exceptionSQL: String =
          s"""
             |update ${MySQLConfig.HDFS_TABLE} set status=?,exception=?,end_time=? where job_id=1892 and start_time='${startTime}'""".stripMargin
        val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayDate)
        JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
        connSql.close()
      }
    }
  }
}