提交 e17e1ffe 编写于 作者: wuyunfeng

修改session_term过滤条件

上级 bb76e55c
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* 处理结果存入 pica_dw.dw_fact_log_session_menu_calc_new
* @Author yunfeng.wu
* @Date 2020/06/12 10:23
* @Version 1.0
*/
class SessionMenuCalcNew extends Serializable{

  /**
   * Builds a Hive-enabled SparkSession for this job.
   *
   * @param appName Spark application name
   * @return SparkSession configured through UseUtil.setConfigure, with Hive support
   */
  def getSparkSession(appName: String): SparkSession = {
    val conf: SparkConf = new SparkConf().setAppName(appName)
    UseUtil.setConfigure(conf)
    val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession
  }

  // Session shared by both handleByMcPart* methods; created eagerly at instantiation.
  val sparkSession: SparkSession = getSparkSession("SessionMenuCalcNew")

  /**
   * Part 1: dwell-time aggregation for every menu EXCEPT menu_code '200'.
   *
   * Reads one day of pica_dw.dw_fact_log_session_term (ACTION_CLICK/ACTION_VIEW rows only),
   * groups rows by "userId_sessionId", sorts each group by created_time, and folds
   * consecutive rows sharing the same menu_code into one "run" keyed by
   * "<menu_code>_<series>" (series increments each time the menu changes).
   * For each run it sums nvl(menu_time_diff,0) and records the run's min/max created_time.
   *
   * @param spark      SparkSession used for the Hive read
   * @param createdDay partition (created_day) to process
   * @return RDD of Row(user_id, session_id, menu_code_term, during, menu_code, "", begin_time, end_time)
   */
  def handleByMcPart1(spark: SparkSession,createdDay:String) = {
    var df = spark.sql("select cast(user_id as string),session_id,created_time,date_time, menu_code,refer_menu_code,action_code,nvl(menu_time_diff,0) menu_time_diff " +
      s"from pica_dw.dw_fact_log_session_term where created_day='${createdDay}' and menu_code!='200' and action_type in ('ACTION_CLICK','ACTION_VIEW') ") // debug: append "and user_id='1000000186'" to restrict to one user
    // Collect all events of one user session under one key: "<user_id>_<session_id>".
    val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
    val resRdd = groupRdd.flatMap(g => {
      val user_session_id: String = g._1
      val user_id = user_session_id.split("_")(0)
      val session_id = user_session_id.split("_")(1)
      var rowList: Iterable[Row] = g._2
      // Fold state: previous/current menu code plus the run counter.
      var before_menu = ""
      var this_menu = ""
      var series = 1
      val list = new ListBuffer[Row]() // output rows: (user_id, session_id, run key, during, menu_code, "", begin, end)
      // NOTE(review): these are immutable Maps held in vars; "map.+=(kv)" compiles only
      // through Scala's assignment-operator desugaring (map = map + kv).
      var mc_during_map = Map[String, Integer]()           // run key -> accumulated menu_time_diff
      var mc_time_map = Map[String, ArrayBuffer[String]]() // run key -> created_time values seen in the run
      // Chronological order is required for consecutive-run detection below.
      rowList = rowList.toList.sortBy(_.getAs[String]("created_time"))
      rowList.foreach(row => {
        this_menu = row.getAs[String]("menu_code")
        val menu_time_diff = row.getAs[Integer]("menu_time_diff")
        val created_time = row.getAs[String]("created_time")
        var key = this_menu + "_" + series
        if ("".equals(before_menu) || this_menu.equals(before_menu)) {
          // Same run (first row of the session, or menu unchanged): add this row's duration.
          var sum_during: Integer = mc_during_map.getOrElse(key, 0)
          mc_during_map.+=(key -> (menu_time_diff + sum_during))
          before_menu = this_menu
        } else {
          // Menu changed: open a new run with an incremented series index.
          series += 1
          key = this_menu + "_" + series
          var sum_during: Integer = mc_during_map.getOrElse(key, 0)
          mc_during_map.+=(key -> (menu_time_diff + sum_during))
          before_menu = this_menu
        }
        // Record the timestamp under the (possibly just updated) run key so the
        // run's begin/end times can be derived afterwards.
        var time_arr: ArrayBuffer[String] = mc_time_map.getOrElse(key, new ArrayBuffer[String]())
        time_arr += (created_time)
        mc_time_map.+=(key -> time_arr)
      })
      // Emit one row per run; the action_code column stays empty ("") for menu rows.
      mc_during_map.foreach(kv => {
        val ar: ArrayBuffer[String] = mc_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
        // println((user_id, session_id, kv._1, kv._2, kv._1.split("_")(0), ar.toArray.min, ar.toArray.max))
        list.append(Row(user_id, session_id, kv._1, kv._2, kv._1.split("_")(0),"", ar.toArray.min, ar.toArray.max))
      })
      list.toList
    })
    resRdd
  }

