提交 8a4ea1ce 编写于 作者: wuyunfeng's avatar wuyunfeng

删除无用job

上级 17d9e537
package com.session
import com.utils.UseUtil
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
......@@ -126,30 +131,72 @@ object SessionMenuCalc {
System.err.println("Usage: SessionMenuCalc <dbTable> <createdDay>")
System.exit(1)
}
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.dw_fact_log_session_menu_calc','3',?,'0',?)
""".stripMargin
val dbTable = args.apply(0)
val createdDay = args.apply(1)
println(s"dbTable:${dbTable},createdDay:${createdDay}")
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](createdDay, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionMenuCalc: SessionMenuCalc = SessionMenuCalc()
val resRdd1 = sessionMenuCalc.handleByMcPart1(sessionMenuCalc.sparkSession,createdDay)
val resRdd2 = sessionMenuCalc.handleByMcPart2(sessionMenuCalc.sparkSession,createdDay)
val resRdd = resRdd1.union(resRdd2)
resRdd.take(20)
val resDf = sessionMenuCalc.sparkSession.createDataFrame(resRdd, StructType(
List(StructField("user_id", StringType, false),
StructField("session_id", StringType, false),
StructField("menu_code_term", StringType, false),
StructField("during_by_refer", IntegerType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("begin_time", StringType, false),
StructField("end_time", StringType, false))
))
resDf.printSchema()
resDf.createOrReplaceTempView("session_menu_view_calc")
sessionMenuCalc.sparkSession.sql(s"insert overwrite table ${dbTable} partition(created_day='${createdDay}') " +
s"select cast(user_id as int) user_id,session_id,menu_code_term,during_by_refer,menu_code,action_code,begin_time,end_time from session_menu_view_calc")
sessionMenuCalc.sparkSession.close()
val sessionMenuCalc: SessionMenuCalc = SessionMenuCalc()
val resRdd1 = sessionMenuCalc.handleByMcPart1(sessionMenuCalc.sparkSession,createdDay)
val resRdd2 = sessionMenuCalc.handleByMcPart2(sessionMenuCalc.sparkSession,createdDay)
val resRdd = resRdd1.union(resRdd2)
resRdd.take(20)
val resDf = sessionMenuCalc.sparkSession.createDataFrame(resRdd, StructType(
List(StructField("user_id", StringType, false),
StructField("session_id", StringType, false),
StructField("menu_code_term", StringType, false),
StructField("during_by_refer", IntegerType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("begin_time", StringType, false),
StructField("end_time", StringType, false))
))
resDf.printSchema()
resDf.createOrReplaceTempView("session_menu_view_calc")
sessionMenuCalc.sparkSession.sql(s"insert overwrite table ${dbTable} partition(created_day='${createdDay}') " +
s"select cast(user_id as int) user_id,session_id,menu_code_term,during_by_refer,menu_code,action_code,begin_time,end_time from session_menu_view_calc")
sessionMenuCalc.sparkSession.close()
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =s"update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_menu_calc' and start_time='${startTime}'"
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, resDf.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionMenuCalc.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_menu_calc' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
/**
* @Author zhenxin.ma
* @Date 2020/4/1 15:26
* @Version 1.0
*/
object SessionProcessArgs {
def apply(): SessionProcessArgs = new SessionProcessArgs()
def main(args: Array[String]): Unit = {
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
val scnData: String = args(0)
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnData, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcess: SessionProcessArgs = SessionProcessArgs()
// 获取源数据,注意这里指定了日期参数,
// step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
val sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${args(0)}'").distinct()
//step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//按created由大到小排序
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
val created = row.getAs[String]("created")
if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken
}else {
deviceToken = thisDeviceToken
}
if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
thisDoctorId = doctorId
}else {
doctorId = thisDoctorId
}
if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile
}else {
mobile = thisMobile
}
resList += (Row(row.getAs[String]("pseudo_session"),
doctorId,
mobile,
deviceToken,
row.getAs[String]("user_token_tourist"),
row.getAs[String]("class_name"),
row.getAs[String]("view_path"),
row.getAs[String]("action"),
row.getAs[String]("component_tag"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("network_type"),
row.getAs[String]("created")))
})
resList.iterator
})
import sessionProcess.sparkSession.implicits._
//step3:根据映射表来进行action_type和class_name数据过滤
val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
resDF.show()
//根据映射表来进行action_type和class_name数据过滤
val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
//默认缓存级别是:MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE1_TMP} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcess.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, sessionIdDF.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionProcess.sparkSession.stop()
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcessArgs extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessArgs")
//获取符合要求的actionType广播变量
val actionTypeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//获取clasName广播变量
val classNameBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
val menuCodeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
//获取actionCategory变量
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL,"action_type","action_category")
//定义函数式变量,过滤映射表数据
val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
val rowList: ListBuffer[Row] = new ListBuffer[Row]()
val actionTypeMap: Map[String, String] = actionTypeBroad.value
val classNameMap: Map[String, String] = classNameBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//筛选action的条件
val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
//说明该action类型即为所要的
if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
//将action转化为映射表中对应的标准actionCategory
val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
// val action_type = UseUtil.getActionType(action)
//action为其中的任何一个
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//判断 component_tag 必须要包含 "#"
if (row.getAs[String]("component_tag") != null
&& row.getAs[String]("component_tag").contains("#")) {
rowList += row
}
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//非上述三种action_type,那么需要过滤掉映射表中class_name为"0"对应的那些数据
} else if (row.getAs[String]("class_name") != null
&& !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
rowList += row
}
}
})
rowList.iterator
}
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型
var action_type: String = ""
if (row.getAs[String]("action") !=null) {
action_type = actionCategoryMap.getOrElse(row.getAs[String]("action"),"ACTION")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
&& action_type.equals("ACTION_VIEW")) {
menu_code = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
//经过上述匹配,如果menu_code仍然为空串,那么置为component_tag字段一样
if (menu_code.equals("")) {
menu_code = component_tag
}
}
}
//一行数据添加到List中
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
//按照time_gap 切割session,计算session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => {
//先按照 pseudo_session 的值命名 sessionID
val sessionID: String = tuple._1
val rowList: Iterable[Row] = tuple._2
//定义一个累加量
var count: Int = 0
//存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
val time_diff: Long = created.toLong - refer_created.toLong
//相邻的时间差小于等于30分钟,就是同一个 sessionID
if (time_diff > MyConfigSession.SESSION_GAP) {
count = count + 1
}
//添加到List
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
/**
* @Description 按照TimeGap切割session,重命名session_id
* @param dataFrame 要处理的DataFrame
* @param sessionProcess SessionProcessArgs对象,包含SparkSession 环境
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getSessionId(dataFrame: DataFrame,sessionProcess:SessionProcessArgs):DataFrame = {
import sessionProcess.sparkSession.implicits._
//先按照 pseudo_session 分组,然后按照 created 排序,组件一个窗口
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
//增加一个字段 refer_created ,这个字段的值是上一条记录 created 字段的值,方便后面通过两者差值计算出 session_id
val rcreDF: DataFrame =
dataFrame.withColumn("refer_created", lag(dataFrame("created_time"), 1).over(pSessionWinSpec))
//执行COALESCE,目的是为了去掉 refer_created 为Null的值
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
//按照 pseudo_session聚合,计算 session_id
val groupRDD: RDD[(String, Iterable[Row])] =
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
//计算两者之差,这时候就得到了 session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
}
/**
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
* @param sparkSQLSession SparkSession 环境
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
val equipmentDF: DataFrame = sparkSQLSession.sql(MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'").where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
//4.将上述三者union,最终导入表中的数据
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
}
}
......@@ -84,7 +84,7 @@ class SessionProcessHeart extends java.io.Serializable{
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
......@@ -105,6 +105,7 @@ class SessionProcessHeart extends java.io.Serializable{
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
......@@ -122,11 +123,18 @@ class SessionProcessHeart extends java.io.Serializable{
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
}
......@@ -159,7 +167,7 @@ class SessionProcessHeart extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,
action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
......@@ -181,7 +189,7 @@ class SessionProcessHeart extends java.io.Serializable{
var count: Int = 0
//存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String)]()
String, String, String, String, String, String, String, String, String, String,String, String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
......@@ -204,6 +212,7 @@ class SessionProcessHeart extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
......@@ -230,7 +239,7 @@ object SessionProcessHeart {
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
|values(0,'pica_dw.dw_fact_log_session_heart','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
val scnDate: String = args(0)
......@@ -267,7 +276,7 @@ object SessionProcessHeart {
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "app_version",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
......@@ -302,7 +311,7 @@ object SessionProcessHeart {
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
......@@ -316,10 +325,11 @@ object SessionProcessHeart {
sessionProcessHeart.sparkSession.stop()
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
......@@ -348,7 +358,7 @@ object SessionProcessHeart {
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
......@@ -361,7 +371,7 @@ object SessionProcessHeart {
val sessionIdDF: DataFrame = groupRDD.map(sessionProcessHeart.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
......@@ -385,32 +395,44 @@ object SessionProcessHeart {
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//使用临时表equipment,筛选出为1的那条最新数据
var equipmentSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'"
if(DateUtils.getYesterdayDate.equals(created_day)){
equipmentSql = MyConfigSession.EQUIPMENT_INFO_SQL
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if(!created_day.equals(DateUtils.getYesterdayDate)){//如果不是跑昨天的数据,使用equipment拉链表
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
println(s"执行equipmentSql==>${equipmentSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentSql).where("row_d =1")
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//1.将第一步筛选出来的数据先按照device_token进行匹配,获得user_id
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
deviceTokenDF.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//2.筛选出上一步没有匹配到的user_id,再按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
//3.将上述三者union,最终导入表中的数据
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
//4.将上述三者union,最终导入表中的数据
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF)
dwFactLogSession
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
}
}
package com.session
import com.config.MyConfigSession
import com.utils.UseUtil
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* 暂停使用
* @Author zhenxin.ma
* @Date 2020/3/24 16:47
* @Version 1.0
*/
class SessionProcessHistoryPathArgs {
}
object SessionProcessHistoryPathArgs {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("UserActionAnalyzeArgs")
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sourceSql =
s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,view_path,component_tag,menu_code,action_code,
|position,label_value,app_version,device_type,created_time,date_time from pica_dw.dw_fact_log_session
| where created_day='${args(0)}' and app_version >= '3.1.7' and action_type in ('ACTION_CLICK','ACTION_VIEW')
""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(sourceSql)
sourceDF.printSchema()
//后续的join
val doctorDF: DataFrame = sparkSession.sql(s"select id from pica_ds.pica_doctor where to_date(creat_time) <='${args(0)}'")
//获取menu_code广播变量
val menuCodeBroad: Broadcast[Map[String, String]] =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
//匹配menu_code
import sparkSession.implicits._
val value = sourceDF.rdd.map(row => {
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
var action_type: String = row.getAs[String]("action_type")
var menu_code: String = row.getAs[String]("menu_code")
//补全menu_code
if (menu_code == null || menu_code.equals("") || menu_code.equals("null") || "ACTION_VIEW".equals(action_type)) {
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (row.getAs[String]("view_path").contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
//如果menu_code仍然为空串,那么置为 component_tag(它不为空前提)
if ("".equals(menu_code) && (row.getAs[String]("component_tag") !=null)) {
menu_code = row.getAs[String]("component_tag")
}
}
}
var session_id: String = row.getAs[String]("session_id")
if (session_id == null) {
session_id = ""
}
var user_id: Int = row.getAs[Int]("user_id")
if (user_id == null) {
user_id = 0
}
if (action_type == null) {
action_type = ""
}
var user_token: String = row.getAs[String]("user_token")
if (user_token == null) {
user_token = ""
}
val action_code: String = row.getAs[String]("action_code")
val position: String = row.getAs[String]("position")
val label_value: String = row.getAs[String]("label_value")
var app_version: String = row.getAs[String]("app_version")
if (app_version == null) {
app_version = ""
}
var device_type: String = row.getAs[String]("device_type")
if (device_type == null) {
device_type = ""
}
var created_time: String = row.getAs[String]("created_time")
if (created_time == null) {
created_time = ""
}
var date_time: String = row.getAs[String]("date_time")
if (date_time == null) {
date_time = ""
}
(session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time)
})
value.toDF("session_id","user_id","action_type","user_token","menu_code","action_code",
"position","label_value","app_version","device_type","created_time","date_time").registerTempTable("filterview")
val filterDF =
sparkSession.sql(
"select * from filterview where action_type='ACTION_CLICK' OR (action_type='ACTION_VIEW' and menu_code !='null' and menu_code !='' and menu_code !='0')")
//Join doctor表
filterDF.join(doctorDF, filterDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table")
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val joinDF: DataFrame = sparkSession.sql(reSql)
println("----------------------- join后数据 ---------------------------")
joinDF.printSchema()
println("-------------------------------------refer record----------------------------------------")
try {
println("-----------------------------------1111111111111-----------------------------------------")
//先按照 session_id分区,再按照 created_time排序,进行窗口计算
val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
//增叫refer_字段
val menuDF: DataFrame =
joinDF.withColumn("refer_menu_code", lag(joinDF("menu_code"), 1).over(sessionIDWinSpec))
val acodeDF: DataFrame =
menuDF.withColumn("refer_action_code", lag(menuDF("action_code"), 1).over(sessionIDWinSpec))
val positionDF: DataFrame =
acodeDF.withColumn("refer_position", lag(acodeDF("position"), 1).over(sessionIDWinSpec))
val atypeDF: DataFrame =
positionDF.withColumn("refer_action_type", lag(positionDF("action_type"), 1).over(sessionIDWinSpec))
val recreatDF: DataFrame =
atypeDF.withColumn("refer_created", lag(atypeDF("created_time"), 1).over(sessionIDWinSpec))
val referDF: DataFrame =
recreatDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
//临时视图,为的是去掉第一行中的空值NULL
referDF.createOrReplaceTempView("rankview")
} catch {
case e: Exception => {
println("--------------------------------任务异常1----------------------------------------------------------")
println(e.getStackTrace)
sparkSession.close()
}
}
try {
println("---------------------------------------22222222222222222-----------------------")
val coalNullSql =
"""
|select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
|COALESCE(refer_menu_code,"") as refer_menu_code,
|COALESCE(refer_action_code,"") as refer_action_code,
|COALESCE(refer_position,"") as refer_position,
|COALESCE(refer_action_type,"") as refer_action_type,
|COALESCE(refer_created,created_time) as refer_created,
|step_id,app_version,device_type,created_time,date_time from rankview
""".stripMargin
val coalNullDF: DataFrame = sparkSession.sql(coalNullSql)
// 在此基础上增加字段 refer_time_diff,值为 created_time, refer_created 之差
val referTimeDF: DataFrame =
coalNullDF.withColumn("refer_time_diff", coalNullDF("created_time") - coalNullDF("refer_created"))
referTimeDF.createOrReplaceTempView("referview")
} catch {
case e: Exception => {
println("--------------------------------------------任务异常2----------------------------")
println(e.getStackTrace)
sparkSession.close()
}
}
try{
println("-----------------------------------33333333333333333333-----------------------------------------")
val loadSql =
"""
|select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
|refer_menu_code,refer_action_code,refer_position,refer_action_type,cast(refer_time_diff as int) as refer_time_diff,step_id,
|app_version,device_type,created_time,date_time from referview
""".stripMargin
val resultDF: DataFrame = sparkSession.sql(loadSql)
resultDF.createOrReplaceTempView("result_view")
sparkSession.sql("use pica_dw")
sparkSession.sql(s"insert overwrite table dw_fact_log_session_path partition(created_day='${args(0)}') select * from result_view")
sparkSession.close()
}catch {
case e:Exception =>{
println("-------------------------任务异常3---------------------------------------------------")
println(e.getMessage)
println(e.getStackTrace)
sparkSession.close()
}
}
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* 传递参数,处理指定日期的数据,导入到pica_dw.dw_fact_log_session_path表
* @Author zhenxin.ma
* @Date 2020/3/11 11:12
* @Version 1.0
*/
class SessionProcessPathArgs {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
object SessionProcessPathArgs {
def apply(): SessionProcessPathArgs = new SessionProcessPathArgs()
def main(args: Array[String]): Unit = {
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
val scnData: String = args(0)
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnData, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sparkSession: SparkSession = SessionProcessPath().getSparkSession("SessionProcessPathArgs")
//筛选源数据
val sourceSql =
s"""
|select session_id,user_id_int user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1_TMP}
| where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !=''
| and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !=''))
""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(sourceSql)
//注册日期在流量统计日期之前的用户
// val doctorDF: DataFrame = sparkSession.sql(
// "select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
//
// sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
// .createOrReplaceTempView("tmp_table")
//将id为null的记录设置为0
// val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
// "user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
// val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(sourceDF,sparkSession)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData)
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, resultDF.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
/**
* @Description 获取需要的字段的refer字段
* @param dataFrame 源数据
* @param sparkSession SparkSession 环境
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getReferColumns(dataFrame: DataFrame ,sparkSession: SparkSession):DataFrame = {
//先按照 session_id分区,再按照 created_time排序,进行窗口计算
val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
//增叫refer_字段
val menuDF: DataFrame =
dataFrame.withColumn("refer_menu_code", lag(dataFrame("menu_code"), 1).over(sessionIDWinSpec))
val acodeDF: DataFrame =
menuDF.withColumn("refer_action_code", lag(menuDF("action_code"), 1).over(sessionIDWinSpec))
val positionDF: DataFrame =
acodeDF.withColumn("refer_position", lag(acodeDF("position"), 1).over(sessionIDWinSpec))
val actypeDF: DataFrame =
positionDF.withColumn("refer_action_type", lag(positionDF("action_type"), 1).over(sessionIDWinSpec))
val recreatDF: DataFrame =
actypeDF.withColumn("refer_created", lag(actypeDF("created_time"), 1).over(sessionIDWinSpec))
val rowNumberDF: DataFrame =
recreatDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
//去掉refer字段中的NULL值
val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"log_session_id","session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value","label_class",
"COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position",
"COALESCE(refer_action_type,'') as refer_action_type",
"COALESCE(refer_created,created_time) as refer_created",
"step_id", "app_version", "device_type", "created_time", "date_time")
//在此基础上增加字段 refer_time_diff,值为 created_time, refer_created 之差
val referTimeDiff: DataFrame =
coaleseDF.withColumn("refer_time_diff", coaleseDF("created_time") - coaleseDF("refer_created"))
referTimeDiff
}
/**
* @Description 导入数据到表中
* @param dataFrame 源数据
* @param sparkSession SparkSession 环境
* @param partitionDay 分区日期
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay:String):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val loadDataSql =
s"""
|insert overwrite table ${MyConfigSession.HIVE_TABLE2_TMP} partition(created_day='${partitionDay}')
| select log_session_id,session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
| refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff,
| step_id,app_version,device_type,created_time,date_time,'' module_class1,'' module_class2
| from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
\ No newline at end of file
package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessArgs, SessionProcessHeart, SessionProcessHistoryPathArgs, SessionProcessPath, SessionProcessPathArgs}
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessHeart, SessionProcessPath }
import org.apache.hadoop.util.ProgramDriver
/**
......@@ -11,11 +11,8 @@ import org.apache.hadoop.util.ProgramDriver
object Driver {
def main(args: Array[String]): Unit = {
val driver: ProgramDriver = new ProgramDriver()
// driver.addClass("SessionProcessHistoryPathArgs",classOf[SessionProcessHistoryPathArgs],"传递日期参数--用户Session数据分析PATH")
driver.addClass("SessionProcess",classOf[SessionProcess],"用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessArgs",classOf[SessionProcessArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessPathArgs",classOf[SessionProcessPathArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.run(args)
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册