Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
提交反馈
为 GitLab 提交贡献
登录
切换导航
S
study-report
项目
项目
详情
动态
版本
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
成员
成员
收起侧边栏
Close sidebar
动态
分支图
统计图
提交
打开侧边栏
zhenxin.ma
study-report
提交
c1230591
提交
c1230591
编写于
1月 17, 2020
作者:
zhenxin.ma
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
修改了逻辑
上级
a23da01d
变更
1
显示空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
51 行增加
和
48 行删除
+51
-48
SyncAttachRegionResult.scala
src/main/scala/com/data/SyncAttachRegionResult.scala
+51
-48
未找到文件。
src/main/scala/com/data/SyncAttachRegionResult.scala
浏览文件 @
c1230591
...
...
@@ -57,7 +57,45 @@ object SyncAttachRegionResult {
val
broadcast
:
Broadcast
[
Array
[(
Long
,
Long
,
Long
,
Long
,
Long
)]]
=
sparkSession
.
sparkContext
.
broadcast
(
ppact
)
val
reDS
:
Dataset
[(
Long
,
Long
,
Long
,
Long
,
Long
,
Long
)]
=
df1
.
mapPartitions
(
it
=>
{
//转换为字段名的DataFrame
val
reDF
:
DataFrame
=
getDataFrame
(
df1
,
sparkSession
,
broadcast
)
//写入到parquet文件中
reDF
.
write
.
mode
(
SaveMode
.
Overwrite
).
format
(
"parquet"
)
.
save
(
s
"${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}"
)
//导入到临时表中
sparkSession
.
sql
(
s
"load data INPATH '${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}' "
+
s
"overwrite into table ${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}"
)
//更新record表
println
(
"-------------------------更新 schedule_job_record表--------------------------------------"
)
//任务结束,更新 record 配置表
val
updateSQL
:
String
=
s
"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,end_time=? where job_id=1892 and start_time='${startTime}'"""
.
stripMargin
val
endTime
:
String
=
DateUtils
.
getTodayDate
val
upreSta
:
PreparedStatement
=
connSql
.
prepareStatement
(
updateSQL
)
upreSta
.
setString
(
1
,
"1"
)
upreSta
.
setString
(
2
,
endTime
)
//更新表数据
upreSta
.
executeUpdate
()
//关闭连接
JDBCUtil
.
close
(
connSql
,
upreSta
)
}
catch
{
case
e
:
Exception
=>
{
println
(
"-------------------------任务异常---------------------------------------------------"
)
val
exceptionSQL
:
String
=
s
"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,exception=?,end_time=? where job_id=1892 and start_time='${startTime}'"""
.
stripMargin
val
errorArr
=
Array
[
String
](
"2"
,
e
.
getMessage
,
DateUtils
.
getTodayDate
)
JDBCUtil
.
insertRecord
(
connSql
,
exceptionSQL
,
errorArr
)
connSql
.
close
()
}
}
}
def
getDataFrame
(
data
:
DataFrame
,
sparkSession
:
SparkSession
,
broadcast
:
Broadcast
[
Array
[(
Long
,
Long
,
Long
,
Long
,
Long
)]])
:
DataFrame
=
{
import
sparkSession.implicits._
val
reDS
:
Dataset
[(
Long
,
Long
,
Long
,
Long
,
Long
,
Long
)]
=
data
.
mapPartitions
(
it
=>
{
//存储最终的结果,每行代表一个元组
val
tuples
:
ListBuffer
[(
Long
,
Long
,
Long
,
Long
,
Long
,
Long
)]
=
ListBuffer
[(
Long
,
Long
,
Long
,
Long
,
Long
,
Long
)]()
val
list
:
List
[
Row
]
=
it
.
toList
...
...
@@ -82,7 +120,6 @@ object SyncAttachRegionResult {
if
(
town_id
!=
0
)
{
count
=
count
+
1
}
//通过广播变量,在广播变量中查看
val
broad
:
Array
[(
Long
,
Long
,
Long
,
Long
,
Long
)]
=
broadcast
.
value
broad
.
foreach
(
tuple
=>
{
...
...
@@ -98,44 +135,10 @@ object SyncAttachRegionResult {
tuples
+=
((
project_id
,
doctor_id
,
tuple
.
_2
,
tuple
.
_3
,
tuple
.
_4
,
tuple
.
_5
))
}
})
})
tuples
.
iterator
})
//转换为字段名
val
reDF
:
DataFrame
=
reDS
.
toDF
(
"project_id"
,
"doctor_id"
,
"province_id"
,
"city_id"
,
"county_id"
,
"town_id"
)
//写入到parquet文件中
reDF
.
write
.
mode
(
SaveMode
.
Overwrite
).
format
(
"parquet"
)
.
save
(
s
"${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}"
)
//导入到临时表中
sparkSession
.
sql
(
s
"load data INPATH '${SyncDataConfig.PARQUET_PATH}${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}' "
+
s
"overwrite into table ${SyncDataConfig.DATABASE2}.${SyncDataConfig.Hive_TABLE6}"
)
//更新record表
println
(
"-------------------------更新 schedule_job_record表--------------------------------------"
)
//任务结束,更新 record 配置表
val
updateSQL
:
String
=
s
"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,end_time=? where job_id=1892 and start_time='${startTime}'"""
.
stripMargin
val
endTime
:
String
=
DateUtils
.
getTodayDate
val
upreSta
:
PreparedStatement
=
connSql
.
prepareStatement
(
updateSQL
)
upreSta
.
setString
(
1
,
"1"
)
upreSta
.
setString
(
2
,
endTime
)
//更新表数据
upreSta
.
executeUpdate
()
//关闭连接
JDBCUtil
.
close
(
connSql
,
upreSta
)
}
catch
{
case
e
:
Exception
=>
{
println
(
"-------------------------任务异常---------------------------------------------------"
)
val
exceptionSQL
:
String
=
s
"""
|update ${MySQLConfig.HDFS_TABLE} set status=?,exception=?,end_time=? where job_id=1892 and start_time='${startTime}'"""
.
stripMargin
val
errorArr
=
Array
[
String
](
"2"
,
e
.
getMessage
,
DateUtils
.
getTodayDate
)
JDBCUtil
.
insertRecord
(
connSql
,
exceptionSQL
,
errorArr
)
connSql
.
close
()
}
}
val
result
:
DataFrame
=
reDS
.
toDF
(
"project_id"
,
"doctor_id"
,
"province_id"
,
"city_id"
,
"county_id"
,
"town_id"
)
result
}
}
写
预览
Markdown
格式
0%
请重试
or
附加一个文件
附加文件
取消
您添加了
0
人
到此讨论。请谨慎行事。
先完成此消息的编辑!
取消
想要评论请
注册
或
登录