目录

一、数仓简介

        1、数据仓库诞生原因

        2、数据仓库(Data Warehouse,DW)的基本概念

        3、数据库与数据仓库的区别

         4、数据仓库的建设方案

        5、MPP架构与分布式架构区别

二、数仓基本架构

三、数仓建模方法-宽表模型

四、案例-电商数据统计分析

五、总结

六、案例-电商数据统计分析续


一、数仓简介

        1、数据仓库诞生原因

        历史数据积存:对于企业来说,线上的业务系统随着业务的进行,会源源不断的产生数据。这些数据会存储在企业的业务数据库中(比如mysql....)。但是,随着时间的推移,业务数据库中积压的数据会越来越多,这会使得业务数据库产生一定的负载,导致业务系统运行缓慢,用户体验差。这些堆积的数据中大多数是冷数据,即很久远的数据,使用频率不高的数据。为了避免这些冷数据对业务数据库产生影响,妨碍业务的正常运行,企业需要定期将这些积压的冷数据从业务数据库中转移到一个专门用于存放历史数据的仓库中,这个仓库就是数据仓库。至此,业务数据库只存放高频数据,负荷小,可以很好的支持业务系统的正常运行,如果需要用到历史数据在从数据仓库中进行操作。

        企业数据分析需要:企业将上述历史数据存储到数据仓库中后,这些数据不仅可以帮助改善业务数据库的性能,同时企业可以对这些数据进行分析、统计,从而辅助管理层进行决策。但是,对于某些新兴企业来说,并没有历史数据的积存,这些企业一般有很多部门,他们建立数仓的目的仅仅就是进行数据分析。那么各部门在进行数据分析之前只能从业务数据库中获取数据(利用他们自己的数据抽取系统从业务数据库中抽取数据)。这种方式是存在问题的会导致各部门数据分析的结果不一致,假设部门A早上9点从业务数据库中抽取数据进行分析,而部门B是在下午2点进行的抽取分析,显然它们获取的数据不一致,那分析的结果一定也存在差异。此外,每个部门都有一个数据抽取系统资源浪费严重,且不安全。为了解决这个问题,企业需要统计建立一个数据仓库,使用专门的数据抽取系统定期从业务数据库中抽取数据,这样就不需要每个部门配一个数据抽取系统了,而且解决了各部门分析结果不一致的问题。

         比如数据仓库每天晚上12点抽取一次数据,各部门需要使用数据时,直接从数据仓库中取,显然各部门获取的数据都是一致的。而且将业务数据与历史数据进行划分,业务数据库只针对业务,数仓只针对数据分析,这样企业管理这些数据就更有针对性了,方面了管理,安全性得到提升。

        同时,A部门想要B部门的数据,如果没有数据仓库还需要向B部门申请,很麻烦,有了数据仓库,直接从中取就行了。数据仓库相当于为各部门建立一个统一的数据镜像。

        2、数据仓库(Data Warehouse,DW)的基本概念

        数据仓库是一个面向主题的、集成的、非易失的且随时间变化的数据集合。

        主要用于组织积累的历史数据,并使用分析方法(OLAP在线分析处理、数据分析)进行分析整理,进而辅助决策,为管理者、企业系统提供数据支持,构建商业智能。

        面向主题:由于数仓存储历史数据的主要目的是为了对数据分析提供服务,在数据分析过程中一般会有各种类型的主题(分析用户行为特征主题、分析用户购买商品类型主题),但是直接从业务数据库中获取的数据是零散的,无法满足数据分析的需要。因此,我们需要根据数据分析的主题,将各种分散的数据表进行聚合,这样可以方便后续分析过程。

         集成:原始数据来源于不同的数据源,要整合成最终数据,需要经过ETL清洗过程以及标准化处理。具体来讲,首先聚合而成的大数据里,可能存在一些缺失或异常的数据,我们需要对这些数据进行过滤(Extract);其次,将业务数据聚合成某个主题的大数据表后,需要按照某些需求进行数据分析,因此需要根据不同的需求对该大数据表进行进一步转换(Transform例如添加一些冗余列字段),方便按照需求进行分析;然后将转换后的数据加载(Load)到新的表中;最后原始数据可能来源于不同的数据源,数据中相同字段可能采用不同的书写形式,需要将其进行标准化处理。

         例如,每个表针对性别这一列,有不同的表达方式(m,f;1,0;........);每个表针对相同的字段,有不同的字段名;每个表针对相同字段的描述信息也不一致,都需要标准化处理。

        非易失:数仓中保存的数据是一系列历史快照,从业务数据库中同步过来的数据,这些数据不允许被修改,只允许通过工具进行查询、分析。因此,数据不容易丢失。

        时变性:数仓会定期从业务数据库中抽取、集成新的数据,从而反应出数据的最新变化。

        3、数据库与数据仓库的区别

        4、数据仓库的建设方案

        传统数据仓库:由若干关系型数据库组成大规模并行处理(MPP)集群。

         优点:数据迁移很方便,迁移成本低,因为一般业务数据库是单机的mysql等关系型数据库,将数仓同样建设为关系型数据库,这样同步业务数据库数据时很方便,不需要对数据进行再改造,成本低;传统数仓在数据量没有达到某个量级时,是非常优秀的解决方案;由于传统数仓是单机数据库改造过来的,所以它完全兼容原有的SQL语法。

         缺点:扩展性有限。当数据量超过某个量级时,需要额外增加更多的数据库,但我们并 不能无限制的增加数量,因为数量越多越混乱,越不好管理,出错的频率就越大,安全性会变差。

                    热点问题。传统数仓由若干数据库组成,那么从业务数据库抽取来的数据需要被划分为若干份,分别存储到不同的子数据库中,当子数据库1中的数据是热点数据,访问频率极高,而其他子数据库中的数据访问频率极低,这样会导致子数据库1负荷极大,可能导致宕机、超时。一旦它成为瓶颈,整个系统也会变慢。

         大数据数据仓库:它基于大数据天然的分布式存储和分布式计算,并添加了SQL的支持(Hive)。实质就是将分布式集群的HDFS文件系统组成数仓,完成海量数据的存放。在数据处理方面,为了避免海量数据的移动,造成IO和网络的开销,大数据数仓使用了移动计算的架构,而不是移动数据。传统数仓在面对计算任务时需要大量的数据移动工作,大数据数仓是将计算任务分发到数据存储位置,然后进行计算。

         优点:解决了扩展性有限问题。大数据数据仓库是以文件系统的方式存放数据,数据会以文 件的形式存放,文件这种类型扩展性很好。

                    缓解了热点问题。前面我们学到过向HDFS文件系统中存数据时,数据本身会被划分为若干Block块,且每个块默认都有3个备份,这些备份会分发到其他节点上。正是这种备份机制,使得后续在计算某个任务时,可以将该任务分发到最空闲的数据节点上,这样避免了某个节点负荷过高,导致的性能缓慢。此外值得注意的是,每个Block都有备份,那么系统的容错能力就得到提高,可扩展性得到极大提升。

         缺点:SQL支持率问题。传统数仓本身就是关系型数据库,可以完全兼容SQL。但大数据数 仓实质是HDFS文件系统(Hive当中的数据库与表本质上都是存储在HDFS上/user/hive/warehouse中的文件夹),并不是原生的关系型数据库。因此SQL支持率不如传统数仓。但是,目前随着技术的发展,现有大数据产品的SQL支持率也是越来越高,也接近完全兼容了。

                    缺少事务支持。由于大数据数仓是基于分布式架构的,相对于传统的单机数据库来 说,在分布式架构中实现事务比较复杂。但是值得注意的是在数仓中实现事务并没有那么重要,因为数仓是面向数据分析的。

                    数据量较少时,计算速度很慢。例如前面在hive中跑mapreduce程序,很小很小的数 据却需要跑1-2分钟。但是大数据数仓本身针对的对象是海量数据,小的数据本身不需要大数据数仓来完成。

        基于此,可以看出相比于大数据数仓的优点,它的缺点就不足为道了。

        总结:大数据数据仓库是一个很庞大的概念,它不仅可以存放海量的数据,在其内部还构建了很多其他组件例如数据聚合、数据清洗ETL、数据标准化、Hive、Hbase、分布式存储计算,资源调度(hadoop的三大组件)。如下图大方框中的内容全部属于大数据数仓。

        5、MPP架构与分布式架构区别

        (1)MPP架构

        将单机数据库节点组成集群,提升整体性能。节点之间采用非共享架构,每个节点有独立的磁盘存储系统和内存系统。有计算任务后每个节点独立运行,不需要关系集群的整体状态(每个节点各顾各的)。

        节点之间通过专用网络相互连接,因为MPP架构会有大量的数据移动,普通局域网无法完成。只能整体对外提供服务,单个节点无法单独运行局部的一个程序,因为数据不透明,单个节点独立运行找不到需要的数据在那。

        MPP本质上是关系型数据库(业务数据库),它继承了业务数据库的优点,设计上优先考虑C(一致性),其次A(可用性),尽量做好P(分区容错性)。适合中等规模的结构化数据处理。

        MPP的缺点:扩展性有限、热点问题。

                               数据存储位置不透明,查询任务在所有节点均会执行。

        (2)分布式架构(Hadoop)

        MPP架构的数据存储与计算是综合在一起的,因此我们并不知道数据在哪里存放。而分布式架构将分布式存储HDFS与分布式计算MapReduce划分开来,这样数据就透明了,各节点就可以单独运行局部应用了。

        分布式架构使用局域网或广域网连接,通信开销大,不如MPP的专用网络。因此分布式架构在运算时致力于减少数据移动,而采用移动计算架构。

        在设计上优先考虑分区容错性P,其次A,最后C。存储数据时每个Block有3个备份。

        (3)MPP+分布式架构

        数据存储采用HDFS,提高分区容错性;上层架构采用MPP,减少运算时延

