目录

        5、建模操作

        6、hive的基础优化

        (1)HDFS的副本数量

        (2)yarn的基础配置

        (3)MapReduce基础配置

        (4)hive基础配置

        (5)hive的压缩配置

        (6)Hive执行引擎

        7、数据抽取

        8、数据清洗转换

        9、数据分析

        (1)访问量

        (2)咨询量

        10、数据导出

        11、增量流程-增量数据模拟

        12、增量流程-增量数据抽取

        13、增量流程-增量数据抽取oozie实现自动调度(没配完)

        14、增量流程-增量数据清洗转换

        15、增量流程-增量数据清洗转换oozie自动化调度

        16、增量流程-增量数据分析

        17、增量流程-增量数据分析oozie自动化调度

        18、增量流程-增量数据导出

        19、增量流程-增量数据导出oozie自动化调度


        5、建模操作

        ODS层:

        --创建数据库

        CREATE DATABASE IF NOT EXISTS `itcast_ods`;

        --写入时压缩生效

        set hive.exec.orc.compression.strategy=COMPRESSION;

        --访问咨询数据表

-- ODS table holding the visit/consultation sessions extracted from MySQL.
-- The table name carries no date suffix: every later increment from MySQL is stored here as well.
-- The ODS layer is large and rarely rewritten, so ORC with the higher-ratio ZLIB codec is used.
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_ems (
    id                           INT    COMMENT '主键',
    create_date_time             STRING COMMENT '数据创建时间',
    session_id                   STRING COMMENT '七陌sessionId',
    sid                          STRING COMMENT '访客id',
    create_time                  STRING COMMENT '会话创建时间',
    seo_source                   STRING COMMENT '搜索来源',
    seo_keywords                 STRING COMMENT '关键字',
    ip                           STRING COMMENT 'IP地址',
    area                         STRING COMMENT '地域',
    country                      STRING COMMENT '所在国家',
    province                     STRING COMMENT '省',
    city                         STRING COMMENT '城市',
    origin_channel               STRING COMMENT '投放渠道',
    -- The MySQL column is called user. It is renamed here because user is a keyword in Hive.
    user_match                   STRING COMMENT '所属坐席',
    manual_time                  STRING COMMENT '人工开始时间',
    begin_time                   STRING COMMENT '坐席领取时间 ',
    end_time                     STRING COMMENT '会话结束时间',
    last_customer_msg_time_stamp STRING COMMENT '客户最后一条消息的时间',
    last_agent_msg_time_stamp    STRING COMMENT '坐席最后一下回复的时间',
    reply_msg_count              INT    COMMENT '客服回复消息数',
    msg_count                    INT    COMMENT '客户发送消息数',
    browser_name                 STRING COMMENT '浏览器名称',
    os_info                      STRING COMMENT '系统名称'
)
COMMENT '访问会话信息表'
-- Partitioned by the extraction date.
PARTITIONED BY (starts_time STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS ORC
LOCATION '/user/hive/warehouse/itcast_ods.db/web_chat_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB');

        --访问咨询数据表的附属表

-- ODS companion table of web_chat_ems: page and referrer details keyed by the same id.
-- External, ZLIB-compressed ORC, partitioned by start_time.
CREATE EXTERNAL TABLE IF NOT EXISTS itcast_ods.web_chat_text_ems (
    id                   INT    COMMENT '主键来自MySQL',
    referrer             STRING COMMENT '上级来源页面',
    from_url             STRING COMMENT '会话来源页面',
    landing_page_url     STRING COMMENT '访客着陆页面',
    url_title            STRING COMMENT '咨询页面title',
    platform_description STRING COMMENT '客户平台信息',
    other_params         STRING COMMENT '扩展字段中数据',
    history              STRING COMMENT '历史访问记录'
)
COMMENT 'EMS-PV测试表'
PARTITIONED BY (start_time STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS ORC
LOCATION '/user/hive/warehouse/itcast_ods.db/web_chat_text_ems_ods'
TBLPROPERTIES ('orc.compress'='ZLIB');

         注意:ODS层创建的表一般为外部表,因为ODS层的作用是历史数据的积存工作,数据不应被误删或随意修改。外部表在被drop时只会删除元数据,HDFS上的数据文件仍然保留,因此更适合保存原始数据(需要注意的是,外部表本身并不禁止写入数据)。字段名不要采用关键字。其他层的表一般为内部表,因为这些层的表是我们从ODS层抽取出来的,我们对其有绝对控制权。

        DWD层:

        CREATE DATABASE IF NOT EXISTS `itcast_dwd`;

create table if not exists itcast_dwd.visit_consult_dwd(
  session_id STRING comment '七陌sessionId',
  sid STRING comment '访客id',
  create_time bigint comment '会话创建时间',
  seo_source STRING comment '搜索来源',
  ip STRING comment 'IP地址',
  area STRING comment '地域',
  msg_count int comment '客户发送消息数',
  origin_channel STRING COMMENT '来源渠道',
  referrer STRING comment '上级来源页面',
  from_url STRING comment '会话来源页面',
  landing_page_url STRING comment '访客着陆页面',
  url_title STRING comment '咨询页面title',
  platform_description STRING comment '客户平台信息',
  other_params STRING comment '扩展字段中数据',
  history STRING comment '历史访问记录',
  hourinfo string comment '小时'
)
comment '访问咨询DWD表'
partitioned by(yearinfo String, quarterinfo string, monthinfo String, dayinfo string)  --这里加不加季度分区都可以,只是目录层级多了一层
row format delimited fields terminated by '\t'
stored as orc
location '/user/hive/warehouse/itcast_dwd.db/visit_consult_dwd'
tblproperties ('orc.compress'='SNAPPY');

        DWS层:

        CREATE DATABASE IF NOT EXISTS `itcast_dws`;

        --访问量统计结果表

CREATE TABLE IF NOT EXISTS itcast_dws.visit_dws (
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area STRING COMMENT '区域信息',
  seo_source STRING COMMENT '搜索来源',
  origin_channel STRING COMMENT '来源渠道',
  hourinfo STRING COMMENT '创建时间,统计至小时',
  time_str STRING COMMENT '时间明细',
  from_url STRING comment '会话来源页面',
  group_type STRING COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
  time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;')
comment 'EMS访客日志dws表'
PARTITIONED BY(yearinfo STRING,quarterinfo STRING,monthinfo STRING,dayinfo STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
stored as orc
location '/user/hive/warehouse/itcast_dws.db/visit_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');

        --咨询量统计结果表

CREATE TABLE IF NOT EXISTS itcast_dws.consult_dws
(
  sid_total INT COMMENT '根据sid去重求count',
  sessionid_total INT COMMENT '根据sessionid去重求count',
  ip_total INT COMMENT '根据IP去重求count',
  area STRING COMMENT '区域信息',
  origin_channel STRING COMMENT '来源渠道',
  hourinfo STRING COMMENT '创建时间,统计至小时',
  time_str STRING COMMENT '时间明细',
  group_type STRING COMMENT '产品属性类型:1.地区;2.来源渠道;3.总咨询量',
  time_type STRING COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;'
)
COMMENT '咨询量DWS宽表'
PARTITIONED BY (yearinfo string, quarterinfo STRING, monthinfo STRING, dayinfo string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC
LOCATION '/user/hive/warehouse/itcast_dws.db/consult_dws'
TBLPROPERTIES ('orc.compress'='SNAPPY');
        6、hive的基础优化
        (1)HDFS的副本数量

        默认情况下hdfs的副本有3个,实际生产环境中一般副本也是3个。如果数据不是特别重要,也可以设置为2个副本。注意hdfs比较灵活不仅仅可以统一设置副本数,它支持针对某一个文件设置其副本数,一般情况下都是统一设置。如果使用hadoop3.x以上的版本,hdfs还支持以纠删码(Erasure Coding)形式存储数据,常被通俗地称为“1.5副本”:这里的1.5并不是一个可以直接设置的副本数,其中的0.5也不是指数据存储了一半,而是指除原数据外,额外存储约占原数据一半大小的校验数据(例如RS-6-3策略下每6个数据块对应3个校验块),总存储开销约为原数据的1.5倍。如果数据出现丢失,可以通过校验数据将其恢复。但是使用纠删码在写入和恢复数据时需要进行编解码计算,会额外消耗较多的CPU、网络等资源。

        设置副本数:在CM中可以直接进行全局配置

        打开CM,进入到hdfs的配置项中,搜索“复制因子”

        (2)yarn的基础配置

        yarn用于资源的分配(资源:内存,cpu)

        yarn不管理磁盘,磁盘是由hdfs分配管理的:

        当一个mapreduce任务执行时,NodeManager提供内存和cpu,DataNode用于提供磁盘,它们管理的资源不同因此可以在一台服务器上同时拥有NodeManager和DataNode,不会冲突。

        1)CPU的配置

        yarn开辟的WEB UI是ResourceManager提供的,所以yarn的WEB UI数据都是总共的数据。

        注意:每一个NodeManager会自动向ResourceManager报告自己当前节点有多少核cpu,默认情况下报告8核,即使实际情况各节点不是8核NodeManager也报告8核,因为yarn不会自动校验每一个节点有多少核cpu。这显然是不合适的。由于这种特殊的机制导致我们在不借助其他自动校验工具时,可能会出现实际cpu核数与WEB UI显示cpu核数不一致的问题。

        推荐调整配置:当前节点有多少核,就向ResourceManager汇报多少核,据实汇报。

        查看当前节点cpu核数:通过CM打开主机查看,CM工具会自动校验各节点有多少核cpu;通过命令查看:grep 'processor' /proc/cpuinfo | sort -u | wc -l

        如何在yarn中配置各节点的cpu核数?

        打开CM中的yarn配置,搜索yarn.nodemanager.resource.cpu-vcores即可配置

        2)内存的配置

        注意:每个NodeManager会自动向ResourceManager报告自己当前节点有多少剩余内存,默认情况下报告8GB,即使实际情况各节点的剩余内存不足8GNodeManager也报告8G,因为yarn同样不会自动校验每个节点有多少剩余内存。这显然是不合适的。由于这种特殊的机制导致我们在不借助其他自动校验工具时,可能会出现实际剩余内存量与ResourceManager得知的剩余内存量不匹配。

        推荐调整配置:剩余内存*80%

        查看当前节点内存剩余:通过CM打开主机查看,CM工具会自动校验各节点的剩余内存;通过命令方式:free -m

        配置yarn可用内存:

        打开CM中的yarn配置,搜索yarn.nodemanager.resource.memory-mb(表示该节点上YARN可使用的物理内存总量)即可配置。除此之外还需要配置yarn.scheduler.maximum-allocation-mb(配置单个任务可申请的最大的内存大小,与yarn.nodemanager.resource.memory-mb一致)和yarn.app.mapreduce.am.command-opts(JVM内存,略小于yarn.nodemanager.resource.memory-mb,一般为0.9倍)

         本项目为了后续实验不会出现内存不足问题,将yarn可用内存设置大一点,在实际生产环境中一般配置剩余内存*80%

        3)yarn本地目录的配置

        配置项:yarn.nodemanager.local-dirs

        作用:yarn在运行的过程中,会产生一些临时文件数据(Map task的临时结果),该配置项用于配置这些临时文件的存储位置。

        推荐配置:当前服务器挂载了几块磁盘,就需要配置几个目录

        如何查看当前服务器挂载了多少磁盘?以及磁盘的挂载点在哪?

        通过命令df -h可以查看其挂载点:

        其他磁盘都是临时磁盘,不用管,当前节点的磁盘被挂载到/目录下,即后续在/目录存储的数据实质是存储在该磁盘下。本项目各节点都只有一个磁盘(挂载点都是/),因此yarn的本地目录只需要在其各自磁盘下设置1个即可。

        (3)MapReduce基础配置

        配置项:

        mapreduce.map.memory.mb      :在运行MR时,一个Map Task需要占用多大内存

        mapreduce.map.java.opts          :在运行MR时,一个Map Task对应的jvm需要占用多大内存

        mapreduce.reduce.memory.mb  :在运行MR时,一个Reduce Task需要占用多大内存

        mapreduce.reduce.java.opts      :在运行MR时,一个Reduce Task对应的jvm需要占用多大内存

        注意:jvm的内存配置要略小于对应内存配置

                  上述所有的内存配置大小不能超过yarn可用的内存大小即yarn.nodemanager.resource.memory-mb配置项

        推荐配置:不需手动设置,默认即可

        (4)hive基础配置

        1)hiveserver2的内存大小

        hiveserver2在执行sql时,sql需要翻译成MR以及形成执行计划,这些都需要消耗内存,此外hiveserver2开启后,是通过客户端远程连接hive,也会消耗内存。

        配置项:HiveServer2 的 Java 堆栈大小(字节)默认4G

         注意:如果这个配置项的值较少,在执行sql时会出现下面的错误:此时hiveserver2宕机,需要调整hiveserver2的内存大小,并重启hiveserver2。

        2)动态生成分区的线程数

        配置项:hive.load.dynamic.partitions.thread

        说明:在执行动态分区的时候,最多允许多少个线程来运行动态分区操作,线程越多执行效率越高,但占用资源量也会越大。默认值为15

        推荐配置:先采用默认15,如果动态分区执行很慢,且此时资源还有剩余,则可以尝试调大

        3)监听输入文件线程数

        配置项:hive.exec.input.listing.max.threads

        说明:在运行sql时,最多允许多少个线程读取hdfs上的数据,线程越多读取效率越高,但占用资源也会越大。默认值为15.

        推荐配置:先采用默认15,如果读取效率很慢,且此时资源还有剩余,则可以尝试调大

        (5)hive的压缩配置

        除了在hive中创建表时可以指定压缩方案,在查询分析过程中,Map任务的中间结果,以及Reduce任务的最终结果都可以设置压缩方案。由于Map任务的输出结果需要通过网络传输到Reduce节点上,通过压缩Map任务的结果使得传输的数据量减少,可以获得性能的提升。对于Reduce任务最终结果的压缩既可以减少文件的存储空间,又可以加快网络传输效率。

        1)Map中间结果压缩配置

        配置项:hive.exec.compress.intermediate    : 是否开启hive对中间结果压缩

                      mapreduce.map.output.compress

                      设置是否启动map输出压缩,默认为false。在需要减少网络传输的时候,可以设置为true。

                      mapreduce.map.output.compress.codec

                      设置map输出压缩编码解码器,默认为org.apache.hadoop.io.compress.DefaultCodec,推荐使用SnappyCodec:org.apache.hadoop.io.compress.SnappyCodec。      

        2)Reduce最终结果压缩配置

        配置项:hive.exec.compress.output   : 是否开启hive对最终结果压缩

                      mapreduce.output.fileoutputformat.compress    : 是否启动Reduce压缩

                      mapreduce.output.fileoutputformat.compress.codec   :指定使用什么压缩方案,推荐使用SnappyCodec:org.apache.hadoop.io.compress.SnappyCodec。

                      mapreduce.output.fileoutputformat.compress.type   : 指定使用什么压缩类型(压缩方式):NONE(不指定)、RECORD(行压缩,默认使用)、BLOCK(块压缩),推荐配置:BLOCK,使用块压缩可以实现批量压缩,效率高。

