airflow 文档学习(一) 基本Operator

简单来说,Operator就是task的抽象类

2. BaseOperator

所有的功能性Operator的来源

2.1 参数: task_id (string) :唯一标识task的id owner (string) retries (int):任务重试此时 retry_delay (timedelta) :重试间隔 start_date (datetime):任务开始时间,第一个任务实例的执行时间 end_date (datetime):如果指定的话,任务执行不会超过这个时间 depends_on_past (bool):如果指定的话,任务实例将以此运行并且依赖上一个任务的成功 wait_for_downstream (bool):在这个参数指定的任何地方,depends_on_past都将强制为true queue(str):指定到队列,CeleryExcutor指定特定队列 dag(DAG):指定dag pool(str):此任务运行的插槽池,限制任务的并发 execution_timeout (datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败 sla(datetime.timedelta):作业预计成功时间 on_failure_callback(callable):当此任务实例失败时调用的函数 on_retry_callback (callable) :与on_failure_callback相似,只是重试任务时触发 on_success_callback (callable) :与on_failure_callback相似,任务成功时触发 trigger_rule (str):定义依赖的触发规则 ·包括的选项有:{all_success | all_failed | all_done | one_success | one_failed | dummy},默认为all_success run_as_user(str):在运行任务时使用unix用户名进行模拟 executor_config (dict):特定的执行程序解释其任务参数 task_concurrency (int):设置后,任务将限制execution_dates之间的并发运行 resources (dict):资源参数名称(Resources构造函数的参数名称)与其值的映射。 priority_weight(int):此任务的优先级权重 weight_rule(str):用于任务的有效总优先级权重的加权方法 max_retry_delay (timedelta):重试最大延迟间隔 retry_exponential_backoff (bool) 2.2 方法 set_downstream(task_or_task_list) 将任务或者任务列表设置为当前任务的下游 set_upstream(task_or_task_list) 将任务或者任务列表设置为当前任务的上游 clear(start_date=None, end_date=None, upstream=False, downstream=False, session=None) 根据指定参数清除与任务关联的任务实例的状态 execute(context) 创建operator时派生的主要方法 get_direct_relative_ids(upstream=False) 获取当前任务上游或者下游的直接相对id get_direct_relatives(upstream=False) 获取当前任务的上游或者下游直接关联对象 get_flat_relative_ids(upstream=False, found_descendants=None) 获取上游或者下游的关联对象的id列表 get_flat_relatives(upstream=False) 获取上游或者下游的关联对象列表 get_task_instances(session, start_date=None, end_date=None) 获取特定时间范围的与此任务相关的任务实例 has_dag() 是否设置dag on_kill() 当任务实例被杀死时,重写方法以清除子进程 run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False) 为日期范围运行一组任务实例 post_execute(context, result=None) 调用self.execute()后立即触发,它传递上下文和operator返回的结果 pre_execute(context) 调用self.execute()之前触发 prepare_template() 模板化字段被其内容替换之后触发 render_template(attr, content, context) 从文件或直接在字段中呈现模板,并返回呈现结果 render_template_from_field(attr, content, context, jinja_env) 从字段中呈现模板 xcom_pull(context, task_ids=None, dag_id=None, key='return_value', include_prior_dates=None) xcom_push(context, key, value, execution_date=None) 2.3 可调用值 dag:如果设置了则返回dag,否则报错 deps:返回依赖任务列表 downstream_list:下游任务列表 schedule_interval:任务排列 upstream_list:上游任务列表 3. BaseSensorOperator

基于 airflow.models.BaseOperator, airflow.models.SkipMixin

3.1 参数: soft_fail (bool):设置为true以将任务标记为失败时的skipped poke_interval (int):作业在每次尝试之间应等待的时间(单位:秒) timeout (int):超时时间 3.2 方法: execute(context) poke(context) 4. Core Operators 4.1 airflow.sensors.base_sensor_operator.BaseSensorOperator(poke_interval=60, timeout=604800, soft_fail=False, *args, **kwargs)

基于 airflow.models.BaseOperator

4.1.1 参数: bash_command (string):bash命令 xcom_push (bool):设置为true,则当bash命令完成之后,写入stdout的最后一行也会被推送到xcom env (dict):如果不为None的话,则它定义新进程的环境变量的映射,用于替代当前的进程环境 4.1.2 方法: execute(context) 会在临时目录中执行bash,并且之后会对其进行清理 on_kill() 同BaseOperator 4.2 airflow.operators.bash_operator.BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs)

基于 airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

4.3 airflow.operators.python_operator.BranchPythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

基于 airflow.operators.python_operator.PythonOperator, airflow.models.SkipMixin

4.4 airflow.operators.check_operator.CheckOperator(sql, conn_id=None, *args, **kwargs)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpzfzj.html