提交 f6990c5a 编写于 作者: wuyunfeng's avatar wuyunfeng

去掉无用的job

上级 7359ba44
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* 处理结果存入 pica_dw.dw_fact_log_session_menu_calc_new
* @Author yunfeng.wu
* @Date 2020/06/12 10:23
* @Version 1.0
*/
class SessionMenuCalcNew extends Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionMenuCalcNew")
def handleByMcPart1(spark: SparkSession,createdDay:String) = {
var df = spark.sql("select cast(user_id as string),session_id,created_time,date_time, menu_code,refer_menu_code,action_code,nvl(menu_time_diff,0) menu_time_diff " +
s"from pica_dw.dw_fact_log_session_term where created_day='${createdDay}' and menu_code!='200' and action_type in ('ACTION_CLICK','ACTION_VIEW') ")//and user_id='1000000186'
val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
val resRdd = groupRdd.flatMap(g => {
val user_session_id: String = g._1
val user_id = user_session_id.split("_")(0)
val session_id = user_session_id.split("_")(1)
var rowList: Iterable[Row] = g._2
//定义一个累加量
var before_menu = ""
var this_menu = ""
var series = 1
val list = new ListBuffer[Row]() //[(String,String,String,Integer,String,String,String)]
var mc_during_map = Map[String, Integer]()
var mc_time_map = Map[String, ArrayBuffer[String]]()
rowList = rowList.toList.sortBy(_.getAs[String]("created_time"))
rowList.foreach(row => {
this_menu = row.getAs[String]("menu_code")
val menu_time_diff = row.getAs[Integer]("menu_time_diff")
val created_time = row.getAs[String]("created_time")
var key = this_menu + "_" + series
if ("".equals(before_menu) || this_menu.equals(before_menu)) {
var sum_during: Integer = mc_during_map.getOrElse(key, 0)
mc_during_map.+=(key -> (menu_time_diff + sum_during))
before_menu = this_menu
} else {
series += 1
key = this_menu + "_" + series
var sum_during: Integer = mc_during_map.getOrElse(key, 0)
mc_during_map.+=(key -> (menu_time_diff + sum_during))
before_menu = this_menu
}
var time_arr: ArrayBuffer[String] = mc_time_map.getOrElse(key, new ArrayBuffer[String]())
time_arr += (created_time)
mc_time_map.+=(key -> time_arr)
})
mc_during_map.foreach(kv => {
val ar: ArrayBuffer[String] = mc_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
// println((user_id, session_id, kv._1, kv._2, kv._1.split("_")(0), ar.toArray.min, ar.toArray.max))
list.append(Row(user_id, session_id, kv._1, kv._2, kv._1.split("_")(0),"", ar.toArray.min, ar.toArray.max))
})
list.toList
})
resRdd
}
def handleByMcPart2(spark: SparkSession, createdDay: String) = {
var df = spark.sql("select cast(user_id as string),session_id,created_time,date_time, menu_code,refer_menu_code,action_code,nvl(menu_time_diff,0) menu_time_diff " +
s"from pica_dw.dw_fact_log_session_term where created_day='${createdDay}' and menu_code ='200' and action_type in ('ACTION_CLICK','ACTION_VIEW') ")
val groupRdd = df.rdd.groupBy(row => row.getAs[String]("user_id") + "_" + row.getAs[String]("session_id"))
val resRdd = groupRdd.flatMap(g => {
val user_session_id: String = g._1
val user_id = user_session_id.split("_")(0)
val session_id = user_session_id.split("_")(1)
var rowList: Iterable[Row] = g._2
//定义一个累加量
var before_action = ""
var this_action = ""
var series = 1
val list = new ListBuffer[Row]() //[(String,String,String,Integer,String,String,String)]
var ac_during_map = Map[String, Integer]()
var ac_time_map = Map[String, ArrayBuffer[String]]()
rowList = rowList.toList.sortBy(_.getAs[String]("created_time"))
rowList.foreach(row => {
this_action = row.getAs[String]("action_code")
val menu_time_diff = row.getAs[Integer]("menu_time_diff")
val created_time = row.getAs[String]("created_time")
var key = this_action + "_" + series
if ("".equals(before_action) || this_action.equals(before_action)) {
var sum_during: Integer = ac_during_map.getOrElse(key, 0)
ac_during_map.+=(key -> (menu_time_diff + sum_during))
before_action = this_action
} else {
series += 1
key = this_action + "_" + series
var sum_during: Integer = ac_during_map.getOrElse(key, 0)
ac_during_map.+=(key -> (menu_time_diff + sum_during))
before_action = this_action
}
var time_arr: ArrayBuffer[String] = ac_time_map.getOrElse(key, new ArrayBuffer[String]())
time_arr += (created_time)
ac_time_map.+=(key -> time_arr)
})
ac_during_map.foreach(kv => {
val ar: ArrayBuffer[String] = ac_time_map.getOrElse(kv._1, new ArrayBuffer[String]())
list.append(Row(user_id, session_id, "200_0", kv._2,"200", kv._1.split("_")(0), ar.toArray.min, ar.toArray.max))
})
list.toList
})
resRdd
}
}
object SessionMenuCalcNew {
def apply(): SessionMenuCalcNew = new SessionMenuCalcNew()
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: SessionMenuCalcNew <dbTable> <createdDay>")
System.exit(1)
}
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.dw_fact_log_session_menu_calc_new','3',?,'0',?)
""".stripMargin
val dbTable = args.apply(0)
val createdDay = args.apply(1)
println(s"dbTable:${dbTable},createdDay:${createdDay}")
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](createdDay, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionMenuCalcNew: SessionMenuCalcNew = SessionMenuCalcNew()
val resRdd1 = sessionMenuCalcNew.handleByMcPart1(sessionMenuCalcNew.sparkSession,createdDay)
val resRdd2 = sessionMenuCalcNew.handleByMcPart2(sessionMenuCalcNew.sparkSession,createdDay)
val resRdd = resRdd1.union(resRdd2)
resRdd.take(20)
val resDf = sessionMenuCalcNew.sparkSession.createDataFrame(resRdd, StructType(
List(StructField("user_id", StringType, false),
StructField("session_id", StringType, false),
StructField("menu_code_term", StringType, false),
StructField("during_by_refer", IntegerType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("begin_time", StringType, false),
StructField("end_time", StringType, false))
))
resDf.printSchema()
resDf.createOrReplaceTempView("session_menu_view_calc")
sessionMenuCalcNew.sparkSession.sql(s"insert overwrite table ${dbTable} partition(created_day='${createdDay}') " +
s"select cast(user_id as int) user_id,session_id,menu_code_term,during_by_refer,menu_code,action_code,begin_time,end_time from session_menu_view_calc distribute by rand()")
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =s"update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_menu_calc_new' and start_time='${startTime}'"
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, resDf.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionMenuCalcNew.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_menu_calc_new' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
/**
*
* 处理埋点流量表,导入到DW层pica_dw.dw_fact_log_session_heart
*
* @Author yunfeng.wu
* @Date 2020/06/10 10:23
* @Version 1.0
*/
object SessionProcessHeart {
def apply(): SessionProcessHeart = new SessionProcessHeart()
def main(args: Array[String]): Unit = {
if(args.length<1){
System.err.println("Usage: SessionProcessHeart <yyyy-MM-dd>")
System.exit(1)
}
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'${MyConfigSession.HIVE_TABLE3}','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
val scnDate: String = args(0)
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnDate, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessHeart: SessionProcessHeart = SessionProcessHeart()
//获取源数据
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_FROM_PREF + s" and created_day='${scnDate}'")
//1.重新分区,产生shuffle,Spark读Hive默认的分区数太少
//2.过滤数据,解析创建时间,只获取昨天产生的数据
//3.过滤重复的记录
//4.利用action_type和class_name过滤
// val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
// var createdTime: String = row.getAs[String]("created")
// //防止出错
// if (createdTime == null) {
// createdTime = "0"
// }
// //注意这里的过滤条件,数据的批次时间要和数据产生的年月日一样,也就是当天的数据
// scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
// }).distinct()
import sessionProcessHeart.sparkSession.implicits._
//根据映射表来进行action_type和class_name数据过滤
val data: RDD[Row] = sourceDF.rdd.mapPartitions(sessionProcessHeart.filterRows)
sourceDF.printSchema()
println("---------------------------------------process columns-------------------------------------------")
// val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
val baseDF: DataFrame = sessionProcessHeart.sparkSession.createDataFrame(data,StructType(
List(StructField("pseudo_session", StringType, false),
StructField("user_id", StringType, false),
StructField("mobile", StringType, false),
StructField("device_token", StringType, false),
StructField("user_token", StringType, false),
StructField("view_class", StringType, false),
StructField("view_path", StringType, false),
StructField("action", StringType, false),
StructField("action_type", StringType, false),
StructField("component_tag", StringType, false),
StructField("menu_code", StringType, false),
StructField("action_code", StringType, false),
StructField("position", StringType, false),
StructField("label_value", StringType, false),
StructField("label_class", StringType, false),
StructField("app_version", StringType, false),
StructField("device_type", StringType, false),
StructField("device_brand", StringType, false),
StructField("device_model", StringType, false),
StructField("device_system", StringType, false),
StructField("net_type", StringType, false),
StructField("created_time", StringType, false),
StructField("date_time", StringType, false))
))
// val baseDF: DataFrame = data.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
// "action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
// "device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
baseDF.printSchema()
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcessHeart.getSessionId(baseDF )
//默认缓存级别是:MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
// println("-------------------------------match user_id 逻辑-------------------------------------------------")
// val dwFactLogSession: DataFrame = sessionProcessHeart.matchUserId(sessionIdDF, scnDate)
sessionIdDF.printSchema()
println("-----------------create view fact_log_session and load to dw_fact_log_session_heart--------------------")
sessionIdDF.createOrReplaceTempView("fact_log_session")
val resSql =
s"""
select a.session_id,cast(a.user_id as int) user_id,a.mobile,a.device_token,a.user_token,
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
| (select b.user_id, b.session_id , min(b.created_time) min_ct, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
val resDF = sessionProcessHeart.sparkSession.sql(resSql)
resDF.createOrReplaceTempView("session_heart")
//根据session_id以及user_id分组取最后一次心跳记录数据进行入库
val loadDataSql =s"insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}') select * from session_heart "
sessionProcessHeart.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='${MyConfigSession.HIVE_TABLE3}' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, resDF.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionProcessHeart.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='${MyConfigSession.HIVE_TABLE3}' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcessHeart extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessHeart")
//获取符合要求的actionType广播变量
val actionTypeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL_HEART, "action_type", "is_valid")
//获取clasName广播变量
val classNameBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
val menuCodeBroad =
UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
//获取actionCategory变量
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL_HEART,"action_type","action_category")
val INIT_USER_ID_SQL =
s"""
|SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class ,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class, ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system, ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
val DEVICE_TOKEN_SQL: String =
s"""
|SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class, t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
//定义函数式变量,过滤映射表数据
val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
val rowList: ListBuffer[Row] = new ListBuffer[Row]()
val actionTypeMap: Map[String, String] = actionTypeBroad.value
val classNameMap: Map[String, String] = classNameBroad.value
//关联到action_category的映射表广播变量
// val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//筛选action的条件
val action: String = StringUtils.getNotNullString(row.getAs[String]("action"))
//说明该action类型即为所要的
if (actionTypeMap.getOrElse(action, "-1").equals("1")) {
val action_type: String = StringUtils.getNotNullString(row.getAs[String]("action_type"))
//将action转化为映射表中对应的标准actionCategory
// val action_type: String = actionCategoryMap.getOrElse(action,"ACTION")
//action为其中的任何一个
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//判断 component_tag 必须要包含 "#"
if (row.getAs[String]("component_tag") != null
&& row.getAs[String]("component_tag").contains("#")) {
rowList += row
}
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//非上述三种action_type,那么需要过滤掉映射表中class_name为"0"对应的那些数据
} else if (row.getAs[String]("view_class") != null
&& !classNameMap.getOrElse(row.getAs[String]("view_class"), "-1").equals("0")) {
rowList += row
}
}
})
rowList.iterator
}
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
//关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型
var action_type: String = ""
if (row.getAs[String]("action") !=null) {
action_type = actionCategoryMap.getOrElse(row.getAs[String]("action"),"ACTION")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
}
//匹配menu_code:如果上述截取出来的menu_code为'',null或者action is ACTION_VIEW
if (menu_code.equals("")|| menu_code.equals("null") || action_type.equals("ACTION_VIEW")) {
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
//经过上述匹配,如果menu_code仍然为空串,那么置为component_tag字段一样
if (menu_code.equals("")) {
menu_code = component_tag
}
}
}
//一行数据添加到List中
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
//按照time_gap 切割session,计算session_id
val computeSessionId = (tuple: (String, Iterable[Row])) => {
//先按照 pseudo_session 的值命名 sessionID
val sessionID: String = tuple._1
val rowList: Iterable[Row] = tuple._2
//定义一个累加量
var count: Int = 0
//存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String,String, String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
val time_diff: Long = created.toLong - refer_created.toLong
//相邻的时间差小于等于30分钟,就是同一个 sessionID
if (time_diff > MyConfigSession.SESSION_GAP) {
count = count + 1
}
//添加到List
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
/**
* @Description 按照TimeGap切割session,重命名session_id
* @param dataFrame 要处理的DataFrame
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getSessionId(dataFrame: DataFrame ):DataFrame = {
import sparkSession.implicits._
//先按照 pseudo_session 分组,然后按照 created 排序,组件一个窗口
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
//增加一个字段 refer_created ,这个字段的值是上一条记录 created 字段的值,方便后面通过两者差值计算出 session_id
val rcreDF: DataFrame =
dataFrame.withColumn("refer_created", lag(dataFrame("created_time"), 1).over(pSessionWinSpec))
//执行COALESCE,目的是为了去掉 refer_created 为Null的值
val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created")
//按照 pseudo_session聚合,计算 session_id
val groupRDD: RDD[(String, Iterable[Row])] =
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
//计算两者之差,这时候就得到了 session_id
val sessionIdDF: DataFrame = groupRDD.map( computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
sessionIdDF
}
/**
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame, created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSession.sql( INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据,将这些user_id置为字符串'0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSession.sql( MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if(!created_day.equals(DateUtils.getYesterdayDate)){//如果不是跑昨天的数据,使用equipment拉链表
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSession.sql( DEVICE_TOKEN_SQL)
//4.将上述三者union,最终导入表中的数据
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSession.sql(USER_ID_INT_SQL)
userIdDF
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* 处理埋点数据,进行简单的清晰过滤,导入到DW层pica_dw.dw_fact_log_session_pref,已废弃
* @Author yunfeng.wu
* @Date 2020/08/07 09:23
* @Version 1.0
*/
object SessionProcessPref {
def apply(): SessionProcessPref = new SessionProcessPref()
def main(args: Array[String]): Unit = {
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'${MyConfigSession.HIVE_TABLE0}','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate
if (args.length >= 1) {
scnData = args(0)
}
println(s"scnData=${scnData}")
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnData, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessPref: SessionProcessPref = SessionProcessPref()
//step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF + s" and created_day='${scnData}'").repartition(120).distinct()
var dataCount = 0
var index = 0
val conditionGroup = List("<='4' ","between '5' and '9'",">'9'")
for(condition <- conditionGroup){
index += 1
val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}").repartition(100)
println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
//step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
val baseRdd:RDD[SessionPref] = sessionProcessPref.offsetValues(slideDF)
println("---------------------------------------process columns-------------------------------------------")
import sessionProcessPref.sparkSession.implicits._
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
"menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version", "serviceName", "tag8", "tag9", "tag10")
println("baseDF.show=======>")
baseDF.printSchema()
baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
sessionProcessPref.loadData(baseDF,scnData,index,dataCount)
dataCount += baseDF.count().toInt
}
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='${MyConfigSession.HIVE_TABLE0}' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, dataCount.toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionProcessPref.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='${MyConfigSession.HIVE_TABLE0}' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcessPref extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessPref")
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
def offsetValues(dataFrame: DataFrame): RDD[SessionPref] = {
val groupRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val baseRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[SessionPref] = new ListBuffer[SessionPref]()
var rowList = g._2
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created") > y.getAs[String]("created")) //按created由大到小排序
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
val path_menu: Map[String, String] = menuCodeBroad.value //关联到menu_code的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value //关联到action_category的映射表广播变量
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
if (deviceToken != null && !deviceToken.equals("")) {
thisDeviceToken = deviceToken
} else {
deviceToken = thisDeviceToken
}
if (doctorId != null && !doctorId.equals("") && !doctorId.equals("0")) {
thisDoctorId = doctorId
} else {
doctorId = thisDoctorId
}
if (mobile != null && !mobile.equals("")) {
thisMobile = mobile
} else {
mobile = thisMobile
}
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型,保留原始字段
val action = row.getAs[String]("action")
var action_type: String = ""
if (row.getAs[String]("action") != null) {
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.put(filedName, value)
}
index += 1
}
}
var menu_code_new = tagMap("menu_code")
if ("MenuCode_081".equals(menu_code_new)) {
menu_code_new = "081" //针对异常menu_code值单独处理
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3)
&& action_type.equals("ACTION_VIEW")) {
menu_code_new = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code_new = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
}
resList += SessionPref(pseudo_session,
doctorId,
mobile,
deviceToken,
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class_1"), tagMap("module_class_2"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
tagMap("tag8"), tagMap("tag9"), tagMap("tag10")
)
})
resList.iterator
})
baseRdd
}
def loadData(dataFrame: DataFrame, partitionDay: String,index:Integer,count:Integer): Unit = {
val tmpTable = "result_view"
var insertSql = "insert overwrite"
if(index!=1){
insertSql = "insert into"
}
println(s"-----------------create view ${tmpTable} and load to ${MyConfigSession.HIVE_TABLE0} --------------------")
dataFrame.repartition(10).createOrReplaceTempView(tmpTable)
val fields = List("pseudo_session", "user_id", "COALESCE(cast(user_id as int),0) user_id_int", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type",
"component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class_1", "module_class_2", "app_version", "device_type", "device_brand",
"device_model", "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version",
"servicename", "tag8", "tag9", "tag10")
val loadDataSql = s"${insertSql} table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${partitionDay}') select concat(regexp_replace( '${partitionDay}','-','') ,cast( (row_number() over(partition by 1 order by created_time) +${count}) as string)) as id," +
s"${fields.mkString(",")} from ${tmpTable} distribute by rand()"
sparkSession.sql(loadDataSql)
}
}
case class SessionPref(pseudo_session: String,
user_id: String,
mobile: String,
device_token: String,
user_token: String,
view_class: String,
view_path: String,
action: String,
action_type: String,
component_tag: String,
menu_code: String,
menu_code_new: String,
action_code: String,
position: String,
label_value: String,
label_class: String,
module_class_1: String,
module_class_2: String,
app_version: String,
device_type: String,
device_brand: String,
device_model: String,
net_type: String,
created_time: String,
date_time: String,
web_data: String,
web_data_type: String,
alternate_info: String,
login_state: String,
first_app_version: String,
serviceName: String,
tag8: String, tag9: String, tag10: String)
package com.utils package com.utils
import com.session.{SessionMenuCalc, SessionMenuCalcNew, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPathNew, SessionProcessPref, SessionProcessTerm} import com.session.{SessionMenuCalc, SessionProcess, SessionProcessPath, SessionProcessPathNew, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver import org.apache.hadoop.util.ProgramDriver
/** /**
...@@ -13,10 +13,8 @@ object Driver { ...@@ -13,10 +13,8 @@ object Driver {
val driver: ProgramDriver = new ProgramDriver() val driver: ProgramDriver = new ProgramDriver()
driver.addClass("SessionProcess",classOf[SessionProcess],"用户Session数据分析导入到dw_fact_log_session表") driver.addClass("SessionProcess",classOf[SessionProcess],"用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表") driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表") driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表") driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表")
driver.addClass("SessionMenuCalcNew",classOf[SessionMenuCalcNew],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc_new表")
driver.addClass("SessionProcessPathNew",classOf[SessionProcessPathNew],"用户Session数据分析导入到dw_fact_log_session_path_new表") driver.addClass("SessionProcessPathNew",classOf[SessionProcessPathNew],"用户Session数据分析导入到dw_fact_log_session_path_new表")
driver.run(args) driver.run(args)
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册