提交 a3b45e90 编写于 作者: zhenxin.ma's avatar zhenxin.ma

配置、工具类

上级 86828c95
package com.utils
import java.sql
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
/**
* 连接 MySQL的工具类
*
* @Author zhenxin.ma
* @Date 2019/8/12 13:47
* @Version 1.0
*/
object JDBCUtil {
/**
* @Description 获取JDBC链接MYSQL
* @return java.sql.Connection
**/
def getConnection():Connection = {
var con: Connection = null
try{
Class.forName(MySQLConfig.HDFS_DRIVER)
con = DriverManager.getConnection(MySQLConfig.HDFS_URL, MySQLConfig.HDFS_USERNAME, MySQLConfig.HDFS_PSSWORD)
} catch {
case e:Exception => {
println("-----------MYSQL Connection has exception , msg = "+ e.getMessage())
}
}
con
}
/**
* @Description 获取结果集
* @param sql 执行的SQL
* @param jobName 要分析的表任务名字
* @param sync 批次号
* @return java.sql.ResultSet
**/
def getResultSet(sql: String, jobName:String, sync:String):ResultSet = {
var resultSet: ResultSet = null
var prest:PreparedStatement = null
try{
val con: Connection = getConnection()
prest = con.prepareStatement(sql)
prest.setString(1, jobName)
prest.setString(2, sync)
resultSet = prest.executeQuery()
resultSet
}catch {
case e: Exception => e.printStackTrace()
resultSet
}
}
/**
* @Description 关闭连接和释放资源
* @return void
**/
def close(con: Connection,prest:PreparedStatement): Unit = {
try{
if (prest != null) {
prest.close()
}
if (con != null) {
con.close()
}
println("------关闭MYSQL连接和释放资源------")
}catch {
case e: Exception => e.printStackTrace()
}
}
//往配置记录表 record插入数据
def insertRecord(connSql: sql.Connection, sq: String ,array:Array[String]): Int = {
val preState: PreparedStatement = connSql.prepareStatement(sq)
for (i <-0 until array.length) {
preState.setString(i + 1,array(i))
}
val flag: Int = preState.executeUpdate()
preState.close()
flag
}
}
package com.utils
/**
* @Author zhenxin.ma
* @Date 2019/11/15 9:46
* @Version 1.0
*/
object MySQLConfig {
//集群hdfs127 Mysql配置,记录任务的执行情况
final val HDFS_DRIVER = "com.mysql.jdbc.Driver"
final val HDFS_BASE = "pica_job"
final val HDFS_URL = s"jdbc:mysql://hdfs127:3306/${HDFS_BASE}?useTimezone=true&serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8"
final val HDFS_USERNAME = "pica_spider"
final val HDFS_PSSWORD = "5$7FXgz#e5JWP08e"
final val HDFS_TABLE = "schedule_job_record"
//同步MYSQL线上环境账号配置
final val URL: String = "jdbc:mysql://rr-uf6p67797265cm09f.mysql.rds.aliyuncs.com:3306/pica" +
"?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
final val USER: String = "bi_readonly"
final val PSSWORD: String = "1Qaz2wsx"
//同步MYSQLUAT环境账号配置
// final val URL: String = "jdbc:mysql://192.168.110.181:3306/pica" +
// "?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
// final val USER: String = "pica_test"
// final val PSSWORD: String = "pkv#sqvSGn@O1@tg"
}
package com.utils
import java.sql.{Connection, DriverManager}
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* @Author zhenxin.ma
* @Date 2019/11/15 9:57
* @Version 1.0
*/
object MyUtil {
//连接MYSQL,将MYSQL数据导入到 Hive中
def loadMysqlToHive(sparkSession: SparkSession, url: String, table: String, loadSQL: String): DataFrame = {
//连接MYSQL,按照 id 进行分区
val mappingData: DataFrame = sparkSession.read.format("jdbc")
.option("driver", classOf[com.mysql.jdbc.Driver].getName)
.option("url", url)
.option("dbtable", table)
.option("user", MySQLConfig.USER)
.option("password", MySQLConfig.PSSWORD)
.option("partitionColumn",SyncDataConfig.PARTITIONCOLUMN)
.option("lowerBound",SyncDataConfig.LOWERBOUND)
.option("upperBound",SyncDataConfig.UPPERBOUND)
.option("numPartitions",SyncDataConfig.NUMPARTITIONS)
.load()
println("---------------- schema information---------------------")
mappingData.printSchema()
mappingData.createOrReplaceTempView(table)
val df: DataFrame = sparkSession.sql(loadSQL)
df
}
//Spark 任务相关的配置
def setConfigure(conf: SparkConf): Unit = {
conf.set("spark.serializer", classOf[KryoSerializer].getName)
// 序列化时使用的内存缓冲区大小
conf.set("spark.kryoserializer.buffer.max", "128m")
// 启用rdd压缩
conf.set("spark.rdd.compress", "true")
// 设置压缩格式为snappy, 默认也就是lz4, 这种压缩格式压缩比高, 速度快, 但是耗费的内存相对也多一些
conf.set("spark.io.compression.codec", "snappy")
// 设置压缩时使用的内存缓冲区大小
conf.set("spark.io.compression.snappy.blockSize", "64k")
//调节持久化的内存比例
conf.set("spark.memory.useLegacyMode", "true")
conf.set("spark.storage.memoryFraction", "0.5")
//设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2
//shuffle操作比较多时,适当增加这个值,增加task执行需要的内存
conf.set("spark.shuffle.memoryFraction","0.4")
// spark sql 在shuffle时产生的partition数量, 默认是200
conf.set("spark.sql.shuffle.partitions", "210")
// SortShuffleManager开启by-pass(不需要排序)模式的阈值, 默认为200, 在partition数量小于这个值时会开启by-pass模式
conf.set("spark.shuffle.sort.bypassMergeThreshold", "300")
}
/**
* 将DataFrame保存为Mysql表
* @param dataFrame 需要保存的dataFrame
* @param tableName 保存的mysql 表名
* @param saveMode 保存的模式 :Append、Overwrite、ErrorIfExists、Ignore
* @param proPath 配置文件的路径
*/
def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
var table = tableName
val properties: Properties = getProPerties(proPath)
val prop = new Properties //配置文件中的key 与 spark 中的 key 不同 所以创建prop 按照spark 的格式 进行配置数据库
prop.setProperty("user", properties.getProperty("mysql.username"))
prop.setProperty("password", properties.getProperty("mysql.password"))
prop.setProperty("driver", properties.getProperty("mysql.driver"))
prop.setProperty("url", properties.getProperty("mysql.url"))
if (saveMode == SaveMode.Overwrite) {
var conn: Connection = null
try {
conn = DriverManager.getConnection(
prop.getProperty("url"),
prop.getProperty("user"),
prop.getProperty("password")
)
val stmt = conn.createStatement
table = table.toUpperCase
stmt.execute(s"truncate table $table") //此操作的目的是在覆盖的时候不删除原来的表,避免数据的类型全部变为TEXT类型
conn.close()
}
catch {
case e: Exception =>
println("MySQL Error:")
e.printStackTrace()
}
}
dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), tableName, prop)
}
/**
* 获取配置文件
* @param proPath 配置文件路径
* @return
*/
def getProPerties(proPath: String): Properties = {
val properties: Properties = new Properties()
properties.load(this.getClass.getResourceAsStream(proPath))
properties
}
}
package com.utils
/**
* @Author zhenxin.ma
* @Date 2019/11/15 9:58
* @Version 1.0
*/
object SyncDataConfig {
//同步MYSQL数据导入到Hive中,线上环境,Hive库名
final val DATABASE1:String = "pica_ds"
final val DATABASE2:String = "pica_project_v2"
final val DATABASE3:String = "pica_ods"
final val DATABASE4:String = "pica_dw"
//线上Parquet文件路径
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
//UAT环境,Hive库名
// final val DATABASE1:String = "pica_ds"
// final val DATABASE3:String = "pica_ods"
// final val DATABASE4:String = "pica_dw"
// final val DATABASE2:String = "pica_project"
// //UAT环境,Parquet文件路径
// final val PARQUET_PATH: String = "hdfs://master61:8020/tmp/output/"
//区域反推中间表数据目录
final val REGION_DATA_PATH: String = "/home/big-data/ods_parent_hospital_level/parent_hospital_level.txt"
final val REGION_BAD_PATH: String = "/home/big-data/ods_parent_hospital_level/bad.txt"
//区域反推中间表用到的SQL
final val REGION_SQL1: String =
s"""
| SELECT cd.project_id,cd.doctor_id,ppa.province_id,ppa.city_id,ppa.county_id,
| ppa.town_id FROM ${DATABASE1}.pica_portal_campaign_doctor cd
| INNER JOIN ${DATABASE4}.dw_dim_portal_project pj ON cd.project_id = pj.project_id
| INNER JOIN ${DATABASE3}.ods_basic_doctor_info d ON cd.doctor_id = d.doctor_id
| INNER JOIN ${DATABASE1}.pica_portal_project_attachregion ppa ON cd.project_id = ppa.project_id AND cd.doctor_id = ppa.doctor_id
| WHERE cd.delete_flag = 1 AND cd.doctor_role = 3 AND cd.doctor_type != 2
| AND date_format(cd.modified_time,'yyyy-MM-dd') <= date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),1)
| AND d.delete_flag = 1 AND d.hospital_id != 0
| AND ppa.delete_flag = 1 AND ppa.content != ''
| AND date_format(ppa.modified_time,'yyyy-MM-dd') <= date_sub(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),0)
""".stripMargin
final val REGION_SQL2: String = s"SELECT project_id,province_id,city_id,COALESCE(county_id,0) county_id,COALESCE(town_id,0) town_id " +
s"FROM ${DATABASE2}.lr_project_attachregion"
//Hive表名
final val Hive_TABLE: String = "pica_portal_campaign_mapping"
final val Hive_TABLE1: String = "pica_portal_campaign_doctor"
final val Hive_TABLE2: String = "pica_portal_campaign_organization"
final val Hive_TABLE3: String = "pica_portal_campaign_department"
final val Hive_TABLE4: String = "pica_portal_project_attachregion"
final val Hive_TABLE5: String = "lr_project_attachregion"
final val Hive_TABLE6: String = "attach_region_result"
final val Hive_TABLE7: String = "lr_project_sub_leader_attachregion"
//同步的MySQL表名
final val MYSQL_TABLE1: String = "portal_campaign_doctor_"
final val MYSQL_TABLE2: String = "portal_campaign_organization_"
final val MYSQL_TABLE3: String = "portal_campaign_department"
final val MYSQL_TABLE4: String = "portal_project_attachregion"
//以下是Spark读取Mysql时,设置的分区属性
final val PARTITIONCOLUMN: String = "id"
final val LOWERBOUND: String = "100"
final val UPPERBOUND: String = "20000000"
final val NUMPARTITIONS: String = "12"
//导入Hive语句
final val Hive_TABLE1_SQL: String = s"insert into table ${DATABASE1}.${Hive_TABLE1} " +
s"select id,project_id,doctor_id,doctor_role,doctor_role_flag,doctor_type," +
s"white_flag,id_type,delete_flag,created_id,created_time,modified_id,modified_time from "
final val Hive_TABLE2_SQL: String = s"insert into table ${DATABASE1}.${Hive_TABLE2} " +
s"select id,project_id,organization_id,organization_type," +
s"white_flag,scope_flag,id_type,delete_flag,created_id,created_time,modified_id,modified_time from "
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册