airflow整合环境搭建

1. 整体结构

mysql -> 后端数据库

redis -> 用于broker

CeleryExecutor -> 执行器

2. 环境安装

安装python anaconda环境

添加py用户

# useradd py

设置密码

# passwd py

创建anaconda安装路径

# mkdir /anaconda

赋权

# chown -R py:py /anaconda

上传anaconda安装包并用py用户运行安装程序

$ chmod +x Anaconda3-5.1.0-Linux-x86_64.sh

$ ./Anaconda3-5.1.0-Linux-x86_64.sh

Welcome to Anaconda3 5.1.0

In order to continue the installation process, please review the license

......

- Press ENTER to confirm the location

- Press CTRL-C to abort the installation

- Or specify a different location below

[/home/py/anaconda3] >>> /anaconda/anaconda3 输入自定义安装路径,如果用默认的话回车跳过

然后将anaconda加入环境变量,并使其生效

$ vi .bash_profile

在最后一行加上如下配置:

export PATH=/anaconda/anaconda3/bin:$PATH

然后使其生效:

$source .bash_profile

检测一下安装结果,出现以下结果说明安装成功:

$ python -V

Python 3.6.4 :: Anaconda, Inc.

配置pipy源:

$ mkdir ~/.pip

$ touch ~/.pip/pip.conf

$ echo ‘[global]‘ >> ~/.pip/pip.conf

$ echo ‘trusted-host=pypi.douban.com/simple‘ >> ~/.pip/pip.conf

$ echo ‘index-url=http://pypi.douban.com/simple‘ >> ~/.pip/pip.conf

安装mysql相关依赖

去mysql官网下载mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar安装包并上传至服务器:

# tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar

检查服务器是否有旧版本依赖:

# rpm -qa|grep mysql-libs-5.1.73|wc -l

如果结果大于0则执行如下命令卸载旧依赖:

# rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64

如果等于0则不需要此操作.

然后依次安装以下依赖:

# rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm

安装相关模块

$ pip install apache-airflow[celery]

$ pip install apache-airflow[redis]

$ pip install apache-airflow[mysql]

检测一下安装结果:

$ airflow -h

如果显示正常则表示安装成功,并且用户根目录会出现airflow文件夹

安装mysql

上传mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar至需要安装mysql的服务器上:

# tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar

检查服务器是否有旧版本依赖:

# rpm -qa|grep mysql-libs-5.1.73|wc -l

如果结果大于0则执行如下命令卸载旧依赖:

# rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64

如果等于0则不需要此操作.

然后依次安装以下安装:

# rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-client-5.7.22-1.el6.x86_64.rpm

# rpm -ivh mysql-community-server-5.7.22-1.el6.x86_64.rpm

# vi /etc/my.cnf

尾部添加skip-grant-tables

# service mysqld start

# mysql -u root

用于测试话剧所以密码设置的比较简单,仅供测试:

mysql> use mysql

mysql> update user set password_expired=‘N‘ where user=‘root‘;

mysql> update user set authentication_string=password(‘123456‘) where user=‘root‘;

编辑/etc/my.cnf去掉skip-grant-tables 并重启mysql:

# service mysqld restart

# mysql -u root -p

使用密码123456登陆

#降低密码复杂度要求仅仅用于测试

mysql> set global validate_password_policy=0;

mysql> set global validate_password_length=4;

mysql> SET PASSWORD = PASSWORD(‘123456‘);

mysql> flush privileges;

针对airflow使用创建数据库,添加用户并授权

mysql> CREATE DATABASE airflow;

mysql> CREATE USER [email protected] IDENTIFIED BY ‘123456‘;

mysql> GRANT all privileges on airflow.* TO [email protected] IDENTIFIED BY ‘123456‘;

mysql> GRANT all privileges on airflow.* TO [email protected]%‘ IDENTIFIED BY ‘123456‘;

mysql> flush privileges;

账户测试:

# mysql -u af -p

使用123456登陆

安装redis

redis官网下载redis-4.0.9.tar.gz安装包,并上传至需要安装redis的服务器:

$ tar zxvf redis-4.0.9.tar.gz

$ cd redis-4.0.9

$ make

$ cp redis.conf src/

$ cd src

编辑配置文件redis.conf将bind属性改为bind 0.0.0.0

启动redis

$ nohup ./redis-server redis.conf > output.log 2>&1 &

配置airflow

如果执行过airflow -h命令后,则用户目录下面会出现一个airflow文件夹, airflow文件夹下面有个airflow.cfg

的文件,这个就是airflow的配置文件;

