Commit 4f5ca56b authored by weicheng.mao

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/utils/Driver.scala
@@ -58,6 +58,7 @@ object MyConfigSession {
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
| and action!='ACTION_WEB_AFTER'
| and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')=created_day and created is not null and created!=''
""".stripMargin //and `action`!='ACTION_EQUIP_INFO'
@@ -239,4 +240,7 @@ object MyConfigSession {
final val JDBC_PSSWORD = "5$7FXgz#e5JWP08e"
final val JDBC_TABLE = "schedule_job_record"
final val MENU_CODE_VIEW_PATH_SQL: String = "select distinct full_path view_path, menu_code from pica_ds.pica_bi_bi_menu_code_h5 where full_path is not Null"
}
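The new MENU_CODE_VIEW_PATH_SQL constant feeds the menu_code broadcast used by LoopPicaLogTraceAppPart below. A minimal sketch of what UseUtil.getBroadcast presumably does with it (an assumption, the real utility may differ): run the lookup SQL, collect the two named columns into a Map, and broadcast that map.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

object GetBroadcastSketch {
// Hypothetical stand-in for UseUtil.getBroadcast: collect keyCol -> valueCol pairs and broadcast them.
def getBroadcast(spark: SparkSession, lookupSql: String, keyCol: String, valueCol: String): Broadcast[Map[String, String]] = {
val mapping: Map[String, String] = spark.sql(lookupSql)
.collect()
.map(row => row.getAs[String](keyCol) -> row.getAs[String](valueCol))
.toMap
spark.sparkContext.broadcast(mapping)
}
}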
package com.session
import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.sql
import java.sql.{PreparedStatement, Timestamp}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
object LoopPicaLogTraceAppPart {
def apply(): LoopPicaLogTraceAppPart = new LoopPicaLogTraceAppPart()
def main(args: Array[String]): Unit = {
// 1. Insert a row into the record table before running the job
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(0,'picalog_trace_app_part_new','3',?,'0',?)
""".stripMargin
// Batch id for the data sync, formatted like 2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var tbname = ""
if (args.length >= 1) {
scnData = args(0)
if (args.length > 1 && args(1) != "") {
tbname = args(1)
}
}
println(s"scnData=${scnData}")
// Job start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
// Parameters bound into the insert SQL
val insertArr: Array[String] = Array[String](scnData, startTime)
// Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
// Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
val loopPicaLogTraceAppPart: LoopPicaLogTraceAppPart = LoopPicaLogTraceAppPart()
try {
val sparkSession: SparkSession = loopPicaLogTraceAppPart.getSparkSession("LoopPicaLogTraceAppPart")
// Broadcast variable with the view_path -> menu_code mapping
val menuCodeBroad: Broadcast[Map[String, String]] = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_VIEW_PATH_SQL, "view_path", "menu_code")
// Broadcast variable with the action_type -> action_category mapping
val actionCategory: Broadcast[Map[String, String]] = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
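// Back up the day's partition of picalog_trace_app_part into picalog_trace_app_part_backup,
// then re-read the backup as the working DataFrame; the enriched rows are written back into
// the original table's partition by loadData() further down.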
sparkSession.sql(
s"""
|insert overwrite table pica_log.picalog_trace_app_part_backup partition(created_day='${scnData}')
|select
|id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
|user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid
|FROM pica_log.picalog_trace_app_part
|where created_day='${scnData}'
|""".stripMargin)
val traceDf = sparkSession.sql(
s"""
|select
|id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
|user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid,created_day
|FROM pica_log.picalog_trace_app_part_backup
|where created_day='${scnData}'
|""".stripMargin)
//
// var conditionGroup = List( "<='1'","between '2' and '3'","between '4' and '5'","between '6' and '7'","between '8' and '9'",
// "between 'a' and 'b'","between 'c' and 'd'",">='e'"," is null " ,"= '' ")
var index = 0
var dataCount = 0
// traceDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
// for(condition <- conditionGroup){
// val slideDF: Dataset[Row] = traceDf.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
// println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val trace_app_parts: RDD[Trace_app_part] = slideDFMapPartition(traceDf,menuCodeBroad,actionCategory)
import sparkSession.implicits._
val countrdd = trace_app_parts.count()
println(s"-----------------------------------compute countrdd,countrdd=${countrdd}-----------------------------------------")
val frame = trace_app_parts.toDF()
frame.createOrReplaceTempView("newTrace_app_parts")
loadData(sparkSession, scnData, index)
val resCount = frame.count().toInt
println(s"result count for this batch = ${resCount}")
dataCount += resCount
// }
println("----------------------------------update task record table---------------------------------------")
// Job succeeded: update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='picalog_trace_app_part_new' and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, dataCount)
// Execute the update
upreSta.executeUpdate()
// Close the statement and the connection
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
} catch {
case e: Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='picalog_trace_app_part_new' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
def slideDFMapPartition(slideDF: Dataset[Row],menuCodeBroad: Broadcast[Map[String, String]],actionCategory: Broadcast[Map[String, String]]) ={
val value: RDD[Trace_app_part] = slideDF.rdd.mapPartitions(itera => {
val path_menu: Map[String, String] = menuCodeBroad.value // broadcast mapping: view_path -> menu_code
val actionCategoryMap: Map[String, String] = actionCategory.value // broadcast mapping: action_type -> action_category
val resList: ListBuffer[Trace_app_part] = new ListBuffer[Trace_app_part]()
for (row <- itera) {
var mencode: String = ""
var component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val view_path = StringUtils.getNotNullString(row.getAs[String]("view_path"))
breakable {
// Match against the menu_code mapping table
for (tuple <- path_menu) {
// The source row's view_path contains the mapping table's view_path string
if (view_path.contains(tuple._1)) {
// On a match, take the mapped menu_code for this row
mencode = tuple._2
println("--------------------menu_code match successfully-----------------------")
// Stop scanning after the first match
break()
}
}
}
if (StringUtils.isNotEmpty(mencode)) {
if (component_tag.contains("#")) {
// component_tag contains '#': replace everything before the '#' with the matched menu_code
component_tag = mencode + component_tag.substring(component_tag.indexOf("#"))
} else if (StringUtils.isNumeric(component_tag) && component_tag.toInt > 0) {
// A purely numeric, positive component_tag is replaced by the matched menu_code
component_tag = mencode
}
}
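// Illustration with made-up values: given a matched menu code "M001", a component_tag of
// "123#btn_save" becomes "M001#btn_save" and a purely numeric tag such as "123" becomes "M001";
// tags matching neither branch are passed through unchanged.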
getBeanList(row, resList, component_tag)
}
resList.toIterator
})
value
}
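// The breakable/break scan above is just a first-match lookup; an equivalent version using
// collectFirst (illustrative sketch only, not wired into the job):
private def firstMenuCode(viewPath: String, pathMenu: Map[String, String]): String =
pathMenu.collectFirst { case (path, code) if viewPath.contains(path) => code }.getOrElse("")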
def getBeanList(row:Row,resList: ListBuffer[Trace_app_part],component_tag:String) ={
resList += Trace_app_part(
row.getAs[Int]("id"),
StringUtils.getNotNullString(row.getAs[String]("package_id")),
StringUtils.getNotNullString(row.getAs[String]("uuid")),
StringUtils.getNotNullString(row.getAs[String]("device_token")),
StringUtils.getNotNullString(row.getAs[String]("pseudo_session")),
StringUtils.getNotNullString(row.getAs[String]("pseudo_id")),
StringUtils.getNotNullString(row.getAs[String]("class_name")),
StringUtils.getNotNullString(row.getAs[String]("action")),
StringUtils.getNotNullString(row.getAs[String]("view_path")),
component_tag,
StringUtils.getNotNullString(row.getAs[String]("created")),
StringUtils.getNotNullString(row.getAs[String]("user_token")),
StringUtils.getNotNullString(row.getAs[String]("mobile")),
StringUtils.getNotNullString(row.getAs[String]("doctor_id")),
StringUtils.getNotNullString(row.getAs[String]("device_brand")),
StringUtils.getNotNullString(row.getAs[String]("device_model")),
StringUtils.getNotNullString(row.getAs[String]("app_version")),
StringUtils.getNotNullString(row.getAs[String]("device_type")),
StringUtils.getNotNullString(row.getAs[String]("device_ip")),
StringUtils.getNotNullString(row.getAs[String]("web_data")),
StringUtils.getNotNullString(row.getAs[String]("web_data_type")),
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("extra_info")),
StringUtils.getNotNullString(row.getAs[String]("network_type")),
row.getAs[Timestamp]("created_on"),
StringUtils.getNotNullString(row.getAs[String]("remark1")),
StringUtils.getNotNullString(row.getAs[String]("remark2")),
StringUtils.getNotNullString(row.getAs[String]("remark3")),
StringUtils.getNotNullString(row.getAs[String]("remark4")),
StringUtils.getNotNullString(row.getAs[String]("remark5")),
StringUtils.getNotNullString(row.getAs[String]("remark6")),
StringUtils.getNotNullString(row.getAs[String]("remark7")),
StringUtils.getNotNullString(row.getAs[String]("remark8")),
StringUtils.getNotNullString(row.getAs[String]("remark9")),
StringUtils.getNotNullString(row.getAs[String]("remark10")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("machineid")),
StringUtils.getNotNullString(row.getAs[String]("servicename")),
StringUtils.getNotNullString(row.getAs[String]("servicesidepacketid")),
StringUtils.getNotNullString(row.getAs[String]("servicesiderecordid"))
)
resList
}
def loadData(sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
val insertSql = "insert overwrite"
val loadDataSql =
s"""
|${insertSql} table pica_log.picalog_trace_app_part partition(created_day='${partitionDay}')
| select
| id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
| user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
| web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
| remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
| servicesiderecordid
| from newTrace_app_parts
""".stripMargin
sparkSession.sql(loadDataSql)
}
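// The record insert/update above goes through JDBCUtil; a minimal sketch of what
// JDBCUtil.insertRecord presumably does (an assumption, the real helper may differ):
// bind each string parameter in order, execute, and return the affected row count.
private def insertRecordSketch(conn: sql.Connection, sqlText: String, params: Array[String]): Int = {
val stmt: PreparedStatement = conn.prepareStatement(sqlText)
params.zipWithIndex.foreach { case (p, i) => stmt.setString(i + 1, p) }
try stmt.executeUpdate() finally stmt.close()
}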
}
class LoopPicaLogTraceAppPart {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
// .setMaster("local[*]")
// .set("dfs.client.use.datanode.hostname","true")
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
case class Trace_app_part(
id:Int,
package_id:String,
uuid:String,
device_token:String,
pseudo_session:String,
pseudo_id:String,
class_name:String,
action:String,
view_path:String,
component_tag:String,
created:String,
user_token:String,
mobile:String,
doctor_id:String,
device_brand:String,
device_model:String,
app_version:String,
device_type:String,
device_ip:String,
web_data:String,
web_data_type:String,
alternate_info:String,
extra_info:String,
network_type:String,
created_on:Timestamp,
remark1:String,
remark2:String,
remark3:String,
remark4:String,
remark5:String,
remark6:String,
remark7:String,
remark8:String,
remark9:String,
remark10:String,
user_token_tourist:String,
machineid:String,
servicename:String,
servicesidepacketid:String,
servicesiderecordid:String)
\ No newline at end of file
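The job takes an optional batch date (yyyy-MM-dd) as its first argument and defaults to yesterday when none is given; a second argument is parsed into tbname but not used further here. A minimal local invocation sketch:

// Re-run the load for a specific day
LoopPicaLogTraceAppPart.main(Array("2019-09-12"))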
@@ -81,6 +81,7 @@ object SessionProcessTerm {
| from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !='' and pseudo_id !='' and extra_info !='com.picahealth.patient'
| and action!='ACTION_EQUIP_INFO'
| and action!='ACTION_WEB_AFTER'
| and created is not null and created!='' and FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
| and created_day='${scnData}' and ${elseFiler}
|""".stripMargin
...
package com.utils

import com.session.{SessionMenuCalc, SessionPathConvert, SessionProcess, SessionProcessPath, SessionProcessPathNew, SessionProcessTerm}
import com.session.{SessionMenuCalc, SessionProcess, SessionProcessPath, SessionProcessPathNew, SessionProcessTerm}
import org.apache.hadoop.util.ProgramDriver

/**
@@ -17,6 +17,7 @@ object Driver {
driver.addClass("SessionProcessTerm", classOf[SessionProcessTerm], "Takes a date argument -- ETL user session data into the dw_fact_log_sesson_term table")
driver.addClass("SessionProcessPathNew", classOf[SessionProcessPathNew], "Analyzes user session data and loads it into the dw_fact_log_session_path_new table")
driver.addClass("SessionPathConvert", classOf[SessionPathConvert], "Analyzes user session data and loads it into the dw_fact_log_session_path_convert table")
driver.addClass("LoopPicaLogTraceAppPart", classOf[LoopPicaLogTraceAppPart], "Drops the old menucode/viewpath mapping table and switches to the new one")
driver.run(args)
}
...
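Driver registers each job under a name with Hadoop's ProgramDriver, and driver.run(args) dispatches on the first command-line argument, forwarding the remaining arguments to the selected job. The new job would therefore be launched roughly as follows (a sketch; the jar name and spark-submit options depend on the deployment):

// spark-submit --class com.utils.Driver <app-jar> LoopPicaLogTraceAppPart 2019-09-12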