  /**
   * Part 2: same fold as part 1 but restricted to menu_code '200', with runs
   * detected on action_code instead of menu_code. Output rows use the fixed
   * menu_code_term "200_0" and menu_code "200", and carry the run's action code
   * in the action_code column.
   *
   * @param spark      SparkSession used for the Hive read
   * @param createdDay partition (created_day) to process
   * @return RDD of Row(user_id, session_id, "200_0", during, "200", action_code, begin_time, end_time)
   */
  def handleByMcPart2(spark: SparkSession, createdDay: String) = {
    var df = spark.sql("select cast(user_id as string),session_id,created_time,date_time, menu_code,refer_menu_code,action_code,nvl(menu_time_diff,0) menu_time_diff " +
      s"from pica_dw.dw_fact_log_session_term where created_day='${createdDay}' and menu_code ='200' and action_type in ('ACTION_CLICK','ACTION_VIEW') ")
    // Collect all events of one user session under one key: "<user_id>_<session_id>".
    val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
    val resRdd = groupRdd.flatMap(g => {
      val user_session_id: String = g._1
      val user_id = user_session_id.split("_")(0)
      val session_id = user_session_id.split("_")(1)
      var rowList: Iterable[Row] = g._2
      // Fold state: previous/current action code plus the run counter.
      var before_action = ""
      var this_action = ""
      var series = 1
      val list = new ListBuffer[Row]() // output rows: (user_id, session_id, "200_0", during, "200", action, begin, end)
      // NOTE(review): immutable Maps in vars, updated via assignment-operator desugaring.
      var ac_during_map = Map[String, Integer]()           // run key -> accumulated menu_time_diff
      var ac_time_map = Map[String, ArrayBuffer[String]]() // run key -> created_time values seen in the run
      // Chronological order is required for consecutive-run detection below.
      rowList = rowList.toList.sortBy(_.getAs[String]("created_time"))
      rowList.foreach(row => {
        this_action = row.getAs[String]("action_code")
        val menu_time_diff = row.getAs[Integer]("menu_time_diff")
        val created_time = row.getAs[String]("created_time")
        var key = this_action + "_" + series
        if ("".equals(before_action) || this_action.equals(before_action)) {
          // Same run (first row, or action unchanged): add this row's duration.
          var sum_during: Integer = ac_during_map.getOrElse(key, 0)
          ac_during_map.+=(key -> (menu_time_diff + sum_during))
          before_action = this_action
        } else {
          // Action changed: open a new run with an incremented series index.
          series += 1
          key = this_action + "_" + series
          var sum_during: Integer = ac_during_map.getOrElse(key, 0)
          ac_during_map.+=(key -> (menu_time_diff + sum_during))
          before_action = this_action
        }
        // Record the timestamp under the current run key for begin/end derivation.
        var time_arr: ArrayBuffer[String] = ac_time_map.getOrElse(key, new ArrayBuffer[String]())
        time_arr += (created_time)
        ac_time_map.+=(key -> time_arr)
      })
      ac_during_map.foreach(kv => {
        val ar: ArrayBuffer[String] = ac_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
        list.append(Row(user_id, session_id, "200_0", kv._2,"200", kv._1.split("_")(0), ar.toArray.min, ar.toArray.max))
      })
      list.toList
    })
    resRdd
  }
}
object SessionMenuCalcNew {
  def apply(): SessionMenuCalcNew = new SessionMenuCalcNew()