注意:hive.exec.compress.intermediate和hive.exec.compress.output这两个配置项(原文中以红色标出)在hive的会话窗口配置(set hive.exec.compress.intermediate=true;    set hive.exec.compress.output=true;),其余以mapreduce.开头的配置项(原文中以蓝色标出)在CM的YARN中配置。前者需要优先开启,如果没有开启,即使后者配置了也不生效。

        (6)Hive执行引擎

        在hive中sql语句可以转化为MapReduce,同时也可以转化为Spark

        配置项:hive.execution.engine   :    设置sql转化为MR还是Spark,在CM中直接配

        7、数据抽取

        利用sqoop将业务数据库mysql中的数据抽取到hive的ODS层所对应的表中,由于当前是第一次操作,因此需要全量抽取。

        # 访问咨询数据表抽取sql即--query的值:

-- Full extraction query for the session table (this is the value passed to sqoop --query).
-- user is renamed to user_match to match the ODS column. The constant is the extraction date
-- that becomes the starts_time partition value.
SELECT
    id,
    create_date_time,
    session_id,
    sid,
    create_time,
    seo_source,
    seo_keywords,
    ip,
    area,
    country,
    province,
    city,
    origin_channel,
    user AS user_match,
    manual_time,
    begin_time,
    end_time,
    last_customer_msg_time_stamp,
    last_agent_msg_time_stamp,
    reply_msg_count,
    msg_count,
    browser_name,
    os_info,
    "2021-09-24" AS starts_time
FROM web_chat_ems_2019_07;

        # 访问咨询数据表的附属表抽取sql

-- Full extraction query for the companion text table.
-- The constant is the extraction date that becomes the start_time partition value.
SELECT
    id,
    referrer,
    from_url,
    landing_page_url,
    url_title,
    platform_description,
    other_params,
    history,
    "2021-09-24" AS start_time
FROM web_chat_text_ems_2019_07;

        # 编写sqoop命令

# Import the visit/consultation session table: runs the free-form query against the MySQL
# database nev and writes the rows into the Hive table itcast_ods.web_chat_ems through
# HCatalog, using a single mapper (-m 1).
# The query is single-quoted so the shell passes the $CONDITIONS token through literally.
# NOTE(review): the password is given in clear text on the command line; outside of a lab
# setup consider sqoop's -P prompt or --password-file instead.
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select id,
       create_date_time,
       session_id,
       sid,
       create_time,
       seo_source,
       seo_keywords,
       ip,
       area,
       country,
       province,
       city,
       origin_channel,
       user as user_match,
       manual_time,
       begin_time,
       end_time,
       last_customer_msg_time_stamp,
       last_agent_msg_time_stamp,
       reply_msg_count,
       msg_count,
       browser_name,
       os_info,
       "2021-09-24" as starts_time
from web_chat_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# Note: the --query value is wrapped in single quotes and uses double quotes inside; the two must not be the same kind of quote

# Import the companion text table: runs the free-form query against the MySQL database nev
# and writes the rows into the Hive table itcast_ods.web_chat_text_ems through HCatalog,
# using a single mapper (-m 1).
# The query is single-quoted so the shell passes the $CONDITIONS token through literally.
# NOTE(review): the password is given in clear text on the command line; outside of a lab
# setup consider sqoop's -P prompt or --password-file instead.
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select
    id,
    referrer,
    from_url,
    landing_page_url,
    url_title,
    platform_description,
    other_params,
    history,
    "2021-09-24" as start_time
from web_chat_text_ems_2019_07 where 1=1 and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1

        # 校验数据是否导入成功

        查看mysql共计多少条数据?

        select count(1) from web_chat_ems_2019_07;    211197

        select count(1) from web_chat_text_ems_2019_07;  105599

        查看hive中对应表有多少条数据?

        select count(1) from itcast_ods.web_chat_ems;    211197

        select count(1) from itcast_ods.web_chat_text_ems;   105599

        查询其中一部分数据,观察数据映射是否ok

        select * from itcast_ods.web_chat_ems limit 10; 

        select * from itcast_ods.web_chat_text_ems limit 10;

        出现以下问题:

         hiveserver2内存不足出现宕机。需要调整hiveserver2的内存大小

         配置好后点击“保存更改”,然后进入到CM首页重启即可(CM可以自动监控配置的更改)

        8、数据清洗转换

        将ODS层数据进行ETL清洗,和少量维度退化,并将结果导入到DWD层的相应表中。