二、数仓基本架构

        1、数仓的整体架构分为4个部分分别是:ETL、ODS层、CDM层和ADS层。        

         (1)首先定时从业务数据库中抽取历史数据到数据仓库中的ODS层,该层与原始数据保持一致,目的就是保存原始数据,且该层的数据不允许被修改。业务数据库一般保存最新的数据,当历史数据导入到数据仓库后,为了保障业务数据库正常运行,都会将这部分历史数据从业务数据库中删除。如果后续需要用到这些历史数据就直接从ODS层中获取。因为有ODS层的存在,保证了数仓的非易失性。

        (2)其次,将ODS层的数据进行ETL清洗存储到DWD层,为数据分析作服务,该层的数据是经过清洗后的数据,但同样还是零散的。

        (3)然后将DWD层的数据按照主题进行聚合汇总形成一张大表,并存储到DWS层,为数据分析作服务。

        (4)最后拿到DWS层的数据进行数据分析,并将分析的结果可视化展示、形成报表.....。

        总结: ETL进行数据清洗

                   ODS层保存历史数据,完成数据的积存

                   CDM层服务于数据分析

                   ADS层对结果的应用

        2、数据抽取与ETL

       数据抽取方式有全量同步、增量同步两种方式

                全量同步会将业务数据库中的全部数据进行抽取,一般用于初始化数据装载

                增量同步会检测数据的变动,抽取发生变动的数据,一般用于数据更新

       ETL包含数据过滤、转换、加载。数据过滤用于对数据中出现的重复、二义性、不完整、异常、违法业务逻辑的数据进行过滤。数据转换用于对数据进行标准化、增加一些冗余列,方便后续按需求进行统计等等。结构化数据一般本身符合规范在数据过滤和转换过程中的逻辑简单,非|半结构化数据的过滤转换工作较复杂,较费时。

        数据抽取和ETL规则的设计和实施约占整个数仓搭建工作量的60%-80%。

        注意:有的人将数据抽取(Extract)与ETL一起统称为ETL。

        常见的ETL工具:Sqoop、Kettle、Datastage、Informatica、Kafka     (结构化数据ETL工具)

                                     Flume、Logstash        (非|半结构化数据ETL工具)

        3、ODS层

        ODS层数据与原业务数据保持一致,在一致的基础上可以增加额外字段用来方便数据管理。因此在实际的企业开发中ODS层是原始业务数据的一个扩充,

         问题1:我们注意到update_type中有insert、update,ODS层的数据不是不允许修改吗?为什么这里会有insert与update呢?

        首先insert类型一定是有的,因为ODS层的历史数据一定是从业务数据库中经数据抽取插入进去的,因此一开始数据都是insert类型。插入完成后,业务数据库中的历史数据将被删除,后续想要访问历史数据需要操作ODS层。那对于ODS层的历史数据业务数据库就只有查询功能吗,显然不是,一定可以修改,只不过这种修改不在ODS层中进行。业务数据库可以先把想要修改的历史数据从ODS层中查询出来,然后在业务系统里对其进行修改,最后追加(增量同步)到ODS层中。该数据的类型就变成了update。通过insert与update可以标识那些数据是新增的那些数据是修改的。

        问题2:在追加的过程中一定是即有修改的数据又有最新的数据,ODS层如何识别那些是最新的数据,那些是修改的数据呢?

        为了识别不同类型的数据,ODS层在抽取到数据后会与已有的数据进行join操作。没有join成功的数据一定是最新的数据(insert类型),join成功的数据一定是修改的数据(update类型)。update类型的数据在ODS层是冗余数据,业务数据库对历史数据修改的越频繁,数据仓库中的冗余数据会越来越多。为了解决冗余数据多的问题,企业大多使用外连接&全覆盖的方式进行追加。

        外连接&全覆盖方式就是将ODS层的历史数据抽取出来与增量数据在外部进行join操作,识别出那些是最新数据,那些是修改数据,然后直接在外部(内存中)对历史数据要修改的数据进行修改,最后将历史数据与最新数据全覆盖到ODS层,这样就不会出现冗余数据同时能够保持与业务系统一致的数据。

        4、CDM、ADS层

        DWD层对ODS层的数据进行清洗、标准化、维度退化。

        维度退化就是例如某公司是个大厂,北京、上海、广州、深圳等地都有分公司。每个分公司都有一套业务系统,都会产生一堆表,比如北京、上海、广州、深圳的分公司都有用户表,这样就有4个用户表,只是地域维度不同,因此需要将地域维度退化掉,将4张表合并为一张表增加一个city字段,这样就将地域维度退化掉了。

         维度退化是将不同维度但相同的表进行组合。总之,DWD层的数据还是零散的,仍然满足3NF。

        DWS层对DWD层的数据按分析主题进行聚合(是将不同的表进行组合)。DWS层的数据是有主题的,已经不满足3NF了,DWS层的数据注重的是如何方便后续的数据分析,提升数据分析的性能。除此之外在DWS层会对聚合后的数据进行建模(比如维度模型:将数据进行分类整理划分为维度表和事实表;宽表模型:将维度表与事实表在融合),目的是将聚合后的大表进行整理,加快后续数据分析效率。

         ADS层(数据应用层,数据分析层)也称为数据集市,用于存储数据分析的结果,为不同业务场景提供接口。比如后续要对结果进行报表决策,可以将分析的结果存储在Kylin中.....。针对不同的查询业务,可以将数据分析的结果存储在利于查询的应用中,因此ADS层可能由多个产品来组成

        总之,数据仓库从广泛上讲就是ODS层,DW层,和ADS层,具体来讲数据仓库实质就是ODS层,用于将分散的业务数据全部(包括不同不同主题的数据)聚集在一起进行相关处理。数据集市就是ADS层即数据分析的结果存放在不同的应用中,方便不同的用户或不同的场景使用。一个数据仓库可以有多个数据集市,因为每个主题都会有各自的分析结果。因此数据集市与主题密切相关。

