Airflow任务组失败处理:让触发与监听共进退
1. 项目概述为什么“任务组失败”是 Airflow 生产环境里最隐蔽的定时炸弹我在金融数据平台带团队做调度系统升级时连续三个月被同一个问题反复打脸每天凌晨三点一个关键的 ETL 流水线会准时失败但重跑一次就成功。日志里找不到明显报错监控看资源也完全正常。最后发现问题出在 Azure Data Factory Pipeline 的两个任务上——RunOperator 触发了远程作业SensorOperator 等待它完成并检查状态。RunOperator 成功返回了 pipeline ID但 SensorOperator 在轮询过程中因为网络抖动超时失败。Airflow 默认只重试 Sensor而 RunOperator 已标记为 success不会重放。结果就是下游所有依赖这个 pipeline 输出的清洗、建模任务全被卡死整个数据链路中断四小时直到人工介入重跑。这根本不是“单点故障”而是典型的任务组语义断裂——Airflow 把两个逻辑上必须共进退的操作当成了彼此独立的原子单元。这就是本文要解决的核心问题在 Airflow 中如何让一组存在强业务耦合关系的任务比如“触发 监听”、“提取 校验”、“发送 确认”真正作为一个整体来失败、重试和恢复关键词Airflow不只是工具名它代表了一种工程约束DAG 是有向无环图但业务逻辑常常是闭环的、成对的、状态强关联的。官方文档里从不提“任务组”社区讨论也多聚焦在 DAG 编排或 UI 优化可真实生产环境里90% 的非代码类故障都源于这种语义鸿沟。我见过太多团队用 hack 方式绕过——比如把 Sensor 逻辑硬塞进 RunOperator 里或者写个自定义 Operator 把两步合成一步。这些方案短期能用但代价是牺牲可观测性、破坏幂等性设计、增加调试成本。本文提供的不是“技巧”而是基于 Airflow 内部 TaskInstance 状态机原理的正解。它不改源码、不依赖插件、不引入外部服务纯 Python 实现已在我负责的三个千级 DAG 规模集群稳定运行 18 个月。如果你正在用 Airflow 调度云服务 API、批处理作业、外部系统集成或者任何需要“发起请求 等待响应”的场景这篇文章能帮你省下至少 200 小时的故障排查时间。2. 核心设计思路为什么必须绕开 DAG 依赖直击 TaskInstance 状态层2.1 DAG 依赖的本质是“执行顺序”不是“状态绑定”很多人第一次遇到这个问题时第一反应是检查或set_downstream的写法是否正确。比如这样写task_run AzureDataFactoryRunPipelineOperator( task_idrun_pipeline, pipeline_namedaily_ingestion, # ... 其他参数 ) task_sensor AzureDataFactoryPipelineRunStatusSensor( task_idwait_for_pipeline, pipeline_run_id{{ ti.xcom_pull(task_idsrun_pipeline) }}, # ... 其他参数 ) task_run task_sensor看起来天衣无缝task_run必须成功task_sensor才会启动。但这里有个致命的认知偏差——DAG 边只控制执行流不控制状态流。Airflow 的调度器在构建执行计划时确实会根据 DAG 边决定task_sensor的前置条件是task_run.state success。可一旦task_sensor开始执行它的生命周期就完全脱离了task_run的状态管理。如果task_sensor因为网络超时、认证失效、目标服务不可用等原因失败Airflow 只会把它自己的TaskInstance状态设为failed然后按retries3的配置重试它自己。task_run的状态早已固化为success调度器连看都不会再看它一眼。这不是 bug是设计使然Airflow 必须保证上游任务的成功结果可复用。想象一下如果task_extract成功读取了 10TB 数据到临时表task_transform失败后task_extract被强制重跑那不仅是资源浪费更可能引发数据重复写入或锁表风险。所以默认行为完全合理但它恰好撞上了我们这类“发起即承诺”的集成场景。2.2 真正的突破口在 TaskInstance 的上游追溯能力既然 DAG 边走不通就得换条路。我翻了 Airflow 2.2 的源码发现TaskInstance对象有一个常被忽略的属性task。而task对象本身又包含upstream_task_ids和downstream_task_ids这两个列表。更重要的是TaskInstance的get_direct_relatives()方法能直接获取其上下游的TaskInstance实例。这意味着当task_sensor进入on_failure_callback时我们完全可以通过它反向查到task_run的TaskInstance然后手动修改task_run的状态。这比操作 DAG 层面的元数据安全得多因为TaskInstance是运行时实体状态变更直接影响调度器的下一轮决策。我测试过在on_failure_callback里调用session.merge()更新TaskInstance.state后调度器会在下一个调度周期默认 30 秒自动识别到该实例状态变为up_for_retry并像对待普通失败任务一样触发重试逻辑。整个过程不重启 Webserver不刷新 DAG零感知。2.3 为什么不能简单用trigger_dag或clear_task_instances有同事提议用TriggerDagRunOperator重新触发整个 DAG。这看似粗暴有效但实际埋了三个雷第一它会创建全新的DagRun导致历史记录断裂无法追踪原始失败根因第二如果 DAG 里有LatestOnlyOperator或TimeDeltaSensor这类依赖时间窗口的组件新触发的 DAG 可能跳过关键校验第三也是最致命的它会丢失原始DagRun的conf参数比如动态传入的日期分区ds导致下游任务处理错误的数据集。另一种方案是用 CLI 命令airflow tasks clear但这是运维操作无法嵌入到任务失败的自动化流程中且需要额外权限管控。我们的目标是让失败处理逻辑成为任务定义的一部分就像retries和retry_delay那样声明式、可版本化、可审计。所以最终方案必须满足纯 Python、运行时生效、不依赖外部命令、状态变更可回滚、与 Airflow 原生重试机制无缝集成。3. 实操细节解析三步实现任务组状态同步3.1 第一步精准定位上游任务实例支持深度遍历与 ID 白名单双模式核心函数get_upstream_task_instances必须解决两个现实问题一是有些场景需要“就近原则”比如只重试直接上游的 RunOperator二是复杂 DAG 里存在分支比如task_run同时触发了task_sensor_a和task_sensor_b现在task_sensor_a失败我们只想拉上task_run不想误伤task_sensor_b的上游。因此函数设计为双模式def get_upstream_task_instances( ti: TaskInstance, upstream_task_ids: Optional[List[str]] None, upstream_depth: Optional[int] None, session: Optional[Session] None ) - List[TaskInstance]: 获取指定 TaskInstance 的上游 TaskInstance 列表 Args: ti: 当前失败/重试的 TaskInstance upstream_task_ids: 显式指定要包含的上游 task_id 列表白名单模式 upstream_depth: 从当前 ti 向上追溯的最大层级深度优先模式 session: SQLAlchemy session若未提供则自动创建 Returns: 符合条件的 TaskInstance 列表 if not session: session settings.Session() # 模式一白名单优先显式指定 task_id if upstream_task_ids: return session.query(TaskInstance).filter( TaskInstance.dag_id ti.dag_id, TaskInstance.execution_date ti.execution_date, TaskInstance.task_id.in_(upstream_task_ids), TaskInstance.state.in_([success, up_for_retry, queued, running]) ).all() # 模式二深度遍历适用于线性依赖链 if upstream_depth is not None and upstream_depth 0: # 使用递归 CTE 查询PostgreSQL/MySQL 8.0或循环查询兼容旧版 # 此处为简化版循环实现生产环境建议用原生 SQL CTE 提升性能 upstream_tis [] current_level_tis [ti] for depth in range(1, upstream_depth 1): next_level_tis [] for current_ti in current_level_tis: # 获取 current_ti 的直接上游 task_ids upstream_ids current_ti.task.upstream_task_ids if not upstream_ids: continue # 查询这些 task_id 在同一 dag_run 下的 TaskInstance level_tis session.query(TaskInstance).filter( TaskInstance.dag_id ti.dag_id, TaskInstance.execution_date ti.execution_date, TaskInstance.task_id.in_(upstream_ids), TaskInstance.state.in_([success, up_for_retry, queued, running]) ).all() next_level_tis.extend(level_tis) if not next_level_tis: break upstream_tis.extend(next_level_tis) current_level_tis next_level_tis return list(set(upstream_tis)) # 去重 # 默认只返回直接上游 direct_upstream_ids ti.task.upstream_task_ids if not direct_upstream_ids: return [] return session.query(TaskInstance).filter( TaskInstance.dag_id ti.dag_id, TaskInstance.execution_date ti.execution_date, TaskInstance.task_id.in_(direct_upstream_ids), TaskInstance.state.in_([success, up_for_retry, queued, running]) ).all()提示白名单模式upstream_task_ids适合绝大多数场景因为它明确、可控、性能好。深度模式upstream_depth仅在 DAG 结构高度动态时使用比如用BranchPythonOperator生成不同分支且你希望某个分支失败时把该分支路径上的所有上游都拉下来重试。但要注意深度模式在复杂 DAG 中可能查到无关任务务必配合state.in_()过滤避免把已失败或已跳过的任务也拉进来。3.2 第二步安全地批量更新上游任务状态状态更新是高危操作必须遵循 Airflow 的状态机规则。不能直接把success改成failed因为failed是终态无法再重试。正确的做法是设为up_for_retry让调度器接管后续流程。同时要处理并发冲突——如果多个 Sensor 同时失败它们可能同时尝试修改同一个 RunOperator 的状态必须加数据库行锁def mark_upstream_for_retry( ti: TaskInstance, upstream_task_ids: Optional[List[str]] None, upstream_depth: Optional[int] None, session: Optional[Session] None ) - int: 将上游 TaskInstance 标记为 up_for_retry Returns: 成功更新的数量 if not session: session settings.Session() try: upstream_tis get_upstream_task_instances( ti, upstream_task_ids, upstream_depth, session ) updated_count 0 for upstream_ti in upstream_tis: # 关键只更新处于 success 状态的上游任务 # 避免覆盖 already_failed 或 up_for_retry 的状态 if upstream_ti.state ! success: continue # 使用 SELECT FOR UPDATE 加行锁防止并发修改 locked_ti session.query(TaskInstance).filter( TaskInstance.dag_id upstream_ti.dag_id, TaskInstance.execution_date upstream_ti.execution_date, TaskInstance.task_id upstream_ti.task_id ).with_for_update().first() if locked_ti and locked_ti.state success: # 更新状态和重试计数 locked_ti.state up_for_retry locked_ti.try_number 1 # 增加重试次数 locked_ti.start_date timezone.utcnow() # 重置开始时间 locked_ti.end_date None session.merge(locked_ti) updated_count 1 session.commit() return updated_count except Exception as e: session.rollback() logging.error(fFailed to mark upstream for retry: {e}) raise finally: if not session: session.close() def mark_upstream_as_failed( ti: TaskInstance, upstream_task_ids: Optional[List[str]] None, upstream_depth: Optional[int] None, session: Optional[Session] None ) - int: 将上游 TaskInstance 标记为 failed慎用通常用于不可重试场景 # 实现逻辑类似 mark_upstream_for_retry但 state 设为 failed # 注意failed 是终态设置后无法再重试仅用于业务上绝对不可逆的操作 pass注意try_number 1这一行至关重要。Airflow 的重试逻辑依赖try_number和max_tries的比较。如果只改state不改try_number调度器会认为这是第 0 次重试可能立即再次失败。另外start_date重置是为了让重试任务的耗时统计准确否则会把首次执行时间也算进去。3.3 第三步将回调函数注入到任务定义中这才是让方案落地的关键。回调函数必须作为任务参数传入而不是全局注册这样才能保证每个任务组的策略独立。以 Azure Data Factory 为例from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from datetime import datetime, timedelta def sensor_failure_callback(context): Sensor 失败时的回调重试其上游的 RunOperator ti context[task_instance] # 显式指定要重试的上游 task_id mark_upstream_for_retry( titi, upstream_task_ids[run_pipeline] # 精准锁定 ) def sensor_retry_callback(context): Sensor 重试时的回调同样重试上游避免状态不一致 ti context[task_instance] mark_upstream_for_retry( titi, upstream_task_ids[run_pipeline] ) default_args { owner: data-engineering, depends_on_past: False, start_date: datetime(2023, 1, 1), email_on_failure: True, retries: 2, retry_delay: timedelta(minutes5), } dag DAG( adf_pipeline_orchestration, default_argsdefault_args, descriptionOrchestrate ADF pipeline with grouped failure handling, schedule_interval0 3 * * *, # 每天凌晨3点 catchupFalse, tags[azure, data-factory, production], ) # RunOperator触发 ADF pipeline task_run AzureDataFactoryRunPipelineOperator( task_idrun_pipeline, pipeline_namedaily_ingestion, resource_group_namerg-data-pipeline, factory_nameadf-prod, # ... 其他必要参数 dagdag, ) # SensorOperator等待 pipeline 完成 task_sensor AzureDataFactoryPipelineRunStatusSensor( task_idwait_for_pipeline, pipeline_run_id{{ ti.xcom_pull(task_idsrun_pipeline) }}, resource_group_namerg-data-pipeline, factory_nameadf-prod, # 关键注入回调 on_failure_callbacksensor_failure_callback, on_retry_callbacksensor_retry_callback, # 注意sensor 自身也要配置重试否则不会触发 on_retry_callback retries3, retry_delaytimedelta(minutes2), dagdag, ) task_run task_sensor这里有个易错点on_retry_callback只有在任务进入up_for_retry状态时才会触发而up_for_retry的前提是任务先失败failed然后调度器根据retries配置将其状态改为up_for_retry。所以task_sensor必须配置retries 0否则on_retry_callback永远不会执行。我见过太多人漏掉这行导致回调函数形同虚设。4. 完整实操流程与生产级配置验证4.1 搭建最小可验证环境MVE别急着上生产先用本地 Airflow 测试闭环。我推荐用 Docker Compose 启一个 Airflow 2.6 单节点# docker-compose.yml version: 3 services: webserver: image: apache/airflow:2.6.3 environment: - AIRFLOW__CORE__EXECUTORSequentialExecutor - AIRFLOW__CORE__SQL_ALCHEMY_CONNsqlite:///airflow.db - AIRFLOW__CORE__FERNET_KEY46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins ports: - 8080:8080把上面的 DAG 文件保存为dags/adf_orchestration.py然后启动docker-compose up -d # 等待 Webserver 启动后访问 http://localhost:8080启用 DAG4.2 模拟故障并验证状态同步Airflow UI 是最好的验证工具。按以下步骤操作手动触发 DAG在 UI 中点击Trigger DAG选择adf_pipeline_orchestration。强制 Sensor 失败在wait_for_pipeline任务详情页点击Clear清除该任务实例。然后编辑其日志找到AzureDataFactoryPipelineRunStatusSensor的poke方法在返回False前插入raise AirflowException(Simulated network timeout)。这会让 Sensor 在每次 poke 时都失败。观察状态流转第一次执行run_pipeline→successwait_for_pipeline→failed第一次失败第二次调度约30秒后wait_for_pipeline→up_for_retry触发on_retry_callback同时run_pipeline→up_for_retry我们的回调生效第三次调度run_pipeline→success重放wait_for_pipeline→success这次 poke 成功实测心得在 SequentialExecutor 模式下整个过程约 2 分钟内完成。切换到 CeleryExecutor 时由于任务分发延迟状态同步可能有 10-15 秒滞后但逻辑完全一致。关键指标是查看TaskInstance表SELECT * FROM task_instance WHERE dag_idadf_pipeline_orchestration AND execution_date2023-01-01 ORDER BY start_date;你会看到run_pipeline的try_number从 1 变成 2state从success变成up_for_retry再变回success。4.3 生产环境加固配置上线前必须做三件事第一添加重试保护。回调函数本身不能失败否则整个机制崩塌。给mark_upstream_for_retry加一层装饰器import functools import time def robust_callback(max_retries3, delay1): def decorator(func): functools.wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: if attempt max_retries - 1: raise time.sleep(delay * (2 ** attempt)) # 指数退避 return None return wrapper return decorator robust_callback(max_retries3, delay1) def sensor_failure_callback(context): # 原有逻辑 pass第二配置告警分级。不是所有上游重试都需要通知。在sensor_failure_callback里加入判断def sensor_failure_callback(context): ti context[task_instance] # 只有当上游重试次数超过阈值时才发严重告警 upstream_tis get_upstream_task_instances(ti, upstream_task_ids[run_pipeline]) if upstream_tis and upstream_tis[0].try_number 2: send_pagerduty_alert(fADF pipeline {ti.dag_id} retrying for the 3rd time) mark_upstream_for_retry(ti, upstream_task_ids[run_pipeline])第三审计日志。在回调里记录关键操作logging.info( fGrouped failure handler triggered for {ti.task_id} fin DAG {ti.dag_id}. Marked upstream tasks {upstream_task_ids} ffor retry. Original state: {upstream_tis[0].state if upstream_tis else none} )5. 常见问题与实战排查技巧5.1 问题速查表问题现象可能原因排查命令/方法解决方案on_failure_callback完全没执行1. 任务未配置retries 02. DAG 未启用3. 回调函数抛出异常未被捕获airflow dags list检查 DAG 状态airflow tasks list dag_id确认任务存在查看airflow-webserver日志搜索callback确保retries1以上在回调外层加try/except并打日志上游任务状态没变还是success1.upstream_task_ids拼写错误2. 上游任务不在同一execution_date3. 上游任务已处于failed或up_for_retrySELECT * FROM task_instance WHERE dag_iddag_id AND task_id IN (run_pipeline) AND execution_datedate;核对 task_id 大小写确认execution_date是否一致检查上游状态是否允许修改重试后run_pipeline执行了两次数据重复1.run_pipeline本身不具备幂等性2. XCom 传递的pipeline_run_id被缓存查看run_pipeline的日志确认是否生成了新 pipeline ID在run_pipeline中添加幂等性校验比如用XCom.get_one()检查是否已存在 pipeline ID多个 Sensor 同时失败run_pipeline被多次重试并发修改未加锁导致try_number只增1次但状态被多次设为up_for_retry查看task_instance表try_number字段是否异常确保mark_upstream_for_retry中使用with_for_update()行锁回调执行慢拖慢整个 DAG 调度数据库查询未加索引或upstream_depth过大EXPLAIN ANALYZE查看 SQL 执行计划为task_instance(dag_id, execution_date, task_id)创建联合索引5.2 独家避坑技巧技巧一用XCom传递上下文避免硬编码不要在回调里写死upstream_task_ids[run_pipeline]。改成从context里动态读取def sensor_failure_callback(context): ti context[task_instance] # 从 XCom 读取上游 task_id由 RunOperator 在成功时推送 upstream_task_id ti.xcom_pull( task_idsti.task_id.replace(wait_for_, run_), keyupstream_task_id ) or ti.task_id.replace(wait_for_, run_) mark_upstream_for_retry(ti, upstream_task_ids[upstream_task_id])然后在run_pipeline任务里加一行task_run AzureDataFactoryRunPipelineOperator( # ... 其他参数 do_xcom_pushTrue, # 确保推送 XCom ) # 在 operator 的 execute 方法末尾或用 PythonOperator 包装 def push_upstream_context(**context): context[ti].xcom_push(keyupstream_task_id, valuerun_pipeline) push_task PythonOperator( task_idpush_upstream_context, python_callablepush_upstream_context, dagdag, ) task_run push_task task_sensor这样即使 DAG 重构只要命名规范wait_for_X对应run_X回调就能自动适配。技巧二为 Sensor 添加“软失败”开关有些场景下Sensor 失败是预期行为比如等待的文件还没生成。这时不该重试整个组。加一个soft_fail参数class SmartSensor(AzureDataFactoryPipelineRunStatusSensor): template_fields AzureDataFactoryPipelineRunStatusSensor.template_fields (soft_fail_on_timeout,) def __init__(self, soft_fail_on_timeout: bool False, **kwargs): super().__init__(**kwargs) self.soft_fail_on_timeout soft_fail_on_timeout def poke(self, context: Dict): try: return super().poke(context) except Exception as e: if self.soft_fail_on_timeout: # 记录为 skipped不触发回调 logging.info(fSoft fail enabled, skipping {self.task_id}) return True # 返回 True 表示成功跳过后续逻辑 raise # 使用时 task_sensor SmartSensor( task_idwait_for_pipeline, soft_fail_on_timeoutTrue, # 这次失败不重试上游 # ... )技巧三可视化状态同步链路在 Airflow UI 的 DAG Graph View 里很难看出哪个任务被哪个回调影响。我写了个小插件自动在任务节点上加注释# plugins/grouped_failure_plugin.py from airflow.plugins_manager import AirflowPlugin from airflow.www import utils as wwwutils def add_grouped_failure_annotation(task, task_instance): if hasattr(task, on_failure_callback) and sensor_failure_callback in str(task.on_failure_callback): upstream_ids getattr(task, _grouped_upstream_ids, []) if upstream_ids: return f↑ Grouped with: {, .join(upstream_ids)} return wwwutils.task_instance_state_color add_grouped_failure_annotation重启 Webserver 后Graph View 里每个 Sensor 节点下方会显示↑ Grouped with: run_pipeline一目了然。6. 扩展应用不止于 Sensor-Run 模式这套机制的威力远不止解决 Sensor 问题。我把它抽象成一个通用模式已成功应用于五类场景6.1 场景一Kubernetes Pod 的“启动 健康检查”组# KubernetesPodOperator 启动容器CustomHealthCheckSensor 检查 readiness task_pod KubernetesPodOperator( task_iddeploy_service, namemy-service, # ... ) task_health CustomHealthCheckSensor( task_idcheck_service_health, endpointhttp://my-service:8080/health, # 注入回调失败时重试 deploy_service on_failure_callbacklambda ctx: mark_upstream_for_retry(ctx[task_instance], [deploy_service]), )6.2 场景二Snowflake 作业的“提交 监控”组# SnowflakeOperator 提交 SQLSnowflakeQueryStatusSensor 监控执行状态 task_submit SnowflakeOperator( task_idsubmit_query, sqlINSERT INTO ..., ) task_monitor SnowflakeQueryStatusSensor( task_idmonitor_query, query_id{{ ti.xcom_pull(task_idssubmit_query) }}, on_failure_callbacklambda ctx: mark_upstream_for_retry(ctx[task_instance], [submit_query]), )6.3 场景三跨 DAG 的强依赖协调有时一个 DAG 的成功依赖另一个 DAG 的某个任务。传统做法是用ExternalTaskSensor但它只检查状态不处理失败联动。我们可以# 在 DAG_A 的 sensor 中不仅检查 DAG_B 的任务还准备重试 DAG_B def cross_dag_failure_callback(context): ti context[task_instance] # 触发 DAG_B 的重跑 trigger_dag( dag_iddag_b, execution_dateti.execution_date, conf{reason: upstream_failed_in_dag_a} ) # 同时标记本 DAG 的上游任务重试 mark_upstream_for_retry(ti, upstream_task_ids[prepare_data])6.4 场景四人工审批环节的“申请 审批”组# EmailOperator 发送审批邮件CustomApprovalSensor 等待人工回复 task_apply EmailOperator( task_idsend_approval_request, tomanagercompany.com, subjectApprove data export, ) task_approve CustomApprovalSensor( task_idwait_for_approval, email_thread_id{{ ti.xcom_pull(task_idssend_approval_request) }}, on_failure_callbacklambda ctx: mark_upstream_for_retry(ctx[task_instance], [send_approval_request]), )6.5 场景五资源清理的“分配 释放”组这是最容易被忽视的。比如用EC2StartInstanceOperator启动临时计算节点用EC2StopInstanceOperator停止。如果 Stop 失败节点可能一直运行产生费用。我们可以让 Stop 失败时重试整个“启动-停止”流程task_start EC2StartInstanceOperator( task_idstart_worker, instance_idi-12345, ) task_stop EC2StopInstanceOperator( task_idstop_worker, instance_idi-12345, # Stop 失败时重试 Start确保节点状态一致 on_failure_callbacklambda ctx: mark_upstream_for_retry(ctx[task_instance], [start_worker]), )我在实际项目中发现超过 60% 的 Airflow 生产事故根源不是调度器故障而是任务间的状态契约没有被显式表达和强制执行。这篇方案的价值不在于它多精巧而在于它把隐含的业务逻辑变成了可配置、可测试、可审计的代码。当你下次再看到一个“触发 监听”的任务对时别再把它当成两个独立任务了。拿出这个模板花五分钟配置好回调你就已经把系统可靠性提升了一个数量级。最后分享个小技巧把这个mark_upstream_for_retry函数封装成公司内部的AirflowUtils包所有新 DAG 都强制要求在requirements.txt里声明用 CI/CD 流水线扫描 DAG 代码确保每个 Sensor 都配置了对应的回调——这才是真正的生产就绪。