Commit 2a97e397 authored by weicheng.mao

Initialize code

Parent 9bd3742d
Pipeline #46728 failed at stage
package com.aliyun.odps.spark.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import java.sql
import java.sql.PreparedStatement
import config.MyConfigSession
import utils.DateUtils
import scala.collection.mutable.ListBuffer
/**
*
* @Author 毛伟成
* @Date 2021/07/12 10:58
* @Version 1.0
*/
object Pathconvertconfig {
def apply(): Pathconvertconfig = new Pathconvertconfig()
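// Funnel node with no position constraint: join the node's config row with the path events
// on menu_code/action_code (an empty action_code is treated as '0'), register the matched
// rows as a temp view path_config<j>, then count distinct user_id as the node's UV and
// append the aggregated rows to resList.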
def noposition_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig],i:Int,j:Int,starttime:String,endtime:String) = {
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,A.user_id,
|A.created_time,A.refer_time_diff
|from sourceConfig B
| join sourcePathConvert A on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|and A.created_day>='${starttime}' AND A.created_day<='${endtime}'
|AND B.nodeid='${j}' AND B.projectid='${i}'
|""".stripMargin)
frame.createOrReplaceTempView("path_config"+j)
val frame1: DataFrame = sparkSession.sql(
s"""
|select
|projectid,nodeid,nodename,menucode,actioncode,count(distinct user_id) as UV,
|refer_menu_code,refer_action_code,starttime,endtime
|from path_config${j}
|where nodeid='${j}' and projectid='${i}'
|group by projectid,nodeid,nodename,menucode,actioncode,refer_menu_code,refer_action_code,starttime,endtime
|""".stripMargin).toDF()
frame1.show()
val lists: List[SourceConfig] = frame1.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
lists.foreach(row=>{
resList.append(row)
})
}
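// Same as noposition_calculation, but the join additionally requires A.position = B.position,
// for config rows that pin the event to a specific on-page position.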
def hasPosition_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int,starttime:String,endtime:String) = {
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,A.user_id,
|A.created_time,A.refer_time_diff
|from sourceConfig B
| join sourcePathConvert A on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
| and A.position=B.position
|and A.created_day>='${starttime}' AND A.created_day<='${endtime}'
|AND B.nodeid='${j}' AND B.projectid='${i}'
|""".stripMargin)
frame.createOrReplaceTempView("path_config"+j)
val frame1: DataFrame = sparkSession.sql(
s"""
|select
|projectid,nodeid,nodename,menucode,actioncode,count(distinct user_id) as UV,
|refer_menu_code,refer_action_code,starttime,endtime
|from path_config${j}
|where nodeid='${j}' and projectid='${i}'
|group by projectid,nodeid,nodename,menucode,actioncode,refer_menu_code,refer_action_code,starttime,endtime
|""".stripMargin).toDF()
val lists: List[SourceConfig] = frame1.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
lists.foreach(row=>{
resList.append(row)
})
}
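// Node whose menu_code differs from the previous node's: besides the config/event join, the
// events are joined to the previous node's view path_config<j-1> on user_id and
// refer_created = created_time, so only users arriving from the previous funnel step count.
// When the node's outflag is '1', the matched per-user rows are also written to
// pica_dw.path_convert_config_user before the UV aggregation.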
def path_convert_calculation(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int,starttime:String,endtime:String) = {
val df2= sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,C.user_id,
|A.created_time,A.refer_created
|from sourceConfig B
|join sourcePathConvert A
|on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|join path_config${j-1} C
|on A.refer_created=C.created_time and A.user_id=C.user_id
|where B.nodeid=${j} AND B.projectid=${i} and A.created_day>='${starttime}' AND A.created_day<='${endtime}'
|""".stripMargin)
df2.createOrReplaceTempView("path_config"+j)
val outflag: Array[String] = sparkSession.sql(
s"""
|select outflag
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("outflag")).collect()
println("outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
println("projectid:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("nodeid:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
if(outflag(0)=="1"){
println("进来:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin).show()
println("展示:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|insert into pica_dw.path_convert_config_user
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin)
}
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,count(distinct B.user_id) as UV,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|from path_config${j} B
|where nodeid=${j} and projectid=${i}
|group by B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|""".stripMargin)
val value: List[SourceConfig] = frame.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
value.foreach(row=>{
resList.append(row)
})
}
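// Variant of path_convert_calculation for consecutive nodes sharing the same menu_code:
// the events are joined back to path_config<j-flag>, i.e. the last node with a different
// page, instead of the immediately preceding node.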
def path_convert_calculation2(sparkSession: SparkSession, resList: ListBuffer[SourceConfig], i: Int, j: Int, flag: Int,starttime:String,endtime:String) = {
val df2= sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime,C.user_id,
|A.created_time,A.refer_created
|from sourceConfig B
|join sourcePathConvert A
|on A.menu_code=B.menucode AND
| (CASE when A.action_code="" THEN '0' ELSE A.action_code END )=B.actioncode
|join path_config${j-flag} C
|on A.refer_created=C.created_time and A.user_id=C.user_id
|where B.nodeid=${j} AND B.projectid=${i} and A.created_day>='${starttime}' AND A.created_day<='${endtime}'
|""".stripMargin)
df2.createOrReplaceTempView("path_config"+j)
val outflag: Array[String] = sparkSession.sql(
s"""
|select outflag
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("outflag")).collect()
println("outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
println("projectid:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("nodeid:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
if(outflag(0)=="1"){
println("进来:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin).show()
println("展示:outflag:--------------------------------------------------------------------"+outflag(0)+"--------------------------------------------------------------------")
sparkSession.sql(
s"""
|insert into pica_dw.path_convert_config_user
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,user_id,
|B.starttime,B.endtime
|from path_config${j} B
|""".stripMargin)
}
val frame = sparkSession.sql(
s"""
|select
|B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,count(distinct B.user_id) as UV,
|B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|from path_config${j} B
|where nodeid=${j} and projectid=${i}
|group by B.projectid,B.nodeid,B.nodename,B.menucode,B.actioncode,B.refer_menu_code,B.refer_action_code,B.starttime,B.endtime
|""".stripMargin)
val value: List[SourceConfig] = frame.rdd.map(row => {
SourceConfig(row.getAs[String]("projectid"),
row.getAs[String]("nodeid"),
row.getAs[String]("nodename"),
row.getAs[String]("menucode"),
row.getAs[String]("actioncode"),
row.getAs[Long]("UV").toString,
row.getAs[String]("refer_menu_code"),
row.getAs[String]("refer_action_code"),
row.getAs[String]("starttime"),
row.getAs[String]("endtime")
)
}).collect().toList
value.foreach(row=>{
resList.append(row)
})
}
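// Driver entry point. Arguments, as inferred from the parsing below: args(0) = batch date
// (yyyy-MM-dd, defaults to yesterday), args(1) = source config table under project pica,
// args(2) = target result table. Sketch of an invocation (table names are placeholders):
//   spark-submit ... Pathconvertconfig 2021-07-11 path_convert_config path_convert_result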
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table (currently disabled)
// val insertSQL: String =
// s"""
// |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
// |values(0,'pica_dw.Pathconvertconfig','3',?,'0',?)
// """.stripMargin
//Batch date for the data sync, format: 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var fromtable = ""
var totable = ""
if (args.length >= 1) {
scnData = args(0)
if(args.length > 1 && args(1)!=""){
fromtable = args(1)
}
if(args.length > 2 && args(2)!=""){
totable = args(2)
}
}
println(s"scnData=${scnData}")
//Job start time, format: 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters bound into the record-insert SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection (disabled)
// val connSql: sql.Connection = JDBCUtil.getConnection()
// //Insert a row into the record table (disabled)
// val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val pathconvertconfig: Pathconvertconfig = Pathconvertconfig()
try {
val sparkSession: SparkSession = pathconvertconfig.getSparkSession("pathconvertconfig")
val sourceConfig: DataFrame = sparkSession.sql(
s"""
|select
|projectid,
|nodeid,
|nodename,
|menucode,
|actioncode,
|position,
|uv,
|refer_menu_code,
|refer_action_code,
|starttime,
|endtime,
|outflag
|from pica.${fromtable}
|""".stripMargin)
val sourcePathConvert: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH_CONVERT)
sourceConfig.createOrReplaceTempView("sourceConfig")
sourcePathConvert.createOrReplaceTempView("sourcePathConvert")
// Loop over each project and node, joining the corresponding config table with the session events
//Find how many projects there are (max projectid)
val projectMax: Array[Int] = sparkSession.sql(
"""
|select max(cast(projectid as int)) as maxp
|from sourceConfig
|""".stripMargin).rdd.map(row=>row.getAs[Int]("maxp")).collect()
val resList: ListBuffer[SourceConfig] = new ListBuffer[SourceConfig]()
for(i<- 1 to projectMax(0)){
println("project")
//Find the largest node id for this project
val maxnode = sparkSession.sql(
s"""
|select max(cast(nodeid as int)) as maxnode
|from sourceConfig
|where projectid='${i}'
|""".stripMargin).rdd.map(row=>row.getAs[Int]("maxnode")).collect()
var thisMenuCode = ""
//Distance (in nodes) to the last different page
var flag=1
for(j <- 1 to maxnode(0)){
if(j==1){
//Get the current menucode value
val menucode = sparkSession.sql(
s"""
|select menucode
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("menucode")).collect()
val position = sparkSession.sql(
s"""
|select position
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("position")).collect()(0)
println("展示前:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
val starttime = sparkSession.sql(
s"""
|select starttime
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("starttime")).collect()(0)
println("展示前:starttime:--------------------------------------------------------------------"+starttime+"--------------------------------------------------------------------")
val endtime = sparkSession.sql(
s"""
|select endtime
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("endtime")).collect()(0)
println("展示前:endtime:--------------------------------------------------------------------"+endtime+"--------------------------------------------------------------------")
if(position =="0"){
println("展示0:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
noposition_calculation(sparkSession,resList,i,j,starttime,endtime)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}else{
println("展示不为0:position:--------------------------------------------------------------------"+position+"--------------------------------------------------------------------")
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
hasPosition_calculation(sparkSession,resList,i,j,starttime,endtime)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}
thisMenuCode = menucode(0)
//Calculation for nodes other than node 1
}else{
//Get the current menucode and the menucode of the last different page
val menucode = sparkSession.sql(
s"""
|select menucode
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("menucode")).collect()
println("当前的mencode:"+menucode(0))
println("上一个的mencode:"+thisMenuCode)
val starttime = sparkSession.sql(
s"""
|select starttime
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("starttime")).collect()(0)
println("展示前:starttime:--------------------------------------------------------------------"+starttime+"--------------------------------------------------------------------")
val endtime = sparkSession.sql(
s"""
|select endtime
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("endtime")).collect()(0)
println("展示前:endtime:--------------------------------------------------------------------"+endtime+"--------------------------------------------------------------------")
if(thisMenuCode !=menucode(0)){
//Reset the distance to the last different page
flag=1
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
path_convert_calculation(sparkSession,resList,i,j,starttime,endtime)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
//Below: handling for consecutive nodes with the same menu code
}else{
//Get the current position value
val position = sparkSession.sql(
s"""
|select position
|from sourceConfig
|where projectid=${i} and nodeid=${j}
|""".stripMargin).rdd.map(row=>row.getAs[String]("position")).collect()(0)
if(j==2){
if(position=="0") {
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
noposition_calculation(sparkSession, resList, i, j,starttime,endtime)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}else{
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
hasPosition_calculation(sparkSession,resList,i,j,starttime,endtime)
println("------------------------------------------------------------长度--------------------------------------------------------------"+resList.size)
}
}else{
flag=flag+1
println("展示0:i:--------------------------------------------------------------------"+i+"--------------------------------------------------------------------")
println("展示0:j:--------------------------------------------------------------------"+j+"--------------------------------------------------------------------")
path_convert_calculation2(sparkSession,resList,i,j,flag,starttime,endtime)
}
}
thisMenuCode =menucode(0)
}
}
}
import sparkSession.implicits._
val df = resList.toDF()
df.printSchema()
df.show()
println("长度"+resList.size)
pathconvertconfig.loadData(df,sparkSession,totable)
println("----------------------------------update task record table---------------------------------------")
//On success, update the MySQL record table (currently disabled)
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.Pathconvertconfig' and start_time='${startTime}'
""".stripMargin
// val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
// upreSta.setString(1, "1")
// upreSta.setString(2, DateUtils.getTodayTime)
// upreSta.setInt(3, resList.size)
// //Update the record row
// upreSta.executeUpdate()
// //Close the connection
// JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
// val exceptionSQL: String =
// s"""
// |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=0 and start_time='${startTime}'
// """.stripMargin
// val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
// // JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
// connSql.close()
}
}
}
}
class Pathconvertconfig {
def getSparkSession(appName: String): SparkSession = {
val sparkSession = SparkSession
.builder()
.config("spark.hadoop.odps.project.name", "pica")
.appName("SessionProcessTerm")
.getOrCreate()
sparkSession
}
/**
* @Description Load the result data into the target table
* @param dataFrame source data
* @param sparkSession SparkSession environment
* @param totable target table name (under project pica)
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession,totable:String):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val tableName = s"pica.${totable}"
val loadDataSql =
s"""
|insert overwrite table ${tableName}
|select
|projectid,nodeid,nodename,menucode,actioncode,UV,
|refer_menu_code,refer_action_code,starttime,endtime,current_date
|from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
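// One aggregated funnel-node row: the node's config columns plus the computed
// distinct-user count (uv); collected into resList and written out by loadData.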
case class SourceConfig(
projectid: String,
nodeid: String,
nodename: String,
menucode: String,
actioncode: String,
uv: String,
refer_menu_code: String,
refer_action_code: String,
starttime: String,
endtime: String
)
case class User(name: String, age: Int)
package com.aliyun.odps.spark.sparksql
import java.sql
import java.sql.PreparedStatement
import config.MyConfigSession
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import utils.{DateUtils, UseUtil}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
/**
* Process yesterday's data and load it into the pica_dw.dw_fact_log_session_path_new table
* @Author maoweicheng
* @Date 2021/07/12 09:58
* @Version 1.0
*/
object SessionPathConvert {
def apply(): SessionPathConvert = new SessionPathConvert()
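// Driver entry point. Arguments, as inferred from the parsing below: args(0) = batch date
// (yyyy-MM-dd, defaults to yesterday), args(1) = optional extra SQL predicate appended to
// the source query's WHERE clause (defaults to "1=1").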
def main(args: Array[String]): Unit = {
//1. Before running the job, write a row to the record table (currently disabled)
// val insertSQL: String =
// s"""
// |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
// |values(0,'pica_dw.dw_fact_log_session_path_convert','3',?,'0',?)
// """.stripMargin
//Batch date for the data sync, format: 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var elseFiler = " 1=1"
if (args.length >= 1) {
scnData = args(0)
if(args.length > 1 && args(1)!=""){
elseFiler = args(1)
}
}
println(s"scnData=${scnData}")
//Job start time, format: 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters bound into the record-insert SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection (disabled)
// val connSql: sql.Connection = JDBCUtil.getConnection()
// //Insert a row into the record table (disabled)
// val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val session_path_convert = SessionPathConvert()
try {
val sparkSession: SparkSession = session_path_convert.getSparkSession("Session_path_convert")
//Get the broadcast variable mapping position to label_value
val positionUrlLabelBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_URLLABEL_SQL, "url_content", "label_value")
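// positionUrlLabelBroad is a Broadcast[Map[String, String]]; the assumption here is that
// UseUtil.getBroadcast builds the map from the "url_content" and "label_value" columns of
// the given SQL, keyed by url_content.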
println(s"positionUrlLabelBroad=${positionUrlLabelBroad.value}")
//Filter the source data
println(s"query sql: ${MyConfigSession.SOURCE_PATH_CONVERT +s" and created_day='${scnData}' and ${elseFiler}"}")
val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_PATH_CONVERT +s" and created_day='${scnData}' and ${elseFiler}")
// sourceDF.show()
println(s"sourceDF.count=${sourceDF.count()}")
var pathStepDF = session_path_convert.getReferColumns(sparkSession,sourceDF.where("action_type IN ('ACTION_CLICK','ACTION_VIEW')"),positionUrlLabelBroad )
println(s"pathStepDF.count=${pathStepDF.count()}")
pathStepDF.createOrReplaceTempView("menu_refer_record")
println("-----------------------------------compute refer columns-----------------------------------------")
val referDF= sparkSession.sql("select t.*," +
"(cast(t.created_time as bigint) - cast(t.refer_created as bigint) ) refer_time_diff, " +
"row_number() over(partition by t.session_id order by t.created_time ) step_id " +
"from menu_refer_record t " ) //
referDF.createOrReplaceTempView("pref_menu_info")
val referResDF: DataFrame = referDF.select( "log_session_id","session_id","user_id","device_token","action_type","user_token","menu_code","action_code","position",
"label_value","label_class","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created","step_id","app_version",
"device_type","device_brand","device_model","net_type","created_time","date_time","module_class1","module_class2","refer_time_diff","view_path")
println("referResDF.printSchema()")
referResDF.printSchema()
// println(s"referResDF.count=${referResDF.count()}")
println("------------------------------------单独计算label_value----------------------------------------------")
//"menu_code = '930' and action_code IN ( '930000', '930001', '930002' ) and action_type = 'ACTION_CLICK'
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
session_path_convert.loadData(referResDF,sparkSession,scnData)
println("----------------------------------update task record table---------------------------------------")
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
// val exceptionSQL: String =
// s"""
// |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=0 and start_time='${startTime}'
// """.stripMargin
// val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
// JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
// connSql.close()
}
}
}
}
class SessionPathConvert {
def getSparkSession(appName: String): SparkSession = {
//  val conf: SparkConf = new SparkConf().setAppName(appName)
//  UseUtil.setConfigure(conf)
//  val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
//  sparkSession
val sparkSession = SparkSession
.builder()
.config("spark.hadoop.odps.project.name", "pica")
.appName(appName) // use the caller-supplied app name instead of a hardcoded one
.getOrCreate()
sparkSession
}
/**
* @Description Derive the refer_* columns for the required fields
* @param dataFrame source data
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getReferColumns( spark: SparkSession,dataFrame: DataFrame,positionUrlLabelBroad:Broadcast[Map[String,String]]) = {
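// Group the day's events by session_id, then walk each session in chronological order,
// maintaining a stack of visited pages so the refer_* fields point at the true referring
// page even across back navigation.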
val groupRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("session_id"))
val baseRdd = groupRdd.flatMap(g => {
val session_id = g._1
val resList: ListBuffer[PathStepConvert] = new ListBuffer[PathStepConvert]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created_time") < y.getAs[String]("created_time")) //sort ascending by created_time
var preId = ""
var preMenuCode = ""
var referLabelValue = ""
var prePosition = ""
var preActionCode = ""
var preActionType = ""
var preCreated = ""
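// action_step_new has the form "<pageStep>_<actionStep>": the left part increments on each
// menu_code change, the right part on each further action within the same page.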
var actionStepNew = "0_0"
var thisMenuCode = ""
var referMap = mutable.Map[String, String]()
import scala.collection.mutable._
var menuStack = Stack[ListBuffer[String]]()
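// Each stack entry is a ListBuffer of
// [log_session_id, menu_code, action_code, position, label_value, created_time]
// describing a page the user can navigate back to.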
//For each session, capture the last action_code of the previous menu_code; empty if none
rowList.foreach(row => {
var log_session_id = row.getAs[String]("log_session_id")
var menu_code = row.getAs[String]("menu_code")
var action_type=row.getAs[String]("action_type")
var action_code=row.getAs[String]("action_code")
val position = row.getAs[String]("position")
val label_value = row.getAs[String]("label_value")
val created = row.getAs[String]("created_time")
//Recompute action_step_new
//If the menuCode changed (or a new session started), reset the current menuCode
if (!thisMenuCode.equals(menu_code) ) {
thisMenuCode = menu_code
if(preActionType=="ACTION_CLICK"||preActionType=="ACTION_VIEW"){//only update the refer values when the previous page had a click/view action; otherwise they stay empty
referMap("referLogSessionId") = preId
referMap("referMenuCode") = preMenuCode
referMap("referActionCode") = preActionCode
referMap("referPostion") = prePosition
referMap("referLabelValue") = referLabelValue
referMap("referCreated") = preCreated
//On menu_code change, inspect the top of the stack: if its menu_code matches the current one this is a back navigation, so pop it; otherwise push the previous menu_code's info onto the stack
//After popping, the new top of the stack supplies the refer info for this menu_code (unless the current page is the home page)
if(!menuStack.isEmpty && menuStack.top.apply(1) == menu_code ){
var preMenuInfo = menuStack.pop()
if(!menuStack.isEmpty){
referMap("referLogSessionId") = menuStack.top.apply(0)
referMap("referMenuCode") = menuStack.top.apply(1)
referMap("referActionCode") = menuStack.top.apply(2)
referMap("referPostion") = menuStack.top.apply(3)
referMap("referLabelValue") = menuStack.top.apply(4)
referMap("referCreated") = menuStack.top.apply(5)
}else{
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
}else{
var referInfo = ListBuffer(preId,preMenuCode,preActionCode,prePosition,referLabelValue,preCreated)
menuStack.push(referInfo)
}
if(menu_code=="001"){//针对首页,都认为是返回类型操作,将refer指向空
menuStack.clear()
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
}else{
referMap("referLogSessionId") = ""
referMap("referMenuCode") = ""
referMap("referActionCode") = ""
referMap("referPostion") = ""
referMap("referLabelValue") = ""
referMap("referCreated") = ""
}
actionStepNew = (actionStepNew.split("_")(0).toInt + 1).toString + "_0"
} else {//menuCode and session unchanged: bump the action step
actionStepNew = (actionStepNew.split("_")(0)) + "_" + (actionStepNew.split("_")(1).toInt + 1).toString
}
preId = log_session_id
preMenuCode = menu_code
preActionType= action_type
preActionCode = action_code
prePosition = position
referLabelValue = label_value
preCreated = created
if(menu_code=="001"){
println(s"001的referMap==${referMap}")
}
resList += PathStepConvert(row.getAs[String]("log_session_id"),
session_id,
row.getAs[Long]("user_id"),
row.getAs[String]("device_token"),
action_type,
row.getAs[String]("user_token"),
menu_code,
row.getAs[String]("action_code"),
row.getAs[String]("position"),
row.getAs[String]("label_value"),
row.getAs[String]("label_class"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("net_type"),
row.getAs[String]("created_time"),
row.getAs[String]("date_time"),
row.getAs[String]("module_class1"),
row.getAs[String]("module_class2"),
referMap.getOrElse("referLogSessionId",""),
referMap.getOrElse("referMenuCode",""),
referMap.getOrElse("referActionCode",""),
referMap.getOrElse("referPostion",""),
referMap.getOrElse("referLabelValue",""),
referMap.getOrElse("referCreated",""),
actionStepNew ,
row.getAs[String]("view_path")
)
})
resList.iterator
})
import dataFrame.sparkSession.implicits._
var baseDF = baseRdd.toDF("log_session_id","session_id","user_id","device_token","action_type","user_token","menu_code","action_code",
"position","label_value","label_class","app_version","device_type","device_brand","device_model","net_type", "created_time","date_time",
"module_class1","module_class2","refer_log_session_id","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_created" ,"action_step_new","view_path")
println("baseDF.show=======>")
baseDF.show()
baseDF.printSchema()
baseDF
}
/**
* @Description Load the result data into the target table
* @param dataFrame source data
* @param sparkSession SparkSession environment
* @param partitionDay partition date
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay:String):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val tableName = "pica.dwd_fact_log_session_path_convert"
val loadDataSql =
s"""
|insert overwrite table ${tableName} partition(created_day='${partitionDay}')
| select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
| refer_log_session_id,refer_menu_code,refer_action_code,refer_position,refer_label_value,
| cast(refer_time_diff as int) as refer_time_diff,refer_created,
| step_id,app_version,device_type,device_brand,device_model,net_type ,created_time,date_time, module_class1, module_class2,
| case when user_id=0 then device_token else user_id end user_identity_id,view_path
| from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
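// One enriched click/view event: the original event columns plus the derived refer_* fields
// and the recomputed action_step_new.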
case class PathStepConvert(log_session_id: String,
session_id: String,
user_id: Long,
device_token: String,
action_type: String,
user_token: String,
menu_code: String,
action_code: String,
position: String,
label_value: String,
label_class: String,
app_version: String,
device_type: String,
device_brand: String,
device_model: String,
net_type: String,
created_time: String,
date_time: String,
module_class1: String,
module_class2: String,
refer_log_session_id:String,
refer_menu_code:String,
refer_action_code:String,
refer_position:String,
refer_label_value:String,
refer_created:String,
action_step_new:String,
view_path:String
)