大数据几种任务调度工具
Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用DolphinScheduler支持多种部署模式,包括单机模式(Standalone)、伪集群模式(Pseudo-Cluster)、集群模式(Cluster)等Airflow是一个以编程方式编写,安排和监视工作流的平台。使
一、DolphinScheduler概述和部署
1、DolphinScheduler简介
1.1 概述
Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用
1.2 核心架构
DolphinScheduler的主要角色如下:
- MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交、任务监控,并同时监听其它MasterServer和WorkerServer的健康状态
- WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务
- ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错
- Alert服务,提供告警相关服务
- API接口层,主要负责处理前端UI层的请求
- UI,系统的前端页面,提供系统的各种可视化操作界面
2、DolphinScheduler部署模式
https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/部署指南_menu
2.1 概述
DolphinScheduler支持多种部署模式,包括单机模式(Standalone)、伪集群模式(Pseudo-Cluster)、集群模式(Cluster)等
2.2 单机模式
单机模式(standalone)模式下,所有服务均集中于一个StandaloneServer进程中,并且其中内置了注册中心Zookeeper和数据库H2。只需配置JDK环境,就可一键启动DolphinScheduler,快速体验其功能
由于DolphinScheduler的单机模式使用的是内置的ZK和数据库,故在集群模式下所做的相关配置在单机模式下并不可见,所以需要重新配置,必要的配置为创建租户和创建用户
bin/dolphinscheduler-daemon.sh start standalone-server
2.3 伪集群模式
伪集群模式(Pseudo-Cluster)是在单台机器部署 DolphinScheduler 各项服务,该模式下master、worker、api server、logger server等服务都只在同一台机器上。Zookeeper和数据库需单独安装并进行相应配置
2.4 集群模式
集群模式(Cluster)与伪集群模式的区别就是在多台机器部署 DolphinScheduler各项服务,并且可以配置多个Master及多个Worker
3、DolphinScheduler集群模式部署
3.1 集群规划与准备
- 三台节点均需部署JDK(1.8+),并配置相关环境变量
- 需部署数据库,支持MySQL(5.7+)或者PostgreSQL(8.2.15+)。如 MySQL 则需要 JDBC Driver 8.0.16
- 需部署Zookeeper(3.4.6+)
- 如果启用 HDFS 文件系统,则需要 Hadoop(2.6+)环境
- 三台节点均需安装进程管理工具包psmisc
sudo yum install -y psmisc
3.2 下载与配置部署脚本
wget https://archive.apache.org/dist/dolphinscheduler/2.0.3/apache-dolphinscheduler-2.0.3-bin.tar.gz
tar -zxvf apache-dolphinscheduler-2.0.3-bin.tar,gz
修改解压目录下的conf/config目录下的install_config.conf文件,不需要修改的可以直接略过
# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# A comma separated list of machine hostname or IP would be installed DolphinScheduler,
# including master, worker, api, alert. If you want to deploy in pseudo-distributed
# mode, just write a pseudo-distributed hostname
# Example for hostnames: ips="ds1,ds2,ds3,ds4,ds5", Example for IPs: ips="192.168.8.1,192.168.8.2,192.168.8.3,192.168.8.4,192.168.8.5"
ips="hadoop102,hadoop103,hadoop104"
# 将要部署任一 DolphinScheduler 服务的服务器主机名或 ip 列表
# Port of SSH protocol, default value is 22. For now we only support same port in all `ips` machine
# modify it if you use different ssh port
sshPort="22"
# A comma separated list of machine hostname or IP would be installed Master server, it
# must be a subset of configuration `ips`.
# Example for hostnames: masters="ds1,ds2", Example for IPs: masters="192.168.8.1,192.168.8.2"
masters="hadoop102"
# master 所在主机名列表,必须是 ips 的子集
# A comma separated list of machine <hostname>:<workerGroup> or <IP>:<workerGroup>.All hostname or IP must be a
# subset of configuration `ips`, And workerGroup have default value as `default`, but we recommend you declare behind the hosts
# Example for hostnames: workers="ds1:default,ds2:default,ds3:default", Example for IPs: workers="192.168.8.1:default,192.168.8.2:default,192.168.8.3:default"
workers="hadoop102:default,hadoop103:default,hadoop104:default"
# worker主机名及队列,此处的 ip 必须在 ips 列表中
# A comma separated list of machine hostname or IP would be installed Alert server, it
# must be a subset of configuration `ips`.
# Example for hostname: alertServer="ds3", Example for IP: alertServer="192.168.8.3"
alertServer="hadoop102"
# 告警服务所在服务器主机名
# A comma separated list of machine hostname or IP would be installed API server, it
# must be a subset of configuration `ips`.
# Example for hostname: apiServers="ds1", Example for IP: apiServers="192.168.8.1"
apiServers="hadoop102"
# api服务所在服务器主机名
# A comma separated list of machine hostname or IP would be installed Python gateway server, it
# must be a subset of configuration `ips`.
# Example for hostname: pythonGatewayServers="ds1", Example for IP: pythonGatewayServers="192.168.8.1"
# pythonGatewayServers="ds1"
# 不需要的配置项,可以保留默认值,也可以用 # 注释
# The directory to install DolphinScheduler for all machine we config above. It will automatically be created by `install.sh` script if not exists.
# Do not set this configuration same as the current path (pwd)
installPath="/opt/module/dolphinscheduler"
# DS 安装路径,如果不存在会创建
# The user to deploy DolphinScheduler for all machine we config above. For now user must create by yourself before running `install.sh`
# script. The user needs to have sudo privileges and permissions to operate hdfs. If hdfs is enabled than the root directory needs
# to be created by this user
deployUser="atguigu"
# 部署用户,任务执行服务是以 sudo -u {linux-user} 切换不同 Linux 用户的方式来实现多租户运行作业,因此该用户必须有免密的 sudo 权限。
# The directory to store local data for all machine we config above. Make sure user `deployUser` have permissions to read and write this directory.
dataBasedirPath="/tmp/dolphinscheduler"
# 前文配置的所有节点的本地数据存储路径,需要确保部署用户拥有该目录的读写权限
# ---------------------------------------------------------
# DolphinScheduler ENV
# ---------------------------------------------------------
# JAVA_HOME, we recommend use same JAVA_HOME in all machine you going to install DolphinScheduler
# and this configuration only support one parameter so far.
javaHome="/opt/module/jdk1.8.0_212"
# JAVA_HOME 路径
# DolphinScheduler API service port, also this is your DolphinScheduler UI component's URL port, default value is 12345
apiServerPort="12345"
# ---------------------------------------------------------
# Database
# NOTICE: If database value has special characters, such as `.*[]^${}\+?|()@#&`, Please add prefix `\` for escaping.
# ---------------------------------------------------------
# The type for the metadata database
# Supported values: ``postgresql``, ``mysql`, `h2``.
# 注意:数据库相关配置的 value 必须加引号,否则配置无法生效
DATABASE_TYPE="mysql"
# 数据库类型
# Spring datasource url, following <HOST>:<PORT>/<database>?<parameter> format, If you using mysql, you could use jdbc
# string jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 as example
# SPRING_DATASOURCE_URL=${SPRING_DATASOURCE_URL:-"jdbc:h2:mem:dolphinscheduler;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true"}
SPRING_DATASOURCE_URL="jdbc:mysql://hadoop102:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
# 数据库 URL
# Spring datasource username
# SPRING_DATASOURCE_USERNAME=${SPRING_DATASOURCE_USERNAME:-"sa"}
SPRING_DATASOURCE_USERNAME="dolphinscheduler"
# 数据库用户名
# Spring datasource password
# SPRING_DATASOURCE_PASSWORD=${SPRING_DATASOURCE_PASSWORD:-""}
SPRING_DATASOURCE_PASSWORD="dolphinscheduler"
# 数据库密码
# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# Registry Server plugin name, should be a substring of `registryPluginDir`, DolphinScheduler use this for verifying configuration consistency
registryPluginName="zookeeper"
# 注册中心插件名称,DS 通过注册中心来确保集群配置的一致性
# Registry Server address.
registryServers="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 注册中心地址,即 Zookeeper 集群的地址
# Registry Namespace
registryNamespace="dolphinscheduler"
# DS 在 Zookeeper 的结点名称
# ---------------------------------------------------------
# Worker Task Server
# ---------------------------------------------------------
# Worker Task Server plugin dir. DolphinScheduler will find and load the worker task plugin jar package from this dir.
taskPluginDir="lib/plugin/task"
# resource storage type: HDFS, S3, NONE
resourceStorageType="HDFS"
# 资源存储类型
# resource store on HDFS/S3 path, resource file will store to this hdfs path, self configuration, please make sure the directory exists on hdfs and has read write permissions. "/dolphinscheduler" is recommended
resourceUploadPath="/dolphinscheduler"
# 资源上传路径
# if resourceStorageType is HDFS,defaultFS write namenode address,HA, you need to put core-site.xml and hdfs-site.xml in the conf directory.
# if S3,write S3 address,HA,for example :s3a://dolphinscheduler,
# Note,S3 be sure to create the root directory /dolphinscheduler
defaultFS="hdfs://hadoop102:8020"
# 默认文件系统
# if resourceStorageType is S3, the following three configuration is required, otherwise please ignore
s3Endpoint="http://192.168.xx.xx:9010"
s3AccessKey="xxxxxxxxxx"
s3SecretKey="xxxxxxxxxx"
# resourcemanager port, the default value is 8088 if not specified
resourceManagerHttpAddressPort="8088"
# yarn RM http 访问端口
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single node, keep this value empty
yarnHaIps=
# Yarn RM 高可用 ip,若未启用 RM 高可用,则将该值置空
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single node, you only need to replace 'yarnIp1' to actual resourcemanager hostname
singleYarnIp="hadoop103"
# Yarn RM 主机名,若启用了 HA 或未启用 RM,保留默认值
# who has permission to create directory under HDFS/S3 root path
# Note: if kerberos is enabled, please config hdfsRootUser=
hdfsRootUser="atguigu"
# 拥有 HDFS 根目录操作权限的用户
# 下面是如果hdfs开启了验证在操作的
# kerberos config
# whether kerberos starts, if kerberos starts, following four items need to config, otherwise please ignore
kerberosStartUp="false"
# kdc krb5 config file path
krb5ConfPath="$installPath/conf/krb5.conf"
# keytab username,watch out the @ sign should followd by \\
keytabUserName="hdfs-mycluster\\@ESZ.COM"
# username keytab path
keytabPath="$installPath/conf/hdfs.headless.keytab"
# kerberos expire time, the unit is hour
kerberosExpireTime="2"
# use sudo or not
sudoEnable="true"
# worker tenant auto create
workerTenantAutoCreate="false"
3.3 初始化数据库
DolphinScheduler 元数据存储在关系型数据库中,故需创建相应的数据库和用户
# 创建数据库
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
# 创建用户
CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'dolphinscheduler';
# 提高密码复杂度或者执行以下命令降低MySQL密码强度级别
set global validate_password_length=4;
set global validate_password_policy=0;
# 赋予用户相应权限
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
flush privileges;
# 拷贝MySQL驱动到DolphinScheduler的解压目录下的lib中
cp /opt/software/mysql-connector-java-8.0.16.jar lib/
# 执行数据库初始化脚本
# 数据库初始化脚本位于DolphinScheduler解压目录下的script目录中,即/opt/software/ds/apache-dolphinscheduler-2.0.3-bin/script/
script/create-dolphinscheduler.sh
3.4 一键部署DolphinScheduler
# 启动zk
zk.sh start
# 一键部署并启动DolphinScheduler
./install.sh
# 查看DolphinScheduler进程
# ApiApplicationServer
# WorkerServer
# MasterServer
# AlertServer
# LoggerServer
# ----------
# 访问DolphinScheduler UI
# DolphinScheduler UI地址为http://hadoop102:12345/dolphinscheduler
# 初始用户的用户名为:admin,密码为dolphinscheduler123
3.5 DolphinScheduler启停命令
安装完后得去/opt/module/dolphinscheduler
修改或启停
# 一键启停所有服务
./bin/start-all.sh
./bin/stop-all.sh
# 注意同Hadoop的启停脚本进行区分
# 启停 Master
./bin/dolphinscheduler-daemon.sh start master-server
./bin/dolphinscheduler-daemon.sh stop master-server
# 启停 Worker
./bin/dolphinscheduler-daemon.sh start worker-server
./bin/dolphinscheduler-daemon.sh stop worker-server
# 启停 Api
./bin/dolphinscheduler-daemon.sh start api-server
./bin/dolphinscheduler-daemon.sh stop api-server
# 启停 Logger
./bin/dolphinscheduler-daemon.sh start logger-server
./bin/dolphinscheduler-daemon.sh stop logger-server
# 启停 Alert
./bin/dolphinscheduler-daemon.sh start alert-server
./bin/dolphinscheduler-daemon.sh stop alert-server
二、DolphinScheduler操作
入门文档可以参考:https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/guide/quick-start
1、工作流传参
https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/功能介绍_menu/参数_menu
DolphinScheduler支持对任务节点进行灵活的传参,任务节点可通过${参数名}
引用参数值
1.1 内置参数
基础内置参数
变量名 | 参数 | 说明 |
---|---|---|
system.biz.date | ${system.biz.date} | 定时时间前一天,格式为 yyyyMMdd |
system.biz.curdate | ${system.biz.curdate} | 定时时间,格式为 yyyyMMdd |
system.datetime | ${system.datetime} | 定时时间,格式为 yyyyMMddHHmmss |
衍生内置参数
可通过衍生内置参数,设置任意格式、任意时间的日期。
- 自定义日期格式:可以对 $[yyyyMMddHHmmss] 任意分解组合,如 $[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd]。
- 使用 add_months() 函数:该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月
参数 | 说明 |
---|---|
$[add_months(yyyyMMdd,12*N)] | 后 N 年 |
$[add_months(yyyyMMdd,-12*N)] | 前 N 年 |
$[add_months(yyyyMMdd,N)] | 后 N 月 |
$[add_months(yyyyMMdd,-N)] | 前 N 月 |
$[yyyyMMdd+7*N] | 后 N 周 |
$[yyyyMMdd-7*N] | 前 N 周 |
$[yyyyMMdd+N] | 后 N 天 |
$[yyyyMMdd-N] | 前 N 天 |
$[HHmmss+N/24] | 后 N 小时 |
$[HHmmss-N/24] | 前 N 小时 |
$[HHmmss+N/24/60] | 后 N 分钟 |
$[HHmmss-N/24/60] | 前 N 分钟 |
相关说明
- dt:参数名
- IN:IN 表示局部参数仅能在当前节点使用,OUT 表示局部参数可以向下游传递(目前支持这个特性的任务类型有:Shell、SQL、Procedure;同时若节点之间没有依赖关系,则局部参数无法传递)
- DATE:数据类型,日期
- $[yyyy-MM-dd]:自定义格式的衍生内置参数
全局参数在工作流定义,本地参数在节点定义,本地参数 > 全局参数 > 上游任务传递的参数
1.2 参数传递
- 本地参数 > 全局参数 > 上游任务传递的参数;
- 多个上游节点均传递同名参数时,下游节点会优先使用值为非空的参数;
- 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数。
2、引用依赖资源
有些任务需要引用一些额外的资源,例如MR、Spark等任务须引用jar包,Shell任务需要引用其他脚本等。DolphinScheduler提供了资源中心来对这些资源进行统一管理。
如果需要用到资源上传功能,针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。当然也可以选择上传到 Hadoop or MinIO 集群上,此时则需要有Hadoop (2.6+) 或者 MinIO 等相关环境。本文在部署 DS 集群时指定了文件系统为 HDFS
https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/guide/resource
3、数据源配置
https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/功能介绍_menu/数据源中心_menu
数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源。此处仅对 HIVE 数据源进行介绍
- 数据源:选择HIVE
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述,可置空
- IP/主机名:输入连接HIVE的IP
- 端口:输入连接HIVE的端口,默认 10000
- 用户名:设置连接HIVE的用户名,如果没有配置 HIVE 权限管理,则用户名可以任意,但 HIVE 表数据存储在 HDFS,为了保证对所有表的数据均有操作权限,此处选择 HDFS 超级用户 atguigu(注:HDFS 超级用户名与执行 HDFS 启动命令的 Linux 节点用户名相同)
- 密码:设置连接HIVE的密码,如果没有配置 HIVE 权限管理,则密码置空即可
- 数据库名:输入连接HIVE的数据库名称
- Jdbc连接参数:用于HIVE连接的参数设置,以JSON形式填写,没有参数可置空
然后在工作流中可以选择SQL
- 节点名称:自定义节点名称
- 环境名称:HIVE 执行所需环境
- 数据源:类型选择 HIVE,数据源选择上文配置的 HIVE 数据源
- SQL 类型:根据SQL 语句选择,此处选用默认的“查询”即可
- SQL 语句:要执行的 SQL 语句,末尾不能有分号,否则报错:语法错误
4、告警实例配置
4.1 邮箱告警实例配置
需要登陆管理员账户
- 告警实例名称:在告警组配置时可以选择的告警插件实例名称,用户自定义
- 选择插件:选择 Email 则为邮箱告警实例
- 收件人:接收方邮箱地址,收件人不需要开启 SMTP 服务
- 抄送人:抄送是指用户给收件人发出邮件的同时把该邮件发送给另外的人,收件人之外的收件方都是抄送人,“收件人”可以获知该邮件的所有抄送人;抄送人可以为空。
- mail.smtp.host:邮箱的 SMTP 服务器域名,对于 QQ 邮箱,为 smtp.qq.com。各邮箱的 SMTP 服务器见此链接:https://blog.csdn.net/wustzjf/article/details/52481309
- mail.smtp.port:邮箱的 SMTP 服务端口号,主流邮箱均为 25 端口,使用默认值即可
- mail.sender:发件方邮箱地址,需要开启 SMTP 服务
- mail.user:与 mail.sender 保持一致即可
- mail.password:获取的邮箱授权码。未列出的选项保留默认值或默认选项即可
4.2 其他告警
其他告警可以参考:https://dolphinscheduler.apache.org/zh-cn/docs/3.0.0
同时还可以电话告警,这里有个运维平台是一站式集成的,睿象云官网:https://www.aiops.com/
5、其他注意事项
DolphinScheduler的环境变量是不和主机共享的,默认需要进入/opt/module/dolphinscheduler/conf/env/dolphinscheduler_env.sh
进行修改,也可以直接在admin用户下在可视化界面进行创建,创建节点的时候选择即可
三、Airflow
1、Airflow基本概念
1.1 概述
Airflow是一个以编程方式编写,安排和监视工作流的平台。使用Airflow将工作流编写任务的有向无环图(DAG)。Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变的容易
1.2 名词解释
- Dynamic:Airflow配置需要实用Python,允许动态生产管道。这允许编写可动态。这允许编写可动态实例化管道的代码
- Extensible:轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境
- Elegant:Airlfow是精简的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中
- Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务
2、Airflow安装
2.1 python环境安装
# Superset是由Python语言编写的Web应用,要求Python3.8的环境
# 这里使用MiniConda作为包管理器
# 下载地址:https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# 加载环境变量配置文件,使之生效
source ~/.bashrc
# Miniconda安装完成后,每次打开终端都会激活其默认的base环境,我们可通过以下命令,禁止激活默认base环境
conda config --set auto_activate_base false
# 配置conda国内镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --set show_channel_urls yes
# 创建Python3.8环境
conda create --name airflow python=3.8
# 创建环境:conda create -n env_name
# 查看所有环境:conda info --envs
# 删除一个环境:conda remove -n env_name --all
# 激活airflow环境
conda activate airflow
# 执行python -V命令查看python版本
python -V
2.2 安装Airflow
conda activate airflow
pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple
sudo mkdir ~/.pip
sudo vim ~/.pip/pip.conf
#添加以下内容
[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = https://pypi.tuna.tsinghua.edu.cn
# 安装airflow
pip install "apache-airflow==2.4.3"
# 初始化airflow
airflow db init
# 查看版本
airflow version
# airflow安装好存放路径
pwd
# 启动airflow web服务,启动后浏览器访问http://hadoop102:8081
airflow webserver -p 8081 -D
# 启动airflow调度
airflow scheduler -D
# 创建账号
airflow users create \
--username admin \
--firstname atguigu \
--lastname atguigu \
--role Admin \
--email shawn@atguigu.com
# 启动停止脚本
vim af.sh
#!/bin/bash
case $1 in
"start"){
echo " --------启动 airflow-------"
ssh hadoop102 "conda activate airflow;airflow webserver -p 8081 -D;airflow scheduler -D; conda deactivate"
};;
"stop"){
echo " --------关闭 airflow-------"
ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15
};;
esac
# 添加权限即可使用
chmod +x af.sh
3、修改数据库与调度器
3.1 修改数据库为mysql
# https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/set-up-database.html#setting-up-a-mysql-database
# 在MySQL中建库
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭MySQL的SSL证书
SHOW VARIABLES LIKE '%ssl%';
# 修改配置文件my.cnf,加入以下内容
# disable_ssl
skip_ssl
# 添加python连接的依赖:
pip install mysql-connector-python
# 修改airflow的配置文件
vim ~/airflow/airflow.cfg
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn = sqlite:home/atguigu/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:123456@hadoop102:3306/airflow_db
# 关闭airflow,初始化后重启
af.sh stop
airflow db init
# 初始化报错1067 - Invalid default value for ‘update_at’
# 原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败
set GLOBAL sql_mode='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
# 重启MySQL会造成参数失效,推荐将参数写入到配置文件my.cnf中
sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# 重启
af.sh start
# 重新创建账号登录
airflow users create \
--username admin \
--firstname atguigu \
--lastname atguigu \
--role Admin \
--email shawn@atguigu.com
3.2 修改执行器
官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞
# 修改airflow的配置文件
[core]
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor
# dags_folder是保存文件位置
4、部署使用
文档:https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/index.html
4.1 环境部署启动
# 需要启动hadoop和spark的历史服务器
# 编写.py脚本,创建work-py目录用于存放python调度脚本
mkdir ~/airflow/dags
cd dags/
vim test.py
编写脚本
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
# 用户
'owner': 'test_owner',
# 是否开启任务依赖
'depends_on_past': True,
# 邮箱
'email': ['403627000@qq.com'],
# 启动时间
'start_date':datetime(2022,11,28),
# 出错是否发邮件报警
'email_on_failure': False,
# 重试是否发邮件报警
'email_on_retry': False,
# 重试次数
'retries': 1,
# 重试时间间隔
'retry_delay': timedelta(minutes=5),
}
# 声明任务图
dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
# 任务id
task_id='dwd',
# 任务命令
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
# 重试次数
retries=3,
# 把任务添加进图中
dag=dag)
t2 = BashOperator(
task_id='dws',
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='ads',
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
retries=3,
dag=dag)
# 设置任务依赖
t2.set_upstream(t1)
t3.set_upstream(t2)
注意一些注意事项
-
必须导包
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
-
default_args 设置默认参数
-
depends_on_past 是否开启任务依赖
-
schedule_interval 调度频率
-
retries 重试次数
-
start_date 开始时间
-
BashOperator 具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成
-
task_id 任务唯一标识(必填)
-
bash_command 具体任务执行命令
-
set_upstream 设置依赖
4.2 Dag任务操作
# 过段时间会加载
airflow dags list
# 查看所有任务
airflow list_dags
# 查看单个任务
airflow tasks list test --tree
# 如果删除的话需要UI和底层都删除才行
4.3 配置邮件服务器
修改airflow配置文件,用stmps服务对应587端口,
vim ~/airflow/airflow.cfg
smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = xx@qq.com
# smtp_user =
smtp_password = qluxdbuhgrhgbigi
# smtp_password =
smtp_port = 587
smtp_mail_from = xx@qq.com
# 然后重启
af.sh stop
af.sh star
# 编辑test.py脚本,并且替换
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {
# 用户
'owner': 'test_owner',
# 是否开启任务依赖
'depends_on_past': True,
# 邮箱
'email': ['xx@qq.com'],
# 启动时间
'start_date':datetime(2022,11,28),
# 出错是否发邮件报警
'email_on_failure': False,
# 重试是否发邮件报警
'email_on_retry': False,
# 重试次数
'retries': 1,
# 重试时间间隔
'retry_delay': timedelta(minutes=5),
}
# 声明任务图
dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
# 任务id
task_id='dwd',
# 任务命令
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
# 重试次数
retries=3,
# 把任务添加进图中
dag=dag)
t2 = BashOperator(
task_id='dws',
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='ads',
bash_command='ssh hadoop102 "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',
retries=3,
dag=dag)
email=EmailOperator(
task_id="email",
to="yaohm163@163.com ",
subject="test-subject",
html_content="<h1>test-content</h1>",
cc="xx@qq.com ",
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)
四、Azkaban
azkaban官网:https://azkaban.github.io/downloads.html
1、Azkaban入门
1.1 上传jar包和配置sql
首先获取azkaban的三个包,可以自行编译,github地址
# https://pan.baidu.com/s/10zD2Y_h0oB_rC-BAjLal1g%C2%A0 密码:zsxa
# 将azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-web-server-3.84.4.tar.gz上传到hadoop102的/opt/software路径
# 新建/opt/module/azkaban目录,并将所有tar包解压到这个目录下
mkdir /opt/module/azkaban
# 解压
tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
# 进入到/opt/module/azkaban目录,依次修改名称
mv azkaban-exec-server-3.84.4/ azkaban-exec
mv azkaban-web-server-3.84.4/ azkaban-web
# ==============然后配置mysql=====================
mysql -uroot -p123456
# 登陆MySQL,创建Azkaban数据库
create database azkaban;
# 创建azkaban用户并赋予权限
set global validate_password_length=4;
set global validate_password_policy=0;
CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';
# 赋予Azkaban用户增删改查权限
GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
# 创建Azkaban表,完成后退出MySQL
use azkaban;
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
quit;
# 更改MySQL包大小;防止Azkaban连接MySQL阻塞
sudo vim /etc/my.cnf
# 在[mysqld]下面加一行max_allowed_packet=1024M
[mysqld]
max_allowed_packet=1024M
# 重启MySQL
sudo systemctl restart mysqld
1.2 配置Executor Server
Azkaban Executor Server处理工作流和作业的实际执行
# 编辑azkaban.properties
vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
# 修改如下属性
#...
default.timezone.id=Asia/Shanghai
#...
azkaban.webserver.url=http://hadoop102:8081
executor.port=12321
#...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
# 同步azkaban-exec到所有节点
xsync /opt/module/azkaban/azkaban-exec
# 必须进入到/opt/module/azkaban/azkaban-exec路径,分别在三台机器上,启动executor server
bin/start-exec.sh
bin/start-exec.sh
bin/start-exec.sh
# 注意:如果在/opt/module/azkaban/azkaban-exec目录下出现executor.port文件,说明启动成功
# 下面激活executor,需要分别在三台机器依次执行
curl -G "hadoop102:12321/executor?action=activate" && echo
curl -G "hadoop103:12321/executor?action=activate" && echo
curl -G "hadoop104:12321/executor?action=activate" && echo
# 如果三台机器都出现如下提示,则表示激活成功
{"status":"success"}
1.3 配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发
# 编辑azkaban.properties
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
# 修改如下属性
...
default.timezone.id=Asia/Shanghai
...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
# 说明:
# StaticRemainingFlowSize:正在排队的任务数;
# CpuStatus:CPU占用情况
# MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行。
# 修改azkaban-users.xml文件,添加atguigu用户
vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="atguigu" roles="admin" username="atguigu"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
# 必须进入到hadoop102的/opt/module/azkaban/azkaban-web路径,启动web server
bin/start-web.sh
# 访问http://hadoop102:8081,并用atguigu用户登陆
2、Work Flow案例实操
2.1 HelloWorld案例
# 在windows环境,新建azkaban.project文件,编辑内容如下
# 注意:该文件作用,是采用新的Flow-API方式解析flow文件
azkaban-flow-version: 2.0
# 新建basic.flow文件,内容如下
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
# Name:job名称
# Type:job类型。command表示你要执行作业的方式为命令
# Config:job配置
# 将azkaban.project、basic.flow文件压缩到一个zip文件,文件名称必须是英文
# 在WebServer新建项目:http://hadoop102:8081/index
# 然后上传压缩文件,执行,查看日志
2.2 作业依赖案例
需求:JobA和JobB执行完了,才能执行JobC
# 修改basic.flow为如下内容
nodes:
- name: jobC
type: command
# jobC 依赖 JobA和JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
2.3 自动失败重试案例
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
也可以在Flow全局配置中添加任务失败重试配置,此时重试配置会应用到所有Job
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
2.4 手动失败重试案例
需求:JobA⇒JobB(依赖于A)⇒JobC⇒JobD⇒JobE⇒JobF。生产环境,任何Job都有可能挂掉,可以根据需求执行想要执行的Job。
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
在可视化界面,Enable和Disable下面都分别有如下参数:
- Parents:该作业的上一个任务
- Ancestors:该作业前的所有任务
- Children:该作业后的一个任务
- Descendents:该作业后的所有任务
- Enable All:所有的任务
3、JavaProcess作业类型案例
3.1 概述
JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:
- Xms:最小堆
- Xmx:最大堆
- classpath:类路径
- java.class:要运行的Java对象,其中必须包含Main方法
- main.args:main方法的参数
3.2 案例
新建一个azkaban的maven工程,然后创建包名:com.atguigu,创建AzTest类
package com.atguigu;
public class AzTest {
public static void main(String[] args) {
System.out.println("This is for testing!");
}
}
打包成jar包azkaban-1.0-SNAPSHOT.jar,新建testJava.flow,内容如下
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.atguigu.AzTest
**将Jar包、flow文件和project文件打包成javatest.zip **,然后上传执行
4、条件工作流案例
4.1 概述
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定Job执行逻辑时获得更大的灵活性,例如,只要父Job之一成功,就可以运行当前Job
4.2 运行时参数案例
基本原理:父Job将参数写入JOB_OUTPUT_PROP_FILE
环境变量所指向的文件;子Job使用 ${jobName:param}
来获取父Job输出的参数并定义执行条件
支持的条件运算符:
(1)== 等于
(2)!= 不等于
(3)> 大于
(4)>= 大于等于
(5)< 小于
(6)<= 小于等于
(7)&& 与
(8)|| 或
(9)! 非
需求分析:
# JobA执行一个shell脚本。
# JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行
# 新建JobA.sh
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
# 新建JobB.sh
#!/bin/bash
echo "do JobB"
# 新建condition.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1
# 最后将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip
4.3 预定义宏案例
Azkaban中预置了几个特殊的判断条件,称为预定义宏。预定义宏会根据所有父Job的完成情况进行判断,再决定是否执行。可用的预定义宏如下:
(1)all_success: 表示父Job全部成功才执行(默认)
(2)all_done:表示父Job全部完成才执行
(3)all_failed:表示父Job全部失败才执行
(4)one_success:表示父Job至少一个成功才执行
(5)one_failed:表示父Job至少一个失败才执行
# 需求
# JobA执行一个shell脚本
# JobB执行一个shell脚本
# JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行
# 新建JobA.sh
#!/bin/bash
echo "do JobA"
# 新建JobC.sh
#!/bin/bash
echo "do JobC"
# 新建macro.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
5、邮箱告警
首先申请好邮箱,然后配置
# 在azkaban-web节点hadoop102上,编辑/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改如下内容
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
# 添加如下内容:
#这里设置邮件发送服务器,需要 申请邮箱,切开通stmp服务,以下只是例子
mail.sender=atguigu@126.com
mail.host=smtp.126.com
mail.user=atguigu@126.com
mail.password=用邮箱的授权码
# 保存并重启web-server
bin/shutdown-web.sh
bin/start-web.sh
# 编辑basic.flow
nodes:
- name: jobA
type: command
config:
command: echo "This is an email test."
# 将azkaban.project和basic.flow压缩成email.zip
# 然后上传,在可视化页面里选择邮箱告警
# 针对电话告警,可以使用睿象云,https://www.aiops.com/
6、Azkaban多Executor模式注意事项
Azkaban多Executor模式是指,在集群中多个节点部署Executor。在这种模式下, Azkaban web Server会根据策略,选取其中一个Executor去执行任务。为确保所选的Executor能够准确的执行任务,我们须在以下两种方案任选其一,推荐使用方案二。
方案一:指定特定的Executor(hadoop102)去执行任务
# 在MySQL中azkaban数据库executors表中,查询hadoop102上的Executor的id
mysql> use azkaban;
mysql> select * from executors;
# 在执行工作流程时选择Flow Parameters加入useExecutor属性
方案二:在Executor所在所有节点部署任务所需脚本和应用
官网文档:https://azkaban.readthedocs.io/en/latest/configuration.html
更多推荐
所有评论(0)