Commit 2630891b authored by weicheng.mao

Path funnel analysis base table and path funnel calculation

Parent 4662bfdf
@@ -133,7 +133,7 @@ object MyConfigSession {
final val SOURCE_PATH_CONVERT: String =
s"""
|select id log_session_id, session_id, user_id,device_token,action_type,user_token,menu_code,action_code,position,label_value,label_class,action_step,
-|app_version,device_type,device_brand,device_model,net_type,created_time,date_time,module_class1,module_class2 from ${MyConfigSession.HIVE_TABLE4}
+|app_version,device_type,device_brand,device_model,net_type,created_time,date_time,module_class1,module_class2,view_path from ${MyConfigSession.HIVE_TABLE4}
| where app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' )
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and (length(menu_code) <= 3 or length(menu_code)=7) and cast(menu_code as int ) is not null)
@@ -142,6 +142,7 @@ object MyConfigSession {
//Conditions for matching user_id
//0. Match against pica_ds.pica_doctor; user_id values that fail to match are set to '0'
final val INIT_USER_ID_SQL =
......
package com.session
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import java.sql
import java.sql.PreparedStatement
import scala.collection.mutable.ListBuffer
/**
*
* @Author 毛伟成
* @Date 2021/07/12 10:58
* @Version 1.0
*/
object Pathconvertconfig {
def apply(): Pathconvertconfig = new Pathconvertconfig()
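/**
* Node-UV calculation for a config node with no position constraint:
* joins the node config (sourceConfig) with the session path base table (sourcePathConvert)
* on menucode/actioncode within the config's time window, counts distinct users,
* and appends one SourceConfig result row per group to resList.
*/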
def noposition_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig],i:Int,j:Int) = {
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,A.user_id,
|A.created_time,A.refer_time_diff
|from sourceConfig B
| join sourcePathConvert A on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|and A.created_day>=B.starttime AND A.created_day<=B.endtime
|AND B.nodeid='${j}' AND B.projectid='${i}'
|""".stripMargin)
frame.createOrReplaceTempView("path_config"+j)
val frame1: DataFrame = sparkSession.sql(
s"""
|select
|projectid,nodeid,nodename,menucode,actioncode,count(distinct user_id) as UV,
|refer_menu_code,refer_action_code,starttime,endtime
|from path_config${j}
|where nodeid='${j}' and projectid='${i}'
|group by projectid,nodeid,nodename,menucode,actioncode,refer_menu_code,refer_action_code,starttime,endtime
|""".stripMargin).toDF()
frame1.show()
val lists: List[SourceConfig] = frame1.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
lists.foreach(row=>{
resList.append(row)
})
}
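/**
* Same as noposition_calculation, except the join also matches on position,
* for config nodes that pin a specific on-page position.
*/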
def hasPosition_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int) = {
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,A.user_id,
|A.created_time,A.refer_time_diff
|from sourceConfig B
| join sourcePathConvert A on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
| and A.position=B.position
|and A.created_day>=B.starttime AND A.created_day<=B.endtime
|AND B.nodeid='${j}' AND B.projectid='${i}'
|""".stripMargin)
frame.createOrReplaceTempView("path_config"+j)
val frame1: DataFrame = sparkSession.sql(
s"""
|select
|projectid,nodeid,nodename,menucode,actioncode,count(distinct user_id) as UV,
|refer_menu_code,refer_action_code,starttime,endtime
|from path_config${j}
|where nodeid='${j}' and projectid='${i}'
|group by projectid,nodeid,nodename,menucode,actioncode,refer_menu_code,refer_action_code,starttime,endtime
|""".stripMargin).toDF()
val lists: List[SourceConfig] = frame1.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
lists.foreach(row=>{
resList.append(row)
})
}
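/**
* Funnel-step calculation for node j: joins the node config with the path base table,
* then joins to the previous node's result (path_config(j-1)) on user_id and refer_created,
* so only users who arrived from the previous node are counted. When the node's
* outflag is '1', the per-user rows are also written to pica_dw.path_convert_config_user.
*/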
def path_convert_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int) = {
val df2= sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,C.user_id,
|A.created_time,A.refer_created
|from sourceConfig B
|join sourcePathConvert A
|on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|join path_config${j-1} C
|on A.refer_created=C.created_time and A.user_id=C.user_id
|where B.nodeid=${j} AND B.projectid=${i}
|""".stripMargin)
df2.createOrReplaceTempView("path_config"+j)
val outflag: Array[String] = sparkSession.sql(
s"""
|select outflag
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("outflag")).collect()
println("outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
println("projectid:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("nodeid:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
if(outflag(0)=="1"){
println("进来:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin).show()
println("展示:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|insert into pica_dw.path_convert_config_user
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin)
}
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,count(distinct B.user_id) as UV,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|from path_config${j} B
|where nodeid=${j} and projectid=${i}
|group by B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|""".stripMargin)
val value: List[SourceConfig] = frame.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
value.foreach(row=>{
resList.append(row)
})
}
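/**
* Variant of path_convert_calculation that joins back to path_config(j-flag)
* rather than path_config(j-1); flag is the distance to the last node whose
* menucode differed, so runs of consecutive nodes on the same page chain correctly.
*/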
def path_convert_calculation2(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int, flag: Int) = {
val df2= sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,C.user_id,
|A.created_time,A.refer_created
|from sourceConfig B
|join sourcePathConvert A
|on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|join path_config${j-flag} C
|on A.refer_created=C.created_time and A.user_id=C.user_id
|where B.nodeid=${j} AND B.projectid=${i}
|""".stripMargin)
df2.createOrReplaceTempView("path_config"+j)
val outflag: Array[String] = sparkSession.sql(
s"""
|select outflag
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("outflag")).collect()
println("outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
println("projectid:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("nodeid:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
if(outflag(0)=="1"){
println("进来:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin).show()
println("展示:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|insert into pica_dw.path_convert_config_user
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin)
}
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,count(distinct B.user_id) as UV,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|from path_config${j} B
|where nodeid=${j} and projectid=${i}
|group by B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|""".stripMargin)
val value: List[SourceConfig] = frame.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
value.foreach(row=>{
resList.append(row)
})
}
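/**
* Entry point: records the job start in the MySQL record table, loads the node
* config and session path base tables, loops over every project/node pair and
* dispatches to the calculation methods above, writes the aggregated results to
* Hive via loadData, and finally updates the record table with the job status.
*/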
def main(args: Array[String]): Unit = {
//1. Insert a row into the record table before running the job
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.Pathconvertconfig','3',?,'0',?)
""".stripMargin
//Batch number of the data sync, formatted like 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var elseFilter = " 1=1"
if (args.length >= 1) {
scnData = args(0)
if(args.length > 1 && args(1)!=""){
elseFilter = args(1)
}
}
println(s"scnData=${scnData}")
//Job start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters bound into the insert SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the start record into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val pathconvertconfig: Pathconvertconfig = Pathconvertconfig()
try {
val sparkSession: SparkSession = pathconvertconfig.getSparkSession("pathconvertconfig")
val sourceConfig: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_CONFIG )
val sourcePathConvert: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH_CONVERT)
sourceConfig.createOrReplaceTempView("sourceConfig")
sourcePathConvert.createOrReplaceTempView("sourcePathConvert")
// Loop over each project and node, joining the node config against the session path table
//Find how many projects there are
val projectMax: Array[Int] = sparkSession.sql(
"""
|select max(cast(projectid as int)) as maxp
|from sourceConfig
|""".stripMargin).rdd.map(row=>row.getAs[Int]("maxp")).collect()
val resList: ListBuffer[SourceConfig] = new ListBuffer[SourceConfig]()
for(i<- 1 to projectMax(0)){
println("project")
//Find the max node id of each project
val maxnode = sparkSession.sql(
s"""
|select max(cast(nodeid as int)) as maxnode
|from sourceConfig
|where projectid='${i}'
|""".stripMargin).rdd.map(row=>row.getAs[Int]("maxnode")).collect()
var thisMenuCode = ""
//Distance back to the last node with a different page (menucode)
var flag=1
for(j <- 1 to maxnode(0)){
if(j==1){
//Fetch the current menucode
val menucode = sparkSession.sql(
s"""
|select menucode
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("menucode")).collect()
val position = sparkSession.sql(
s"""
|select position
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("position")).collect()(0)
println("展示前:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
if(position =="0"){
println("展示0:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
noposition_calculation(sparkSession,resList,i,j)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}else{
println("展示不为0:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
hasPosition_calculation(sparkSession,resList,i,j)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}
thisMenuCode = menucode(0)
//Calculation for nodes other than node 1
}else{
//Fetch the current menucode and the last different page value
val menucode = sparkSession.sql(
s"""
|select menucode
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("menucode")).collect()
println("当前的mencode:"+menucode(0))
println("上一个的mencode:"+thisMenuCode)
if(thisMenuCode !=menucode(0)){
//Reset the distance to the last different page
flag=1
println(s"projectid=${i}, nodeid=${j}")
path_convert_calculation(sparkSession,resList,i,j)
println(s"resList size=${resList.size}")
//Below handles consecutive nodes with the same menucode
}else{
//Fetch the current position value
val position = sparkSession.sql(
s"""
|select position
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("position")).collect()(0)
if(j==2){
if(position=="0") {
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
noposition_calculation(sparkSession, resList, i, j)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}else{
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
hasPosition_calculation(sparkSession,resList,i,j)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}
}else{
flag=flag+1
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
path_convert_calculation2(sparkSession,resList,i,j,flag)
}
}
thisMenuCode =menucode(0)
}
}
}
import sparkSession.implicits._
val df = resList.toDF()
df.printSchema()
df.show()
println("长度"+resList.size)
pathconvertconfig.loadData(df,sparkSession)
println("----------------------------------update task record table---------------------------------------")
//Job succeeded: update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.Pathconvertconfig' and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, resList.size)
//Execute the update
upreSta.executeUpdate()
//Close the connections
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=0 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
// JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
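/** Runtime helpers: SparkSession construction and loading results into Hive. */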
class Pathconvertconfig {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
// .setMaster("local[*]")
// .set("dfs.client.use.datanode.hostname", "true")
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
/**
* @Description Load the result data into the Hive table
* @param dataFrame source data
* @param sparkSession SparkSession environment
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val tableName = "pica_dw.pathconvertconfig_result"
val loadDataSql =
s"""
|insert overwrite table ${tableName}
|select
|projectid,nodeid,nodename,menucode,actioncode,UV,
|refer_menu_code,refer_action_code,starttime,endtime,current_date
|from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
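/** One aggregated funnel-node result row: distinct-user count (UV) for a node within its configured time window. */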
case class SourceConfig(
projectid: String,
nodeid: String,
nodename: String,
menucode: String,
actioncode: String,
uv: String,
refer_menu_code: String,
refer_action_code: String,
starttime: String,
endtime: String
)
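// Not referenced anywhere in this file.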
case class User(name: String, age: Int)
@@ -77,7 +77,7 @@ object SessionPathConvert {
referDF.createOrReplaceTempView("pref_menu_info")
val referResDF: DataFrame = referDF.select( "log_session_id","session_id","user_id","device_token","action_type","user_token","menu_code","action_code","position",
"label_value","label_class","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created","step_id","app_version",
"device_type","device_brand","device_model","net_type","created_time","date_time","module_class1","module_class2","refer_time_diff")
"device_type","device_brand","device_model","net_type","created_time","date_time","module_class1","module_class2","refer_time_diff","view_path")
println("referResDF.printSchema()")
referResDF.printSchema()
// println(s"referResDF.count=${referResDF.count()}")
@@ -140,7 +140,7 @@ class SessionPathConvert {
val groupRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("session_id"))
val baseRdd = groupRdd.flatMap(g => {
val session_id = g._1
-val resList: ListBuffer[PathStep] = new ListBuffer[PathStep]()
+val resList: ListBuffer[PathStepConvert] = new ListBuffer[PathStepConvert]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created_time") < y.getAs[String]("created_time")) //sort by created_time ascending
@@ -231,7 +231,7 @@ class SessionPathConvert {
if(menu_code=="001"){
println(s"001的referMap==${referMap}")
}
-resList += PathStep(row.getAs[String]("log_session_id"),
+resList += PathStepConvert(row.getAs[String]("log_session_id"),
session_id,
row.getAs[Integer]("user_id"),
row.getAs[String]("device_token"),
@@ -257,14 +257,16 @@ class SessionPathConvert {
referMap.getOrElse("referPostion",""),
referMap.getOrElse("referLabelValue",""),
referMap.getOrElse("referCreated",""),
-actionStepNew )
+actionStepNew ,
+row.getAs[String]("view_path")
+)
})
resList.iterator
})
import dataFrame.sparkSession.implicits._
var baseDF = baseRdd.toDF("log_session_id","session_id","user_id","device_token","action_type","user_token","menu_code","action_code",
"position","label_value","label_class","app_version","device_type","device_brand","device_model","net_type", "created_time","date_time",
"module_class1","module_class2","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created" ,"action_step_new")
"module_class1","module_class2","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created" ,"action_step_new","view_path")
println("baseDF.show=======>")
baseDF.show()
baseDF.printSchema()
@@ -289,11 +291,40 @@ class SessionPathConvert {
| refer_log_session_id,refer_menu_code,refer_action_code,refer_position,refer_label_value,
| cast(refer_time_diff as int) as refer_time_diff,refer_created,
| step_id,app_version,device_type,device_brand,device_model,net_type ,created_time,date_time, module_class1, module_class2,
-| case when user_id=0 then device_token else user_id end user_identity_id
-| from result_view distribute by rand()
+| case when user_id=0 then device_token else user_id end user_identity_id,view_path
+| from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
case class PathStepConvert(log_session_id: String,
session_id: String,
user_id: Integer,
device_token: String,
action_type: String,
user_token: String,
menu_code: String,
action_code: String,
position: String,
label_value: String,
label_class: String,
app_version: String,
device_type: String,
device_brand: String,
device_model: String,
net_type: String,
created_time: String,
date_time: String,
module_class1: String,
module_class2: String,
refer_log_session_id:String,
refer_menu_code:String,
refer_action_code:String,
refer_position:String,
refer_label_value:String,
refer_created:String,
action_step_new:String,
view_path:String
)
\ No newline at end of file