提交 25f7602b 编写于 作者: weicheng.mao's avatar weicheng.mao

新路径漏斗分析底表

上级 49e0566f
package com.config package com.config
/** /**
* *
* 埋点流量相关常量配置 * 埋点流量相关常量配置
* @Author zhenxin.ma * @Author zhenxin.ma
* @Date 2019/8/20 9:43 * @Date 2019/8/20 9:43
* @Version 1.0 * @Version 1.0
*/ */
object MyConfigSession { object MyConfigSession {
//Hive的DW层流量表 //Hive的DW层流量表
final val HIVE_TABLE0: String = "pica_ods.ods_log_session_pref" final val HIVE_TABLE0: String = "pica_ods.ods_log_session_pref"
...@@ -17,6 +17,10 @@ object MyConfigSession { ...@@ -17,6 +17,10 @@ object MyConfigSession {
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart" final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
final val HIVE_TABLE4: String = "pica_dw.dw_fact_log_session_term" final val HIVE_TABLE4: String = "pica_dw.dw_fact_log_session_term"
final val HIVE_TABLE4_MID: String = "pica_dw.dw_fact_log_session_term_mid" final val HIVE_TABLE4_MID: String = "pica_dw.dw_fact_log_session_term_mid"
final val HIVE_TABLE5: String = "pica_dw.pathconvertconfig"
final val HIVE_TABLE6: String = "pica_dw.dw_fact_log_session_path_convert"
//写入的文件路径 //写入的文件路径
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/" final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
...@@ -88,6 +92,54 @@ object MyConfigSession { ...@@ -88,6 +92,54 @@ object MyConfigSession {
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and (length(menu_code) <= 3 or length(menu_code)=7) and cast(menu_code as int ) is not null) | and (menu_code != '0' and menu_code !='null' and menu_code !='' and (length(menu_code) <= 3 or length(menu_code)=7) and cast(menu_code as int ) is not null)
""".stripMargin """.stripMargin
//从pica_dw.pathconvertconfig表中筛选数据
final val SOURCE_SQL_CONFIG: String =
s"""
|select
|projectid,
|nodeid,
|nodename,
|menucode,
|actioncode,
|uv,
|refer_menu_code,
|refer_action_code,
|starttime,
|endtime
|from ${MyConfigSession.HIVE_TABLE5}
""".stripMargin
//从pica_dw.dw_fact_log_session_path_convert表中筛选数据
final val SOURCE_SQL_PATH_CONVERT: String =
s"""
|select
|log_session_id,session_id,user_id,action_type,
|user_token,menu_code,action_code,position,
|label_value,label_class,refer_log_session_id,
|refer_menu_code,refer_action_code,refer_position,
|refer_label_value,refer_time_diff,refer_created,step_id,
|app_version,device_type,device_brand,
|device_model,net_type,created_time,
|date_time,module_class1,module_class2,
|user_identity_id,created_day
|from ${MyConfigSession.HIVE_TABLE6}
""".stripMargin
//从dw_fact_log_session_TERM表中筛选数据
final val SOURCE_PATH_CONVERT: String =
s"""
|select id log_session_id, session_id, user_id,device_token,action_type,user_token,menu_code,action_code,position,label_value,label_class,action_step,
|app_version,device_type,device_brand,device_model,net_type,created_time,date_time,module_class1,module_class2 from ${MyConfigSession.HIVE_TABLE4}
| where app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' )
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and (length(menu_code) <= 3 or length(menu_code)=7) and cast(menu_code as int ) is not null)
""".stripMargin
//匹配user_id的条件 //匹配user_id的条件
//0.使用pica_ds.pica_doctor表匹配,匹配不上的user_id值为'0' //0.使用pica_ds.pica_doctor表匹配,匹配不上的user_id值为'0'
......
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
/**
 * Batch job that rebuilds one day of session path data: it reads the click/view
 * events of the target day, derives the refer (previous page) columns and the
 * per-session step ids, and overwrites that day's partition of
 * pica_dw.dw_fact_log_session_path_convert.
 *
 * Program arguments (both optional):
 *  - args(0): partition day (created_day) to process, e.g. 2019-09-12;
 *             defaults to DateUtils.getYesterdayDate
 *  - args(1): extra SQL predicate appended to the source query; defaults to " 1=1"
 *
 * @Author maoweicheng
 * @Date 2021/07/12 09:58
 * @Version 1.0
 */
object SessionPathConvert {

  /** Factory so callers can write `SessionPathConvert()` instead of `new SessionPathConvert()`. */
  def apply(): SessionPathConvert = new SessionPathConvert()

  /**
   * Job entry point.
   *
   * Failures are logged (stack trace on stdout/stderr) and the JDBC connection is
   * closed; the exception is not rethrown.
   * NOTE(review): because the exception is swallowed, the process presumably still
   * exits normally after a failed run — confirm whether the scheduler relies on that.
   *
   * @param args optional partition day and optional extra filter, see the object docs
   */
  def main(args: Array[String]): Unit = {
    // 1. SQL that registers the run in the record table before the job starts.
    //    NOTE(review): the insert call below is commented out, so no start record is
    //    written and the status update at the end presumably matches no row — confirm
    //    whether record-keeping should be re-enabled.
    val insertSQL: String =
      s"""
         |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
         |values(0,'pica_dw.dw_fact_log_session_path_convert','3',?,'0',?)
         |""".stripMargin

    // Batch id of the run (the partition day), format 2019-09-12.
    val scnData: String = if (args.length >= 1) args(0) else DateUtils.getYesterdayDate
    // Optional extra predicate for the source query; an empty second argument is ignored.
    val elseFiler: String = if (args.length > 1 && args(1) != "") args(1) else " 1=1"
    println(s"scnData=${scnData}")

    // Job start time, format 2019-09-12 14:03:30.
    val startTime: String = DateUtils.getTodayTime
    // Parameters for insertSQL.
    val insertArr: Array[String] = Array[String](scnData, startTime)
    // MySQL connection used for the task record table.
    val connSql: sql.Connection = JDBCUtil.getConnection()
    // val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)

    val session_path_convert = SessionPathConvert()
    try {
      val sparkSession: SparkSession = session_path_convert.getSparkSession("Session_path_convert")
      try {
        // Broadcast lookup built from ACTION_URLLABEL_SQL (url_content -> label_value).
        val positionUrlLabelBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_URLLABEL_SQL, "url_content", "label_value")
        println(s"positionUrlLabelBroad=${positionUrlLabelBroad.value}")

        // Select the source rows of the requested day.
        val sourceSql: String = MyConfigSession.SOURCE_PATH_CONVERT + s" and created_day='${scnData}' and ${elseFiler}"
        println(s"查询sql:${sourceSql}")
        val sourceDF: DataFrame = sparkSession.sql(sourceSql)
        // sourceDF.show()
        println(s"sourceDF.count=${sourceDF.count()}")

        val pathStepDF = session_path_convert.getReferColumns(
          sparkSession,
          sourceDF.where("action_type IN ('ACTION_CLICK','ACTION_VIEW')"),
          positionUrlLabelBroad)
        println(s"pathStepDF.count=${pathStepDF.count()}")
        pathStepDF.createOrReplaceTempView("menu_refer_record")

        println("-----------------------------------compute refer columns-----------------------------------------")
        // refer_time_diff: time between an event and its refer event;
        // step_id: position of the event inside its session, ordered by created_time.
        val referDF = sparkSession.sql("select t.*," +
          "(cast(t.created_time as bigint) - cast(t.refer_created as bigint) ) refer_time_diff, " +
          "row_number() over(partition by t.session_id order by t.created_time ) step_id " +
          "from menu_refer_record t ")
        referDF.createOrReplaceTempView("pref_menu_info")

        val referResDF: DataFrame = referDF.select("log_session_id", "session_id", "user_id", "device_token", "action_type", "user_token", "menu_code", "action_code", "position",
          "label_value", "label_class", "refer_log_session_id", "refer_menu_code", "refer_action_code", "refer_position", "refer_label_value", "refer_created", "step_id", "app_version",
          "device_type", "device_brand", "device_model", "net_type", "created_time", "date_time", "module_class1", "module_class2", "refer_time_diff")
        println("referResDF.printSchema()")
        referResDF.printSchema()
        // Counting re-runs the whole pipeline, so count once and reuse the value for the
        // task record instead of triggering a second full job after the load.
        val resultCount: Long = referResDF.count()
        println(s"referResDF.count=${resultCount}")

        println("------------------------------------单独计算label_value----------------------------------------------")
        //"menu_code = '930' and action_code IN ( '930000', '930001', '930002' ) and action_type = 'ACTION_CLICK'
        println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
        session_path_convert.loadData(referResDF, sparkSession, scnData)

        println("----------------------------------update task record table---------------------------------------")
        // The job succeeded: update the MySQL record table.
        val updateSQL: String =
          s"""
             |update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=0 and start_time='${startTime}'
             |""".stripMargin
        val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
        try {
          upreSta.setString(1, "1")
          upreSta.setString(2, DateUtils.getTodayTime)
          // Clamp instead of a bare toInt so an oversized count cannot wrap to a negative number.
          upreSta.setInt(3, math.min(resultCount, Int.MaxValue.toLong).toInt)
          upreSta.executeUpdate()
        } finally {
          // Release statement and connection even when the update itself fails.
          JDBCUtil.close(connSql, upreSta)
        }
      } finally {
        // Stop Spark on both the success and the failure path.
        sparkSession.stop()
      }
    } catch {
      case e: Exception =>
        println("-----------------------------------任务异常---------------------------------------------------")
        e.printStackTrace()
        // Failure record for the task table; the write is currently disabled (see below).
        val exceptionSQL: String =
          s"""
             |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=0 and start_time='${startTime}'
             |""".stripMargin
        val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
        // JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
        // Closing an already closed JDBC connection is a no-op, so this is safe even
        // when the failure happened after the connection was released above.
        connSql.close()
    }
  }
}
class SessionPathConvert {
/**
 * Builds the Hive-enabled SparkSession used by this job.
 *
 * The master falls back to local[*] only when "spark.master" has not been
 * configured already (for example through spark-submit --master or
 * -Dspark.master). Values set explicitly on a SparkConf take precedence over
 * spark-submit flags, so an unconditional setMaster would pin the job to local mode.
 *
 * @param appName application name registered with Spark
 * @return the SparkSession obtained through getOrCreate
 */
def getSparkSession(appName: String): SparkSession = {
  val conf: SparkConf = new SparkConf()
    .setAppName(appName)
    .set("dfs.client.use.datanode.hostname", "true")
  // new SparkConf() loads spark.* system properties, which is where spark-submit puts --master.
  if (!conf.contains("spark.master")) {
    conf.setMaster("local[*]")
  }
  UseUtil.setConfigure(conf)
  SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}
/**
* @Description Derives the refer_* columns (the page/action an event was reached from) and the
* action_step_new counter for every event, walking each session's events in created_time order.
* @param spark SparkSession; not referenced inside this method
* @param dataFrame source events; rows are read by column name (session_id, log_session_id, menu_code, ...)
* @param positionUrlLabelBroad broadcast lookup map; not referenced inside this method
* @return DataFrame with the source columns plus refer_log_session_id, refer_menu_code, refer_action_code,
* refer_position, refer_label_value, refer_created and action_step_new
**/
def getReferColumns( spark: SparkSession,dataFrame: DataFrame,positionUrlLabelBroad:Broadcast[Map[String,String]]) = {
// Bring all events of one session into a single group so they can be walked sequentially.
val groupRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("session_id"))
val baseRdd = groupRdd.flatMap(g => {
val session_id = g._1
val resList: ListBuffer[PathStep] = new ListBuffer[PathStep]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created_time") < y.getAs[String]("created_time")) // sort ascending by created_time (values are compared as strings)
// State carried over from the previously processed event of this session.
var preId = ""
var preMenuCode = ""
var referLabelValue = ""
var prePosition = ""
var preActionCode = ""
var preActionType = ""
var preCreated = ""
// Format "<page counter>_<event counter within the page>", see the updates further down.
var actionStepNew = "0_0"
var thisMenuCode = ""
// Refer values that are written onto every event of the current page.
var referMap = mutable.Map[String, String]()
import scala.collection.mutable._
// Navigation history; each entry is ListBuffer(logSessionId, menuCode, actionCode, position, labelValue, created).
var menuStack = Stack[ListBuffer[String]]()
// The refer of a page is the last event of the session's previous menu_code; empty if there is none.
rowList.foreach(row => {
var log_session_id = row.getAs[String]("log_session_id")
var menu_code = row.getAs[String]("menu_code")
var action_type=row.getAs[String]("action_type")
var action_code=row.getAs[String]("action_code")
val position = row.getAs[String]("position")
val label_value = row.getAs[String]("label_value")
val created = row.getAs[String]("created_time")
// Recompute action_step_new.
// The menu_code differs from the current page: remember it as the new current page.
if (!thisMenuCode.equals(menu_code) ) {
thisMenuCode = menu_code
if(preActionType=="ACTION_CLICK"||preActionType=="ACTION_VIEW"){// refer is taken from the previous event only if that event was a click/view; otherwise refer is emptied
referMap("referLogSessionId") = preId
referMap("referMenuCode") = preMenuCode
referMap("referActionCode") = preActionCode
referMap("referPostion") = prePosition
referMap("referLabelValue") = referLabelValue
referMap("referCreated") = preCreated
// menu_code changed: look at the top of the stack. If the menu code stored in the top entry equals the
// current menu_code, pop that entry; otherwise push the previous event's info.
// After a pop, the new top entry (if any) supplies the refer values; with an empty stack the refer is cleared.
if(!menuStack.isEmpty && menuStack.top.apply(1) == menu_code ){
var preMenuInfo = menuStack.pop()
if(!menuStack.isEmpty){
referMap("referLogSessionId") = menuStack.top.apply(0)
referMap("referMenuCode") = menuStack.top.apply(1)
referMap("referActionCode") = menuStack.top.apply(2)
referMap("referPostion") = menuStack.top.apply(3)
referMap("referLabelValue") = menuStack.top.apply(4)
referMap("referCreated") = menuStack.top.apply(5)
}else{
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
}else{
var referInfo = ListBuffer(preId,preMenuCode,preActionCode,prePosition,referLabelValue,preCreated)
menuStack.push(referInfo)
}
if(menu_code=="001"){// menu_code 001 (home page): always treated as a return, so the history and the refer are cleared
menuStack.clear()
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
}else{
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
// New page: increment the page counter and restart the within-page counter at 0.
actionStepNew = (actionStepNew.split("_")(0).toInt + 1).toString + "_0"
} else {// same menu_code as the current page: only increment the within-page counter
actionStepNew = (actionStepNew.split("_")(0)) + "_" + (actionStepNew.split("_")(1).toInt + 1).toString
}
// Remember this event so the next one can use it as its refer candidate.
preId = log_session_id
preMenuCode = menu_code
preActionType= action_type
preActionCode = action_code
prePosition = position
referLabelValue = label_value
preCreated = created
// Debug output of the refer state for events with menu_code 001.
if(menu_code=="001"){
println(s"001的referMap==${referMap}")
}
// Emit the event together with the refer values of its page and its step counter.
resList += PathStep(row.getAs[String]("log_session_id"),
session_id,
row.getAs[Integer]("user_id"),
row.getAs[String]("device_token"),
action_type,
row.getAs[String]("user_token"),
menu_code,
row.getAs[String]("action_code"),
row.getAs[String]("position"),
row.getAs[String]("label_value"),
row.getAs[String]("label_class"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("net_type"),
row.getAs[String]("created_time"),
row.getAs[String]("date_time"),
row.getAs[String]("module_class1"),
row.getAs[String]("module_class2"),
referMap.getOrElse("referLogSessionId",""),
referMap.getOrElse("referMenuCode",""),
referMap.getOrElse("referActionCode",""),
referMap.getOrElse("referPostion",""),
referMap.getOrElse("referLabelValue",""),
referMap.getOrElse("referCreated",""),
actionStepNew )
})
resList.iterator
})
import dataFrame.sparkSession.implicits._
// Column names are assigned positionally, in the order of the PathStep arguments above.
var baseDF = baseRdd.toDF("log_session_id","session_id","user_id","device_token","action_type","user_token","menu_code","action_code",
"position","label_value","label_class","app_version","device_type","device_brand","device_model","net_type", "created_time","date_time",
"module_class1","module_class2","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created" ,"action_step_new")
// Diagnostic output: sample rows and the schema of the result.
println("baseDF.show=======>")
baseDF.show()
baseDF.printSchema()
baseDF
}
/**
 * Writes the result rows into one created_day partition of
 * pica_dw.dw_fact_log_session_path_convert, replacing the partition's previous content.
 *
 * @param dataFrame    rows to persist; exposed to SQL as temp view "result_view"
 * @param sparkSession session that executes the insert
 * @param partitionDay value of the created_day partition to overwrite
 */
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay:String):Unit = {
  val targetTable = "pica_dw.dw_fact_log_session_path_convert"

  // user_identity_id falls back to device_token when user_id is 0.
  val overwritePartitionSql: String =
    s"""
       |insert overwrite table ${targetTable} partition(created_day='${partitionDay}')
       | select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
       | refer_log_session_id,refer_menu_code,refer_action_code,refer_position,refer_label_value,
       | cast(refer_time_diff as int) as refer_time_diff,refer_created,
       | step_id,app_version,device_type,device_brand,device_model,net_type ,created_time,date_time, module_class1, module_class2,
       | case when user_id=0 then device_token else user_id end user_identity_id
       | from result_view distribute by rand()
       |""".stripMargin

  // The view must exist before the statement runs because the SQL selects from it.
  dataFrame.createOrReplaceTempView("result_view")
  sparkSession.sql(overwritePartitionSql)
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册