提交 3b66557b 编写于 作者: zhenxin.ma's avatar zhenxin.ma

ods层区域反推代码

上级 09cdbd5a
package com.data
import java.io.PrintWriter
import com.utils.SyncDataConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer, Map}
/**
 * Companion class of the `SyncParentHospitalLevel` job object; it declares no members.
 *
 * Original author's description of the job: recursively back-derive the region hierarchy,
 * generate a file, and finally import it into the `pica_ods.parent_hospital_level` table.
 * (The original note ends with the number "1880"; its meaning is not stated -- TODO confirm.)
 *
 * Created 2019/10/23 19:07.
 *
 * @author zhenxin.ma
 * @version 1.0
 */
class SyncParentHospitalLevel
/**
 * Spark job that derives ancestor/descendant relations between hospitals from the
 * `hospital_id -> parent_id` pairs returned by [[sql1]] and writes them to files.
 *
 * Levels are assigned as follows: a node whose `parent_id` is 0 is a root (level 0); a node
 * whose parent is a level `N - 1` node is a level `N` node, for `N` in 1..`MaxLevel`.
 *
 * Lines written to `SyncDataConfig.REGION_DATA_PATH`:
 *  - `id,0,0` for every root node;
 *  - `ancestor,descendant,distance` for every pair of nodes on the path that leads from the
 *    root down to a node of level 1..`MaxLevel`.
 *
 * Nodes that received no level are written as `hospital_id,parent_id` to
 * `SyncDataConfig.REGION_BAD_PATH`.
 */
object SyncParentHospitalLevel {

  /**
   * Source query: `hospital_id` and `parent_id` (a NULL `parent_id` is coalesced to 0) from
   * `ods_basic_hospital_info` in the database named by `SyncDataConfig.DATABASE3`, restricted
   * to rows with `creat_time` before the current date and `delete_flag = 1`.
   */
  final val sql1: String = s"select hospital_id,COALESCE(parent_id,0) parent_id from ${SyncDataConfig.DATABASE3}.ods_basic_hospital_info " +
    s"where creat_time < current_date() and delete_flag=1"

  /** Deepest level that is resolved; nodes that would sit below it are reported as unmatched. */
  private val MaxLevel: Int = 7

  /**
   * Job entry point: loads the parent map, assigns levels and writes the result files.
   *
   * @param args command-line arguments (not used)
   * @throws RuntimeException if the source data contains the same `hospital_id` twice
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("RecurseArea")
    val sparkSession: SparkSession =
      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    try {
      val parentOf = loadParents(sparkSession)
      println("---------------map size is " + parentOf.size)
      val (levels, levelID) = assignLevels(parentOf)
      writeRelations(parentOf, levels, levelID)
    } finally {
      // Always release the Spark session, even when the job fails.
      sparkSession.stop()
    }
  }

  /**
   * Runs [[sql1]] and collects the result on the driver as `hospital_id -> parent_id`.
   *
   * @throws RuntimeException if a `hospital_id` occurs more than once
   */
  private def loadParents(sparkSession: SparkSession): mutable.Map[Int, Int] = {
    val srcDataFrame: DataFrame = sparkSession.sql(sql1)
    val parentOf = mutable.Map[Int, Int]()
    println("---------开始存储源数据---------------")
    // The rows are needed exactly once and only on the driver, so they are collected
    // directly; no caching or per-partition buffering is required.
    srcDataFrame.collect().foreach { row =>
      val id: Int = row.getAs[Int]("hospital_id")
      val parentId: Int = row.getAs[Int]("parent_id")
      if (parentOf.contains(id)) {
        throw new RuntimeException("脏数据,存在重复的 id")
      }
      parentOf.put(id, parentId)
    }
    parentOf
  }