# 未转化的sql
-- Draft of the ODS to DWD query before any conversion is applied.
-- Every create_time reference below still has to be converted.
SELECT
    ems.session_id,
    ems.sid,
    -- To convert: string in the ODS layer, bigint in the DWD layer, so it must become a unix timestamp.
    ems.create_time,
    ems.seo_source,
    ems.ip,
    ems.area,
    ems.msg_count,
    ems.origin_channel,
    txt.referrer,
    txt.from_url,
    txt.landing_page_url,
    txt.url_title,
    txt.platform_description,
    txt.other_params,
    txt.history,
    -- To convert: the five columns below must be reduced to hour, year, quarter, month and day.
    ems.create_time AS hourinfo,
    ems.create_time AS yearinfo,
    ems.create_time AS quarterinfo,
    ems.create_time AS monthinfo,
    ems.create_time AS dayinfo
FROM itcast_ods.web_chat_ems ems
INNER JOIN itcast_ods.web_chat_text_ems txt
    ON ems.id = txt.id;

        1)unix_timestamp()日期转为时间戳

        unix_timestamp()   : 获取当前时间戳,返回10位bigint类型的数

        例子:select unix_timestamp()      -- 1689169289

        unix_timestamp(string)     : 获取指定时间的时间戳,输入的string时间必须是'yyyy-MM-dd HH:mm:ss'格式,如果不是则返回NULL。

        例子:

        select unix_timestamp('2019-08-15 16:40:00')   --1565858400

        select unix_timestamp('2019-08-15')  --NULL

        unix_timestamp(string,string指明时间的格式)  : 获取指定时间的时间戳,输入的string可以是任意类型,后面指明其类型即可

        例子:        

        select unix_timestamp('2019-08-15','yyyy-MM-dd')   --1565798400

        select unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')   --1565858400

        select unix_timestamp('2019-08-15','yyyy-MM-dd HH:mm:ss')   --NULL

        2)强制类型转换

        cast(1.58 as int)

        3)from_unixtime()时间戳转为日期

        该函数的输入必须是10位bigint类型的数,返回值为string类型的日期。

        from_unixtime(bigint,string指明时间戳转换后日期的格式)

        例子:

        select from_unixtime(1565858389)   --2019-08-15 16:39:49,不写转换格式默认为yyyy-MM-dd HH:mm:ss格式

        select from_unixtime(1565858389,'yyyy-MM-dd HH:mm:ss')  --2019-08-15 16:39:49

        select from_unixtime(1565858389,'yyyy-MM-dd')   --2019-08-15

        输入必须是10位时间戳,如果不是10位必须先变为10位再转换:

        例子:

        select from_unixtime(1553184000488, 'yyyy-MM-dd HH:mm:ss')  --51188-06-11 00:08:08  显然不对

        select from_unixtime(cast(1553184000488/1000 as int),'yyyy-MM-dd HH:mm:ss')   --2019-03-22 00:00:00

        select from_unixtime(cast(substr(1553184000488,1,10) as int),'yyyy-MM-dd HH:mm:ss')  --2019-03-22 00:00:00

        # 获取当前时间

        select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')   -- 2019-08-15 17:18:55

        4)日期转换函数

        year()  quarter()  month()  day()  hour()

        这种方式可以实现create_time字段的转换,但是我们想要的是7月显示07,而使用日期转换函数无法实现。此外,日期转换函数要求输入的日期必须是标准的。

        5)字符串截取函数substr()

        大多数数据库中都有substr和substring两种字符串截取函数。但与其他的关系型数据库不同,在hive中,substr与substring函数的使用方式是完全一致的,属于同一个函数。

        substr(string类型日期,int开始截取位置)    :  返回指定开始到结尾的字符串

        例子:select substr('2020-06-06', 6);    --06-06

        substr(string类型日期, int开始截取位置, int截取长度)   : 返回指定开始到指定结尾的字符串

        例子:select substr('2020-06-06', 6,2);   --06

        通过substr()就可以实现上述日期转换函数无法实现的create_time字段的转换:

# 最终查询sql
-- ODS to DWD query with the conversions applied:
-- create_time becomes a unix timestamp, and the hour, year, month and day parts are cut out
-- of the string with substr so that they keep their leading zero. The quarter comes from quarter().
SELECT
    ems.session_id,
    ems.sid,
    unix_timestamp(ems.create_time) AS create_time,
    ems.seo_source,
    ems.ip,
    ems.area,
    ems.msg_count,
    ems.origin_channel,
    txt.referrer,
    txt.from_url,
    txt.landing_page_url,
    txt.url_title,
    txt.platform_description,
    txt.other_params,
    txt.history,
    substr(ems.create_time, 12, 2)  AS hourinfo,
    substr(ems.create_time, 1, 4)   AS yearinfo,
    quarter(ems.create_time)        AS quarterinfo,
    substr(ems.create_time, 6, 2)   AS monthinfo,
    substr(ems.create_time, 9, 2)   AS dayinfo
FROM itcast_ods.web_chat_ems ems
INNER JOIN itcast_ods.web_chat_text_ems txt
    ON ems.id = txt.id;

        可能会出现的错误:

        错误原因:在执行转换操作时,由于需要进行两表联合查询,其中一个表的数据较少,此时hive会自动对其优化(通过explain命令可以看到当前的sql命令hive进行了map join优化,此优化会将数据量少的表数据存储在内存中进行),优化的过程会消耗内存,但是目前内存不足,导致出现内存溢出错误,这种错误的报错方式有两种:

        第一种错误信息:return code 1

        第二种错误信息:return code 137 或 Execution failed with exit status: 137

        解决方案:关闭map join优化 set hive.auto.convert.join=false;

# 最终sql
# 目前select查询出的数据有多个分区,静态分区加载数据很麻烦,我们使用动态分区

-- Enable dynamic partitioning in non-strict mode so that all four partition columns
-- can be taken from the query result.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Compress intermediate and final job output.
SET hive.exec.compress.intermediate=true;
SET hive.exec.compress.output=true;

-- visit_consult_dwd is declared with SNAPPY compression. Set the ORC compression strategy
-- so that it is applied on write.
SET hive.exec.orc.compression.strategy=COMPRESSION;

-- Load the DWD table from the two ODS tables.
-- The last four select columns feed the dynamic partition columns in order.
INSERT INTO TABLE itcast_dwd.visit_consult_dwd PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    ems.session_id,
    ems.sid,
    unix_timestamp(ems.create_time) AS create_time,
    ems.seo_source,
    ems.ip,
    ems.area,
    ems.msg_count,
    ems.origin_channel,
    txt.referrer,
    txt.from_url,
    txt.landing_page_url,
    txt.url_title,
    txt.platform_description,
    txt.other_params,
    txt.history,
    substr(ems.create_time, 12, 2)  AS hourinfo,
    substr(ems.create_time, 1, 4)   AS yearinfo,
    quarter(ems.create_time)        AS quarterinfo,
    substr(ems.create_time, 6, 2)   AS monthinfo,
    substr(ems.create_time, 9, 2)   AS dayinfo
FROM itcast_ods.web_chat_ems ems
INNER JOIN itcast_ods.web_chat_text_ems txt
    ON ems.id = txt.id;
        9、数据分析

        对DWD层的数据进行分析,并将分析结果导入DWS层

        (1)访问量

         除以上维度还有总访问量,共25个需求

        # 统计每年的总访问量

-- Total visits per year (group_type 5 = overall total, time_type 5 = yearly).
-- Dimensions and time levels that are not part of this aggregation carry the -1 placeholder.
-- Example: the 2019 row is stored under the HDFS directory
-- yearinfo=2019/quarterinfo=-1/monthinfo=-1/dayinfo=-1.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)         AS sid_total,
    COUNT(DISTINCT session_id)  AS sessionid_total,
    COUNT(DISTINCT ip)          AS ip_total,
    '-1'                        AS area,
    '-1'                        AS seo_source,
    '-1'                        AS origin_channel,
    '-1'                        AS hourinfo,
    yearinfo                    AS time_str,
    '-1'                        AS from_url,
    '5'                         AS group_type,
    '5'                         AS time_type,
    yearinfo,
    '-1'                        AS quarterinfo,
    '-1'                        AS monthinfo,
    '-1'                        AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo;

        # 统计每年每季度的总访问量

-- Total visits per year and quarter (group_type 5 = overall total, time_type 4 = quarterly).
-- time_str is year_quarter. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                     AS sid_total,
    COUNT(DISTINCT session_id)              AS sessionid_total,
    COUNT(DISTINCT ip)                      AS ip_total,
    '-1'                                    AS area,
    '-1'                                    AS seo_source,
    '-1'                                    AS origin_channel,
    '-1'                                    AS hourinfo,
    concat_ws('_', yearinfo, quarterinfo)   AS time_str,
    '-1'                                    AS from_url,
    '5'                                     AS group_type,
    '4'                                     AS time_type,
    yearinfo,
    quarterinfo,
    '-1'                                    AS monthinfo,
    '-1'                                    AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo;

        # 统计每年每季度每月的总访问量

-- Total visits per year, quarter and month (group_type 5 = overall total, time_type 3 = monthly).
-- time_str is year-month. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    '-1'                                AS area,
    '-1'                                AS seo_source,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '-', monthinfo)    AS time_str,
    '-1'                                AS from_url,
    '5'                                 AS group_type,
    '3'                                 AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo;

        # 统计每年每季度每月每天的总访问量

