提交 25ed0ed0 编写于 作者: wuyunfeng's avatar wuyunfeng

更新dw_fact_log_session中user_id逻辑,不在pica_doctor中的值为0

上级 0315acc0
...@@ -16,6 +16,7 @@ object MyConfigSession { ...@@ -16,6 +16,7 @@ object MyConfigSession {
//流量表中的临时视图 //流量表中的临时视图
final val VIEW_SESSION_ODS: String = "ods_session"
final val VIEW_SESSION_NO_MATCH: String = "ods_session_no_user_id" final val VIEW_SESSION_NO_MATCH: String = "ods_session_no_user_id"
final val VIEW_EQUIPMENT_INFO: String = "equipment_info" final val VIEW_EQUIPMENT_INFO: String = "equipment_info"
final val VIEW_DEVICE_TOKEN: String = "device_token_match" final val VIEW_DEVICE_TOKEN: String = "device_token_match"
...@@ -62,6 +63,16 @@ object MyConfigSession { ...@@ -62,6 +63,16 @@ object MyConfigSession {
//匹配user_id的条件 //匹配user_id的条件
// 0. Initial user_id normalisation against pica_ds.pica_doctor.
//    Left-joins the VIEW_SESSION_ODS temp view (alias t) to pica_ds.pica_doctor (alias b)
//    on t.user_id = cast(b.id as string). The emitted user_id is the matched doctor id
//    as a string; rows with no matching doctor get the literal '0' via COALESCE.
//    Every other selected column is passed through from t unchanged.
//    The temp view named by VIEW_SESSION_ODS must be registered before this SQL is run.
final val INIT_USER_ID_SQL: String =
s"""
|SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
//1.使用equipment表匹配,默认是昨天的 //1.使用equipment表匹配,默认是昨天的
final val EQUIPMENT_INFO_SQL: String = final val EQUIPMENT_INFO_SQL: String =
""" """
......
...@@ -358,9 +358,13 @@ object SessionProcess { ...@@ -358,9 +358,13 @@ object SessionProcess {
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/ **/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={ def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段 //以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的user_id数据,将这些user_id置为字符串'0' //第一步:首先筛选出不符合的user_id数据,将这些user_id置为字符串'0'
val noMatchUserIdDF: Dataset[Row] = dataFrame.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24") val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token", .selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value", "menu_code", "action_code", "position", "label_value",
...@@ -381,7 +385,7 @@ object SessionProcess { ...@@ -381,7 +385,7 @@ object SessionProcess {
//3.将上述三者union,最终导入表中的数据 //3.将上述三者union,最终导入表中的数据
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'") val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
val rightUserId: Dataset[Row] = dataFrame.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24") val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF) val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF)
dwFactLogSession dwFactLogSession
} }
......
...@@ -355,9 +355,13 @@ object SessionProcessArgs { ...@@ -355,9 +355,13 @@ object SessionProcessArgs {
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> * @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/ **/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={ def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL)
//以下的所有逻辑是为了补全user_id字段 //以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的user_id数据 //第一步:首先筛选出不符合的user_id数据
val noMatchUserIdDF: Dataset[Row] = dataFrame.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24") val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token", .selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value", "menu_code", "action_code", "position", "label_value",
...@@ -379,7 +383,7 @@ object SessionProcessArgs { ...@@ -379,7 +383,7 @@ object SessionProcessArgs {
//3.将上述三者union,最终导入表中的数据 //3.将上述三者union,最终导入表中的数据
val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'") val deviceToken: Dataset[Row] = deviceTokenDF.where("user_id !='0'")
val rightUserId: Dataset[Row] = dataFrame.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24") val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF) val dwFactLogSession: Dataset[Row] = rightUserId.union(deviceToken).union(mobilePhoneDF)
dwFactLogSession dwFactLogSession
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册