提交 5c56dda7,作者:wuyunfeng

优化session_pref与term

上级 500cc55e
...@@ -7,6 +7,7 @@ import com.config.MyConfigSession ...@@ -7,6 +7,7 @@ import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils} import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil} import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
...@@ -49,151 +50,26 @@ object SessionProcessPref { ...@@ -49,151 +50,26 @@ object SessionProcessPref {
val sessionProcessPref: SessionProcessPref = SessionProcessPref() val sessionProcessPref: SessionProcessPref = SessionProcessPref()
//step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重 //step1:获取源数据,重新分区,产生shuffle,Spark读Hive默认的分区数太少,并对数据去重
var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF + s" and created_day='${scnData}'").repartition(120).distinct() var sourceDF: DataFrame = sessionProcessPref.sparkSession.sql(MyConfigSession.SOURCE_SQL_PREF + s" and created_day='${scnData}'").repartition(120).distinct()
//step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中 var dataCount = 0
val groupRdd = sourceDF.rdd.groupBy(r => r.getAs[String]("pseudo_session")) var index = 0
val baseRdd = groupRdd.flatMap(g => { val conditionGroup = List("<='4' ","between '5' and '9'",">'9'")
val pseudo_session = g._1 for(condition <- conditionGroup){
val resList: ListBuffer[SessionPref] = new ListBuffer[SessionPref]() val slideDF = sourceDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}").repartition(100)
var rowList = g._2 println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created") > y.getAs[String]("created")) //按created由大到小排序 //step2:抽取出当天pseudo_session对应的非空的device_token,doctor_id,mobile,补充到对应的pseudo_session下这几项为空的记录中
var thisDeviceToken = "" val baseRdd:RDD[SessionPref] = sessionProcessPref.offsetValues(slideDF)
var thisDoctorId = "0" println("---------------------------------------process columns-------------------------------------------")
var thisMobile = "" import sessionProcessPref.sparkSession.implicits._
val path_menu: Map[String, String] = sessionProcessPref.menuCodeBroad.value //关联到menu_code的映射表广播变量 var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
val actionCategoryMap: Map[String, String] = sessionProcessPref.actionCategory.value //关联到action_category的映射表广播变量 "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand", "device_model",
rowList.foreach(row => { "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version", "serviceName", "tag8", "tag9", "tag10")
var deviceToken = row.getAs[String]("device_token") println("baseDF.show=======>")
var doctorId = row.getAs[String]("doctor_id") baseDF.printSchema()
var mobile = row.getAs[String]("mobile") baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
if (deviceToken != null && !deviceToken.equals("")) { sessionProcessPref.loadData(baseDF,scnData,index)
thisDeviceToken = deviceToken dataCount += baseDF.count().toInt
} else { }
deviceToken = thisDeviceToken println("----------------------------------update task record table---------------------------------------")
}
if (doctorId != null && !doctorId.equals("") && !doctorId.equals("0")) {
thisDoctorId = doctorId
} else {
doctorId = thisDoctorId
}
if (mobile != null && !mobile.equals("")) {
thisMobile = mobile
} else {
mobile = thisMobile
}
//1.获取网络类型
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型,保留原始字段
val action = row.getAs[String]("action")
var action_type: String = ""
if (row.getAs[String]("action") != null) {
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//将符合要求的component_tag进行切割,获取 action_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.put(filedName, value)
}
index += 1
}
}
var menu_code_new = tagMap("menu_code")
if ("MenuCode_081".equals(menu_code_new)) {
menu_code_new = "081" //针对异常menu_code值单独处理
}
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3)
&& action_type.equals("ACTION_VIEW")) {
menu_code_new = "0" //关联不上的显示为0
import scala.util.control.Breaks._
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
menu_code_new = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
}
resList += SessionPref(pseudo_session,
doctorId,
mobile,
deviceToken,
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
net_type,
StringUtils.getNotNullString(row.getAs[String]("created")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
tagMap("tag8"), tagMap("tag9"), tagMap("tag10")
)
})
resList.iterator
})
import sessionProcessPref.sparkSession.implicits._
// val resDF = sessionProcessPref.sparkSession.createDataFrame(resRdd,sourceDF.schema)
// resDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
// println("resDF.show=======>")
// resDF.printSchema()
println("---------------------------------------process columns-------------------------------------------")
// val baseRdd = resDF.rdd.mapPartitions(sessionProcessPref.processColumns)
// println("baseRdd.take(1)=" + baseRdd.take(1))
// println(s"process columns.count===>${baseRdd.count()}")
var baseDF = baseRdd.toDF("pseudo_session", "user_id", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type", "component_tag",
"menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand", "device_model",
"net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version", "serviceName", "tag8", "tag9", "tag10")
println("baseDF.show=======>")
baseDF.printSchema()
println("----------------------------------compute session id---------------------------------------------")
// val sessionIdDF: DataFrame = sessionProcessPref.getSessionId(baseDF,sessionProcessPref)
//默认缓存级别是:MEMORY_AND_DISK
println("-------------------------------match user_id 逻辑-------------------------------------------------")
baseDF.repartition(120).persist(StorageLevel.MEMORY_AND_DISK_SER)
// val dwFactLogSession: DataFrame = sessionProcessPref.matchUserId(baseDF, sessionProcessPref.sparkSession, scnData)
// println("dwFactLogSession.show=======>")
// dwFactLogSession.printSchema()
println("-----------------create view fact_log_session_pref and load to dw_fact_log_session_pref--------------------")
baseDF.createOrReplaceTempView("fact_log_session_pref")
val fields = List("pseudo_session", "user_id", "COALESCE(cast(user_id as int),0) user_id_int", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type",
"component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand",
"device_model", "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version",
"servicename", "tag8", "tag9", "tag10")
val loadDataSql =
s"insert overwrite table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${scnData}') select ${fields.mkString(",")} from fact_log_session_pref distribute by rand()"
sessionProcessPref.sparkSession.sql(loadDataSql)
val dataCount = baseDF.count()
// val dataCount = 1 //统计耗时较长,给个默认值1,表示不统计
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record配置表 //任务执行成功,更新 Mysql record配置表
val updateSQL: String = val updateSQL: String =
s""" s"""
...@@ -228,6 +104,7 @@ object SessionProcessPref { ...@@ -228,6 +104,7 @@ object SessionProcessPref {
} }
class SessionProcessPref extends java.io.Serializable { class SessionProcessPref extends java.io.Serializable {
def getSparkSession(appName: String): SparkSession = { def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName) val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf) UseUtil.setConfigure(conf)
...@@ -247,168 +124,134 @@ class SessionProcessPref extends java.io.Serializable { ...@@ -247,168 +124,134 @@ class SessionProcessPref extends java.io.Serializable {
// //获取actionCategory变量 // //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category") val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
def offsetValues(dataFrame: DataFrame): RDD[SessionPref] = {
//处理字段,得到需要的字段值 val groupRdd = dataFrame.rdd.groupBy(r => r.getAs[String]("pseudo_session"))
val processColumns = (rows: Iterator[Row]) => { val baseRdd = groupRdd.flatMap(g => {
val baseList = new ListBuffer[SessionPref]() val pseudo_session = g._1
//关联到menu_code的映射表广播变量 val resList: ListBuffer[SessionPref] = new ListBuffer[SessionPref]()
val path_menu: Map[String, String] = menuCodeBroad.value var rowList = g._2
// 关联到action_category的映射表广播变量 rowList = rowList.toList.sortWith((x, y) => x.getAs[String]("created") > y.getAs[String]("created")) //按created由大到小排序
val actionCategoryMap: Map[String, String] = actionCategory.value var thisDeviceToken = ""
rows.toList.foreach(row => { var thisDoctorId = "0"
//1.获取网络类型 var thisMobile = ""
//2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串 val path_menu: Map[String, String] = menuCodeBroad.value //关联到menu_code的映射表广播变量
val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type"))) val actionCategoryMap: Map[String, String] = actionCategory.value //关联到action_category的映射表广播变量
//2.修改action类型,保留原始字段 rowList.foreach(row => {
val action = row.getAs[String]("action") var deviceToken = row.getAs[String]("device_token")
var action_type: String = "" var doctorId = row.getAs[String]("doctor_id")
if (row.getAs[String]("action") != null) { var mobile = row.getAs[String]("mobile")
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN") if (deviceToken != null && !deviceToken.equals("")) {
} thisDeviceToken = deviceToken
//3.拆分 component_tag字段 } else {
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag")) deviceToken = thisDeviceToken
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10") }
val tagMap = mutable.Map[String, String]() if (doctorId != null && !doctorId.equals("") && !doctorId.equals("0")) {
tagArr.foreach(r => tagMap.put(r, "")) thisDoctorId = doctorId
//将符合要求的component_tag进行切割,获取 action_code,label_value }
if (component_tag.contains("#")) { doctorId = thisDoctorId
//按照#号切割 }
val strs: Array[String] = component_tag.split("#") if (mobile != null && !mobile.equals("")) {
var index = 0 thisMobile = mobile
for (value <- strs) { } else {
val filedName = tagArr.apply(index) mobile = thisMobile
if ("label_value".equals(filedName)) { }
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length))) //1.获取网络类型
} else { //2G,3G,4G,2G/3G/4G,WIFI,WLAN,或者为空字符串
tagMap.put(filedName, value) val net_type = UseUtil.netTypeMatch(StringUtils.getNotNullString(row.getAs[String]("network_type")))
//2.修改action类型,保留原始字段
val action = row.getAs[String]("action")
var action_type: String = ""
if (row.getAs[String]("action") != null) {
action_type = actionCategoryMap.getOrElse(action, "ACTION_UNKNOWN")
}
//3.拆分 component_tag字段
val component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val tagArr = Array("menu_code", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "tag8", "tag9", "tag10")
val tagMap = mutable.Map[String, String]()
tagArr.foreach(r => tagMap.put(r, ""))
//将符合要求的component_tag进行切割,获取 action_code,label_value
if (component_tag.contains("#")) {
//按照#号切割
val strs: Array[String] = component_tag.split("#")
var index = 0
for (value <- strs) {
val filedName = tagArr.apply(index)
if ("label_value".equals(filedName)) {
tagMap.put(filedName, value.substring(0, math.min(250, strs(3).length)))
} else {
tagMap.put(filedName, value)
}
index += 1
} }
index += 1
} }
} var menu_code_new = tagMap("menu_code")
var menu_code_new = tagMap("menu_code") if ("MenuCode_081".equals(menu_code_new)) {
if ("MenuCode_081".equals(menu_code_new)) { menu_code_new = "081" //针对异常menu_code值单独处理
menu_code_new = "081" //针对异常menu_code值单独处理 }
} //匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW
//匹配menu_code:如果上述截取出来的menu_code为(''||null||0||length(menu_code)>3 ) and action is ACTION_VIEW if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3)
if ((menu_code_new.equals("") || menu_code_new.equals("null") || menu_code_new.equals("0") || menu_code_new.length > 3) && action_type.equals("ACTION_VIEW")) {
&& action_type.equals("ACTION_VIEW")) { menu_code_new = "0" //关联不上的显示为0
menu_code_new = "0" //关联不上的显示为0 import scala.util.control.Breaks._
import scala.util.control.Breaks._ breakable {
breakable { //利用menu_code映射表匹配
//利用menu_code映射表匹配 for (tuple <- path_menu) {
for (tuple <- path_menu) { //源数据view_path的字符串包含映射表view_path的字符串
//源数据view_path的字符串包含映射表view_path的字符串 if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) {
if (StringUtils.getNotNullString(row.getAs[String]("view_path")).contains(tuple._1)) { //满足条件后,修改源数据的menu_code
//满足条件后,修改源数据的menu_code menu_code_new = tuple._2
menu_code_new = tuple._2 println("--------------------menu_code match successfully-----------------------")
println("--------------------menu_code match successfully-----------------------") //结束遍历
//结束遍历 break()
break() }
} }
} }
} }
} resList += SessionPref(pseudo_session,
//一行数据添加到List中 doctorId,
baseList += SessionPref(StringUtils.getNotNullString(row.getAs[String]("pseudo_session")), mobile,
StringUtils.getNotNullString(row.getAs[String]("doctor_id")), deviceToken,
StringUtils.getNotNullString(row.getAs[String]("mobile")), StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("device_token")), StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")), StringUtils.getNotNullString(row.getAs[String]("view_path")),
StringUtils.getNotNullString(row.getAs[String]("class_name")), action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"),
StringUtils.getNotNullString(row.getAs[String]("view_path")), tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"),
action, action_type, component_tag, tagMap("menu_code"), menu_code_new, tagMap("action_code"), StringUtils.getNotNullString(row.getAs[String]("app_version")),
tagMap("position"), tagMap("label_value"), tagMap("label_class"), tagMap("module_class1"), tagMap("module_class2"), StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("app_version")), StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_type")), StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")), net_type,
StringUtils.getNotNullString(row.getAs[String]("device_model")), StringUtils.getNotNullString(row.getAs[String]("created")),
net_type, DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))),
StringUtils.getNotNullString(row.getAs[String]("created")), StringUtils.getNotNullString(row.getAs[String]("web_data")),
DateUtils.milliSecondsFormatTime(StringUtils.getNotNullString(row.getAs[String]("created"))), StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("web_data")), StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")), StringUtils.getNotNullString(row.getAs[String]("login_state")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")), StringUtils.getNotNullString(row.getAs[String]("first_app_version")),
StringUtils.getNotNullString(row.getAs[String]("login_state")), StringUtils.getNotNullString(row.getAs[String]("serviceName")),
StringUtils.getNotNullString(row.getAs[String]("first_app_version")), tagMap("tag8"), tagMap("tag9"), tagMap("tag10")
StringUtils.getNotNullString(row.getAs[String]("serviceName")), )
tagMap("tag8"), tagMap("tag9"), tagMap("tag10") })
) resList.iterator
}) })
baseList.iterator baseRdd
} }
def loadData(dataFrame: DataFrame, partitionDay: String,index:Integer): Unit = {
/** val tmpTable = "result_view"
* @Description 匹配user_id,补全数据中的user_id字段 var insertSql = "insert overwrite"
* @param dataFrame 筛选后的数据 if(index!=1){
* @param sparkSQLSession SparkSession 环境 insertSql = "insert into"
* @param created_day 当前数据的日期,格式 "2020-03-01"
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def matchUserId(dataFrame: DataFrame, sparkSQLSession: SparkSession, created_day: String): DataFrame = {
//追加:将dataFrame与pica_ds.pica_doctor根据user_id进行匹配,匹配不上的user_id置为'0'
println("matchUserId开始执行-----------------------------------")
dataFrame.withColumnRenamed("user_id", "user_id_ods").createOrReplaceTempView(MyConfigSession.VIEW_SESSION_ODS)
val INIT_USER_ID_SQL_PREF =
s"""
|SELECT t.*,t.user_id_ods user_id_old, COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_ODS} as t
|left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on t.user_id_ods = cast(b.id as string)
""".stripMargin
val DF = sparkSQLSession.sql(INIT_USER_ID_SQL_PREF).drop("user_id_ods")
/*
以下的所有逻辑是为了补全user_id字段
*/
//第一步:首先筛选出不符合的use_id数据,将这些user_id置为字符串'0'
val noMatchUserIdDF: Dataset[Row] = DF.where("user_id ='' OR user_id = '0' OR LENGTH(user_id) = 24")
noMatchUserIdDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old")
.createOrReplaceTempView(MyConfigSession.VIEW_SESSION_NO_MATCH)
val MOBILE_PHONE_SQL_PREF: String =
s"""
|SELECT ss.*,COALESCE(cast(b.id as string),'0') AS user_id from ${MyConfigSession.VIEW_SESSION_NO_MATCH} AS ss
|left join (select distinct id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
//1.筛选出上一步没有匹配到的user_id,先按照mobile_phone进行匹配
val mobilePhoneDF: DataFrame = sparkSQLSession.sql(MOBILE_PHONE_SQL_PREF)
//2.使用临时表equiment,筛选出为1的那条最新数据
var equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL
if (!created_day.equals(DateUtils.getYesterdayDate)) { //如果不是跑昨天的数据,使用equipment拉链表
equipmentInfoSql = MyConfigSession.EQUIPMENT_INFO_SQL_ARGS + s"'${created_day}'"
} }
println(s"equipmentInfoSql=${equipmentInfoSql}") println(s"-----------------create view ${tmpTable} and load to dw_fact_log_session_pref--------------------")
val equipmentDF: DataFrame = sparkSQLSession.sql(equipmentInfoSql).where("row_d =1") dataFrame.repartition(10).createOrReplaceTempView(tmpTable)
equipmentDF.createOrReplaceTempView(MyConfigSession.VIEW_EQUIPMENT_INFO) val fields = List("pseudo_session", "user_id", "COALESCE(cast(user_id as int),0) user_id_int", "mobile", "device_token", "user_token", "view_class", "view_path", "action", "action_type",
mobilePhoneDF.drop("user_id_old").withColumnRenamed("user_id", "user_id_old") "component_tag", "menu_code", "menu_code_new", "action_code", "position", "label_value", "label_class", "module_class1", "module_class2", "app_version", "device_type", "device_brand",
.createOrReplaceTempView(MyConfigSession.VIEW_MOBILE_PHONE) "device_model", "net_type", "created_time", "date_time", "web_data", "web_data_type", "alternate_info", "login_state", "first_app_version",
val DEVICE_TOKEN_SQL_PREF: String = "servicename", "tag8", "tag9", "tag10")
s""" SELECT t.*,COALESCE(cast(b.user_id as string),'0') AS user_id val loadDataSql = s"${insertSql} table ${MyConfigSession.HIVE_TABLE0} partition(created_day='${partitionDay}') select ${fields.mkString(",")} from ${tmpTable} distribute by rand()"
| from (select * from ${MyConfigSession.VIEW_MOBILE_PHONE} a where a.user_id_old= '0' ) as t sparkSession.sql(loadDataSql)
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
println(s"DEVICE_TOKEN_SQL_PREF=${DEVICE_TOKEN_SQL_PREF}")
//3.将第2步筛选出来的数据按照device_token进行匹配,获得user_id
val deviceTokenDF: DataFrame = sparkSQLSession.sql(DEVICE_TOKEN_SQL_PREF)
//4.将上述三者union,最终导入表中的数据
val rightUserIdDF: Dataset[Row] = DF.where("user_id !='' and user_id != '0' and LENGTH(user_id) !=24")
val mobilePhoneResDF: Dataset[Row] = mobilePhoneDF.where("user_id !='0'")
println("rightUserIdDF/mobilePhoneResDF/deviceTokenDF.schema===========")
rightUserIdDF.printSchema()
mobilePhoneResDF.printSchema()
deviceTokenDF.printSchema()
val dwFactLogSession: Dataset[Row] = rightUserIdDF.union(mobilePhoneResDF).union(deviceTokenDF)
dwFactLogSession.createOrReplaceTempView(MyConfigSession.VIEW_DEVICE_TOKEN)
dwFactLogSession
//根据pica_doctor补充user_id_int字段(字段类型转换成int型), 限制 delete_flag = 1 and creat_time截止昨日创建,未关联上显示为0
// val USER_ID_INT_SQL: String =
// s"""
// |SELECT ss.*,COALESCE(b.id,0) user_id_int from ${MyConfigSession.VIEW_DEVICE_TOKEN} AS ss
// |left join (select id,cast(id as string) id_str from pica_ds.pica_doctor a where a.delete_flag = 1 and to_date(a.creat_time) <= '${created_day}') AS b on ss.user_id = b.id_str
// |""".stripMargin
// val userIdDF: DataFrame = sparkSQLSession.sql(USER_ID_INT_SQL)
// userIdDF
} }
} }
case class SessionPref(pseudo_session: String, case class SessionPref(pseudo_session: String,
......
...@@ -43,8 +43,12 @@ object SessionProcessTerm { ...@@ -43,8 +43,12 @@ object SessionProcessTerm {
""".stripMargin """.stripMargin
//设置同步数据的批次号,格式是2019-09-12 //设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate var scnData: String = DateUtils.getYesterdayDate
var condition = " 1=1"
if (args.length >= 1) { if (args.length >= 1) {
scnData = args(0) scnData = args(0)
if(args.length > 1){
condition = args(1)
}
} }
println(s"scnData=${scnData}") println(s"scnData=${scnData}")
//设置任务开始时间,格式是2019-09-12 14:03:30 //设置任务开始时间,格式是2019-09-12 14:03:30
...@@ -68,8 +72,8 @@ object SessionProcessTerm { ...@@ -68,8 +72,8 @@ object SessionProcessTerm {
| from ${MyConfigSession.HIVE_TABLE0} | from ${MyConfigSession.HIVE_TABLE0}
| where servicename='trace2' and action!='ACTION_EQUIP_INFO' | where servicename='trace2' and action!='ACTION_EQUIP_INFO'
| and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1' | and (case when ((view_class like '%YunqueApp%' and action!='ACTION_HEART_BEAT') or LENGTH(view_class)<=3 or view_class='YQGuidePageViewVC') then '2' else '1' end)='1'
| and created_day='${scnData}' | and created_day='${scnData}' and ${condition}
|""".stripMargin // and pseudo_session='3b3cec3b-2305-4e3a-b690-843e2f666c69' |""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM ) val sourceDF: DataFrame = sparkSession.sql(SOURCE_SQL_TERM )
println("sourceDF.show==================") println("sourceDF.show==================")
sourceDF.printSchema() sourceDF.printSchema()
...@@ -86,13 +90,15 @@ object SessionProcessTerm { ...@@ -86,13 +90,15 @@ object SessionProcessTerm {
println("selectDF.show========") println("selectDF.show========")
selectDF.printSchema() selectDF.printSchema()
println("selectDF.count=========",selectDF.count()) println("selectDF.count=========",selectDF.count())
val conditionGroup = List("<='4' ","between '5' and '9'",">'9'") // val conditionGroup = List("<='4' ","between '5' and '9'",">'9'")
val conditionGroup = List("='0'","='1'","='2'","='3'","='4'","='5'","='6'","='7'","='8'","='9'",
"='a'","='b'","='c'","='d'","='e'","='f'")
var dataCount = 0 var dataCount = 0
var index = 0 var index = 0
selectDF.persist(StorageLevel.MEMORY_AND_DISK_SER) selectDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
for(condition <- conditionGroup){ for(condition <- conditionGroup){
index += 1 index += 1
val slideDF = selectDF.where(s" SUBSTRING(pseudo_session,1,1) ${condition}") val slideDF = selectDF.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------") println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val referResultRdd = sessionProcessTerm.getReferColumns(slideDF) val referResultRdd = sessionProcessTerm.getReferColumns(slideDF)
val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType( val referResultDF: DataFrame = sparkSession.createDataFrame(referResultRdd, StructType(
...@@ -118,7 +124,7 @@ object SessionProcessTerm { ...@@ -118,7 +124,7 @@ object SessionProcessTerm {
println("referResultDF.show()============'") println("referResultDF.show()============'")
referResultDF.printSchema() referResultDF.printSchema()
// referResultDF.where("action_type='ACTION_VIEW'").show(100,true) // referResultDF.where("action_type='ACTION_VIEW'").show(100,true)
referResultDF.repartition(100).persist(StorageLevel.MEMORY_AND_DISK_SER).createOrReplaceTempView("refer_result_table") referResultDF.persist(StorageLevel.MEMORY_AND_DISK_SER).createOrReplaceTempView("refer_result_table")
println("-----------------------------------compute menu_code term-----------------------------------------") println("-----------------------------------compute menu_code term-----------------------------------------")
val getMenuTermSql = val getMenuTermSql =
""" """
...@@ -172,7 +178,7 @@ object SessionProcessTerm { ...@@ -172,7 +178,7 @@ object SessionProcessTerm {
|(cast(c.session_end_time as bigint)-cast(c.session_begin_time as bigint))/1000 session_time_diff, |(cast(c.session_end_time as bigint)-cast(c.session_begin_time as bigint))/1000 session_time_diff,
|a.refer_session_id |a.refer_session_id
|from refer_result_table a |from refer_result_table a
|left join refer_menu_table b on a.session_id=b.session_id and a.device_token=b.device_token and a.user_id=b.user_id and a.menu_code=b.menu_code and a.created_time=b.created_time |left join refer_menu_table b on a.session_id=b.session_id and a.device_token=b.device_token and a.user_id=b.user_id and a.menu_code=b.menu_code and a.created_time=b.created_time and a.action_type in('ACTION_VIEW','ACTION_HEART')
|left join session_end_table c on a.session_id = c.session_id |left join session_end_table c on a.session_id = c.session_id
| distribute by rand() | distribute by rand()
|""".stripMargin |""".stripMargin
...@@ -191,11 +197,6 @@ object SessionProcessTerm { ...@@ -191,11 +197,6 @@ object SessionProcessTerm {
println(s"${condition}的结果==${resCount}") println(s"${condition}的结果==${resCount}")
dataCount += resCount dataCount += resCount
} }
val fields = List("id","session_id","device_token","user_id","mobile","menu_code","menu_begin_time","menu_end_time","menu_time_diff","action_type",
"action_code","position","label_value","refer_menu_code","refer_action_code","refer_position","refer_label_value","refer_action_type","action_step",
"device_type","app_version","created_time","date_time","session_begin_time","session_end_time","session_time_diff","refer_session_id")
// sparkSession.sql(s"insert overwrite table ${MyConfigSession.HIVE_TABLE4} partition(created_day='${scnData}') " +
// s"select ${fields.mkString(",")} from ${MyConfigSession.HIVE_TABLE4} where created_day='${scnData}'")
println("----------------------------------update task record table---------------------------------------") println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表 //任务执行成功,更新 Mysql record 配置表
val updateSQL: String = val updateSQL: String =
......
...@@ -88,6 +88,8 @@ object UseUtil { ...@@ -88,6 +88,8 @@ object UseUtil {
conf.set("spark.reducer.maxSizeInFlight", "96m") conf.set("spark.reducer.maxSizeInFlight", "96m")
//设置字符串 //设置字符串
conf.set("spark.debug.maxToStringFields","100") conf.set("spark.debug.maxToStringFields","100")
//启用自动设置 Shuffle Reducer,默认false
conf.set("spark.sql.adaptive.enabled","true")
} }
......
Markdown 格式
0% or
您即将添加 0 人到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册