Commit cf1936af authored by wuyunfeng

Add two traffic analysis tables

Parent 9c500371
......@@ -11,6 +11,7 @@ object MyConfigSession {
//DW-layer traffic tables in Hive
final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session"
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
//output file path
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
......@@ -24,10 +25,12 @@ object MyConfigSession {
//three dictionary (dimension) tables used as filter conditions on the traffic data
final val ACTION_TYPE_SQL: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where is_valid=1"
final val ACTION_TYPE_SQL_HEART: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'"
final val CLASS_NAME_SQL: String = "select class_name, '0' as is_valid from pica_dw.dw_dim_log_class_name where is_valid=0"
final val MENU_CODE_SQL: String = "select view_path, menu_code from pica_dw.dw_dim_log_menu_class_code where view_path is not Null"
//look up the action_category for a given action_type in the traffic tables
final val ACTION_CATEGORY_SQL: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where is_valid=1"
final val ACTION_CATEGORY_SQL_HEART: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'"
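//A minimal sketch of how these SQL strings are used downstream (mirrors the calls the
//companion jobs actually make; the lookup value shown is illustrative):
//  val actionTypeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL_HEART, "action_type", "is_valid")
//  actionTypeBroad.value.getOrElse("ACTION_HEART_BEAT", "-1")  //-> "1" for the heartbeat type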
......
package com.session
import com.utils.UseUtil
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* Results are written to pica_dw.dw_fact_log_session_menu_calc
* @Author yunfeng.wu
* @Date 2020/06/12 10:23
* @Version 1.0
*/
class SessionMenuCalc extends Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionMenuCalc")
  def handleByMcPart1(spark: SparkSession, createdDay: String): RDD[Row] = {
    val df = spark.sql("select cast(user_id as string),session_id,created_time,date_time,menu_code,refer_menu_code,action_code,refer_time_diff " +
      s"from pica_dw.dw_fact_log_session_path where created_day='${createdDay}' and menu_code!='200' ")
val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
val resRdd = groupRdd.flatMap(g => {
val user_session_id: String = g._1
val user_id = user_session_id.split("_")(0)
val session_id = user_session_id.split("_")(1)
      //sort this session's rows by created_time before the sequential scan
      val sortedRows: List[Row] = g._2.toList.sortBy(_.getAs[String]("created_time"))
      //accumulators: the previous menu_code and a series counter bumped on every menu change
      var before_menu = ""
      var this_menu = ""
      var series = 1
      val list = new ListBuffer[Row]()
      //key "menuCode_series" -> accumulated refer_time_diff over one consecutive run
      var mc_during_map = Map[String, Integer]()
      //key "menuCode_series" -> created_time values observed in that run
      var mc_time_map = Map[String, ArrayBuffer[String]]()
      sortedRows.foreach(row => {
this_menu = row.getAs[String]("menu_code")
        //null-safe read of refer_time_diff coming back from Hive
        val refer_time_diff: Int = Option(row.getAs[Integer]("refer_time_diff")).map(_.intValue()).getOrElse(0)
val created_time = row.getAs[String]("created_time")
var key = this_menu + "_" + series
if ("".equals(before_menu) || this_menu.equals(before_menu)) {
var sum_during: Integer = mc_during_map.getOrElse(key, 0)
mc_during_map.+=(key -> (refer_time_diff + sum_during))
before_menu = this_menu
} else {
series += 1
key = this_menu + "_" + series
var sum_during: Integer = mc_during_map.getOrElse(key, 0)
mc_during_map.+=(key -> (refer_time_diff + sum_during))
before_menu = this_menu
}
var time_arr: ArrayBuffer[String] = mc_time_map.getOrElse(key, new ArrayBuffer[String]())
time_arr += (created_time)
mc_time_map.+=(key -> time_arr)
})
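      //Worked example (illustrative values): sorted menu sequence A,A,B,A with
      //refer_time_diff 5,10,20,30 produces mc_during_map = {A_1->15, B_2->20, A_3->30},
      //so repeated visits to the same menu remain distinct runs.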
mc_during_map.foreach(kv => {
val ar: ArrayBuffer[String] = mc_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
list.append(Row(user_id, session_id, kv._1, kv._2, kv._1.split("_")(0),"", ar.toArray.min, ar.toArray.max))
})
list.toList
})
resRdd
}
  def handleByMcPart2(spark: SparkSession, createdDay: String): RDD[Row] = {
    val df = spark.sql("select cast(user_id as string),session_id,created_time,date_time,menu_code,refer_menu_code,action_code,refer_time_diff " +
      s"from pica_dw.dw_fact_log_session_path where created_day='${createdDay}' and menu_code='200' ")
val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
val resRdd = groupRdd.flatMap(g => {
val user_session_id: String = g._1
val user_id = user_session_id.split("_")(0)
val session_id = user_session_id.split("_")(1)
      //sort this session's rows by created_time before the sequential scan
      val sortedRows: List[Row] = g._2.toList.sortBy(_.getAs[String]("created_time"))
      //accumulators: the previous action_code and a series counter bumped on every change
      var before_action = ""
      var this_action = ""
      var series = 1
      val list = new ListBuffer[Row]()
      //key "actionCode_series" -> accumulated refer_time_diff over one consecutive run
      var ac_during_map = Map[String, Integer]()
      //key "actionCode_series" -> created_time values observed in that run
      var ac_time_map = Map[String, ArrayBuffer[String]]()
      sortedRows.foreach(row => {
this_action = row.getAs[String]("action_code")
        //null-safe read of refer_time_diff coming back from Hive
        val refer_time_diff: Int = Option(row.getAs[Integer]("refer_time_diff")).map(_.intValue()).getOrElse(0)
val created_time = row.getAs[String]("created_time")
var key = this_action + "_" + series
if ("".equals(before_action) || this_action.equals(before_action)) {
var sum_during: Integer = ac_during_map.getOrElse(key, 0)
ac_during_map.+=(key -> (refer_time_diff + sum_during))
before_action = this_action
} else {
series += 1
key = this_action + "_" + series
var sum_during: Integer = ac_during_map.getOrElse(key, 0)
ac_during_map.+=(key -> (refer_time_diff + sum_during))
before_action = this_action
}
var time_arr: ArrayBuffer[String] = ac_time_map.getOrElse(key, new ArrayBuffer[String]())
time_arr += (created_time)
ac_time_map.+=(key -> time_arr)
})
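      //Mirrors handleByMcPart1, but inside menu_code '200' the runs are keyed by
      //action_code, e.g. actions X,X,Y with diffs 3,4,8 -> {X_1->7, Y_2->8}.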
ac_during_map.foreach(kv => {
val ar: ArrayBuffer[String] = ac_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
list.append(Row(user_id, session_id, "200_0", kv._2,"200", kv._1.split("_")(0), ar.toArray.min, ar.toArray.max))
})
list.toList
})
resRdd
}
}
object SessionMenuCalc {
def apply(): SessionMenuCalc = new SessionMenuCalc()
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: SessionMenuCalc <dbTable> <createdDay>")
System.exit(1)
}
    val dbTable = args(0)
    val createdDay = args(1)
println(s"dbTable:${dbTable},createdDay:${createdDay}")
val sessionMenuCalc: SessionMenuCalc = SessionMenuCalc()
val resRdd1 = sessionMenuCalc.handleByMcPart1(sessionMenuCalc.sparkSession,createdDay)
val resRdd2 = sessionMenuCalc.handleByMcPart2(sessionMenuCalc.sparkSession,createdDay)
val resRdd = resRdd1.union(resRdd2)
    //resRdd.take(20)  //debug sampling only; it triggers an extra Spark job, so keep it disabled
val resDf = sessionMenuCalc.sparkSession.createDataFrame(resRdd, StructType(
List(StructField("user_id", StringType, false),
StructField("session_id", StringType, false),
StructField("menu_code_term", StringType, false),
StructField("during_by_refer", IntegerType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("begin_time", StringType, false),
StructField("end_time", StringType, false))
))
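    //Illustrative output row (hypothetical values): user 1000000186 spent 35s on menu "5"
    //during its first consecutive run within session "xyz0" (action_code is empty for Part1 rows):
    //  Row("1000000186", "xyz0", "5_1", 35, "5", "", <min created_time>, <max created_time>)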
resDf.printSchema()
resDf.createOrReplaceTempView("session_menu_view_calc")
sessionMenuCalc.sparkSession.sql(s"insert overwrite table ${dbTable} partition(created_day='${createdDay}') " +
s"select cast(user_id as int) user_id,session_id,menu_code_term,during_by_refer,menu_code,action_code,begin_time,end_time from session_menu_view_calc")
sessionMenuCalc.sparkSession.close()
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
/**
*
 * Process the tracking-log traffic data and load it into the DW table pica_dw.dw_fact_log_session_heart
*
* @Author yunfeng.wu
* @Date 2020/06/10 10:23
* @Version 1.0
*/
class SessionProcessHeart extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessHeart")
  //broadcast of the qualifying action types (heartbeat only)
val actionTypeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL_HEART, "action_type", "is_valid")
  //broadcast of the className filter dictionary
val classNameBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
  //broadcast of the view_path -> menu_code mapping
val menuCodeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
  //broadcast of the action_type -> action_category mapping
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL_HEART,"action_type","action_category")
  //function value that filters rows against the dimension dictionaries
  val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
    val rowList: ListBuffer[Row] = new ListBuffer[Row]()
    val actionTypeMap: Map[String, String] = actionTypeBroad.value
    val classNameMap: Map[String, String] = classNameBroad.value
    //broadcast map: action_type -> action_category
    val actionCategoryMap: Map[String, String] = actionCategory.value
    rows.foreach(row => {
      //the action filter condition
      val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
      //this action type is one of the ones we keep
      if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
        //normalize action to the standard actionCategory from the mapping table
        val action_type: String = actionCategoryMap.getOrElse(action, "ACTION")
        //action is any of these two
        if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
          //component_tag must contain "#"
          if (row.getAs[String]("component_tag") != null
            && row.getAs[String]("component_tag").contains("#")) {
            rowList += row
          }
        } else if (action_type.equals("ACTION_VIEW")) {
          rowList += row
          //for any other action type, drop rows whose class_name is flagged "0" in the dictionary
        } else if (row.getAs[String]("class_name") != null
          && !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
          rowList += row
        }
      }
    })
    rowList.iterator
  }
}
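  //Example of the filter decision (hypothetical rows): an ACTION_CLICK row with
  //component_tag "home#btn_submit#2" passes; the same row with a null component_tag is
  //dropped; an ACTION_VIEW row always passes; any other action passes only when its
  //class_name is not flagged "0" in dw_dim_log_class_name.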
  //derive and normalize the output columns
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
    //broadcast map: view_path -> menu_code
val path_menu: Map[String, String] = menuCodeBroad.value
    //broadcast map: action_type -> action_category
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
      //1. network type: one of 2G,3G,4G,2G/3G/4G,WIFI,WLAN, or the empty string
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
      //2. normalize the action type
var action_type: String = ""
if (row.getAs[String]("action") !=null) {
action_type = actionCategoryMap.getOrElse(row.getAs[String]("action"),"ACTION")
}
      //3. split the component_tag field
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
      //split a qualifying component_tag into menu_code, action_code, position, label_value
if (component_tag.contains("#")) {
        //split on '#'
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
}
}
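      //e.g. component_tag "home#submit#3#confirm" (hypothetical) splits into
      //menu_code="home", action_code="submit", position="3", label_value="confirm"
      //(label_value is capped at 250 characters).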
      //backfill menu_code when the parsed value is '' or "null", or when the action is ACTION_VIEW
if (menu_code.equals("")|| menu_code.equals("null") || action_type.equals("ACTION_VIEW")) {
import scala.util.control.Breaks._
breakable {
          //match against the menu_code dictionary
for (tuple <- path_menu) {
            //the source view_path contains the dictionary view_path
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
              //on a match, overwrite the source menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
              //stop scanning
break()
}
}
          //if menu_code is still empty after matching, fall back to the component_tag value
if (menu_code.equals("")) {
menu_code = component_tag
}
}
}
      //append one output tuple to the list
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
  //split sessions on the time gap and compute session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => {
    //seed the session id from the pseudo_session value
val sessionID: String = tuple._1
val rowList: Iterable[Row] = tuple._2
    //counter: bumped whenever the gap exceeds the threshold
var count: Int = 0
    //holds the output tuples
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
    //sort by created_time: rdd.groupBy does not preserve the window's ordering
    rowList.toList.sortBy(r => StringUtils.getNotNullString(r.getAs[String]("created_time"))).foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
val time_diff: Long = created.toLong - refer_created.toLong
      //adjacent events no more than 30 minutes apart share the same sessionID
if (time_diff > MyConfigSession.SESSION_GAP) {
count = count + 1
}
      //append to the list
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
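  //Worked example (hypothetical): pseudo_session "abc" with gaps of 10 and 45 minutes
  //between consecutive rows yields session_ids abc0, abc0, abc1, since only the
  //45-minute gap exceeds MyConfigSession.SESSION_GAP (assumed to be 30 minutes in millis).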
}
object SessionProcessHeart {
def apply(): SessionProcessHeart = new SessionProcessHeart()
def main(args: Array[String]): Unit = {
if(args.length<1){
System.err.println("Usage: SessionProcessHeart <yyyy-MM-dd>")
System.exit(1)
}
    //1. write a row into the record table before the job runs
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
        |values(1969,'pica_dw.dw_fact_log_session_heart','3',?,'0',?)
""".stripMargin
    //batch number of this sync, formatted like 2019-09-12
val scnDate: String = args(0)
    //job start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
    //parameters for the SQL
val insertArr: Array[String] = Array[String](scnDate, startTime)
    //obtain a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
    //insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessHeart: SessionProcessHeart = SessionProcessHeart()
      //load the source data
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${scnDate}'")
      //1. repartition (forces a shuffle); Spark reads Hive with too few partitions by default
      //2. filter: parse the creation time and keep only the batch day's data
      //3. drop duplicate records
      //4. filter on action_type and class_name
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
var createdTime: String = row.getAs[String]("created")
        //guard against nulls
if (createdTime == null) {
createdTime = "0"
}
        //the batch date must equal the record's creation date, i.e. keep only that day's data
scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
}).distinct()
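      //e.g. for scnDate "2020-06-11" (hypothetical), a record with created="1591840800000"
      //formats to "2020-06-11 ..." and is kept; records from any other day are dropped.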
import sessionProcessHeart.sparkSession.implicits._
      //filter action_type and class_name against the dimension dictionaries
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcessHeart.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = getSessionId(baseDF,sessionProcessHeart)
      //plain persist() would default to MEMORY_AND_DISK; the serialized variant trades CPU for memory
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = matchUserId(sessionIdDF,sessionProcessHeart.sparkSession,scnDate)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
      //group by session_id and user_id, keep the last heartbeat record, and load it
val loadDataSql =
s"""
           |insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}')
           |select a.session_id,cast(a.user_id as int) user_id,a.mobile,a.device_token,a.user_token,
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
           | (select b.user_id, b.session_id, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
sessionProcessHeart.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
      //job succeeded: update the MySQL record table
val updateSQL: String =
s"""
          |update ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, sessionIdDF.count().toInt)
      //execute the update
upreSta.executeUpdate()
      //close the connections
JDBCUtil.close(connSql, upreSta)
sessionProcessHeart.sparkSession.stop()
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
            |update ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
  /**
    * @Description split sessions by the time gap and renumber session_id
    * @param dataFrame the DataFrame to process
    * @param sessionProcessHeart SessionProcessHeart instance carrying the SparkSession environment
    * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
    **/
def getSessionId(dataFrame: DataFrame,sessionProcessHeart: SessionProcessHeart):DataFrame = {
import sessionProcessHeart.sparkSession.implicits._
    //build a window partitioned by pseudo_session and ordered by created_time
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
    //add refer_created: the previous row's created_time, whose difference later determines the session_id
val rcreDF: DataFrame =
dataFrame.withColumn("refer_created", lag(dataFrame("created_time"), 1).over(pSessionWinSpec))
    //COALESCE removes the null refer_created on each partition's first row
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
    //group by pseudo_session to compute session_id
val groupRDD: RDD[(String, Iterable[Row])] =
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
    //compute the per-row time gap, which yields the session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcessHeart.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
}
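  //Worked example (hypothetical): created_time t1 < t2 < t3 in one pseudo_session gives
  //refer_created = t1 (coalesced), t1, t2, so the first row's gap is 0 and can never
  //open a new session by itself.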
  /**
    * @Description match user_id and backfill the user_id column
    * @param dataFrame the filtered data
    * @param sparkSQLSession the SparkSession environment
    * @param created_day the date of the data, formatted "2020-03-01"
    * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
    **/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
    //join dataFrame against pica_ds.pica_doctor on user_id; unmatched user_ids become '0'
    println("matchUserId start -----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
    //everything below backfills the user_id column
    //step 1: user_ids that are empty, '0', or 24 characters long are reset to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
    //from the temporary equipment view, keep only the latest record (row_d = 1)
var equipmentSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'"
if(DateUtils.getYesterdayDate.equals(created_day)){
equipmentSql = MyConfigSession.EQUIPMENT_INFO_SQL
}
println(s"执行equipmentSql==>${equipmentSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
    //1. match the rows from step one by device_token to recover user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
deviceTokenDF.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
    //2. rows still unmatched are then matched by mobile_phone
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
    //3. union the three datasets; this is what gets loaded into the table
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF)
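    //Intended precedence (a sketch; MOBILE_PHONE_SQL is defined elsewhere): rows with a
    //usable user_id pass through unchanged, the rest are recovered first via device_token,
    //then via mobile_phone; anything still unmatched keeps user_id '0'.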
dwFactLogSession
}
}
package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessArgs, SessionProcessHeart, SessionProcessHistoryPathArgs, SessionProcessPath, SessionProcessPathArgs}
import org.apache.hadoop.util.ProgramDriver
/**
......@@ -16,6 +16,8 @@ object Driver {
driver.addClass("SessionProcessArgs",classOf[SessionProcessArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessPathArgs",classOf[SessionProcessPathArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.run(args)
}
}
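// Hypothetical invocations through this ProgramDriver entry point (jar name assumed):
//   spark-submit --class com.utils.Driver pica-log-analysis.jar SessionProcessHeart 2020-06-11
//   spark-submit --class com.utils.Driver pica-log-analysis.jar SessionMenuCalc pica_dw.dw_fact_log_session_menu_calc 2020-06-11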