Commit 500cc55e authored by wuyunfeng

Traffic and duration statistics, version 0.9.4

Parent b98a80ff
......@@ -9,12 +9,14 @@ package com.config
*/
object MyConfigSession {
//Traffic tables in the Hive DW layer
final val HIVE_TABLE0: String = "pica_dw.dw_fact_log_session_pref"
final val HIVE_TABLE1_TMP: String = "pica_dw.dw_fact_log_session_tmp"
final val HIVE_TABLE2_TMP: String = "pica_dw.dw_fact_log_session_path_tmp"
final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session"
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
final val HIVE_TABLE4: String = "pica_dw.dw_fact_log_session_pref"
final val HIVE_TABLE4: String = "pica_dw.dw_fact_log_session_term"
final val HIVE_TABLE4_MID: String = "pica_dw.dw_fact_log_session_term_mid"
//File path for output
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
......@@ -43,7 +45,7 @@ object MyConfigSession {
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day and created is not null and created!=''
| and created is not null and created!='' and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day
""".stripMargin
//SQL that reads source data from pica_log.picalog_trace_app_part; this variant fetches yesterday's data
final val SOURCE_SQL: String =
......@@ -57,14 +59,13 @@ object MyConfigSession {
//SQL that reads source data from pica_log.picalog_trace_app_part when an explicit date argument is supplied
final val SOURCE_SQL_ARGS: String =
"""
|select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action,
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and created is not null and created!='' and `action`!='ACTION_EQUIP_INFO'
""".stripMargin
final val SOURCE_SQL_FROM_PREF: String =
s"""
|select pseudo_session,cast(user_id_int as string) user_id,mobile,device_token,user_token , view_class ,view_path,action,action_type,
|component_tag,menu_code,action_code,`position`,label_value,label_class,app_version,device_type,device_brand,device_model,"" device_system,
|net_type,created_time,date_time from ${HIVE_TABLE0}
| where 1=1 and `action`!='ACTION_EQUIP_INFO'
""".stripMargin
//Filter data from the dw_fact_log_session table
......
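The WHERE variants above differ only in clause order; both keep rows whose 13-digit millisecond `created` string agrees with the `created_day` partition, since `substring(created,1,10)` is the epoch-seconds prefix. A minimal Scala sketch of the same consistency check (the timezone follows the server default, as Hive's FROM_UNIXTIME does):

import java.time.{Instant, ZoneId}

// True when the epoch-second prefix of a millisecond-timestamp string falls on the given
// partition date, mirroring FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd') = created_day
def matchesPartition(created: String, createdDay: String): Boolean = {
  val seconds = created.take(10).toLong
  val date = Instant.ofEpochSecond(seconds).atZone(ZoneId.systemDefault()).toLocalDate
  date.toString == createdDay // LocalDate.toString is already yyyy-MM-dd
}

// e.g. matchesPartition("1596760980000", "2020-08-07") == true in an Asia/Shanghai zone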
......@@ -10,6 +10,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
......@@ -36,7 +37,7 @@ object SessionProcessHeart {
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.dw_fact_log_session_heart','3',?,'0',?)
|values(0,'${MyConfigSession.HIVE_TABLE3}','3',?,'0',?)
""".stripMargin
//Batch number for the sync run, formatted like 2019-09-12
val scnDate: String = args(0)
......@@ -51,70 +52,94 @@ object SessionProcessHeart {
try {
val sessionProcessHeart: SessionProcessHeart = SessionProcessHeart()
//Fetch the source data
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${scnDate}'")
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_FROM_PREF + s" and created_day='${scnDate}'")
//1. Repartition to force a shuffle; Spark's default partition count when reading Hive is too small
//2. Filter the data by parsing the creation time, keeping only records produced yesterday
//3. Drop duplicate records
//4. Filter on action_type and class_name
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
var createdTime: String = row.getAs[String]("created")
//Guard against nulls
if (createdTime == null) {
createdTime = "0"
}
//Note the filter condition: the batch date must match the date the record was produced, i.e. only that day's data
scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
}).distinct()
// val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
// var createdTime: String = row.getAs[String]("created")
// //Guard against nulls
// if (createdTime == null) {
// createdTime = "0"
// }
// //Note the filter condition: the batch date must match the date the record was produced, i.e. only that day's data
// scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
// }).distinct()
import sessionProcessHeart.sparkSession.implicits._
//Filter on action_type and class_name via the mapping tables
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcessHeart.filterRows)
val data: RDD[Row] = sourceDF.rdd.mapPartitions(sessionProcessHeart.filterRows)
sourceDF.printSchema()
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
// val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
val baseDF: DataFrame = sessionProcessHeart.sparkSession.createDataFrame(data,StructType(
List(StructField("pseudo_session", StringType, false),
StructField("user_id", StringType, false),
StructField("mobile", StringType, false),
StructField("device_token", StringType, false),
StructField("user_token", StringType, false),
StructField("view_class", StringType, false),
StructField("view_path", StringType, false),
StructField("action", StringType, false),
StructField("action_type", StringType, false),
StructField("component_tag", StringType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("position", StringType, false),
StructField("label_value", StringType, false),
StructField("label_class", StringType, false),
StructField("app_version", StringType, false),
StructField("device_type", StringType, false),
StructField("device_brand", StringType, false),
StructField("device_model", StringType, false),
StructField("device_system", StringType, false),
StructField("net_type", StringType, false),
StructField("created_time", StringType, false),
StructField("date_time", StringType, false))
))
// val baseDF: DataFrame = data.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
// "action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
// "device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
baseDF.printSchema()
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcessHeart.getSessionId(baseDF )
//The default cache level is MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcessHeart.matchUserId(sessionIdDF, scnDate)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
//Group by session_id and user_id, keep the last heartbeat record per group, and load it into the table
val loadDataSql =
// println("-------------------------------match user_id 逻辑-------------------------------------------------")
// val dwFactLogSession: DataFrame = sessionProcessHeart.matchUserId(sessionIdDF, scnDate)
sessionIdDF.printSchema()
println("-----------------create view fact_log_session and load to dw_fact_log_session_heart--------------------")
sessionIdDF.createOrReplaceTempView("fact_log_session")
val resSql =
s"""
insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}')
select a.session_id,cast(a.user_id as int) user_id,a.mobile,a.device_token,a.user_token,
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
| (select b.user_id, b.session_id , min(b.created_time) min_ct, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
| (select b.user_id, b.session_id , min(b.created_time) min_ct, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
val resDF = sessionProcessHeart.sparkSession.sql(resSql)
resDF.createOrReplaceTempView("session_heart")
//Group by session_id and user_id, keep the last heartbeat record per group, and load it into the table
val loadDataSql =s"insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}') select * from session_heart "
sessionProcessHeart.sparkSession.sql(loadDataSql)
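//A hypothetical window-function alternative to the self-join above: row_number keeps exactly
//one row per (user_id, session_id) even when the max created_time is tied, whereas joining on
//the max can emit duplicates on ties. Sketch only, not part of this commit:
// val lastHeartbeatSql =
//   s"""
//      |select session_id, user_id, mobile, device_token, user_token,
//      |       app_version, device_type, device_brand, device_model, date_time
//      |from (select a.*,
//      |             row_number() over(partition by user_id, session_id order by created_time desc) rn
//      |      from fact_log_session a) t
//      |where rn = 1
//   """.stripMargin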
println("----------------------------------update task record table---------------------------------------")
//On success, update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='${MyConfigSession.HIVE_TABLE3}' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, dwFactLogSession.count().toInt)
upreSta.setInt(3, resDF.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
......@@ -126,7 +151,7 @@ object SessionProcessHeart {
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='${MyConfigSession.HIVE_TABLE3}' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
......@@ -196,15 +221,15 @@ class SessionProcessHeart extends java.io.Serializable{
val actionTypeMap: Map[String, String] = actionTypeBroad.value
val classNameMap: Map[String, String] = classNameBroad.value
//Broadcast variable for the action_category mapping table
val actionCategoryMap: Map[String, String] = actionCategory.value
// val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//Condition for filtering on action
val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
//This action type is one of the ones we want
if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
val action_type: String = StringUtils.getNotNullString(row.getAs[String]("action_type"))
//Map the action to its standard actionCategory from the mapping table
val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
// val action_type = UseUtil.getActionType(action)
// val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
//action is any one of these
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//component_tag must contain "#"
......@@ -215,8 +240,8 @@ class SessionProcessHeart extends java.io.Serializable{
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//For action_types other than the three above, drop rows whose class_name maps to "0" in the mapping table
} else if (row.getAs[String]("class_name") != null
&& !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
} else if (row.getAs[String]("view_class") != null
&& !classNameMap.getOrElse(row.getAs[String]("view_class"), "-1").equals("0")) {
rowList += row
}
}
......
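getSessionId is called in SessionProcessHeart.main but its body lies outside this diff; the Window and lag imports added at the top of that file suggest a gap-based session split. A minimal sketch under that assumption (the 30-minute timeout is a guess, not taken from the source):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, concat_ws, lag, sum, when}

// Start a new session whenever the gap to the previous event of the same pseudo_session
// exceeds gapMs; session_id = pseudo_session plus a running per-device session counter.
def sessionize(df: DataFrame, gapMs: Long = 30L * 60 * 1000): DataFrame = {
  val w = Window.partitionBy("pseudo_session").orderBy(col("created_time").cast("long"))
  val gap = col("created_time").cast("long") - lag(col("created_time").cast("long"), 1).over(w)
  df.withColumn("new_session", when(gap.isNull || gap > gapMs, 1).otherwise(0))
    .withColumn("session_seq", sum(col("new_session")).over(w))
    .withColumn("session_id", concat_ws("-", col("pseudo_session"), col("session_seq").cast("string")))
    .drop("new_session", "session_seq")
}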
......@@ -7,370 +7,441 @@ import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* Process tracking-event data, apply simple cleaning and filtering, and load it into the DW-layer table pica_dw.dw_fact_log_sesson_pref
* Process tracking-event data, apply simple cleaning and filtering, and load it into the DW-layer table pica_dw.dw_fact_log_session_pref
*
* @Author yunfeng.wu
* @Date 2020/08/07 09:23
* @Version 1.0
*/
object SessionProcessPref {
def apply(): SessionProcessPref = new SessionProcessPref()
def apply(): SessionProcessPref = new SessionProcessPref()
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table
val insertSQL: String =
s"""
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
|values(0,'${MyConfigSession.HIVE_TABLE0}','3',?,'0',?)
""".stripMargin
//Batch number for the sync run, formatted like 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
if(args.length>=1){
scnData = args(0)
}
println(s"scnData=${scnData}")
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessPref: SessionProcessPref = SessionProcessPref()
//step1: fetch the source data, repartition to force a shuffle (Spark's default partition count for Hive reads is too small), and deduplicate
var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF+s" and created_day='${scnData}'").repartition(200).distinct()
//step2: for each pseudo_session, extract the non-empty device_token, doctor_id and mobile seen that day and backfill them into records of the same pseudo_session where those fields are empty
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//sort by created in descending order
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken
}else {
deviceToken = thisDeviceToken
}
if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
thisDoctorId = doctorId
}else {
doctorId = thisDoctorId
}
if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile
}else {
mobile = thisMobile
}
resList += (Row(deviceToken,
pseudo_session,
row.getAs[String]("class_name"),
row.getAs[String]("action"),
row.getAs[String]("view_path"),
row.getAs[String]("component_tag"),
row.getAs[String]("created"),
mobile,
doctorId,
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("web_data"),
row.getAs[String]("web_data_type"),
row.getAs[String]("alternate_info"),
row.getAs[String]("network_type"),
row.getAs[String]("login_state"),
row.getAs[String]("first_app_version"),
row.getAs[String]("user_token_tourist"),
row.getAs[String]("serviceName")
))
})
resList.iterator
})
import sessionProcessPref.sparkSession.implicits._
//step3: filter on action_type and class_name via the mapping tables
val resDF = sessionProcessPref.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
resDF.show()
// val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcessPref.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseRdd = resDF.rdd.mapPartitions(sessionProcessPref.processColumns)
println(s"process columns.count===>${baseRdd.count()}")
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path","action", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
println("baseDF.show=======>")
baseDF.show()
println("----------------------------------compute session id---------------------------------------------")
// val sessionIdDF: DataFrame = sessionProcessPref.getSessionId(baseDF,sessionProcessPref)
//The default cache level would be MEMORY_AND_DISK
// sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id logic-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcessPref.matchUserId(baseDF,sessionProcessPref.sparkSession,scnData)
println("dwFactLogSession.show=======>")
dwFactLogSession.show()
dwFactLogSession.printSchema()
println("-----------------create view fact_log_sesson_pref and load to dw_fact_log_sesson_pref--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_sesson_pref")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcessPref.sparkSession.sql(loadDataSql)
//Batch number for the sync run, formatted like 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
if (args.length >= 1) {
scnData = args(0)
}
println(s"scnData=${scnData}")
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessPref: SessionProcessPref = SessionProcessPref()
//step1: fetch the source data, repartition to force a shuffle (Spark's default partition count for Hive reads is too small), and deduplicate
var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF + s" and created_day='${scnData}'").repartition(120).distinct()
//step2: for each pseudo_session, extract the non-empty device_token, doctor_id and mobile seen that day and backfill them into records of the same pseudo_session where those fields are empty
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val baseRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[SessionPref] = new ListBuffer[SessionPref]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created") > y.getAs[String]("created")) //sort by created in descending order
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
val path_menu: Map[String, String] = sessionProcessPref.menuCodeBroad.value //broadcast mapping table for menu_code
val actionCategoryMap: Map[String, String] = sessionProcessPref.actionCategory.value //broadcast mapping table for action_category
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
if (deviceToken != null && !deviceToken.equals("")) {
thisDeviceToken = deviceToken
} else {
deviceToken = thisDeviceToken
}
if (doctorId != null && !doctorId.equals("") && !doctorId.equals("0")) {
thisDoctorId = doctorId
} else {
doctorId = thisDoctorId
}
if (mobile != null && !mobile.equals("")) {
thisMobile = mobile
} else {
mobile = thisMobile
}
//1. Get the network type
//2G, 3G, 4G, 2G/3G/4G, WIFI, WLAN, or an empty string
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2. Derive the action type, keeping the original field
val action = row.getAs[String]("action")
var action_type: String = ""
if (row.getAs[String]("action") != null) {
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")
}
//3. Split the component_tag field
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//Split a well-formed component_tag to obtain action_code, label_value, etc.
if (component_tag.contains("#")) {
//Split on "#"
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.put(filedName, value)
}
index += 1
}
}
var menu_code_new = tagMap("menu_code")
if ("MenuCode_081".equals(menu_code_new)) {
menu_code_new = "081" //针对异常menu_code值单独处理
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3)
&& action_type.equals("ACTION_VIEW")) {
menu_code_new = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//Match against the menu_code mapping table
for (tuple <- path_menu) {
//the source row's view_path string contains the mapping table's view_path string
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//on a match, overwrite the source row's menu_code
menu_code_new = tuple._2
println("--------------------menu_code match successfully-----------------------")
//stop iterating
break()
}
}
}
}
resList += SessionPref(pseudo_session,
doctorId,
mobile,
deviceToken,
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
tagMap("tag8"), tagMap("tag9"), tagMap("tag10")
)
})
resList.iterator
})
println("----------------------------------update task record table---------------------------------------")
//On success, update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
import sessionProcessPref.sparkSession.implicits._
// val resDF = sessionProcessPref.sparkSession.createDataFrame(resRdd,sourceDF.schema)
// resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
// println("resDF.show=======>")
// resDF.printSchema()
println("---------------------------------------process columns-------------------------------------------")
// val baseRdd = resDF.rdd.mapPartitions(sessionProcessPref.processColumns)
// println("baseRdd.take(1)=" + baseRdd.take(1))
// println(s"process columns.count===>${baseRdd.count()}")
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
"menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version", "serviceName", "tag8", "tag9", "tag10")
println("baseDF.show=======>")
baseDF.printSchema()
println("----------------------------------compute session id---------------------------------------------")
// val sessionIdDF: DataFrame = sessionProcessPref.getSessionId(baseDF,sessionProcessPref)
//The default cache level would be MEMORY_AND_DISK
println("-------------------------------match user_id logic-------------------------------------------------")
baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
// val dwFactLogSession: DataFrame = sessionProcessPref.matchUserId(baseDF, sessionProcessPref.sparkSession, scnData)
// println("dwFactLogSession.show=======>")
// dwFactLogSession.printSchema()
println("-----------------create view fact_log_session_pref and load to dw_fact_log_session_pref--------------------")
baseDF.createOrReplaceTempView("fact_log_session_pref")
val fields = List("pseudo_session", "user_id", "COALESCE(cast(user_id as int),0) user_id_int", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type",
"component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand",
"device_model", "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version",
"servicename", "tag8", "tag9", "tag10")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${scnData}') select ${fields.mkString(",")} from fact_log_session_pref distribute by rand()"
sessionProcessPref.sparkSession.sql(loadDataSql)
val dataCount = baseDF.count()
// val dataCount = 1 //counting takes a long time; a default of 1 means "not counted"
println("----------------------------------update task record table---------------------------------------")
//On success, update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='${MyConfigSession.HIVE_TABLE0}' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, resDF.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sessionProcessPref.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, dataCount.toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sessionProcessPref.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='${MyConfigSession.HIVE_TABLE0}' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcessPref extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessPref")
class SessionProcessPref extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
//Broadcast variable of qualifying actionType values
// val actionTypeBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//Broadcast variable for className
// val classNameBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//Broadcast variable for menu_code
// val menuCodeBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //Broadcast variable for actionCategory
// val actionCategory =
// UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL,"action_type","action_category")
val sparkSession: SparkSession = getSparkSession("SessionProcessPref")
//Broadcast variable of qualifying actionType values
// val actionTypeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//Broadcast variable for className
// val classNameBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//Broadcast variable for menu_code (see the hedged helper sketch below)
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //Broadcast variable for actionCategory
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
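//UseUtil.getBroadcast is not shown in this diff. Presumably it runs the mapping SQL, collects
//the two named columns into a driver-side Map, and broadcasts it. A hedged sketch of such a
//helper (an assumption, not the actual UseUtil implementation):
// def getBroadcast(spark: SparkSession, sql: String, keyCol: String, valueCol: String)
//     : org.apache.spark.broadcast.Broadcast[Map[String, String]] = {
//   val mapping = spark.sql(sql).select(keyCol, valueCol).collect()
//     .map(r => r.getAs[String](keyCol) -> r.getAs[String](valueCol)).toMap
//   spark.sparkContext.broadcast(mapping)
// }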
//Process the columns to obtain the required field values
val processColumns = (rows: Iterator[Row]) => {
// val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,String, String, String,
// String, String, String, String, String, String, String, String, String, String, String, String,String,String, String, String)]() //
val baseList = new ListBuffer[SessionPref]()
//broadcast mapping table for menu_code
// val path_menu: Map[String, String] = menuCodeBroad.value
//broadcast mapping table for action_category
// val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1. Get the network type
//2G, 3G, 4G, 2G/3G/4G, WIFI, WLAN, or an empty string
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2. Derive the action type, keeping the original field
val action = row.getAs[String]("action")
// var action_type: String = ""
// if (row.getAs[String]("action") !=null) {
// action_type = actionCategoryMap.getOrElse(action,"ACTION")
// }
//3. Split the component_tag field
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code","action_code","position","label_value","label_class")
val tagMap = scala.collection.immutable.Map[String,String]()
tagArr.foreach(r=>tagMap.+(r->""))
//Split a well-formed component_tag to obtain action_code, label_value, etc.
if (component_tag.contains("#")) {
//Split on "#"
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.+(filedName -> value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.+(filedName -> value)
}
index += 1
}
//Process the columns to obtain the required field values
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[SessionPref]()
//broadcast mapping table for menu_code
val path_menu: Map[String, String] = menuCodeBroad.value
//broadcast mapping table for action_category
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1. Get the network type
//2G, 3G, 4G, 2G/3G/4G, WIFI, WLAN, or an empty string
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2. Derive the action type, keeping the original field
val action = row.getAs[String]("action")
var action_type: String = ""
if (row.getAs[String]("action") != null) {
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")
}
//3. Split the component_tag field
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//Split a well-formed component_tag to obtain action_code, label_value, etc.
if (component_tag.contains("#")) {
//Split on "#"
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.put(filedName, value)
}
index += 1
}
}
var menu_code_new = tagMap("menu_code")
if ("MenuCode_081".equals(menu_code_new)) {
menu_code_new = "081" //针对异常menu_code值单独处理
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3)
&& action_type.equals("ACTION_VIEW")) {
menu_code_new = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//Match against the menu_code mapping table
for (tuple <- path_menu) {
//the source row's view_path string contains the mapping table's view_path string
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//on a match, overwrite the source row's menu_code
menu_code_new = tuple._2
println("--------------------menu_code match successfully-----------------------")
//stop iterating
break()
}
// //Match menu_code: when the extracted menu_code is ('' || null || 0 || length(menu_code)>3) and the action is ACTION_VIEW
// if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
// && action_type.equals("ACTION_VIEW")) {
// menu_code = "0" //unmatched codes are shown as 0
// import scala.util.control.Breaks._
// breakable {
// //match against the menu_code mapping table
// for (tuple <- path_menu) {
// //the source row's view_path string contains the mapping table's view_path string
// if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
// //on a match, overwrite the source row's menu_code
// menu_code = tuple._2
// println("--------------------menu_code match successfully-----------------------")
// //stop iterating
// break()
// }
// }
// }
// }
//Append one row to the list
baseList += SessionPref(StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, component_tag, tagMap("menu_code"), tagMap("action_code"),
tagMap("position"),tagMap("label_value"),tagMap("label_class"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName"))
)
})
baseList.iterator
}
/**
* @Description Match user_id and backfill the user_id field in the data
* @param dataFrame the filtered data
* @param sparkSQLSession the SparkSession environment
* @param created_day the date of the current data, formatted "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//Addendum: match dataFrame against pica_ds.pica_doctor on user_id; unmatched user_ids are set to '0'
println("matchUserId started-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL_PREF)
//All of the logic below exists to backfill the user_id field
//Step 1: pick out the rows with invalid user_id values and set those user_ids to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
// .selectExpr("pseudo_session","'0' as user_id","mobile","device_token","user_token",
// "view_class","view_path","action" ,"component_tag","menu_code",
// "action_code","position","label_value","label_class","app_version","device_type",
// "device_brand","device_model","net_type","created_time",
// "date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1. Take the user_ids not matched in the previous step and match them by mobile_phone first
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL_PREF)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2. Use the temporary equipment view and keep the latest record (the one flagged 1)
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if(!created_day.equals(DateUtils.getYesterdayDate)){//if not running for yesterday's data, use the equipment zipper (slowly-changing) table
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
}
//Append one row to the list
baseList += SessionPref(StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
tagMap("tag8"), tagMap("tag9"), tagMap("tag10")
)
})
baseList.iterator
}
//3. Match the rows from step 2 by device_token to obtain user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL_PREF)
//4. Union the three result sets; this is the data finally loaded into the table
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//Backfill the user_id_int field from pica_doctor (cast to int), restricted to delete_flag = 1 and creat_time up to yesterday; unmatched rows show 0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.pseudo_session,ss.user_id,COALESCE(b.id,0) user_id_int ,ss.mobile,ss.device_token,ss.user_token,
|ss.view_class,ss.view_path,ss.action, ss.component_tag,ss.menu_code,
|ss.action_code,ss.position,ss.label_value,ss.label_class,ss.app_version,ss.device_type,
|ss.device_brand,ss.device_model, ss.net_type,ss.created_time,
|ss.date_time,ss.web_data,ss.web_data_type,ss.alternate_info,ss.login_state,ss.first_app_version,ss.serviceName
| from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
/**
* @Description Match user_id and backfill the user_id field in the data
* @param dataFrame the filtered data
* @param sparkSQLSession the SparkSession environment
* @param created_day the date of the current data, formatted "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame, sparkSQLSession: SparkSession, created_day: String): DataFrame = {
//Addendum: match dataFrame against pica_ds.pica_doctor on user_id; unmatched user_ids are set to '0'
println("matchUserId started-----------------------------------")
dataFrame.withColumnRenamed("user_id", "user_id_ods").createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val INIT_USER_ID_SQL_PREF =
s"""
|SELECT t.*,t.user_id_ods user_id_old, COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on t.user_id_ods = cast(b.id as string)
""".stripMargin
val DF = sparkSQLSession.sql(INIT_USER_ID_SQL_PREF).drop("user_id_ods")
/*
All of the logic below exists to backfill the user_id field
*/
//Step 1: pick out the rows with invalid user_id values and set those user_ids to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
noMatchUserIdDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
val MOBILE_PHONE_SQL_PREF: String =
s"""
|SELECT ss.*,COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
//1. Take the user_ids not matched in the previous step and match them by mobile_phone first
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MOBILE_PHONE_SQL_PREF)
//2. Use the temporary equipment view and keep the latest record (the one flagged 1)
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if (!created_day.equals(DateUtils.getYesterdayDate)) { //if not running for yesterday's data, use the equipment zipper (slowly-changing) table
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'"
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
mobilePhoneDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
val DEVICE_TOKEN_SQL_PREF: String =
s""" SELECT t.*,COALESCE(cast(b.user_id as string),'0') AS user_id
| from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id_old= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
println(s"DEVICE_TOKEN_SQL_PREF=${DEVICE_TOKEN_SQL_PREF}")
//3. Match the rows from step 2 by device_token to obtain user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(DEVICE_TOKEN_SQL_PREF)
//4. Union the three result sets; this is the data finally loaded into the table
val rightUserIdDF: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
println("rightUserIdDF/mobilePhoneResDF/deviceTokenDF.schema===========")
rightUserIdDF.printSchema()
mobilePhoneResDF.printSchema()
deviceTokenDF.printSchema()
val dwFactLogSession: Dataset[Row] = rightUserIdDF.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
dwFactLogSession
//Backfill the user_id_int field from pica_doctor (cast to int), restricted to delete_flag = 1 and creat_time up to yesterday; unmatched rows show 0
// val USER_ID_INT_SQL: String =
// s"""
// |SELECT ss.*,COALESCE(b.id,0) user_id_int from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
// |left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
// |""".stripMargin
// val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
// userIdDF
}
}
case class SessionPref(pseudo_session:String,
user_id:String,
mobile:String,
device_token:String,
user_token:String,
view_class:String,
view_path:String,
action:String,
component_tag:String,
menu_code:String,
action_code:String,
position:String,
label_value:String,
label_class:String,
app_version:String,
device_type:String,
device_brand:String,
device_model:String,
net_type:String,
created_time:String,
date_time:String,
web_data:String,
web_data_type:String,
alternate_info:String,
login_state:String,
first_app_version:String,
serviceName:String)
case class SessionPref(pseudo_session: String,
user_id: String,
mobile: String,
device_token: String,
user_token: String,
view_class: String,
view_path: String,
action: String,
action_type: String,
component_tag: String,
menu_code: String,
menu_code_new: String,
action_code: String,
position: String,
label_value: String,
label_class: String,
module_class1: String,
module_class2: String,
app_version: String,
device_type: String,
device_brand: String,
device_model: String,
net_type: String,
created_time: String,
date_time: String,
web_data: String,
web_data_type: String,
alternate_info: String,
login_state: String,
first_app_version: String,
serviceName: String,
tag8: String, tag9: String, tag10: String)
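Both processColumns variants above split component_tag positionally on "#" into the tagArr slots and cap label_value at 250 characters. A self-contained sketch of that parsing with a hypothetical tag value:

import scala.collection.mutable

val componentTag = "012#AC_003#2#submit_order#Button#modA" // hypothetical example value
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class",
  "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map(tagArr.map(_ -> "").toSeq: _*)
componentTag.split("#").zip(tagArr).foreach { case (value, field) =>
  // label_value is truncated to at most 250 characters, as in the job above
  tagMap.put(field, if (field == "label_value") value.take(250) else value)
}
// tagMap("menu_code") == "012", tagMap("action_code") == "AC_003", tagMap("tag8") == ""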
......@@ -2,167 +2,512 @@ package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
/**
* Process yesterday's data and load it into the pica_dw.dw_fact_log_session_path table
* Process yesterday's data to compute durations and visit paths; results go to dw_fact_log_session_term
* Notes:
* 1. Keep only service_name='trace2' (native) tracking data (h5 tracking data is also reported once through native)
* 2. Drop ACTION_EQUIP_INFO records and sessions with <= 2 distinct action types
* 1. Guide pages shown before the home page loads (not tracked on Android; iOS: view_class=YQGuidePageViewVC) are excluded from home-page traffic statistics
* 2. For ACTION_VIEW events, compute the visit time difference from the page's resume and stop events
* 3. For back-type or BACKGROUND events among ACTION_HEART_BEAT records, set menu_code to back;
* take the difference between the last and first back times of a consecutive background run as the background execution duration
* 4. If menu_code is empty or malformed, substitute view_class when it has a value
*
* @Author zhenxin.ma
* @Date 2020/3/27 10:58
* @Version 1.0
*/
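Note 2 is implemented further down with a lag window over each session's events: ordered by created_time descending, lag returns the next (later) event's timestamp, which becomes the current menu's end time. A condensed sketch of that dwell-time computation (view name taken from the code below):

// Dwell time per menu_code: next event's timestamp (lag over a descending sort) minus this
// event's timestamp, converted from milliseconds to seconds.
val menuTermSketchSql =
  """
    |select session_id, menu_code, created_time as menu_begin_time,
    |       lag(created_time) over(partition by session_id order by created_time desc) as menu_end_time,
    |       (cast(lag(created_time) over(partition by session_id order by created_time desc) as bigint)
    |         - cast(created_time as bigint)) / 1000 as menu_time_diff
    |from refer_result_table
    |""".stripMargin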
class SessionProcessTerm {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
object SessionProcessTerm {
def apply(): SessionProcessTerm = new SessionProcessTerm()
def apply(): SessionProcessTerm = new SessionProcessTerm()
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table
val insertSQL: String =
s"""
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
|values(0,'${MyConfigSession.HIVE_TABLE4}','3',?,'0',?)
""".stripMargin
//Batch number for the sync run, formatted like 2019-09-12
val scnData: String = DateUtils.getYesterdayDate
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sparkSession: SparkSession = SessionProcessTerm().getSparkSession("SessionProcessTerm")
//Filter the source data
val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH)
//Users registered before the traffic-statistics date
val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table")
//Set user_id to 0 for records where id is null
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(selectDF,sparkSession)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData)
println("----------------------------------update task record table---------------------------------------")
//On success, update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1968 and start_time='${startTime}'
//Batch number for the sync run, formatted like 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
if (args.length >= 1) {
scnData = args(0)
}
println(s"scnData=${scnData}")
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val sessionProcessTerm: SessionProcessTerm = SessionProcessTerm()
try {
val sparkSession: SparkSession = sessionProcessTerm.getSparkSession("SessionProcessTerm")
var SOURCE_SQL_TERM =
s"""
|select pseudo_session,user_id_int user_id,mobile,device_token,view_class,view_path,action,
|case when action='ACTION_HEART_BEAT' then 'ACTION_HEART' else action_type end action_type,component_tag,menu_code,menu_code_new,
|case when component_tag ='back' then 'back' when (menu_code_new in('0','null','') and view_class not in('0','null','')) then view_class
|else menu_code_new end menu_code_offset ,
|action_code,position,label_value,label_class,app_version,device_type,created_time,date_time
| from ${MyConfigSession.HIVE_TABLE0}
| where servicename='trace2' and action!='ACTION_EQUIP_INFO'
| and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1'
| and created_day='${scnData}'
|""".stripMargin // and pseudo_session='3b3cec3b-2305-4e3a-b690-843e2f666c69'
val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM )
println("sourceDF.show==================")
sourceDF.printSchema()
sourceDF.createOrReplaceTempView("session_term_ods")
//Drop sessions with <= 2 distinct action events, plus guide-page records shown before the home page loads (view_class=YQGuidePageViewVC)
SOURCE_SQL_TERM =
s"""
|select t1.* from session_term_ods t1
|join (select pseudo_session from session_term_ods group by pseudo_session having count(distinct action)>2) t2
|on t1.pseudo_session = t2.pseudo_session
|where t1.menu_code_offset not in('0','null','')
|""".stripMargin
val selectDF = sparkSession.sql(SOURCE_SQL_TERM).drop("menu_code_new")
println("selectDF.show========")
selectDF.printSchema()
println("selectDF.count=========",selectDF.count())
val conditionGroup = List("<='4' ","between '5' and '9'",">'9'")
var dataCount = 0
var index = 0
selectDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
for(condition <- conditionGroup){
index += 1
val slideDF = selectDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}")
println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val referResultRdd = sessionProcessTerm.getReferColumns(slideDF)
val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType(
List(StructField("session_id", StringType, false),
StructField("device_token", StringType, false),
StructField("user_id", IntegerType, false),
StructField("mobile", StringType, false),
StructField("menu_code", StringType, false),
StructField("menu_begin_time", StringType, false),
StructField("action_code", StringType, false),
StructField("position", StringType, false),
StructField("label_value", StringType, false),
StructField("action", StringType, false),
StructField("action_type", StringType, false),
StructField("action_step", StringType, false),
StructField("device_type", StringType, false),
StructField("app_version", StringType, false),
StructField("created_time", StringType, false),
StructField("date_time", StringType, false),
StructField("refer_session_id", StringType, false)
)
))
println("referResultDF.show()============'")
referResultDF.printSchema()
// referResultDF.where("action_type='ACTION_VIEW'").show(100,true)
referResultDF.repartition(100).persist(StorageLevel.MEMORY_AND_DISK_SER).createOrReplaceTempView("refer_result_table")
println("-----------------------------------compute menu_code term-----------------------------------------")
val getMenuTermSql =
"""
|select a.session_id,a.device_token,a.user_id,a.menu_code,
|lag(a.created_time) over(partition by a.session_id order by a.created_time desc ) menu_end_time,a.created_time
| from refer_result_table a
| left join(select session_id,min(app_version) min_version from refer_result_table group by session_id) b on a.session_id=b.session_id
|""".stripMargin
//Handle sessions whose minimum app version is >= '3.4.5'
val newVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version>='3.4.5' and a.action in('ACTION_ACTIVITY_RESUME','ACTION_HEART_BEAT') ")
println("newVersionMenuDF,show()======")
val oldVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version<'3.4.5' and a.action_type in ('ACTION_VIEW','ACTION_HEART')")
println("oldVersionMenuDF,show()======")
val referMenuDF = newVersionMenuDF.union(oldVersionMenuDF)
println("referMenuDF.show()=========")
referMenuDF.printSchema()
referMenuDF.createOrReplaceTempView("refer_menu_table")
println("-----------------------------------compute session_id end_time-----------------------------------------")
val sessionEndDF: DataFrame = sessionProcessTerm.getSessionTail(slideDF, sparkSession)
println("------sessionEndDF.show()-------------")
sessionEndDF.printSchema()
sessionEndDF.createOrReplaceTempView("session_end_table")
val getReferSql =
s"""
|select concat(regexp_replace( '${scnData}',"-","") ,cast(row_number() over(partition by 1 order by a.created_time) as string)) as id,
|a.session_id,
|a.device_token,
|a.user_id,
|a.mobile,
|a.menu_code,
|a.menu_begin_time,
| case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end menu_end_time,
|(cast((case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end) as bigint)-cast(a.menu_begin_time as bigint))/1000 menu_time_diff,
|a.action_type,
|a.action_code,
|a.position,
|a.label_value,
|lag(a.menu_code) over(partition by a.session_id order by a.created_time) refer_menu_code,
|lag(a.action_code) over(partition by a.session_id order by a.created_time) refer_action_code,
|lag(a.position) over(partition by a.session_id order by a.created_time) refer_position,
|lag(a.label_value) over(partition by a.session_id order by a.created_time) refer_label_value,
|lag(a.action_type) over(partition by a.session_id order by a.created_time) refer_action_type,
|a.action_step,
|a.device_type,
|a.app_version,
|a.created_time,
|a.date_time,
|c.session_begin_time,
|c.session_end_time,
|(cast(c.session_end_time as bigint)-cast(c.session_begin_time as bigint))/1000 session_time_diff,
|a.refer_session_id
|from refer_result_table a
|left join refer_menu_table b on a.session_id=b.session_id and a.device_token=b.device_token and a.user_id=b.user_id and a.menu_code=b.menu_code and a.created_time=b.created_time
|left join session_end_table c on a.session_id = c.session_id
| distribute by rand()
|""".stripMargin
val menuCodeAddDF = sparkSession.sql(getReferSql).repartition(100)
println("menuCodeAddDF.show=======")
menuCodeAddDF.printSchema()
menuCodeAddDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val resultDF: DataFrame = sessionProcessTerm.matchUserId(menuCodeAddDF, sparkSession, scnData).repartition(5)
println("dwFactLogSession.show=======>")
resultDF.printSchema()
// resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_term-----------------")
sessionProcessTerm.loadData(resultDF, sparkSession, scnData,index)
val resCount = resultDF.count().toInt
println(s"${condition}的结果==${resCount}")
dataCount += resCount
}
val fields = List("id","session_id","device_token","user_id","mobile","menu_code","menu_begin_time","menu_end_time","menu_time_diff","action_type",
"action_code","position","label_value","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_action_type","action_step",
"device_type","app_version","created_time","date_time","session_begin_time","session_end_time","session_time_diff","refer_session_id")
// sparkSession.sql(s"insert overwrite table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${scnData}') " +
// s"select ${fields.mkString(",")} from ${MyConfigSession.HIVE_TABLE4} where created_day='${scnData}'")
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='${MyConfigSession.HIVE_TABLE4}' and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, resultDF.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, dataCount.toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='${MyConfigSession.HIVE_TABLE4}' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
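//A hedged, demo-only illustration of the window trick used in getMenuTermSql above:
//lag(created_time) over (partition by session_id order by created_time DESC) returns the timestamp
//of the NEXT event in the same session, which is what the job records as the current menu's end time.
//Everything below is made-up sample data, not part of the production job.
object MenuEndTimeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MenuEndTimeDemo").master("local[*]").getOrCreate()
    import spark.implicits._
    Seq(("s0", "m_home", "1000"), ("s0", "m_list", "2000"), ("s0", "m_detail", "3000"))
      .toDF("session_id", "menu_code", "created_time").createOrReplaceTempView("demo_events")
    //expected menu_end_time: m_home -> 2000, m_list -> 3000, m_detail -> null (no later event)
    spark.sql(
      """
        |select session_id, menu_code, created_time,
        |lag(created_time) over(partition by session_id order by created_time desc) menu_end_time
        |from demo_events
        |""".stripMargin).show()
    spark.stop()
  }
}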
class SessionProcessTerm {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
  /**
   * @Description add the refer_* (previous-event) columns for the required fields
   * @param dataFrame source data
   * @param sparkSession SparkSession environment
   * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
   **/
  def getReferColumns(dataFrame: DataFrame ,sparkSession: SparkSession):DataFrame = {
    //window: partition by session_id, then order by created_time
    val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
    //add the refer_* columns
val menuDF: DataFrame =
dataFrame.withColumn("refer_menu_code", lag(dataFrame("menu_code"), 1).over(sessionIDWinSpec))
val acodeDF: DataFrame =
menuDF.withColumn("refer_action_code", lag(menuDF("action_code"), 1).over(sessionIDWinSpec))
val positionDF: DataFrame =
acodeDF.withColumn("refer_position", lag(acodeDF("position"), 1).over(sessionIDWinSpec))
val actypeDF: DataFrame =
positionDF.withColumn("refer_action_type", lag(positionDF("action_type"), 1).over(sessionIDWinSpec))
val recreatDF: DataFrame =
actypeDF.withColumn("refer_created", lag(actypeDF("created_time"), 1).over(sessionIDWinSpec))
val rowNumberDF: DataFrame =
recreatDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
//replace NULLs in the refer_* columns with defaults
val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value",
"COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position",
"COALESCE(refer_action_type,'') as refer_action_type",
"COALESCE(refer_created,created_time) as refer_created",
"step_id", "app_version", "device_type", "created_time", "date_time")
//on top of that, add refer_time_diff = created_time - refer_created
val referTimeDiff: DataFrame =
coaleseDF.withColumn("refer_time_diff", coaleseDF("created_time") - coaleseDF("refer_created"))
    referTimeDiff
  }
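  /**
   * A hedged, demo-only sketch of the lag + COALESCE pattern used above, on a made-up
   * two-row DataFrame; it is not called by the production job.
   * Expected output: row 1 gets refer_menu_code = "" and refer_time_diff = 0,
   * row 2 gets refer_menu_code = "m_home" and refer_time_diff = 1000.
   **/
  def referColumnsDemo(spark: SparkSession): Unit = {
    import spark.implicits._
    import org.apache.spark.sql.functions.{coalesce, lit}
    val w: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
    val df = Seq(("s0", "m_home", 1000L), ("s0", "m_list", 2000L)).toDF("session_id", "menu_code", "created_time")
    df.withColumn("refer_menu_code", coalesce(lag($"menu_code", 1).over(w), lit("")))
      .withColumn("refer_created", coalesce(lag($"created_time", 1).over(w), $"created_time"))
      .withColumn("refer_time_diff", $"created_time" - $"refer_created")
      .show()
  }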
  /**
   * @Description match user_id and back-fill the user_id field in the data
   * @param dataFrame filtered data
   * @param sparkSQLSession SparkSession environment
   * @param created_day date of the current data, formatted "2020-03-01"
   * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
   **/
def matchUserId(dataFrame: DataFrame, sparkSQLSession: SparkSession, created_day: String): DataFrame = {
    //join dataFrame with pica_ds.pica_doctor on user_id; any user_id that fails to match is set to '0'
    println("matchUserId started-----------------------------------")
dataFrame.withColumnRenamed("user_id", "user_id_ods").createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val INIT_USER_ID_SQL_PREF =
s"""
|SELECT t.*,t.user_id_ods user_id_old, COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on t.user_id_ods = cast(b.id as string)
""".stripMargin
val DF = sparkSQLSession.sql(INIT_USER_ID_SQL_PREF).drop("user_id_ods")
    /*
     all of the logic below back-fills the user_id field
     */
    //step 1: pick out the invalid user_id records and set those user_ids to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
noMatchUserIdDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
val MOBILE_PHONE_SQL_PREF: String =
s"""
|SELECT ss.*,COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
//1. take the user_ids left unmatched above and match them by mobile_phone first
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MOBILE_PHONE_SQL_PREF)
    //2. use the equipment temp view and keep only the latest record per device (row_d = 1)
    var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
    if (!created_day.equals(DateUtils.getYesterdayDate)) { //when not processing yesterday's data, use the equipment zipper (slowly-changing) table
      equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'"
    }
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
mobilePhoneDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
val DEVICE_TOKEN_SQL_PREF: String =
s""" SELECT t.*,COALESCE(cast(b.user_id as string),'0') AS user_id
| from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id_old= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
println(s"DEVICE_TOKEN_SQL_PREF=${DEVICE_TOKEN_SQL_PREF}")
    //3. match the records filtered in step 2 by device_token to obtain user_id
    val deviceTokenDF: DataFrame = sparkSQLSession.sql(DEVICE_TOKEN_SQL_PREF)
    //4. union the three parts; this is the data finally loaded into the table
val rightUserIdDF: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
println("rightUserIdDF/mobilePhoneResDF/deviceTokenDF.schema===========")
rightUserIdDF.printSchema()
mobilePhoneResDF.printSchema()
deviceTokenDF.printSchema()
val dwFactLogSession: Dataset[Row] = rightUserIdDF.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
dwFactLogSession
}
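  /**
   * A hedged, pure-Scala sketch of the fallback order implemented by matchUserId above:
   * keep a valid direct id, otherwise fall back to the mobile match, then the device_token
   * match, and finally '0'. Demo helper only; the real logic runs as the SQL joins above.
   * e.g. resolveUserIdDemo(Some("0"), Some("12345"), None) == "12345"
   **/
  def resolveUserIdDemo(direct: Option[String], byMobile: Option[String], byDeviceToken: Option[String]): String =
    direct.filter(id => id.nonEmpty && id != "0" && id.length != 24)
      .orElse(byMobile.filter(_ != "0"))
      .orElse(byDeviceToken.filter(_ != "0"))
      .getOrElse("0")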
  /**
   * @Description add the refer_* columns for the required fields and split pseudo_session
   *              https://www.tapd.cn/64812329/prong/stories/view/1164812329001012031 release versions: Android 3.4.5, iOS 3.4.4, unified to 3.4.6 afterwards
   *              compute sessionId, branching on app version: older data splits pseudo_session with a maximum gap of 30 minutes between events;
   *              newer data is not split, and ACTION_HEART_BEAT marks session activity (ACTION_HEART_BEAT was introduced in version 3.3.2)
   * @param sourceDF source data
   * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
   **/
def getReferColumns(sourceDF: DataFrame): RDD[Row] = {
    val actions = List("'ACTION_VIEW'", "'ACTION_CLICK'", "'ACTION_EXPOSE'") //higher versions also use "'ACTION_HEART_BEAT'"
    val actionCommonDF = sourceDF.where(s" ( action_type in (${actions.mkString(",")}) or (action_type='ACTION_HEART' and menu_code_offset='back' ))")
    println("actionCommonDF.show()===========")
    actionCommonDF.printSchema()
    // actionCommonDF.where("pseudo_session='5964908742'").show(200)
    //compute the start time of each menu_code inside a session and drop redundant ACTION_HEART_BEAT records
    val groupRdd = actionCommonDF.rdd.groupBy(r => r.getAs[String]("pseudo_session")) //.repartition(100)
    println("start groupRdd.flatMap=========")
val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
      rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created_time") < y.getAs[String]("created_time")) //sort by created_time ascending
var actionStep = "0_0"
// var sessionBeginTime = rowList.head.getAs[String]("created_time")
var thisMenuCode = ""
var menuBeginTime = ""
var prefCreatedTime = ""
var prefActionType = ""
var count = 0
for (row <- rowList) {
var isSessionNew = false
var appVersion = row.getAs[String]("app_version")
//如果版本号低于3.3.2,则没有ACTION_HEART_BEAT事件,需要根据两次事件之间的时差对session进行切分,超过30分钟则拆分出一个session
var createdTime = row.getAs[String]("created_time")
if (appVersion < "3.3.2") { //版本号低于3.3.2,需要根据时差判断是否拆分session
if (prefCreatedTime != "" && (createdTime.toLong - prefCreatedTime.toLong) > MyConfigSession.SESSION_GAP) { //本次action事件事件比
count += 1
isSessionNew = true
}
}
var menuCode = row.getAs[String]("menu_code_offset")
val action = row.getAs[String]("action")
val actionType = row.getAs[String]("action_type")
var sessionId = pseudo_session + count
var referSessionId = ""
if (count > 0) {
referSessionId = pseudo_session + (count - 1)
}
//use the app version together with action_type to decide whether the record is persisted
var needPut = true
if ("ACTION_VIEW".equals(actionType) || "ACTION_HEART".equals(actionType)) {
if (appVersion >= "3.4.5") { //针对3.4.5之后的版本单独处理
//如果本条记录为ACTION_ACTIVITY_RESUME类型或者ACTION_HEART类型,则入库
if ("ACTION_ACTIVITY_RESUME".equals(action) || "ACTION_HEART".equals(actionType) ) {
needPut = true
}else{
needPut = false
}
if ( thisMenuCode.equals(menuCode) && ("ACTION_HEART"==prefActionType || "ACTION_VIEW" == prefActionType)){
needPut = false
}
}else{
if ( thisMenuCode.equals(menuCode) && ("ACTION_HEART"==prefActionType || "ACTION_VIEW" == prefActionType) && !isSessionNew ) {
needPut = false
}
}
if(needPut){
menuBeginTime = createdTime
}
//        /*
//         drop redundant records:
//         1. if both the previous and the current event are heartbeat-type records, skip the current one
//         2. if both are ACTION_VIEW with an unchanged menu, do not persist the current record
//        */
//        if ( thisMenuCode.equals(menuCode) && ("ACTION_HEART"==prefActionType || "ACTION_VIEW" == prefActionType) && !isSessionNew ) {
//          needPut = false
//        }else{
//          needPut = true
//          menuBeginTime = createdTime
//        }
}
if (!thisMenuCode.equals(menuCode) || isSessionNew) {
menuBeginTime = createdTime
          if (needPut) { //only update thisMenuCode and the menu_code visit order once the record for this menu_code is confirmed for persistence
thisMenuCode = menuCode
actionStep = (actionStep.split("_")(0).toInt + 1).toString + "_0"
}
} else {
if (!(prefActionType.equals(actionType) && "ACTION_VIEW".equals(actionType))) {
actionStep = (actionStep.split("_")(0)) + "_" + (actionStep.split("_")(1).toInt + 1).toString
}
}
// println(s"created_time=${createdTime},thisMenuCode=${thisMenuCode},actionType=${actionType},action=${action},isMenuChange=${!thisMenuCode.equals(menuCode)},prefActionType=${prefActionType},needPut=${needPut}" )
prefCreatedTime = createdTime
prefActionType = actionType
if (needPut) {
resList += (Row(sessionId,
row.getAs[String]("device_token"),
row.getAs[Integer]("user_id"),
row.getAs[String]("mobile"),
thisMenuCode,
menuBeginTime,
row.getAs[String]("action_code"),
row.getAs[String]("position"),
row.getAs[String]("label_value"),
action, actionType, actionStep,
row.getAs[String]("device_type"),
row.getAs[String]("app_version"),
createdTime,
row.getAs[String]("date_time"),
referSessionId
))
}
}
resList.iterator
})
resRdd
}
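  /**
   * A hedged, pure-Scala sketch of the 30-minute split rule applied above: walk one
   * pseudo_session's timestamps in ascending order and open a new session index whenever
   * the gap to the previous event exceeds SESSION_GAP (assumed here to be 30 minutes in millis).
   * Demo helper only. e.g. with a 31-minute gap before the 3rd event,
   * splitSessionsDemo(Seq(0L, 60000L, 1920000L, 1980000L)) == Seq((0,0), (60000,0), (1920000,1), (1980000,1))
   **/
  def splitSessionsDemo(sortedTimes: Seq[Long]): Seq[(Long, Int)] = {
    var count = 0
    var prev = -1L
    sortedTimes.map { t =>
      if (prev >= 0 && t - prev > MyConfigSession.SESSION_GAP) count += 1
      prev = t
      (t, count)
    }
  }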
  /**
   * @Description compute session_end_time; two cases:
   *              1. low-version data where the session was split: take the first and last record times of each session_id as the session start/end times
   *              2. high-version data with no split: take the first and last record times of the session_id's pseudo_session as the session start/end times
   * @param sourceDF source data
   * @return
   */
def getSessionTail(sourceDF: DataFrame, sparkSession: SparkSession): DataFrame = {
sourceDF.createOrReplaceTempView("select_res_table")
    /*
     1. exclude every pseudo_session that spawned split sessions and compute start/end times for the remaining ones
     */
val sourceSql =
"""
| select concat(pseudo_session,'0') session_id,min(created_time) session_begin_time,max(created_time) session_end_time from select_res_table
| where pseudo_session not in (select distinct SUBSTRING(refer_session_id,1,length(refer_session_id)-1) from refer_result_table where refer_session_id like '%0')
| group by concat(pseudo_session,'0')
|""".stripMargin
//2. compute the start/end times of every newly split session_id
val sourceSql2 =
"""
|select session_id,min(created_time) session_begin_time,max(created_time) session_end_time from refer_result_table
| where SUBSTRING(session_id,1,length(session_id)-1)
| in (select distinct SUBSTRING(refer_session_id,1,length(refer_session_id)-1) from refer_result_table where refer_session_id!='')
| group by session_id
|""".stripMargin
    val uniqueSessionEndDF = sparkSession.sql(sourceSql)
    println("uniqueSessionEndDF.show()=========")
    val firstSessionEndDF = sparkSession.sql(sourceSql2)
    println("firstSessionEndDF.show()=========")
    val sessionEndDF = uniqueSessionEndDF.union(firstSessionEndDF)
    sessionEndDF
}
  /**
   * @Description load the result data into the target table
   * @param dataFrame source data
   * @param sparkSession SparkSession environment
   * @param partitionDay partition date
   * @param index batch index: 1 overwrites the partition, later batches append
   * @return void
   **/
def loadData1(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
dataFrame.createOrReplaceTempView("result_view")
var insertSql = "insert overwrite"
if(index!=1){
insertSql = "insert into"
}
//补充新生session超过10个之后的session的session_begin_time与session_end_time
val loadDataSql =
s"""
|${insertSql} table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${partitionDay}')
| select id,t1.session_id,device_token,user_id,mobile,menu_code,menu_begin_time,
| menu_end_time,
| menu_time_diff,
| action_type,action_code,position,label_value,refer_menu_code,refer_action_code,refer_position,refer_label_value,refer_action_type,
| action_step,device_type,app_version,created_time,date_time,
| session_begin_time,
| session_end_time,
| session_time_diff,
| refer_session_id from result_view t1
""".stripMargin
sparkSession.sql(loadDataSql)
}
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
dataFrame.createOrReplaceTempView("result_view")
var insertSql = "insert overwrite"
if(index!=1){
insertSql = "insert into"
}
//补充新生session超过10个之后的session的session_begin_time与session_end_time
val loadDataSql =
s"""
|${insertSql} table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${partitionDay}')
| select id,t1.session_id,device_token,user_id,mobile,menu_code,menu_begin_time,
| case when t1.menu_end_time is null and t1.action_type='ACTION_VIEW' then t2.session_end_time else t1.menu_end_time end menu_end_time,
| (cast((case when t1.menu_end_time is null and t1.action_type='ACTION_VIEW' then t2.session_end_time else t1.menu_end_time end) as bigint)-cast(t1.menu_begin_time as bigint))/1000 menu_time_diff,
| action_type,action_code,position,label_value,refer_menu_code,refer_action_code,refer_position,refer_label_value,refer_action_type,
| action_step,device_type,app_version,created_time,date_time,
| case when t1.session_begin_time is not null then t1.session_begin_time else t2.session_begin_time end session_begin_time,
| case when t1.session_end_time is not null then t1.session_end_time else t2.session_end_time end session_end_time,
| case when t1.session_time_diff is not null then t1.session_time_diff else t2.session_time_diff end session_time_diff,
| refer_session_id from result_view t1
| left join (select session_id, min(created_time) session_begin_time,max(created_time) session_end_time,
| (cast(max(created_time) as bigint)-cast( min(created_time) as bigint))/1000 session_time_diff
| from result_view where session_time_diff is null group by session_id ) t2
| on t1.session_id=t2.session_id
""".stripMargin
sparkSession.sql(loadDataSql)
}
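  /**
   * A hedged usage sketch for the index parameter of loadData above: the first batch
   * overwrites the day's partition and every later batch appends to it. `dfs` is a
   * hypothetical list of per-batch result DataFrames; the names here are illustrative only.
   **/
  def loadAllDemo(spark: SparkSession, day: String, dfs: Seq[DataFrame]): Unit =
    dfs.zipWithIndex.foreach { case (df, i) =>
      loadData(df, spark, day, i + 1) //index 1 => insert overwrite, otherwise insert into
    }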
}
package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPref, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver
/**
......@@ -16,6 +16,7 @@ object Driver {
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessPref",classOf[SessionProcessPref],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_pref表")
driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表")
driver.run(args)
}
}