提交 74138c3e 编写于 作者: weicheng.mao's avatar weicheng.mao

新增刷底表代码;修复 session 和 sessionterm 过滤 ACTION_WEB_AFTER 事件的 BUG

上级 ac165450
......@@ -186,7 +186,7 @@ object MyConfigSession {
final val JDBC_PSSWORD = "5$7FXgz#e5JWP08e"
final val JDBC_TABLE = "schedule_job_record"
final val MENU_CODE_VIEW_PATH_SQL: String = "select distinct full_path, menu_code from pica_ds.pica_bi_bi_menu_code_h5 where view_path is not Null"
final val MENU_CODE_VIEW_PATH_SQL: String = "select distinct full_path view_path, menu_code from pica_ds.pica_bi_bi_menu_code_h5 where full_path is not Null"
}
......@@ -4,10 +4,13 @@ import com.config.MyConfigSession
import com.pica.utils.{DateUtils, StringUtils}
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.sql
import java.sql.{PreparedStatement, Timestamp}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}
......@@ -25,11 +28,11 @@ object LoopPicaLogTraceAppPart {
""".stripMargin
//设置同步数据的批次号,格式是2019-09-12
var scnData: String = DateUtils.getYesterdayDate
var elseFiler = " 1=1"
var tbname = ""
if (args.length >= 1) {
scnData = args(0)
if (args.length > 1 && args(1) != "") {
elseFiler = args(1)
tbname = args(1)
}
}
println(s"scnData=${scnData}")
......@@ -45,9 +48,9 @@ object LoopPicaLogTraceAppPart {
try {
val sparkSession: SparkSession = loopPicaLogTraceAppPart.getSparkSession("LoopPicaLogTraceAppPart")
//获取menu_code广播变量
val menuCodeBroad = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_VIEW_PATH_SQL, "view_path", "menu_code")
val menuCodeBroad: Broadcast[Map[String, String]] = UseUtil.getBroadcast(sparkSession, MyConfigSession.MENU_CODE_VIEW_PATH_SQL, "view_path", "menu_code")
// //获取actionCategory变量
val actionCategory = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
val actionCategory: Broadcast[Map[String, String]] = UseUtil.getBroadcast(sparkSession, MyConfigSession.ACTION_CATEGORY_SQL, "action_type", "action_category")
sparkSession.sql(
s"""
|insert overwrite table pica_log.picalog_trace_app_part_backup partition(created_day='${scnData}')
......@@ -56,10 +59,11 @@ object LoopPicaLogTraceAppPart {
|user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid,created_day
|FROM pica_log.picalog_trace_app_part_temp_mao
|where FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
|servicesiderecordid
|FROM pica_log.picalog_trace_app_part
|where created_day='${scnData}'
|""".stripMargin)
val traceDf = sparkSession.sql(
s"""
|select
......@@ -68,50 +72,49 @@ object LoopPicaLogTraceAppPart {
|web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
|remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
|servicesiderecordid,created_day
|FROM pica_log.picalog_trace_app_part_temp_mao
|where FROM_UNIXTIME(cast(substring(created,1,10) as bigint),'yyyy-MM-dd')='${scnData}'
|FROM pica_log.picalog_trace_app_part_backup
|where created_day='${scnData}'
|""".stripMargin)
//
// var conditionGroup = List( "<='1'","between '2' and '3'","between '4' and '5'","between '6' and '7'","between '8' and '9'",
// "between 'a' and 'b'","between 'c' and 'd'",">='e'"," is null " ,"= '' ")
var index = 0
var dataCount = 0
// traceDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
// for(condition <- conditionGroup){
val trace_app_parts: RDD[Trace_app_part] = traceDf.rdd.mapPartitions(itera => {
val path_menu: Map[String, String] = menuCodeBroad.value //关联到menu_code的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value //关联到action_category的映射表广播变量
val resList: ListBuffer[Trace_app_part] = new ListBuffer[Trace_app_part]()
for (row <- itera) {
var mencode: String = ""
var component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val view_path = StringUtils.getNotNullString(row.getAs[String]("view_path"))
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (view_path.contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
mencode = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
if (StringUtils.isNotEmpty(mencode)) {
if (component_tag.contains("#")) { //默认是
//按照#号切割
val strs: Array[String] = component_tag.split("#")
component_tag = mencode + component_tag.substring(component_tag.indexOf("#"))
} else if (StringUtils.isNumeric(component_tag) && component_tag.toInt > 0) { //如果component_tag不为0,且长度在合理区间,则作为menu_code值
component_tag = mencode
}
}
getBeanList(row, resList,component_tag)
}
resList.toIterator
})
// val slideDF: Dataset[Row] = traceDf.where(s" SUBSTRING(pseudo_session,2,1) ${condition}").repartition(100)
// println(s"-----------------------------------compute refer columns,condition=${condition}-----------------------------------------")
val trace_app_parts: RDD[Trace_app_part] = slideDFMapPartition(traceDf,menuCodeBroad,actionCategory)
import sparkSession.implicits._
val frame = trace_app_parts.toDF().createOrReplaceTempView("newTrace_app_parts")
val countrdd = trace_app_parts.count()
println(s"-----------------------------------compute countrdd,countrdd=${countrdd}-----------------------------------------")
val frame = trace_app_parts.toDF()
frame.
createOrReplaceTempView("newTrace_app_parts")
loadData( sparkSession, scnData,index)
loadData(sparkSession,scnData)
val resCount = frame.count().toInt
println(s"${resCount}的结果==${resCount}")
dataCount += resCount
// }
println("----------------------------------update task record table---------------------------------------")
//任务执行成功,更新 Mysql record 配置表
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_name='picalog_trace_app_part' and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, dataCount.toInt)
//更新表数据
upreSta.executeUpdate()
//关闭连接
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
} catch {
case e: Exception => {
......@@ -119,7 +122,7 @@ object LoopPicaLogTraceAppPart {
e.printStackTrace()
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='picalog_trace_app_part' and start_time='${startTime}'
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_name='picalog_trace_app_part_new' and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
......@@ -131,6 +134,47 @@ object LoopPicaLogTraceAppPart {
}
def slideDFMapPartition(slideDF: Dataset[Row],menuCodeBroad: Broadcast[Map[String, String]],actionCategory: Broadcast[Map[String, String]]) ={
val value: RDD[Trace_app_part] = slideDF.rdd.mapPartitions(itera => {
val path_menu: Map[String, String] = menuCodeBroad.value //关联到menu_code的映射表广播变量
val actionCategoryMap: Map[String, String] = actionCategory.value //关联到action_category的映射表广播变量
val resList: ListBuffer[Trace_app_part] = new ListBuffer[Trace_app_part]()
for (row <- itera) {
var mencode: String = ""
var component_tag: String = StringUtils.getNotNullString(row.getAs[String]("component_tag"))
val view_path = StringUtils.getNotNullString(row.getAs[String]("view_path"))
breakable {
//利用menu_code映射表匹配
for (tuple <- path_menu) {
//源数据view_path的字符串包含映射表view_path的字符串
if (view_path.contains(tuple._1)) {
//满足条件后,修改源数据的menu_code
mencode = tuple._2
println("--------------------menu_code match successfully-----------------------")
//结束遍历
break()
}
}
}
if (StringUtils.isNotEmpty(mencode)) {
if (component_tag.contains("#")) { //默认是
//按照#号切割
val strs: Array[String] = component_tag.split("#")
component_tag = mencode + component_tag.substring(component_tag.indexOf("#"))
} else if (StringUtils.isNumeric(component_tag) && component_tag.toInt > 0) { //如果component_tag不为0,且长度在合理区间,则作为menu_code值
component_tag = mencode
}
}
getBeanList(row, resList, component_tag)
}
resList.toIterator
})
value
}
def getBeanList(row:Row,resList: ListBuffer[Trace_app_part],component_tag:String) ={
resList += Trace_app_part(
row.getAs[Int]("id"),
......@@ -157,7 +201,7 @@ object LoopPicaLogTraceAppPart {
StringUtils.getNotNullString(row.getAs[String]("alternate_info")),
StringUtils.getNotNullString(row.getAs[String]("extra_info")),
StringUtils.getNotNullString(row.getAs[String]("network_type")),
StringUtils.getNotNullString(row.getAs[String]("created_on")),
row.getAs[Timestamp]("created_on"),
StringUtils.getNotNullString(row.getAs[String]("remark1")),
StringUtils.getNotNullString(row.getAs[String]("remark2")),
StringUtils.getNotNullString(row.getAs[String]("remark3")),
......@@ -169,26 +213,27 @@ object LoopPicaLogTraceAppPart {
StringUtils.getNotNullString(row.getAs[String]("remark9")),
StringUtils.getNotNullString(row.getAs[String]("remark10")),
StringUtils.getNotNullString(row.getAs[String]("user_token_tourist")),
StringUtils.getNotNullString(row.getAs[String]("machineID")),
StringUtils.getNotNullString(row.getAs[String]("serviceName")),
StringUtils.getNotNullString(row.getAs[String]("serviceSidePacketId")),
StringUtils.getNotNullString(row.getAs[String]("serviceSideRecordId"))
StringUtils.getNotNullString(row.getAs[String]("machineid")),
StringUtils.getNotNullString(row.getAs[String]("servicename")),
StringUtils.getNotNullString(row.getAs[String]("servicesidepacketid")),
StringUtils.getNotNullString(row.getAs[String]("servicesiderecordid"))
)
resList
}
def loadData(sparkSession: SparkSession, partitionDay: String): Unit = {
def loadData(sparkSession: SparkSession, partitionDay: String,index:Integer): Unit = {
var insertSql = "insert overwrite"
val loadDataSql =
s"""
|insert overwrite table pica_log.picalog_trace_app_part partition(created_day='${partitionDay}')
|${insertSql} table pica_log.picalog_trace_app_part partition(created_day='${partitionDay}')
| select
| id,package_id,uuid,device_token,pseudo_session,pseudo_id,class_name,action,view_path,component_tag,created,
| user_token,mobile,doctor_id,device_brand,device_model,app_version,device_type,device_ip,web_data,
| web_data_type,alternate_info,extra_info,network_type,created_on,remark1,remark2,remark3,remark4,remark5,
| remark6,remark7,remark8,remark9,remark10,user_token_tourist,machineid,servicename,servicesidepacketid,
| servicesiderecordid,created_day
| servicesiderecordid
| from newTrace_app_parts
""".stripMargin
sparkSession.sql(loadDataSql)
......@@ -202,6 +247,8 @@ class LoopPicaLogTraceAppPart {
  /**
   * Builds a Hive-enabled SparkSession named after `appName`.
   *
   * @param appName Spark application name shown in the cluster UI
   * @return a SparkSession created (or reused) via the builder with Hive support enabled
   */
  def getSparkSession(appName: String): SparkSession = {
    val conf: SparkConf = new SparkConf().setAppName(appName)
    // .setMaster("local[*]")
    // .set("dfs.client.use.datanode.hostname","true")
    // Apply shared tuning options. NOTE(review): setConfigure is project code not visible here — confirm what it sets.
    UseUtil.setConfigure(conf)
    val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession
......@@ -211,42 +258,42 @@ class LoopPicaLogTraceAppPart {
case class Trace_app_part(
id:Int,
package_id:String,
uuid:String,
device_token:String,
pseudo_session:String,
pseudo_id:String,
class_name:String,
action:String,
view_path:String,
component_tag:String,
created:String,
user_token:String,
mobile:String,
doctor_id:String,
device_brand:String,
device_model:String,
app_version:String,
device_type:String,
device_ip:String,
web_data:String,
web_data_type:String,
alternate_info:String,
extra_info:String,
network_type:String,
created_on:String,
remark1:String,
remark2:String,
remark3:String,
remark4:String,
remark5:String,
remark6:String,
remark7:String,
remark8:String,
remark9:String,
remark10:String,
user_token_tourist:String,
machineid:String,
servicename:String,
servicesidepacketid:String,
servicesiderecordid:String)
\ No newline at end of file
package_id:String,
uuid:String,
device_token:String,
pseudo_session:String,
pseudo_id:String,
class_name:String,
action:String,
view_path:String,
component_tag:String,
created:String,
user_token:String,
mobile:String,
doctor_id:String,
device_brand:String,
device_model:String,
app_version:String,
device_type:String,
device_ip:String,
web_data:String,
web_data_type:String,
alternate_info:String,
extra_info:String,
network_type:String,
created_on:Timestamp,
remark1:String,
remark2:String,
remark3:String,
remark4:String,
remark5:String,
remark6:String,
remark7:String,
remark8:String,
remark9:String,
remark10:String,
user_token_tourist:String,
machineid:String,
servicename:String,
servicesidepacketid:String,
servicesiderecordid:String)
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册