Commit 9bd3742d authored by weicheng.mao

Initialize code

Parent ec546bff
Pipeline #46727 canceled
......@@ -38,7 +38,8 @@ object SessionProcess {
val sessionProcess: SessionProcess = SessionProcess()
//step1: fetch the source data, repartition it (forcing a shuffle, since Spark reads Hive with too few default partitions), and deduplicate the rows
var sourceDF: DataFrame = sessionProcess.sparkSession.sql(MyConfigSession.SOURCE_SQL+s" and created_day='${scnData}'").repartition(200).distinct()
sourceDF.show()
// sourceDF.show()
println("sourceDF_cnt----------------------->"+sourceDF.count()+"--------------------------------")
var conditionGroup = List( "<='2'","between '3' and '5'","between '6' and '8'","between '9' and 'b'",">='c'" )
var dataCount = 0
var index = 0
......@@ -46,6 +47,7 @@ object SessionProcess {
for(condition <- conditionGroup){
index += 1
val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}").repartition(100)
println("slideDF_cnt----------------------->"+slideDF.count()+"--------------------------------")
//step2: for each of the day's pseudo_session values, extract the non-empty device_token, doctor_id and mobile and back-fill them into records of that pseudo_session where those fields are empty
val groupRdd = slideDF.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val resRdd = groupRdd.flatMap(g => {
......@@ -91,32 +93,37 @@ object SessionProcess {
row.getAs[String]("device_brand"),
row.getAs[String]("device_model"),
row.getAs[String]("network_type"),
row.getAs[String]("created")))
row.getAs[String]("created"),
row.getAs[String]("web_data")
))
})
resList.iterator
})
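// A minimal sketch of the step2 back-fill, assuming the intent is: within one pseudo_session
// group, reuse the first non-empty device_token / doctor_id / mobile for rows where that field
// is empty. The helper below and its name are illustrative only, not part of this project.
def fillFirstNonEmpty(rows: Iterable[Row], field: String): String =
  rows.map(_.getAs[String](field)).find(v => v != null && v.nonEmpty).getOrElse("")
// e.g. within the groupBy above one could compute val token = fillFirstNonEmpty(g._2, "device_token")
// and rebuild each Row with token substituted wherever its own device_token is empty.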
import sessionProcess.sparkSession.implicits._
//step3: filter action_type and class_name according to the mapping table
println("sourceDF.schema "+sourceDF.schema)
val resDF = sessionProcess.sparkSession.createDataFrame(resRdd,sourceDF.schema)
resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("resDF.show=======>")
println("resDF.cnt=======>"+resDF.count())
// resDF.show()
val data: RDD[Row] = resDF.rdd.mapPartitions(sessionProcess.filterRows)
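// filterRows is a method of SessionProcess; as a hedged illustration of the mapping-table
// filter that step3 describes, assuming a broadcast Set[(String, String)] of allowed
// (class_name, action) pairs (allowedPairs is hypothetical, not defined in this project):
// def filterByMapping(it: Iterator[Row]): Iterator[Row] =
//   it.filter(r => allowedPairs.value.contains((r.getAs[String]("class_name"), r.getAs[String]("action"))))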
println("resDF.schema "+resDF.schema)
println("---------------------------------------process columns-------------------------------------------")
val baseDF: DataFrame = data.mapPartitions(sessionProcess.processColumns)
.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path",
"action_type", "component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2", "app_version",
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time")
"device_type", "device_brand", "device_model", "device_system", "net_type", "created_time", "date_time","web_data")
println("----------------------------------compute session id---------------------------------------------")
println("baseDF.schema "+baseDF.schema)
val sessionIdDF: DataFrame = sessionProcess.getSessionId(baseDF,sessionProcess)
//The default cache level is MEMORY_AND_DISK
sessionIdDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
println("sessionIdDF.show=======>")
println("sessionIdDF.count()=======>"+sessionIdDF.count())
println("-------------------------------match user_id 逻辑-------------------------------------------------")
val dwFactLogSession: DataFrame = sessionProcess.matchUserId(sessionIdDF,sessionProcess.sparkSession,scnData)
println("dwFactLogSession.show=======>")
println("dwFactLogSession.count=======>"+dwFactLogSession.count())
println("-----------------create view fact_log_session and load to dw_fact_log_session--------------------")
dwFactLogSession.createOrReplaceTempView("fact_log_session")
var insertMode = "insert overwrite"
......@@ -134,9 +141,9 @@ object SessionProcess {
sessionProcess.sparkSession.stop()
} catch {
case e: Exception => {
println(s"-----------------------------------任务异常---------------------------------------------------")
println(s"-----------------------------------任务异常:fail---------------------------------------------------")
e.printStackTrace()
throw new Exception
}
}
}
......@@ -147,22 +154,22 @@ object SessionProcess {
class SessionProcess extends java.io.Serializable{
def getSparkSession(appName: String): SparkSession = {
// val sparkSession = SparkSession
// .builder()
// .appName("SparkPi")
// .config("spark.master", "local[1]") // 需设置spark.master为local[N]才能直接运行,N为并发数。
// .config("spark.hadoop.odps.project.name", "pica")
// .config("spark.hadoop.odps.access.id", "LTAI5tQEnb9VREcPdDAR32bg")
// .config("spark.hadoop.odps.access.key", "RYazt1JIYZnRcEdlntarZ1Ov21zoNi")
// .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
// .config("spark.sql.catalogImplementation", "odps")
// .getOrCreate()
val sparkSession = SparkSession
.builder()
.config("spark.hadoop.odps.project.name", "pica")
.appName("SessionProcess")
.getOrCreate()
sparkSession
}
......@@ -332,7 +339,9 @@ class SessionProcess extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("device_model")),
"", net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))))
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data"))
)
})
baseList.iterator
}
......@@ -378,7 +387,9 @@ class SessionProcess extends java.io.Serializable{
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_system")),
StringUtils.getNotNullString(row.getAs[String]("net_type")),
created, StringUtils.getNotNullString(row.getAs[String]("date_time")))
created, StringUtils.getNotNullString(row.getAs[String]("date_time")),
StringUtils.getNotNullString(row.getAs[String]("web_data"))
)
})
list
}
......@@ -403,7 +414,7 @@ class SessionProcess extends java.io.Serializable{
"user_token", "view_class", "view_path", "action_type",
"component_tag", "menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time",
"device_system", "net_type", "created_time", "date_time","web_data",
"COALESCE(refer_created,created_time) as refer_created")
//Aggregate by pseudo_session and compute session_id
......@@ -416,7 +427,7 @@ class SessionProcess extends java.io.Serializable{
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
"device_system", "net_type", "created_time", "date_time","web_data")
sessionIdDF
}
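// Illustrative sketch only, not the project's getSessionId: one common way to split a
// pseudo_session into session ids is an inactivity gap, assumed here to be 30 minutes over the
// millisecond created_time column (both the cutoff and the derivation are assumptions).
def sketchSessionId(df: DataFrame): DataFrame = {
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._
  val w = Window.partitionBy("pseudo_session").orderBy(col("created_time").cast("long"))
  val gapMs = col("created_time").cast("long") - lag(col("created_time").cast("long"), 1).over(w)
  df.withColumn("new_session", when(gapMs.isNull || gapMs > 30L * 60 * 1000, 1).otherwise(0))
    .withColumn("session_id", concat_ws("_", col("pseudo_session"),
      sum(col("new_session")).over(w).cast("string")))
    .drop("new_session")
}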
......@@ -440,7 +451,7 @@ class SessionProcess extends java.io.Serializable{
"view_class", "view_path", "action_type", "component_tag",
"menu_code", "action_code", "position", "label_value","label_class","module_class1","module_class2",
"app_version", "device_type", "device_brand", "device_model",
"device_system", "net_type", "created_time", "date_time")
"device_system", "net_type", "created_time", "date_time","web_data")
noMatchUserIdDF.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
//1. Filter out records whose user_id was not matched in the previous step, and match by mobile_phone first
......@@ -471,7 +482,7 @@ class SessionProcess extends java.io.Serializable{
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.module_class1,ss.module_class2,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|ss.date_time,ss.web_data from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
|left join (select id,cast(id as string) id_str from pica.ods_pica_p_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
|""".stripMargin
val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
......@@ -501,6 +512,8 @@ case class SessionLog(pseudo_session: String,
device_system: String,
net_type: String,
created_time: String,
date_time: String)
date_time: String,
web_data:String
)
......@@ -13,7 +13,7 @@ object MyConfigSession {
final val SOURCE_SQL: String =
"""
|select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action,
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica.picalog_trace_app_part
|component_tag,app_version,device_type,device_brand,device_model,network_type,created,web_data from pica.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and action!='ACTION_WEB_AFTER'
......@@ -29,9 +29,9 @@ object MyConfigSession {
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
final val HIVE_TABLE3: String = "pica_dw.dw_fact_log_session_heart"
final val HIVE_TABLE4: String = "pica.dwd_fact_log_session_term"
final val HIVE_TABLE4_MID: String = "pica_dw.dw_fact_log_session_term_mid"
final val HIVE_TABLE5: String = "pica_dw.pathconvertconfig"
final val HIVE_TABLE6: String = "pica_dw.dw_fact_log_session_path_convert"
final val HIVE_TABLE4_MID: String = "pica.dw_fact_log_session_term_mid"
final val HIVE_TABLE5: String = "pica.pathconvertconfig"
final val HIVE_TABLE6: String = "pica.dwd_fact_log_session_path_convert"
//Output file path
......@@ -106,11 +106,13 @@ object MyConfigSession {
|nodename,
|menucode,
|actioncode,
|position,
|uv,
|refer_menu_code,
|refer_action_code,
|starttime,
|endtime
|endtime,
|outflag
|from ${MyConfigSession.HIVE_TABLE5}
""".stripMargin
......@@ -156,7 +158,7 @@ object MyConfigSession {
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class,t.module_class1,t.module_class2,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_ODS} as t
|t.date_time,t.web_data from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join pica.ods_pica_p_doctor as b on t.user_id = cast(b.id as string)
""".stripMargin
final val INIT_USER_ID_SQL_PREF =
......@@ -176,7 +178,7 @@ object MyConfigSession {
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.label_class,ss.module_class1,ss.module_class2,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system, ss.net_type,ss.created_time,
|ss.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|ss.date_time,ss.web_data from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica.ods_pica_p_doctor where ods_pica_p_doctor.delete_flag = 1 and mobile_phone!='' and mobile_phone!='XK0HdMN6dAfOlYPOFHHL0A==' ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
final val MOBILE_PHONE_SQL_PREF: String =
......@@ -214,7 +216,7 @@ object MyConfigSession {
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.label_class,t.module_class1,t.module_class2,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|t.date_time,t.web_data from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id= '0' ) as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
final val DEVICE_TOKEN_SQL_PREF: String =
......@@ -243,10 +245,15 @@ object MyConfigSession {
final val JDBC_USERNAME = "pica_spider"
final val JDBC_PSSWORD = "5$7FXgz#e5JWP08e"
final val JDBC_TABLE = "schedule_job_record"
final val mysql_url_user_profile="jdbc:mysql://hdfs127:3306/user_profile?useTimezone=true&serverTimezone=GMT%2B8"
final val mysql_username_user_profile="bi_admin"
final val mysql_password_user_profile="5$7FXgz#e5JWP08e"
// rm-ukksi760c7spi07dd8go.mysql.rds.aliyuncs.com:3306 bigdata_rw rJzoYogUHYEwLZ8v
// rr-uf6p67797265cm09f.mysql.rds.aliyuncs.com
final val mysql_url_user_profile="jdbc:mysql://rr-uf6p67797265cm09f.mysql.rds.aliyuncs.com:3306/pica_bi"
final val mysql_username_user_profile="bi_readonly"
final val mysql_password_user_profile="1Qaz2wsx"
// final val mysql_url_user_profile="jdbc:mysql://rm-ukksi760c7spi07dd8go.mysql.rds.aliyuncs.com:3306/pica_bi?useTimezone=true&serverTimezone=GMT%2B8"
// final val mysql_username_user_profile="bigdata_rw"
// final val mysql_password_user_profile="rJzoYogUHYEwLZ8v"
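// A hedged sketch of how these JDBC constants might be consumed from a SparkSession named
// spark; the dbtable value below is a placeholder, not a table defined in this file:
// spark.read.format("jdbc")
//   .option("url", MyConfigSession.mysql_url_user_profile)
//   .option("user", MyConfigSession.mysql_username_user_profile)
//   .option("password", MyConfigSession.mysql_password_user_profile)
//   .option("dbtable", "some_table")
//   .load()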
final val MENU_CODE_VIEW_PATH_SQL: String = "select distinct full_path view_path, menu_code from pica_ds.pica_bi_bi_menu_code_h5 where full_path is not Null"
......
package utils
import java.sql.{Connection, DriverManager}
import java.util.{Properties, UUID}
import org.apache.spark.broadcast.Broadcast
......