提交 d458386b 编写于 作者: wuyunfeng's avatar wuyunfeng

修改SessionProcess为批量执行

上级 1a0bc349
...@@ -51,54 +51,62 @@ object SessionProcess { ...@@ -51,54 +51,62 @@ object SessionProcess {
val sessionProcess: SessionProcess = SessionProcess() val sessionProcess: SessionProcess = SessionProcess()
//step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重 //step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
var sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL+s" and created_day='${scnData}'").repartition(200).distinct() var sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL+s" and created_day='${scnData}'").repartition(200).distinct()
var conditionGroup = List( "<='2'","between '3' and '5'","between '6' and '8'","between '9' and 'b'",">='c'" )
var dataCount = 0
var index = 0
sourceDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
for(condition <- conditionGroup){
index += 1
val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}").repartition(100)
//step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中 //step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session")) val groupRdd = slideDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => { val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1 val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]() val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2 var rowList = g._2
rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//按created由大到小排序 rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//按created由大到小排序
var thisDeviceToken = "" var thisDeviceToken = ""
var thisDoctorId = "0" var thisDoctorId = "0"
var thisMobile = "" var thisMobile = ""
rowList.foreach(row => { rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token") var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id") var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile") var mobile = row.getAs[String]("mobile")
val created = row.getAs[String]("created") val created = row.getAs[String]("created")
if(deviceToken!=null && !deviceToken.equals("") ){ if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken thisDeviceToken = deviceToken
}else { }else {
deviceToken = thisDeviceToken deviceToken = thisDeviceToken
} }
if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){ if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
thisDoctorId = doctorId thisDoctorId = doctorId
}else { }else {
doctorId = thisDoctorId doctorId = thisDoctorId
} }
if(mobile!=null && !mobile.equals("") ){ if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile thisMobile = mobile
}else { }else {
mobile = thisMobile mobile = thisMobile
} }
resList += (Row(row.getAs[String]("pseudo_session"), resList += (Row(row.getAs[String]("pseudo_session"),
doctorId, doctorId,
mobile, mobile,
deviceToken, deviceToken,
row.getAs[String]("user_token_tourist"), row.getAs[String]("user_token_tourist"),
row.getAs[String]("class_name"), row.getAs[String]("class_name"),
row.getAs[String]("view_path"), row.getAs[String]("view_path"),
row.getAs[String]("action"), row.getAs[String]("action"),
row.getAs[String]("component_tag"), row.getAs[String]("component_tag"),
row.getAs[String]("app_version"), row.getAs[String]("app_version"),
row.getAs[String]("device_type"), row.getAs[String]("device_type"),
row.getAs[String]("device_brand"), row.getAs[String]("device_brand"),
row.getAs[String]("device_model"), row.getAs[String]("device_model"),
row.getAs[String]("network_type"), row.getAs[String]("network_type"),
row.getAs[String]("created"))) row.getAs[String]("created")))
}) })
resList.iterator resList.iterator
}) })
import sessionProcess.sparkSession.implicits._ import sessionProcess.sparkSession.implicits._
...@@ -106,30 +114,33 @@ object SessionProcess { ...@@ -106,30 +114,33 @@ object SessionProcess {
val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema) val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER) resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>") println("resDF.show=======>")
// resDF.show() // resDF.show()
val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows) val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows)
println("---------------------------------------process columns-------------------------------------------") println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns) val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", .toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2", "app_version", "action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time") "device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------") println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess) val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
//默认缓存级别是:MEMORY_AND_DISK //默认缓存级别是:MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER) sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("sessionIdDF.show=======>") println("sessionIdDF.show=======>")
// sessionIdDF.show()
println("-------------------------------match user_id 逻辑-------------------------------------------------") println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData) val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("dwFactLogSession.show=======>") println("dwFactLogSession.show=======>")
// dwFactLogSession.show()
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------") println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session") dwFactLogSession.createOrReplaceTempView("fact_log_session")
var insertMode = "insert overwrite"
if(index!=1){
insertMode = "insert into"
}
val loadDataSql = val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE1} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()" s"${insertMode} table ${MyConfigSession.HIVE_TABLE1} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcess.sparkSession.sql(loadDataSql) sessionProcess.sparkSession.sql(loadDataSql)
dataCount = dataCount + dwFactLogSession.count().toInt
}
println("----------------------------------update task record table---------------------------------------") println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表 //任务执行成功,更新 Mysql record 配置表
...@@ -141,7 +152,7 @@ object SessionProcess { ...@@ -141,7 +152,7 @@ object SessionProcess {
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL) val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1") upreSta.setString(1, "1")
upreSta.setString(2, endTime) upreSta.setString(2, endTime)
upreSta.setInt(3, sessionIdDF.count().toInt) upreSta.setInt(3, dataCount)
//更新表数据 //更新表数据
upreSta.executeUpdate() upreSta.executeUpdate()
//关闭连接 //关闭连接
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册