Commit 2a048fc4 authored by zhenxin.ma

Code for the full event-tracking (auto-track) project

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- TODO: a jarjar format would be better -->
<id>spark</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${project.build.directory}/classes</directory>
<outputDirectory>/</outputDirectory>
<excludes>
<!-- Exclude .xml and .properties files so they are not packaged into the jar -->
<!--<exclude>*.xml</exclude>-->
<exclude>*.properties</exclude>
</excludes>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.pica.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* Date/time utility class
* @Author zhenxin.ma
* @Date 2019/8/29 17:28
* @Version 1.0
*/
public class DateUtils {
public static final SimpleDateFormat TIME_FORMAT =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static final SimpleDateFormat DATE_FORMAT =
new SimpleDateFormat("yyyy-MM-dd");
public static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal=new ThreadLocal<SimpleDateFormat>(){
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
/**
* Checks whether time1 is before time2
* @param time1 the first time (yyyy-MM-dd HH:mm:ss)
* @param time2 the second time (yyyy-MM-dd HH:mm:ss)
* @return true if time1 is before time2
*/
public static boolean before(String time1, String time2) {
try {
Date dateTime1 = TIME_FORMAT.parse(time1);
Date dateTime2 = TIME_FORMAT.parse(time2);
if(dateTime1.before(dateTime2)) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* Checks whether time1 is after time2
* @param time1 the first time (yyyy-MM-dd HH:mm:ss)
* @param time2 the second time (yyyy-MM-dd HH:mm:ss)
* @return true if time1 is after time2
*/
public static boolean after(String time1, String time2) {
try {
Date dateTime1 = TIME_FORMAT.parse(time1);
Date dateTime2 = TIME_FORMAT.parse(time2);
if(dateTime1.after(dateTime2)) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* Computes the difference between two times, in seconds
* @param time1 time 1 (yyyy-MM-dd HH:mm:ss)
* @param time2 time 2 (yyyy-MM-dd HH:mm:ss)
* @return time1 - time2, in seconds
*/
public static int minus(String time1, String time2) {
try {
Date datetime1 = TIME_FORMAT.parse(time1);
Date datetime2 = TIME_FORMAT.parse(time2);
long millisecond = datetime1.getTime() - datetime2.getTime();
return Math.toIntExact(millisecond / 1000);
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
/**
* Gets the date and hour from a datetime string
* @param datetime datetime string (yyyy-MM-dd HH:mm:ss)
* @return the date and hour, formatted as yyyy-MM-dd_HH
*/
public static String getDateHour(String datetime) {
String date = datetime.split(" ")[0];
String hourMinuteSecond = datetime.split(" ")[1];
String hour = hourMinuteSecond.split(":")[0];
return date + "_" + hour;
}
/**
* @Description converts epoch milliseconds to a formatted time string, yyyy-MM-dd HH:mm:ss
* @param time epoch time in milliseconds, passed as a string
* @return java.lang.String
**/
public static String milliSecondsFormatTime(String time){
if (time == null || time.isEmpty()) {
return "";
}
Date tmpDate = new Date();
tmpDate.setTime(Long.parseLong(time));
String dtime = simpleDateFormatThreadLocal.get().format(tmpDate);
return dtime;
}
/**
* Gets yesterday's date (yyyy-MM-dd)
* @return yesterday's date
*/
public static String getYesterdayDate() {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
cal.add(Calendar.DAY_OF_YEAR, -1);
Date date = cal.getTime();
return DATE_FORMAT.format(date);
}
/**
* Gets today's date (yyyy-MM-dd)
* @return today's date
*/
public static String getTodayDate() {
return DATE_FORMAT.format(new Date());
}
/**
* @Description gets the current time as yyyy-MM-dd HH:mm:ss
* @param
* @return java.lang.String
**/
public static String getTodayTime() {
return simpleDateFormatThreadLocal.get().format(new Date());
}
// /**
// * @Description gets yesterday's time: yyyy-MM-dd HH:mm:ss
// * @param gapDay
// * @return java.lang.String
// **/
// public static String getYesterdayTime(int gapDay){
// Calendar calendar=Calendar.getInstance();
// calendar.set(Calendar.HOUR_OF_DAY,24*gapDay);
// String yesterdayDate=simpleDateFormatThreadLocal.get().format(calendar.getTime());
// return yesterdayDate;
// }
/**
* Formats a date as yyyy-MM-dd
* @param date the Date object
* @return the formatted date string
*/
public static String formatDate(Date date) {
return DATE_FORMAT.format(date);
}
/**
* Formats a time as yyyy-MM-dd HH:mm:ss
* @param date the Date object
* @return the formatted time string
*/
public static String formatTime(Date date) {
return TIME_FORMAT.format(date);
}
/**
* @Description parses a formatted time string
* @param time the formatted time string (yyyy-MM-dd HH:mm:ss)
* @return java.util.Date
**/
public static Date parseTime(String time)
{
try {
Date result=simpleDateFormatThreadLocal.get().parse(time);
simpleDateFormatThreadLocal.remove();
return result;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
final Date date = new Date();
System.out.println(date);
System.out.println(parseTime("2020-03-30 11:45:18"));
System.out.println(formatDate(date));
System.out.println(formatTime(date));
System.out.println(getYesterdayDate());
String com = "005#";
final String[] strs = com.split("#");
System.out.println("length is "+strs.length);
System.out.println("0位数据是: "+strs[0]);
}
}
package com.pica.utils;
import java.math.BigDecimal;
/**
* Number formatting utility class
* @Author zhenxin.ma
* @Date 2020/3/30 14:05
* @Version 1.0
*/
public class NumberUtils {
/**
* Formats a decimal, rounding half-up
* @param num the number
* @param scale the number of decimal places to round to
* @return the formatted value
*/
public static double formatDouble(double num, int scale) {
BigDecimal bd = new BigDecimal(num);
return bd.setScale(scale, BigDecimal.ROUND_HALF_UP).doubleValue();
}
}
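// Scala usage sketch (not part of the original sources): demonstrates the
// half-up rounding behaviour of NumberUtils.formatDouble.
object NumberUtilsDemo {
  def main(args: Array[String]): Unit = {
    println(com.pica.utils.NumberUtils.formatDouble(3.14159, 2)) // 3.14
    println(com.pica.utils.NumberUtils.formatDouble(2.5, 0))     // 3.0 (HALF_UP rounds .5 away from zero)
  }
}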
package com.pica.utils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* Parameter utility class
* @Author zhenxin.ma
* @Date 2020/3/30 14:09
* @Version 1.0
*/
public class ParamUtils {
/**
* Extracts the task id from the command-line arguments
* @param args command-line arguments
* @return the task id, or null if absent or unparsable
*/
public static Long getTaskIdFromArgs(String[] args) {
try {
if(args != null && args.length > 0) {
return Long.valueOf(args[0]);
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* Extracts a parameter value from a JSON object
* @param jsonObject the JSON object
* @param field the field name (expected to hold a JSON array)
* @return the first element of that array, or null
*/
public static String getParam(JSONObject jsonObject, String field) {
JSONArray jsonArray = jsonObject.getJSONArray(field);
if(jsonArray != null && jsonArray.size() > 0) {
return jsonArray.getString(0);
}
return null;
}
}
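// Scala usage sketch (not part of the original sources): the JSON below is
// illustrative only; getParam expects each field to hold a JSON array and
// returns its first element.
object ParamUtilsDemo {
  def main(args: Array[String]): Unit = {
    val params = com.alibaba.fastjson.JSON.parseObject("""{"startDate":["2020-03-01"],"endDate":["2020-03-31"]}""")
    println(com.pica.utils.ParamUtils.getParam(params, "startDate")) // 2020-03-01
    println(com.pica.utils.ParamUtils.getParam(params, "missing"))   // null
  }
}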
package com.pica.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
* Reads the configuration file from HDFS
* @Author zhenxin.ma
* @Date 2020/1/2 14:50
* @Version 1.0
*/
public class PropertiesUtil {
public static Map<String, String> propertiesMap = new HashMap();
static {
//Path of the configuration file on HDFS
String hdfsPropertiesPath = "/user/big-data/script/config2/config.properties";
System.out.println("hdfsPropertiesPath is :" +hdfsPropertiesPath);
BufferedReader br =null;
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path(hdfsPropertiesPath);
FSDataInputStream fsDataInputStream = fs.open(path);
br = new BufferedReader(new InputStreamReader(fsDataInputStream));
String line = null;
while ((line = br.readLine()) != null) {
if(line.contains("=") && !line.trim().startsWith("#")){
//Replace only the first "=" sign
String rline =line.replaceFirst("="," ");
propertiesMap.put(rline.split(" ")[0].trim(),rline.split(" ")[1].trim());
}
}
//Check the runtime environment
if("online".equals(propertiesMap.get("spark.model"))){
System.out.println("propertiesMap:" + propertiesMap);
}
} catch (Exception e){
e.printStackTrace();
} finally {
try {
if (br !=null) {
br.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.pica.utils;
/**
* String utility class
* @Author zhenxin.ma
* @Date 2020/3/30 15:35
* @Version 1.0
*/
public class StringUtils {
/**
* Checks whether a string is empty
* @param str the string
* @return true if the string is null or empty
*/
public static boolean isEmpty(String str) {
return str == null || "".equals(str);
}
/**
* Checks whether a string is not empty
* @param str the string
* @return true if the string is neither null nor empty
*/
public static boolean isNotEmpty(String str) {
return str != null && !"".equals(str);
}
/**
* Returns "" if the string is null or empty; otherwise returns the original value
* @param str the string
* @return java.lang.String the resulting string
**/
public static String getNotNullString(String str) {
if (isEmpty(str)) {
return "";
}
return str;
}
/**
* Trims a leading and/or trailing comma from a string
* @param str the string
* @return the trimmed string
*/
public static String trimComma(String str) {
if(str.startsWith(",")) {
str = str.substring(1);
}
if(str.endsWith(",")) {
str = str.substring(0, str.length() - 1);
}
return str;
}
/**
* Pads a numeric string to two digits with a leading zero
* @param str the numeric string
* @return the padded string
*/
public static String fulfuill(String str) {
if(str.length() == 2) {
return str;
} else {
return "0" + str;
}
}
/**
* Extracts a field value from a concatenated key=value string
* @param str the concatenated string
* @param delimiter the delimiter (passed to String.split as a regex, e.g. "\\|")
* @param field the field name
* @return the field value, or null if not found
*/
public static String getFieldFromConcatString(String str,
String delimiter, String field) {
String[] fields = str.split(delimiter);
for(String concatField : fields) {
if(concatField.split("=").length==2) {
String fieldName = concatField.split("=")[0];
String fieldValue = concatField.split("=")[1];
if (fieldName.equals(field)) {
return fieldValue;
}
}
}
return null;
}
/**
* Sets a field value in a concatenated key=value string
* @param str the concatenated string
* @param delimiter the delimiter (passed to String.split as a regex, e.g. "\\|")
* @param field the field name
* @param newFieldValue the new field value
* @return the rebuilt string (fields are re-joined with "|")
*/
public static String setFieldInConcatString(String str,
String delimiter, String field, String newFieldValue) {
String[] fields = str.split(delimiter);
for(int i = 0; i < fields.length; i++) {
String fieldName = fields[i].split("=")[0];
if(fieldName.equals(field)) {
String concatField = fieldName + "=" + newFieldValue;
fields[i] = concatField;
break;
}
}
StringBuffer buffer = new StringBuffer("");
for(int i = 0; i < fields.length; i++) {
buffer.append(fields[i]);
if(i < fields.length - 1) {
buffer.append("|");
}
}
return buffer.toString();
}
}
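// Scala usage sketch (not part of the original sources): shows the key=value
// concatenated-string format used by getFieldFromConcatString and
// setFieldInConcatString; the delimiter is a regex, so "|" must be escaped.
// The sample values are made up for illustration.
object StringUtilsDemo {
  def main(args: Array[String]): Unit = {
    val concat = "startAge=20|endAge=50|city=sh"
    println(com.pica.utils.StringUtils.getFieldFromConcatString(concat, "\\|", "endAge")) // 50
    println(com.pica.utils.StringUtils.setFieldInConcatString(concat, "\\|", "city", "bj"))
    // startAge=20|endAge=50|city=bj (fields are re-joined with "|")
  }
}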
package com.pica.utils;
/**
* @Author zhenxin.ma
* @Date 2020/3/30 15:37
* @Version 1.0
*/
public class ValidUtils {
/**
* Checks whether the given data field falls within the range specified by the parameter
* @param data the data string
* @param dataField the data field name
* @param parameter the parameter string
* @param startParamField the start-of-range parameter field
* @param endParamField the end-of-range parameter field
* @return the validation result
*/
public static boolean between(String data, String dataField,
String parameter, String startParamField, String endParamField) {
String startParamFieldStr = StringUtils.getFieldFromConcatString(
parameter, "\\|", startParamField);
String endParamFieldStr = StringUtils.getFieldFromConcatString(
parameter, "\\|", endParamField);
if(startParamFieldStr == null || endParamFieldStr == null) {
return true;
}
int startParamFieldValue = Integer.valueOf(startParamFieldStr);
int endParamFieldValue = Integer.valueOf(endParamFieldStr);
String dataFieldStr = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldStr != null) {
int dataFieldValue = Integer.valueOf(dataFieldStr);
if(dataFieldValue >= startParamFieldValue &&
dataFieldValue <= endParamFieldValue) {
return true;
} else {
return false;
}
}
return false;
}
/**
* Checks whether any value of the given data field matches any value of the parameter field
* @param data the data string
* @param dataField the data field name
* @param parameter the parameter string
* @param paramField the parameter field name
* @return the validation result
*/
public static boolean in(String data, String dataField,
String parameter, String paramField) {
String paramFieldValue = StringUtils.getFieldFromConcatString(
parameter, "\\|", paramField);
if(paramFieldValue == null) {
return true;
}
String[] paramFieldValueSplited = paramFieldValue.split(",");
String dataFieldValue = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldValue != null) {
String[] dataFieldValueSplited = dataFieldValue.split(",");
for(String singleDataFieldValue : dataFieldValueSplited) {
for(String singleParamFieldValue : paramFieldValueSplited) {
if(singleDataFieldValue.equals(singleParamFieldValue)) {
return true;
}
}
}
}
return false;
}
/**
* Checks whether the given data field equals the parameter field value
* @param data the data string
* @param dataField the data field name
* @param parameter the parameter string
* @param paramField the parameter field name
* @return the validation result
*/
public static boolean equal(String data, String dataField,
String parameter, String paramField) {
String paramFieldValue = StringUtils.getFieldFromConcatString(
parameter, "\\|", paramField);
if(paramFieldValue == null) {
return true;
}
String dataFieldValue = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldValue != null) {
if(dataFieldValue.equals(paramFieldValue)) {
return true;
}
}
return false;
}
}
package com.pica.utils;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Map;
/**
* General-purpose utility class
* @Author zhenxin.ma
* @Date 2019/9/18 15:16
* @Version 1.0
*/
public class commonUtil {
/**
* Checks whether an object is empty; supports strings, maps, collections, arrays and other objects
*
* @param obj the object to check
* @return true if empty, false otherwise
*/
public static boolean isEmpty(Object obj){
if(obj == null){
return true;
}else if (obj instanceof String){
return "".equals(String.valueOf(obj).trim());
}else if (obj instanceof Map<?,?>){
return ((Map<?,?>) obj).isEmpty();
}else if (obj instanceof Collection<?>){
return ((Collection<?>) obj).isEmpty();
}else if (obj.getClass().isArray()){
return Array.getLength(obj) == 0;
}
return false;
}
/**
* Checks whether an object is not empty; supports strings, maps, collections, arrays and other objects
*
* @param obj the object to check
* @return false if empty, true otherwise
*/
public static boolean isNotEmpty(Object obj){
return !isEmpty(obj);
}
}
package com.tmp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;
/**
* Created by yunfeng@picahealth.com on 2019/7/30.
*/
public class PropertiesUtil {
public static Map<String, String> propertiesMap = new HashMap<String, String>();
static {
String userDir = System.getProperty("user.dir");
System.out.println("userDir:" + userDir);
String userHome = System.getProperty("user.home");
System.out.println("userHome:" + userHome);
String hdfsPropertiesPath = "D:\\workspaces\\zhuque\\config\\config_tmp.properties";
if(userHome.startsWith("/")){
hdfsPropertiesPath = "/user/big-data/script/config/config_tmp.properties";
}
System.out.println("hdfsPropertiesPath:" + hdfsPropertiesPath);
BufferedReader br =null;
try {
// in = new BufferedInputStream(new FileInputStream(cleaningPropertiesPath));
// ResourceBundle resourceBundle = new PropertyResourceBundle(in);
// for (String key : resourceBundle.keySet()) {
// String value = resourceBundle.getString(key);
// propertiesMap.put(key, value);
// }
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
Path path = new Path(hdfsPropertiesPath);
FileSystem fs = FileSystem.get(conf);
FSDataInputStream fsin= fs.open(path );
String line ;
br = new BufferedReader(new InputStreamReader(fsin));
while ((line = br.readLine()) != null) {
// System.out.println("config.line:"+line);
if(line.contains("=") && !line.trim().startsWith("#")){
String rline =line.replaceFirst("="," ");
propertiesMap.put(rline.split(" ")[0].trim(),rline.split(" ")[1].trim());
}
}
if("online".equals(propertiesMap.get("spark.model"))){
System.out.println("propertiesMap:" + propertiesMap);
}
} catch (Exception e) {
e.printStackTrace();
// throw new RuntimeException(e.getMessage());
} finally {
try {
if(br!=null){
br.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://bi-name1</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>hadoop.security.auth_to_local</name>
<value>DEFAULT</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>hadoop.security.instrumentation.requires.admin</name>
<value>false</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/etc/hadoop/conf.cloudera.yarn/topology.py</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>65536</value>
</property>
<property>
<name>hadoop.ssl.enabled</name>
<value>false</value>
</property>
<property>
<name>hadoop.ssl.require.client.cert</name>
<value>false</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.keystores.factory.class</name>
<value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.server.conf</name>
<value>ssl-server.xml</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.client.conf</name>
<value>ssl-client.xml</value>
<final>true</final>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>bi-name1</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.bi-name1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.bi-name1</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>hdfs127:2181,hdfs128:2181,hdfs129:2181</value>
</property>
<property>
<name>dfs.ha.namenodes.bi-name1</name>
<value>namenode127,namenode153</value>
</property>
<property>
<name>dfs.namenode.rpc-address.bi-name1.namenode127</name>
<value>hdfs127:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.bi-name1.namenode127</name>
<value>hdfs127:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.bi-name1.namenode127</name>
<value>hdfs127:9870</value>
</property>
<property>
<name>dfs.namenode.https-address.bi-name1.namenode127</name>
<value>hdfs127:9871</value>
</property>
<property>
<name>dfs.namenode.rpc-address.bi-name1.namenode153</name>
<value>hdfs130:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.bi-name1.namenode153</name>
<value>hdfs130:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.bi-name1.namenode153</name>
<value>hdfs130:9870</value>
</property>
<property>
<name>dfs.namenode.https-address.bi-name1.namenode153</name>
<value>hdfs130:9871</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>022</value>
</property>
<property>
<name>dfs.client.block.write.locateFollowingBlock.retries</name>
<value>7</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>false</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hdfs-sockets/dn</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.client.domain.socket.data.traffic</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hdfs127:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.warehouse.subdir.inherit.perms</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>20971520</value>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>false</value>
</property>
<property>
<name>hive.smbjoin.cache.rows</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.logging.operation.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/var/log/hive/operation_logs</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>67108864</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>1099</value>
</property>
<property>
<name>hive.vectorized.groupby.checkinterval</name>
<value>4096</value>
</property>
<property>
<name>hive.vectorized.groupby.flush.percent</name>
<value>0.1</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>false</value>
</property>
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.vectorized.input.format</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.checked.expressions</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.vector.serde.deserialize</name>
<value>false</value>
</property>
<property>
<name>hive.vectorized.adaptor.usage.mode</name>
<value>chosen</value>
</property>
<property>
<name>hive.vectorized.input.format.excludes</name>
<value>org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat</value>
</property>
<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.mapredfiles</name>
<value>false</value>
</property>
<property>
<name>hive.cbo.enable</name>
<value>false</value>
</property>
<property>
<name>hive.fetch.task.conversion</name>
<value>minimal</value>
</property>
<property>
<name>hive.fetch.task.conversion.threshold</name>
<value>268435456</value>
</property>
<property>
<name>hive.limit.pushdown.memory.usage</name>
<value>0.1</value>
</property>
<property>
<name>hive.merge.sparkfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.smallfiles.avgsize</name>
<value>16777216</value>
</property>
<property>
<name>hive.merge.size.per.task</name>
<value>268435456</value>
</property>
<property>
<name>hive.optimize.reducededuplication</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.reducededuplication.min.reducer</name>
<value>4</value>
</property>
<property>
<name>hive.map.aggr</name>
<value>true</value>
</property>
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
</property>
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>false</value>
</property>
<property>
<name>hive.execution.engine</name>
<value>mr</value>
</property>
<property>
<name>spark.executor.memory</name>
<value>3255880908b</value>
</property>
<property>
<name>spark.driver.memory</name>
<value>966367641b</value>
</property>
<property>
<name>spark.executor.cores</name>
<value>4</value>
</property>
<property>
<name>spark.yarn.driver.memoryOverhead</name>
<value>102m</value>
</property>
<property>
<name>spark.yarn.executor.memoryOverhead</name>
<value>547m</value>
</property>
<property>
<name>spark.dynamicAllocation.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.dynamicAllocation.initialExecutors</name>
<value>1</value>
</property>
<property>
<name>spark.dynamicAllocation.minExecutors</name>
<value>1</value>
</property>
<property>
<name>spark.dynamicAllocation.maxExecutors</name>
<value>2147483647</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>hdfs127,hdfs128,hdfs129</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.zookeeper.namespace</name>
<value>hive_zookeeper_namespace_hive</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hdfs127,hdfs128,hdfs129</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>true</value>
</property>
<property>
<name>hive.server2.use.SSL</name>
<value>false</value>
</property>
<property>
<name>spark.shuffle.service.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.strict.checks.orderby.no.limit</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.no.partition.filter</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.type.safety</name>
<value>true</value>
</property>
<property>
<name>hive.strict.checks.cartesian.product</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.bucketing</name>
<value>true</value>
</property>
</configuration>
#spark mode
spark.model=online
#hdfs online MySQL configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://hdfs127:3306/pica_job?useTimezone=true&serverTimezone=GMT%2B8
mysql.username=pica_spider
mysql.password=5$7FXgz#e5JWP08e
package com.common
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
/**
* created by yunfeng.wu@picahealth.com on 2019/07/30
*/
object DateUtil extends Serializable {
val yyyyMMdd = "yyyyMMdd"
def year_month_day(yyyymmdd: String) = {
val sdf = new SimpleDateFormat("yyyyMMdd")
sdf.parse(yyyymmdd)
}
def year_month_day(time: Date) = {
val sdf = new SimpleDateFormat("yyyyMMdd")
sdf.format(time)
}
def nextDay(current: Date) = {
val c = Calendar.getInstance
c.setTime(current)
c.add(Calendar.DAY_OF_MONTH, 1)
c.getTime
}
/**
* Gets the quarter (season) that the given date belongs to
*
* @param date
* @return
*/
def getCurrentSeasonByDate(date: Date): String = {
var season = 0
val c = Calendar.getInstance
c.setTime(date)
val month = c.get(Calendar.MONTH)
// Unlike a Java switch, a Scala match does not fall through, so the months are grouped explicitly
month match {
case Calendar.JANUARY | Calendar.FEBRUARY | Calendar.MARCH =>
season = 1
case Calendar.APRIL | Calendar.MAY | Calendar.JUNE =>
season = 2
case Calendar.JULY | Calendar.AUGUST | Calendar.SEPTEMBER =>
season = 3
case Calendar.OCTOBER | Calendar.NOVEMBER | Calendar.DECEMBER =>
season = 4
case _ =>
}
return String.valueOf(season)
}
//Computes the number of days between two dates (given as epoch millis)
def getBetweenDays(endDate: Long, beginDate: Long): Long = {
val cal = Calendar.getInstance()
cal.setTime(new Date(endDate));
val end = cal.getTimeInMillis()
cal.setTime(new Date(beginDate));
val begin = cal.getTimeInMillis()
val between_days = (end - begin) / (1000 * 3600 * 24)
between_days
}
import java.util.Calendar
/**
* Gets the number of whole months between two dates
*
* @param d1 the larger (later) date
* @param d2 the smaller (earlier) date
* @return the month difference if d1 > d2, otherwise 0
*/
def getMonthDiff(d1: Date, d2: Date): Int = {
val c1 = Calendar.getInstance
val c2 = Calendar.getInstance
c1.setTime(d1)
c2.setTime(d2)
if (c1.getTimeInMillis < c2.getTimeInMillis) return 0
val year1 = c1.get(Calendar.YEAR)
val year2 = c2.get(Calendar.YEAR)
val month1 = c1.get(Calendar.MONTH)
val month2 = c2.get(Calendar.MONTH)
val day1 = c1.get(Calendar.DAY_OF_MONTH)
val day2 = c2.get(Calendar.DAY_OF_MONTH)
// Year difference, e.g. d1 = 2015-8-16, d2 = 2011-9-30
var yearInterval = year1 - year2
// If d1's month-day is earlier than d2's month-day, subtract one to get the number of full years
if (month1 < month2 || month1 == month2 && day1 < day2) {
yearInterval -= 1
}
// Month difference
var monthInterval = (month1 + 12) - month2
if (day1 < day2) {
monthInterval -= 1
}
monthInterval %= 12
yearInterval * 12 + monthInterval
}
//Gets the time x days before the given date string (the string must match the yyyyMMdd pattern used by dateToLong)
def beforeDay(dateTime: String, days: Int) = {
val date = Calendar.getInstance()
date.setTime(new Date(dateToLong(dateTime, yyyyMMdd)))
date.set(Calendar.DATE, date.get(Calendar.DATE) - days)
date.getTime().getTime
}
//Gets the time x days ago
def beforeNowXDayTime(days: Int) = {
val date = Calendar.getInstance()
date.setTime(new Date())
date.set(Calendar.DATE, date.get(Calendar.DATE) - days)
date.getTime().getTime
}
//Gets the time x years ago
def beforeNowXYearTime(year: Int) = {
val date = Calendar.getInstance()
date.setTime(new Date())
date.add(Calendar.YEAR, -year)
date.getTime().getTime
}
//Gets the time x months ago
def beforeNowXMonthTime(month: Int) = {
val date = Calendar.getInstance()
date.setTime(new Date())
date.add(Calendar.MONTH, -month)
date.getTime().getTime
}
//Gets the first day of the current month
def getNowMonthStart(): String = {
var period: String = ""
val cal: Calendar = Calendar.getInstance();
val df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
cal.set(Calendar.DATE, 1)
period = df.format(cal.getTime()) //first day of the current month
period
}
def dateToLong(dateTime: String, format2: String) = {
val sdf = new SimpleDateFormat(format2)
sdf.parse(dateTime).getTime
}
def longToString(t: Long, format: String) = {
val sdf = new SimpleDateFormat(format)
val d = new Date(t)
sdf.format(d)
}
}
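// Usage sketch (not part of the original commit), assuming the yyyyMMdd helpers above.
object DateUtilDemo {
  def main(args: Array[String]): Unit = {
    val d1 = DateUtil.year_month_day("20200331")
    val d2 = DateUtil.year_month_day("20191231")
    println(DateUtil.getMonthDiff(d1, d2))                   // 3 whole months
    println(DateUtil.getBetweenDays(d1.getTime, d2.getTime)) // 91 days (2020 is a leap year)
  }
}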
package com.config
/**
*
* Constants for event-tracking traffic processing
* @Author zhenxin.ma
* @Date 2019/8/20 9:43
* @Version 1.0
*/
object MyConfigSession {
//DW-layer traffic tables in Hive
final val HIVE_TABLE1: String = "pica_dw.dw_fact_log_session"
final val HIVE_TABLE2: String = "pica_dw.dw_fact_log_session_path"
//Output file path
final val PARQUET_PATH: String = "hdfs://bi-name1:8020/tmp/output/"
//Temporary views used when building the traffic tables
final val VIEW_SESSION_NO_MATCH: String = "ods_session_no_user_id"
final val VIEW_EQUIPMENT_INFO: String = "equipment_info"
final val VIEW_DEVICE_TOKEN: String = "device_token_match"
//Three dictionary (dimension) tables used to filter the traffic data
final val ACTION_TYPE_SQL: String = "select action_type,'1' as is_valid from pica_dw.dw_dim_log_action_type where is_valid=1"
final val CLASS_NAME_SQL: String = "select class_name, '0' as is_valid from pica_dw.dw_dim_log_class_name where is_valid=0"
final val MENU_CODE_SQL: String = "select view_path, menu_code from pica_dw.dw_dim_log_menu_class_code where view_path is not Null"
//Maps action_type to its corresponding action_category
final val ACTION_CATEGORY_SQL: String = "select action_type,action_category from pica_dw.dw_dim_log_action_type where is_valid=1"
//SQL to read source data from pica_log.picalog_trace_app_part; this one reads yesterday's data
final val SOURCE_SQL: String =
"""
|select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action,
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where created_day = DATE_SUB(current_date(),1) and pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
""".stripMargin
//SQL to read source data from pica_log.picalog_trace_app_part when an explicit date argument is passed
final val SOURCE_SQL_ARGS: String =
"""
|select pseudo_session,doctor_id,mobile,device_token,user_token_tourist,class_name,view_path,action,
|component_tag,app_version,device_type,device_brand,device_model,network_type,created from pica_log.picalog_trace_app_part
| where pseudo_session is not null and pseudo_session !=''
| and pseudo_id !='' and extra_info !='com.picahealth.patient' and serviceName != 'trace3'
""".stripMargin
//Filters data from the dw_fact_log_session table
final val SOURCE_SQL_PATH: String =
s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1}
| where created_day=DATE_SUB(current_date(),1) and app_version >= '3.1.7'
| AND ((action_type ='ACTION_CLICK') OR (action_type ='ACTION_VIEW' and menu_code != '0' and menu_code !='null' and menu_code !=''))
""".stripMargin
//Conditions for matching user_id
//1. Match against the equipment table; by default uses yesterday's data
final val EQUIPMENT_INFO_SQL: String =
"""
|SELECT a.user_id,a.device_token ,ROW_NUMBER() OVER ( PARTITION BY a.device_token ORDER BY a.creat_time DESC ) row_d
|from pica_ds.picams_equipment_info AS a
| where a.user_id IS NOT NULL
| AND (to_date(a.creat_time) = date_sub(current_date(),1) OR to_date(a.modify_time) = date_sub(current_date(),1))
""".stripMargin
//Note: 1. when matching data for a historical date, use the zipper table and the SQL below instead, constraining stat_date
final val EQUIPMENT_INFO_SQL_ARGS: String =
"""
|SELECT a.doctor_id as user_id, a.device_token, ROW_NUMBER() OVER (PARTITION BY a.device_token ORDER BY a.creat_time DESC) row_d
| from pica_ods.ods_zipper_picams_equipment_info AS a where a.doctor_id IS NOT NULL AND a.stat_date=
""".stripMargin
//First match against the equipment table by device_token to obtain user_id; unmatched records default to '0'
final val DEVICE_TOKEN_SQL: String =
s"""
|SELECT t.session_id, COALESCE(cast(b.user_id as string),'0') AS user_id, t.mobile, t.device_token, t.user_token,
|t.view_class,t.view_path,t.action_type,t.component_tag, t.menu_code,
|t.action_code, t.position, t.label_value,t.app_version,t.device_type,
|t.device_brand, t.device_model, t.device_system,t.net_type,t.created_time,
|t.date_time from ${MyConfigSession.VIEW_SESSION_NO_MATCH} as t
|left join ${MyConfigSession.VIEW_EQUIPMENT_INFO} as b on t.device_token = b.device_token
""".stripMargin
//For records still unmatched after the device_token join, match by mobile_phone to obtain user_id; unmatched records stay '0'
final val MOBILE_PHONE_SQL: String =
s"""
|SELECT ss.session_id, COALESCE(cast(b.id as string),'0') AS user_id, ss.mobile, ss.device_token, ss.user_token,
|ss.view_class,ss.view_path,ss.action_type,ss.component_tag, ss.menu_code,
|ss.action_code, ss.position,ss.label_value,ss.app_version, ss.device_type,
|ss.device_brand, ss.device_model,ss.device_system,ss.net_type,ss.created_time,
|ss.date_time from (select * from ${MyConfigSession.VIEW_DEVICE_TOKEN} as a where a.user_id = '0') AS ss
|left join (select id,mobile_phone from pica_ds.pica_doctor where pica_doctor.delete_flag = 1 ) AS b on ss.mobile = b.mobile_phone
""".stripMargin
//Session gap used to split sessions: 30 minutes (see the sketch after this object)
final val SESSION_GAP: Long = 30 * 60 * 1000
//Field delimiter
final val DELIMITER: String = "\001"
//MySQL configuration on hdfs127; records job execution status
final val JDBC_DRIVER = "com.mysql.jdbc.Driver"
final val DATA_BASE = "pica_job"
final val JDBC_URL = s"jdbc:mysql://hdfs127:3306/${DATA_BASE}?useTimezone=true&serverTimezone=GMT%2B8"
final val JDBC_USERNAME = "pica_spider"
final val JDBC_PSSWORD = "5$7FXgz#e5JWP08e"
final val JDBC_TABLE = "schedule_job_record"
}
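// The session-splitting job that consumes SESSION_GAP is not shown in full in this dump.
// As a rough, hedged sketch only (not the original implementation): a 30-minute gap rule
// can be applied with a lag window plus a running sum. The column names used here
// ("pseudo_session" as the device key, "created" as epoch millis) are assumptions.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, concat_ws, lag, sum, when}

object SessionGapSketch {
  // Assigns a session_id per device by cutting whenever two consecutive events
  // are more than SESSION_GAP milliseconds apart.
  def splitSessions(events: DataFrame): DataFrame = {
    val byDevice = Window.partitionBy("pseudo_session").orderBy("created")
    events
      .withColumn("prev_created", lag(col("created"), 1).over(byDevice))
      .withColumn("is_new_session",
        when(col("prev_created").isNull ||
          col("created") - col("prev_created") > MyConfigSession.SESSION_GAP, 1).otherwise(0))
      .withColumn("session_seq", sum(col("is_new_session")).over(byDevice))
      .withColumn("session_id", concat_ws("_", col("pseudo_session"), col("session_seq")))
  }
}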
package com.session
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
/**
* Deletes an HDFS directory if it already exists
* @Author zhenxin.ma
* @Date 2019/9/4 20:42
* @Version 1.0
*/
class DeletePath(val outpath:String) {
def deletePath:Unit = {
//Hadoop configuration
val conf: Configuration = new Configuration()
//Get the Hadoop file system
val fs: FileSystem = FileSystem.get(conf)
//Build the Path object
val path: Path = new Path(outpath)
if (fs.exists(path)){
fs.delete(path,true)
println(s"delete ${outpath} success!")
}
}
}
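// Usage sketch (not part of the original commit): clear an output directory,
// e.g. MyConfigSession.PARQUET_PATH, before rewriting it.
object DeletePathDemo {
  def main(args: Array[String]): Unit = {
    new DeletePath("hdfs://bi-name1:8020/tmp/output/").deletePath
  }
}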
(Two file diffs were collapsed in the original view and are not included here.)
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Processes yesterday's data and loads it into pica_dw.dw_fact_log_session_path
* @Author zhenxin.ma
* @Date 2020/3/27 10:58
* @Version 1.0
*/
class SessionProcessPath {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
object SessionProcessPath {
def apply(): SessionProcessPath = new SessionProcessPath()
def main(args: Array[String]): Unit = {
//1. Insert a row into the job-record table before running the task
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
""".stripMargin
//Batch number of the data being synced, formatted like 2019-09-12
val scnData: String = DateUtils.getYesterdayDate
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sparkSession: SparkSession = SessionProcessPath().getSparkSession("SessionProcessPath")
//Select the source data
val sourceDF: DataFrame = sparkSession.sql(MyConfigSession.SOURCE_SQL_PATH)
//Users whose registration date is on or before the traffic-statistics date
val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table")
//Set user_id to 0 for records whose id is null
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(selectDF,sparkSession)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData)
println("----------------------------------update task record table---------------------------------------")
//Task succeeded: update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, resultDF.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
/**
* @Description computes the refer_* columns for the required fields
* @param dataFrame the source data
* @param sparkSession the SparkSession
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getReferColumns(dataFrame: DataFrame ,sparkSession: SparkSession):DataFrame = {
//Window spec: partition by session_id, then order by created_time
val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
//Add the refer_* columns
val menuDF: DataFrame =
dataFrame.withColumn("refer_menu_code", lag(dataFrame("menu_code"), 1).over(sessionIDWinSpec))
val acodeDF: DataFrame =
menuDF.withColumn("refer_action_code", lag(menuDF("action_code"), 1).over(sessionIDWinSpec))
val positionDF: DataFrame =
acodeDF.withColumn("refer_position", lag(acodeDF("position"), 1).over(sessionIDWinSpec))
val actypeDF: DataFrame =
positionDF.withColumn("refer_action_type", lag(positionDF("action_type"), 1).over(sessionIDWinSpec))
val recreatDF: DataFrame =
actypeDF.withColumn("refer_created", lag(actypeDF("created_time"), 1).over(sessionIDWinSpec))
val rowNumberDF: DataFrame =
recreatDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
//Replace NULLs in the refer_* columns
val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value",
"COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position",
"COALESCE(refer_action_type,'') as refer_action_type",
"COALESCE(refer_created,created_time) as refer_created",
"step_id", "app_version", "device_type", "created_time", "date_time")
//Add refer_time_diff, the difference between created_time and refer_created
val referTimeDiff: DataFrame =
coaleseDF.withColumn("refer_time_diff", coaleseDF("created_time") - coaleseDF("refer_created"))
referTimeDiff
}
/**
* @Description loads the data into the target table
* @param dataFrame the source data
* @param sparkSession the SparkSession
* @param partitionDay the partition date
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay:String):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val loadDataSql =
s"""
|insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}')
| select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
| refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff,step_id,app_version,device_type,created_time,date_time
| from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
package com.session
import java.sql
import java.sql.PreparedStatement
import com.config.MyConfigSession
import com.pica.utils.DateUtils
import com.utils.{JDBCUtil, UseUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{lag, row_number}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Processes data for a date passed as an argument and loads it into pica_dw.dw_fact_log_session_path
* @Author zhenxin.ma
* @Date 2020/3/11 11:12
* @Version 1.0
*/
class SessionProcessPathArgs {
def getSparkSession(appName: String): SparkSession = {
val conf: SparkConf = new SparkConf().setAppName(appName)
UseUtil.setConfigure(conf)
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession
}
}
object SessionProcessPathArgs {
def apply(): SessionProcessPathArgs = new SessionProcessPathArgs()
def main(args: Array[String]): Unit = {
//1. Insert a row into the job-record table before running the task
val insertSQL: String =
s"""
|insert into ${MyConfigSession.DATA_BASE}.${MyConfigSession.JDBC_TABLE} (job_id,job_name,job_type,job_scn,status,start_time)
|values(1968,'pica_dw.dw_fact_log_session_path','3',?,'0',?)
""".stripMargin
//Batch number of the data being synced, formatted like 2019-09-12
val scnData: String = args(0)
//Task start time, formatted like 2019-09-12 14:03:30
val startTime: String = DateUtils.getTodayTime
//Parameters for the SQL statement
val insertArr: Array[String] = Array[String](scnData, startTime)
//Get a MySQL connection
val connSql: sql.Connection = JDBCUtil.getConnection()
//Insert the row into the record table
val flag: Int = JDBCUtil.insertRecord(connSql, insertSQL, insertArr)
try {
val sparkSession: SparkSession = SessionProcessPathArgs().getSparkSession("SessionProcessPathArgs")
//Select the source data
val sourceSql =
s"""
|select session_id,cast(user_id as int) user_id,action_type,user_token,menu_code,action_code,position,label_value,
|app_version,device_type,created_time,date_time from ${MyConfigSession.HIVE_TABLE1}
| where created_day='${args(0)}' and app_version >= '3.1.7' and menu_code !='null' and menu_code !=''
| and ((action_type ='ACTION_VIEW' and menu_code != '0') or (action_type ='ACTION_CLICK' and action_code !=''))
""".stripMargin
val sourceDF: DataFrame = sparkSession.sql(sourceSql)
//Users whose registration date is on or before the traffic-statistics date
val doctorDF: DataFrame = sparkSession.sql(
"select id from pica_ds.pica_doctor where to_date(creat_time) <=DATE_SUB(current_date(),1)")
sourceDF.join(doctorDF, sourceDF("user_id") === doctorDF("id"), "left")
.createOrReplaceTempView("tmp_table")
//Set user_id to 0 for records whose id is null
val reSql: String = "select session_id,case when id is null then 0 else user_id END as user_id,action_type," +
"user_token,menu_code,action_code,position,label_value,app_version,device_type,created_time,date_time from tmp_table"
val selectDF: DataFrame = sparkSession.sql(reSql)
println("-----------------------------------compute refer columns-----------------------------------------")
val resultDF: DataFrame = getReferColumns(selectDF,sparkSession)
println("-----------------------------------load data to pica_dw.dw_fact_log_session_path-----------------")
loadData(resultDF,sparkSession,scnData)
println("----------------------------------update task record table---------------------------------------")
//Task succeeded: update the MySQL record table
val updateSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,end_time=?,data_count=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val upreSta: PreparedStatement = connSql.prepareStatement(updateSQL)
upreSta.setString(1, "1")
upreSta.setString(2, DateUtils.getTodayTime)
upreSta.setInt(3, resultDF.count().toInt)
//Execute the update
upreSta.executeUpdate()
//Close the connection
JDBCUtil.close(connSql, upreSta)
sparkSession.stop()
}catch {
case e:Exception => {
println("-----------------------------------任务异常---------------------------------------------------")
val exceptionSQL: String =
s"""
|update ${MyConfigSession.JDBC_TABLE} set status=?,exception=?,end_time=? where job_id=1968 and start_time='${startTime}'
""".stripMargin
val errorArr = Array[String]("2", e.getMessage, DateUtils.getTodayTime)
JDBCUtil.insertRecord(connSql, exceptionSQL, errorArr)
connSql.close()
}
}
}
/**
* @Description computes the refer_* columns for the required fields
* @param dataFrame the source data
* @param sparkSession the SparkSession
* @return org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>
**/
def getReferColumns(dataFrame: DataFrame ,sparkSession: SparkSession):DataFrame = {
//Window spec: partition by session_id, then order by created_time
val sessionIDWinSpec: WindowSpec = Window.partitionBy("session_id").orderBy("created_time")
//Add the refer_* columns
val menuDF: DataFrame =
dataFrame.withColumn("refer_menu_code", lag(dataFrame("menu_code"), 1).over(sessionIDWinSpec))
val acodeDF: DataFrame =
menuDF.withColumn("refer_action_code", lag(menuDF("action_code"), 1).over(sessionIDWinSpec))
val positionDF: DataFrame =
acodeDF.withColumn("refer_position", lag(acodeDF("position"), 1).over(sessionIDWinSpec))
val actypeDF: DataFrame =
positionDF.withColumn("refer_action_type", lag(positionDF("action_type"), 1).over(sessionIDWinSpec))
val recreatDF: DataFrame =
actypeDF.withColumn("refer_created", lag(actypeDF("created_time"), 1).over(sessionIDWinSpec))
val rowNumberDF: DataFrame =
recreatDF.withColumn("step_id", row_number().over(sessionIDWinSpec))
//Replace NULLs in the refer_* columns
val coaleseDF: DataFrame = rowNumberDF.selectExpr(
"session_id", "user_id", "action_type", "user_token", "menu_code", "action_code", "position", "label_value",
"COALESCE(refer_menu_code,'') as refer_menu_code",
"COALESCE(refer_action_code,'') as refer_action_code",
"COALESCE(refer_position,'') as refer_position",
"COALESCE(refer_action_type,'') as refer_action_type",
"COALESCE(refer_created,created_time) as refer_created",
"step_id", "app_version", "device_type", "created_time", "date_time")
//Add refer_time_diff, the difference between created_time and refer_created
val referTimeDiff: DataFrame =
coaleseDF.withColumn("refer_time_diff", coaleseDF("created_time") - coaleseDF("refer_created"))
referTimeDiff
}
/**
* @Description loads the data into the target table
* @param dataFrame the source data
* @param sparkSession the SparkSession
* @param partitionDay the partition date
* @return void
**/
def loadData(dataFrame: DataFrame, sparkSession: SparkSession, partitionDay:String):Unit = {
dataFrame.createOrReplaceTempView("result_view")
val loadDataSql =
s"""
|insert overwrite table ${MyConfigSession.HIVE_TABLE2} partition(created_day='${partitionDay}')
| select session_id,user_id,action_type,user_token,menu_code,action_code,position,label_value,
| refer_menu_code,refer_action_code,refer_position,refer_action_type,
| cast(refer_time_diff as int) as refer_time_diff,step_id,app_version,device_type,created_time,date_time
| from result_view
""".stripMargin
sparkSession.sql(loadDataSql)
}
}
package com.stream
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.common.DateUtil
import com.tmp.PropertiesUtil
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, TimestampType}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* Consumes trace logs from Kafka
* @Author zhenxin.ma
* @Date 2020/2/24 13:54
* @Version 1.0
*/
object PraseLogRestore {
var tableName = ""
val traceFields = List(
"package_id",
"uuid",
"device_token",
"pseudo_session",
"pseudo_id",
"class_name",
"action",
"view_path",
"component_tag",
"created",
"user_token",
"mobile",
"doctor_id",
"device_brand",
"device_model",
"app_version",
"device_type",
"device_ip",
"web_data",
"web_data_type",
"alternate_info",
"extra_info",
"network_type",
"created_on",
"remark1",
"remark2",
"remark3",
"remark4",
"remark5",
"remark6",
"remark7",
"remark8",
"remark9",
"remark10",
"user_token_tourist",
"machineID",
"serviceName",
"serviceSidePacketId",
"serviceSideRecordId")
def main(args: Array[String]): Unit = {
// val warehouseLocation = "hdfs://master61:8020/user/hive/warehouse"
//Entry point
if (args.length < 1) {
System.err.println("Usage: ParseLog <tableName>")
System.exit(1)
}
tableName = args.apply(0)
val conf: SparkConf = new SparkConf().setAppName("ParseLogRestore")
conf.set("spark.serializer", classOf[KryoSerializer].getName)
//Enable backpressure
conf.set("spark.streaming.backpressure.enabled","true")
//Initial maximum rate at which each receiver reads its first batch when backpressure is enabled
conf.set("spark.streaming.backpressure.initialRate", "1000")
//Maximum number of records consumed per second from each partition
conf.set("spark.streaming.kafka.maxRatePerPartition","1000")
val spark = SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(PropertiesUtil.propertiesMap.get("kafka.during").toLong))
val kafkaParams = Map(
"bootstrap.servers" -> s"${PropertiesUtil.propertiesMap.get("kafka.brokers")}",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> s"${PropertiesUtil.propertiesMap.get("kafka.groupId")}",
//If no offsets are recorded, start from the earliest data
"auto.offset.reset" -> "earliest",
// "auto.offset.reset" -> "latest",
//Kafka offsets are maintained automatically: before Kafka 0.10 in ZooKeeper, from 0.10 onward in the __consumer_offsets topic
//Enable automatic offset commits
"enable.auto.commit" -> (true: java.lang.Boolean)
)
var topics = Array(s"${PropertiesUtil.propertiesMap.get("kafka.topic")}")
//Direct stream
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
val dStream = stream.map(cr => {
// println("cr.value==>"+cr.value())
cr.value()
}).repartition(1)
.foreachRDD(rs=> {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var now = sdf.format(new Date().getTime)
// var tday= now.split(" ").apply(0).trim
var tday="2020-02-24"
val time = now.split(" ").apply(1).trim
if(time.startsWith("00:00")){//At 00:00, write this window's data to the previous day's partition
tday = sdf.format(new Date().getTime-10000).split(" ").apply(0).trim
}
// val path = s"/data/logs/trace_logs/${tday}/${new Date().getTime}"
val path = s"/data/logs/trace_logs_tmp/${tday}/${new Date().getTime}"
println(s"hdfs_path==>${path}")
rs.saveAsTextFile(path, classOf[GzipCodec]) //compress the output files with gzip
val rdd = parseForeach(rs.filter(_.toString().contains("datas\\\":")))
val df = createDf(spark,rdd)
df.show()
// println(s"df.count:${df.count()}")
writeToHive(spark,df,tday)
})
ssc.start()
ssc.awaitTermination()
}
def parseForeach(rs: RDD[String]) = {
rs.flatMap(line=>{
//Strip the trailing "]" / ")" and the surrounding quoting
var linex = line.toString()
if(line.toString().endsWith("]") || line.toString().endsWith(")")){
linex = line.toString().dropRight(1)
}
val regex = "\"".r
var jsonStr = linex.toString().replaceAll("\\\\","")
jsonStr = regex.replaceFirstIn(jsonStr,"").dropRight(1)
// println(s"jsonStr:${jsonStr}")
val lines = new ListBuffer[Row]()
try {
val jsonObj = JSON.parseObject(jsonStr)
if (jsonObj.containsKey("datas")) {
val jsonArr: JSONArray = jsonObj.getJSONArray("datas")
if (jsonArr.size() > 0) {
for (i <- 0 to jsonArr.size() - 1) {
val fieldValues = ArrayBuffer[Any]()
fieldValues.append(0) //id defaults to 0
val eachJson: JSONObject = jsonArr.getJSONObject(i)
for (field <- traceFields) {
if (field.equals("created_on")) {
fieldValues.append(new Timestamp(new Date().getTime()))
} else if (eachJson.containsKey(field)) {
fieldValues.append(eachJson.getString(field))
} else {
fieldValues.append("")
}
}
lines.append(Row.fromSeq(fieldValues.toSeq))
}
}
}
} catch {
case e:Exception => println(s"parseException:${e.getMessage}===>jsonStr:${jsonStr}")
}
lines.toList
})
}
def createDf(spark:SparkSession,rdd: RDD[Row]): DataFrame ={
val schemaList = new ListBuffer[StructField]
schemaList.append(StructField("id",IntegerType, false))
traceFields.map(eachField=>{
var struct:StructField = null
if(eachField.equals("created_on")){
struct = StructField(eachField, TimestampType, false)
}else if(eachField.equals("id")){
struct = StructField(eachField, IntegerType, false)
}else {
struct = StructField(eachField, StringType, false)
}
schemaList.append(struct)
})
val schema = types.StructType(schemaList.toList)
val resDF = spark.createDataFrame(rdd,schema)
// resDF.printSchema()
// resDF.show(false)
return resDF
}
def writeToHive(spark: SparkSession, df: DataFrame,tday: String ): Unit = {
df.createOrReplaceTempView("picalog_trace_app_part")
val sql = s"insert into ${tableName} partition(created_day='${tday}') select * from picalog_trace_app_part"
println(s"[excute sql]:${sql}")
spark.sql(sql)
}
}
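// A rough local sketch (not part of the original commit) of the record shape that
// parseForeach expects: the raw JSON is wrapped in quotes with its inner quotes
// backslash-escaped, and carries a "datas" array of trace objects. All field
// values below are made up for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object PraseLogRestoreDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PraseLogRestoreDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Build a record of the assumed shape; parseForeach strips the escaping back off
    val inner = """{"datas":[{"pseudo_session":"abc123","doctor_id":"0","class_name":"MainActivity","action":"ACTION_CLICK","created":"1582524000000"}]}"""
    val sample = "\"" + inner.replace("\"", "\\\"") + "\""
    PraseLogRestore.parseForeach(sc.parallelize(Seq(sample))).collect().foreach(println)
    sc.stop()
  }
}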
package com.tmp
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
* @Author zhenxin.ma
* @Date 2020/3/19 13:02
* @Version 1.0
*/
object ReduceByKeyTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(List(1,1,4,5,6,3,8,9,1,3,4,4,9))
val mapRDD: RDD[(Int, Int)] = rdd.map((_,2))
val fu: (Int, Int) => Int = (a:Int,b:Int) => {
a * b
}
val result: RDD[(Int, Int)] = mapRDD.reduceByKey(fu)
val arr: Array[(Int, Int)] = result.collect()
arr.foreach(f => {
println(f._1 + " " + f._2)
})
}
}
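// ReduceByKeyTest maps every element to (key, 2) and multiplies the 2s per key, so a key
// that occurs n times yields 2^n (e.g. key 1 occurs three times -> 8). A hedged local
// cross-check of that arithmetic, without Spark:
package com.tmp

object ReduceByKeyLocalCheck {
  def main(args: Array[String]): Unit = {
    val data = List(1, 1, 4, 5, 6, 3, 8, 9, 1, 3, 4, 4, 9)
    val expected: Map[Int, Int] = data
      .map((_, 2))
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2).product }
    expected.foreach { case (k, v) => println(s"$k $v") }
  }
}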
package com.tmp
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Convert an RDD[T] into an RDD of a case class, then use the implicit toDF() conversion to obtain a DataFrame.
* @Author zhenxin.ma
* @Date 2020/3/20 10:15
* @Version 1.0
*/
case class Person( id:Int, name:String, age:Int, score:Int){
}
object SparkRow_1 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sc: SparkContext = sparkSession.sparkContext
    val source: RDD[String] = sc.textFile("file:///D:/ideaworkspace/label/data/ss.text")
val personRDD: RDD[Person] = source.map(f => {
val strs: Array[String] = f.split(",")
Person(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
})
import sparkSession.implicits._
val df: DataFrame = personRDD.toDF()
df.printSchema()
df.show(5)
sparkSession.close()
}
}
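// A hedged variant of SparkRow_1: with the same implicits import, the case-class RDD can
// also be turned into a typed Dataset[Person] via toDS(); toDF() is just its untyped view.
// The file path is the same illustrative local path used above.
package com.tmp

import org.apache.spark.sql.{Dataset, SparkSession}

object SparkRow_1_DatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("").master("local[*]").getOrCreate()
    import spark.implicits._
    val ds: Dataset[Person] = spark.sparkContext
      .textFile("file:///D:/ideaworkspace/label/data/ss.text")
      .map { line =>
        val strs = line.split(",")
        Person(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
      }
      .toDS()
    ds.printSchema()
    ds.show(5)
    spark.close()
  }
}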
package com.tmp
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Create a DataFrame from an RDD of tuples via toDF(column names).
*
* @Author zhenxin.ma
* @Date 2020/3/20 10:40
* @Version 1.0
*/
object SparkRow_2 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sc: SparkContext = sparkSession.sparkContext
    val source: RDD[String] = sc.textFile("file:///D:/ideaworkspace/label/data/ss.text")
val personRDD: RDD[(Int, String, Int, Int)] = source.map(f => {
val strs: Array[String] = f.split(",")
(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
})
import sparkSession.implicits._
val df: DataFrame = personRDD.toDF("id","name","age","score")
df.printSchema()
df.show(5)
sparkSession.close()
}
}
package com.tmp
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.tmp.Student
/**
*
 * Create a DataFrame by converting each line into a JavaBean (Student).
* @Author zhenxin.ma
* @Date 2020/3/20 10:46
* @Version 1.0
*/
object SparkRow_3 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sc: SparkContext = sparkSession.sparkContext
    val source: RDD[String] = sc.textFile("file:///D:/ideaworkspace/label/data/ss.text")
val stuRDD: RDD[Student] = source.map(f => {
val strs: Array[String] = f.split(",")
new Student(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
})
val df: DataFrame = sparkSession.createDataFrame(stuRDD,classOf[Student])
df.printSchema()
df.show(5)
sparkSession.close()
}
}
package com.tmp
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Same idea with a Scala class whose fields carry the @BeanProperty annotation.
* @Author zhenxin.ma
* @Date 2020/3/20 10:59
* @Version 1.0
*/
object SparkRow_4 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val sc: SparkContext = sparkSession.sparkContext
    val source: RDD[String] = sc.textFile("file:///D:/ideaworkspace/label/data/ss.text")
val stuRDD: RDD[Teacher] = source.map(f => {
val strs: Array[String] = f.split(",")
Teacher(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
})
val df: DataFrame = sparkSession.createDataFrame(stuRDD,classOf[Teacher])
df.printSchema()
df.show(5)
sparkSession.close()
}
}
package com.tmp
/**
 * Build Row objects against an explicit schema (a runnable sketch follows this object).
* @Author zhenxin.ma
* @Date 2020/4/1 17:00
* @Version 1.0
*/
object SparkRow_5 {
}
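// SparkRow_5 is left empty; its comment says the intent is to build Row objects against a
// schema. A minimal sketch of that approach, assuming the same illustrative local file and
// column names as the other examples:
package com.tmp

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkRow_5_Sketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("").master("local[*]").getOrCreate()
    val source: RDD[String] = spark.sparkContext.textFile("file:///D:/ideaworkspace/label/data/ss.text")
    // build untyped rows positionally
    val rowRDD: RDD[Row] = source.map { line =>
      val strs = line.split(",")
      Row(strs(0).toInt, strs(1), strs(2).toInt, strs(3).toInt)
    }
    // the schema supplies names and types for those positions
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("score", IntegerType, nullable = false)
    ))
    val df: DataFrame = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    df.show(5)
    spark.close()
  }
}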
package com.tmp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
* @Author zhenxin.ma
* @Date 2020/3/19 16:22
* @Version 1.0
*/
object SparkSqlExcercise {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val df: DataFrame = sparkSession.sql("")
// df.select("")
// df.agg(("id","max"),("time","min"))
// df.join(df,df("id") === df("is"))
// df.withColumn("",explode(df("")))
    df.explode("","")((t:String)=>t.split("")) // DataFrame.explode is deprecated since Spark 2.0; see the functions.explode sketch after this object
// df.groupBy("").agg(sum(""))
}
}
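// The supported replacement for the deprecated DataFrame.explode goes through
// org.apache.spark.sql.functions.explode together with split. A minimal sketch;
// the sample data and column names are illustrative assumptions.
package com.tmp

import org.apache.spark.sql.functions.{explode, split}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExplodeSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("").master("local[*]").getOrCreate()
    import spark.implicits._
    val df: DataFrame = Seq((1, "a,b,c"), (2, "d,e")).toDF("id", "tags")
    // split the comma-separated string into an array, then emit one row per element
    val exploded = df.withColumn("tag", explode(split($"tags", ",")))
    exploded.show(false)
    spark.close()
  }
}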
package com.tmp;
/**
* @Author zhenxin.ma
* @Date 2020/3/20 10:46
* @Version 1.0
*/
public class Student {
int id;
String name;
int age;
int score;
public Student() {
}
public Student(int id, String name, int age, int score) {
this.id = id;
this.name = name;
this.age = age;
this.score = score;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
", score=" + score +
'}';
}
}
package com.tmp
import scala.beans.BeanProperty
/**
 * A Scala bean: fields annotated with @BeanProperty so Spark can use the generated getters.
* @Author zhenxin.ma
* @Date 2020/3/20 11:06
* @Version 1.0
*/
class Teacher (
@BeanProperty
val id: Int,
@BeanProperty
val name:String,
@BeanProperty
val age: Int,
@BeanProperty
val score: Int)
object Teacher{
def apply(id: Int, name: String, age: Int, score: Int): Teacher = new Teacher(id, name, age, score)
}
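// @BeanProperty on the val parameters generates Java-style getters (getId, getName, ...),
// which is what createDataFrame(stuRDD, classOf[Teacher]) in SparkRow_4 relies on to
// discover the columns. A quick local check; the sample values are illustrative:
package com.tmp

object TeacherBeanCheck {
  def main(args: Array[String]): Unit = {
    val t = Teacher(1, "kim", 30, 99)
    // the annotation-generated getters are callable from both Java and Scala
    println(t.getId + " " + t.getName + " " + t.getAge + " " + t.getScore)
  }
}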
package com.utils
import com.session.{SessionProcess, SessionProcessArgs, SessionProcessHistoryPathArgs, SessionProcessPath, SessionProcessPathArgs}
import org.apache.hadoop.util.ProgramDriver
/**
* @author kim
* @since 2019/8/14 20:15
* @version scala-2.11.12
*/
object Driver {
def main(args: Array[String]): Unit = {
val driver: ProgramDriver = new ProgramDriver()
// driver.addClass("SessionProcessHistoryPathArgs",classOf[SessionProcessHistoryPathArgs],"传递日期参数--用户Session数据分析PATH")
driver.addClass("SessionProcess",classOf[SessionProcess],"用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessArgs",classOf[SessionProcessArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session表")
driver.addClass("SessionProcessPath",classOf[SessionProcessPath],"用户Session数据分析导入到dw_fact_log_session_path表")
driver.addClass("SessionProcessPathArgs",classOf[SessionProcessPathArgs],"传递日期参数--用户Session数据分析导入到dw_fact_log_session_path表")
driver.run(args)
}
}
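// ProgramDriver.run picks the job by its registered alias (the first argument) and passes
// the remaining arguments on to that class's main method. A hedged example invocation;
// the date argument is an illustrative assumption:
package com.utils

object DriverUsageSketch {
  def main(args: Array[String]): Unit = {
    // equivalent to supplying "SessionProcessArgs 2019-08-14" after the jar on the spark-submit command line
    Driver.main(Array("SessionProcessArgs", "2019-08-14"))
  }
}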
package com.utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.Map
/**
 * Utility for creating HBase connections and tables.
*
* @Author zhenxin.ma
* @Date 2019/9/18 15:14
* @Version 1.0
*/
object HBaseUtil {
def getConnection(): Connection = {
var connection: Connection = null
try {
      // one connection per partition; data is bulk-loaded partition by partition
      val hbaseConf: Configuration = HBaseConfiguration.create()
      // obtain the HBase connection
connection = ConnectionFactory.createConnection(hbaseConf)
} catch {
case e:Exception => {
println("--------------HBase connect exception-----------------------")
println(e.getMessage)
}
}
connection
}
def getHTable(connection: Connection ,tableName: String): HTable = {
val table: HTable = connection.getTable(TableName.valueOf(tableName)).asInstanceOf[HTable]
table
}
/**
   * @Description add a set of columns to a Put built for a specific rowkey
   * @param put the Put object created from the rowkey
   * @param family the column family
   * @param columns a Map of column name -> column value
* @return void
**/
def setPutColumn(put:Put, family:String,columns:Map[String,String]): Unit = {
columns.foreach(map => {
val columnName: String = map._1
val value: String = map._2
put.addColumn(Bytes.toBytes(family),Bytes.toBytes(columnName),Bytes.toBytes(value))
})
}
def close(connection: Connection, table:HTable): Unit = {
try{
if (table != null) {
table.close()
}
if (connection != null) {
connection.close()
}
println("------关闭 HBase 连接和释放资源------")
}catch {
case e: Exception => e.printStackTrace()
}
}
}
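// Typical use of HBaseUtil, e.g. inside a foreachPartition loop. The table name "trace_log",
// the column family "info", the rowkey and the column values are illustrative assumptions;
// hbase-site.xml must be on the classpath for HBaseConfiguration.create() to pick up the cluster.
package com.utils

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.Map

object HBaseUtilUsageSketch {
  def main(args: Array[String]): Unit = {
    val connection = HBaseUtil.getConnection()
    val table = HBaseUtil.getHTable(connection, "trace_log")
    val put = new Put(Bytes.toBytes("20190918_0001")) // rowkey
    HBaseUtil.setPutColumn(put, "info", Map("action_type" -> "ACTION_CLICK", "net_type" -> "1"))
    table.put(put)
    HBaseUtil.close(connection, table)
  }
}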
package com.utils
import java.sql
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.config.MyConfigSession
/**
 * Utility class for connecting to MySQL.
*
* @Author zhenxin.ma
* @Date 2019/8/12 13:47
* @Version 1.0
*/
object JDBCUtil {
/**
   * @Description obtain a JDBC connection to MySQL
* @return java.sql.Connection
**/
def getConnection():Connection = {
var con: Connection = null
try{
Class.forName(MyConfigSession.JDBC_DRIVER)
con = DriverManager.getConnection(MyConfigSession.JDBC_URL, MyConfigSession.JDBC_USERNAME, MyConfigSession.JDBC_PSSWORD)
} catch {
case e:Exception => {
println("-----------MYSQL Connection has exception , msg = "+ e.getMessage())
}
}
con
}
/**
   * @Description fetch a result set
   * @param sql the SQL to execute
   * @param jobName name of the table/job being analysed
   * @param sync batch number
* @return java.sql.ResultSet
**/
def getResultSet(con: Connection,sql: String, jobName:String, sync:String):ResultSet = {
var resultSet: ResultSet = null
var prest:PreparedStatement = null
try{
prest = con.prepareStatement(sql)
prest.setString(1, jobName)
prest.setString(2, sync)
resultSet = prest.executeQuery()
resultSet
}catch {
case e: Exception => e.printStackTrace()
resultSet
}
}
/**
   * @Description close the connection and release resources
* @return void
**/
def close(con: Connection,prest:PreparedStatement): Unit = {
try{
if (prest != null) {
prest.close()
}
if (con != null) {
con.close()
}
println("------关闭MYSQL连接和释放资源------")
}catch {
case e: Exception => e.printStackTrace()
}
}
  // insert a row into the config record table
def insertRecord(connSql: sql.Connection, sq: String ,array:Array[String]): Int = {
val preState: PreparedStatement = connSql.prepareStatement(sq)
for (i <-0 until array.length) {
preState.setString(i + 1,array(i))
}
val flag: Int = preState.executeUpdate()
preState.close()
flag
}
}
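// Typical use of JDBCUtil: open a connection, run the two-parameter query through
// getResultSet, iterate the rows, then release everything. The SQL text, table name and
// column name are illustrative assumptions.
package com.utils

import java.sql.{Connection, ResultSet}

object JDBCUtilUsageSketch {
  def main(args: Array[String]): Unit = {
    val con: Connection = JDBCUtil.getConnection()
    val sql = "SELECT sync_code FROM record WHERE job_name = ? AND sync = ?"
    val rs: ResultSet = JDBCUtil.getResultSet(con, sql, "SessionProcess", "20190812")
    while (rs != null && rs.next()) {
      println(rs.getString("sync_code"))
    }
    // passing null for the PreparedStatement only closes the connection
    JDBCUtil.close(con, null)
  }
}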
package com.utils
import com.session.DeletePath
/**
* @Author zhenxin.ma
* @Date 2019/9/4 20:43
* @Version 1.0
*/
object MyPredef {
  // implicit conversion: lets a plain String path be used as a DeletePath
  implicit def deleteHdfs(path: String): DeletePath = new DeletePath(path)
}
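// With MyPredef's implicit in scope, a plain String is converted to a DeletePath wherever
// one is expected. DeletePath itself is defined in com.session (not shown here); the path
// below is an illustrative assumption.
package com.utils

import com.session.DeletePath
import com.utils.MyPredef._

object MyPredefUsageSketch {
  def main(args: Array[String]): Unit = {
    // this assignment compiles because deleteHdfs("...") is inserted implicitly
    val dp: DeletePath = "/data/logs/trace_logs_tmp/2019-09-04"
    println(dp)
  }
}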
package com.utils
import java.sql.{Connection, DriverManager}
import java.util.{Properties, UUID}
import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
* @Author zhenxin.ma
* @Date 2019/8/30 11:59
* @Version 1.0
*/
object UseUtil {
def getUUID(): String = {
val uuid: String = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase
uuid
}
def getMD5(a: String): String = {
val md5: String = MD5Hash.getMD5AsHex(Bytes.toBytes(a))
md5
}
  // add columns to a DataFrame
def addDataFrameColumn(df:DataFrame , map: Map[String,Column]): DataFrame={
var result: DataFrame = df
map.foreach(it => {
result = result.withColumn(it._1,it._2)
})
result
}
/**
   * @Description map a network-type name to its numeric code ("1" = WIFI/WLAN, "0" = unknown)
* @param net NotNull
* @return java.lang.String
**/
def netTypeMatch(net: String): String = {
var net_type: String = null
net match {
case "2G" => net_type = "2"
case "3G" => net_type = "3"
case "4G" => net_type = "4"
case "5G" => net_type = "5"
case "6G" => net_type = "6"
case "2G/3G/4G" => net_type = "4"
case "2G/3G/4G/5G" => net_type = "5"
case "2G/3G/4G/5G/6G" => net_type = "6"
case "WIFI" => net_type = "1"
case "WLAN" => net_type = "1"
case _:Any => net_type = "0"
}
net_type
}
  // shared Spark tuning applied to every job
  def setConfigure(conf: SparkConf): Unit = {
    conf.set("spark.serializer", classOf[KryoSerializer].getName)
    // maximum buffer size used by the Kryo serializer
    conf.set("spark.kryoserializer.buffer.max", "128m")
    // enable RDD compression
    conf.set("spark.rdd.compress", "true")
    // use snappy as the compression codec (the default is lz4); it is fast with a reasonable compression ratio, at the cost of somewhat more memory
    conf.set("spark.io.compression.codec", "snappy")
    // block size used by the snappy codec when compressing
    conf.set("spark.io.compression.snappy.blockSize", "64k")
    // number of partitions Spark SQL produces for shuffles (default 200)
    conf.set("spark.sql.shuffle.partitions", "200")
    // threshold below which SortShuffleManager uses the by-pass (no-sort) mode; default 200, so 210 keeps by-pass enabled for the 200 shuffle partitions above
    conf.set("spark.shuffle.sort.bypassMergeThreshold", "210")
    // switch to the legacy memory model so the shuffle/storage memory fractions can be tuned
    conf.set("spark.memory.useLegacyMode", "true")
    // fraction of executor memory a task may use for aggregation after fetching the previous stage's output during a shuffle (default 0.2);
    // raise it on shuffle-heavy jobs to give tasks more working memory
    conf.set("spark.shuffle.memoryFraction","0.5")
    // default parallelism for RDD operations
    conf.set("spark.default.parallelism", "200")
    // shuffle write (spill) buffer, default 32k; can be increased when memory allows
    conf.set("spark.shuffle.file.buffer", "64k")
    // buffer size for shuffle read tasks; it bounds how much data a read task fetches at a time and can be increased when memory allows
    conf.set("spark.reducer.maxSizeInFlight", "96m")
    // maximum number of fields rendered when plans/schemas are converted to strings for debugging
    conf.set("spark.debug.maxToStringFields","100")
}
/**
   * @Description normalize a raw action name to one of the standard action_type values used by the config table
* @param action
* @return java.lang.String
**/
def getActionType(action:String): String = {
var actionType: String = action
actionType match {
case "ACTION_APP_BACKGROUND" => actionType = "ACTION_BACKGROUND"
case "ACTION_CLICK" => actionType = "ACTION_CLICK"
case "ACTION_WEB_CLICK" => actionType = "ACTION_CLICK"
case "WEB_ACTION_CLICK" => actionType = "ACTION_CLICK"
case "ACTION_EQUIP_INFO" => actionType = "ACTION_EQUIP_INFO"
case "ACTION_EXIT_APP" => actionType = "ACTION_EXIT"
case "ACTION_PAGE_EXPOSE" => actionType = "ACTION_EXPOSE"
case "ACTION_RECENT_APPS" => actionType = "ACTION_RECENT"
case "ACTION_APP_START" => actionType = "ACTION_START"
case "ACTION_ACTIVITY_CREATE" => actionType = "ACTION_VIEW"
case "ACTION_LOAD_URL" => actionType = "ACTION_VIEW"
case "ACTION_WEB_ENTER" => actionType = "ACTION_VIEW"
case "ACTION_WEB_PAGE_IN" => actionType = "ACTION_VIEW"
case _ => actionType =" "
}
actionType
}
/**
   * @Description run the given SQL and broadcast the result as a Map of colName1 -> colName2
* @param sparkSession
* @param sourSql
* @param colName1
* @param colName2
* @return org.apache.spark.broadcast.Broadcast<scala.collection.immutable.Map<java.lang.String,java.lang.Object>>
**/
def getBroadcast(sparkSession: SparkSession,sourSql: String,colName1: String,colName2: String):Broadcast[Map[String,String]] = {
import sparkSession.implicits._
val df: DataFrame = sparkSession.sql(sourSql)
val map: Map[String, String] = df.map(row => {
(row.getAs[String](colName1), row.getAs[String](colName2))
}).collect().toMap
val broad: Broadcast[Map[String, String]] = sparkSession.sparkContext.broadcast(map)
broad
}
/**
   * Save a DataFrame into a MySQL table.
   * @param dataFrame the DataFrame to save
   * @param tableName the target MySQL table name
   * @param saveMode save mode: Append, Overwrite, ErrorIfExists or Ignore
   * @param proPath path of the properties file
*/
def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
var table = tableName
val properties: Properties = getProPerties(proPath)
    val prop = new Properties // the keys in the properties file differ from the keys Spark expects, so build a Properties object in Spark's format
prop.setProperty("user", properties.getProperty("mysql.username"))
prop.setProperty("password", properties.getProperty("mysql.password"))
prop.setProperty("driver", properties.getProperty("mysql.driver"))
prop.setProperty("url", properties.getProperty("mysql.url"))
if (saveMode == SaveMode.Overwrite) {
var conn: Connection = null
try {
conn = DriverManager.getConnection(
prop.getProperty("url"),
prop.getProperty("user"),
prop.getProperty("password")
)
val stmt = conn.createStatement
table = table.toUpperCase
        stmt.execute(s"truncate table $table") // truncate instead of letting Spark drop and recreate the table, which would turn every column into TEXT
conn.close()
}
catch {
case e: Exception =>
println("MySQL Error:")
e.printStackTrace()
}
}
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), tableName, prop) // always Append here: Overwrite has already been handled by truncating the table above
}
/**
   * Load a properties file from the classpath.
   * @param proPath path of the properties file under resources, e.g. /myconfig.properties
* @return
*/
def getProPerties(proPath: String): Properties = {
val properties: Properties = new Properties()
properties.load(this.getClass.getResourceAsStream(proPath))
properties
}
}
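// A small usage sketch for UseUtil: apply the tuned SparkConf, add literal columns through
// addDataFrameColumn, and normalize a couple of raw values. The column names and sample
// data are illustrative assumptions.
package com.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object UseUtilUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UseUtilUsageSketch").setMaster("local[*]")
    UseUtil.setConfigure(conf)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = Seq(("WIFI", "ACTION_WEB_CLICK")).toDF("net", "action")
    // each entry becomes an extra column added by withColumn inside addDataFrameColumn
    val extra: Map[String, Column] = Map(
      "sync" -> lit("20190830"),
      "guid" -> lit(UseUtil.getUUID())
    )
    UseUtil.addDataFrameColumn(df, extra).show(false)

    println(UseUtil.netTypeMatch("WIFI"))               // "1"
    println(UseUtil.getActionType("ACTION_WEB_CLICK"))  // "ACTION_CLICK"
    spark.close()
  }
}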