airflow 文档学习(二) - 概念

1. 核心功能 1.1 DAGs

有向无环图

反映所涉及的task的依赖关系

注:搜索dag的时候,airflow只会关注同事包含"DAG"和"airflow"字样的py文件

1.2 scope

airflow将加载任何可以从DAG file中import的DAG对象,但是它们必须出现在globals()中,例如下面的文件,只有tag_1会被加载,tag_2只会出现在本地scope中

dag_1 = DAG('this_dag_will_be_discovered') def my_function(): dag_2 = DAG('but_this_dag_will_not') my_function() 1.3 Default Arguments

如果一个字典default_args被传给一个DAGs,它将会将其运用到所有的它的operator中。这使得复用default_args变得非常的方便

1.4 Context Manager

dags可以被当做一个管理器,去自动的分配新的operators给dag

1.5 Operators

dags描述的是怎么去跑一个工作流,operators决定实际做什么。

一个operator描述了在一个工作流中的单个task。operators经常但不总是原子的,这意味着他们可以独立存在而不需要去和别的operator分享资源。DAG将确保operators以正确的顺序运行,在这些依赖之外,operator通常是独立运行的,甚至他们肯能运行在不同的机器上。

这是个非常微妙的关键点:事实上,如果两个operator需要去共享一些信息,就像文件名或者一些小的数据,你应该去考虑将他们合并到一个operator中,如果上述情况确实是无法避免的,airflow有operator的交叉通信(xcom)在文档中有描述。

并且airflow提供了非常多的通用operator:

BashOperator PythonOperator EmailOperator SimpleHttpOperator 等等 1.6 DAG Assignment

operator不用立马被分配给一个dag,但是,一旦operator被分配给了一个dag,它将无法被转移或者是取消分配。dag的分配在operator被创建之后可以被非常明确的完成,通过延期分配或者从其他operator推断的方式

例如下面的方式:

dag = DAG('my_dag', start_date=datetime(2016, 1, 1)) # sets the DAG explicitly explicit_op = DummyOperator(task_id='op1', dag=dag) # deferred DAG assignment deferred_op = DummyOperator(task_id='op2') deferred_op.dag = dag # inferred DAG assignment (linked operators must be in the same DAG) inferred_op = DummyOperator(task_id='op3') inferred_op.set_upstream(deferred_op) 1.7 Bitshift Composition

在以前,operator的关系描述是通过set_upstream()和set_downstream()方法,在1.8 之后可以通过<<和>>代替依赖方法。

1.8 Tasks

当一个operator被实例化之后,它就被称为是一个task。实例化在调用抽象operator时定义了具体的值,同时,参数化之后的task会称为dag的一个节点。

1.9 Task Instances

一个task实例代表一个task的特定运行,其特征在于dag、任务、和时间点的组合。

它拥有运行状态:running、success、failed、skipped、up for retry等

1.10 Workflows

通过组合dags和operators,你会创建TaskInstances,你可以创建复杂的工作流。

2. Additional Functionality 2.1 Hooks

Hooks是连接一些平台和数据库的接口,类似于 Hive, S3, MySQL, Postgres, HDFS, Pig。hooks尽可能的实现了通用接口,并且充当operator。还需要使用airflow.models.Connection 模型来检索主机名和身份认证信息,hooks将身份认证信息和代码放在管道之外,集中在元数据库中。

2.2 pools

一些系统会因为太多的进程不堪重负,airflow的pool可以被用作限制任意的task的运行。task可以在创建时通过参数指定存在的pool名称。

例如:

aggregate_db_message_job = BashOperator( task_id='aggregate_db_message_job', execution_timeout=timedelta(hours=3), pool='ep_data_pipeline_db_msg_agg', bash_command=aggregate_db_message_job_cmd, dag=dag) aggregate_db_message_job.set_upstream(wait_for_empty_queue)

pool中可以使用priority_weight参数去定义他的在队列中的权重,并且决定哪个task先行执行。

当容量被撑满时,task将会放入计划执行,一旦有容量,可运行的task和他们的状态将会被在前端展示,当插槽空闲,队列中的task将会基于权重进行排序执行。

2.3 Connections

外部系统的connection信息被存储在airflow的元数据库中,airflow的管道可以很简单地引用被集中管理的conn_id,无需另外进行操作。

当许多的connections被定义在同一个conn_id下,在这种情况下,当hooks使用get_connection方法时,airflow将随机选择一个connection,当重试时允许一些基本的负载均衡和容错。

一些hooks有默认的conn_id,当operators使用这个hook时不需要一个明确的conn_id。例如:PostgresHook的默认conn_id是postgres_default

2.4 Queues

当我们使用CeleryExecutor时,被塞入celery队列的task是可以被规定的,队列是BaseOperator的属性,所以任何task可以被分配到任何的队列中,而默认的队列环境是在配置文件的celery下的default_queue中配置的。

workers可以监听一个或多个队列中的task,当一个worker启动的时候(使用airflow worker命令),一个以逗号分隔的对列名可以被指定(airflow worker -q spark),这个worker将只会选择那些被连接到指定对列的task。

2.5 XComs

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppzjf.html