提交 2df226da 编写于 作者: wuyunfeng's avatar wuyunfeng

添加两个字段module_class1,module_class2

上级 d1b8a427
...@@ -14,6 +14,7 @@ object MyConfigSession { ...@@ -14,6 +14,7 @@ object MyConfigSession {
final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session" final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session"
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path" final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart" final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
final val HIVE_TABLE4: String = "pica_dw.dw_fact_log_session_pref"
//写入的文件路径 //写入的文件路径
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/" final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
...@@ -34,7 +35,16 @@ object MyConfigSession { ...@@ -34,7 +35,16 @@ object MyConfigSession {
final val ACTION_CATEGORY_SQL_HEART: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'" final val ACTION_CATEGORY_SQL_HEART: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where action_type='ACTION_HEART_BEAT'"
final val ACTION_URLLABEL_SQL:String = "select url_content,label_value from pica_dw.dw_dim_log_action_urllabel " final val ACTION_URLLABEL_SQL:String = "select url_content,label_value from pica_dw.dw_dim_log_action_urllabel "
//从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,这里获取昨天的
final val SOURCE_SQL_PREF: String =
"""
|select device_token,pseudo_session,class_name,action,view_path,component_tag,created,mobile,doctor_id,device_brand,device_model,app_version,
|device_type,web_data,web_data_type,alternate_info,network_type,remark1 login_state,remark2 first_app_version,user_token_tourist,serviceName
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day and created is not null and created!=''
""".stripMargin
//从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,这里获取昨天的 //从源表pica_log.picalog_trace_app_part中执行SQL获取源数据,这里获取昨天的
final val SOURCE_SQL: String = final val SOURCE_SQL: String =
""" """
...@@ -61,7 +71,7 @@ object MyConfigSession { ...@@ -61,7 +71,7 @@ object MyConfigSession {
final val SOURCE_SQL_PATH: String = final val SOURCE_SQL_PATH: String =
s""" s"""
|select id log_session_id, session_id,user_id_int user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class, |select id log_session_id, session_id,user_id_int user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1} |app_version,device_type,created_time,date_time,module_class1,module_class2 from ${MyConfigSession.HIVE_TABLE1}
| where app_version >= '3.1.7' | where app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' ) | AND ((action_type ='ACTION_CLICK' and action_code != 'null' ) OR action_type ='ACTION_VIEW' )
| and (menu_code != '0' and menu_code !='null' and menu_code !='' and length(menu_code) <= 3 ) | and (menu_code != '0' and menu_code !='null' and menu_code !='' and length(menu_code) <= 3 )
...@@ -75,20 +85,40 @@ object MyConfigSession { ...@@ -75,20 +85,40 @@ object MyConfigSession {
s""" s"""
|SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token, |SELECT t.session_id, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code, |t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class,t.app_version,t.device_type, |t.action_code, t.position, t.label_value,t.label_class,t.module_class1,t.module_class2,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time, |t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t |t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string) |left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
final val INIT_USER_ID_SQL_PREF =
s"""
|SELECT t.pseudo_session, COALESCE(cast(b.id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action, t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.net_type,t.created_time,t.date_time,
|t.web_data,t.web_data_type,t.alternate_info,t.login_state,t.first_app_version,t.serviceName
| from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica_ds.pica_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin """.stripMargin
//1.针对没有匹配到的user_id,先使用 mobile_phone 进行匹配,得到 user_id 匹配,'0' //1.针对没有匹配到的user_id,先使用 mobile_phone 进行匹配,得到 user_id 匹配,'0'
final val MOBILE_PHONE_SQL: String = final val MOBILE_PHONE_SQL: String =
s""" s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token, |SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code, |ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type, |ss.action_code, ss.position,ss.label_value,ss.label_class,ss.module_class1,ss.module_class2,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time, |ss.device_brand, ss.device_model,ss.device_system, ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss |ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone |left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
final val MOBILE_PHONE_SQL_PREF: String =
s"""
|SELECT ss.pseudo_session,COALESCE(cast(b.id as string),'0') AS user_id,ss.mobile,ss.device_token,ss.user_token,
|ss.view_class,ss.view_path,ss.action, ss.component_tag,ss.menu_code,
|ss.action_code,ss.position,ss.label_value,ss.label_class,ss.app_version,ss.device_type,
|ss.device_brand,ss.device_model, ss.net_type,ss.created_time,
|ss.date_time,ss.web_data,ss.web_data_type,ss.alternate_info,ss.login_state,ss.first_app_version,ss.serviceName
| from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin """.stripMargin
//2.使用equipment表匹配,默认是昨天的 //2.使用equipment表匹配,默认是昨天的
final val EQUIPMENT_INFO_SQL: String = final val EQUIPMENT_INFO_SQL: String =
...@@ -112,12 +142,21 @@ object MyConfigSession { ...@@ -112,12 +142,21 @@ object MyConfigSession {
s""" s"""
|SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token, |SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code, |t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class,t.app_version,t.device_type, |t.action_code, t.position, t.label_value,t.label_class,t.module_class1,t.module_class2,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time, |t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t |t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token |left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin """.stripMargin
final val DEVICE_TOKEN_SQL_PREF: String =
s"""
|SELECT t.pseudo_session,COALESCE(cast(b.user_id as string),'0') AS user_id,t.mobile,t.device_token,t.user_token,
|t.view_class,t.view_path,t.action ,t.component_tag,t.menu_code,
|t.action_code,t.position,t.label_value,t.label_class,t.app_version,t.device_type,
|t.device_brand,t.device_model, t.net_type,t.created_time,
|t.date_time,t.web_data,t.web_data_type,t.alternate_info,t.login_state,t.first_app_version,t.serviceName
| from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
......
...@@ -113,7 +113,7 @@ object SessionProcess { ...@@ -113,7 +113,7 @@ object SessionProcess {
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns) val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", .toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class", "app_version", "action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time") "device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
println("----------------------------------compute session id---------------------------------------------") println("----------------------------------compute session id---------------------------------------------")
val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess) val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
...@@ -226,8 +226,9 @@ class SessionProcess extends java.io.Serializable{ ...@@ -226,8 +226,9 @@ class SessionProcess extends java.io.Serializable{
//处理字段,得到需要的字段值 //处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => { val processColumns = (rows: Iterator[Row]) => {
val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String, // val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String,String)]() // String, String, String, String, String, String, String, String, String, String, String, String,String,String,String)]()
val baseList = new ListBuffer[SessionLog]()
//关联到menu_code的映射表广播变量 //关联到menu_code的映射表广播变量
val path_menu: Map[String, String] = menuCodeBroad.value val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量 //关联到action_category的映射表广播变量
...@@ -249,6 +250,8 @@ class SessionProcess extends java.io.Serializable{ ...@@ -249,6 +250,8 @@ class SessionProcess extends java.io.Serializable{
var position: String = "" var position: String = ""
var label_value: String = "" var label_value: String = ""
var label_class:String = "" var label_class:String = ""
var module_class1:String=""
var module_class2:String=""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value //将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) { if (component_tag.contains("#")) {
//按照#号切割 //按照#号切割
...@@ -272,12 +275,29 @@ class SessionProcess extends java.io.Serializable{ ...@@ -272,12 +275,29 @@ class SessionProcess extends java.io.Serializable{
position = strs(2) position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length)) label_value = strs(3).substring(0,math.min(250,strs(3).length))
} }
case 5 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4)
}
case 6 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4)
module_class1=strs(5)
}
case _ => { case _ => {
menu_code = strs(0) menu_code = strs(0)
action_code = strs(1) action_code = strs(1)
position = strs(2) position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length)) label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length)) label_class = strs(4)
module_class1=strs(5)
module_class2=strs(6)
} }
} }
} }
...@@ -305,21 +325,21 @@ class SessionProcess extends java.io.Serializable{ ...@@ -305,21 +325,21 @@ class SessionProcess extends java.io.Serializable{
} }
} }
//一行数据添加到List中 //一行数据添加到List中
baseList += ((StringUtils.getNotNullString(row.getAs[String]("pseudo_session")), baseList += SessionLog(StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")), StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")), StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")), StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")), StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")), StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")), StringUtils.getNotNullString(row.getAs[String]("view_path")),
action_type, component_tag, menu_code, action_code, position, label_value,label_class, action_type, component_tag, menu_code, action_code, position, label_value,label_class,module_class1,module_class2,
StringUtils.getNotNullString(row.getAs[String]("app_version")), StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")), StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")), StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type, "", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")), StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))))) DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))))
}) })
baseList.iterator baseList.iterator
} }
...@@ -331,8 +351,9 @@ class SessionProcess extends java.io.Serializable{ ...@@ -331,8 +351,9 @@ class SessionProcess extends java.io.Serializable{
//定义一个累加量 //定义一个累加量
var count: Int = 0 var count: Int = 0
//存储一行的数据 //存储一行的数据
val list = new ListBuffer[(String, String, String, String, String, String, String, String, String, // val list = new ListBuffer[(String, String, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String, String, String, String, String, String)]() // String, String, String, String, String, String, String, String, String, String, String, String, String,String, String)]()
val list = new ListBuffer[SessionLog]()
rowList.toList.foreach(row => { rowList.toList.foreach(row => {
val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time")) val created: String = StringUtils.getNotNullString(row.getAs[String]("created_time"))
val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created")) val refer_created: String = StringUtils.getNotNullString(row.getAs[String]("refer_created"))
...@@ -342,7 +363,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -342,7 +363,7 @@ class SessionProcess extends java.io.Serializable{
count = count + 1 count = count + 1
} }
//添加到List //添加到List
list += ((sessionID + count, list += SessionLog(sessionID + count,
StringUtils.getNotNullString(row.getAs[String]("user_id")), StringUtils.getNotNullString(row.getAs[String]("user_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")), StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")), StringUtils.getNotNullString(row.getAs[String]("device_token")),
...@@ -356,13 +377,15 @@ class SessionProcess extends java.io.Serializable{ ...@@ -356,13 +377,15 @@ class SessionProcess extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("position")), StringUtils.getNotNullString(row.getAs[String]("position")),
StringUtils.getNotNullString(row.getAs[String]("label_value")), StringUtils.getNotNullString(row.getAs[String]("label_value")),
StringUtils.getNotNullString(row.getAs[String]("label_class")), StringUtils.getNotNullString(row.getAs[String]("label_class")),
StringUtils.getNotNullString(row.getAs[String]("module_class1")),
StringUtils.getNotNullString(row.getAs[String]("module_class2")),
StringUtils.getNotNullString(row.getAs[String]("app_version")), StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")), StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")), StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")), StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")), StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time")))) created, StringUtils.getNotNullString(row.getAs[String]("date_time")))
}) })
list list
} }
...@@ -385,7 +408,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -385,7 +408,7 @@ class SessionProcess extends java.io.Serializable{
val coalesceDF: DataFrame = rcreDF.selectExpr( val coalesceDF: DataFrame = rcreDF.selectExpr(
"pseudo_session", "user_id", "mobile", "device_token", "pseudo_session", "user_id", "mobile", "device_token",
"user_token", "view_class", "view_path", "action_type", "user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class", "component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time", "device_system", "net_type", "created_time", "date_time",
"COALESCE(refer_created,created_time) as refer_created") "COALESCE(refer_created,created_time) as refer_created")
...@@ -398,7 +421,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -398,7 +421,7 @@ class SessionProcess extends java.io.Serializable{
val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it) val sessionIdDF: DataFrame = groupRDD.map(sessionProcess.computeSessionId).flatMap(it => it)
.toDF("session_id", "user_id", "mobile", "device_token", "user_token", .toDF("session_id", "user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time") "device_system", "net_type", "created_time", "date_time")
sessionIdDF sessionIdDF
...@@ -422,7 +445,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -422,7 +445,7 @@ class SessionProcess extends java.io.Serializable{
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24") val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token", .selectExpr("session_id","'0' as user_id", "mobile", "device_token", "user_token",
"view_class", "view_path", "action_type", "component_tag", "view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model", "app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time") "device_system", "net_type", "created_time", "date_time")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH) noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
...@@ -453,7 +476,7 @@ class SessionProcess extends java.io.Serializable{ ...@@ -453,7 +476,7 @@ class SessionProcess extends java.io.Serializable{
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id, |SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token, |ss.session_id, ss.user_id,COALESCE(b.id,0) user_id_int, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code, |ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.app_version, ss.device_type, |ss.action_code, ss.position,ss.label_value,ss.label_class,ss.module_class1,ss.module_class2,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time, |ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss |ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str |left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
...@@ -461,8 +484,30 @@ class SessionProcess extends java.io.Serializable{ ...@@ -461,8 +484,30 @@ class SessionProcess extends java.io.Serializable{
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL) val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF userIdDF
} }
} }
case class SessionLog(pseudo_session: String,
user_id: String,
mobile: String,
device_token: String,
user_token: String,
view_class: String,
view_path: String,
action_type: String,
component_tag: String,
menu_code: String,
action_code: String,
position: String,
label_value: String,
label_class: String,
module_class1: String,
module_class2: String,
app_version: String,
device_type: String,
device_brand: String,
device_model: String,
device_system: String,
net_type: String,
created_time: String,
date_time: String)
...@@ -121,9 +121,9 @@ object SessionProcessPath { ...@@ -121,9 +121,9 @@ object SessionProcessPath {
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("created_time")), StringUtils.getNotNullString(row.getAs[String]("created_time")),
StringUtils.getNotNullString(row.getAs[String]("date_time")), StringUtils.getNotNullString(row.getAs[String]("date_time")),
StringUtils.getNotNullString(row.getAs[String]("module_class1")),
StringUtils.getNotNullString(row.getAs[String]("module_class2")),
row.getAs[Double]("refer_time_diff") row.getAs[Double]("refer_time_diff")
// StringUtils.getNotNullString(row.getAs[String]("module_class1")),
// StringUtils.getNotNullString(row.getAs[String]("module_class2"))
)) ))
}) })
rowList.iterator rowList.iterator
...@@ -150,6 +150,7 @@ object SessionProcessPath { ...@@ -150,6 +150,7 @@ object SessionProcessPath {
}catch { }catch {
case e:Exception => { case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------") println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String = val exceptionSQL: String =
s""" s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}' |update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
...@@ -194,7 +195,7 @@ object SessionProcessPath { ...@@ -194,7 +195,7 @@ object SessionProcessPath {
"COALESCE(refer_position,'') as refer_position", "COALESCE(refer_position,'') as refer_position",
"COALESCE(refer_action_type,'') as refer_action_type", "COALESCE(refer_action_type,'') as refer_action_type",
"COALESCE(refer_created,created_time) as refer_created", "COALESCE(refer_created,created_time) as refer_created",
"step_id", "app_version", "device_type", "created_time", "date_time") "step_id", "app_version", "device_type", "created_time", "date_time","module_class1","module_class2")
//在此基础上增加字段 refer_time_diff,值为 created_time, refer_created 之差 //在此基础上增加字段 refer_time_diff,值为 created_time, refer_created 之差
val referTimeDiff: DataFrame = val referTimeDiff: DataFrame =
...@@ -219,7 +220,7 @@ object SessionProcessPath { ...@@ -219,7 +220,7 @@ object SessionProcessPath {
| select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class, | select log_session_id, session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,label_class,
| refer_menu_code,refer_action_code,refer_position,refer_action_type, | refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff, | cast(refer_time_diff as int) as refer_time_diff,
| step_id,app_version,device_type,created_time,date_time,'' module_class1,'' module_class2 | step_id,app_version,device_type,created_time,date_time, module_class1, module_class2
| from result_view distribute by rand() | from result_view distribute by rand()
""".stripMargin """.stripMargin
sparkSession.sql(loadDataSql) sparkSession.sql(loadDataSql)
......
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
/**
* 处理埋点数据,进行简单的清晰过滤,导入到DW层pica_dw.dw_fact_log_sesson_pref
* @Author yunfeng.wu
* @Date 2020/08/07 09:23
* @Version 1.0
*/
object SessionProcessPref {
def apply(): SessionProcessPref = new SessionProcessPref()
def main(args: Array[String]): Unit = {
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1969,'pica_dw.dw_fact_log_session','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate
if(args.length>=1){
scnData = args(0)
}
println(s"scnData=${scnData}")
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnData, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sessionProcessPref: SessionProcessPref = SessionProcessPref()
//step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF+s" and created_day='${scnData}'").repartition(200).distinct()
//step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => {
val pseudo_session = g._1
val resList: ListBuffer[Row] = new ListBuffer[Row]()
var rowList = g._2
rowList = rowList.toList.sortWith((x,y)=>x.getAs[String]("created") > y.getAs[String]("created"))//按created由大到小排序
var thisDeviceToken = ""
var thisDoctorId = "0"
var thisMobile = ""
rowList.foreach(row => {
var deviceToken = row.getAs[String]("device_token")
var doctorId = row.getAs[String]("doctor_id")
var mobile = row.getAs[String]("mobile")
if(deviceToken!=null && !deviceToken.equals("") ){
thisDeviceToken = deviceToken
}else {
deviceToken = thisDeviceToken
}
if(doctorId!=null && !doctorId.equals("") && !doctorId.equals("0") ){
thisDoctorId = doctorId
}else {
doctorId = thisDoctorId
}
if(mobile!=null && !mobile.equals("") ){
thisMobile = mobile
}else {
mobile = thisMobile
}
resList += (Row(deviceToken,
pseudo_session,
row.getAs[String]("class_name"),
row.getAs[String]("action"),
row.getAs[String]("view_path"),
row.getAs[String]("component_tag"),
row.getAs[String]("created"),
mobile,
doctorId,
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("app_version"),
row.getAs[String]("device_type"),
row.getAs[String]("web_data"),
row.getAs[String]("web_data_type"),
row.getAs[String]("alternate_info"),
row.getAs[String]("network_type"),
row.getAs[String]("login_state"),
row.getAs[String]("first_app_version"),
row.getAs[String]("user_token_tourist"),
row.getAs[String]("serviceName")
))
})
resList.iterator
})
import sessionProcessPref.sparkSession.implicits._
//step3:根据映射表来进行action_type和class_name数据过滤
val resDF = sessionProcessPref.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
resDF.show()
// val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcessPref.filterRows)
println("---------------------------------------process columns-------------------------------------------")
val baseRdd = resDF.rdd.mapPartitions(sessionProcessPref.processColumns)
println(s"process columns.count===>${baseRdd.count()}")
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path","action", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
println("baseDF.show=======>")
baseDF.show()
println("----------------------------------compute session id---------------------------------------------")
// val sessionIdDF: DataFrame = sessionProcessPref.getSessionId(baseDF,sessionProcessPref)
//默认缓存级别是:MEMORY_AND_DISK
// sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcessPref.matchUserId(baseDF,sessionProcessPref.sparkSession,scnData)
println("dwFactLogSession.show=======>")
dwFactLogSession.show()
dwFactLogSession.printSchema()
println("-----------------create view fact_log_sesson_pref and load to dw_fact_log_sesson_pref--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_sesson_pref")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${scnData}') select * from fact_log_session distribute by rand()"
sessionProcessPref.sparkSession.sql(loadDataSql)
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val endTime: String = DateUtils.getTodayTime
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, endTime)
upreSta.setInt(3, resDF.count().toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sessionProcessPref.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1969 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
}
class SessionProcessPref extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
val sparkSession: SparkSession = getSparkSession("SessionProcessPref")
//获取符合要求的actionType广播变量
// val actionTypeBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_TYPE_SQL, "action_type", "is_valid")
//获取clasName广播变量
// val classNameBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.CLASS_NAME_SQL, "class_name", "is_valid")
//获取menu_code广播变量
// val menuCodeBroad =
// UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_SQL, "view_path", "menu_code")
// //获取actionCategory变量
// val actionCategory =
// UseUtil.getBroadcast(sparkSession,MyConfigSession.ACTION_CATEGORY_SQL,"action_type","action_category")
//处理字段,得到需要的字段值
val processColumns = (rows: Iterator[Row]) => {
// val baseList = new ListBuffer[(String, String, String, String, String, String, String, String, String,String, String, String,
// String, String, String, String, String, String, String, String, String, String, String, String,String,String, String, String)]() //
val baseList = new ListBuffer[SessionPref]()
//关联到menu_code的映射表广播变量
// val path_menu: Map[String, String] = menuCodeBroad.value
//关联到action_category的映射表广播变量
// val actionCategoryMap: Map[String, String] = actionCategory.value
rows.toList.foreach(row => {
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型,保留原始字段
val action = row.getAs[String]("action")
// var action_type: String = ""
// if (row.getAs[String]("action") !=null) {
// action_type = actionCategoryMap.getOrElse(action,"ACTION")
// }
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
var menu_code: String = ""
var action_code: String = ""
var position: String = ""
var label_value: String = ""
var label_class:String = ""
//将符合要求的component_tag进行切割,获取 aciton_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
strs.length match {
case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0)
action_code = strs(1)
}
case 3 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
}
case 4 => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
}
case _ => {
menu_code = strs(0)
action_code = strs(1)
position = strs(2)
label_value = strs(3).substring(0,math.min(250,strs(3).length))
label_class = strs(4).substring(0,math.min(250,strs(4).length))
}
}
}
// //匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
// if ((menu_code.equals("")|| menu_code.equals("null") || menu_code.equals("0") || menu_code.length> 3 )
// && action_type.equals("ACTION_VIEW")) {
// menu_code = "0" //关联不上的显示为0
// import scala.util.control.Breaks._
// breakable {
// //利用menu_code映射表匹配
// for (tuple <- path_menu) {
// //源数据view_path的字符串包含映射表view_path的字符串
// if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
// //满足条件后,修改源数据的menu_code
// menu_code = tuple._2
// println("--------------------menu_code match successfully-----------------------")
// //结束遍历
// break()
// }
// }
// }
// }
//一行数据添加到List中
baseList += SessionPref(StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, component_tag, menu_code, action_code, position, label_value,label_class,
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName"))
)
})
baseList.iterator
}
/**
* @Description 匹配user_id,补全数据中的user_id字段
* @param dataFrame 筛选后的数据
* @param sparkSQLSession SparkSession 环境
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame,sparkSQLSession: SparkSession,created_day:String):DataFrame={
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val DF = sparkSQLSession.sql(MyConfigSession.INIT_USER_ID_SQL_PREF)
//以下的所有逻辑是为了补全user_id字段
//第一步:首先筛选出不符合的use_id数据,将这些user_id置为字符串'0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
.selectExpr("pseudo_session","'0' as user_id","mobile","device_token","user_token",
"view_class","view_path","action" ,"component_tag","menu_code",
"action_code","position","label_value","label_class","app_version","device_type",
"device_brand","device_model","net_type","created_time",
"date_time","web_data","web_data_type","alternate_info","login_state","first_app_version","serviceName")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MyConfigSession.MOBILE_PHONE_SQL_PREF)
mobilePhoneDF.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if(!created_day.equals(DateUtils.getYesterdayDate)){//如果不是跑昨天的数据,使用equipment拉链表
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS+ s"'${created_day}'"
}
println(s"equipmentInfoSql=${equipmentInfoSql}")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1")
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO)
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(MyConfigSession.DEVICE_TOKEN_SQL_PREF)
//4.将上述三者union,最终导入表中的数据
val rightUserId: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
val dwFactLogSession: Dataset[Row] = rightUserId.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
val USER_ID_INT_SQL:String=
s"""
|SELECT concat(regexp_replace( '${created_day}',"-","") ,cast(row_number() over(partition by 1 order by created_time) as string)) as id,
|ss.pseudo_session,ss.user_id,COALESCE(b.id,0) user_id_int ,ss.mobile,ss.device_token,ss.user_token,
|ss.view_class,ss.view_path,ss.action, ss.component_tag,ss.menu_code,
|ss.action_code,ss.position,ss.label_value,ss.label_class,ss.app_version,ss.device_type,
|ss.device_brand,ss.device_model, ss.net_type,ss.created_time,
|ss.date_time,ss.web_data,ss.web_data_type,ss.alternate_info,ss.login_state,ss.first_app_version,ss.serviceName
| from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
userIdDF
}
}
case class SessionPref(pseudo_session:String,
user_id:String,
mobile:String,
device_token:String,
user_token:String,
view_class:String,
view_path:String,
action:String,
component_tag:String,
menu_code:String,
action_code:String,
position:String,
label_value:String,
label_class:String,
app_version:String,
device_type:String,
device_brand:String,
device_model:String,
net_type:String,
created_time:String,
date_time:String,
web_data:String,
web_data_type:String,
alternate_info:String,
login_state:String,
first_app_version:String,
serviceName:String)
package com.utils package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessHeart, SessionProcessPath } import com.session.{SessionMenuCalc, SessionProcess, SessionProcessHeart, SessionProcessPath, SessionProcessPref}
import org.apache.hadoop.util.ProgramDriver import org.apache.hadoop.util.ProgramDriver
/** /**
...@@ -15,6 +15,7 @@ object Driver { ...@@ -15,6 +15,7 @@ object Driver {
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表") driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表") driver.addClass("SessionProcessHeart",classOf[SessionProcessHeart],"用户Session数据分析导入到dw_fact_log_session_heart表")
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表") driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessPref",classOf[SessionProcessPref],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_pref表")
driver.run(args) driver.run(args)
} }
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册