提交 17d9e537 编写于 作者: wuyunfeng

流量临时需求-新增label_class

上级 3f553a85
...@@ -9,6 +9,8 @@ package com.config ...@@ -9,6 +9,8 @@ package com.config
*/ */
object MyConfigSession { object MyConfigSession {
//Hive的DW层流量表 //Hive的DW层流量表
final val HIVE_TABLE1_TMP: String = "pica_dw.dw_fact_log_session_tmp"
final val HIVE_TABLE2_TMP: String = "pica_dw.dw_fact_log_session_path_tmp"
final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session" final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session"
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path" final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart" final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
...@@ -19,19 +21,18 @@ object MyConfigSession { ...@@ -19,19 +21,18 @@ object MyConfigSession {
//流量表中的临时视图 //流量表中的临时视图
final val VIEW_SESSION_ODS: String = "ods_session" final val VIEW_SESSION_ODS: String = "ods_session"
final val VIEW_SESSION_NO_MATCH: String = "ods_session_no_user_id" final val VIEW_SESSION_NO_MATCH: String = "ods_session_no_user_id"
final val VIEW_MOBILE_PHONE: String = "mobile_phone_match"
final val VIEW_EQUIPMENT_INFO: String = "equipment_info" final val VIEW_EQUIPMENT_INFO: String = "equipment_info"
final val VIEW_DEVICE_TOKEN: String = "device_token_match" final val VIEW_DEVICE_TOKEN: String = "device_token_match"
//流量表中使用的三个字典表作为过滤数据条件 //流量表中使用的三个字典表作为过滤数据条件
final val ACTION_TYPE_SQL: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where is_valid=1" final val ACTION_TYPE_SQL: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where is_valid=1"
final val ACTION_TYPE_SQL_HEART: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'" final val ACTION_TYPE_SQL_HEART: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'"
final val CLASS_NAME_SQL: String = "select class_name, '0' as is_valid from pica_dw.dw_dim_log_class_name where is_valid=0" final val CLASS_NAME_SQL: String = "select class_name, '0' as is_valid from pica_dw.dw_dim_log_class_name where is_valid=0"
final val MENU_CODE_SQL: String = "select view_path, menu_code from pica_dw.dw_dim_log_menu_class_code where view_path is not Null" final val MENU_CODE_SQL: String = "select distinct view_path, menu_code from pica_dw.dw_dim_log_menu_class_code where view_path is not Null"
//流量表中根据action_type获取对应的action_category类型 //流量表中根据action_type获取对应的action_category类型
final val ACTION_CATEGORY_SQL: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where is_valid=1" final val ACTION_CATEGORY_SQL: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where is_valid=1"
final val ACTION_CATEGORY_SQL_HEART: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'" final val ACTION_CATEGORY_SQL_HEART: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'"
final val ACTION_URLLABEL_SQL:String = "select url_content,label_value from pica_dw.dw_dim_log_action_urllabel "
//从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,这里获取昨天的 //从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,这里获取昨天的
...@@ -39,9 +40,10 @@ object MyConfigSession { ...@@ -39,9 +40,10 @@ object MyConfigSession {
""" """
|select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action, |select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action,
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part |component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where created_day = DATE_SUB(current_date(),1) and pseudo_session is not null and pseudo_session !='' | where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3' | and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
""".stripMargin | and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day and created is not null and created!=''
""".stripMargin //and `action`!='ACTION_EQUIP_INFO'
//从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,输入指定日期参数时执行的SQL //从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,输入指定日期参数时执行的SQL
...@@ -51,16 +53,18 @@ object MyConfigSession { ...@@ -51,16 +53,18 @@ object MyConfigSession {
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part |component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !='' | where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3' | and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and created is not null and created!='' and `action`!='ACTION_EQUIP_INFO'
""".stripMargin """.stripMargin
//从dw_fact_log_session表中筛选数据 //从dw_fact_log_session表中筛选数据
final val SOURCE_SQL_PATH: String = final val SOURCE_SQL_PATH: String =
s""" s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value, |select id log_session_id, session_id,user_id_int user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1} |app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1}
| where created_day=DATE_SUB(current_date(),1) and app_version >= '3.1.7' | where app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK') OR (action_type ='ACTION_VIEW' and menu_code != '0' and menu_code !='null' and menu_code !='')) | AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' )
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and length(menu_code) <= 3 )
""".stripMargin """.stripMargin
...@@ -71,12 +75,22 @@ object MyConfigSession { ...@@ -71,12 +75,22 @@ object MyConfigSession {
s""" s"""
|SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token, |SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code, |t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.app_version,t.device_type, |t.action_code, t.position, t.label_value,t.label_class,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time, |t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t |t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string) |left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin """.stripMargin
//1.使用equipment表匹配,默认是昨天的 //1.针对没有匹配到的user_id,先使用 mobile_phone 进行匹配,得到 user_id 匹配,'0'
final val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
//2.使用equipment表匹配,默认是昨天的
final val EQUIPMENT_INFO_SQL: String = final val EQUIPMENT_INFO_SQL: String =
""" """
|SELECT a.user_id,a.device_token ,ROW_NUMBER() OVER ( PARTITION BY a.device_token ORDER BY a.creat_time DESC ) row_d |SELECT a.user_id,a.device_token ,ROW_NUMBER() OVER ( PARTITION BY a.device_token ORDER BY a.creat_time DESC ) row_d
...@@ -98,23 +112,13 @@ object MyConfigSession { ...@@ -98,23 +112,13 @@ object MyConfigSession {
s""" s"""
|SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token, |SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code, |t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.app_version,t.device_type, |t.action_code, t.position, t.label_value,t.label_class,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time, |t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} as t |t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token |left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin """.stripMargin
//在device_token匹配的基础上,筛选出没有匹配到的user_id,使用 mobile_phone 进行匹配,得到 user_id 匹配,'0'
final val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from (select * from ${MyConfigSession.VIEW_DEVICE_TOKEN} as a where a.user_id = '0') AS ss
|left join (select id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
......
...@@ -22,6 +22,150 @@ import scala.collection.mutable.ListBuffer ...@@ -22,6 +22,150 @@ import scala.collection.mutable.ListBuffer
* @Date 2020/3/26 14:23 * @Date 2020/3/26 14:23
* @Version 1.0 * @Version 1.0
*/ */
// Driver for the daily pica_dw.dw_fact_log_session load job: reads raw trace logs,
// back-fills per-pseudo_session identity fields, computes session ids, matches user_id,
// writes the Hive partition, and records job status in a MySQL record table.
object SessionProcess {
// Factory for the serializable worker class that owns the SparkSession.
def apply(): SessionProcess = new SessionProcess()
def main(args: Array[String]): Unit = {
// 1. Before running the task, insert a "started" row into the record table.
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
""".stripMargin
// Batch id of the data being synced, formatted like 2019-09-12.
// Defaults to yesterday; an explicit date may be passed as args(0).
var scnData: String = DateUtils.getYesterdayDate
if(args.length>=1){
scnData = args(0)
}
println(s"scnData=${scnData}")
// Task start time, formatted like 2019-09-12 14:03:30.
val startTime: String = DateUtils.getTodayTime
// Values for the two '?' placeholders of insertSQL (job_scn, start_time).
val insertArr: Array[String] = Array[String](scnData, startTime)
// Obtain the MySQL connection.
val connSql: sql.Connection = JDBCUtil.getConnection()
// Insert the "started" row into the record table.
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcess: SessionProcess = SessionProcess()
// step1: fetch the source rows for the batch day, repartition (Spark reads Hive with
// too few partitions by default; this forces a shuffle), and de-duplicate.
var sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL+s" and created_day='${scnData}'").repartition(200).distinct()
// step2: within each pseudo_session, pick up the non-empty device_token / doctor_id /
// mobile values and back-fill them into records of the same pseudo_session where empty.
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))// sort by created, descending (newest first)
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
val created = row.getAs[String]("created")
// Remember the last non-empty device_token seen; otherwise reuse the remembered one.
if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken
}else {
deviceToken = thisDeviceToken
}
// Same carry-forward for doctor_id ("0" means unknown).
if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
thisDoctorId = doctorId
}else {
doctorId = thisDoctorId
}
// Same carry-forward for mobile.
if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile
}else {
mobile = thisMobile
}
resList += (Row(row.getAs[String]("pseudo_session"),
doctorId,
mobile,
deviceToken,
row.getAs[String]("user_token_tourist"),
row.getAs[String]("class_name"),
row.getAs[String]("view_path"),
row.getAs[String]("action"),
row.getAs[String]("component_tag"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("network_type"),
row.getAs[String]("created")))
})
resList.iterator
})
import sessionProcess.sparkSession.implicits._
// step3: filter rows using the action_type and class_name dictionary mappings.
val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
// resDF.show()
val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
// Default cache level would be MEMORY_AND_DISK; the serialized variant is used here.
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("sessionIdDF.show=======>")
// sessionIdDF.show()
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("dwFactLogSession.show=======>")
// dwFactLogSession.show()
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE1} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcess.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
// Task succeeded: update the MySQL record table (status=1, end time, row count).
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, sessionIdDF.count().toInt)
// Execute the record-table update.
upreSta.executeUpdate()
// Close the JDBC resources.
JDBCUtil.close(connSql, upreSta)
sessionProcess.sparkSession.stop()
} catch {
case e: Exception => {
// Task failed: mark status=2 with the exception message in the record table.
// NOTE(review): connSql is closed here but upreSta/SparkSession may leak on failure — confirm.
println(s"-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcess extends java.io.Serializable{ class SessionProcess extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = { def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName) val conf: SparkConf = new SparkConf().setAppName(appName)
...@@ -83,7 +227,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -83,7 +227,7 @@ class SessionProcess extends java.io.Serializable{
//处理字段,得到需要的字段值 //处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => { val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String, val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]() String, String, String, String, String, String, String, String, String, String, String, String,String)]()
//关联到menu_code的映射表广播变量 //关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量 //关联到action_category的映射表广播变量
...@@ -104,6 +248,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -104,6 +248,7 @@ class SessionProcess extends java.io.Serializable{
var action_code: String = "" var action_code: String = ""
var position: String = "" var position: String = ""
var label_value: String = "" var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value //将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) { if (component_tag.contains("#")) {
//按照#号切割 //按照#号切割
...@@ -121,16 +266,25 @@ class SessionProcess extends java.io.Serializable{ ...@@ -121,16 +266,25 @@ class SessionProcess extends java.io.Serializable{
action_code = strs(1) action_code = strs(1)
position = strs(2) position = strs(2)
} }
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => { case _ => {
menu_code = strs(0) menu_code = strs(0)
action_code = strs(1) action_code = strs(1)
position = strs(2) position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length)) label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
} }
} }
} }
//匹配menu_code:如果上述截取出来的menu_code为'',null或者action is ACTION_VIEW //匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if (menu_code.equals("")|| menu_code.equals("null") || action_type.equals("ACTION_VIEW")) { if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
&& action_type.equals("ACTION_VIEW")) {
menu_code = "0" //关联不上的显示为0
import scala.util.control.Breaks._ import scala.util.control.Breaks._
breakable { breakable {
//利用menu_code映射表匹配 //利用menu_code映射表匹配
...@@ -158,7 +312,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -158,7 +312,7 @@ class SessionProcess extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")), StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")), StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")), StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value, action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")), StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")), StringUtils.getNotNullString(row.getAs[String]("device_brand")),
...@@ -169,8 +323,6 @@ class SessionProcess extends java.io.Serializable{ ...@@ -169,8 +323,6 @@ class SessionProcess extends java.io.Serializable{
}) })
baseList.iterator baseList.iterator
} }
//按照time_gap 切割session,计算session_id //按照time_gap 切割session,计算session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => { val computeSessionId = (tuple: (String, Iterable[Row])) => {
//先按照 pseudo_session 的值命名 sessionID //先按照 pseudo_session 的值命名 sessionID
...@@ -180,7 +332,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -180,7 +332,7 @@ class SessionProcess extends java.io.Serializable{
var count: Int = 0 var count: Int = 0
//存储一行的数据 //存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String, val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]() String, String, String, String, String, String, String, String, String, String, String, String, String)]()
rowList.toList.foreach(row => { rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time")) val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created")) val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
...@@ -203,6 +355,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -203,6 +355,7 @@ class SessionProcess extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("action_code")), StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")), StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")), StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")), StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")), StringUtils.getNotNullString(row.getAs[String]("device_brand")),
...@@ -213,110 +366,12 @@ class SessionProcess extends java.io.Serializable{ ...@@ -213,110 +366,12 @@ class SessionProcess extends java.io.Serializable{
}) })
list list
} }
}
object SessionProcess {
def apply(): SessionProcess = new SessionProcess()
// Entry point (previous revision): loads yesterday's trace logs into
// pica_dw.dw_fact_log_session and records job status in a MySQL record table.
def main(args: Array[String]): Unit = {
// 1. Before running the task, insert a "started" row into the record table.
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
""".stripMargin
// Batch id of the data being synced, formatted like 2019-09-12 (always yesterday here).
val scnData: String = DateUtils.getYesterdayDate
// Task start time, formatted like 2019-09-12 14:03:30.
val startTime: String = DateUtils.getTodayTime
// Values for the two '?' placeholders of insertSQL (job_scn, start_time).
val insertArr: Array[String] = Array[String](scnData, startTime)
// Obtain the MySQL connection.
val connSql: sql.Connection = JDBCUtil.getConnection()
// Insert the "started" row into the record table.
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcess: SessionProcess = SessionProcess()
// Fetch the source data.
val sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL)
// 1. Repartition to force a shuffle (Spark reads Hive with too few partitions by default).
// 2. Filter: parse the created timestamp and keep only rows produced yesterday.
// 3. Drop duplicate records.
// 4. Further filtering by action_type and class_name happens below.
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
var createdTime: String = row.getAs[String]("created")
// Guard against a null created value.
if (createdTime == null) {
createdTime = "0"
}
// Keep the row only when the batch date equals the row's creation date (yyyy-MM-dd).
scnData.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
}).distinct()
import sessionProcess.sparkSession.implicits._
// Filter rows using the action_type and class_name dictionary mappings.
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcess.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = getSessionId(baseDF,sessionProcess)
// Default cache level would be MEMORY_AND_DISK; the serialized variant is used here.
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE1} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcess.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
// Task succeeded: update the MySQL record table (status=1, end time, row count).
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, sessionIdDF.count().toInt)
// Execute the record-table update.
upreSta.executeUpdate()
// Close the JDBC resources.
JDBCUtil.close(connSql, upreSta)
sessionProcess.sparkSession.stop()
} catch {
case e: Exception => {
// Task failed: mark status=2 with the exception message in the record table.
// NOTE(review): the exception itself is not printed here — stack trace is lost; confirm intended.
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
/** /**
* @Description 按照TimeGap切割session,重命名session_id * @Description 按照TimeGap切割session,重命名session_id
* @param dataFrame 要处理的DataFrame * @param dataFrame 要处理的DataFrame
* @param sessionProcess SessionProcess对象,包含SparkSession 环境 * @param sessionProcess SessionProcess对象,包含SparkSession 环境
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/ **/
def getSessionId(dataFrame: DataFrame,sessionProcess: SessionProcess):DataFrame = { def getSessionId(dataFrame: DataFrame,sessionProcess: SessionProcess):DataFrame = {
import sessionProcess.sparkSession.implicits._ import sessionProcess.sparkSession.implicits._
...@@ -330,7 +385,7 @@ object SessionProcess { ...@@ -330,7 +385,7 @@ object SessionProcess {
val coalesceDF: DataFrame = rcreDF.selectExpr( val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token", "pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type", "user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value", "component_tag", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time", "device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created") "COALESCE(refer_created,created_time) as refer_created")
...@@ -341,11 +396,11 @@ object SessionProcess { ...@@ -341,11 +396,11 @@ object SessionProcess {
//计算两者之差,这时候就得到了 session_id //计算两者之差,这时候就得到了 session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it) val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token", .toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time") "device_system", "net_type", "created_time", "date_time")
sessionIdDF sessionIdDF
} }
...@@ -365,29 +420,49 @@ object SessionProcess { ...@@ -365,29 +420,49 @@ object SessionProcess {
//以下的所有逻辑是为了补全user_id字段 //以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据,将这些user_id置为字符串'0' //第一步:首先筛选出不符合的use_id数据,将这些user_id置为字符串'0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24") val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token", .selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time") "device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH) noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//使用临时表equiment,筛选出为1的那条最新数据 //1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val equipmentDF: DataFrame = sparkSQLSession.sql(MyConfigSession.EQUIPMENT_INFO_SQL).where("row_d =1") val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if(!created_day.equals(DateUtils.getYesterdayDate)){//如果不是跑昨天的数据,使用equipment拉链表
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO) equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//1.将第一步筛选出来的数据先按照device_token进行匹配,获得user_id //3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL) val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
deviceTokenDF.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//2.筛选出上一步没有匹配到的user_id,再按照mobile_phone进行匹配 //4.将上述三者union,最终导入表中的数据
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
//3.将上述三者union,最终导入表中的数据
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24") val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF) val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
dwFactLogSession val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
} }
} }
...@@ -20,200 +20,6 @@ import scala.collection.mutable.ListBuffer ...@@ -20,200 +20,6 @@ import scala.collection.mutable.ListBuffer
* @Date 2020/4/1 15:26 * @Date 2020/4/1 15:26
* @Version 1.0 * @Version 1.0
*/ */
class SessionProcessArgs extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessArgs")
//获取符合要求的actionType广播变量
val actionTypeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//获取clasName广播变量
val classNameBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
val menuCodeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
//获取actionCategory变量
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL,"action_type","action_category")
//定义函数式变量,过滤映射表数据
val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
val rowList: ListBuffer[Row] = new ListBuffer[Row]()
val actionTypeMap: Map[String, String] = actionTypeBroad.value
val classNameMap: Map[String, String] = classNameBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//筛选action的条件
val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
//说明该action类型即为所要的
if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
//将action转化为映射表中对应的标准actionCategory
val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
// val action_type = UseUtil.getActionType(action)
//action为其中的任何一个
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//判断 component_tag 必须要包含 "#"
if (row.getAs[String]("component_tag") != null
&& row.getAs[String]("component_tag").contains("#")) {
rowList += row
}
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//非上述三种action_type,那么需要过滤掉映射表中class_name为"0"对应的那些数据
} else if (row.getAs[String]("class_name") != null
&& !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
rowList += row
}
}
})
rowList.iterator
}
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型
var action_type: String = ""
if (row.getAs[String]("action") !=null) {
action_type = actionCategoryMap.getOrElse(row.getAs[String]("action"),"ACTION")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 1 => {
menu_code = strs(0)
}
}
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
&& action_type.equals("ACTION_VIEW")) {
menu_code = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
//经过上述匹配,如果menu_code仍然为空串,那么置为component_tag字段一样
if (menu_code.equals("")) {
menu_code = component_tag
}
}
}
//一行数据添加到List中
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
//按照time_gap 切割session,计算session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => {
//先按照 pseudo_session 的值命名 sessionID
val sessionID: String = tuple._1
val rowList: Iterable[Row] = tuple._2
//定义一个累加量
var count: Int = 0
//存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
val time_diff: Long = created.toLong - refer_created.toLong
//相邻的时间差小于等于30分钟,就是同一个 sessionID
if (time_diff > MyConfigSession.SESSION_GAP) {
count = count + 1
}
//添加到List
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
}
object SessionProcessArgs { object SessionProcessArgs {
def apply(): SessionProcessArgs = new SessionProcessArgs() def apply(): SessionProcessArgs = new SessionProcessArgs()
...@@ -237,47 +43,88 @@ object SessionProcessArgs { ...@@ -237,47 +43,88 @@ object SessionProcessArgs {
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr) val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try { try {
val sessionProcess: SessionProcessArgs = SessionProcessArgs() val sessionProcess: SessionProcessArgs = SessionProcessArgs()
//获取源数据,注意这里指定了日期参数 // 获取源数据,注意这里指定了日期参数,
val sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${args(0)}'") // step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
//1.重新分区,产生shuffle,Spark读Hive默认的分区数太少 val sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${args(0)}'").distinct()
//2.过滤数据,解析创建时间,只获取昨天产生的数据 //step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
//3.过滤重复的记录 val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
//4.利用action_type和class_name过滤 val resRdd = groupRdd.flatMap(g => {
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => { val pseudo_session = g._1
var createdTime: String = row.getAs[String]("created") val resList: ListBuffer[Row] = new ListBuffer[Row]()
//防止出错 var rowList = g._2
if (createdTime == null) { rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//按created由大到小排序
createdTime = "0" var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
val created = row.getAs[String]("created")
if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken
}else {
deviceToken = thisDeviceToken
} }
//注意这里的过滤条件,数据的批次时间要和数据产生的年月日一样,也就是当天的数据 if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
scnData.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10)) thisDoctorId = doctorId
}).distinct() }else {
doctorId = thisDoctorId
import sessionProcess.sparkSession.implicits._ }
if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile
}else {
mobile = thisMobile
}
resList += (Row(row.getAs[String]("pseudo_session"),
doctorId,
mobile,
deviceToken,
row.getAs[String]("user_token_tourist"),
row.getAs[String]("class_name"),
row.getAs[String]("view_path"),
row.getAs[String]("action"),
row.getAs[String]("component_tag"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("network_type"),
row.getAs[String]("created")))
})
resList.iterator
})
import sessionProcess.sparkSession.implicits._
//step3:根据映射表来进行action_type和class_name数据过滤
val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
resDF.show()
//根据映射表来进行action_type和class_name数据过滤 //根据映射表来进行action_type和class_name数据过滤
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcess.filterRows) val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows)
println("---------------------------------------process columns-------------------------------------------") println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns) val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", .toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "app_version", "action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time") "device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------") println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = getSessionId(baseDF,sessionProcess) val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
//默认缓存级别是:MEMORY_AND_DISK //默认缓存级别是:MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER) sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------") println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData) val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------") println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session") dwFactLogSession.createOrReplaceTempView("fact_log_session")
val loadDataSql = val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE1} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()" s"insert overwrite table ${MyConfigSession.HIVE_TABLE1_TMP} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcess.sparkSession.sql(loadDataSql) sessionProcess.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------") println("----------------------------------update task record table---------------------------------------")
...@@ -311,85 +158,299 @@ object SessionProcessArgs { ...@@ -311,85 +158,299 @@ object SessionProcessArgs {
} }
/**
* @Description 按照TimeGap切割session,重命名session_id
* @param dataFrame 要处理的DataFrame
* @param sessionProcess SessionProcessArgs对象,包含SparkSession 环境
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getSessionId(dataFrame: DataFrame,sessionProcess:SessionProcessArgs):DataFrame = {
import sessionProcess.sparkSession.implicits._
//先按照 pseudo_session 分组,然后按照 created 排序,组件一个窗口
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
//增加一个字段 refer_created ,这个字段的值是上一条记录 created 字段的值,方便后面通过两者差值计算出 session_id
val rcreDF: DataFrame =
dataFrame.withColumn("refer_created", lag(dataFrame("created_time"), 1).over(pSessionWinSpec))
//执行COALESCE,目的是为了去掉 refer_created 为Null的值
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
//按照 pseudo_session聚合,计算 session_id
val groupRDD: RDD[(String, Iterable[Row])] =
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
//计算两者之差,这时候就得到了 session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
}
/** }
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
* @param sparkSQLSession SparkSession 环境
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//使用临时表equiment,筛选出为1的那条最新数据
val equipmentDF: DataFrame = sparkSQLSession.sql(MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'").where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//1.将第一步筛选出来的数据先按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
deviceTokenDF.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//2.筛选出上一步没有匹配到的user_id,再按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
//3.将上述三者union,最终导入表中的数据
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
val userIdDF: DataFrame = sparkSQLSession.sql(MyConfigSession.USER_ID_INT_SQL)
userIdDF
}
class SessionProcessArgs extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessArgs")
//获取符合要求的actionType广播变量
val actionTypeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//获取clasName广播变量
val classNameBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
val menuCodeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
//获取actionCategory变量
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL,"action_type","action_category")
//定义函数式变量,过滤映射表数据
val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
val rowList: ListBuffer[Row] = new ListBuffer[Row]()
val actionTypeMap: Map[String, String] = actionTypeBroad.value
val classNameMap: Map[String, String] = classNameBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//筛选action的条件
val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
//说明该action类型即为所要的
if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
//将action转化为映射表中对应的标准actionCategory
val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
// val action_type = UseUtil.getActionType(action)
//action为其中的任何一个
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//判断 component_tag 必须要包含 "#"
if (row.getAs[String]("component_tag") != null
&& row.getAs[String]("component_tag").contains("#")) {
rowList += row
}
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//非上述三种action_type,那么需要过滤掉映射表中class_name为"0"对应的那些数据
} else if (row.getAs[String]("class_name") != null
&& !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
rowList += row
}
}
})
rowList.iterator
}
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型
var action_type: String = ""
if (row.getAs[String]("action") !=null) {
action_type = actionCategoryMap.getOrElse(row.getAs[String]("action"),"ACTION")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
&& action_type.equals("ACTION_VIEW")) {
menu_code = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
//经过上述匹配,如果menu_code仍然为空串,那么置为component_tag字段一样
if (menu_code.equals("")) {
menu_code = component_tag
}
}
}
//一行数据添加到List中
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
//按照time_gap 切割session,计算session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => {
//先按照 pseudo_session 的值命名 sessionID
val sessionID: String = tuple._1
val rowList: Iterable[Row] = tuple._2
//定义一个累加量
var count: Int = 0
//存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
val time_diff: Long = created.toLong - refer_created.toLong
//相邻的时间差小于等于30分钟,就是同一个 sessionID
if (time_diff > MyConfigSession.SESSION_GAP) {
count = count + 1
}
//添加到List
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
/**
* @Description 按照TimeGap切割session,重命名session_id
* @param dataFrame 要处理的DataFrame
* @param sessionProcess SessionProcessArgs对象,包含SparkSession 环境
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getSessionId(dataFrame: DataFrame,sessionProcess:SessionProcessArgs):DataFrame = {
import sessionProcess.sparkSession.implicits._
//先按照 pseudo_session 分组,然后按照 created 排序,组件一个窗口
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
//增加一个字段 refer_created ,这个字段的值是上一条记录 created 字段的值,方便后面通过两者差值计算出 session_id
val rcreDF: DataFrame =
dataFrame.withColumn("refer_created", lag(dataFrame("created_time"), 1).over(pSessionWinSpec))
//执行COALESCE,目的是为了去掉 refer_created 为Null的值
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
//按照 pseudo_session聚合,计算 session_id
val groupRDD: RDD[(String, Iterable[Row])] =
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
//计算两者之差,这时候就得到了 session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
}
/**
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
* @param sparkSQLSession SparkSession 环境
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
val equipmentDF: DataFrame = sparkSQLSession.sql(MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'").where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
//4.将上述三者union,最终导入表中的数据
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
}
} }
...@@ -2,13 +2,19 @@ package com.session ...@@ -2,13 +2,19 @@ package com.session
import java.sql import java.sql
import java.sql.PreparedStatement import java.sql.PreparedStatement
import com.config.MyConfigSession import com.config.MyConfigSession
import com.pica.utils.DateUtils import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil} import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec} import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number} import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
/** /**
...@@ -39,7 +45,11 @@ object SessionProcessPath { ...@@ -39,7 +45,11 @@ object SessionProcessPath {
|values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?) |values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
""".stripMargin """.stripMargin
//设置同步数据的批次号,格式是2019-09-12 //设置同步数据的批次号,格式是2019-09-12
val scnData: String = DateUtils.getYesterdayDate var scnData: String = DateUtils.getYesterdayDate
if(args.length>=1){
scnData = args(0)
}
println(s"scnData=${scnData}")
//设置任务开始时间,格式是2019-09-12 14:03:30 //设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数 //存储SQL中的参数
...@@ -50,25 +60,75 @@ object SessionProcessPath { ...@@ -50,25 +60,75 @@ object SessionProcessPath {
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr) val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try { try {
val sparkSession: SparkSession = SessionProcessPath().getSparkSession("SessionProcessPath") val sparkSession: SparkSession = SessionProcessPath().getSparkSession("SessionProcessPath")
//获取position对应的label_value广播变量
val positionUrlLabelBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_URLLABEL_SQL, "url_content", "label_value")
println(s"positionUrlLabelBroad=${positionUrlLabelBroad.value}")
//筛选源数据 //筛选源数据
val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH) val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH+s" and created_day='${scnData}'")
sourceDF.show()
//注册日期在流量统计日期之前的用户
val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table")
//将id为null的记录设置为0
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------") println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(selectDF,sparkSession) val referResDF: DataFrame = getReferColumns(sourceDF,sparkSession)
println("referResDF.printSchema()")
referResDF.printSchema()
println("------------------------------------单独计算label_value----------------------------------------------")
//"menu_code = '930' and action_code IN ( '930000', '930001', '930002' ) and action_type = 'ACTION_CLICK'
val newLabelRdd: RDD[Row] = referResDF.where("menu_code = '930'").rdd.mapPartitions(rows=>{
// val rowList: ListBuffer[(String,String,Integer,String,String,String,String,String,String,String,String,String,String,String,
// Integer,Integer,String,String,String,String )] = new ListBuffer()
val rowList: ListBuffer[Row]= new ListBuffer[Row]()
val positionLabelMap: Map[String, String] = positionUrlLabelBroad.value
rows.toList.foreach(row=>{
val action_code = row.getAs[String]("action_code")
val action_type = row.getAs[String]("action_type")
val position = row.getAs[String]("position")
var label_value = ""
if(List("930000","930001","930002" ).contains(action_code) && "ACTION_CLICK".equals(action_type)){
breakable {
//利用position url_content映射表匹配
for (tuple <- positionLabelMap) {
if (StringUtils.getNotNullString(position).contains(tuple._1)) {
//满足条件后,修改源数据的label_value
label_value = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
}
//经过上述匹配,如果label_value仍然为空串,那么置为原始值
if (label_value.equals("")) {
label_value = row.getAs[String]("label_value")
}
rowList.append( Row(
StringUtils.getNotNullString(row.getAs[String]("log_session_id")),
StringUtils.getNotNullString(row.getAs[String]("session_id")),
row.getAs[Integer]("user_id"),action_type,
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
label_value,
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("refer_menu_code")),
StringUtils.getNotNullString(row.getAs[String]("refer_action_code")),
StringUtils.getNotNullString(row.getAs[String]("refer_position")),
StringUtils.getNotNullString(row.getAs[String]("refer_action_type")),
StringUtils.getNotNullString(row.getAs[String]("refer_created")),
row.getAs[Integer]("step_id"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("created_time")),
StringUtils.getNotNullString(row.getAs[String]("date_time")),
row.getAs[Double]("refer_time_diff")
// StringUtils.getNotNullString(row.getAs[String]("module_class1")),
// StringUtils.getNotNullString(row.getAs[String]("module_class2"))
))
})
rowList.iterator
})
val resultDF = sparkSession.createDataFrame(newLabelRdd,referResDF.schema).union(referResDF.where("menu_code != '930'"))
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------") println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData) loadData(resultDF,sparkSession,scnData)
...@@ -128,7 +188,7 @@ object SessionProcessPath { ...@@ -128,7 +188,7 @@ object SessionProcessPath {
//去掉refer字段中的NULL值 //去掉refer字段中的NULL值
val coaleseDF: DataFrame = rowNumberDF.selectExpr( val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value", "log_session_id","session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value","label_class",
"COALESCE(refer_menu_code,'') as refer_menu_code", "COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code", "COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position", "COALESCE(refer_position,'') as refer_position",
...@@ -156,10 +216,11 @@ object SessionProcessPath { ...@@ -156,10 +216,11 @@ object SessionProcessPath {
val loadDataSql = val loadDataSql =
s""" s"""
|insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}') |insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}')
| select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value, | select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
| refer_menu_code,refer_action_code,refer_position,refer_action_type, | refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff,step_id,app_version,device_type,created_time,date_time | cast(refer_time_diff as int) as refer_time_diff,
| from result_view | step_id,app_version,device_type,created_time,date_time,'' module_class1,'' module_class2
| from result_view distribute by rand()
""".stripMargin """.stripMargin
sparkSession.sql(loadDataSql) sparkSession.sql(loadDataSql)
} }
......
...@@ -53,28 +53,28 @@ object SessionProcessPathArgs { ...@@ -53,28 +53,28 @@ object SessionProcessPathArgs {
//筛选源数据 //筛选源数据
val sourceSql = val sourceSql =
s""" s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value, |select session_id,user_id_int user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1} |app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1_TMP}
| where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !='' | where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !=''
| and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !='')) | and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !=''))
""".stripMargin """.stripMargin
val sourceDF: DataFrame = sparkSession.sql(sourceSql) val sourceDF: DataFrame = sparkSession.sql(sourceSql)
//注册日期在流量统计日期之前的用户 //注册日期在流量统计日期之前的用户
val doctorDF: DataFrame = sparkSession.sql( // val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)") // "select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
//
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left") // sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table") // .createOrReplaceTempView("tmp_table")
//将id为null的记录设置为0 //将id为null的记录设置为0
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," + // val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table" // "user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql) // val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------") println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(selectDF,sparkSession) val resultDF: DataFrame = getReferColumns(sourceDF,sparkSession)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------") println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData) loadData(resultDF,sparkSession,scnData)
...@@ -136,7 +136,7 @@ object SessionProcessPathArgs { ...@@ -136,7 +136,7 @@ object SessionProcessPathArgs {
//去掉refer字段中的NULL值 //去掉refer字段中的NULL值
val coaleseDF: DataFrame = rowNumberDF.selectExpr( val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value", "log_session_id","session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value","label_class",
"COALESCE(refer_menu_code,'') as refer_menu_code", "COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code", "COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position", "COALESCE(refer_position,'') as refer_position",
...@@ -163,10 +163,11 @@ object SessionProcessPathArgs { ...@@ -163,10 +163,11 @@ object SessionProcessPathArgs {
dataFrame.createOrReplaceTempView("result_view") dataFrame.createOrReplaceTempView("result_view")
val loadDataSql = val loadDataSql =
s""" s"""
|insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}') |insert overwrite table ${MyConfigSession.HIVE_TABLE2_TMP} partition(created_day='${partitionDay}')
| select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value, | select log_session_id,session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
| refer_menu_code,refer_action_code,refer_position,refer_action_type, | refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff,step_id,app_version,device_type,created_time,date_time | cast(refer_time_diff as int) as refer_time_diff,
| step_id,app_version,device_type,created_time,date_time,'' module_class1,'' module_class2
| from result_view | from result_view
""".stripMargin """.stripMargin
sparkSession.sql(loadDataSql) sparkSession.sql(loadDataSql)
......
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Processes yesterday's session data and loads it into the
 * pica_dw.dw_fact_log_session_path table.
 * @Author zhenxin.ma
 * @Date 2020/3/27 10:58
 * @Version 1.0
 */
class SessionProcessTerm {
  /**
   * Builds a Hive-enabled SparkSession for this job.
   * @param appName Spark application name shown in the resource manager
   * @return a (possibly shared) SparkSession obtained via getOrCreate
   **/
  def getSparkSession(appName: String): SparkSession = {
    val sparkConf: SparkConf = new SparkConf().setAppName(appName)
    // Project-wide tuning (serializer, shuffle settings, ...) is applied by UseUtil —
    // NOTE(review): exact settings live outside this file, confirm in UseUtil.
    UseUtil.setConfigure(sparkConf)
    SparkSession
      .builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()
  }
}
object SessionProcessTerm {
  def apply(): SessionProcessTerm = new SessionProcessTerm()

  /**
   * Entry point: registers the run in the MySQL job-record table, computes the
   * refer_* path columns for yesterday's session hits, overwrites yesterday's
   * partition of pica_dw.dw_fact_log_session_path, then marks the record as
   * succeeded (status '1') or failed (status '2').
   **/
  def main(args: Array[String]): Unit = {
    //1. Register this run (status '0' = running) before doing any work.
    val insertSQL: String =
      s"""
        |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
        |values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
      """.stripMargin
    //Batch number of the data being synced, format 2019-09-12 (yesterday).
    val scnData: String = DateUtils.getYesterdayDate
    //Job start time, format 2019-09-12 14:03:30; also the key used to update this record later.
    val startTime: String = DateUtils.getTodayTime
    //Values for the two '?' placeholders (job_scn, start_time).
    val insertArr: Array[String] = Array[String](scnData, startTime)
    //Bookkeeping connection to MySQL.
    val connSql: sql.Connection = JDBCUtil.getConnection()
    //Insert the "running" record; the returned row count is intentionally ignored.
    JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
    try {
      val sparkSession: SparkSession = SessionProcessTerm().getSparkSession("SessionProcessTerm")
      //Source hits for path processing (SQL text defined in MyConfigSession).
      val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH)
      //Doctors whose registration date precedes the statistics day.
      val doctorDF: DataFrame = sparkSession.sql(
        "select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
      sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
        .createOrReplaceTempView("tmp_table")
      //Hits whose user_id did not match a registered doctor are kept with user_id = 0.
      val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
        "user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
      val selectDF: DataFrame = sparkSession.sql(reSql)
      println("-----------------------------------compute refer columns-----------------------------------------")
      val resultDF: DataFrame = getReferColumns(selectDF, sparkSession)
      println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
      loadData(resultDF, sparkSession, scnData)
      println("----------------------------------update task record table---------------------------------------")
      //Mark the record as successful and store the produced row count.
      //FIX: table is now qualified with the database name, consistent with insertSQL above.
      val updateSQL: String =
        s"""
          |update ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1968 and start_time='${startTime}'
        """.stripMargin
      val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
      upreSta.setString(1, "1")
      upreSta.setString(2, DateUtils.getTodayTime)
      //NOTE: count() triggers a second Spark job over resultDF; toInt would overflow
      //past ~2.1e9 rows, which is acceptable for one day of traffic.
      upreSta.setInt(3, resultDF.count().toInt)
      //Persist the success status.
      upreSta.executeUpdate()
      //Release the MySQL resources, then the Spark application.
      JDBCUtil.close(connSql, upreSta)
      sparkSession.stop()
    } catch {
      case e: Exception =>
        println("-----------------------------------任务异常---------------------------------------------------")
        //Mark the record as failed (status '2') and store the exception message.
        //FIX: table qualified with the database name, consistent with insertSQL.
        val exceptionSQL: String =
          s"""
            |update ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
          """.stripMargin
        //FIX: e.getMessage can be null (e.g. NPE); fall back to e.toString so the
        //prepared-statement parameter is never null.
        val errorArr = Array[String]("2", Option(e.getMessage).getOrElse(e.toString), DateUtils.getTodayTime)
        JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
        connSql.close()
        //NOTE(review): the SparkSession is not stopped on this path; the application
        //only ends when the driver JVM exits — confirm that is the intended behavior.
    }
  }

  /**
   * @Description Derives the refer_* columns: for every hit, the attributes of the
   *              previous hit within the same session, plus a step number and the
   *              time gap to that previous hit.
   * @param dataFrame source hits; must expose session_id, created_time and all
   *                  columns listed in the selectExpr below
   * @param sparkSession SparkSession (kept for signature compatibility; unused here)
   * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
   **/
  def getReferColumns(dataFrame: DataFrame, sparkSession: SparkSession): DataFrame = {
    //Window: rows of one session, ordered by hit time.
    val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
    //Each refer_* column is lag(1) of its source column over the session window
    //(null on the first hit of a session; cleaned up below). Folding replaces five
    //copy-pasted withColumn steps of the original.
    val referSources: Seq[(String, String)] = Seq(
      "refer_menu_code" -> "menu_code",
      "refer_action_code" -> "action_code",
      "refer_position" -> "position",
      "refer_action_type" -> "action_type",
      "refer_created" -> "created_time")
    val laggedDF: DataFrame = referSources.foldLeft(dataFrame) {
      case (df, (target, source)) => df.withColumn(target, lag(df(source), 1).over(sessionIDWinSpec))
    }
    //Sequence number of the hit inside its session, starting at 1.
    val rowNumberDF: DataFrame = laggedDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
    //Replace the NULLs produced by lag() on the first hit of each session.
    val coaleseDF: DataFrame = rowNumberDF.selectExpr(
      "session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value",
      "COALESCE(refer_menu_code,'') as refer_menu_code",
      "COALESCE(refer_action_code,'') as refer_action_code",
      "COALESCE(refer_position,'') as refer_position",
      "COALESCE(refer_action_type,'') as refer_action_type",
      "COALESCE(refer_created,created_time) as refer_created",
      "step_id", "app_version", "device_type", "created_time", "date_time")
    //Gap between this hit and the previous one (0 for the first hit of a session).
    //NOTE(review): plain numeric subtraction — assumes created_time is an epoch
    //value, confirm upstream.
    coaleseDF.withColumn("refer_time_diff", coaleseDF("created_time") - coaleseDF("refer_created"))
  }

  /**
   * @Description Loads the result into yesterday's partition of the path table.
   * @param dataFrame rows produced by getReferColumns
   * @param sparkSession SparkSession used to run the insert
   * @param partitionDay partition value for created_day, format 2019-09-12
   * @return void
   **/
  def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String): Unit = {
    dataFrame.createOrReplaceTempView("result_view")
    //insert overwrite: re-running the job for the same day replaces the partition
    //instead of duplicating rows.
    val loadDataSql =
      s"""
        |insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}')
        | select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
        | refer_menu_code,refer_action_code,refer_position,refer_action_type,
        | cast(refer_time_diff as int) as refer_time_diff,step_id,app_version,device_type,created_time,date_time
        | from result_view
      """.stripMargin
    sparkSession.sql(loadDataSql)
  }
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册