-- Total visits per day (group_type 5 = overall total, time_type 2 = daily).
-- time_str is year-month-day. Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                             AS sid_total,
    COUNT(DISTINCT session_id)                      AS sessionid_total,
    COUNT(DISTINCT ip)                              AS ip_total,
    '-1'                                            AS area,
    '-1'                                            AS seo_source,
    '-1'                                            AS origin_channel,
    '-1'                                            AS hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo)  AS time_str,
    '-1'                                            AS from_url,
    '5'                                             AS group_type,
    '2'                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo;

        # 统计每年每季度每月每天每小时的总访问量

-- Total visits per hour (group_type 5 = overall total, time_type 1 = hourly).
-- time_str is year-month-day followed by a space and the hour.
-- Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                                             AS sid_total,
    COUNT(DISTINCT session_id)                                      AS sessionid_total,
    COUNT(DISTINCT ip)                                              AS ip_total,
    '-1'                                                            AS area,
    '-1'                                                            AS seo_source,
    '-1'                                                            AS origin_channel,
    hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo, ' ', hourinfo)   AS time_str,
    '-1'                                                            AS from_url,
    '5'                                                             AS group_type,
    '1'                                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo;

        # 统计每年各个受访页面的访问量

-- Visits per source page and year (group_type 4 = session source page, time_type 5 = yearly).
-- Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)         AS sid_total,
    COUNT(DISTINCT session_id)  AS sessionid_total,
    COUNT(DISTINCT ip)          AS ip_total,
    '-1'                        AS area,
    '-1'                        AS seo_source,
    '-1'                        AS origin_channel,
    '-1'                        AS hourinfo,
    yearinfo                    AS time_str,
    from_url,
    '4'                         AS group_type,
    '5'                         AS time_type,
    yearinfo,
    '-1'                        AS quarterinfo,
    '-1'                        AS monthinfo,
    '-1'                        AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    from_url;

        # 统计每年每季度各个受访页面的访问量

-- Visits per source page, year and quarter (group_type 4 = session source page, time_type 4 = quarterly).
-- time_str is year_quarter. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    '-1'                                AS area,
    '-1'                                AS seo_source,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '_', quarterinfo)  AS time_str,
    from_url,
    '4'                                 AS group_type,
    '4'                                 AS time_type,
    yearinfo,
    quarterinfo,
    '-1'                                AS monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    from_url;

        # 统计每年每季度每月各个受访页面的访问量

-- Visits per source page and month (group_type 4 = session source page, time_type 3 = monthly).
-- time_str is year-month. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    '-1'                                AS area,
    '-1'                                AS seo_source,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '-', monthinfo)    AS time_str,
    from_url,
    '4'                                 AS group_type,
    '3'                                 AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    from_url;

        # 统计每年每季度每月每天各个受访页面的访问量

-- Visits per source page and day (group_type 4 = session source page, time_type 2 = daily).
-- time_str is year-month-day. Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                             AS sid_total,
    COUNT(DISTINCT session_id)                      AS sessionid_total,
    COUNT(DISTINCT ip)                              AS ip_total,
    '-1'                                            AS area,
    '-1'                                            AS seo_source,
    '-1'                                            AS origin_channel,
    '-1'                                            AS hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo)  AS time_str,
    from_url,
    '4'                                             AS group_type,
    '2'                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    from_url;

        # 统计每年每季度每月每天每小时各个受访页面的访问量

-- Visits per source page and hour (group_type 4 = session source page, time_type 1 = hourly).
-- time_str is year-month-day followed by a space and the hour.
-- Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.visit_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                                             AS sid_total,
    COUNT(DISTINCT session_id)                                      AS sessionid_total,
    COUNT(DISTINCT ip)                                              AS ip_total,
    '-1'                                                            AS area,
    '-1'                                                            AS seo_source,
    '-1'                                                            AS origin_channel,
    hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo, ' ', hourinfo)   AS time_str,
    from_url,
    '4'                                                             AS group_type,
    '1'                                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo,
    from_url;

        # 同理其他维度都差不多......见visit.sql文件

        (2)咨询量

        除上述维度外,还有总咨询量维度,共15个需求

        # 统计每年的总咨询量

-- Total consultations per year (group_type 3 = overall total, time_type 5 = yearly).
-- Only sessions with at least one customer message count as a consultation.
-- Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)         AS sid_total,
    COUNT(DISTINCT session_id)  AS sessionid_total,
    COUNT(DISTINCT ip)          AS ip_total,
    '-1'                        AS area,
    '-1'                        AS origin_channel,
    '-1'                        AS hourinfo,
    yearinfo                    AS time_str,
    '3'                         AS group_type,
    '5'                         AS time_type,
    yearinfo,
    '-1'                        AS quarterinfo,
    '-1'                        AS monthinfo,
    '-1'                        AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo;

        # 统计每年每季度的总咨询量

-- Total consultations per year and quarter (group_type 3 = overall total, time_type 4 = quarterly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year_quarter. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    '-1'                                AS area,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '_', quarterinfo)  AS time_str,
    '3'                                 AS group_type,
    '4'                                 AS time_type,
    yearinfo,
    quarterinfo,
    '-1'                                AS monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo;

        # 统计每年每季度每月的总咨询量

-- Total consultations per month (group_type 3 = overall total, time_type 3 = monthly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    '-1'                                AS area,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '-', monthinfo)    AS time_str,
    '3'                                 AS group_type,
    '3'                                 AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo;

        # 统计每年每季度每月每天的总咨询量

-- Total consultations per day (group_type 3 = overall total, time_type 2 = daily).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month-day. Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                             AS sid_total,
    COUNT(DISTINCT session_id)                      AS sessionid_total,
    COUNT(DISTINCT ip)                              AS ip_total,
    '-1'                                            AS area,
    '-1'                                            AS origin_channel,
    '-1'                                            AS hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo)  AS time_str,
    '3'                                             AS group_type,
    '2'                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo;

        # 统计每年每季度每月每天每小时的总咨询量

-- Total consultations per hour (group_type 3 = overall total, time_type 1 = hourly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month-day followed by a space and the hour.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                                             AS sid_total,
    COUNT(DISTINCT session_id)                                      AS sessionid_total,
    COUNT(DISTINCT ip)                                              AS ip_total,
    '-1'                                                            AS area,
    '-1'                                                            AS origin_channel,
    hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo, ' ', hourinfo)   AS time_str,
    '3'                                                             AS group_type,
    '1'                                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo;

        # 统计每年各个地区的咨询量

-- Consultations per area and year (group_type 1 = area, time_type 5 = yearly).
-- Only sessions with at least one customer message count as a consultation.
-- Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)         AS sid_total,
    COUNT(DISTINCT session_id)  AS sessionid_total,
    COUNT(DISTINCT ip)          AS ip_total,
    area,
    '-1'                        AS origin_channel,
    '-1'                        AS hourinfo,
    yearinfo                    AS time_str,
    '1'                         AS group_type,
    '5'                         AS time_type,
    yearinfo,
    '-1'                        AS quarterinfo,
    '-1'                        AS monthinfo,
    '-1'                        AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    area;

        # 统计每年每季度各个地区的咨询量

-- Consultations per area, year and quarter (group_type 1 = area, time_type 4 = quarterly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year_quarter. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    area,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '_', quarterinfo)  AS time_str,
    '1'                                 AS group_type,
    '4'                                 AS time_type,
    yearinfo,
    quarterinfo,
    '-1'                                AS monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    area;

        # 统计每年每季度每月各个地区的咨询量

-- Consultations per area and month (group_type 1 = area, time_type 3 = monthly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month. Unused dimensions and time levels carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                 AS sid_total,
    COUNT(DISTINCT session_id)          AS sessionid_total,
    COUNT(DISTINCT ip)                  AS ip_total,
    area,
    '-1'                                AS origin_channel,
    '-1'                                AS hourinfo,
    concat(yearinfo, '-', monthinfo)    AS time_str,
    '1'                                 AS group_type,
    '3'                                 AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1'                                AS dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    area;

        # 统计每年每季度每月每天各个地区的咨询量

-- Consultations per area and day (group_type 1 = area, time_type 2 = daily).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month-day. Unused dimensions carry the -1 placeholder.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                             AS sid_total,
    COUNT(DISTINCT session_id)                      AS sessionid_total,
    COUNT(DISTINCT ip)                              AS ip_total,
    area,
    '-1'                                            AS origin_channel,
    '-1'                                            AS hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo)  AS time_str,
    '1'                                             AS group_type,
    '2'                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    area;

        # 统计每年每季度每月每天每小时各个地区的咨询量

-- Consultations per area and hour (group_type 1 = area, time_type 1 = hourly).
-- Only sessions with at least one customer message count as a consultation.
-- time_str is year-month-day followed by a space and the hour.
INSERT INTO TABLE itcast_dws.consult_dws PARTITION (yearinfo, quarterinfo, monthinfo, dayinfo)
SELECT
    COUNT(DISTINCT sid)                                             AS sid_total,
    COUNT(DISTINCT session_id)                                      AS sessionid_total,
    COUNT(DISTINCT ip)                                              AS ip_total,
    area,
    '-1'                                                            AS origin_channel,
    hourinfo,
    concat(yearinfo, '-', monthinfo, '-', dayinfo, ' ', hourinfo)   AS time_str,
    '1'                                                             AS group_type,
    '1'                                                             AS time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
FROM itcast_dwd.visit_consult_dwd
WHERE msg_count >= 1
GROUP BY
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo,
    area;

        # 同理其他维度都一样......见consult.sql文件

        10、数据导出

        利用sqoop将DWS层的数据导出到Mysql中

        首先在Mysql中创建目标表:

-- Target database for the exported DWS results (MySQL).
-- utf8mb4 is the database default so that Chinese dimension values can be stored.
CREATE DATABASE IF NOT EXISTS scrm_bi
    DEFAULT CHARACTER SET utf8mb4
    COLLATE utf8mb4_general_ci;