三、数仓建模方法-宽表模型

        1、OLTP与OLAP

        (1)OLTP

        业务数据库属于OLTP(在线事务处理)系统,为了保证数据的一致性,减少冗余,常用ER模型,ER模型就是尽量将表进行拆分使其满足3NF。

        (2)OLAP

        数仓属于OLAP(在线分析处理)系统,主要操作是复杂分析查询;关注数据整合,以及分析,处理性能。OLAP根据数据存储的方式不同,又分为ROLAP(Relation OLAP,关系型OLAP),MOLAP(Multidimensional OLAP,多维型OLAP),HOLAP(Hybrid OLAP,混合架构的OLAP)。

        ROLAP是最常见的OLAP,使用关系型数据库搭建而成,因此它更注重事务这块,数据分析能力不是太强。为此,MOLAP诞生可以极大的提升数据分析性能,它在数据分析之前预先分组(group by)计算,节省大量计算时间。HOLAP是R与M的综合,查询效率比R高,但低于M。

        2、ROLAP建模方法(我们研究的对象)

        典型的数仓建模方法有ER模型(与OLTP的ER不同这里的ER模型更注重数据分析)、Data Value、Anchor、维度模型,其中维度模型较为常用。ER模型、Data Value、Anchor适用于变动不频繁的数仓,适合于传统的行业。互联网行业本身业务变动比较频繁,不适合使用ER模型、Data Value、Anchor来建模。维度模型比较灵活适应互联网行业的多变性。

        (1)维度模型

        维度模型中,大表被拆分为维度表和事实表,维度是对事实的一种分类、组织。维度一般包括分类、时间、地域等。

         维度模型相当于提前对表进行按不同维度Group by。维度模型根据事实表和维度表的排列方式不同又分为星型模型、雪花模型和星座模型。

        (2)星型模型

        维度只有一层,分析性能最优

        (3)雪花模型

        维度有多层,接近3NF的设计(精细),分析性能略差

        (4)星座模型

        存在多张事实表,事实表之间共享一些维度表,星座模型是大型数仓的常态,是业务增长的结果,与模型设计无关。

        (5)宽表模型

        维度模型在大数据仓库中很不友好,因为把数据拆分为维度表和事实表,后续数据分析时必须join操作,面对海量的数据join操作会带来大量数据的移动。

        宽表模型是维度模型的一个衍生,它将维度冗余到事实表中,形成宽表,以此减少join操作

        3、MOLAP建模方法

        MOLAP是一种以空间换时间的方式,MOLAP将数据进行预分析,并将结果以多维数组形式(CUBE模型)存放。生成CUBE需要大量的时间、空间成本,但这样可以保证后续真正数据分析是效率极快。

         多维分析方法包括:钻取(包括上卷、下钻)、切片、切块、旋转。

         钻取是将维度变细或变粗的过程,例如将月维度变为年(上卷),月维度变为天(下钻)

         切片、切块(多维切片)是对维度进行分割,例如数据有10年1季度、2季度、3季度的数据,我只查询1季度的数据(切片),在1季度的数据中我只查询日用品(切块)

         假设现在是先按时间筛选,然后在对地域筛选,选择就是先对地域筛选,然后在对时间筛选。

