提交 1a0bc349 编写于 作者: wuyunfeng's avatar wuyunfeng

新增一个job,SessionProcessPathNew

上级 7dec4501
......@@ -41,7 +41,7 @@ object MyConfigSession {
final val SOURCE_SQL_PREF: String =
"""
|select device_token,pseudo_session,class_name,action,view_path,component_tag,created,mobile,doctor_id,device_brand,device_model,app_version,
|device_type,web_data,web_data_type,alternate_info,network_type,remark1 login_state,remark2 first_app_version,user_token_tourist,serviceName
|device_type,web_data,web_data_type,alternate_info,network_type,remark1 login_state,remark2 first_app_version,remark3 ,user_token_tourist,serviceName
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
......@@ -78,6 +78,15 @@ object MyConfigSession {
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and length(menu_code) <= 3 )
""".stripMargin
// Base query for the session-path job: selects rows from the Hive table named by
// MyConfigSession.HIVE_TABLE4 (the original note describes it as the dw_fact_log_session_term table).
// Filters applied by the query text:
//   - app_version >= '3.1.7'
//   - ACTION_CLICK rows whose action_code is not the literal 'null', or any ACTION_VIEW row
//   - menu_code is not '0' / 'null' / '', is at most 3 characters long and casts to int
// The statement ends inside its WHERE clause, so a caller can append further " and ..." predicates.
// NOTE(review): app_version is compared against a quoted literal, which looks like a string
// (lexicographic) comparison rather than a version-aware one - confirm the column type.
final val SOURCE_SQL_PATH_NEW: String =
s"""
|select id log_session_id, session_id, user_id,device_token,action_type,user_token,menu_code,action_code,position,label_value,label_class,action_step,
|app_version,device_type,device_brand,device_model,net_type,created_time,date_time,module_class1,module_class2 from ${MyConfigSession.HIVE_TABLE4}
| where app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' )
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and length(menu_code) <= 3 and cast(menu_code as int ) is not null)
""".stripMargin
//匹配user_id的条件
......@@ -101,7 +110,7 @@ object MyConfigSession {
| from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
//1.针对没有匹配到的user_id,先使用 mobile_phone 进行匹配,得到 user_id 匹配,'0'
//1.针对没有匹配到的user_id,先使用 mobile_phone 进行匹配,得到 user_id 匹配,'0',XK0HdMN6dAfOlYPOFHHL0A==表示''加密之后的mobile
final val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
......@@ -109,7 +118,7 @@ object MyConfigSession {
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.module_class1,ss.module_class2,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system, ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 and mobile_phone!='' and mobile_phone!='XK0HdMN6dAfOlYPOFHHL0A==' ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
final val MOBILE_PHONE_SQL_PREF: String =
s"""
......@@ -125,8 +134,9 @@ object MyConfigSession {
final val EQUIPMENT_INFO_SQL: String =
"""
|SELECT a.user_id,a.device_token ,ROW_NUMBER() OVER ( PARTITION BY a.device_token ORDER BY a.creat_time DESC ) row_d
|from pica_ds.picams_equipment_info AS a
| where a.user_id IS NOT NULL
|from (select e1.user_id,e1.device_token,e1.creat_time,e1.modify_time,e1.delete_flag from pica_ds.picams_equipment_info e1 union
| select e2.user_id,e2.device_token,e2.creat_time, e2.modify_time,e2.delete_flag from pica_ds.picams_p_equipment_info_shadow e2 ) a
| where a.user_id IS NOT NULL and a.user_id>0 and a.delete_flag=1 and device_token is not null
| AND (to_date(a.creat_time) = date_sub(current_date(),1) OR to_date(a.modify_time) = date_sub(current_date(),1))
""".stripMargin
......
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
/**
 * Job entry point: processes one day of session data (yesterday unless a date is passed)
 * and loads the result into pica_dw.dw_fact_log_session_path_new.
 *
 * @Author yunfeng.wu
 * @Date 2020/10/29 09:58
 * @Version 1.0
 */
object SessionProcessPathNew {

  def apply(): SessionProcessPathNew = new SessionProcessPathNew()

  /**
   * Runs the job end to end: registers the run in the MySQL record table, builds the
   * refer columns for every ACTION_CLICK row of the requested day, loads the result into
   * Hive and finally marks the run as succeeded ("1") or failed ("2") in the record table.
   *
   * @param args args(0), optional: batch date (e.g. 2019-09-12), defaults to yesterday;
   *             args(1), optional: extra SQL predicate appended verbatim to the source
   *             query's WHERE clause, defaults to " 1=1" when absent or empty
   */
  def main(args: Array[String]): Unit = {
    // 1. Register the run in the record table before doing any work.
    val insertSQL: String =
      s"""
         |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
         |values(0,'pica_dw.dw_fact_log_session_path_new','3',?,'0',?)
         |""".stripMargin
    // Batch number of the sync, formatted like 2019-09-12.
    val scnData: String = if (args.length >= 1) args(0) else DateUtils.getYesterdayDate
    // Optional extra predicate; an absent or empty second argument means "no extra filter".
    val elseFiler: String = if (args.length > 1 && args(1) != "") args(1) else " 1=1"
    println(s"scnData=${scnData}")
    // Job start time, formatted like 2019-09-12 14:03:30; also used to find the record row again.
    val startTime: String = DateUtils.getTodayTime
    // MySQL connection used for the record table.
    val connSql: sql.Connection = JDBCUtil.getConnection()
    JDBCUtil.insertRecord(connSql, insertSQL, Array[String](scnData, startTime))
    val sessionProcessPathNew: SessionProcessPathNew = SessionProcessPathNew()
    try {
      val sparkSession: SparkSession = sessionProcessPathNew.getSparkSession("SessionProcessPathNew")
      // Broadcast variable mapping url_content to label_value.
      val positionUrlLabelBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_URLLABEL_SQL, "url_content", "label_value")
      println(s"positionUrlLabelBroad=${positionUrlLabelBroad.value}")
      // Select the source rows of the requested day.
      val sourceSql: String = MyConfigSession.SOURCE_SQL_PATH_NEW + s" and created_day='${scnData}' and ${elseFiler}"
      println(s"查询sql:${sourceSql}")
      val sourceDF: DataFrame = sparkSession.sql(sourceSql)
      sourceDF.show()
      println(s"sourceDF.count=${sourceDF.count()}")
      val pathStepDF = sessionProcessPathNew.getReferColumns(sourceDF.where("action_type='ACTION_CLICK'"), positionUrlLabelBroad)
      println(s"pathStepDF.count=${pathStepDF.count()}")
      pathStepDF.createOrReplaceTempView("menu_refer_record")
      println("-----------------------------------compute refer columns-----------------------------------------")
      val referDF = sparkSession.sql("select t.*," +
        "(cast(t.created_time as bigint) - cast(t.refer_created as bigint) ) refer_time_diff, " +
        "row_number() over(partition by t.session_id order by t.created_time ) step_id " +
        "from menu_refer_record t where action_type='ACTION_CLICK' ")
      referDF.createOrReplaceTempView("pref_menu_info")
      val referResDF: DataFrame = referDF.select("log_session_id", "session_id", "user_id", "device_token", "action_type", "user_token", "menu_code", "action_code", "position",
        "label_value", "label_class", "refer_log_session_id", "refer_menu_code", "refer_action_code", "refer_position", "refer_label_value", "refer_created", "step_id", "app_version",
        "device_type", "device_brand", "device_model", "net_type", "created_time", "date_time", "module_class1", "module_class2", "refer_time_diff")
      println("referResDF.printSchema()")
      referResDF.printSchema()
      // count() is a Spark action: keep the result so the record update below does not
      // have to re-run the whole groupBy/window lineage a second time.
      val resultCount: Long = referResDF.count()
      println(s"referResDF.count=${resultCount}")
      println("------------------------------------单独计算label_value----------------------------------------------")
      // "menu_code = '930' and action_code IN ( '930000', '930001', '930002' ) and action_type = 'ACTION_CLICK'
      println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
      sessionProcessPathNew.loadData(referResDF, sparkSession, scnData)
      println("----------------------------------update task record table---------------------------------------")
      // The job succeeded: update the MySQL record row.
      val updateSQL: String =
        s"""
           |update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=0 and start_time='${startTime}'
           |""".stripMargin
      val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
      upreSta.setString(1, "1")
      upreSta.setString(2, DateUtils.getTodayTime)
      upreSta.setInt(3, resultCount.toInt)
      upreSta.executeUpdate()
      // Release the statement and the connection, then shut Spark down.
      JDBCUtil.close(connSql, upreSta)
      sparkSession.stop()
    } catch {
      case e: Exception =>
        println("-----------------------------------任务异常---------------------------------------------------")
        e.printStackTrace()
        val exceptionSQL: String =
          s"""
             |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=0 and start_time='${startTime}'
             |""".stripMargin
        // getMessage may be null (e.g. NullPointerException); fall back to the exception's
        // toString so the record row always says what failed.
        val failureText: String = Option(e.getMessage).getOrElse(e.toString)
        val errorArr = Array[String]("2", failureText, DateUtils.getTodayTime)
        // Close the connection even when writing the failure record itself throws.
        try {
          JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
        } finally {
          connSql.close()
        }
    }
  }
}
/**
 * Spark-side steps of the session-path job: session creation, per-session derivation of
 * the refer columns, and the final load into Hive.
 */
class SessionProcessPathNew {

  /**
   * Creates (or reuses) a Hive-enabled SparkSession.
   *
   * @param appName Spark application name
   * @return the session built from a SparkConf that was passed through UseUtil.setConfigure
   */
  def getSparkSession(appName: String): SparkSession = {
    val sparkConf = new SparkConf().setAppName(appName)
    UseUtil.setConfigure(sparkConf)
    SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  }

  /**
   * Derives the refer columns and action_step_new for every row.
   *
   * Rows are grouped by session_id and walked in ascending created_time order (compared
   * as strings). Whenever menu_code differs from the previous row's:
   *  - if the previous row was an ACTION_CLICK, the refer points at that previous row;
   *    when the top of the navigation stack carries the current menu_code the move is
   *    treated as a "back": the top is popped and the refer becomes the new top (or blank
   *    when the stack is then empty); otherwise the previous row is pushed onto the stack;
   *  - menu_code "001" always clears the stack and blanks the refer;
   *  - if the previous row was not an ACTION_CLICK, the refer is blanked.
   * While menu_code stays the same the refer is left untouched. action_step_new has the
   * form "major_minor": major grows on every menu_code change, minor on every further row
   * with the same menu_code.
   *
   * @param dataFrame             source rows
   * @param positionUrlLabelBroad not used by this method
   * @return the input columns plus refer_log_session_id, refer_menu_code, refer_action_code,
   *         refer_position, refer_label_value, refer_created and action_step_new
   */
  def getReferColumns(dataFrame: DataFrame, positionUrlLabelBroad: Broadcast[Map[String, String]]): DataFrame = {
    val rowsBySession = dataFrame.rdd.groupBy(_.getAs[String]("session_id"))
    val stepRdd = rowsBySession.flatMap { case (sessionId, sessionRows) =>
      // Ascending by created_time.
      val orderedRows = sessionRows.toList.sortWith((left, right) =>
        left.getAs[String]("created_time") < right.getAs[String]("created_time"))

      // Everything below is local to this closure on purpose: the enclosing class is not
      // Serializable, so nothing here may reference `this`.
      val referKeys = List("referLogSessionId", "referMenuCode", "referActionCode",
        "referPostion", "referLabelValue", "referCreated")
      val steps = new ListBuffer[PathStep]()
      val refer = mutable.Map[String, String]()
      val backStack = mutable.Stack[ListBuffer[String]]()

      // Stores `values` under referKeys, position by position.
      def pointReferAt(values: Seq[String]): Unit =
        referKeys.zip(values).foreach { case (key, value) => refer(key) = value }

      // Sets every refer key to the empty string.
      def blankRefer(): Unit = pointReferAt(List.fill(referKeys.size)(""))

      // Snapshot of the previous row, laid out in referKeys order:
      // (log_session_id, menu_code, action_code, position, label_value, created_time).
      var previous: ListBuffer[String] = ListBuffer("", "", "", "", "", "")
      var previousActionType = ""
      var currentMenu = ""
      var actionStep = "0_0"

      orderedRows.foreach { row =>
        val menuCode = row.getAs[String]("menu_code")
        val actionType = row.getAs[String]("action_type")
        val stepParts = actionStep.split("_")

        if (currentMenu.equals(menuCode)) {
          // Same menu_code as before: only the minor step advances.
          actionStep = stepParts(0) + "_" + (stepParts(1).toInt + 1).toString
        } else {
          currentMenu = menuCode
          if (previousActionType == "ACTION_CLICK") {
            // Only a click on the previous page produces a refer.
            pointReferAt(previous)
            if (backStack.nonEmpty && backStack.top.apply(1) == menuCode) {
              // Returning to the page on top of the stack: drop it and refer to what lies beneath.
              backStack.pop()
              if (backStack.nonEmpty) pointReferAt(backStack.top) else blankRefer()
            } else {
              backStack.push(previous)
            }
            if (menuCode == "001") {
              // The home page is always treated as a "back" target: no refer, fresh stack.
              backStack.clear()
              blankRefer()
            }
          } else {
            blankRefer()
          }
          actionStep = (stepParts(0).toInt + 1).toString + "_0"
        }

        previous = ListBuffer(
          row.getAs[String]("log_session_id"),
          menuCode,
          row.getAs[String]("action_code"),
          row.getAs[String]("position"),
          row.getAs[String]("label_value"),
          row.getAs[String]("created_time"))
        previousActionType = actionType

        if (menuCode == "001") {
          println(s"001的referMap==${refer}")
        }

        steps += PathStep(
          row.getAs[String]("log_session_id"),
          sessionId,
          row.getAs[Integer]("user_id"),
          row.getAs[String]("device_token"),
          actionType,
          row.getAs[String]("user_token"),
          menuCode,
          row.getAs[String]("action_code"),
          row.getAs[String]("position"),
          row.getAs[String]("label_value"),
          row.getAs[String]("label_class"),
          row.getAs[String]("app_version"),
          row.getAs[String]("device_type"),
          row.getAs[String]("device_brand"),
          row.getAs[String]("device_model"),
          row.getAs[String]("net_type"),
          row.getAs[String]("created_time"),
          row.getAs[String]("date_time"),
          row.getAs[String]("module_class1"),
          row.getAs[String]("module_class2"),
          refer.getOrElse("referLogSessionId", ""),
          refer.getOrElse("referMenuCode", ""),
          refer.getOrElse("referActionCode", ""),
          refer.getOrElse("referPostion", ""),
          refer.getOrElse("referLabelValue", ""),
          refer.getOrElse("referCreated", ""),
          actionStep)
      }
      steps.iterator
    }

    import dataFrame.sparkSession.implicits._
    val stepDF = stepRdd.toDF("log_session_id", "session_id", "user_id", "device_token", "action_type", "user_token", "menu_code", "action_code",
      "position", "label_value", "label_class", "app_version", "device_type", "device_brand", "device_model", "net_type", "created_time", "date_time",
      "module_class1", "module_class2", "refer_log_session_id", "refer_menu_code", "refer_action_code", "refer_position", "refer_label_value", "refer_created", "action_step_new")
    println("baseDF.show=======>")
    stepDF.show()
    stepDF.printSchema()
    stepDF
  }

  /**
   * Overwrites one created_day partition of pica_dw.dw_fact_log_session_path_new with the
   * given rows.
   *
   * @param dataFrame    rows to load; registered as the temp view result_view
   * @param sparkSession session used to run the insert
   * @param partitionDay value of the created_day partition to overwrite
   */
  def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String): Unit = {
    dataFrame.createOrReplaceTempView("result_view")
    val targetTable = "pica_dw.dw_fact_log_session_path_new"
    sparkSession.sql(
      s"""
         |insert overwrite table ${targetTable} partition(created_day='${partitionDay}')
         | select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
         | refer_log_session_id,refer_menu_code,refer_action_code,refer_position,refer_label_value,
         | cast(refer_time_diff as int) as refer_time_diff,
         | step_id,app_version,device_type,device_brand,device_model,net_type ,created_time,date_time, module_class1, module_class2,
         | case when user_id=0 then device_token else user_id end user_identity_id
         | from result_view distribute by rand()
         |""".stripMargin)
  }
}
/**
 * One row produced by SessionProcessPathNew.getReferColumns: the source columns of an
 * event, the refer_* values assigned to it, and its "major_minor" step counter.
 */
case class PathStep(
  // Columns copied from the source row (session_id is the grouping key).
  log_session_id: String, session_id: String, user_id: Integer, device_token: String,
  action_type: String, user_token: String,
  menu_code: String, action_code: String, position: String,
  label_value: String, label_class: String,
  app_version: String, device_type: String, device_brand: String, device_model: String,
  net_type: String,
  created_time: String, date_time: String,
  module_class1: String, module_class2: String,
  // Refer values; empty strings when the row has no refer.
  refer_log_session_id: String, refer_menu_code: String, refer_action_code: String,
  refer_position: String, refer_label_value: String, refer_created: String,
  // Step counter in the form "major_minor".
  action_step_new: String
)
......@@ -15,8 +15,7 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* 处理埋点数据,进行简单的清晰过滤,导入到DW层pica_dw.dw_fact_log_session_pref
*
* 处理埋点数据,进行简单的清晰过滤,导入到DW层pica_dw.dw_fact_log_session_pref,已废弃
* @Author yunfeng.wu
* @Date 2020/08/07 09:23
* @Version 1.0
......@@ -62,12 +61,12 @@ object SessionProcessPref {
println("---------------------------------------process columns-------------------------------------------")
import sessionProcessPref.sparkSession.implicits._
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
"menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand", "device_model",
"menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version", "serviceName", "tag8", "tag9", "tag10")
println("baseDF.show=======>")
baseDF.printSchema()
baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
sessionProcessPref.loadData(baseDF,scnData,index)
sessionProcessPref.loadData(baseDF,scnData,index,dataCount)
dataCount += baseDF.count().toInt
}
println("----------------------------------update task record table---------------------------------------")
......@@ -115,12 +114,6 @@ class SessionProcessPref extends java.io.Serializable {
val sparkSession: SparkSession = getSparkSession("SessionProcessPref")
//获取符合要求的actionType广播变量
// val actionTypeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//获取clasName广播变量
// val classNameBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
......@@ -167,7 +160,7 @@ class SessionProcessPref extends java.io.Serializable {
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
......@@ -216,7 +209,7 @@ class SessionProcessPref extends java.io.Serializable {
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class_1"), tagMap("module_class_2"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
......@@ -238,7 +231,7 @@ class SessionProcessPref extends java.io.Serializable {
baseRdd
}
def loadData(dataFrame: DataFrame, partitionDay: String,index:Integer): Unit = {
def loadData(dataFrame: DataFrame, partitionDay: String,index:Integer,count:Integer): Unit = {
val tmpTable = "result_view"
var insertSql = "insert overwrite"
if(index!=1){
......@@ -247,10 +240,11 @@ class SessionProcessPref extends java.io.Serializable {
println(s"-----------------create view ${tmpTable} and load to ${MyConfigSession.HIVE_TABLE0} --------------------")
dataFrame.repartition(10).createOrReplaceTempView(tmpTable)
val fields = List("pseudo_session", "user_id", "COALESCE(cast(user_id as int),0) user_id_int", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type",
"component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand",
"component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "app_version", "device_type", "device_brand",
"device_model", "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version",
"servicename", "tag8", "tag9", "tag10")
val loadDataSql = s"${insertSql} table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${partitionDay}') select ${fields.mkString(",")} from ${tmpTable} distribute by rand()"
val loadDataSql = s"${insertSql} table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${partitionDay}') select concat(regexp_replace( '${partitionDay}','-','') ,cast( (row_number() over(partition by 1 order by created_time) +${count}) as string)) as id," +
s"${fields.mkString(",")} from ${tmpTable} distribute by rand()"
sparkSession.sql(loadDataSql)
}
}
......@@ -271,8 +265,8 @@ case class SessionPref(pseudo_session: String,
position: String,
label_value: String,
label_class: String,
module_class1: String,
module_class2: String,
module_class_1: String,
module_class_2: String,
app_version: String,
device_type: String,
device_brand: String,
......
......@@ -7,20 +7,23 @@ import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
/**
* 处理昨天的数据,用于计算时长以及访问路径,结果导入dw_fact_log_session_term
* 处理昨天的数据,用于计算时长以及访问路径,结果导入dw_fact_log_session_term
* 注意点:
* 1.只保留service_name='trace2'(native)埋点数据,(h5埋点数据也会走native上报一份)
* 2.去掉ACTION_EQUIP_INFO类数据,去掉action类型数量<=2的session
* 1.首页加载前的引导页(android不埋,ios:view_class=YQGuidePageViewVC)不计入首页流量统计
* 2.去掉ACTION_EQUIP_INFO类数据
* 1.首页加载前的引导页(android不埋 )不计入首页流量统计
* 2.对于ACTION_VIEW类型事件,用页面的resume以及stop事件计算访问时差
* 3.对于ACTION_HEART_BEAT事件中的back类型或者是BACKGROUND类事件,将menu_code定位back,
* 取连续backgroud的末次back的时间以及首次back时间差作为后台执行时长
......@@ -60,53 +63,80 @@ object SessionProcessTerm {
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val sessionProcessTerm: SessionProcessTerm = SessionProcessTerm()
try {
try {
val sparkSession: SparkSession = sessionProcessTerm.getSparkSession("SessionProcessTerm")
var SOURCE_SQL_TERM =
//获取menu_code广播变量
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
//获取position对应的label_value广播变量
val positionUrlLabelBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_URLLABEL_SQL, "url_content", "label_value")
println(s"positionUrlLabelBroad=${positionUrlLabelBroad.value}")
var SOURCE_SQL_COMMON =
s"""
|select pseudo_session,user_id_int user_id,mobile,device_token,view_class,view_path,action,
|case when action='ACTION_HEART_BEAT' then 'ACTION_HEART' else action_type end action_type,component_tag,menu_code,menu_code_new,
|case when component_tag ='back' then 'back' when (menu_code_new in('0','null','') and view_class not in('0','null','')) then view_class
|else menu_code_new end menu_code_offset ,
|action_code,position,label_value,label_class,app_version,device_type,created_time,date_time
| from ${MyConfigSession.HIVE_TABLE0}
| where servicename='trace2' and action!='ACTION_EQUIP_INFO'
| and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1'
|select pseudo_session,doctor_id,mobile,device_token,class_name,view_path,action,
|component_tag,app_version,device_type, cast(created as bigint) created,user_token_tourist,
|network_type ,device_brand,device_model,alternate_info,remark2 first_app_version,serviceName service_name
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !='' and pseudo_id !='' and extra_info !='com.picahealth.patient'
| and action!='ACTION_EQUIP_INFO'
| and created is not null and created!='' and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
| and created_day='${scnData}' and ${elseFiler}
|""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM )
println("sourceDF.show==================")
sourceDF.printSchema()
sourceDF.createOrReplaceTempView("session_term_ods")
//过滤掉action事件<=2个的session,以及首页加载前的引导页数据(view_class=YQGuidePageViewVC)
// SOURCE_SQL_TERM =
// s"""
// |select t1.* from session_term_ods t1
// |join (select pseudo_session from session_term_ods group by pseudo_session having count(distinct action)>2) t2
// |on t1.pseudo_session = t2.pseudo_session
// |where t1.menu_code_offset not in('0','null','')
// |""".stripMargin
// val selectDF = sparkSession.sql(SOURCE_SQL_TERM).drop("menu_code_new")
// println("selectDF.show========")
// selectDF.printSchema()
println("selectDF.count=========",sourceDF.count())
// val conditionGroup = List("<'?4' ","between '4' and '7'","between '8' and 'b'",">'b'")
// val conditionGroup = List("='0'","='1'","='2'","='3'","='4'","='5'","='6'","='7'","='8'","='9'",
// "='a'","='b'","='c'","='d'","='e'",">='f'")
val conditionGroup = List( "<='1'","between '2' and '3'","between '4' and '5'","between '6' and '7'","between '8' and '9'",
var SOURCE_SQL_TERM = SOURCE_SQL_COMMON + " and serviceName='trace2' "
println("SOURCE_SQL_TERM=="+SOURCE_SQL_TERM )
var sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM )
/*
补充站外埋点数据
*/
//part1.通过device_token补充,device_token不为空时对应的埋点只有trace1数据的需要加进来
val SOURCE_SQL_TERM_ADD =
s"""
|select b.* from (
| select device_token, concat_ws( '_',collect_set(service_name) ) cws
| from (${SOURCE_SQL_COMMON}) s group by device_token having concat_ws( '_',collect_set(service_name)) ='trace1'
| ) a
| join ( ${SOURCE_SQL_COMMON + " and serviceName='trace1' "}) b on a.device_token=b.device_token
|""".stripMargin
println("SOURCE_SQL_TERM_ADD=="+SOURCE_SQL_TERM_ADD)
var sourceAddDF = sparkSession.sql(SOURCE_SQL_TERM_ADD )
sourceAddDF.createOrReplaceTempView("source_add_table")
//part2.通过pseudo_session补充,device_token为空时,pseudo_session对应的埋点只有trace1数据且doctor_id不为空的需要加进来
val SOURCE_SQL_TERM_ADD2 =
s"""
|select b.* from ( ${SOURCE_SQL_COMMON + " and serviceName='trace1' "}) b where b.doctor_id in (
| select doctor_id
| from (${SOURCE_SQL_COMMON +" and doctor_id not in ('','0') and doctor_id is not null" }) s group by doctor_id having concat_ws( '_',collect_set(service_name)) ='trace1'
| )
| and b.pseudo_session not in (select pseudo_session from source_add_table group by pseudo_session )
|""".stripMargin
println("SOURCE_SQL_TERM_ADD2=="+SOURCE_SQL_TERM_ADD2)
var sourceAdd2DF = sparkSession.sql(SOURCE_SQL_TERM_ADD2 )
val sourceResDF = sourceDF.union(sourceAddDF).union(sourceAdd2DF)
println("sourceResDF.show==================")
sourceResDF.printSchema()
println("selectDF.count=========",sourceResDF.count())
var conditionGroup = List( "<='1'","between '2' and '3'","between '4' and '5'","between '6' and '7'","between '8' and '9'",
"between 'a' and 'b'","between 'c' and 'd'",">='e'" )
var dataCount = 0
var index = 0
sourceDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
sourceResDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
for(condition <- conditionGroup){
index += 1
val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
val slideDF = sourceResDF.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val referResultRdd = sessionProcessTerm.getReferColumns(slideDF)
val baseDF = sessionProcessTerm.offsetValues(menuCodeBroad,actionCategory,positionUrlLabelBroad,slideDF)
baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
val referResultRdd = sessionProcessTerm.getReferColumns(baseDF)
val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType(
List(StructField("session_id", StringType, false),
List(StructField("pseudo_session", StringType, false),
StructField("session_id", StringType, false),
StructField("device_token", StringType, false),
StructField("user_id", IntegerType, false),
StructField("user_token", StringType, false),
StructField("mobile", StringType, false),
StructField("menu_code", StringType, false),
StructField("menu_begin_time", StringType, false),
......@@ -118,19 +148,32 @@ object SessionProcessTerm {
StructField("action_step", StringType, false),
StructField("device_type", StringType, false),
StructField("app_version", StringType, false),
StructField("created_time", StringType, false),
StructField("created_time", LongType, false),
StructField("date_time", StringType, false),
StructField("refer_session_id", StringType, false)
StructField("pre_session_id", StringType, false),
//新增字段
StructField("label_class", StringType, true),
StructField("net_type", StringType, true),
StructField("module_class1", StringType, true),
StructField("module_class2", StringType, true),
StructField("device_brand", StringType, true),
StructField("device_model", StringType, true),
StructField("view_path", StringType, true),
StructField("alternate_info", StringType, true),
StructField("first_app_version", StringType, true),
StructField("service_name", StringType, true),
StructField("tag8", StringType, true),
StructField("tag9", StringType, true)
)
))
println("referResultDF.show()============'")
referResultDF.show()
referResultDF.printSchema()
// referResultDF.where("action_type='ACTION_VIEW'").show(100,true)
referResultDF.persist(StorageLevel.MEMORY_AND_DISK_SER).createOrReplaceTempView("refer_result_table")
println("-----------------------------------compute menu_code term-----------------------------------------")
val getMenuTermSql =
"""
|select a.session_id,a.device_token,a.user_id,a.menu_code,
|select a.session_id,a.device_token,a.user_id,a.menu_code,
|lag(a.created_time) over(partition by a.session_id order by a.created_time desc ) menu_end_time,a.created_time
| from refer_result_table a
| left join(select session_id,min(app_version) min_version from refer_result_table group by session_id) b on a.session_id=b.session_id
......@@ -152,33 +195,37 @@ object SessionProcessTerm {
sessionEndDF.createOrReplaceTempView("session_end_table")
val getReferSql =
s"""
|select concat(regexp_replace( '${scnData}',"-","") ,cast(row_number() over(partition by 1 order by a.created_time) as string)) as id,
|select
|a.session_id,
|a.device_token,
|a.user_id,
|a.user_token,
|a.mobile,
|a.menu_code,
|a.menu_begin_time,
| case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end menu_end_time,
|(cast((case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end) as bigint)-cast(a.menu_begin_time as bigint))/1000 menu_time_diff,
|a.action_type,
|a.action_code,
|a.position,
|a.label_value,
|lag(a.menu_code) over(partition by a.session_id order by a.created_time) refer_menu_code,
|lag(a.action_code) over(partition by a.session_id order by a.created_time) refer_action_code,
|lag(a.position) over(partition by a.session_id order by a.created_time) refer_position,
|lag(a.label_value) over(partition by a.session_id order by a.created_time) refer_label_value,
|lag(a.action_type) over(partition by a.session_id order by a.created_time) refer_action_type,
|a.label_class,
|a.module_class1,
|a.module_class2,
|a.action_type,
|a.menu_begin_time,
| case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end menu_end_time,
|(cast((case when a.action_type in('ACTION_VIEW','ACTION_HEART') and b.menu_end_time is null then c.session_end_time else b.menu_end_time end) as bigint)-cast(a.menu_begin_time as bigint))/1000 menu_time_diff,
|a.action_step,
|a.device_type,
|a.device_brand,
|a.device_model,
|a.app_version,
|a.net_type,
|a.created_time,
|a.date_time,
|c.session_begin_time,
|c.session_end_time,
|(cast(c.session_end_time as bigint)-cast(c.session_begin_time as bigint))/1000 session_time_diff,
|a.refer_session_id
|a.pre_session_id,
|case when a.user_id=0 then a.device_token else a.user_id end user_identity_id,
|a.view_path, a.alternate_info, a.first_app_version,a.service_name,a.tag8,a.tag9
|from refer_result_table a
|left join refer_menu_table b on a.session_id=b.session_id and a.device_token=b.device_token and a.user_id=b.user_id and a.menu_code=b.menu_code and a.created_time=b.created_time and a.action_type in('ACTION_VIEW','ACTION_HEART')
|left join session_end_table c on a.session_id = c.session_id
......@@ -192,9 +239,8 @@ object SessionProcessTerm {
val resultDF: DataFrame = sessionProcessTerm.matchUserId(menuCodeAddDF, sparkSession, scnData).repartition(5)
println("dwFactLogSession.show=======>")
resultDF.printSchema()
// resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_term-----------------")
sessionProcessTerm.loadData(resultDF, sparkSession, scnData,index)
sessionProcessTerm.loadData(resultDF, sparkSession, scnData,index,dataCount)
val resCount = resultDF.count().toInt
println(s"${condition}的结果==${resCount}")
dataCount += resCount
......@@ -239,6 +285,183 @@ class SessionProcessTerm {
sparkSession
}
/**
 * Normalises raw trace-log rows into [[SessionTerm]] records, one output row per input row.
 *
 * Rows are grouped by `pseudo_session` and walked from the newest to the oldest `created`
 * value, so a row whose `device_token`, `doctor_id` or `mobile` is blank inherits the value of
 * the nearest later row of the same group that carried one. For every row the method then:
 *  - maps `action` to an action type through `actionCategory` (`ACTION_HEART_BEAT` is always
 *    `ACTION_HEART`, a missing action yields an empty type);
 *  - for view/click/expose events splits `component_tag` on `#` into
 *    menu_code, action_code, position, label_value, label_class, module_class1, module_class2, tag8, tag9;
 *  - for `ACTION_VIEW` rows without a usable menu_code, looks one up by `view_path` substring;
 *  - for detail pages `425` / `015` copies the `id` / `eduComId` query parameter into `position`;
 *  - for clicks on menu `930` with action_code 930000-930002, replaces `label_value` by a
 *    lookup on `position`;
 *  - derives `menu_code_offset` (fallback to `class_name` or `back`) and the `is_need` flag.
 *
 * Everything used inside the RDD closure is a local value or a top-level object, so the
 * closure does not capture `this`.
 *
 * @param menuCodeBroad         broadcast map; keys are matched as substrings of `view_path`,
 *                              values are used as menu_code
 * @param actionCategory        broadcast map from raw `action` to action type
 * @param positionUrlLabelBroad broadcast map; keys are matched as substrings of the parsed
 *                              `position`, values are used as label_value
 * @param dataFrame             source rows; must expose the columns read below
 *                              (`pseudo_session`, `created`, `device_token`, `doctor_id`, ...)
 * @return a DataFrame with the 32 columns of [[SessionTerm]], renamed as listed at the end
 */
def offsetValues(menuCodeBroad:Broadcast[Map[String,String]], actionCategory:Broadcast[Map[String,String]],
positionUrlLabelBroad:Broadcast[Map[String,String]],dataFrame: DataFrame): DataFrame = {
  val path_menu: Map[String, String] = menuCodeBroad.value // view_path fragment -> menu_code
  val actionCategoryMap: Map[String, String] = actionCategory.value // action -> action_type
  val positionLabelMap: Map[String, String] = positionUrlLabelBroad.value // position fragment -> label_value
  // Field names in the order in which they appear inside a '#'-separated component_tag.
  val tagNames = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9")
  val splittableActions = List("ACTION_VIEW", "ACTION_CLICK", "ACTION_EXPOSE")
  val blankCodes = List("0", "null", "")

  val baseRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("pseudo_session")).flatMap { case (pseudo_session, rows) =>
    // Newest first, so that the "last seen" identity values below come from later events.
    val newestFirst = rows.toList.sortWith((x, y) => x.getAs[Long]("created") > y.getAs[Long]("created"))
    var thisDeviceToken = ""
    var thisDoctorId = "0"
    var thisMobile = ""
    newestFirst.map { row =>
      val rawDeviceToken = row.getAs[String]("device_token")
      val deviceToken =
        if (rawDeviceToken != null && !rawDeviceToken.equals("")) { thisDeviceToken = rawDeviceToken; rawDeviceToken }
        else thisDeviceToken
      val rawDoctorId = row.getAs[String]("doctor_id")
      val doctorId =
        if (rawDoctorId != null && !rawDoctorId.equals("") && !rawDoctorId.equals("0")) { thisDoctorId = rawDoctorId; rawDoctorId }
        else thisDoctorId
      val rawMobile = row.getAs[String]("mobile")
      val mobile =
        if (rawMobile != null && !rawMobile.equals("")) { thisMobile = rawMobile; rawMobile }
        else thisMobile

      // 1. Network type (2G, 3G, 4G, 2G/3G/4G, WIFI, WLAN or empty string).
      val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))

      // 2. Derive the action type; the raw action is kept as its own field.
      val action = row.getAs[String]("action")
      val action_type: String =
        if (action == null) ""
        else if (action == "ACTION_HEART_BEAT") "ACTION_HEART"
        else actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")

      // 3. Split component_tag into its named parts.
      val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
      val view_path = StringUtils.getNotNullString(row.getAs[String]("view_path"))
      val tagMap = mutable.Map[String, String]()
      tagNames.foreach(name => tagMap.put(name, ""))
      if (splittableActions.contains(action_type)) {
        if (component_tag.contains("#")) {
          // zip stops at the shorter side: segments beyond the ninth are ignored instead of
          // raising ArrayIndexOutOfBoundsException.
          tagNames.zip(component_tag.split("#")).foreach { case (fieldName, value) =>
            // label_value is capped at 250 characters.
            tagMap.put(fieldName, if ("label_value".equals(fieldName)) value.take(250) else value)
          }
        } else if (component_tag != "0" && component_tag.length < 5) {
          // A short, non-zero tag without '#' is taken as the menu_code itself.
          tagMap.put("menu_code", component_tag)
        }
      }

      // Known malformed menu_code value.
      val parsedMenuCode = if ("MenuCode_081".equals(tagMap("menu_code"))) "081" else tagMap("menu_code")
      val menuCodeUnusable =
        parsedMenuCode.equals("") || parsedMenuCode.equals("null") || parsedMenuCode.equals("0") || parsedMenuCode.length > 3
      // For page views without a usable menu_code, take the first mapping whose key is
      // contained in view_path; "0" marks "no match".
      val menu_code_new: String =
        if (menuCodeUnusable && action_type.equals("ACTION_VIEW")) {
          path_menu.find { case (pathFragment, _) => view_path.contains(pathFragment) } match {
            case Some((_, mappedCode)) =>
              println("--------------------menu_code match successfully-----------------------")
              mappedCode
            case None => "0"
          }
        } else parsedMenuCode

      // Detail pages ("425": five-minute page, "015": health knowledge page): copy the id from
      // the query string into position. Missing query strings or values are skipped.
      if (action_type.equals("ACTION_VIEW") && List("425", "015").contains(menu_code_new) && view_path.contains("?")) {
        val wantedKey = if (menu_code_new == "425") "id" else "eduComId"
        val pathParts = view_path.split("\\?")
        if (pathParts.length > 1) {
          pathParts(1).split("&").foreach { kv =>
            val pair = kv.split("=")
            if (pair.length > 1 && pair(0) == wantedKey) {
              tagMap("position") = pair(1)
            }
          }
        }
      }

      // menu_code = '930' and action_code in ('930000','930001','930002') and ACTION_CLICK:
      // label_value is recomputed from the position -> label mapping.
      if (List("930000", "930001", "930002").contains(tagMap("action_code"))
        && "ACTION_CLICK".equals(action_type) && "930" == menu_code_new) {
        println("------------------------------------单独计算label_value----------------------------------------------")
        val position = StringUtils.getNotNullString(tagMap("position"))
        positionLabelMap
          .find { case (urlFragment, label) => position.contains(urlFragment) && label != "" && label != null }
          .foreach { case (_, label) =>
            tagMap("label_value") = label
            println("--------------------label_value match successfully-----------------------")
          }
      }

      // Compensate for a missing menu_code: prefer the view class, then the literal "back" tag.
      val view_class = StringUtils.getNotNullString(row.getAs[String]("class_name"))
      val menu_code_offset =
        if (blankCodes.contains(menu_code_new) && !blankCodes.contains(view_class)) view_class
        else if (component_tag == "back") "back"
        else menu_code_new

      // A non-numeric doctor id falls back to 0 (treated as "no user") instead of failing the task.
      val user_id: Int =
        if (doctorId != null && !doctorId.equals("") && !doctorId.equals("0")) scala.util.Try(doctorId.toInt).getOrElse(0)
        else 0

      val app_version = StringUtils.getNotNullString(row.getAs[String]("app_version"))
      // From 3.4.5 on, ACTION_ACTIVITY_CREATE rows are flagged as not needed for the later
      // duration calculation.
      // NOTE(review): this is a lexicographic string comparison, so e.g. "3.10.0" sorts before "3.4.5".
      val is_need = !(app_version.length < 8 && app_version >= "3.4.5" && action == "ACTION_ACTIVITY_CREATE")

      val created = row.getAs[Long]("created")
      SessionTerm(pseudo_session,
        user_id,
        mobile,
        deviceToken,
        StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
        view_class,
        view_path,
        action, action_type, component_tag, tagMap("menu_code"), menu_code_new, menu_code_offset, tagMap("action_code"),
        tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
        app_version,
        StringUtils.getNotNullString(row.getAs[String]("device_type")),
        StringUtils.getNotNullString(row.getAs[String]("device_brand")),
        StringUtils.getNotNullString(row.getAs[String]("device_model")),
        net_type, created,
        DateUtils.milliSecondsFormatTime(created + ""),
        StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
        StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
        StringUtils.getNotNullString(row.getAs[String]("service_name")),
        tagMap("tag8"), tagMap("tag9"), is_need
      )
    }
  }
  import dataFrame.sparkSession.implicits._
  val baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
    "menu_code", "menu_code_new","menu_code_offset", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand", "device_model",
    "net_type", "created_time", "date_time", "alternate_info", "first_app_version", "service_name", "tag8", "tag9", "is_need")
  println("baseDF.show=======>")
  baseDF.where("action_code !=''").show()
  baseDF.printSchema()
  baseDF
}
/**
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
......@@ -253,7 +476,7 @@ class SessionProcessTerm {
val INIT_USER_ID_SQL_PREF =
s"""
|SELECT t.*,t.user_id_ods user_id_old, COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on t.user_id_ods = cast(b.id as string)
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where to_date(a.creat_time) <= '${created_day}') AS b on t.user_id_ods = cast(b.id as string)
""".stripMargin
val DF = sparkSQLSession.sql(INIT_USER_ID_SQL_PREF).drop("user_id_ods")
/*
......@@ -263,12 +486,12 @@ class SessionProcessTerm {
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
noMatchUserIdDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val MOBILE_PHONE_SQL_PREF: String =
s"""
|SELECT ss.*,COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where delete_flag=1 and mobile_phone!='' and mobile_phone!='XK0HdMN6dAfOlYPOFHHL0A==') AS b on ss.mobile = b.mobile_phone
""".stripMargin
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MOBILE_PHONE_SQL_PREF)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
......@@ -310,11 +533,12 @@ class SessionProcessTerm {
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getReferColumns(sourceDF: DataFrame): RDD[Row] = {
val actions = List("'ACTION_VIEW'", "'ACTION_CLICK'", "'ACTION_EXPOSE'") //高版本使用"'ACTION_HEART_BEAT'"
val actionCommonDF = sourceDF.where(s" ( action_type in (${actions.mkString(",")}) or (action_type='ACTION_HEART' and menu_code_offset='back' ))")
val actions = List("'ACTION_VIEW'", "'ACTION_CLICK'", "'ACTION_EXPOSE'","'ACTION_START'") //高版本使用"'ACTION_HEART_BEAT'"
val actionCommonDF = sourceDF.where(s" ( action_type in (${actions.mkString(",")}) or (action_type='ACTION_HEART' and menu_code_offset='back' )) ")
.filter("is_need")
println("actionCommonDF.show()===========")
actionCommonDF.printSchema()
// actionCommonDF.where("pseudo_session='5964908742'").show(200)
//计算session中menu_code的起始时间,对于ACTION_HEART_BEAT的埋点记录剔除冗余数据
val groupRdd = actionCommonDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))//.repartition(100)
println("开始执行groupRdd.flatMap=========")
......@@ -322,9 +546,8 @@ class SessionProcessTerm {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created_time") < y.getAs[String]("created_time")) //按created由小到小排序
rowList = rowList.toList.sortWith((x, y) => x.getAs[Long]("created_time") < y.getAs[Long]("created_time")) //按created由小到小排序
var actionStep = "0_0"
// var sessionBeginTime = rowList.head.getAs[String]("created_time")
var thisMenuCode = ""
var menuBeginTime = ""
var prefCreatedTime = ""
......@@ -335,8 +558,8 @@ class SessionProcessTerm {
var isSessionNew = false
var appVersion = row.getAs[String]("app_version")
//如果版本号低于3.3.2,则没有ACTION_HEART_BEAT事件,需要根据两次事件之间的时差对session进行切分,超过30分钟则拆分出一个session
var createdTime = row.getAs[String]("created_time")
if (appVersion < "3.3.2") { //版本号低于3.3.2,需要根据时差判断是否拆分session
var createdTime = row.getAs[Long]("created_time")
if (appVersion < "3.3.2") { //版本号低于3.3.2,需要根据时差判断是否 拆分session
if (prefCreatedTime != "" && (createdTime.toLong - prefCreatedTime.toLong) > MyConfigSession.SESSION_GAP) { //本次action事件事件比
count += 1
isSessionNew = true
......@@ -346,9 +569,9 @@ class SessionProcessTerm {
val action = row.getAs[String]("action")
val actionType = row.getAs[String]("action_type")
var sessionId = pseudo_session + count
var referSessionId = ""
var preSessionId = ""
if (count > 0) {
referSessionId = pseudo_session + (count - 1)
preSessionId = pseudo_session + (count - 1)
}
//使用版本以及action_type限制是否入库
var needPut = true
......@@ -369,27 +592,29 @@ class SessionProcessTerm {
}
}
if(needPut){
menuBeginTime = createdTime
menuBeginTime = createdTime.toString
}
}
//如果menuCode发生改变,或者session改变,重置本轮menuCode
if (!thisMenuCode.equals(menuCode) || isSessionNew) {
menuBeginTime = createdTime
menuBeginTime = createdTime.toString
if(needPut ){//只有在确定该menu_code对应的记录入库后,才更新thisMenuCode以及对应的menu_code访问顺序
thisMenuCode = menuCode
actionStep = (actionStep.split("_")(0).toInt + 1).toString + "_0"
}
} else {
} else {//如果menuCode与session都不变, actionType改变或者actionType为非浏览行为,则调整actionStep
if (!(prefActionType.equals(actionType) && "ACTION_VIEW".equals(actionType))) {
actionStep = (actionStep.split("_")(0)) + "_" + (actionStep.split("_")(1).toInt + 1).toString
}
}
// println(s"created_time=${createdTime},thisMenuCode=${thisMenuCode},actionType=${actionType},action=${action},isMenuChange=${!thisMenuCode.equals(menuCode)},prefActionType=${prefActionType},needPut=${needPut}" )
prefCreatedTime = createdTime
prefCreatedTime = createdTime.toString
prefActionType = actionType
if (needPut) {
resList += (Row(sessionId,
resList += (Row( pseudo_session,sessionId,
row.getAs[String]("device_token"),
row.getAs[Integer]("user_id"),
row.getAs[String]("user_token"),
row.getAs[String]("mobile"),
thisMenuCode,
menuBeginTime,
......@@ -401,7 +626,19 @@ class SessionProcessTerm {
row.getAs[String]("app_version"),
createdTime,
row.getAs[String]("date_time"),
referSessionId
preSessionId,
row.getAs[String]("label_class"),
row.getAs[String]("net_type"),
row.getAs[String]("module_class1"),
row.getAs[String]("module_class2"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("view_path"),
row.getAs[String]("alternate_info"),
row.getAs[String]("first_app_version"),
row.getAs[String]("service_name"),
row.getAs[String]("tag8"),
row.getAs[String]("tag9")
))
}
}
......@@ -420,86 +657,66 @@ class SessionProcessTerm {
*/
def getSessionTail(sourceDF: DataFrame, sparkSession: SparkSession): DataFrame = {
sourceDF.createOrReplaceTempView("select_res_table")
/*
1.排除掉所有拆分过新session的pseudo_session,计算出他们的起止时间
*/
val sourceSql =
"""
| select concat(pseudo_session,'0') session_id,min(created_time) session_begin_time,max(created_time) session_end_time from select_res_table
| where pseudo_session not in (select distinct SUBSTRING(refer_session_id,1,length(refer_session_id)-1) from refer_result_table where refer_session_id like '%0')
| group by concat(pseudo_session,'0')
|""".stripMargin
//2.计算所有新拆分的sessionid的起止时间
val sourceSql2 =
"""
|select session_id,min(created_time) session_begin_time,max(created_time) session_end_time from refer_result_table
| where SUBSTRING(session_id,1,length(session_id)-1)
| in (select distinct SUBSTRING(refer_session_id,1,length(refer_session_id)-1) from refer_result_table where refer_session_id!='')
| group by session_id
|""".stripMargin
val uniqueSessionEndDF = sparkSession.sql(sourceSql )
println("uniqueSessionEndDF.show()=========")
val firstSessionEndDF = sparkSession.sql(sourceSql2)
val firstSessionEndDF = sparkSession.sql(sourceSql)
println("firstSessionEndDF.show()=========")
var sessionEndDF = uniqueSessionEndDF.union(firstSessionEndDF)
sessionEndDF
firstSessionEndDF
}
/**
* @Description 导入数据到表中
* @param dataFrame 源数据
* @param sparkSession SparkSession 环境
* @param partitionDay 分区日期
* @return void
**/
def loadData1(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String,index:Integer,count:Integer): Unit = {
dataFrame.createOrReplaceTempView("result_view")
var insertSql = "insert overwrite"
if(index!=1){
insertSql = "insert into"
}
//补充新生session超过10个之后的session的session_begin_time与session_end_time
val HIVE_TABLE4 = "pica_dw.dw_fact_log_session_term_tmp"
val loadDataSql =
s"""
|${insertSql} table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${partitionDay}')
| select id,t1.session_id,device_token,user_id,mobile,menu_code,menu_begin_time,
| menu_end_time,
| menu_time_diff,
| action_type,action_code,position,label_value,refer_menu_code,refer_action_code,refer_position,refer_label_value,refer_action_type,
| action_step,device_type,app_version,created_time,date_time,
| session_begin_time,
| session_end_time,
| session_time_diff,
| refer_session_id from result_view t1
| select concat(regexp_replace( '${partitionDay}','-','') ,cast( (row_number() over(partition by 1 order by created_time) )+${count} as string)) id,
| t1.session_id,device_token,user_id,user_token,mobile,menu_code,action_code,position,label_value,label_class,module_class1,module_class2,action_type,
| cast(menu_begin_time as string), cast(menu_end_time as string), menu_time_diff,
| action_step,device_type,device_brand,device_model, app_version, net_type ,created_time,date_time,
| cast(session_begin_time as string) , cast(session_end_time as string), session_time_diff,
| pre_session_id, user_identity_id,view_path,alternate_info,first_app_version,service_name,tag8,tag9
| from result_view t1
""".stripMargin
sparkSession.sql(loadDataSql)
}
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
dataFrame.createOrReplaceTempView("result_view")
var insertSql = "insert overwrite"
if(index!=1){
insertSql = "insert into"
}
//补充新生session超过10个之后的session的session_begin_time与session_end_time
val loadDataSql =
s"""
|${insertSql} table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${partitionDay}')
| select id,t1.session_id,device_token,user_id,mobile,menu_code,menu_begin_time,
| case when t1.menu_end_time is null and t1.action_type='ACTION_VIEW' then t2.session_end_time else t1.menu_end_time end menu_end_time,
| (cast((case when t1.menu_end_time is null and t1.action_type='ACTION_VIEW' then t2.session_end_time else t1.menu_end_time end) as bigint)-cast(t1.menu_begin_time as bigint))/1000 menu_time_diff,
| action_type,action_code,position,label_value,refer_menu_code,refer_action_code,refer_position,refer_label_value,refer_action_type,
| action_step,device_type,app_version,created_time,date_time,
| case when t1.session_begin_time is not null then t1.session_begin_time else t2.session_begin_time end session_begin_time,
| case when t1.session_end_time is not null then t1.session_end_time else t2.session_end_time end session_end_time,
| case when t1.session_time_diff is not null then t1.session_time_diff else t2.session_time_diff end session_time_diff,
| refer_session_id from result_view t1
| left join (select session_id, min(created_time) session_begin_time,max(created_time) session_end_time,
| (cast(max(created_time) as bigint)-cast( min(created_time) as bigint))/1000 session_time_diff
| from result_view where session_time_diff is null group by session_id ) t2
| on t1.session_id=t2.session_id
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
/**
 * One normalised trace-log event, as produced per input row by `offsetValues`.
 *
 * @param pseudo_session    grouping key of the raw events
 * @param user_id           numeric doctor id, 0 when none is known
 * @param mobile            mobile value of the event (possibly inherited from a later event)
 * @param device_token      device token of the event (possibly inherited from a later event)
 * @param user_token        value of the source column `user_token_tourist`
 * @param view_class        value of the source column `class_name`
 * @param view_path         page path of the event
 * @param action            raw action name
 * @param action_type       action category derived from `action`
 * @param component_tag     raw, unsplit component tag
 * @param menu_code         menu code exactly as parsed from `component_tag`
 * @param menu_code_new     menu code after correction / lookup by `view_path`
 * @param menu_code_offset  `menu_code_new`, or a fallback (`view_class` or "back") when it is blank
 * @param action_code       2nd part of `component_tag`
 * @param position          3rd part of `component_tag`, or an id taken from the `view_path` query string
 * @param label_value       4th part of `component_tag` (at most 250 characters) or a mapped label
 * @param label_class       5th part of `component_tag`
 * @param module_class1     6th part of `component_tag`
 * @param module_class2     7th part of `component_tag`
 * @param app_version       application version string
 * @param device_type       device type
 * @param device_brand      device brand
 * @param device_model      device model
 * @param net_type          normalised network type
 * @param created_time      value of the source column `created`
 * @param date_time         `created_time` formatted as text
 * @param alternate_info    value of the source column `alternate_info`
 * @param first_app_version value of the source column `first_app_version`
 * @param service_name      value of the source column `service_name`
 * @param tag8              8th part of `component_tag`
 * @param tag9              9th part of `component_tag`
 * @param is_need           false for rows that the later duration calculation must ignore
 */
case class SessionTerm(
  pseudo_session: String,
  user_id: Integer,
  mobile: String,
  device_token: String,
  user_token: String,
  view_class: String,
  view_path: String,
  action: String,
  action_type: String,
  component_tag: String,
  menu_code: String,
  menu_code_new: String,
  menu_code_offset: String,
  action_code: String,
  position: String,
  label_value: String,
  label_class: String,
  module_class1: String,
  module_class2: String,
  app_version: String,
  device_type: String,
  device_brand: String,
  device_model: String,
  net_type: String,
  created_time: Long,
  date_time: String,
  alternate_info: String,
  first_app_version: String,
  service_name: String,
  tag8: String,
  tag9: String,
  is_need: Boolean
)
package com.utils
import com.session.{SessionMenuCalc, SessionMenuCalcNew, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPref, SessionProcessTerm}
import com.session.{SessionMenuCalc, SessionMenuCalcNew, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPathNew, SessionProcessPref, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver
/**
......@@ -15,9 +15,10 @@ object Driver {
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessPref",classOf[SessionProcessPref],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_pref表")
driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表")
driver.addClass("SessionMenuCalcNew",classOf[SessionMenuCalcNew],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc_new表")
driver.addClass("SessionProcessPathNew",classOf[SessionProcessPathNew],"用户Session数据分析导入到dw_fact_log_session_path_new表")
driver.run(args)
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册