Hands-on Project: 知行教育 Big Data Analysis Platform - 03
5. Data Modeling
ODS layer:
--创建数据库
CREATE DATABASE IF NOT EXISTS `itcast_ods`;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
--访问咨询数据表
--在这里创建的表名就不用写时间,后续mysql中所有新增的相关数据都统一存储在web_chat_ems中
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_ems (
id INT comment '主键',
create_date_time STRING comment '数据创建时间',
session_id STRING comment '七陌sessionId',
sid STRING comment '访客id',
create_time STRING comment '会话创建时间',
seo_source STRING comment '搜索来源',
seo_keywords STRING comment '关键字',
ip STRING comment 'IP地址',
area STRING comment '地域',
country STRING comment '所在国家',
province STRING comment '省',
city STRING comment '城市',
origin_channel STRING comment '投放渠道',
user_match STRING comment '所属坐席', --mysql中的字段为user与这里不同,这是因为user在hive中是一个关键词,因此换了名字
manual_time STRING comment '人工开始时间',
begin_time STRING comment '坐席领取时间 ',
end_time STRING comment '会话结束时间',
last_customer_msg_time_stamp STRING comment '客户最后一条消息的时间',
last_agent_msg_time_stamp STRING comment '坐席最后一下回复的时间',
reply_msg_count INT comment '客服回复消息数',
msg_count INT comment '客户发送消息数',
browser_name STRING comment '浏览器名称',
os_info STRING comment '系统名称')
comment '访问会话信息表'
PARTITIONED BY(starts_time STRING) --按照抽取时间分区
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_ods.db/web_chat_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB'); --ODS层的数据较庞大,写操作很少,因此使用压缩率较高的zlib压缩方式
--访问咨询数据表的附属表
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_text_ems (
id INT COMMENT '主键来自MySQL',
referrer STRING comment '上级来源页面',
from_url STRING comment '会话来源页面',
landing_page_url STRING comment '访客着陆页面',
url_title STRING comment '咨询页面title',
platform_description STRING comment '客户平台信息',
other_params STRING comment '扩展字段中数据',
history STRING comment '历史访问记录'
) comment 'EMS-PV测试表'
PARTITIONED BY(start_time STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_ods.db/web_chat_text_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB');
Note: ODS-layer tables are generally created as external tables, because the ODS layer's job is to accumulate historical data that must not be lost: dropping an external table only removes its metadata and leaves the underlying files intact, which protects the archived history. Avoid reserved keywords in column names (which is why the MySQL column user becomes user_match here). Tables in the other layers are normally managed (internal) tables, since they are derived from ODS and we have full control over them.
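As a quick check, the table type can be read back from the metadata; a minimal sketch, assuming the hive CLI is on the PATH:
hive -e "describe formatted itcast_ods.web_chat_ems;" | grep -i "Table Type"
# expected to report EXTERNAL_TABLE for the ODS tables and MANAGED_TABLE for the other layers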
DWD layer:
CREATE DATABASE IF NOT EXISTS `itcast_dwd`;
create table if not exists itcast_dwd.visit_consult_dwd(
session_id STRING comment '七陌sessionId',
sid STRING comment '访客id',
create_time bigint comment '会话创建时间',
seo_source STRING comment '搜索来源',
ip STRING comment 'IP地址',
area STRING comment '地域',
msg_count int comment '客户发送消息数',
origin_channel STRING COMMENT '来源渠道',
referrer STRING comment '上级来源页面',
from_url STRING comment '会话来源页面',
landing_page_url STRING comment '访客着陆页面',
url_title STRING comment '咨询页面title',
platform_description STRING comment '客户平台信息',
other_params STRING comment '扩展字段中数据',
history STRING comment '历史访问记录',
hourinfo string comment '小时'
)
comment '访问咨询DWD表'
partitioned by(yearinfo String, quarterinfo string, monthinfo String, dayinfo string) --这里加不加季度分区都可以,只是目录层级多了一层
row format delimited fields terminated by '\t'
stored as orc
location '/user/hive/warehouse/itcast_dwd.db/visit_consult_dwd'
tblproperties ('orc.compress'='SNAPPY');
DWS layer:
CREATE DATABASE IF NOT EXISTS `itcast_dws`;
--访问量统计结果表
CREATE TABLE IF NOT EXISTS itcast_dws.visit_dws (
sid_total INT COMMENT '根据sid去重求count',
sessionid_total INT COMMENT '根据sessionid去重求count',
ip_total INT COMMENT '根据IP去重求count',
area STRING COMMENT '区域信息',
seo_source STRING COMMENT '搜索来源',
origin_channel STRING COMMENT '来源渠道',
hourinfo STRING COMMENT '创建时间,统计至小时',
time_str STRING COMMENT '时间明细',
from_url STRING comment '会话来源页面',
group_type STRING COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;')
comment 'EMS访客日志dws表'
PARTITIONED BY(yearinfo STRING,quarterinfo STRING,monthinfo STRING,dayinfo STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_dws.db/visit_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');
--咨询量统计结果表
CREATE TABLE IF NOT EXISTS itcast_dws.consult_dws
(
sid_total INT COMMENT '根据sid去重求count',
sessionid_total INT COMMENT '根据sessionid去重求count',
ip_total INT COMMENT '根据IP去重求count',
area STRING COMMENT '区域信息',
origin_channel STRING COMMENT '来源渠道',
hourinfo STRING COMMENT '创建时间,统计至小时',
time_str STRING COMMENT '时间明细',
group_type STRING COMMENT '产品属性类型:1.地区;2.来源渠道;3.总咨询量',
time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;'
)
COMMENT '咨询量DWS宽表'
PARTITIONED BY (yearinfo string, quarterinfo STRING, monthinfo STRING, dayinfo string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC
LOCATION '/user/hive/warehouse/itcast_dws.db/consult_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');
6. Basic Hive Optimization
(1) HDFS replication factor
By default HDFS keeps 3 replicas of each block, and production clusters generally stay at 3; for less critical data 2 replicas can be enough. HDFS is flexible here: besides the cluster-wide default, the replication factor can be set per file, although in practice a single global setting is the norm. Hadoop 3.x additionally supports erasure coding (described above as a "replication factor of 1.5"): instead of full copies, the data is stored together with parity blocks, so lost blocks can be reconstructed while the on-disk footprint is only about 1.5x the raw data instead of 3x. The trade-off is that encoding and reconstruction consume noticeably more CPU and memory.
Setting the replication factor: it can be configured globally in CM.
Open CM, go to the HDFS configuration, and search for "Replication Factor".
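If you are not using CM, the same things can be done from the command line; a hedged sketch (the paths and the erasure-coding policy name are placeholders that depend on your cluster):
# override the replication factor for one path (the global default is dfs.replication in hdfs-site.xml)
hdfs dfs -setrep -w 2 /user/hive/warehouse/itcast_ods.db/web_chat_ems_ods
# Hadoop 3.x erasure coding, the "~1.5x storage" option mentioned above
hdfs ec -listPolicies
hdfs ec -setPolicy -path /cold/data -policy RS-6-3-1024k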
(2) Basic YARN configuration
YARN allocates compute resources (memory and CPU).
YARN does not manage disks; disks are allocated and managed by HDFS. When a MapReduce job runs, the NodeManager supplies memory and CPU while the DataNode supplies disk, so a NodeManager and a DataNode can coexist on the same server without conflict because they manage different resources.
1) CPU configuration
The YARN web UI is served by the ResourceManager, so the figures it shows are cluster-wide totals.
Note: each NodeManager reports to the ResourceManager how many CPU cores its node has. By default it reports 8 cores regardless of the real count, because YARN does not verify the hardware on its own. Without an external tool doing that verification, the core count shown in the web UI can therefore differ from the real one, which is clearly undesirable.
Recommended setting: report exactly as many vcores as the node really has.
To check a node's core count: open the Hosts page in CM (CM detects the real core count itself), or run: grep 'processor' /proc/cpuinfo | sort -u | wc -l
How do you configure the per-node core count in YARN?
Open the YARN configuration in CM and search for yarn.nodemanager.resource.cpu-vcores.
2) Memory configuration
Note: each NodeManager likewise reports how much memory its node has available. By default it reports 8 GB even when the node actually has less, because YARN does not verify memory either. Without an external check, the amount of memory the ResourceManager believes is available can differ from what is really free.
Recommended setting: about 80% of the node's available memory.
To check a node's free memory: open the Hosts page in CM (CM detects it automatically), or run: free -m
Configuring the memory available to YARN:
Open the YARN configuration in CM and search for yarn.nodemanager.resource.memory-mb (the total physical memory YARN may use on the node). In addition, configure yarn.scheduler.maximum-allocation-mb (the maximum memory a single container may request, kept equal to yarn.nodemanager.resource.memory-mb here) and yarn.app.mapreduce.am.command-opts (the JVM heap of the MR ApplicationMaster, kept slightly below the memory allocated to it, roughly 0.9x).
For this project the YARN memory is set generously so the later exercises do not run out of memory; in real production the usual rule is about 80% of the node's free memory.
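A small helper for the "80% of available memory" rule of thumb; a sketch only, and the awk field assumes the "available" column printed by a recent free -m:
#!/bin/bash
# print a suggested value for yarn.nodemanager.resource.memory-mb on this node
avail_mb=$(free -m | awk '/^Mem:/{print $7}')   # column 7 = "available" on recent procps
echo "suggested yarn.nodemanager.resource.memory-mb = $((avail_mb * 80 / 100)) MB"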
3) YARN local directories
Property: yarn.nodemanager.local-dirs
Purpose: while jobs run, YARN writes temporary files (for example intermediate map task output); this property sets where those files are stored.
Recommended setting: one directory per physical disk mounted on the server.
How do you see how many disks are mounted and where the mount points are?
df -h shows the mount points.
The other entries are temporary filesystems and can be ignored; on these nodes the data disk is mounted at /, so anything written under / actually lands on that disk. Since every node in this project has a single disk (mount point /), one local directory per node is sufficient.
(3) Basic MapReduce configuration
Properties:
mapreduce.map.memory.mb : how much memory a single Map task container may use
mapreduce.map.java.opts : the JVM heap of a Map task
mapreduce.reduce.memory.mb : how much memory a single Reduce task container may use
mapreduce.reduce.java.opts : the JVM heap of a Reduce task
Note: each JVM heap should be slightly smaller than its corresponding container memory.
None of these values may exceed the memory available to YARN, i.e. yarn.nodemanager.resource.memory-mb.
Recommended setting: leave the defaults; no manual tuning is needed for this project.
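If the defaults ever do prove too small for a specific job, they can be overridden per Hive session instead of cluster-wide; a sketch with purely illustrative values:
# each *.java.opts heap is kept below its container size
hive -e "
set mapreduce.map.memory.mb=2048;
set mapreduce.map.java.opts=-Xmx1638m;
set mapreduce.reduce.memory.mb=4096;
set mapreduce.reduce.java.opts=-Xmx3276m;
select count(1) from itcast_ods.web_chat_ems;
"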
(4) Basic Hive configuration
1) HiveServer2 heap size
When HiveServer2 executes SQL it compiles the statement into MapReduce jobs and builds execution plans, which consumes memory; clients also connect to Hive remotely through HiveServer2, which consumes memory as well.
Property: "Java Heap Size of HiveServer2 in Bytes" in CM (4 GB here).
Note: if this value is too small, queries can fail with an out-of-memory error and HiveServer2 goes down; increase the heap size and restart HiveServer2.
2) Threads for dynamic partition loading
Property: hive.load.dynamic.partitions.thread
Meaning: the maximum number of threads used to load dynamic partitions. More threads finish faster but consume more resources. Default: 15.
Recommended setting: start with the default of 15; if dynamic partition loads are slow and resources are still available, try raising it.
3) Threads for listing input files
Property: hive.exec.input.listing.max.threads
Meaning: the maximum number of threads used to list input files on HDFS when a query runs. More threads list faster but consume more resources. Default: 15.
Recommended setting: start with the default of 15; if input listing is slow and resources are still available, try raising it.
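Both thread settings can also be raised for a single session when resources allow; a sketch, with 30 as an example value only:
hive -e "
set hive.load.dynamic.partitions.thread=30;
set hive.exec.input.listing.max.threads=30;
select count(1) from itcast_ods.web_chat_ems;
"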
(5) Hive compression configuration
Besides choosing a compression scheme at table-creation time, compression can also be enabled for the intermediate output of Map tasks and for the final output of Reduce tasks during queries. Map output has to travel over the network to the Reduce nodes, so compressing it reduces the amount of data transferred and improves performance; compressing the final Reduce output both saves storage and speeds up network transfer.
1) Compressing Map intermediate output
hive.exec.compress.intermediate : whether Hive compresses intermediate results
mapreduce.map.output.compress
Whether map output compression is enabled; default false. Set it to true when network transfer needs to be reduced.
mapreduce.map.output.compress.codec
The codec for map output compression; default org.apache.hadoop.io.compress.DefaultCodec, recommended SnappyCodec: org.apache.hadoop.io.compress.SnappyCodec.
2) Compressing the final Reduce output
hive.exec.compress.output : whether Hive compresses final query output
mapreduce.output.fileoutputformat.compress : whether Reduce output compression is enabled
mapreduce.output.fileoutputformat.compress.codec : which codec to use; recommended SnappyCodec: org.apache.hadoop.io.compress.SnappyCodec
mapreduce.output.fileoutputformat.compress.type : the compression granularity: NONE, RECORD (per record, the default) or BLOCK (per block). Recommended: BLOCK, since block compression compresses records in batches and is more efficient.
Note: the hive.exec.compress.* switches are set in the Hive session (set hive.exec.compress.intermediate=true; set hive.exec.compress.output=true;), while the mapreduce.* items are configured for YARN in CM. The Hive-level switches must be enabled first; if they are not, the mapreduce.* settings have no effect.
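Putting the two groups together, a session-level sketch (the mapreduce.* items can also be set per session if you prefer not to change them in CM):
hive -e "
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
select count(1) from itcast_ods.web_chat_ems;
"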
(6) Hive execution engine
Hive can translate SQL either into MapReduce jobs or into Spark jobs.
Property: hive.execution.engine : chooses whether SQL is translated to MR or to Spark; it can be set directly in CM.
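It can also be switched per session for comparison; a sketch (running on Spark assumes Hive on Spark is actually deployed on the cluster):
hive -e "
set hive.execution.engine=mr;
select count(1) from itcast_ods.web_chat_ems;
"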
7. Data Extraction
Use Sqoop to extract the business data from MySQL into the corresponding ODS-layer tables in Hive. Since this is the first load, a full extraction is performed.
# 访问咨询数据表抽取sql即--query的值:
select id,
create_date_time,
session_id,
sid,
create_time,
seo_source,
seo_keywords,
ip,
area,
country,
province,
city,
origin_channel,
user as user_match,
manual_time,
begin_time,
end_time,
last_customer_msg_time_stamp,
last_agent_msg_time_stamp,
reply_msg_count,
msg_count,
browser_name,
os_info,
"2021-09-24" as starts_time
from web_chat_ems_2019_07;
# 访问咨询数据表的附属表抽取sql
select
id,
referrer,
from_url,
landing_page_url,
url_title,
platform_description,
other_params,
history,
"2021-09-24" as start_time
from web_chat_text_ems_2019_07;
# 编写sqoop命令
# 导入访问咨询数据表
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select id,
create_date_time,
session_id,
sid,
create_time,
seo_source,
seo_keywords,
ip,
area,
country,
province,
city,
origin_channel,
user as user_match,
manual_time,
begin_time,
end_time,
last_customer_msg_time_stamp,
last_agent_msg_time_stamp,
reply_msg_count,
msg_count,
browser_name,
os_info,
"2021-09-24" as starts_time
from web_chat_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# 注意--query外面有‘’,里面有“”,不能都是‘’或“”
# 导入访问咨询数据表的附属表
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select
id,
referrer,
from_url,
landing_page_url,
url_title,
platform_description,
other_params,
history,
"2021-09-24" as start_time
from web_chat_text_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1
# Verify that the data was imported successfully
How many rows are in MySQL?
select count(1) from web_chat_ems_2019_07; 211197
select count(1) from web_chat_text_ems_2019_07; 105599
How many rows are in the corresponding Hive tables?
select count(1) from itcast_ods.web_chat_ems; 211197
select count(1) from itcast_ods.web_chat_text_ems; 105599
Sample a few rows and check that the column mapping looks correct:
select * from itcast_ods.web_chat_ems limit 10;
select * from itcast_ods.web_chat_text_ems limit 10;
Problem encountered here:
HiveServer2 ran out of memory and went down; its heap size has to be increased.
After changing the setting click "Save Changes", then restart the affected services from the CM home page (CM tracks stale configurations automatically).
8. Data Cleansing and Transformation
Apply ETL cleansing to the ODS data, perform a small amount of dimension degeneration (folding the attributes of the text table into the fact table), and load the result into the corresponding DWD table.
# 未转化的sql
select
wce.session_id,
wce.sid,
wce.create_time, --需要转换ODS层是string类型,DWD层是bigint类型这里需要将其转化为时间戳
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
wce.create_time as hourinfo, --需要转换
wce.create_time as yearinfo, --需要转换
wce.create_time as quarterinfo, --需要转换
wce.create_time as monthinfo, --需要转换
wce.create_time as dayinfo --需要转换
from itcast_ods.web_chat_ems wce inner join itcast_ods.web_chat_text_ems wcte
on wce.id = wcte.id;
1) unix_timestamp(): date to timestamp
unix_timestamp() : returns the current time as a 10-digit bigint timestamp (seconds).
Example: select unix_timestamp() -- 1689169289
unix_timestamp(string) : returns the timestamp of the given time; the string must be in 'yyyy-MM-dd HH:mm:ss' format, otherwise NULL is returned.
Examples:
select unix_timestamp('2019-08-15 16:40:00') --1565858400
select unix_timestamp('2019-08-15') --NULL
unix_timestamp(string, format) : returns the timestamp of the given time; the string can use any layout as long as the second argument describes it.
Examples:
select unix_timestamp('2019-08-15','yyyy-MM-dd') --1565798400
select unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss') --1565858400
select unix_timestamp('2019-08-15','yyyy-MM-dd HH:mm:ss') --NULL
2) Explicit type casting
cast(1.58 as int)
3) from_unixtime(): timestamp to date
The input must be a 10-digit bigint timestamp (seconds); the return value is a string date.
from_unixtime(bigint, format) : format describes the layout of the resulting date string.
Examples:
select from_unixtime(1565858389) --2019-08-15 16:39:49; without a format the default is yyyy-MM-dd HH:mm:ss
select from_unixtime(1565858389,'yyyy-MM-dd HH:mm:ss') --2019-08-15 16:39:49
select from_unixtime(1565858389,'yyyy-MM-dd') --2019-08-15
The input has to be a 10-digit (seconds) timestamp; a millisecond timestamp must first be reduced to 10 digits before converting:
Examples:
select from_unixtime(1553184000488, 'yyyy-MM-dd HH:mm:ss') --51188-06-11 00:08:08, obviously wrong
select from_unixtime(cast(1553184000488/1000 as int),'yyyy-MM-dd HH:mm:ss') --2019-03-22 00:00:00
select from_unixtime(cast(substr(1553184000488,1,10) as int),'yyyy-MM-dd HH:mm:ss') --2019-03-22 00:00:00
# Get the current time
select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') -- 2019-08-15 17:18:55
4) Date extraction functions
year() quarter() month() day() hour()
These could be used to derive the parts of create_time, but we want July to come out as 07, and these functions return unpadded numbers. They also require the input date to be well-formed.
5) String slicing with substr()
Most databases provide both substr and substring. Unlike other relational databases, in Hive substr and substring behave identically; they are the same function.
substr(string, int start) : returns the substring from the start position to the end.
Example: select substr('2020-06-06', 6); --06-06
substr(string, int start, int length) : returns length characters starting at the start position.
Example: select substr('2020-06-06', 6,2); --06
With substr() we can derive the create_time parts that the date functions above cannot produce in the desired form:
# 最终查询sql
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from itcast_ods.web_chat_ems wce inner join itcast_ods.web_chat_text_ems wcte
on wce.id = wcte.id;
Error that may occur here:
Cause: this transformation joins two tables, and one of them is small, so Hive automatically optimizes the query (the explain command shows that a map join was chosen, which loads the smaller table into memory). That optimization needs memory, and with too little memory available the job fails with an out-of-memory condition. The failure surfaces in one of two ways:
First form: return code 1
Second form: return code 137, or Execution failed with exit status: 137
Workaround: disable the map join optimization with set hive.auto.convert.join=false;
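To see what the optimizer is actually doing, the plan can be inspected before and after disabling the optimization; a sketch:
hive -e "
explain
select wce.id
from itcast_ods.web_chat_ems wce
join itcast_ods.web_chat_text_ems wcte on wce.id = wcte.id;
" | grep -ci "Map Join"
# a count greater than 0 means a map join was planned; after set hive.auto.convert.join=false; it should drop to 0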
# Final SQL
# The select above produces rows belonging to many partitions; loading them with static partitions would be tedious, so dynamic partitioning is used
--动态分区配置
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效visit_consult_dwd表在建表时指定需要使用snappy方案压缩,需要设置写入压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from itcast_ods.web_chat_ems wce inner join itcast_ods.web_chat_text_ems wcte
on wce.id = wcte.id;
9. Data Analysis
Analyze the DWD-layer data and load the results into the DWS layer.
(1) Visit volume
Besides the dimensions above there is also the total visit volume, 25 requirements in all.
# 统计每年的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo;
-- this result is stored under the HDFS directory yearinfo=2019/quarterinfo=-1/monthinfo=-1/dayinfo=-1
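You can confirm where the rows landed by listing the partitions and the corresponding directory (the warehouse path comes from the DDL above; the 2019 partition is assumed to exist after the insert):
hive -e "show partitions itcast_dws.visit_dws;"
hdfs dfs -ls /user/hive/warehouse/itcast_dws.db/visit_dws/yearinfo=2019/quarterinfo=-1/monthinfo=-1/dayinfo=-1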
# 统计每年每季度的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat_ws('_',yearinfo,quarterinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo;
# 统计每年每季度每月的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo;
# 统计每年每季度每月每天的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo;
# 统计每年每季度每月每天每小时的总访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
# 统计每年各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
from_url,
'4' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,from_url;
# 统计每年每季度各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
from_url,
'4' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,from_url;
# 统计每年每季度每月各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
from_url,
'4' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,from_url;
# 统计每年每季度每月每天各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
from_url,
'4' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,from_url;
# 统计每年每季度每月每天每小时各个受访页面的访问量
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
from_url,
'4' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,from_url;
# 同理其他维度都差不多......见visit.sql文件
(2) Consultation volume
Besides the dimensions above there is also the total consultation volume, 15 requirements in all.
# 统计每年的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'3' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo;
# 统计每年每季度的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'3' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo;
# 统计每年每季度每月的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'3' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo;
# 统计每年每季度每月每天的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'3' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo;
# 统计每年每季度每月每天每小时的总咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'3' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
# 统计每年各个地区的咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,area;
# 统计每年每季度各个地区的咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,area;
# 统计每年每季度每月各个地区的咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,area;
# 统计每年每季度每月每天各个地区的咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
# 统计每年每季度每月每天每小时各个地区的咨询量
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
# 同理其他维度都一样......见consult.sql文件
10. Data Export
Use Sqoop to export the DWS-layer results to MySQL.
First create the target tables in MySQL:
create database scrm_bi default character set utf8mb4 collate utf8mb4_general_ci;
--访问量结果表
CREATE TABLE `visit_dws` (
sid_total int COMMENT '根据sid去重求count',
sessionid_total int COMMENT '根据sessionid去重求count',
ip_total int COMMENT '根据IP去重求count',
area varchar(32) COMMENT '区域信息',
seo_source varchar(32) COMMENT '搜索来源',
origin_channel varchar(32) COMMENT '来源渠道',
hourinfo varchar(32) COMMENT '小时信息',
time_str varchar(32) COMMENT '时间明细',
from_url varchar(32) comment '会话来源页面',
group_type varchar(32) COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
yearinfo varchar(32) COMMENT '年信息',
quarterinfo varchar(32) COMMENT '季度',
monthinfo varchar(32) COMMENT '月信息',
dayinfo varchar(32) COMMENT '日信息'
);
--咨询量结果表
CREATE TABLE `consult_dws` (
sid_total int COMMENT '根据sid去重求count',
sessionid_total int COMMENT '根据sessionid去重求count',
ip_total int COMMENT '根据IP去重求count',
area varchar(32) COMMENT '区域信息',
origin_channel varchar(32) COMMENT '来源渠道',
hourinfo varchar(32) COMMENT '小时信息',
time_str varchar(32) COMMENT '时间明细',
group_type varchar(32) COMMENT '产品属性类型:1.地区;2.来源渠道;3.总咨询量',
time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
yearinfo varchar(32) COMMENT '年信息',
quarterinfo varchar(32) COMMENT '季度',
monthinfo varchar(32) COMMENT '月信息',
dayinfo varchar(32) COMMENT '日信息'
);
Then run the export.
Problem that can occur when exporting the consultation results: garbled Chinese characters.
# 导出咨询量数据
sqoop export \
--connect jdbc:mysql://hadoop01:3306/scrm_bi \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1
Fix:
Truncate the target table, then re-run the export with the JDBC URL extended to force UTF-8, i.e.
--connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
sqoop export \
--connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1
# 注意--connect中出现特殊符号?,建议加上""。默认是拉丁编码不支持中文
Error that can occur when exporting the visit results: a column was declared too small.
# 导出访问量数据
sqoop export \
--connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
-m 1
The error message only says the export failed without giving a reason. That pattern usually means it is not a syntax error (syntax errors are reported explicitly): the command passed Sqoop's own checks and the failure happened inside the MapReduce job, which Sqoop cannot interpret.
How do you find the real cause? Look at the MapReduce job logs.
Follow the job's "here" link to the detailed logs (INFO lines are normal; look for ERROR).
The log shows that the from_url values are longer than the column allows, i.e. the column was created too small and the rows cannot be loaded.
Fix: enlarge from_url from varchar(32) to varchar(1000), then re-run the export.
At this point the full-load pipeline is complete; everything that follows is incremental and normally runs once per day.
11. Incremental Pipeline - Simulating Incremental Data
In a real production environment the source database stores data by month: each month's data forms its own table. The full-load steps above have already simulated the analysis of all historical data (with two tables here, although a real system may have hundreds). From now on only the previous day's data needs to be processed incrementally. Here we simulate one day of new business data:
-- 模拟访问咨询数据表数据
create table web_chat_ems_2021_09 as
select * from web_chat_ems_2019_07 where create_time between '2019-07-01 00:00:00' and '2019-07-01 23:59:59';
-- 修改表中的create_time值模拟增量数据
update web_chat_ems_2021_09 set create_time = concat('2021-09-25 ',substr(create_time,12));
-- 模拟访问咨询数据表的附属表数据
create table web_chat_text_ems_2021_09 as
select
b.*
from (select * from web_chat_ems_2019_07 where create_time between '2019-07-01 00:00:00' and '2019-07-01 23:59:59') a, web_chat_text_ems_2019_07 b
where a.id = b.id;
Note: although web_chat_ems_2021_09 and web_chat_text_ems_2021_09 hold only one day of data here, in production the monthly tables would contain everything from the 1st up to the 25th; the full load has already covered the 1st through the 24th, so only the new data for the 25th needs to be analyzed now.
12. Incremental Pipeline - Incremental Data Extraction
Use Sqoop to extract only the newly added MySQL rows into the ODS layer.
# 查询新增数据:
select id,
create_date_time,
session_id,
sid,
create_time,
seo_source,
seo_keywords,
ip,
area,
country,
province,
city,
origin_channel,
user as user_match,
manual_time,
begin_time,
end_time,
last_customer_msg_time_stamp,
last_agent_msg_time_stamp,
reply_msg_count,
msg_count,
browser_name,
os_info,
"2021-09-26" as starts_time
from web_chat_ems_2021_09 where create_time between '2021-09-25 00:00:00' and '2021-09-25 23:59:59';
select
temp2.*,
"2021-09-26" as start_time
from (select id from web_chat_ems_2021_09 where create_time between '2021-09-25 00:00:00' and '2021-09-25 23:59:59') temp1, web_chat_text_ems_2021_09 temp2
where temp1.id = temp2.id;
# sqoop导入命令:
# 导入访问咨询数据表
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select id,
create_date_time,
session_id,
sid,
create_time,
seo_source,
seo_keywords,
ip,
area,
country,
province,
city,
origin_channel,
user as user_match,
manual_time,
begin_time,
end_time,
last_customer_msg_time_stamp,
last_agent_msg_time_stamp,
reply_msg_count,
msg_count,
browser_name,
os_info,
"2021-09-26" as starts_time
from web_chat_ems_2021_09 where create_time between "2021-09-25 00:00:00" and "2021-09-25 23:59:59" and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# 注意--query外面有‘’,里面有“”,不能都是‘’或“”
# 导入访问咨询数据表的附属表
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select
temp2.*,
"2021-09-26" as start_time
from (select id from web_chat_ems_2021_09 where create_time between "2021-09-25 00:00:00" and "2021-09-25 23:59:59") temp1, web_chat_text_ems_2021_09 temp2
where temp1.id = temp2.id and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1
# The incremental load is a recurring job; the statements above run once and are gone, so they cannot serve as a scheduled task. We therefore wrap them in a shell script that works out the previous day's date automatically and let Oozie run it on a schedule.
Script requirements: by default extract the previous day's data, but also accept an explicit date argument.
Question 1: how do we get yesterday's date? With the date command (today minus one day):
# 获取今天的日期
date
date +'%Y%m%d' : 指定输出格式
# 获取指定日期的年月日格式输出
date -d '2014-11-12' +'%Y-%m-%d'
# 获取指定日期的星期(周几)格式输出
date --date='2014-11-23' +'%w'
# 获取上周日期
date -d '-1 week' +'%Y-%m-%d'
# 获取前一天日期
date -d '-1 day' +'%Y-%m-%d'
date --date='-24 hour' +'%Y-%m-%d %H:%M:%S'
Question 2: how does a shell script receive external arguments? With the $ variables:
$1 $2 : the first and second positional arguments
$# : the number of arguments passed in
Question 3: how do we branch in a shell script?
Requirement: if an argument was passed, use it as the date; otherwise use yesterday's date.
#!/bin/bash
if [ $# == 1 ]; then
dateStr=$1 # no spaces are allowed around = in an assignment
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'` # command substitution: the command must be wrapped in backticks
fi
echo ${dateStr}
# The shell scripts (Oozie is installed on hadoop01, so all later scripts are written under /root on hadoop01; any location works, since each script is later downloaded to Windows and then uploaded to Oozie)
cd /root/mode1
vim edu_mode_1_collect.sh (mode_1 refers to the visit/consultation subject area, the first subject)
#!/bin/bash
SQOOP_HOME=/usr/bin/sqoop
if [ $# == 1 ]; then
dateStr=$1
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi
dateNowStr=`date +'%Y-%m-%d'`
yearStr=`date -d ${dateStr} +'%Y'`
monthStr=`date -d ${dateStr} +'%m'`
# 公共参数
jdbcUrl='jdbc:mysql://hadoop01:3306/nev'
username='root'
password='123456'
m='1'
hivedatabase='itcast_ods'
${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "select id,
create_date_time,
session_id,
sid,
create_time,
seo_source,
seo_keywords,
ip,
area,
country,
province,
city,
origin_channel,
user as user_match,
manual_time,
begin_time,
end_time,
last_customer_msg_time_stamp,
last_agent_msg_time_stamp,
reply_msg_count,
msg_count,
browser_name,
os_info,
'${dateNowStr}' as starts_time
from web_chat_ems_${yearStr}_${monthStr} where create_time between '${dateStr} 00:00:00' and '${dateStr} 23:59:59' and \$CONDITIONS" \
--hcatalog-database ${hivedatabase} \
--hcatalog-table web_chat_ems \
-m ${m}
# & # 上下两个sqoop并行执行
${SQOOP_HOME} import \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--query "select
temp2.*,
'${dateNowStr}' as start_time
from (select id from web_chat_ems_${yearStr}_${monthStr} where create_time between '${dateStr} 00:00:00' and '${dateStr} 23:59:59') temp1, web_chat_text_ems_${yearStr}_${monthStr} temp2
where temp1.id = temp2.id and \$CONDITIONS" \
--hcatalog-database ${hivedatabase} \
--hcatalog-table web_chat_text_ems \
-m ${m}
# Note: in shell, '' and "" differ: single quotes are literal (no ${} expansion inside), double quotes allow variable expansion; statements without ${} may use single quotes, but anything containing ${} must use double quotes
# Note: values used in several places are usually factored into variables so they can be reused
# Note: a shell script runs its commands serially by default; appending & runs a command in the background. The two sqoop commands in this script are independent, so they may be run in parallel when the machine has enough resources
# Note: sqoop is already on the PATH in this project, so writing sqoop directly would also work; if sqoop is not on the PATH, the script must call it by absolute path
# SQOOP_HOME=/usr/bin/sqoop, and sqoop import is written as ${SQOOP_HOME} import
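A quick illustration of the single- versus double-quote rule described above:
name=world
echo 'hello ${name}'   # prints: hello ${name}  (no variable expansion inside single quotes)
echo "hello ${name}"   # prints: hello world    (variables expand inside double quotes)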
Serial versus parallel execution in shell:
sleep 5 &
echo "done"
Appending & sends the preceding command to the background and the script continues immediately: this snippet prints "done" right away while sleep runs in the background without blocking the script. If the script must wait for the background command to finish before moving on, add wait; with the version below, "done" only appears after about 5 seconds.
sleep 5 &
wait
echo "done"
Note: commands may only run in parallel when there is no dependency between them.
# 测试脚本是否正常执行
sh edu_mode_1_collect.sh 2021-09-25
查看ODS层是否导入了新增数据:
select * from itcast_ods.web_chat_ems where starts_time = '2021-09-26' limit 10;
13. Incremental Pipeline - Scheduling the Incremental Extraction with Oozie (not fully configured)
(1) Configure the workflow
The shell script currently lives on Linux; download it to Windows first, then open Oozie (through Hue) and upload it to a directory on HDFS. The script must end up on HDFS.
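If you prefer to stage the script on HDFS from the command line rather than through the Hue file browser, a sketch (the target directory is an assumption, not a path Oozie requires):
hdfs dfs -mkdir -p /user/oozie/edu_mode_1
hdfs dfs -put -f /root/mode1/edu_mode_1_collect.sh /user/oozie/edu_mode_1/
hdfs dfs -ls /user/oozie/edu_mode_1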
Select the script and add it to the Oozie workflow.
It is recommended to add the same shell script twice (as both the shell command and an attached file).
Then save the workflow.
(2) Configure the schedule
Click Run to start the schedule.
14. Incremental Pipeline - Incremental Cleansing and Transformation
Apply the same ETL cleansing and dimension degeneration to the newly arrived ODS rows and append the result to the DWD layer.
# sql语句:
--动态分区配置
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效visit_consult_dwd表在建表时指定需要使用snappy方案压缩,需要设置写入压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time = '2021-09-26') wce inner join (select * from itcast_ods.web_chat_text_ems where start_time = '2021-09-26') wcte
on wce.id = wcte.id;
-- Note: when joining several tables, filter each table down to its incremental rows first and join afterwards, so the join only touches the new data and runs quickly; joining the full tables first and filtering afterwards would push a huge amount of (mostly useless) data through the join and be much slower. This is a basic SQL optimization.
# Shell script:
Question: how do we run Hive SQL from a shell script?
hive -e "sql statements" or hive -f sql_file, optionally with -S
-S : silent mode; the progress log (Map 0% Reduce 0% ...) is not printed.
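For example (the .sql file path below is just an illustration, not a file created earlier in this project):
hive -S -e "select count(1) from itcast_dwd.visit_consult_dwd;"
hive -S -f /root/mode1/some_analysis.sql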
Script requirements: with no argument, process the rows whose extraction time is today; with a date argument, process the rows extracted on that date.
cd /root
vim edu_mode_1_handle.sh
#!/bin/bash
HIVE_HOME=/usr/bin/hive
if [ $# == 1 ]; then
dateStr=$1
else
dateStr=`date +'%Y-%m-%d'`
fi
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;
insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
wce.session_id,
wce.sid,
unix_timestamp(wce.create_time) as create_time,
wce.seo_source,
wce.ip,
wce.area,
wce.msg_count,
wce.origin_channel,
wcte.referrer,
wcte.from_url,
wcte.landing_page_url,
wcte.url_title,
wcte.platform_description,
wcte.other_params,
wcte.history,
substr(wce.create_time,12,2) as hourinfo,
substr(wce.create_time,1,4) as yearinfo,
quarter(wce.create_time) as quarterinfo,
substr(wce.create_time,6,2) as monthinfo,
substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time = '${dateStr}') wce inner join (select * from itcast_ods.web_chat_text_ems where start_time = '${dateStr}') wcte
on wce.id = wcte.id;"
${HIVE_HOME} -e "${sqlStr}" -S
# 测试脚本是否可以正常执行:sh edu_mode_1_handle.sh
select * from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and monthinfo = '09' and dayinfo = '25' limit 10;
15. Incremental Pipeline - Scheduling the Incremental Cleansing with Oozie
Configure it the same way as before and click Save.
16. Incremental Pipeline - Incremental Analysis
Note: suppose we aggregate by year and the DWS layer already holds results for 2018, 2019 and 2020, plus 2021 up to 2021-09-24. When the data for 2021-09-25 arrives, only the 2021 figures need to be recomputed; the new rows do not affect the 2018-2020 results. Likewise, when aggregating by quarter only the quarter containing the new data needs to be recomputed; the other quarters stay as they are.
In other words:
For yearly statistics, recompute only the year that now includes the new day; earlier years are untouched. For quarterly statistics, recompute only that year's affected quarter. For monthly statistics, recompute only the affected month. For daily statistics, compute only the new day. For hourly statistics, compute only the hours of the new day.
Question: when we recompute, say, the 2021 yearly figure, the DWS layer still holds the old 2021 figure; don't the two conflict? For example, the DWS layer already has a 2021 result covering 2021-01-01 to 2021-09-24, and after adding 2021-09-25 we compute a new 2021 result, which makes the old one obsolete.
Solution: delete the affected historical results first (only the yearly, quarterly and monthly results can be affected).
Question: how do we delete them? Remember that Hive does not support deleting or updating individual rows.
Solution: drop the affected partitions instead (always drop the old results first, then insert the new ones):
alter table 表名 drop partition(key=value,.....);
Examples:
Drop the 2021 yearly results:
alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
Drop the 2021 Q3 results:
alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='-1',dayinfo='-1');
Drop the 2021 Q3 September results:
alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='09',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='09',dayinfo='-1');
(1) Visit volume - incremental analysis SQL
# 以总访问量为例
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '2021'
group by yearinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat_ws('_',yearinfo,quarterinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and quarterinfo = '3'
group by yearinfo,quarterinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09'
group by yearinfo,quarterinfo,monthinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09' and dayinfo = '25'
group by yearinfo,quarterinfo,monthinfo,dayinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09' and dayinfo = '25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
The other dimensions follow the same pattern.
(2) Consultation volume - incremental analysis SQL
# 以地区维度为例
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '2021'
group by yearinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '2021' and quarterinfo = '3'
group by yearinfo,quarterinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09'
group by yearinfo,quarterinfo,monthinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09' and dayinfo = '25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '2021' and quarterinfo = '3' and monthinfo = '09' and dayinfo = '25'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
The other dimensions follow the same pattern.
(3) Shell script
Requirement: with a date argument, recompute the statistics for that date; with no argument, recompute for the previous day.
cd /root
vim edu_mode_1_analyse.sh (the script shown here is not complete; it only covers the total-visit and area dimensions)
#!/bin/bash
HIVE_HOME=/usr/bin/hive
if [ $# == 1 ]; then
dateStr=$1
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi
yearStr=`date -d ${dateStr} +'%Y'`
monthStr=`date -d ${dateStr} +'%m'`
month_for_quarter=`date -d ${dateStr} +'%-m'`
quarterStr=$(((${month_for_quarter}-1)/3+1)) # integer math in shell uses arithmetic expansion $(( ... )), and $( ) captures the result
dayStr=`date -d ${dateStr} +'%d'`
# quarterStr=`quarter(${dateStr})` would be wrong: quarter() is a Hive function, while the script runs on Linux, which has no quarter() command
# so the quarter has to be computed from the month
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'-1' as from_url,
'5' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}'
group by yearinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat_ws('_',yearinfo,quarterinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}'
group by yearinfo,quarterinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}'
group by yearinfo,quarterinfo,monthinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo;
insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
'-1' as area,
'-1' as seo_source,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'-1' as from_url,
'5' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
yearinfo as time_str,
'1' as group_type,
'5' as time_type,
yearinfo,
'-1' as quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}'
group by yearinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'_',quarterinfo) as time_str,
'1' as group_type,
'4' as time_type,
yearinfo,
quarterinfo,
'-1' as monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}'
group by yearinfo,quarterinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo) as time_str,
'1' as group_type,
'3' as time_type,
yearinfo,
quarterinfo,
monthinfo,
'-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}'
group by yearinfo,quarterinfo,monthinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
'-1' as hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
'1' as group_type,
'2' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;
insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
count(distinct sid) as sid_total,
count(distinct session_id) as sessionid_total,
count(distinct ip) as ip_total,
area,
'-1' as origin_channel,
hourinfo,
concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
'1' as group_type,
'1' as time_type,
yearinfo,
quarterinfo,
monthinfo,
dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
"
${HIVE_HOME} -e "${sqlStr}" -S
# 测试是否执行成功
sh edu_mode_1_analyse.sh 2021-09-25
select * from itcast_dws.visit_dws where time_type = '2' and group_type = '5' # 增量操作之前,没有2021-09-25的数据
select * from itcast_dws.visit_dws where time_type = '2' and group_type = '5' # 增量操作之后,有2021-09-25的数据了
select * from itcast_dws.consult_dws where time_type = '2' and group_type = '3' # 增量操作之前
select * from itcast_dws.consult_dws where time_type = '2' and group_type = '3' # 增量操作之后
17. Incremental Pipeline - Scheduling the Incremental Analysis with Oozie
18. Incremental Pipeline - Incremental Data Export
The MySQL tables mirror the DWS layer exactly and already contain the historical results, so before exporting we must first delete the affected historical rows in MySQL (unlike Hive, MySQL does support delete) and then export the new results.
To keep things simple: since at most the current year's results are affected, we simply delete the whole current year from MySQL and re-export it. Taking the 2021-09-25 increment as an example, all 2021 rows, affected or not, are deleted and re-exported. The DWS data is aggregated result data and therefore small, so deleting the whole year keeps the script simple without any real cost in efficiency.
Question: how do we delete a whole year? Taking 2021 as the example:
delete from scrm_bi.visit_dws where yearinfo = '2021';
delete from scrm_bi.consult_dws where yearinfo = '2021';
# shell脚本
Requirement: with a date argument, delete all MySQL rows for that date's year and re-export that year; with no argument, do the same for the previous day's year.
cd /root
vim edu_mode_1_export.sh
#!/bin/bash
SQOOP_HOME=/usr/bin/sqoop
if [ $# == 1 ]; then
dateStr=$1
else
dateStr=`date -d '-1 day' +'%Y-%m-%d'`
fi
yearStr=`date -d ${dateStr} +'%Y'`
mysql -uroot -p123456 -hhadoop01 -P3306 -e "delete from scrm_bi.visit_dws where yearinfo = '${yearStr}'; delete from scrm_bi.consult_dws where yearinfo = '${yearStr}';"
jdbcUrl='jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8'
username='root'
password='123456'
m='1'
hivedatabase='itcast_dws'
${SQOOP_HOME} export \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--table visit_dws \
--hcatalog-database ${hivedatabase} \
--hcatalog-table visit_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values ${yearStr} \
-m ${m}
# incremental export: only the yearinfo=${yearStr} partition is exported
${SQOOP_HOME} export \
--connect ${jdbcUrl} \
--username ${username} \
--password ${password} \
--table consult_dws \
--hcatalog-database ${hivedatabase} \
--hcatalog-table consult_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values ${yearStr} \
-m ${m}
# 测试是否成功执行导出操作
sh edu_mode_1_export.sh 2021-09-25
select * from scrm_bi.visit_dws where time_type = '2' and group_type = '5' # 增量操作之前
select * from scrm_bi.visit_dws where time_type = '2' and group_type = '5' # 增量操作之后
select * from scrm_bi.consult_dws where time_type = '2' and group_type = '3' # 增量操作之前
select * from scrm_bi.consult_dws where time_type = '2' and group_type = '3' # 增量操作之后
19. Incremental Pipeline - Scheduling the Incremental Export with Oozie
This completes the visit/consultation subject area.
Additional notes:
(1) Swap space
Swap space means that when physical memory runs out, the system turns part of the disk into extra "memory". It effectively extends RAM, though it is much slower than real memory.
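To check how much swap a node has and how much is in use, e.g.:
free -m        # the Swap row shows total/used/free swap in MB
swapon -s      # lists the devices or files backing the swap space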
(2) Running shell scripts
Method 1: if the script lacks the execute (x) permission, add it with chmod and then run the script directly.
Method 2: without the execute permission, run it through sh: sh path/to/script.
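For example, with the collection script from section 12:
chmod +x /root/mode1/edu_mode_1_collect.sh
/root/mode1/edu_mode_1_collect.sh 2021-09-25      # method 1: execute permission required
sh /root/mode1/edu_mode_1_collect.sh 2021-09-25   # method 2: no execute permission needed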
(3) Oozie's internal time zone is not Shanghai by default; search for "time" in the Hue configuration and set the time zone to Asia/Shanghai.