-- Visit count result table, mirroring the Hive table itcast_dws.visit_dws with the four
-- partition columns appended as ordinary columns.
-- The table name is qualified with scrm_bi so that it is created in the database the
-- sqoop export connects to, whatever the current default database is.
-- MySQL only treats a double dash as a comment when it is followed by whitespace.
-- NOTE: from_url is kept at varchar(32) on purpose; the export troubleshooting further
-- down in this document shows the resulting error and widens it to varchar(1000).
CREATE TABLE IF NOT EXISTS `scrm_bi`.`visit_dws` (
  sid_total int COMMENT '根据sid去重求count',
  sessionid_total int COMMENT '根据sessionid去重求count',
  ip_total int COMMENT '根据IP去重求count',
  area varchar(32) COMMENT '区域信息',
  seo_source varchar(32) COMMENT '搜索来源',
  origin_channel varchar(32) COMMENT '来源渠道',
  hourinfo varchar(32) COMMENT '小时信息',
  time_str varchar(32) COMMENT '时间明细',
  from_url varchar(32) COMMENT '会话来源页面',
  group_type varchar(32) COMMENT '产品属性类型:1.地区;2.搜索来源;3.来源渠道;4.会话来源页面;5.总访问量',
  time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
  yearinfo varchar(32) COMMENT '年信息',
  quarterinfo varchar(32) COMMENT '季度',
  monthinfo varchar(32) COMMENT '月信息',
  dayinfo varchar(32) COMMENT '日信息'
);

-- Consultation count result table, mirroring the Hive table itcast_dws.consult_dws with
-- the four partition columns appended as ordinary columns.
-- The table name is qualified with scrm_bi so that it is created in the database the
-- sqoop export connects to, whatever the current default database is.
-- MySQL only treats a double dash as a comment when it is followed by whitespace.
-- The group_type comment now names value 3 as the total consultation count, matching
-- the Hive table definition.
CREATE TABLE IF NOT EXISTS `scrm_bi`.`consult_dws` (
  sid_total int COMMENT '根据sid去重求count',
  sessionid_total int COMMENT '根据sessionid去重求count',
  ip_total int COMMENT '根据IP去重求count',
  area varchar(32) COMMENT '区域信息',
  origin_channel varchar(32) COMMENT '来源渠道',
  hourinfo varchar(32) COMMENT '小时信息',
  time_str varchar(32) COMMENT '时间明细',
  group_type varchar(32) COMMENT '产品属性类型:1.地区;2.来源渠道;3.总咨询量',
  time_type varchar(32) COMMENT '时间聚合类型:1、按小时聚合;2、按天聚合;3、按月聚合;4、按季度聚合;5、按年聚合;',
  yearinfo varchar(32) COMMENT '年信息',
  quarterinfo varchar(32) COMMENT '季度',
  monthinfo varchar(32) COMMENT '月信息',
  dayinfo varchar(32) COMMENT '日信息'
);

        然后执行导出操作:

       导出咨询量数据时可能会出现的问题:出现中文乱码

# Export the consultation counts: reads the Hive table itcast_dws.consult_dws through
# HCatalog and writes into the MySQL table consult_dws in database scrm_bi, single mapper.
# This first attempt has no character-encoding parameters on the JDBC URL; it is the
# command that produces the garbled Chinese text described above.
# NOTE(review): the password is given in clear text on the command line; outside of a lab
# setup consider sqoop's -P prompt or --password-file instead.
sqoop export \
--connect jdbc:mysql://hadoop01:3306/scrm_bi \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1

        解决方法:

        先清空表数据,然后重新导入

         --connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \

# Export the consultation counts again, this time with useUnicode and
# characterEncoding=utf-8 on the JDBC URL so that Chinese text is written correctly.
# Reads the Hive table itcast_dws.consult_dws through HCatalog and writes into the MySQL
# table consult_dws in database scrm_bi, single mapper.
sqoop export \
--connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table consult_dws \
--hcatalog-database itcast_dws \
--hcatalog-table consult_dws \
-m 1

# Note: the --connect URL contains special characters (? and &), so wrap it in double quotes. The default encoding is Latin, which cannot represent Chinese text

        导出访问量数据时可能会出现的错误:字段大小设置太小

# Export the visit counts: reads the Hive table itcast_dws.visit_dws through HCatalog and
# writes into the MySQL table visit_dws in database scrm_bi, single mapper.
# The JDBC URL is double-quoted because it contains ? and &, and it sets utf-8 encoding.
# NOTE(review): the password is given in clear text on the command line; outside of a lab
# setup consider sqoop's -P prompt or --password-file instead.
sqoop export \
--connect "jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table visit_dws \
--hcatalog-database itcast_dws \
--hcatalog-table visit_dws \
-m 1

         可以看见报错信息只是说导出失败,并没有具体说为什么导出失败,这种错误一般不是语法错误(语法错误会具体指出错误原因),该错误是sqoop在执行MR后报出的错误,已经通过了sqoop的语法检查,此后的错误sqoop不知道。

        如何查阅具体的错误信息呢?必须查看MR的运行日志

         点击这里here查看详细日志(注意INFO是正常日志,我们需要找ERROR)

        日志显示:from_url列的数据太长了,就是创表时该字段的长度设的太小了,导致加载不进去

        解决方法:将from_url的varchar(32)变大varchar(1000),然后重新导入

        至此,所有全量操作已经完成,后续的操作都是增量的操作,一般为按天进行增量操作。

        11、增量流程-增量数据模拟

        在实际生产环境下,业务数据库中的数据是以月为单位的,即一个月的数据会形成一张表,上述全量操作我们已经模拟了公司所有的历史数据分析工作,当然这里我们是以两张表进行分析,实际生产环境下可能会有上百张表。接下来只需要对前一天的数据进行增量统计即可。这里我们模拟一天的业务数据:

-- Simulate one day of new business data in MySQL.

-- Step 1: clone the sessions of 2019-07-01 into a new monthly table.
-- SELECT * is intentional here: the new table must have exactly the source structure.
-- A half-open range is used so that the whole day is covered whatever the time precision.
CREATE TABLE web_chat_ems_2021_09 AS
SELECT *
FROM web_chat_ems_2019_07
WHERE create_time >= '2019-07-01 00:00:00'
  AND create_time <  '2019-07-02 00:00:00';

-- Step 2: move every cloned row to 2021-09-25 while keeping its time of day.
-- The UPDATE has no WHERE clause on purpose: all rows of the simulated table are rewritten.
UPDATE web_chat_ems_2021_09
SET create_time = concat('2021-09-25 ', substr(create_time, 12));

-- Step 3: clone the companion text rows that belong to the same sessions.
-- The day filter is applied to the original 2019 table, whose create_time is unchanged.
CREATE TABLE web_chat_text_ems_2021_09 AS
SELECT txt.*
FROM web_chat_text_ems_2019_07 AS txt
INNER JOIN web_chat_ems_2019_07 AS ems
    ON ems.id = txt.id
WHERE ems.create_time >= '2019-07-01 00:00:00'
  AND ems.create_time <  '2019-07-02 00:00:00';

        注意:虽然web_chat_ems_2021_09和web_chat_text_ems_2021_09表中只有一天的数据,但是在实际生产环境中从1号到25号的数据都有,全量操作已经分析完1号到24号了,这里需要分析25号的增量数据即可。

        12、增量流程-增量数据抽取

        利用sqoop将mysql中新增的数据抽取到ODS层

        # 查询新增数据:

-- Preview of the new rows of the main table for 2021-09-25.
-- `user` is exposed as user_match to line up with the Hive ODS column name,
-- and the extraction date is appended as the starts_time partition value.
select
    id,
    create_date_time,
    session_id,
    sid,
    create_time,
    seo_source,
    seo_keywords,
    ip,
    area,
    country,
    province,
    city,
    origin_channel,
    user as user_match,
    manual_time,
    begin_time,
    end_time,
    last_customer_msg_time_stamp,
    last_agent_msg_time_stamp,
    reply_msg_count,
    msg_count,
    browser_name,
    os_info,
    "2021-09-26" as starts_time
from web_chat_ems_2021_09
where create_time between '2021-09-25 00:00:00' and '2021-09-25 23:59:59';

-- Preview of the matching rows of the companion text table:
-- restricted to the ids of the main-table rows created on 2021-09-25.
select
    temp2.*,
    "2021-09-26" as start_time
from (
    select id
    from web_chat_ems_2021_09
    where create_time between '2021-09-25 00:00:00' and '2021-09-25 23:59:59'
) temp1
inner join web_chat_text_ems_2021_09 temp2
    on temp1.id = temp2.id;

        # sqoop导入命令:

# Import the main visit/consult rows created on 2021-09-25 from MySQL (database nev)
# into the Hive table itcast_ods.web_chat_ems via HCatalog, using a single mapper.
# The query renames `user` to user_match and appends the literal "2021-09-26" as starts_time.
# $CONDITIONS is the placeholder required by sqoop in free-form --query imports.
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select id,
       create_date_time,
       session_id,
       sid,
       create_time,
       seo_source,
       seo_keywords,
       ip,
       area,
       country,
       province,
       city,
       origin_channel,
       user as user_match,
       manual_time,
       begin_time,
       end_time,
       last_customer_msg_time_stamp,
       last_agent_msg_time_stamp,
       reply_msg_count,
       msg_count,
       browser_name,
       os_info,
       "2021-09-26" as starts_time
from web_chat_ems_2021_09 where create_time between "2021-09-25 00:00:00" and "2021-09-25 23:59:59" and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_ems \
-m 1
# Note: the --query value is wrapped in single quotes and uses double quotes inside;
# the outer and inner quotes must not be the same kind.

