XComs使得tasks可以交换信息,允许更加细微的控制形式和分享状态。XComs原则上定义成key、value和timestamp,但是也可以跟踪一些属性,例如创建XCom的task/DAG。
XComs可以被pushed或者pulled,当一个task发送一个xcom,这个xcom是普遍可获得的。task可以被发送通过使用方法xcom_push(),此外,当一个task返回一个值时(不管是operators的execute()方法还是PythonOperators的python_callable方法),一个包含着返回值的xcom会自动发送。
tasks调用xcom_pull()去接受xcoms,可选择的根据key、task_ids、dag_id进行过滤。默认的,xcom_pull()在获得值时会根据keys自动筛选执行方法。
如果xcom_pull被传了一个task_id,则对应task最近一次的xcom值会被返回,如果一组task_ids传过去,会返回一组对应的xcom值
也可以直接在模板中获取xcom,例如:SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
值得注意的是,xcom与variable非常相似,但它是专门用于任务之间的通信而不是全局设置
variables是一种传统的方式去存储和取回任意的内容或者是key-value形式的airflow的设置,它可以在前端界面、代码或者cli中进行增删改查的操作,当你定义管道代码,就可非常方便的使用,例如:
from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True)你可以使用variables在一个jinjia模板中:
echo {{ var.value.<variable_name> }} 2.7 Branching有时候你需要你的工作流进行分支,或者是根据任意上游发生的条件走某条路。这其中一种实现方法就是使用BranchPythonOperator
BranchPythonOperator和PythonOperator十分相似,除了python会期望一个python_callable去返回一个task_id,返回的task_id跳过所有其他路径,python的返回函数的task_id必须直接引用BranchPythonOperator任务下游的任务。
注意,在BranchPythonOperator中使用depend_on_past = True下游的任务在逻辑上是不合理的,因为跳过状态总是导致依赖于过去成功的块任务。如果非要这样的话可以中间建立一个虚拟任务进行过度。
2.8 SubDAGs 2.9 SLAs记录失败的过错的sla任务列表
2.10 Trigger Rules虽然正常的工作流行为是在所有的直接上游任务成功之后触发的,但是airflow允许更为复杂的依赖项。
所有的operators有一个trigger_rule,用来定义生成的任务被触发的规则,trigger_rule的默认参数是all_success,以下为别的参数解释:
注意,这些可以与depends_on_past结合使用,当设置为true时,如果任务的先前计划未成功,则不会触发
2.11 Latest Run Only标准工作流行为涉及为特定日期/时间范围内运行的一系列任务,但是,某些工作流执行的任务与运行时间无关,但是需要按计划运行,就像标准的cron作业一样,在这些情况下,暂停期间错过的回填运行作业会浪费cpu周期。
2.12 Zombies & Undeads僵尸任务的特点是没有心跳(由工作定期发出)和数据库中的运行状态,当工作节点无法访问数据库的时候,airflow进程在外部被终止或者节点重启的时候,他们可能会发生。僵尸查杀由调度程序的进程定期执行。
undead进程的特点是存在进程和匹配的心跳,但是airflow不知道此任务在数据库中运行。这种不匹配通常在数据库状态发生改变的时候发生,最有可能是通过删除UI中的任务实例视图中的行,指示任务验证其作为心跳例程的一部分的状态,并在确定他们处于这种不死的状态时终止自身。
你本地airflow设置文件可以定义一个策略功能,该功能可以根据其他任务或DAG属性改变其任务属性。
2.14 Documentation & Notes 2.15 Jinja Templating