提交 0315acc0 编写于 作者: wuyunfeng's avatar wuyunfeng

调整log_session—_path表中user_id类型为string

上级 f769e66c
...@@ -15,7 +15,11 @@ ...@@ -15,7 +15,11 @@
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.12</scala.version> <scala.version>2.11.12</scala.version>
<spark.version>2.4.0-cdh6.2.0</spark.version> <spark.version>2.4.0-cdh6.2.0</spark.version>
<hive.version>2.1.1-cdh6.2.0</hive.version>
<hbase.version>2.1.0-cdh6.2.0</hbase.version>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<fastjson.version>1.2.58</fastjson.version>
<mysqlconn.version>8.0.13</mysqlconn.version>
<!--<scopeargs>compile</scopeargs>--> <!--<scopeargs>compile</scopeargs>-->
<scopeargs>provided</scopeargs> <scopeargs>provided</scopeargs>
...@@ -25,13 +29,13 @@ ...@@ -25,13 +29,13 @@
<repositories> <repositories>
<repository> <repository>
<id>cloudera-releases</id> <id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases> <releases>
<enabled>true</enabled> <enabled>true</enabled>
</releases> </releases>
<snapshots> <snapshots>
<enabled>false</enabled> <enabled>true</enabled>
</snapshots> </snapshots>
</repository> </repository>
</repositories> </repositories>
...@@ -70,14 +74,14 @@ ...@@ -70,14 +74,14 @@
<dependency> <dependency>
<groupId>org.apache.hive</groupId> <groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId> <artifactId>hive-hbase-handler</artifactId>
<version>2.1.1-cdh6.2.0</version> <version>${hive.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId> <artifactId>hbase-spark</artifactId>
<version>2.1.0-cdh6.2.0</version> <version>${hbase.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
...@@ -90,14 +94,14 @@ ...@@ -90,14 +94,14 @@
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version> <version>${mysqlconn.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.58</version> <version>${fastjson.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
...@@ -105,14 +109,14 @@ ...@@ -105,14 +109,14 @@
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId> <artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0-cdh6.2.0</version> <version>${spark.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0-cdh6.2.0</version> <version>${spark.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
...@@ -128,6 +132,7 @@ ...@@ -128,6 +132,7 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>
<version>2.5</version>
<configuration> <configuration>
<descriptors> <descriptors>
<descriptor>src/assembly/assembly.xml</descriptor> <descriptor>src/assembly/assembly.xml</descriptor>
...@@ -178,6 +183,7 @@ ...@@ -178,6 +183,7 @@
<!-- 定义项目编译版本 --> <!-- 定义项目编译版本 -->
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration> <configuration>
<source>${maven.compiler.source}</source> <source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target> <target>${maven.compiler.target}</target>
......
...@@ -53,7 +53,7 @@ object MyConfigSession { ...@@ -53,7 +53,7 @@ object MyConfigSession {
//从dw_fact_log_session表中筛选数据 //从dw_fact_log_session表中筛选数据
final val SOURCE_SQL_PATH: String = final val SOURCE_SQL_PATH: String =
s""" s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value, |select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1} |app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1}
| where created_day=DATE_SUB(current_date(),1) and app_version >= '3.1.7' | where created_day=DATE_SUB(current_date(),1) and app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK') OR (action_type ='ACTION_VIEW' and menu_code != '0' and menu_code !='null' and menu_code !='')) | AND ((action_type ='ACTION_CLICK') OR (action_type ='ACTION_VIEW' and menu_code != '0' and menu_code !='null' and menu_code !=''))
......
...@@ -109,23 +109,23 @@ class SessionProcess extends java.io.Serializable{ ...@@ -109,23 +109,23 @@ class SessionProcess extends java.io.Serializable{
//按照#号切割 //按照#号切割
val strs: Array[String] = component_tag.split("#") val strs: Array[String] = component_tag.split("#")
strs.length match { strs.length match {
case 4 => { case 1 => {
menu_code = strs(0)
}
case 2 => {
menu_code = strs(0) menu_code = strs(0)
action_code = strs(1) action_code = strs(1)
position = strs(2)
label_value = strs(3)
} }
case 3 => { case 3 => {
menu_code = strs(0) menu_code = strs(0)
action_code = strs(1) action_code = strs(1)
position = strs(2) position = strs(2)
} }
case 2 => { case _ => {
menu_code = strs(0) menu_code = strs(0)
action_code = strs(1) action_code = strs(1)
} position = strs(2)
case 1 => { label_value = strs(3).substring(0,math.min(250,strs(3).length))
menu_code = strs(0)
} }
} }
} }
......
...@@ -55,13 +55,13 @@ object SessionProcessPath { ...@@ -55,13 +55,13 @@ object SessionProcessPath {
//注册日期在流量统计日期之前的用户 //注册日期在流量统计日期之前的用户
val doctorDF: DataFrame = sparkSession.sql( val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)") "select cast(id as string) id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left") sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table") .createOrReplaceTempView("tmp_table")
//将id为null的记录设置为0 //将id为null的记录设置为0
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," + val reSql: String = "select session_id,case when id is null then '0' else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table" "user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql) val selectDF: DataFrame = sparkSession.sql(reSql)
......
...@@ -53,7 +53,7 @@ object SessionProcessPathArgs { ...@@ -53,7 +53,7 @@ object SessionProcessPathArgs {
//筛选源数据 //筛选源数据
val sourceSql = val sourceSql =
s""" s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value, |select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1} |app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1}
| where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !='' | where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !=''
| and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !='')) | and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !=''))
...@@ -62,13 +62,13 @@ object SessionProcessPathArgs { ...@@ -62,13 +62,13 @@ object SessionProcessPathArgs {
//注册日期在流量统计日期之前的用户 //注册日期在流量统计日期之前的用户
val doctorDF: DataFrame = sparkSession.sql( val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)") "select cast(id as string) id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left") sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table") .createOrReplaceTempView("tmp_table")
//将id为null的记录设置为0 //将id为null的记录设置为0
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," + val reSql: String = "select session_id,case when id is null then '0' else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table" "user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql) val selectDF: DataFrame = sparkSession.sql(reSql)
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册