  /**
   * Driver entry point.
   *
   * args(0) = target Hive table (partitioned by created_day)
   * args(1) = partition day (created_day) to process
   *
   * Flow: insert a "running" record into the MySQL record table, compute the
   * session/menu aggregates for the day, overwrite the target partition, then
   * mark the record as success (status=1) or failure (status=2 + exception).
   *
   * Fixes vs. the previous version:
   *  - removed the discarded `resRdd.take(20)` (it triggered a full, wasted Spark job)
   *  - removed the unused `flag` local
   *  - `resDf` is cached because it is consumed twice (insert + count)
   *  - the JDBC connection, PreparedStatement and SparkSession are now released
   *    in `finally` blocks on both the success and the failure path
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SessionMenuCalcNew <dbTable> <createdDay>")
      System.exit(1)
    }
    // 1. Record the job start (status='0' = running) before doing any work.
    val insertSQL: String =
      s"""
         |insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
         |values(0,'pica_dw.dw_fact_log_session_menu_calc_new','3',?,'0',?)
      """.stripMargin
    val dbTable = args(0)
    val createdDay = args(1)
    println(s"dbTable:${dbTable},createdDay:${createdDay}")
    // Job start time, format 2019-09-12 14:03:30; also the key used when updating the record.
    val startTime: String = DateUtils.getTodayTime
    val insertArr: Array[String] = Array[String](createdDay, startTime)
    val connSql: sql.Connection = JDBCUtil.getConnection()
    JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
    // Created inside the try so a SparkSession-construction failure is still
    // recorded as a job error; the Option lets `finally` stop it when it exists.
    var calcOpt: Option[SessionMenuCalcNew] = None
    try {
      val calc = SessionMenuCalcNew()
      calcOpt = Some(calc)
      val resRdd = calc.handleByMcPart1(calc.sparkSession, createdDay)
        .union(calc.handleByMcPart2(calc.sparkSession, createdDay))
      val resDf = calc.sparkSession.createDataFrame(resRdd, StructType(
        List(StructField("user_id", StringType, false),
          StructField("session_id", StringType, false),
          StructField("menu_code_term", StringType, false),
          StructField("during_by_refer", IntegerType, false),
          StructField("menu_code", StringType, false),
          StructField("action_code", StringType, false),
          StructField("begin_time", StringType, false),
          StructField("end_time", StringType, false))
      ))
      // Consumed twice (partition insert + count for the record table): cache to
      // avoid recomputing the whole groupBy/flatMap lineage a second time.
      resDf.cache()
      resDf.printSchema()
      resDf.createOrReplaceTempView("session_menu_view_calc")
      calc.sparkSession.sql(s"insert overwrite table ${dbTable} partition(created_day='${createdDay}') " +
        s"select cast(user_id as int) user_id,session_id,menu_code_term,during_by_refer,menu_code,action_code,begin_time,end_time from session_menu_view_calc distribute by rand()")
      println("----------------------------------update task record table---------------------------------------")
      // 2. Mark the job as successful (status='1') with end time and row count.
      val updateSQL: String = s"update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_menu_calc_new' and start_time='${startTime}'"
      val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
      try {
        upreSta.setString(1, "1")
        upreSta.setString(2, DateUtils.getTodayTime)
        upreSta.setInt(3, resDf.count().toInt)
        upreSta.executeUpdate()
      } finally {
        upreSta.close()
      }
    } catch {
      case e: Exception =>
        println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
        e.printStackTrace()
        // 3. Mark the job as failed (status='2') and persist the exception message.
        val exceptionSQL: String =
          s"""
             |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_menu_calc_new' and start_time='${startTime}'
          """.stripMargin
        val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
        JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
    } finally {
      // Release resources on every path; the original leaked the connection when
      // the record update threw, and never stopped Spark on failure.
      calcOpt.foreach(_.sparkSession.stop())
      if (!connSql.isClosed) connSql.close()
    }
  }
}
...@@ -43,11 +43,11 @@ object SessionProcessTerm { ...@@ -43,11 +43,11 @@ object SessionProcessTerm {
""".stripMargin """.stripMargin
//设置同步数据的批次号,格式是2019-09-12 //设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate var scnData: String = DateUtils.getYesterdayDate
var condition = " 1=1" var elseFiler = " 1=1"
if (args.length >= 1) { if (args.length >= 1) {
scnData = args(0) scnData = args(0)
if(args.length > 1 && args(1)!=""){ if(args.length > 1 && args(1)!=""){
condition = args(1) elseFiler = args(1)
} }
} }
println(s"scnData=${scnData}") println(s"scnData=${scnData}")
...@@ -72,33 +72,35 @@ object SessionProcessTerm { ...@@ -72,33 +72,35 @@ object SessionProcessTerm {
| from ${MyConfigSession.HIVE_TABLE0} | from ${MyConfigSession.HIVE_TABLE0}
| where servicename='trace2' and action!='ACTION_EQUIP_INFO' | where servicename='trace2' and action!='ACTION_EQUIP_INFO'
| and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1' | and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1'
| and created_day='${scnData}' and ${condition} | and created_day='${scnData}' and ${elseFiler}
|""".stripMargin |""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM ) val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM )
println("sourceDF.show==================") println("sourceDF.show==================")
sourceDF.printSchema() sourceDF.printSchema()
sourceDF.createOrReplaceTempView("session_term_ods") sourceDF.createOrReplaceTempView("session_term_ods")
//过滤掉action事件<=2个的session,以及首页加载前的引导页数据(view_class=YQGuidePageViewVC) //过滤掉action事件<=2个的session,以及首页加载前的引导页数据(view_class=YQGuidePageViewVC)
SOURCE_SQL_TERM = // SOURCE_SQL_TERM =
s""" // s"""
|select t1.* from session_term_ods t1 // |select t1.* from session_term_ods t1
|join (select pseudo_session from session_term_ods group by pseudo_session having count(distinct action)>2) t2 // |join (select pseudo_session from session_term_ods group by pseudo_session having count(distinct action)>2) t2
|on t1.pseudo_session = t2.pseudo_session // |on t1.pseudo_session = t2.pseudo_session
|where t1.menu_code_offset not in('0','null','') // |where t1.menu_code_offset not in('0','null','')
|""".stripMargin // |""".stripMargin
val selectDF = sparkSession.sql(SOURCE_SQL_TERM).drop("menu_code_new") // val selectDF = sparkSession.sql(SOURCE_SQL_TERM).drop("menu_code_new")
println("selectDF.show========") // println("selectDF.show========")
selectDF.printSchema() // selectDF.printSchema()
println("selectDF.count=========",selectDF.count()) println("selectDF.count=========",sourceDF.count())
val conditionGroup = List("<='4' ","between '5' and '9'",">'9'") // val conditionGroup = List("<'?4' ","between '4' and '7'","between '8' and 'b'",">'b'")
// val conditionGroup = List("='0'","='1'","='2'","='3'","='4'","='5'","='6'","='7'","='8'","='9'", // val conditionGroup = List("='0'","='1'","='2'","='3'","='4'","='5'","='6'","='7'","='8'","='9'",
// "='a'","='b'","='c'","='d'","='e'","='f'") // "='a'","='b'","='c'","='d'","='e'",">='f'")
val conditionGroup = List( "<='1'","between '2' and '3'","between '4' and '5'","between '6' and '7'","between '8' and '9'",
"between 'a' and 'b'","between 'c' and 'd'",">='e'" )
var dataCount = 0 var dataCount = 0
var index = 0 var index = 0
selectDF.persist(StorageLevel.MEMORY_AND_DISK_SER) sourceDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
for(condition <- conditionGroup){ for(condition <- conditionGroup){
index += 1 index += 1
val slideDF = selectDF.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100) val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------") println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val referResultRdd = sessionProcessTerm.getReferColumns(slideDF) val referResultRdd = sessionProcessTerm.getReferColumns(slideDF)
val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType( val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType(
...@@ -134,7 +136,7 @@ object SessionProcessTerm { ...@@ -134,7 +136,7 @@ object SessionProcessTerm {
| left join(select session_id,min(app_version) min_version from refer_result_table group by session_id) b on a.session_id=b.session_id | left join(select session_id,min(app_version) min_version from refer_result_table group by session_id) b on a.session_id=b.session_id
|""".stripMargin |""".stripMargin
//处理session最小版本>='3.4.5'的session数据 //处理session最小版本>='3.4.5'的session数据
val newVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version>='3.4.5' and a.action in('ACTION_ACTIVITY_RESUME','ACTION_HEART_BEAT') ") val newVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version>='3.4.5' and a.action in('ACTION_ACTIVITY_RESUME','ACTION_HEART_BEAT','ACTION_WEB_ENTER') ")
println("newVersionMenuDF,show()======") println("newVersionMenuDF,show()======")
val oldVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version<'3.4.5' and a.action_type in ('ACTION_VIEW','ACTION_HEART')") val oldVersionMenuDF = sparkSession.sql(s"${getMenuTermSql} where b.min_version<'3.4.5' and a.action_type in ('ACTION_VIEW','ACTION_HEART')")
println("oldVersionMenuDF,show()======") println("oldVersionMenuDF,show()======")
...@@ -353,7 +355,7 @@ class SessionProcessTerm { ...@@ -353,7 +355,7 @@ class SessionProcessTerm {
if ("ACTION_VIEW".equals(actionType) || "ACTION_HEART".equals(actionType)) { if ("ACTION_VIEW".equals(actionType) || "ACTION_HEART".equals(actionType)) {
if (appVersion >= "3.4.5") { //针对3.4.5之后的版本单独处理 if (appVersion >= "3.4.5") { //针对3.4.5之后的版本单独处理
//如果本条记录为ACTION_ACTIVITY_RESUME类型或者ACTION_HEART类型,则入库 //如果本条记录为ACTION_ACTIVITY_RESUME类型或者ACTION_HEART类型,则入库
if ("ACTION_ACTIVITY_RESUME".equals(action) || "ACTION_HEART".equals(actionType) ) { if ("ACTION_ACTIVITY_RESUME".equals(action) || "ACTION_HEART".equals(actionType) ||"ACTION_WEB_ENTER".equals(action)) {//有些h5页面没有对应的ACTION_ACTIVITY_RESUME,需要补进来
needPut = true needPut = true
}else{ }else{
needPut = false needPut = false
...@@ -369,17 +371,6 @@ class SessionProcessTerm { ...@@ -369,17 +371,6 @@ class SessionProcessTerm {
if(needPut){ if(needPut){
menuBeginTime = createdTime menuBeginTime = createdTime
} }
// /*
// 排除掉冗余数据:
// 1.如果上条埋点和本条埋点都是ACTION_HEART_BEAT类数据,则本条记录不在统计进来
// 2.如果本条和上条记录都为ACTION_VIEW类型,且menu不变,则本条记录不入库
// */
// if ( thisMenuCode.equals(menuCode) && ("ACTION_HEART"==prefActionType || "ACTION_VIEW" == prefActionType) && !isSessionNew ) {
// needPut = false
// }else{
// needPut = true
// menuBeginTime = createdTime
// }
} }
if (!thisMenuCode.equals(menuCode) || isSessionNew) { if (!thisMenuCode.equals(menuCode) || isSessionNew) {
menuBeginTime = createdTime menuBeginTime = createdTime
......
package com.utils package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPref, SessionProcessTerm} import com.session.{SessionMenuCalc, SessionMenuCalcNew, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPref, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver import org.apache.hadoop.util.ProgramDriver
/** /**
...@@ -17,6 +17,7 @@ object Driver { ...@@ -17,6 +17,7 @@ object Driver {
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表") driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessPref",classOf[SessionProcessPref],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_pref表") driver.addClass("SessionProcessPref",classOf[SessionProcessPref],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_pref表")
driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表") driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表")
driver.addClass("SessionMenuCalcNew",classOf[SessionMenuCalcNew],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc_new表")
driver.run(args) driver.run(args)
} }
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册