# Import the companion text-table rows that belong to the main rows created on 2021-09-25
# into the Hive table itcast_ods.web_chat_text_ems, appending "2021-09-26" as start_time.
# The outer single quotes keep $CONDITIONS literal so sqoop can substitute it.
sqoop import \
--connect jdbc:mysql://hadoop01:3306/nev \
--username root \
--password 123456 \
--query 'select
    temp2.*,
    "2021-09-26" as start_time
from (select id from web_chat_ems_2021_09 where create_time between "2021-09-25 00:00:00" and "2021-09-25 23:59:59") temp1, web_chat_text_ems_2021_09 temp2
where temp1.id = temp2.id and $CONDITIONS' \
--hcatalog-database itcast_ods \
--hcatalog-table web_chat_text_ems \
-m 1

        # 增量化操作是周期性的,上述命令执行一次就结束了,无法实现自动化任务调度,因此我们需要将其编写为shell脚本,自动获取前一天的日期,并通过oozie自动化执行:

        shell脚本要求:能够实现自动获取前一天的数据,并且还支持抽取指定日期的数据

        思考1:如何获取前一天日期?date命令(今天日期减1天)

        # 获取今天的日期

        date

        date +'%Y%m%d'    :   指定输出格式

        # 获取指定日期的年月日格式输出

        date -d '2014-11-12' +'%Y-%m-%d'

        # 获取指定日期的星期(周几)格式输出

        date --date='2014-11-23' +'%w'

        # 获取上周日期

        date -d '-1 week' +'%Y-%m-%d'

        # 获取前一天日期

        date -d '-1 day' +'%Y-%m-%d'

        date --date='-24 hour' +'%Y-%m-%d %H:%M:%S'

        思考2:如何让shell脚本接收外部的参数呢?$符号

        $1  $2    : 接收传入的第一个参数与第二个参数

        $#          : 统计传入多少个参数

         思考3:如何在shell中实现判断操作

                需求:如果传递了参数,设置为该参数即可;如果没有传递参数,设置为上一天日期

#!/bin/bash
# Resolve the date to process and print it.
# Usage: <script> [YYYY-MM-DD]
#   exactly one argument : use it as the date
#   otherwise            : fall back to yesterday's date

# '-eq' is the portable numeric comparison; '==' inside [ ] is a bash extension
# and can fail when the script is started through a strict POSIX 'sh'.
if [ "$#" -eq 1 ]; then
    dateStr="$1"    # no spaces are allowed around '=' in a shell assignment
else
    dateStr=$(date -d '-1 day' +'%Y-%m-%d')    # $(...) captures the output of the command
fi

echo "${dateStr}"

        # shell脚本(Oozie安装在hadoop01上因此后续所有的shell脚本我们编写在hadoop01的/root目录下,当然编写在哪里都可以,最后都需要将shell脚本下载到windows下,然后上传至oozie)

        cd /root/mode1

        vim edu_mode_1_collect.sh   (mode_1表示访问咨询主题,是第一个主题)

#!/bin/bash
# edu_mode_1_collect.sh - incremental extraction for the visit/consult topic (mode_1).
#
# Imports one day of rows from the monthly MySQL tables web_chat_ems_<yyyy>_<mm> and
# web_chat_text_ems_<yyyy>_<mm> into the Hive ODS tables web_chat_ems / web_chat_text_ems.
# The partition value (starts_time / start_time) is the date on which the script runs.
#
# Usage: sh edu_mode_1_collect.sh [YYYY-MM-DD]
#   With exactly one argument that date is extracted; otherwise yesterday is used.
# Exit status: non-zero when the date is invalid or a sqoop import fails.

# Full path of the sqoop executable (the variable holds the binary itself, not an install directory).
# If sqoop is on PATH, plain `sqoop import` works as well; the absolute path is needed when it is not.
SQOOP_HOME=/usr/bin/sqoop

# '-eq' is the portable numeric comparison; '==' inside [ ] is a bash extension
# and can fail when the script is started through a strict POSIX 'sh'.
if [ "$#" -eq 1 ]; then
    dateStr="$1"
else
    dateStr=$(date -d '-1 day' +'%Y-%m-%d')
fi

# Validate and normalise the date. It is spliced into table names and SQL text below,
# so anything that is not a real calendar date is rejected here.
if ! dateStr=$(date -d "${dateStr}" +'%Y-%m-%d'); then
    echo "invalid date argument, expected YYYY-MM-DD" >&2
    exit 1
fi

dateNowStr=$(date +'%Y-%m-%d')
yearStr=$(date -d "${dateStr}" +'%Y')
monthStr=$(date -d "${dateStr}" +'%m')

# Shared parameters: values used by more than one command are kept in variables.
jdbcUrl='jdbc:mysql://hadoop01:3306/nev'
username='root'
password='123456'
m='1'
hivedatabase='itcast_ods'

# Main table. The query is in double quotes so that ${...} is expanded by the shell;
# \$CONDITIONS is escaped so that sqoop receives the literal placeholder.
"${SQOOP_HOME}" import \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--query "select id,
       create_date_time,
       session_id,
       sid,
       create_time,
       seo_source,
       seo_keywords,
       ip,
       area,
       country,
       province,
       city,
       origin_channel,
       user as user_match,
       manual_time,
       begin_time,
       end_time,
       last_customer_msg_time_stamp,
       last_agent_msg_time_stamp,
       reply_msg_count,
       msg_count,
       browser_name,
       os_info,
       '${dateNowStr}' as starts_time
from web_chat_ems_${yearStr}_${monthStr} where create_time between '${dateStr} 00:00:00' and '${dateStr} 23:59:59' and \$CONDITIONS" \
--hcatalog-database "${hivedatabase}" \
--hcatalog-table web_chat_ems \
-m "${m}" || { echo "sqoop import of web_chat_ems failed" >&2; exit 1; }

# The two imports are independent. They run one after the other here; appending '&' to the
# first command (and calling `wait` at the end) would run them in parallel when resources allow.

# Companion text table, restricted to the ids of the main-table rows of the same day.
"${SQOOP_HOME}" import \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--query "select
    temp2.*,
    '${dateNowStr}' as start_time
from (select id from web_chat_ems_${yearStr}_${monthStr} where create_time between '${dateStr} 00:00:00' and '${dateStr} 23:59:59') temp1, web_chat_text_ems_${yearStr}_${monthStr} temp2
where temp1.id = temp2.id and \$CONDITIONS" \
--hcatalog-database "${hivedatabase}" \
--hcatalog-table web_chat_text_ems \
-m "${m}" || { echo "sqoop import of web_chat_text_ems failed" >&2; exit 1; }

# Notes:
# - In shell, '' keeps its content verbatim (no ${} expansion) while "" expands variables,
#   so a string that contains ${} must use "".
# - Shell commands run sequentially by default; '&' sends a command to the background.

        shell脚本的串行执行与并行执行: 

# '&' sends sleep to the background, so "done" is printed immediately.
sleep 5 &
echo "done"

        加入&后,系统会将其之前的命令放入后台执行,并继续执行下面的命令。此脚本执行后会立即打印出"done",sleep命令被放到后台执行,不会阻塞脚本。如果希望在继续执行后续命令之前先等待后台命令执行完毕,可以使用wait命令,这样需要等待5s后才能在屏幕上看到"done":

# 'wait' blocks until the background sleep has finished, so "done" appears after 5 seconds.
sleep 5 &
wait
echo "done"

        注意:只有多条命令之间没有依赖关系时,才可以并行执行。

        # 测试脚本是否正常执行

        sh edu_mode_1_collect.sh 2021-09-25

        查看ODS层是否导入了新增数据:

        select * from itcast_ods.web_chat_ems where starts_time = '2021-09-26' limit 10;

        13、增量流程-增量数据抽取oozie实现自动调度(没配完)

        (1)配置workflow工作流

        当前shell脚本是在linux中,需要先下载到windows下,然后打开oozie将windows下的shell脚本上传至hdfs的某个目录下。需要保证shell脚本在hdfs中。

         选中该脚本添加至oozie:

         建议将同样的shell脚本添加两次:

         然后保存即可:

        (2)配置计划

         点击运行启动计划即可

        14、增量流程-增量数据清洗转换

        将ODS层中新增的数据进行ETL清洗,和少量维度退化,并将结果追加至DWD层

        # sql语句:

-- Dynamic partitioning: the partition values are taken from the trailing SELECT columns.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- Compress intermediate and final job output.
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;

-- Make compression take effect on write. Per the original note, visit_consult_dwd was
-- declared with snappy compression when the table was created.
set hive.exec.orc.compression.strategy=COMPRESSION;

-- Clean the ODS rows extracted on 2021-09-26 and append them to the DWD table.
-- Each side is reduced to its increment (and to the columns actually used) before the join,
-- so only the new rows take part in the join instead of the full tables.
insert into table itcast_dwd.visit_consult_dwd partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    wce.session_id,
    wce.sid,
    unix_timestamp(wce.create_time) as create_time,
    wce.seo_source,
    wce.ip,
    wce.area,
    wce.msg_count,
    wce.origin_channel,
    wcte.referrer,
    wcte.from_url,
    wcte.landing_page_url,
    wcte.url_title,
    wcte.platform_description,
    wcte.other_params,
    wcte.history,
    substr(wce.create_time, 12, 2) as hourinfo,
    substr(wce.create_time, 1, 4) as yearinfo,
    quarter(wce.create_time) as quarterinfo,
    substr(wce.create_time, 6, 2) as monthinfo,
    substr(wce.create_time, 9, 2) as dayinfo
