Commit b98a80ff authored by wuyunfeng

Modify SessionProcessHeart

Parent 2df226da
@@ -23,6 +23,123 @@ import scala.collection.mutable.ListBuffer
* @Date 2020/06/10 10:23
* @Version 1.0
*/
object SessionProcessHeart {
def apply(): SessionProcessHeart = new SessionProcessHeart()
def main(args: Array[String]): Unit = {
if(args.length<1){
System.err.println("Usage: SessionProcessHeart <yyyy-MM-dd>")
System.exit(1)
}
//1. Before running the job, insert a row into the record table
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.dw_fact_log_session_heart','3',?,'0',?)
""".stripMargin
//Batch number of the synced data, formatted like 2019-09-12
val scnDate: String = args(0)
//Job start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnDate, startTime)
//Obtain a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessHeart: SessionProcessHeart = SessionProcessHeart()
//Load the source data
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${scnDate}'")
//1. Repartition (forces a shuffle); the default partition count when Spark reads Hive is too small
//2. Filter the data: parse the creation time and keep only yesterday's records
//3. Drop duplicate records
//4. Filter by action_type and class_name
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
var createdTime: String = row.getAs[String]("created")
//Guard against a null created value
if (createdTime == null) {
createdTime = "0"
}
//Note the filter condition: the batch date must equal the date the record was produced, i.e. keep only that day's data
scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
}).distinct()
import sessionProcessHeart.sparkSession.implicits._
//Filter action_type and class_name against the mapping tables
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcessHeart.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcessHeart.getSessionId(baseDF)
//The default storage level is MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id logic-----------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcessHeart.matchUserId(sessionIdDF, scnDate)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
//Group by session_id and user_id and keep the last heartbeat record per group before loading
val loadDataSql =
s"""
insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}')
select a.session_id,cast(a.user_id as int) user_id,a.mobile,a.device_token,a.user_token,
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
| (select b.user_id, b.session_id , min(b.created_time) min_ct, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
sessionProcessHeart.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
//The job succeeded; update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, dwFactLogSession.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sessionProcessHeart.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
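The main method above brackets the Spark job with MySQL bookkeeping: it inserts a "running" row before the job starts, then updates that row to success (status '1', plus the row count) or failure (status '2', plus the exception message). The following standalone sketch shows the same insert-then-update pattern with plain java.sql; the JDBC URL, credentials, and the job_record table and columns are placeholders, not the project's real JDBCUtil/MyConfigSession values.

import java.sql.{Connection, DriverManager, PreparedStatement}

object JobRecordSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection; the real job goes through JDBCUtil.getConnection()
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
    val jobName = "pica_dw.dw_fact_log_session_heart"
    val startTime = "2020-06-10 10:00:00"
    // 1. Record that the job has started (status '0')
    val insert: PreparedStatement =
      conn.prepareStatement("insert into job_record(job_name, job_scn, status, start_time) values(?, ?, '0', ?)")
    insert.setString(1, jobName); insert.setString(2, "2020-06-10"); insert.setString(3, startTime)
    insert.executeUpdate()
    try {
      // ... run the Spark job here ...
      // 2a. Mark the run as successful
      val ok = conn.prepareStatement("update job_record set status='1', end_time=now() where job_name=? and start_time=?")
      ok.setString(1, jobName); ok.setString(2, startTime)
      ok.executeUpdate()
    } catch {
      case e: Exception =>
        // 2b. Mark the run as failed and keep the exception message
        val fail = conn.prepareStatement("update job_record set status='2', exception=? where job_name=? and start_time=?")
        fail.setString(1, e.getMessage); fail.setString(2, jobName); fail.setString(3, startTime)
        fail.executeUpdate()
    } finally {
      conn.close()
    }
  }
}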
class SessionProcessHeart extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
@@ -46,7 +163,33 @@ class SessionProcessHeart extends java.io.Serializable{
val actionCategory =
UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL_HEART,"action_type","action_category")
val INIT_USER_ID_SQL =
s"""
|SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class ,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class, ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system, ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
val DEVICE_TOKEN_SQL: String =
s"""
|SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class, t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
//Function value that filters rows against the mapping tables
val filterRows: Iterator[Row] => Iterator[Row] = (rows: Iterator[Row]) => {
val rowList: ListBuffer[Row] = new ListBuffer[Row]()
@@ -66,14 +209,14 @@ class SessionProcessHeart extends java.io.Serializable{
if (action_type.equals("ACTION_CLICK") || action_type.equals("ACTION_EXPOSE")) {
//component_tag must contain "#"
if (row.getAs[String]("component_tag") != null
&& row.getAs[String]("component_tag").contains("#")) {
rowList += row
}
} else if (action_type.equals("ACTION_VIEW")) {
rowList += row
//For action_type values other than the three above, drop the rows whose class_name maps to "0" in the mapping table
} else if (row.getAs[String]("class_name") != null
&& !classNameMap.getOrElse(row.getAs[String]("class_name"), "-1").equals("0")) {
rowList += row
}
}
@@ -84,7 +227,7 @@ class SessionProcessHeart extends java.io.Serializable{
//Process the fields and extract the required column values
val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String,String, String)]()
//Broadcast variable holding the menu_code mapping table
val path_menu: Map[String, String] = menuCodeBroad.value
//Broadcast variable holding the action_category mapping table
@@ -161,20 +304,20 @@ class SessionProcessHeart extends java.io.Serializable{
}
//Append one row to the list
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created")))))
})
baseList.iterator
}
@@ -189,7 +332,7 @@ class SessionProcessHeart extends java.io.Serializable{
var count: Int = 0
//Buffer for the output rows
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String,String, String, String)]()
rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
@@ -200,153 +343,36 @@ class SessionProcessHeart extends java.io.Serializable{
}
//Append to the list
list += ((sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("view_class")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("action_type")),
StringUtils.getNotNullString(row.getAs[String]("component_tag")),
StringUtils.getNotNullString(row.getAs[String]("menu_code")),
StringUtils.getNotNullString(row.getAs[String]("action_code")),
StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time"))))
})
list
}
}
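filterRows and processColumns above run per partition and look values up in broadcast maps (the class_name and action_type mapping tables) instead of joining against them. A minimal, self-contained sketch of that broadcast-map filter pattern follows; the column values and the classNameMap contents here are invented for illustration, whereas the real maps come from UseUtil.getBroadcast.

import org.apache.spark.sql.SparkSession

object FilterRowsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("filter-rows-sketch").master("local[*]").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("ACTION_CLICK", "HomePage", "MN001#AC002#1#label"),
      ("ACTION_VIEW",  "HomePage", ""),
      ("ACTION_HEART", "Ignored",  "")
    ).toDF("action_type", "class_name", "component_tag")
    // Ship the small lookup map to every executor once instead of joining
    val classNameMap = spark.sparkContext.broadcast(Map("HomePage" -> "1", "Ignored" -> "0"))
    val kept = df.rdd.mapPartitions { rows =>
      val lookup = classNameMap.value
      rows.filter { row =>
        val action = row.getAs[String]("action_type")
        val tag = row.getAs[String]("component_tag")
        if (action == "ACTION_CLICK" || action == "ACTION_EXPOSE") tag != null && tag.contains("#")
        else if (action == "ACTION_VIEW") true
        else lookup.getOrElse(row.getAs[String]("class_name"), "-1") != "0"
      }
    }
    kept.collect().foreach(println)
    spark.stop()
  }
}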
object SessionProcessHeart {
def apply(): SessionProcessHeart = new SessionProcessHeart()
def main(args: Array[String]): Unit = {
if(args.length<1){
System.err.println("Usage: SessionProcessHeart <yyyy-MM-dd>")
System.exit(1)
}
//1. Before running the job, insert a row into the record table
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'pica_dw.dw_fact_log_session_heart','3',?,'0',?)
""".stripMargin
//Batch number of the synced data, formatted like 2019-09-12
val scnDate: String = args(0)
//Job start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnDate, startTime)
//Obtain a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessHeart: SessionProcessHeart = SessionProcessHeart()
//Load the source data
val sourceDF: DataFrame = sessionProcessHeart.sparkSession.sql(MyConfigSession.SOURCE_SQL_ARGS + s" and created_day='${scnDate}'")
//1. Repartition (forces a shuffle); the default partition count when Spark reads Hive is too small
//2. Filter the data: parse the creation time and keep only yesterday's records
//3. Drop duplicate records
//4. Filter by action_type and class_name
val filterDS: Dataset[Row] = sourceDF.repartition(200).filter(row => {
var createdTime: String = row.getAs[String]("created")
//Guard against a null created value
if (createdTime == null) {
createdTime = "0"
}
//Note the filter condition: the batch date must equal the date the record was produced, i.e. keep only that day's data
scnDate.equals(DateUtils.milliSecondsFormatTime(createdTime).substring(0, 10))
}).distinct()
import sessionProcessHeart.sparkSession.implicits._
//Filter action_type and class_name against the mapping tables
val data: RDD[Row] = filterDS.rdd.mapPartitions(sessionProcessHeart.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcessHeart.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value", "label_class", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = getSessionId(baseDF,sessionProcessHeart)
//The default storage level is MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id logic-----------------------------------------------")
val dwFactLogSession: DataFrame = matchUserId(sessionIdDF,sessionProcessHeart.sparkSession,scnDate)
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
//Group by session_id and user_id and keep the last heartbeat record per group before loading
val loadDataSql =
s"""
insert overwrite table ${MyConfigSession.HIVE_TABLE3} partition(created_day='${scnDate}')
select a.session_id,cast(a.user_id as int) user_id,a.mobile,a.device_token,a.user_token,
|a.app_version,a.device_type,a.device_brand,a.device_model,a.date_time
|from fact_log_session a,
| (select b.user_id, b.session_id , min(b.created_time) min_ct, max(b.created_time) max_ct
| from fact_log_session b
| group by b.user_id, b.session_id ) c
|where a.user_id = c.user_id and a.session_id = c.session_id and a.created_time = c.max_ct
|distribute by rand()
|""".stripMargin
sessionProcessHeart.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
//The job succeeded; update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, dwFactLogSession.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sessionProcessHeart.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常:e=${e}---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='pica_dw.dw_fact_log_session_heart' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
/**
* @Description Split sessions by time gap and assign new session_id values
* @param dataFrame the DataFrame to process
* @param sessionProcessHeart SessionProcess instance that carries the SparkSession environment
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getSessionId(dataFrame: DataFrame,sessionProcessHeart: SessionProcessHeart):DataFrame = {
def getSessionId(dataFrame: DataFrame):DataFrame = {
import sessionProcessHeart.sparkSession.implicits._
import sparkSession.implicits._
//Group by pseudo_session first, then order by created, to build a window
val pSessionWinSpec: WindowSpec = Window.partitionBy("pseudo_session").orderBy("created_time")
@@ -368,7 +394,7 @@ object SessionProcessHeart {
coalesceDF.rdd.groupBy(row => row.getAs[String]("pseudo_session"))
//Compute the difference between the two timestamps, which yields the session_id
val sessionIdDF: DataFrame = groupRDD.map(sessionProcessHeart.computeSessionId).flatMap(it => it)
val sessionIdDF: DataFrame = groupRDD.map(computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class",
@@ -381,15 +407,14 @@ object SessionProcessHeart {
/**
* @Description Match user_id and back-fill the user_id column in the data
* @param dataFrame the filtered data
* @param sparkSQLSession the SparkSession environment
* @param created_day date of the current data, formatted like "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
def matchUserId(dataFrame: DataFrame, created_day:String):DataFrame={
//Addition: join dataFrame with pica_ds.pica_doctor on user_id; user_id values that do not match are set to '0'
println("matchUserId started-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
val DF = sparkSession.sql(INIT_USER_ID_SQL)
//All of the logic below back-fills the user_id column
//Step 1: pick out the invalid user_id rows and set their user_id to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
@@ -401,7 +426,7 @@ object SessionProcessHeart {
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1. Take the user_id values not matched in the previous step and match them by mobile_phone first
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL)
val mobilePhoneDF: DataFrame = sparkSession.sql(MOBILE_PHONE_SQL)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2. Use the equipment temp table and keep only the latest row (row_d = 1)
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
@@ -409,11 +434,11 @@ object SessionProcessHeart {
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
val equipmentDF: DataFrame = sparkSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//3. Match the rows from step 2 by device_token to obtain user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL)
val deviceTokenDF: DataFrame = sparkSession.sql(DEVICE_TOKEN_SQL)
//4. Union the three result sets above; this is the data finally loaded into the table
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
@@ -431,8 +456,7 @@ object SessionProcessHeart {
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
val userIdDF: DataFrame = sparkSession.sql(USER_ID_INT_SQL)
userIdDF
}
}
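getSessionId above splits each pseudo_session into sessions by time gap: it builds a window per pseudo_session ordered by created_time and computeSessionId walks the gaps between consecutive events. An equivalent pure-DataFrame sketch using window functions is shown below; the 30-minute threshold, the millisecond timestamps, and the toy data are assumptions for illustration only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object SessionGapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("session-gap-sketch").master("local[*]").getOrCreate()
    import spark.implicits._
    val events = Seq(
      ("p1", 1000L), ("p1", 1200L), ("p1", 4000000L), ("p2", 500L)
    ).toDF("pseudo_session", "created_time")
    val w = Window.partitionBy("pseudo_session").orderBy("created_time")
    // Gap to the previous event inside the same pseudo_session; a large gap starts a new session
    val withGap = events
      .withColumn("prev", lag("created_time", 1).over(w))
      .withColumn("new_session",
        when(col("prev").isNull || col("created_time") - col("prev") > 30L * 60 * 1000, 1).otherwise(0))
    // The running sum of the new_session flags numbers the sessions within each pseudo_session
    val withId = withGap
      .withColumn("session_idx", sum("new_session").over(w))
      .withColumn("session_id", concat_ws("_", col("pseudo_session"), col("session_idx").cast("string")))
    withId.show(false)
    spark.stop()
  }
}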
@@ -218,41 +218,22 @@ class SessionProcessPref extends java.io.Serializable{
// }
//3. Split the component_tag field
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
val tagArr = Array("menu_code","action_code","position","label_value","label_class")
val tagMap = scala.collection.immutable.Map[String,String]()
tagArr.foreach(r=>tagMap.+(r->""))
//Split qualifying component_tag values to get action_code and label_value
if (component_tag.contains("#")) {
//Split on "#"
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.+(filedName -> value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.+(filedName -> value)
}
index += 1
}
}
// //Match menu_code: if the menu_code extracted above is ('' || null || 0 || length(menu_code) > 3) and action is ACTION_VIEW
@@ -282,7 +263,8 @@ class SessionProcessPref extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, component_tag, menu_code, action_code, position, label_value,label_class,
action, component_tag, tagMap("menu_code"), tagMap("action_code"),
tagMap("position"),tagMap("label_value"),tagMap("label_class"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
@@ -317,11 +299,11 @@ class SessionProcessPref extends java.io.Serializable{
//All of the logic below back-fills the user_id column
//Step 1: pick out the invalid user_id rows and set their user_id to the string '0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("pseudo_session","'0' as user_id","mobile","device_token","user_token",
"view_class","view_path","action" ,"component_tag","menu_code",
"action_code","position","label_value","label_class","app_version","device_type",
"device_brand","device_model","net_type","created_time",
"date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
// .selectExpr("pseudo_session","'0' as user_id","mobile","device_token","user_token",
// "view_class","view_path","action" ,"component_tag","menu_code",
// "action_code","position","label_value","label_class","app_version","device_type",
// "device_brand","device_model","net_type","created_time",
// "date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1. Take the user_id values not matched in the previous step and match them by mobile_phone first
......
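The SessionProcessPref change above replaces the strs.length match with a loop that fills a field map from the "#"-separated component_tag. A standalone sketch of that split follows; building the result with zipWithIndex (rather than calling + on an immutable Map, which returns a new map instead of updating in place) is a design choice of this sketch, and the 250-character cap mirrors the substring logic above.

object ComponentTagSketch {
  val fields = Array("menu_code", "action_code", "position", "label_value", "label_class")

  // Split a component_tag like "MN001#AC002#3#some label#LabelClass" into named fields
  def splitTag(componentTag: String): Map[String, String] = {
    val parts = if (componentTag.contains("#")) componentTag.split("#", -1) else Array.empty[String]
    fields.zipWithIndex.map { case (name, i) =>
      val raw = if (i < parts.length) parts(i) else ""
      // label_value and label_class are capped at 250 characters, as in the job code
      val value = if (name == "label_value" || name == "label_class") raw.take(250) else raw
      name -> value
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    println(splitTag("MN001#AC002#3#some label#LabelClass"))
    println(splitTag("MN001#AC002"))
    println(splitTag("no_hash_here")) // no "#": every field stays empty
  }
}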