编辑airflow.cfg文件,修改一下内容,具体情况根据实际情况填写[ip和端口]:

[core]

#sql_alchemy_conn = mysql://[username]:[password]@[host]:[port]/airflow

sql_alchemy_conn = mysql://af:[email protected]/airflow

executor = CeleryExecutor

[celery]

broker_url = redis://localhost:6379/0

celery_result_backend = redis://localhost:6379/0

配置完成之后即可进行数据库初始化:

$ airflow initdb

启动airflow

关于启动,这个要分应用节点和作业节点:

1) 应用节点:

$ airflow webserver -D

$ airflow scheduler -D

$ airflow worker -D (应用节点可不运行woker)

2) 作业节点:(作业节点只需要运行worker就行)

$ airflow worker -D

3.增加定时任务

在airflow文件夹下面新建dags文件夹用于存储定时任务文件

创建如下的定时任务文件helloworld.py

from datetime import timedelta, datetime

import airflow

from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.dummy_operator import DummyOperator

default_args = {

‘owner‘: ‘jifeng.si‘,

‘depends_on_past‘: False,

# ‘depends_on_past‘: True,

#‘start_date‘: airflow.utils.dates.days_ago(2),

‘start_date‘: datetime(2018, 5, 2),

‘email‘: [‘email_on_failure‘: False,

‘email_on_retry‘: False,

‘retries‘: 1,

‘retry_delay‘: timedelta(minutes=5),

}

dag = DAG(

‘example_hello_world_dag‘,

default_args=default_args,

description=‘my first DAG‘,

schedule_interval=‘*/25 * * * *‘,

start_date=datetime(2018, 5, 28)

)

dummy_operator = DummyOperator(task_id=‘dummy_task‘, dag=dag)

hello_operator = BashOperator(

task_id=‘sleep_task‘,

depends_on_past=False,

bash_command=‘echo `date` >> /home/py/test.txt‘,

dag=dag

)

dummy_operator >> hello_operator

测试一下代码的正确性:

$ python helloworld.py

如果没出现异常则说明代码无错误, 并且airflow环境正常.

将helloworld.py 放在 /home/py/airflow/dags下.

测试一下任务看任务是否能正常运行:

$ touch ~/test.txt 创建用于测试的文件

$ airflow run -A example_hello_world_dag sleep_task 20180528

如果运行正常,则可以启用该定时任务,启用任务有两种方式:

1) 通过命令启动:

$ airflow unpause example_hello_world_dag

2) 通过界面启动:

b6c297e8997e50372900ede9cddbf188.png

然后观察用户路径下的test.txt文件,如果运行正常的话会不断增加时间信息:

$ cat test.txt

....

Thu May 31 15:55:10 CST 2018

Thu May 31 15:56:10 CST 2018

Thu May 31 15:57:09 CST 2018

Thu May 31 16:04:10 CST 2018

....

4.注意事项

airflow的cron定时器只能精确到分钟,而不能精确到秒

airflow使用的是utc时区,正式使用的时候需要进行时区转换

附录:

附上一个环境初始化shell脚本:

#!/bin/sh

#拷贝mysql依赖

#scp命令必须手工输入密码确认过一次之后才可保证sshpass能正常运行

sshpass -p ‘123456‘ scp -q [email protected]:/root/mysql-community-devel-5.7.22-1.el6.x86_64.rpm /root/ &&

sshpass -p ‘123456‘ scp -q [email protected]:/root/mysql-community-libs-5.7.22-1.el6.x86_64.rpm /root/ &&

sshpass -p ‘123456‘ scp -q [email protected]:/root/mysql-community-common-5.7.22-1.el6.x86_64.rpm /root/ &&

#添加py用户

password=username="py"

pass=$(perl -e ‘print crypt($ARGV[0], "password")‘ $password)

useradd -m -p $pass $username &&

#添加anaconda安装路径

mkdir -p /anaconda &&

chown -R py:py /anaconda &&

#设置pipy源信息

su - py -c "mkdir ~/.pip && touch ~/.pip/pip.conf"

su - py -c "echo ‘[global]‘ >> ~/.pip/pip.conf"

su - py -c "echo ‘trusted-host=pypi.douban.com/simple‘ >> ~/.pip/pip.conf"

su - py -c "echo ‘index-url=http://pypi.douban.com/simple‘ >> ~/.pip/pip.conf"

#安装mysql依赖

old=$(rpm -qa|grep mysql-libs-5.1.73|wc -l)

if [ $old -gt 0]; then

rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64

fi

rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm &&

rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm &&

rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm &&

参考链接:

原文:https://www.cnblogs.com/cord/p/9226608.html

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