from (
    select
        id,
        session_id,
        sid,
        create_time,
        seo_source,
        ip,
        area,
        msg_count,
        origin_channel
    from itcast_ods.web_chat_ems
    where starts_time = '2021-09-26'
) wce
inner join (
    select
        id,
        referrer,
        from_url,
        landing_page_url,
        url_title,
        platform_description,
        other_params,
        history
    from itcast_ods.web_chat_text_ems
    where start_time = '2021-09-26'
) wcte
    on wce.id = wcte.id;

        # shell脚本:

        思考:如何在shell下执行hive的sql呢?

        hive -e | -f "sql语句 | sql文件" -S

        -S   :   静默模式(silent)执行,不会显示日志信息(即不会显示Map 0% Reduce 0%)

        shell脚本要求:如果没有提供参数则清洗抽取时间为今天的数据,如果指定了参数则清洗抽取时间为指定时间的数据

        cd /root

        vim edu_mode_1_handle.sh

#!/bin/bash
# edu_mode_1_handle.sh - incremental cleaning/transformation for the visit/consult topic.
#
# Joins the ODS rows of one extraction date (partition starts_time / start_time) and
# appends the cleaned result to itcast_dwd.visit_consult_dwd.
#
# Usage: sh edu_mode_1_handle.sh [YYYY-MM-DD]
#   With exactly one argument that extraction date is processed; otherwise today's date is used.
# Exit status: non-zero when the date is invalid; otherwise the exit status of hive.

# Full path of the hive executable (the variable holds the binary itself, not an install directory).
HIVE_HOME=/usr/bin/hive

# '-eq' is the portable numeric comparison; '==' inside [ ] is a bash extension
# and can fail when the script is started through a strict POSIX 'sh'.
if [ "$#" -eq 1 ]; then
    dateStr="$1"
else
    dateStr=$(date +'%Y-%m-%d')
fi

# Validate and normalise the date: it is spliced into the SQL text below.
if ! dateStr=$(date -d "${dateStr}" +'%Y-%m-%d'); then
    echo "invalid date argument, expected YYYY-MM-DD" >&2
    exit 1
fi

# The statement is kept in a double-quoted string so that ${dateStr} is expanded by the shell.
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;

insert into table itcast_dwd.visit_consult_dwd partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    wce.session_id,
    wce.sid,
    unix_timestamp(wce.create_time) as create_time,
    wce.seo_source,
    wce.ip,
    wce.area,
    wce.msg_count,
    wce.origin_channel,
    wcte.referrer,
    wcte.from_url,
    wcte.landing_page_url,
    wcte.url_title,
    wcte.platform_description,
    wcte.other_params,
    wcte.history,
    substr(wce.create_time,12,2) as hourinfo,
    substr(wce.create_time,1,4) as yearinfo,
    quarter(wce.create_time) as quarterinfo,
    substr(wce.create_time,6,2) as monthinfo,
    substr(wce.create_time,9,2) as dayinfo
from (select * from itcast_ods.web_chat_ems where starts_time = '${dateStr}') wce inner join (select * from itcast_ods.web_chat_text_ems where start_time = '${dateStr}') wcte
on wce.id = wcte.id;"

# -e runs the SQL passed on the command line; -S suppresses the progress output.
"${HIVE_HOME}" -e "${sqlStr}" -S

        # 测试脚本是否可以正常执行:sh edu_mode_1_handle.sh

        select * from itcast_dwd.visit_consult_dwd where yearinfo = '2021' and monthinfo = '09' and dayinfo = '25' limit 10;

        15、增量流程-增量数据清洗转换oozie自动化调度

         点击保存即可。

        16、增量流程-增量数据分析

        注意:假设当前以年统计,且dws层中的数据已经统计了2018,2019,2020年的数据,2021年的数据统计至2021-09-24,现在新增了2021-09-25号的数据,我们只需要对2021年的数据进行重新统计即可,新增的数据对2018,2019,2020年的统计结果并没有影响。同理按季度统计时,只需要重新统计新增数据所在季度的数据即可,其他季度的数据不需要重新统计。

        说明:

        在按照年统计的时候,只需要统计加上这一天以后这一年的数据即可,之前年的数据不需要重新统计;在按照季度统计的时候,只需要统计加上这一天以后这一年中所在季度的数据即可,之前季度的数据不需要重新统计;在按照月统计的时候,只需要统计加上这一天以后这一年中所在季度所在月的数据即可,其他月的数据不需要重新统计;在按照天统计的时候,只需要统计新增数据即可,其他天的数据不需要统计;在按照小时统计的时候,只需要统计新增数据的各个小时即可,其他天的数据不需要统计。

        思考:在统计的过程中,比如以年统计,得到一个新的年的统计结果,但是DWS层中还有一个历史统计结果,这不就冲突了吗?例如按年统计DWS层中已经对2021-01-01--2021-09-24的数据统计了,现在又加入2021-09-25数据重新统计了2021年的数据,显然,历史统计结果就没用了。

        解决方法:删除受影响的历史统计结果(只有按年、季度、月统计时存在受影响的历史统计结果)

        思考:如何删除受影响的历史统计结果?注意hive的普通表(非事务表)不支持直接对表中某条数据进行删除或修改(delete、update都不支持)

        解决方法:可以通过删除分区的方案进行处理(注意一定是先删除历史统计结果,然后再insert into新统计结果)

        alter table 表名 drop partition(key=value,.....);

        例子:

        删除2021年历史统计结果

        alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='-1',monthinfo='-1',dayinfo='-1');

        alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='-1',monthinfo='-1',dayinfo='-1');

        删除2021年第3季度的历史统计结果

        alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='-1',dayinfo='-1');

        alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='-1',dayinfo='-1');

        删除2021年第3季度9月份的历史统计结果

        alter table itcast_dws.visit_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='09',dayinfo='-1');

        alter table itcast_dws.consult_dws drop partition(yearinfo='2021',quarterinfo='3',monthinfo='09',dayinfo='-1');

        (1)访问量-增量分析sql

        # 以总访问量为例

-- Incremental visit statistics for the overall-total dimension (group_type '5' in this example).
-- Dimension columns that are not grouped on are filled with the placeholder '-1'.
-- The select list is positional: it must follow the column order of visit_dws,
-- with the four partition columns last.

-- Per year (time_type '5'): recompute the whole of 2021.
insert into table itcast_dws.visit_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '-1' as from_url,
    '5' as group_type,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where yearinfo = '2021'
group by
    yearinfo;

-- Per quarter (time_type '4'): recompute 2021 Q3 only.
insert into table itcast_dws.visit_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat_ws('_',yearinfo,quarterinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where yearinfo = '2021'
  and quarterinfo = '3'
group by
    yearinfo,
    quarterinfo;

-- Per month (time_type '3'): recompute 2021-09 only.
insert into table itcast_dws.visit_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
group by
    yearinfo,
    quarterinfo,
    monthinfo;

-- Per day (time_type '2'): only the newly added day 2021-09-25.
insert into table itcast_dws.visit_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd
where yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
  and dayinfo = '25'
group by
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo;

-- Per hour (time_type '1'): each hour of the newly added day 2021-09-25.
insert into table itcast_dws.visit_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd
where yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
  and dayinfo = '25'
group by
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo;

        其他维度都一样

        (2)咨询量-增量分析sql

        # 以地区维度为例

-- Incremental consult statistics for the area dimension (group_type '1' in this example).
-- Only sessions with msg_count >= 1 are counted as consultations.
-- Dimension columns that are not grouped on are filled with the placeholder '-1'.
-- The select list is positional: it must follow the column order of consult_dws,
-- with the four partition columns last.

-- Per year (time_type '5'): recompute the whole of 2021.
insert into table itcast_dws.consult_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '1' as group_type,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where msg_count >= 1
  and yearinfo = '2021'
group by
    yearinfo,
    area;

-- Per quarter (time_type '4'): recompute 2021 Q3 only.
insert into table itcast_dws.consult_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'_',quarterinfo) as time_str,
    '1' as group_type,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where msg_count >= 1
  and yearinfo = '2021'
  and quarterinfo = '3'
group by
    yearinfo,
    quarterinfo,
    area;

-- Per month (time_type '3'): recompute 2021-09 only.
insert into table itcast_dws.consult_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '1' as group_type,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd
where msg_count >= 1
  and yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
group by
    yearinfo,
    quarterinfo,
    monthinfo,
    area;

-- Per day (time_type '2'): only the newly added day 2021-09-25.
insert into table itcast_dws.consult_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '1' as group_type,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd
where msg_count >= 1
  and yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
  and dayinfo = '25'
group by
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    area;