  /**
   * Assigns a level to every node reachable from a root within `MaxLevel` steps.
   *
   * @param parentOf `hospital_id -> parent_id` for all nodes
   * @return a pair `(levels, levelID)`: `levels(n)` maps each level-`n` node to its parent
   *         (roots map to 0), and `levelID` maps each node to its level. If a node qualifies
   *         in more than one pass, the level of the last pass is kept.
   */
  private def assignLevels(
      parentOf: mutable.Map[Int, Int]
  ): (IndexedSeq[mutable.Map[Int, Int]], mutable.Map[Int, Int]) = {
    val levelID = mutable.Map[Int, Int]()
    val levels: IndexedSeq[mutable.Map[Int, Int]] =
      Vector.fill(MaxLevel + 1)(mutable.Map[Int, Int]())

    // Roots: nodes whose parent id is 0.
    parentOf.foreach { case (id, parentId) =>
      if (parentId == 0) {
        levels(0).put(id, 0)
        levelID.put(id, 0)
      }
    }
    println("---------------map0 size is " + levels(0).size)

    // A node whose parent sits on level - 1 belongs to `level`.
    for (level <- 1 to MaxLevel) {
      val parents = levels(level - 1)
      val current = levels(level)
      parentOf.foreach { case (id, parentId) =>
        if (parents.contains(parentId)) {
          current.put(id, parentId)
          levelID.put(id, level)
        }
      }
      println("---------------map" + level + " size is " + current.size)
    }
    (levels, levelID)
  }

  /**
   * Builds the root-first path `root, ..., parent, id` for a node of the given level.
   *
   * `levels(k)` maps a level-`k` node to its parent, so the walk starts at the direct
   * parent (level `level - 1`) and looks up one ancestor per level down to level 1.
   * An ancestor missing from its level map is represented by -1.
   */
  private def rootPath(
      id: Int,
      parentId: Int,
      level: Int,
      levels: IndexedSeq[mutable.Map[Int, Int]]
  ): List[Int] =
    (level - 1 to 1 by -1).foldLeft(List(parentId, id)) { (path, k) =>
      levels(k).getOrElse(path.head, -1) :: path
    }

  /**
   * Writes the relation file and the unmatched-node file.
   *
   * Both writers are closed even if processing fails part-way through.
   */
  private def writeRelations(
      parentOf: mutable.Map[Int, Int],
      levels: IndexedSeq[mutable.Map[Int, Int]],
      levelID: mutable.Map[Int, Int]
  ): Unit = {
    // Collected in a set so that identical relation lines are written only once.
    val relations = mutable.Set[String]()
    var count: Int = 0
    // File for the nodes that match the rules.
    val writer: PrintWriter = new PrintWriter(SyncDataConfig.REGION_DATA_PATH)
    try {
      // File for the nodes that could not be matched.
      val writerBad: PrintWriter = new PrintWriter(SyncDataConfig.REGION_BAD_PATH)
      try {
        parentOf.foreach { case (id, parentId) =>
          levelID.getOrElse(id, -1) match {
            case 0 =>
              relations += s"$id,0,0"
            case level if level >= 1 && level <= MaxLevel =>
              // addSet consumes (and clears) the buffer it is given.
              addSet(ArrayBuffer(rootPath(id, parentId, level, levels): _*), relations)
            case _ =>
              count += 1
              writerBad.println(s"$id,$parentId")
          }
        }
        println("----不匹配的数量为 " + count)
        relations.foreach(line => writer.println(line))
      } finally {
        writerBad.close()
      }
    } finally {
      writer.close()
    }
  }

  /**
   * Adds one line `array(i),array(j),(j - i)` to `set` for every index pair `i < j`,
   * then empties `array`.
   *
   * @param array path of node ids; cleared before the method returns
   * @param set   destination set that receives the generated lines
   */
  def addSet(array: ArrayBuffer[Int], set: mutable.Set[String]): Unit = {
    for {
      i <- array.indices
      j <- i + 1 until array.length
    } set += s"${array(i)},${array(j)},${j - i}"
    // The caller's buffer is emptied as part of the contract.
    array.clear()
  }
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册