提交 ac165450 编写于 作者: weicheng.mao's avatar weicheng.mao

新增刷底表代码,session 和sessionterm 过滤 ACTION_WEB_AFTER 该事件

上级 49e0566f
......@@ -54,6 +54,7 @@ object MyConfigSession {
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and action!='ACTION_WEB_AFTER'
| and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day and created is not null and created!=''
""".stripMargin //and `action`!='ACTION_EQUIP_INFO'
......@@ -185,4 +186,7 @@ object MyConfigSession {
final val JDBC_PSSWORD = "5$7FXgz#e5JWP08e"
final val JDBC_TABLE = "schedule_job_record"
final val MENU_CODE_VIEW_PATH_SQL: String = "select distinct full_path, menu_code from pica_ds.pica_bi_bi_menu_code_h5 where view_path is not Null"
}
package com.session
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import java.sql
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
object LoopPicaLogTraceAppPart {
def apply(): LoopPicaLogTraceAppPart = new LoopPicaLogTraceAppPart()
def main(args: Array[String]): Unit = {
//1.执行任务之前先往record表记录
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'picalog_trace_app_part_new','3',?,'0',?)
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var elseFiler = " 1=1"
if (args.length >= 1) {
scnData = args(0)
if (args.length > 1 && args(1) != "") {
elseFiler = args(1)
}
}
println(s"scnData=${scnData}")
//设置任务开始时间,格式是2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//存储SQL中的参数
val insertArr: Array[String] = Array[String](scnData, startTime)
//获取MYSQL连接
val connSql: sql.Connection = JDBCUtil.getConnection()
//向 record 表插入数据
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val loopPicaLogTraceAppPart: LoopPicaLogTraceAppPart = LoopPicaLogTraceAppPart()
try {
val sparkSession: SparkSession = loopPicaLogTraceAppPart.getSparkSession("LoopPicaLogTraceAppPart")
//获取menu_code广播变量
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_VIEW_PATH_SQL, "view_path", "menu_code")
// //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
sparkSession.sql(
s"""
|insert overwrite table pica_log.picalog_trace_app_part_backup partition(created_day='${scnData}')
|select
|id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
|user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid,created_day
|FROM pica_log.picalog_trace_app_part_temp_mao
|where FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
|""".stripMargin)
val traceDf = sparkSession.sql(
s"""
|select
|id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
|user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid,created_day
|FROM pica_log.picalog_trace_app_part_temp_mao
|where FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
|""".stripMargin)
val trace_app_parts: RDD[Trace_app_part] = traceDf.rdd.mapPartitions(itera => {
val path_menu: Map[String, String] = menuCodeBroad.value //关联到menu_code的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value //关联到action_category的映射表广播变量
val resList: ListBuffer[Trace_app_part] = new ListBuffer[Trace_app_part]()
for (row <- itera) {
var mencode: String = ""
var component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val view_path = StringUtils.getNotNullString(row.getAs[String]("view_path"))
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (view_path.contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
mencode = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
if (StringUtils.isNotEmpty(mencode)) {
if (component_tag.contains("#")) { //默认是
//按照#号切割
val strs: Array[String] = component_tag.split("#")
component_tag = mencode + component_tag.substring(component_tag.indexOf("#"))
} else if (StringUtils.isNumeric(component_tag) && component_tag.toInt > 0) { //如果component_tag不为0,且长度在合理区间,则作为menu_code值
component_tag = mencode
}
}
getBeanList(row, resList,component_tag)
}
resList.toIterator
})
import sparkSession.implicits._
val frame = trace_app_parts.toDF().createOrReplaceTempView("newTrace_app_parts")
loadData(sparkSession,scnData)
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='picalog_trace_app_part' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
def getBeanList(row:Row,resList: ListBuffer[Trace_app_part],component_tag:String) ={
resList += Trace_app_part(
row.getAs[Int]("id"),
StringUtils.getNotNullString(row.getAs[String]("package_id")),
StringUtils.getNotNullString(row.getAs[String]("uuid")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("pseudo_id")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("action")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
component_tag,
StringUtils.getNotNullString(row.getAs[String]("created")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_ip")),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("extra_info")),
StringUtils.getNotNullString(row.getAs[String]("network_type")),
StringUtils.getNotNullString(row.getAs[String]("created_on")),
StringUtils.getNotNullString(row.getAs[String]("remark1")),
StringUtils.getNotNullString(row.getAs[String]("remark2")),
StringUtils.getNotNullString(row.getAs[String]("remark3")),
StringUtils.getNotNullString(row.getAs[String]("remark4")),
StringUtils.getNotNullString(row.getAs[String]("remark5")),
StringUtils.getNotNullString(row.getAs[String]("remark6")),
StringUtils.getNotNullString(row.getAs[String]("remark7")),
StringUtils.getNotNullString(row.getAs[String]("remark8")),
StringUtils.getNotNullString(row.getAs[String]("remark9")),
StringUtils.getNotNullString(row.getAs[String]("remark10")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("machineID")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
StringUtils.getNotNullString(row.getAs[String]("serviceSidePacketId")),
StringUtils.getNotNullString(row.getAs[String]("serviceSideRecordId"))
)
resList
}
def loadData(sparkSession: SparkSession, partitionDay: String): Unit = {
val loadDataSql =
s"""
|insert overwrite table pica_log.picalog_trace_app_part partition(created_day='${partitionDay}')
| select
| id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
| user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
| web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
| remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
| servicesiderecordid,created_day
| from newTrace_app_parts
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
class LoopPicaLogTraceAppPart {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
case class Trace_app_part(
id:Int,
package_id:String,
uuid:String,
device_token:String,
pseudo_session:String,
pseudo_id:String,
class_name:String,
action:String,
view_path:String,
component_tag:String,
created:String,
user_token:String,
mobile:String,
doctor_id:String,
device_brand:String,
device_model:String,
app_version:String,
device_type:String,
device_ip:String,
web_data:String,
web_data_type:String,
alternate_info:String,
extra_info:String,
network_type:String,
created_on:String,
remark1:String,
remark2:String,
remark3:String,
remark4:String,
remark5:String,
remark6:String,
remark7:String,
remark8:String,
remark9:String,
remark10:String,
user_token_tourist:String,
machineid:String,
servicename:String,
servicesidepacketid:String,
servicesiderecordid:String)
\ No newline at end of file
......@@ -81,6 +81,7 @@ object SessionProcessTerm {
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !='' and pseudo_id !='' and extra_info !='com.picahealth.patient'
| and action!='ACTION_EQUIP_INFO'
| and action!='ACTION_WEB_AFTER'
| and created is not null and created!='' and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
| and created_day='${scnData}' and ${elseFiler}
|""".stripMargin
......
package com.utils
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessPath, SessionProcessPathNew, SessionProcessTerm}
import com.session.{LoopPicaLogTraceAppPart, SessionMenuCalc, SessionProcess, SessionProcessPath, SessionProcessPathNew, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver
/**
......@@ -16,6 +16,7 @@ object Driver {
driver.addClass("SessionMenuCalc",classOf[SessionMenuCalc],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_menu_calc表")
driver.addClass("SessionProcessTerm",classOf[SessionProcessTerm],"传递日期参数--用户Session数据etl导入到dw_fact_log_sesson_term表")
driver.addClass("SessionProcessPathNew",classOf[SessionProcessPathNew],"用户Session数据分析导入到dw_fact_log_session_path_new表")
driver.addClass("LoopPicaLogTraceAppPart",classOf[LoopPicaLogTraceAppPart],"弃用以前的menucode与viepath 对应表,启用新的一套")
driver.run(args)
}
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册