-- Per hour (time_type '1'): each hour of the newly added day 2021-09-25.
insert into table itcast_dws.consult_dws partition (yearinfo, quarterinfo, monthinfo, dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as group_type,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd
where msg_count >= 1
  and yearinfo = '2021'
  and quarterinfo = '3'
  and monthinfo = '09'
  and dayinfo = '25'
group by
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo,
    hourinfo,
    area;

        其他维度都一样

        (3)shell脚本

        要求:如果指定了日期参数,则统计分析该日期即可;如果没有指定日期参数,则统计前一天的日期

        注意:

        cd /root

        vim edu_mode_1_analyse.sh(这里的shell不完整)

#!/bin/bash
# edu_mode_1_analyse.sh - incremental statistics for the visit/consult topic.
#
# For the given business date it drops the result partitions that the new day affects
# (year, quarter, month and the day itself) in itcast_dws.visit_dws and itcast_dws.consult_dws,
# then recomputes them from itcast_dwd.visit_consult_dwd.
# Dropping the day partition as well makes a re-run for the same date replace the
# day/hour rows instead of appending duplicates.
# Only the overall-total (visit) and area (consult) dimensions are covered; the script is
# not complete for the other dimensions.
#
# Usage: sh edu_mode_1_analyse.sh [YYYY-MM-DD]
#   With exactly one argument that date is analysed; otherwise yesterday is used.
# Exit status: non-zero when the date is invalid; otherwise the exit status of hive.

# Full path of the hive executable (the variable holds the binary itself, not an install directory).
HIVE_HOME=/usr/bin/hive

# '-eq' is the portable numeric comparison; '==' inside [ ] is a bash extension
# and can fail when the script is started through a strict POSIX 'sh'.
if [ "$#" -eq 1 ]; then
    dateStr="$1"
else
    dateStr=$(date -d '-1 day' +'%Y-%m-%d')
fi

# Validate and normalise the date: its parts are spliced into the SQL text below.
if ! dateStr=$(date -d "${dateStr}" +'%Y-%m-%d'); then
    echo "invalid date argument, expected YYYY-MM-DD" >&2
    exit 1
fi

yearStr=$(date -d "${dateStr}" +'%Y')
monthStr=$(date -d "${dateStr}" +'%m')
dayStr=$(date -d "${dateStr}" +'%d')
# The quarter has to be derived from the month: quarter() is a Hive function and does not
# exist in the shell. '%-m' prints the month without zero padding, so 08 and 09 are not
# misread as invalid octal numbers inside the $(( ... )) arithmetic expansion.
month_for_quarter=$(date -d "${dateStr}" +'%-m')
quarterStr=$(( (month_for_quarter - 1) / 3 + 1 ))

# The statements are kept in a double-quoted string so that ${...} is expanded by the shell.
sqlStr="
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set hive.exec.orc.compression.strategy=COMPRESSION;

alter table itcast_dws.visit_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.visit_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
alter table itcast_dws.visit_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='${dayStr}');

alter table itcast_dws.consult_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='-1',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='-1',dayinfo='-1');
alter table itcast_dws.consult_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='-1');
alter table itcast_dws.consult_dws drop if exists partition(yearinfo='${yearStr}',quarterinfo='${quarterStr}',monthinfo='${monthStr}',dayinfo='${dayStr}');

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select 
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '-1' as from_url,
    '5' as group_type,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}'
group by yearinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select 
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat_ws('_',yearinfo,quarterinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}'
group by yearinfo,quarterinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select 
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' 
group by yearinfo,quarterinfo,monthinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select 
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo;

insert into table itcast_dws.visit_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select 
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    '-1' as area,
    '-1' as seo_source,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '-1' as from_url,
    '5' as group_type,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd where yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo;


insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    yearinfo as time_str,
    '1' as group_type,
    '5' as time_type,
    yearinfo,
    '-1' as quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}'
group by yearinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'_',quarterinfo) as time_str,
    '1' as group_type,
    '4' as time_type,
    yearinfo,
    quarterinfo,
    '-1' as monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}'
group by yearinfo,quarterinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo) as time_str,
    '1' as group_type,
    '3' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    '-1' as dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}'
group by yearinfo,quarterinfo,monthinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    '-1' as hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo) as time_str,
    '1' as group_type,
    '2' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,area;

insert into table itcast_dws.consult_dws partition(yearinfo,quarterinfo,monthinfo,dayinfo)
select
    count(distinct sid) as sid_total,
    count(distinct session_id) as sessionid_total,
    count(distinct ip) as ip_total,
    area,
    '-1' as origin_channel,
    hourinfo,
    concat(yearinfo,'-',monthinfo,'-',dayinfo,' ',hourinfo) as time_str,
    '1' as group_type,
    '1' as time_type,
    yearinfo,
    quarterinfo,
    monthinfo,
    dayinfo
from itcast_dwd.visit_consult_dwd where msg_count >= 1 and yearinfo = '${yearStr}' and quarterinfo = '${quarterStr}' and monthinfo = '${monthStr}' and dayinfo = '${dayStr}'
group by yearinfo,quarterinfo,monthinfo,dayinfo,hourinfo,area;
"

# -e runs the SQL passed on the command line; -S suppresses the progress output.
"${HIVE_HOME}" -e "${sqlStr}" -S

        # 测试是否执行成功

        sh edu_mode_1_analyse.sh 2021-09-25

        select * from itcast_dws.visit_dws where time_type = '2' and group_type = '5'  # 增量操作之前,没有2021-09-25的数据

        select * from itcast_dws.visit_dws where time_type = '2' and group_type = '5'  # 增量操作之后,有2021-09-25的数据了

        select * from itcast_dws.consult_dws where time_type = '2' and group_type = '3'  # 增量操作之前

        select * from itcast_dws.consult_dws where time_type = '2' and group_type = '3'   # 增量操作之后

        17、增量流程-增量数据分析oozie自动化调度

        18、增量流程-增量数据导出

        由于mysql中的数据与DWS层数据完全一样,在mysql中已经导出了历史的统计数据,因此在MySQL中需要先删除受影响的历史统计结果(注意mysql支持数据的删除操作delete),然后增量导出新的结果。

        为了简化操作,由于受影响最大的范围是当年的数据,我们可以直接将当年的统计结果数据全部删除,然后重新导出;以2021-09-25日新增数据为例,我们可以将2021年受影响的数据及不受影响的数据全部删除,重新导出。因为DWS层的数据是结果数据,本身数据量比较小,通过全部删除的方式可以有助于sql脚本的编写,同时效率也不会有太大影响。

        思考:如何删除全年的数据?以2021年为例

        delete from scrm_bi.visit_dws where yearinfo = '2021';

        delete from scrm_bi.consult_dws where yearinfo = '2021';

        # shell脚本

        要求:如果指定了日期参数,则删除该日期所对应年的所有数据,重新导出该年的数据;如果没有指定日期参数,则删除前一天所对应年的所有数据,重新导出所删年的数据。

        cd /root

        vim edu_mode_1_export.sh

#!/bin/bash
# edu_mode_1_export.sh - incremental export of the DWS results to MySQL.
#
# Deletes all rows of the affected year from scrm_bi.visit_dws and scrm_bi.consult_dws,
# then re-exports that year's partitions from the Hive tables itcast_dws.visit_dws and
# itcast_dws.consult_dws.
#
# Usage: sh edu_mode_1_export.sh [YYYY-MM-DD]
#   With exactly one argument the year of that date is re-exported; otherwise the year
#   of yesterday's date is used.
# Exit status: non-zero when the date is invalid, the delete fails or an export fails.

# Full path of the sqoop executable (the variable holds the binary itself, not an install directory).
SQOOP_HOME=/usr/bin/sqoop

# '-eq' is the portable numeric comparison; '==' inside [ ] is a bash extension
# and can fail when the script is started through a strict POSIX 'sh'.
if [ "$#" -eq 1 ]; then
    dateStr="$1"
else
    dateStr=$(date -d '-1 day' +'%Y-%m-%d')
fi

# Validate the date while extracting the year: the year is spliced into SQL text below.
if ! yearStr=$(date -d "${dateStr}" +'%Y'); then
    echo "invalid date argument, expected YYYY-MM-DD" >&2
    exit 1
fi

# Shared parameters. The URL contains '?' and '&', so it is always expanded inside quotes.
jdbcUrl='jdbc:mysql://hadoop01:3306/scrm_bi?useUnicode=true&characterEncoding=utf-8'
username='root'
password='123456'
m='1'
hivedatabase='itcast_dws'

# Remove the year's previous results first; stop if this fails so that the export
# does not append a second copy of the same rows.
mysql -u"${username}" -p"${password}" -hhadoop01 -P3306 -e "delete from scrm_bi.visit_dws where yearinfo = '${yearStr}'; delete from scrm_bi.consult_dws where yearinfo = '${yearStr}';" \
    || { echo "deleting the ${yearStr} rows in MySQL failed" >&2; exit 1; }

# Incremental export: --hcatalog-partition-keys/values restrict the export to the
# partitions of the given year. (A comment must not follow a trailing backslash:
# the backslash would no longer continue the line and the command would be cut short.)
"${SQOOP_HOME}" export \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--table visit_dws \
--hcatalog-database "${hivedatabase}" \
--hcatalog-table visit_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values "${yearStr}" \
-m "${m}" || { echo "sqoop export of visit_dws failed" >&2; exit 1; }

"${SQOOP_HOME}" export \
--connect "${jdbcUrl}" \
--username "${username}" \
--password "${password}" \
--table consult_dws \
--hcatalog-database "${hivedatabase}" \
--hcatalog-table consult_dws \
--hcatalog-partition-keys yearinfo \
--hcatalog-partition-values "${yearStr}" \
-m "${m}" || { echo "sqoop export of consult_dws failed" >&2; exit 1; }

        # 测试是否成功执行导出操作

        sh edu_mode_1_export.sh 2021-09-25

        select * from scrm_bi.visit_dws where time_type = '2' and group_type = '5'  # 增量操作之前

        select * from scrm_bi.visit_dws where time_type = '2' and group_type = '5'  # 增量操作之后

        select * from scrm_bi.consult_dws where time_type = '2' and group_type = '3'  # 增量操作之前

        select * from scrm_bi.consult_dws where time_type = '2' and group_type = '3'   # 增量操作之后

        ​​​​​​​19、增量流程-增量数据导出oozie自动化调度

         至此访问咨询主题模块完结

注意:

(1)交换空间

        交换空间的意思是当内存不够用时,系统会自动将磁盘的一部分空间转化为内存使用,相当于扩充内存,当然交换空间的效率不如实际内存。

(2)shell的运行

        方式一:如果shell脚本没有x权限,需要chmod加入x权限然后运行

        方式二:如果shell脚本没有x权限,利用sh命令可以直接运行,sh shell脚本路径

(3)Oozie默认内部时区不是上海,需要在hue的配置项中搜索time,重新配置时区为Asia/Shanghai

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