四、案例-电商数据统计分析

        1、项目背景

        某电商企业,因数据积存、分析需要,筹划搭建数据仓库,并完成用户复购率的分析计算,支持业务查询需求。

        复购率是指在一段时间内,多次重复购买产品的用户占全部人数的比率

        需求分析:

        统计各个一级品类下,品牌月单次复购率,和多次复购率        

         数据描述: 

         订单表order_info

         订单详情表order_detail

         商品表sku_info

         用户表user_info          

         商品一级分类表base_category1        

         商品二级分类表base_category2

         商品三级分类表base_category3        

         支付流水表payment_info        

         2、项目架构

       Tez引擎                 :  用于对MapReduce进行优化

        Sqoop                  :用于对数据进行导入与导出

        Azkaban               :自动化任务调度

        Presto                  :为数据分析人员提供快速查询接口

        node11为主节点,为了负载均衡使用上述规划,Hive与sqoop必须在同一个服务器上,这是为了后续利用Azkaban实现任务自动调度。

        3、环境搭建

        因为大数据配置过程比较繁琐,前文也都介绍过了,因此这里就不手动配置了,我们使用自动化脚本实现大数据环境的安装与配置。

        安装3台虚拟机,主机名命名为node11(192.168.11.141),node22(192.168.11.142),node33(192.168.11.143),配置好ssh免密登录和地址映射,关闭防火墙与selinux,然后上传自动化安装脚本automaticDeploy.zip到虚拟机的node11中,解压到/home/hadoop下(该脚本只能在该目录下运行)。

        更改frames.txt文件,配置节点安装信息:

         更改configs.txt文件,设置相关配置:DBa2020*

         更改host_ip.txt文件,设置地址映射:

         hadoop文件夹中记录了所有一键安装大数据软件的脚本:         

         system文件中记录了所有一键配置环境的脚本,例如一键关闭防火墙,一键配置集群固定ip,一键配置JDK,一键配置yum,一键配置ssh免密登录等等。

        因此我们需要先执行system下的脚本,配置前置环境,然后在执行hadoop下的脚本,安装大数据软件。脚本是用来执行的,上述hadoop与system文件并没有x权限:

        chmod +x hadop/* system/*

        接下来上传安装包(frames.zip),并解压至/home/hadoop/automaticDeploy/中(必须解压在该目录下,因为脚本文件在编写时就是按照这个目录编写的)。

         在node22,node33上创建目录/home/hadoop

        这后续执行过程中出现上述错误,原因是整个脚本文件是从windows上传到linux的,在windows环境下换行符为\r\n,在linux环境下换行符为\n,相当于从windows到linux过程中所有文件都多了\r。(如果脚本文件不是从windows上传的而是直接下载的可以忽略这个问题)

        解决方法:将所有文件(包括所有的.sh和.txt文件)都作如下设置:

        vim batchOperate.sh

        :set ff        # 点回车看到是dos类型

        :set ff=unix

        :wq

        将node11中的automaticDeploy文件夹分发到node22、node33上

        scp -r automaticDeploy node22:`pwd`

        scp -r automaticDeploy node33:`pwd`

        4、自动化配置安装

        (1)jdk与hadoop

         脚本(.sh文件)中记录了一推Linux命令,可以实现一键执行,首先cd到system目录下,batchOperate.sh脚本从名字可以看出它主要的功能的批量执行,即实现其他.sh文件的批量执行,这样就不用一个一个.sh的执行了。

        cat batchOperate.sh

        这里我们的主机已经配置好主机映射、关闭防火墙、SSH免密登录了,因此需要将这些注释掉。因人而异可以灵活变动。

        然后在3台服务器上都执行batchOperate.sh脚本,然后进入到hadoop目录下安装大数据软件,执行./installHadoop.sh脚本,注意如果主机名与本文不同需要手动修改后,在执行:

        ./installHadoop.sh

        source /etc/profile

        在node11上初始化hadoop: hadoop namenode -format

        start-all.sh   # 开启hadoop,可以同时开启hdfs与yarn

        HDFS的WEB UI node11:50070

        (2)mysql

        在node22上执行./installMysql.sh

        (3)Hive与Tez

        在node33上执行./installHive.sh

        (4)sqoop

        在node33上执行./installSqoop.sh

         source /etc/profile

        (5)azkaban与presto

        在node11,node22,node33上执行./installPresto.sh     ./installAzkaban.sh   

         在node33上执行   ./installYanagishima.sh (Presto的插件使其可视化)

        这里我们注重的是理解数仓的整体架构流程,上述具体的某些工具和shell脚本的编写后续会详细介绍。

        5、项目开发

        (1)整体开发流程

                业务数据生成

                数据抽取

                创建ODS层,并完成HDFS数据接入

                创建DWD层,并完成ODS层数据导入

                创建DWS层,导入DWD层数据

                创建ADS层,完成数据分析复购率计算

                编写脚本,将ADS层的结果导出到Mysql中,供业务查询

                使用Azkaban调度器,实现脚本自动化运行

        (2)业务数据生成

        在node22上执行:

        export MYSQL_PWD=DBa2020*             # 设置mysql登录密码环境变量,后续登录mysql无

                                                                           需设置密码

        mysql -uroot -e "create database mall;"   # -e选项不需要登录进mysql就可执行sql语句,使用-e选项之前需要export设置密码,登录时无需手动输入密码。

        上传数据生成脚本到linux任意位置(/root/)

        将脚本上传至mysql

        mysql -uroot mall < /root/............sql  (4个sql文件都执行)

        mysql                     # 登录mysql     

        use mall;

        CALL init_data('2020-08-29',300,200,300,FALSE);            

        show tables;

        (3)数据抽取Sqoop

        编写脚本实现数据的抽取,编写脚本的目的是方便后续Azkaban自动化调度

        在node33上执行:

        mkdir -p /home/warehouse/shell

        cd /home/warehouse/shell/

        vim sqoop_import.sh

#!/bin/bash

# 传入两个参数,第一个参数$1是表名,第二个参数$2是时间字段
db_date=$2
echo $db_date
db_name=mall

# 公共抽取函数
import_data() {
sqoop import \
--connect jdbc:mysql://node22:3306/$db_name \
--username root \
--password DBa2020* \
--target-dir  /origin_data/$db_name/db/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query   "$2"' and  $CONDITIONS;'
}

import_sku_info(){
  import_data  "sku_info"  "select 
id, spu_id, price, sku_name, sku_desc, weight, tm_id,
category3_id, create_time 
  from sku_info  where 1=1"
}

import_user_info(){
  import_data "user_info" "select 
id, name, birthday, gender, email, user_level, 
create_time 
from user_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select 
id, name from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select 
id, name, category1_id from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
}

import_order_detail(){
  import_data   "order_detail"   "select 
    od.id, 
    order_id, 
    user_id, 
    sku_id, 
    sku_name, 
    order_price, 
    sku_num, 
    o.create_time  
  from order_info o , order_detail od 
  where o.id=od.order_id 
  and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"     # 这里我认为是o.create_time
}

import_payment_info(){
  import_data  "payment_info"   "select 
    id,  
    out_trade_no, 
    order_id, 
    user_id, 
    alipay_trade_no, 
    total_amount,  
    subject , 
    payment_type, 
    payment_time 
  from payment_info 
  where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
}

import_order_info(){
  import_data   "order_info"   "select 
    id, 
    total_amount, 
    order_status, 
    user_id, 
    payment_way, 
    out_trade_no, 
    create_time, 
    operate_time  
  from order_info 
  where  (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
}

case $1 in
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_info")
     import_order_info
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
   "all")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
;;
esac

        注意事项:A、shell文件必须要有shell头#!/bin/bash

                          B、sqoop导入导出数据的原理是先对表进行select查询,然后将查询的结果导入

                                或导出到相应的路径下。因此需要指定--query选项。' and  $CONDITIONS;'是

                                固定语法。

                          C、在实际的企业中select * 一般是不允许的,因为select * 本身效率很低。

                          D、where 1=1 是为了方便SQL拼接(个人写作习惯):

String sql  =  "select * from t_user  where 1=1 "; 
if(!b.equals("")){
	sql += "and 条件";
}
# 有1=1,且当b不等于空时,可以直接拼接and语句,变为
select * from t_user where 1=1 and 条件;

# 没有1=1,且当b不等于空时,无法拼接and,变为
select * from t_user where and 条件;     # 显然and需要两个条件这里只有一个不合法

                          E、前5张表,由于数据量不多我们使用全量抽取,后3张表使用增量抽取。

                          F、订单表与订单详细表在记录时,一般是先在订单表中记录,然后在一定延迟

                              (1-2s)后在记录订单详细表,所以就会有一种同样的订单但时间不一致的情

                                况,比如订单表记录时间是1月1号晚上11:59:59秒,过了2s后第二天的0:0:01

                                秒记录,显然出现本身是前一天的数据却记录在了第二天的记录中,因此,在

                                对订单详细表导入时会以订单表的时间为准。

        chmod +x sqoop_import.sh

        ./sqoop_import.sh all 2020-08-29

        sqoop工具的运行与Hive类似,是将相应的代码转换为mapreduce程序,且只有map任务没有reduce任务,然后在yarn中运行。

        node11:50070查看是否导入成功

        (4)ODS层

        实质就是在Hive上建立与原始数据结构一致的表(创建ODS层,需要编写sql文件),然后将hdfs中的数据加载到hive中(利用load data语法,需要编写shell脚本)。

        在node33上执行:

        hive --service metasotre &

        hive --service hiveserver2 &

         cd /home/warehouse/

        mkdir sql

        cd sql

        vim ods_ddl.sql     # 创建ODS层,默认hive在hdfs中的存储路径为/user/hive/warehouse/,该文件就是我们的数据仓库,后续所有的内容全部在这里存放。当然该位置可通过location选项自行指定。

-- 创建数据库
create database if not exists mall;
use mall;

-- 创建订单表
drop table if exists ods_order_info;
create table ods_order_info ( 
    `id` string COMMENT '订单编号',
    `total_amount` decimal(10,2) COMMENT '订单金额', 
    `order_status` string COMMENT '订单状态', 
    `user_id` string COMMENT '用户id' ,
    `payment_way` string COMMENT '支付方式',  
    `out_trade_no` string COMMENT '支付流水号',  
    `create_time` string COMMENT '创建时间',  
    `operate_time` string COMMENT '操作时间' 
) COMMENT '订单表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_order_info/'
tblproperties ("parquet.compression"="snappy")   # 对文件进行压缩,默认情况下文件为txt文件,压缩后大概可以降低1/3的存储大小
;

-- 创建订单详情表
drop table if exists ods_order_detail;
create table ods_order_detail( 
    `id` string COMMENT '订单编号',
    `order_id` string  COMMENT '订单号', 
    `user_id` string COMMENT '用户id' ,
    `sku_id` string COMMENT '商品id',  
    `sku_name` string COMMENT '商品名称',  
    `order_price` string COMMENT '下单价格',  
    `sku_num` string COMMENT '商品数量',  
    `create_time` string COMMENT '创建时间'
) COMMENT '订单明细表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_order_detail/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建商品表
drop table if exists ods_sku_info;
create table ods_sku_info( 
    `id` string COMMENT 'skuId',
    `spu_id` string   COMMENT 'spuid', 
    `price` decimal(10,2) COMMENT '价格' ,
    `sku_name` string COMMENT '商品名称',  
    `sku_desc` string COMMENT '商品描述',  
    `weight` string COMMENT '重量',  
    `tm_id` string COMMENT '品牌id',  
    `category3_id` string COMMENT '品类id',  
    `create_time` string COMMENT '创建时间'
) COMMENT '商品表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_sku_info/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建用户表
drop table if exists ods_user_info;
create table ods_user_info( 
    `id` string COMMENT '用户id',
    `name`  string COMMENT '姓名', 
    `birthday` string COMMENT '生日' ,
    `gender` string COMMENT '性别',  
    `email` string COMMENT '邮箱',  
    `user_level` string COMMENT '用户等级',  
    `create_time` string COMMENT '创建时间'
) COMMENT '用户信息'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_user_info/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建商品一级分类表
drop table if exists ods_base_category1;
create table ods_base_category1( 
    `id` string COMMENT 'id',
    `name`  string COMMENT '名称'
) COMMENT '商品一级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_base_category1/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建商品二级分类表
drop table if exists ods_base_category2;
create external table ods_base_category2( 
    `id` string COMMENT ' id',
    `name`  string COMMENT '名称',
    category1_id string COMMENT '一级品类id'
) COMMENT '商品二级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_base_category2/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建商品三级分类表
drop table if exists ods_base_category3;
create table ods_base_category3( 
    `id` string COMMENT ' id',
    `name`  string COMMENT '名称',
    category2_id string COMMENT '二级品类id'
) COMMENT '商品三级分类'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_base_category3/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建支付流水表
drop table if exists `ods_payment_info`;
create table  `ods_payment_info`(
    `id`   bigint COMMENT '编号',
    `out_trade_no`    string COMMENT '对外业务编号',
    `order_id`        string COMMENT '订单编号',
    `user_id`         string COMMENT '用户编号',
    `alipay_trade_no` string COMMENT '支付宝交易流水编号',
    `total_amount`    decimal(16,2) COMMENT '支付金额',
    `subject`         string COMMENT '交易内容',
    `payment_type` string COMMENT '支付类型',
    `payment_time`   string COMMENT '支付时间'
   )  COMMENT '支付流水表'
PARTITIONED BY ( `dt` string)
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ods/ods_payment_info/'
tblproperties ("parquet.compression"="snappy")
;

        接下来需要将ods_ddl.sql上传到hive中运行,实现ODS层的创建:

        hive -f /home/warehouse/sql/ods_ddl.sql     # 实现将文件在hive中运行

        如果出现不正常的现象,可以执行jps -m来查看该开启的进程是否以开启(-m查看详细信息)。

        hive            # 进入到hive检查是否创建成功

        show databases;

        use mall;

        show tables;

        select * from order_info;    # 空表select count(*) from order_info;

        至此完成ODS层的创建,接下来需要将hdfs上的原始业务数据加载到ODS层:

        cd /home/warehouse/shell/

        vim ods_db.sh

#!/bin/bash

   do_date=$1
   APP=mall
   hive=hive   # 指定hive安装目录下..../bin/hive,这里我们已经设置好环境变量直接写为hive系统会自动找到

sql=" 
load data inpath '/origin_data/$APP/db/order_info/$do_date'  OVERWRITE into table $APP"".ods_order_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/order_detail/$do_date'  OVERWRITE into table $APP"".ods_order_detail partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/sku_info/$do_date'  OVERWRITE into table $APP"".ods_sku_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table $APP"".ods_user_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table $APP"".ods_payment_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table $APP"".ods_base_category1 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table $APP"".ods_base_category2 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table $APP"".ods_base_category3 partition(dt='$do_date'); 
"
$hive -e "$sql"   # 在hive中运行该sql语句
# 在sql中$APP"".ods_order_info中""可以不写,这是个人习惯。


# 这里我认为不能使用overwrite这样就无法实现历史数据积存的任务了。

        chmod +x ods_db.sh

        ./ods_db.sh 2020-08-29

        hive

        use mall;

        select * from order_info;         ..........

        至此所有hdfs上的原始业务数据全部加载到ODS层

        (5)DWD

        对ODS层的数据进行ETL清洗,标准化以及维度退化,因业务数据库质量较高,ETL清洗只需要去掉缺失值即可,本身数据就是标准的。分类表可以进行维度退化,维度合并到商品表中。

         DWD层的设计包括三部:首先创建DWD层,然后对ODS层的数据进行ETL清洗,最后将清洗的结果加载到DWD层。

        首先创建DWD层:与ODS层的创建类似,就是在Hive中创建各种表来存放处理后的数据。ODS层有8张表,经维度退化后变为5张表,所以在DWD层新建5张表即可(需要编写sql文件)。

        cd /home/warehouse/sql/

        vim dwd_ddl.sql

-- 进入数据库
use mall;

-- 创建订单表
drop table if exists dwd_order_info;
create external table dwd_order_info ( 
    `id` string COMMENT '',
    `total_amount` decimal(10,2) COMMENT '', 
    `order_status` string COMMENT ' 1 2  3  4  5', 
    `user_id` string COMMENT 'id' ,
    `payment_way` string COMMENT '',  
    `out_trade_no` string COMMENT '',  
    `create_time` string COMMENT '',  
    `operate_time` string COMMENT '' 
) COMMENT ''
PARTITIONED BY ( `dt` string)
stored as  parquet     # 指定文件格式为parquet压缩格式
location '/warehouse/mall/dwd/dwd_order_info/'
tblproperties ("parquet.compression"="snappy")    # 指定具体一snappy方式压缩
;

-- 创建订单详情表
drop table if exists dwd_order_detail;
create external table dwd_order_detail( 
    `id` string COMMENT '',
    `order_id` decimal(10,2) COMMENT '', 
    `user_id` string COMMENT 'id' ,
    `sku_id` string COMMENT 'id',  
    `sku_name` string COMMENT '',  
    `order_price` string COMMENT '',  
    `sku_num` string COMMENT '', 
    `create_time` string COMMENT ''
) COMMENT ''
PARTITIONED BY ( `dt` string)
stored as  parquet
location '/warehouse/mall/dwd/dwd_order_detail/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建用户表
drop table if exists dwd_user_info;
create external table dwd_user_info( 
    `id` string COMMENT 'id',
    `name`  string COMMENT '', 
    `birthday` string COMMENT '' ,
    `gender` string COMMENT '',  
    `email` string COMMENT '',  
    `user_level` string COMMENT '',  
    `create_time` string COMMENT ''
) COMMENT ''
PARTITIONED BY ( `dt` string)
stored as  parquet
location '/warehouse/mall/dwd/dwd_user_info/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建支付流水表
drop table if exists `dwd_payment_info`;
create external  table  `dwd_payment_info`(
    `id`   bigint COMMENT '',
    `out_trade_no`   string COMMENT '',
    `order_id`        string COMMENT '',
    `user_id`         string COMMENT '',
    `alipay_trade_no` string COMMENT '',
    `total_amount`    decimal(16,2) COMMENT '',
    `subject`         string COMMENT '',
    `payment_type` string COMMENT '',
    `payment_time`   string COMMENT ''
   )  COMMENT ''
PARTITIONED BY ( `dt` string)
stored as  parquet
location '/warehouse/mall/dwd/dwd_payment_info/'
tblproperties ("parquet.compression"="snappy")
;

-- 创建商品表(增加分类)
drop table if exists dwd_sku_info;
create external table dwd_sku_info( 
    `id` string COMMENT 'skuId',
    `spu_id` string COMMENT 'spuid', 
    `price` decimal(10,2) COMMENT '' ,
    `sku_name` string COMMENT '',  
    `sku_desc` string COMMENT '',  
    `weight` string COMMENT '',  
    `tm_id` string COMMENT 'id',  
    `category3_id` string COMMENT '1id',  
    `category2_id` string COMMENT '2id',  
    `category1_id` string COMMENT '3id',  
    `category3_name` string COMMENT '3',  
    `category2_name` string COMMENT '2',  
    `category1_name` string COMMENT '1',  
    `create_time` string COMMENT ''
) COMMENT ''
PARTITIONED BY ( `dt` string)
stored as  parquet
location '/warehouse/mall/dwd/dwd_sku_info/'
tblproperties ("parquet.compression"="snappy")
;

        hive -f dwd_ddl.sql

        检查是否创建成功:

        hive

        use mall;

        show tables;     # 可以看到新创的5个表已经创建成功

        接下来实现数据预处理及加载工作(需要编写shell脚本):

        cd /home/warehouse/shell/

        vim dwd_db.sh

#!/bin/bash

# 定义变量方便修改
APP=mall
hive=hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
# -n选项判断输入的值是否不空
if [ -n $1 ] ;then
	log_date=$1
else 
	log_date=`date  -d "-1 day"  +%F`    # date -d 的作用是将""中的日期以指定的格式显示,%F=年-月-日格式,如果""中没指定日期而是类似计算的方式,则以当前时间为基准进行计算,这里表示前一天时间
fi 

sql="

set hive.exec.dynamic.partition.mode=nonstrict;

insert  overwrite table   "$APP".dwd_order_info partition(dt)     # 应该这样写partition(dt=$log_date)但是由于本身数据中dt字段只有一个时间,不写具体的值系统也可以自动识别
select  * from "$APP".ods_order_info 
where dt='$log_date'  and id is not null;
 
insert  overwrite table   "$APP".dwd_order_detail partition(dt)
select  * from "$APP".ods_order_detail 
where dt='$log_date'   and id is not null;

insert  overwrite table   "$APP".dwd_user_info partition(dt)
select  * from "$APP".ods_user_info
where dt='$log_date'   and id is not null;
 
insert  overwrite table   "$APP".dwd_payment_info partition(dt)
select  * from "$APP".ods_payment_info
where dt='$log_date'  and id is not null;

insert  overwrite table   "$APP".dwd_sku_info partition(dt)
select  
    sku.id,
    sku.spu_id, 
    sku.price,
    sku.sku_name,  
    sku.sku_desc,  
    sku.weight,  
    sku.tm_id,  
    sku.category3_id,  
    c2.id category2_id ,  
    c1.id category1_id,  
    c3.name category3_name,  
    c2.name category2_name,  
    c1.name category1_name,  
    sku.create_time,
    sku.dt
from
    "$APP".ods_sku_info sku 
join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id 
    join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id 
    join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id 
where sku.dt='$log_date'  and c2.dt='$log_date'  
and  c3.dt='$log_date' and  c1.dt='$log_date' 
and sku.id is not null;

"
$hive -e "$sql"

        chmod +x dwd_db.sh

        ./dwd_db.sh 2020-08-29

        检查是否加载成功:

        hive

        use mall;

        select count(*) from dwd_sku_info;

        至此ODS层的数据经ETL清洗加载到了DWD层,当然这里的清洗很简单在实际业务环境中一定要比这复杂,后续大家可以针对具体的业务数据特点进行灵活处理。

        (6)DWS

        DWS层对DWD层零散的数据表按具体的主题进行聚合,形成宽表模型,方便后续数据分析。主题的归纳具有通用性,后续也可能会随着分析业务的增加而扩展。(主题不能太细)

         在电商行业里较为通用的主题表有用户行为宽表(user_id为主键)和用户购买商品明细表(user_id与sku_id为联合主键)。用户行为表可以分析与用户行为相关的信息,用户购买商品明细表可以统计与商品相关的信息以及分析用户购买习惯,比如这里的复购率分析。

        DWS层同理分两步,首先创建DWS层,然后select出宽表模型并insert到DWS层。

        创建DWS层:

        cd /home/warehouse/sql/

        vim dws_ddl.sql

-- 进入数据库
use mall;

-- 创建用户行为宽表
drop table if exists dws_user_action;
create  external table dws_user_action 
(   
    user_id         string      comment '用户 id',
    order_count     bigint      comment '下单次数 ',
    order_amount    decimal(16,2)  comment '下单金额 ',
    payment_count   bigint      comment '支付次数',
    payment_amount  decimal(16,2) comment '支付金额 '
) COMMENT '每日用户行为宽表'
PARTITIONED BY ( `dt` string)
stored as  parquet 
location '/warehouse/mall/dws/dws_user_action/'
tblproperties ("parquet.compression"="snappy");

-- 创建用户购买商品明细表
drop table if exists  dws_sale_detail_daycount;
create external table  dws_sale_detail_daycount
(   user_id   string  comment '用户 id',
    sku_id    string comment '商品 Id',
    user_gender  string comment '用户性别',
    user_age string  comment '用户年龄',
    user_level string comment '用户等级',
    order_price decimal(10,2) comment '订单价格',
    sku_name string   comment '商品名称',
    sku_tm_id string   comment '品牌id',
    sku_category3_id string comment '商品三级品类id',
    sku_category2_id string comment '商品二级品类id',
    sku_category1_id string comment '商品一级品类id',
    sku_category3_name string comment '商品三级品类名称',
    sku_category2_name string comment '商品二级品类名称',
    sku_category1_name string comment '商品一级品类名称',
    spu_id  string comment '商品 spu',
    sku_num  int comment '购买个数',
    order_count string comment '当日下单单数',
    order_amount string comment '当日下单金额'
) COMMENT '用户购买商品明细表'
PARTITIONED BY ( `dt` string)
stored as  parquet 
location '/warehouse/mall/dws/dws_user_sale_detail_daycount/'
tblproperties ("parquet.compression"="snappy");

        hive -f dws_ddl.sql

        hive

        use mall;

        show tables;

        cd /home/warehouse/shell/

        vim dws_db.sh

#!/bin/bash

# 定义变量方便修改
APP=mall
hive=hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
	log_date=$1
else 
	log_date=`date  -d "-1 day"  +%F`  
fi 

# 用户行为宽表
function user_actions()
{
    # 定义变量
    APP=$1
    hive=$2
    log_date=$3

    sql="
    
    with  
    tmp_order as
    (
        select 
        user_id, 
        sum(oc.total_amount) order_amount, 
        count(*)  order_count
        from "$APP".dwd_order_info  oc
        where date_format(oc.create_time,'yyyy-MM-dd')='$log_date'
        group by user_id
    ),
    tmp_payment as
    (
        select 
        user_id, 
        sum(pi.total_amount) payment_amount, 
        count(*) payment_count 
        from "$APP".dwd_payment_info pi 
        where date_format(pi.payment_time,'yyyy-MM-dd')='$log_date'
        group by user_id
    )

    insert  overwrite table "$APP".dws_user_action partition(dt='$log_date')
    select 
        user_actions.user_id, 
        sum(user_actions.order_count), 
        sum(user_actions.order_amount),
        sum(user_actions.payment_count), 
        sum(user_actions.payment_amount)
    from 
    (
        select 
        user_id, 
        order_count,
        order_amount ,
        0 payment_count , 
        0 payment_amount
        from tmp_order 

        union all
        select 
        user_id, 
        0,
        0, 
        payment_count, 
        payment_amount
        from tmp_payment
    ) user_actions
    group by user_id;

    "

    $hive -e "$sql"
}

function user_sales()
{
    # 定义变量
    APP=$1
    hive=$2
    log_date=$3

    sql="

    set hive.exec.dynamic.partition.mode=nonstrict;

    with
    tmp_detail as
    (
        select 
            user_id,
            sku_id, 
            sum(sku_num) sku_num ,   
            count(*) order_count , 
            sum(od.order_price*sku_num)  order_amount 
        from "$APP".dwd_order_detail od
        where od.dt='$log_date' and user_id is not null
        group by user_id, sku_id
    )  
    insert overwrite table  "$APP".dws_sale_detail_daycount partition(dt='$log_date')
    select 
        tmp_detail.user_id,
        tmp_detail.sku_id,
        u.gender,
        months_between('$log_date', u.birthday)/12  age, 
        u.user_level,
        price,
        sku_name,
        tm_id,
        category3_id ,  
        category2_id ,  
        category1_id ,  
        category3_name ,  
        category2_name ,  
        category1_name ,  
        spu_id,
        tmp_detail.sku_num,
        tmp_detail.order_count,
        tmp_detail.order_amount 
    from tmp_detail 
    left join "$APP".dwd_user_info u 
    on u.id=tmp_detail.user_id  and u.dt='$log_date'
    left join "$APP".dwd_sku_info s on tmp_detail.sku_id =s.id  and s.dt='$log_date';

    "
    $hive -e "$sql"
}

user_actions $APP $hive $log_date
user_sales $APP $hive $log_date

         chmod +x dws_db.sh

        ./dws_db.sh 2020-08-29

        hive

        use mall;

        select count(*) from dws_user_action;

        至此构建好两张主题表,后续可以进行数据分析了

        (7)ADS/DA

        对DWS层的数据进行数据分析,这里我们进行复购率统计,然后将分析的结果保存到ADS层,因此该过程需要两步:首先创建ADS层,然后进行数据分析并将结果加载至ADS层。

        创建ADS层: 

        cd /home/warehouse/sql/

        vim ads_sale_ddl.sql

-- 进入数据库
use mall;

-- 创建品牌复购率表
drop  table ads_sale_tm_category1_stat_mn;
create  table ads_sale_tm_category1_stat_mn
(   
    tm_id string comment '品牌id ' ,
    category1_id string comment '1级品类id ',
    category1_name string comment '1级品类名称 ',
    buycount   bigint comment  '购买人数',
    buy_twice_last bigint  comment '两次以上购买人数',
    buy_twice_last_ratio decimal(10,2)  comment  '单次复购率', 
    buy_3times_last   bigint comment   '三次以上购买人数',
    buy_3times_last_ratio decimal(10,2)  comment  '多次复购率' ,
    stat_mn string comment '统计月份',
    stat_date string comment '统计日期' 
)   COMMENT '复购率统计'
row format delimited  fields terminated by '\t' 
location '/warehouse/mall/ads/ads_sale_tm_category1_stat_mn/'
;

        hive -f ads_sale_ddl.sql

        hive 

        use mall;

        show tables;

        cd /home/warehouse/shell/

        vim ads_sale.sh

#!/bin/bash

# 定义变量方便修改
APP=mall
hive=hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
	log_date=$1
else 
	log_date=`date  -d "-1 day"  +%F`  
fi 

sql="

set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_sale_tm_category1_stat_mn
select   
    mn.sku_tm_id,
    mn.sku_category1_id,
    mn.sku_category1_name,
    sum(if(mn.order_count>=1,1,0)) buycount,
    sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
    sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
    sum(if(mn.order_count>3,1,0))  buy3timeLast  ,
    sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio ,
    date_format('$log_date' ,'yyyy-MM') stat_mn,
    '$log_date' stat_date
from 
(     
    select od.sku_tm_id, 
        od.sku_category1_id,
        od.sku_category1_name,  
        user_id , 
        sum(order_count) order_count
    from  "$APP".dws_sale_detail_daycount  od 
    where date_format(dt,'yyyy-MM')<=date_format('$log_date' ,'yyyy-MM')
    group by od.sku_tm_id, od.sku_category1_id, od.sku_category1_name, user_id 
) mn
group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;

"
$hive -e "$sql"

        chmod +x ads_sale.sh

        ./ads_sale.sh 2020-08-29

        hive

        use mall;

        select count(*) from ads_sale_tm_category1_stat_mn;

        至此数据分析结束,分析的结果保存在ADS层中。

        (8)结果导出

        由于现在我们需要将分析的结果用于各业务部门使用,但是ADS层属于数据仓库,无法提供外部应用的快速查询,因此需要将ADS层中的结果导出到关系型数据库MySQL中(利用sqoop工具进行)。

        可以看出这里也分两步,首先在node22上进入mysql创建相应的表来存放分析结果,然后在node33上利用sqoop工具将结果导出至mysql中。

        在node22中执行:

        mkdir -p /home/warehouse/sql

        cd /home/warehouse/sql/

        vim mysql_sqle_ddl.sql

-- 进入数据库
use mall;

-- 创建复购率表
create  table ads_sale_tm_category1_stat_mn
(   
    tm_id varchar(200) comment '品牌id ' ,
    category1_id varchar(200) comment '1级品类id ',
    category1_name varchar(200) comment '1级品类名称 ',
    buycount   varchar(200) comment  '购买人数',
    buy_twice_last varchar(200) comment '两次以上购买人数',
    buy_twice_last_ratio varchar(200) comment  '单次复购率', 
    buy_3times_last   varchar(200) comment   '三次以上购买人数',
    buy_3times_last_ratio varchar(200)  comment  '多次复购率' ,
    stat_mn varchar(200) comment '统计月份',
    stat_date varchar(200) comment '统计日期' 
)  

        export MYSQL_PWD=DBa2020*

        mysql -uroot mall < /home/warehouse/sql/mysql_sqle_ddl.sql

        mysql

        use mall;

        show tables;

        在node33中执行:

        cd /home/warehouse/shell/

        vim sqoop_export.sh

#!/bin/bash

db_name=mall

export_data() {
sqoop export \
--connect "jdbc:mysql://node22:3306/${db_name}?useUnicode=true&characterEncoding=utf-8"  \
--username root \
--password DBa2020* \
--table $1 \        # 设置导出到MySQL中的哪个表里即目标路径
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \       # 源数据路径
--input-fields-terminated-by "\t"  \
--update-key "tm_id,category1_id,stat_mn,stat_date" \
--update-mode allowinsert \
--input-null-string '\\N'    \   # hive中分析出的结果很有可能存在null值,在sqoop导出数据到MySQL时指定string类型或非string类型的列,如果出现null,在MySQL中使用\N表示。
--input-null-non-string '\\N'  
}

case $1 in
  "ads_sale_tm_category1_stat_mn")
     export_data "ads_sale_tm_category1_stat_mn"
;;
   "all")
     export_data "ads_sale_tm_category1_stat_mn"
;;
esac

# sqoop export 一共有两种导出模式分别是updateonly,allowinsert
# updateonly模式为默认模式,仅仅只能更新已有的记录数据(修改操作),无法插入新数据。具体来讲,根据update key字段将MySQL中原有的数据与从hive中新导出的数据进行比较,如果update key匹配,则对MySQL数据库中相应的字段进行更新,但是hive中不匹配的数据无法insert。
# allowinsert模式,可以实现既更新已存在的数据,同时插入新数据。相当于update&insert

        chmod +x sqoop_export.sh

        ./sqoop_export.sh all

        在node22执行:

        mysql

        use mall;

        select * from ads_sale_tm_category1_stat_mn;

        至此数据已导出到MySQL中,数仓的整体架构到这里就结束了,业务人员可以通过MySQL快速的查询出数据分析的结果。

        (9)Azkaban

        从上述步骤可以总结出搭建数仓的具体流程为:

  • 数据抽取:利用sqoop工具将业务数据库中的原始数据导入到HDFS中
  • 创建ODS层:用于历史数据积存
  • 将HDFS中的数据加载至ODS层
  • 创建DWD层:保存ODS层经预处理后的数据
  • 将ODS层数据进行ETL清洗,标准化,维度退化,并将处理后的数据加载至DWD层
  • 创建DWS层:保存DWD层经聚合后的数据
  • 将DWD层数据按照某具体主题聚合,形成宽表模型,并加载至DWS层
  • 创建ADS层:保存数据分析的结果
  • 将DWS层数据按需求进行分析、统计,并将结果加载至ADS层
  • 在业务数据库中创建结果表:用于保存分析结果,供业务人员查看
  • 数据导出:利用sqoop工具将HDFS(数仓hive中)中的分析结果导出至业务数据库

        具体执行文件顺序为:

  • 执行sqoop_import.sh
  • 执行ods_ddl.sql
  • 执行ods_db.sh
  • 执行dwd_ddl.sql
  • 执行dwd_db.sh
  • 执行dws_ddl.sql
  • 执行dws_db.sh
  • 执行ads_sale_ddl.sql
  • 执行ads_sale.sh
  • 执行mysql_sqle_ddl.sql
  • 执行sqoop_export.sh

        在这里我们利用Azkaban工具实现数据分析任务的自动化调度,Azkaban工具是通过WEB界面来进行作业调度的,具体步骤是:

                A:创建job文件,设置任务流,然后压缩

                B:在相关服务器上开启Azkaban服务与WEB UI

                C:将压缩好的文件上传至WEB UI执行调度任务

需要注意的是:第一次创建数仓时需要创建各ODS,DWD....层,在自动化调度时各层已经创建好了就不需要再次创建了。首先模拟新数据的生成:

        在node22上执行:

        mysql -uroot -p

        use mall;

        CALL init_data('2020-08-30',300,200,300,FALSE);

        exit;

        其次创建所需要的job文件:

        创建import.job文件

        type=command                  # 表示命令类型

        do_date=${dt}                    # 调用import.job时需要传入dt参数 

        command=/home/warehouse/shell/sqoop_import.sh all ${do_date}     # 要执行的命令

        # 没有设置dependencies说明该文件第一个执行

        创建ods.job文件

        type=command

        do_date=${dt}

        dependencies=import       # 指明文件的依赖,该文件依赖于import.job文件既必须

                                                    import.job执行完才能执行ods.job文件

        command=/home/warehouse/shell/ods_db.sh ${do_date}

        创建dwd.job文件

        type=command

        do_date=${dt}

        dependencies=ods

        command=/home/warehouse/shell/dwd_db.sh ${do_date}

        创建dws.job文件

        type=command

        do_date=${dt}

        dependencies=dwd

        command=/home/warehouse/shell/dws_db.sh ${do_date}

        创建ads.job文件

        type=command

        do_date=${dt}

        dependencies=dws

        command=/home/warehouse/shell/ads_sale.sh ${do_date}

        创建export.job文件

        type=command

        do_date=${dt}

        dependencies=ads

        command=/home/warehouse/shell/sqoop_export.sh all

        将上述所有.job文件压缩为mall-job.zip包

        接下来启动Azkaban的服务,在node11,22,33上执行:

        azkaban-executor-start.sh

        检查是否启动成功

        jps    # 出现AzkabanExectorServer进程

        (azkaban-executor-shutdown.sh)

        然后在node33(存放着所有需要的shell脚本,这也是为什么将hive与sqoop配置在同一台服务器上的原因)上启动Azkaban的WEB UI

        cd /opt/app/azkaban/server/

        azkaban-web-start.sh

        在浏览器中打开node33:8443    账户密码都是admin

         传入参数dt为2020-08-30 

         这里我们就不定时了,直接右下角点击执行。

         蓝色表示正在执行,执行成功后变为绿色。

        查看结果

        在node22中执行

        mysql -uroot -p

        use mall;

        select * from ads_sale_tm_category1_stat_mn;

五、总结

六、案例-电商数据统计分析续

        1、需求 :计算成交总额GMV

        2、设计

        目前整个数仓已经搭建完成,用户行为宽表也在DWS层建好,因此我们只需要关注ADS层设计和数据导出设计即可,首先需要在ADS层中创建用于存放GMV的新表,然后编写shell脚本实现数据分析并将结果过保存至ADS层新表,其次,在MySQL数据库中创建GMV新表,最后利用sqoop工具将ADS层的GMV表导出至MySQL数据库。后续运行就可以使用Azkaban工具替换原有的数据分析和数据导出shell脚本完成任务自动化调度。

        在node33上执行:

        # ADS层

        cd /home/warehouse/sql/

        vim ads_gmv_ddl.sql

use mall;

drop table if exists ads_gmv_sum_day;
create table ads_gmv_sum_day(
    `dt` string,
    `gmv_count` bigint,
    `gmv_amount` decimal(16,2),
    `gmv_payment` decimal(16,2)
)
row format delimited fields terminated by '\t'
location '/warehouse/mall/ads/ads_gmv_sum_day/'
;

        hive -f ads_gmv_ddl.sql

        hive 

        use mall;

        show tables;

        cd /home/warehouse/shell/

        vim ads_gmv.sh

#!/bin/bash

# 定义变量方便修改
APP=mall
hive=hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
	log_date=$1
else 
	log_date=`date  -d "-1 day"  +%F`  
fi 

sql="

set hive.exec.dynamic.partition.mode=nonstrict;

insert into table "$APP".ads_gmv_sum_day 
select 
'$log_date' dt ,
    sum(order_count)  gmv_count ,
    sum(order_amount) gmv_amount ,
    sum(payment_amount) payment_amount 
from "$APP".dws_user_action 
where dt ='$log_date'
group by dt
;

"
$hive -e "$sql"

        chmod +x ads_gmv.sh

        ./ads_gmv.sh 2020-08-30

        hive

        use mall;

        select * from ads_gmv_sum_day;

        # 数据导出

        在node22上执行:

        cd /home/warehouse/sql/

        vim mysql_gmv_ddl.sql

use mall;

create table ads_gmv_sum_day(
    dt varchar(200),
    mv_count varchar(200),
    gmv_amount varchar(200),
    gmv_payment varchar(200)
);

        export MYSQL_PWD=DBa2020*

        mysql -uroot mall < mysql_gmv_ddl.sql

        mysql 

        use mall;

        show tables;

        在node33上执行:

        cd /home/warehouse/shell/

        vim sqoop_gmv_export.sh

#!/bin/bash

db_name=mall

export_data() {
sqoop export \
--connect "jdbc:mysql://node22:3306/${db_name}?useUnicode=true&characterEncoding=utf-8"  \
--username root \
--password DBa2020* \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$db_name/ads/$1 \
--input-fields-terminated-by "\t"  \
--update-key "dt" \
--update-mode allowinsert \
--input-null-string '\\N'    \
--input-null-non-string '\\N'  
}

case $1 in
  "ads_gmv_sum_day")
     export_data "ads_gmv_sum_day"
;;
   "all")
     export_data "ads_gmv_sum_day"
;;
esac

        chmod +x sqoop_gmv_export.sh

        ./sqoop_gmv_export.sh all

        在node22上执行:

        mysql

        use mall;

        select * from ads_gmv_sum_day;

        3、Azkaban调度

        为了方便演示流程,这里新生成一天的数据:

        在node22上执行

        mysql

        use mall;

        CALL init_date('2020-08-31',300,200,300,FALSE);

        exit;

        在本地计算机上编写job文件

        import.job

        ods.job

        dwd.job

        dws.job

        创建gmv_ads.job文件

        type=command

        do_date=${dt}

        dependencies=dws

        command=/home/warehouse/shell/ads_gmv.sh ${do_date}

        创建gmv_export.job文件

        type=command

        do_date=${dt}

        dependencies=gmv_ads

        command=/home/warehouse/shell/sqoop_gmv_export.sh all

        后续与上述一样

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