极简智能工作流平台:DAG 编排引擎的设计与实现
极简智能工作流平台DAG 编排引擎的设计与实现一、工作流编排的复杂度陷阱为什么简单配置总是走向图灵完备工作流平台的核心承诺是让非技术人员通过拖拽配置自动化流程。但在实际落地中工作流需求往往从简单的线性流程演变为包含条件分支、并行执行、错误重试、人工审批的复杂 DAG。许多工作流平台在应对复杂度增长时选择了两条路径之一要么保持简单但无法覆盖复杂场景要么不断添加特性直到变成一门图灵完备的编程语言。极简主义的设计哲学要求第三条路径用最少的抽象覆盖最常见的场景同时为长尾场景提供可扩展的逃生舱。具体到 DAG 编排引擎这意味着核心引擎只负责拓扑排序和节点调度条件逻辑、错误处理、状态管理通过插件化的 Hook 机制实现而非硬编码在引擎中。graph TD A[触发器br/Webhook/定时/事件] -- B[参数预处理节点] B -- C{条件判断} C --|路径A| D[并行执行组] C --|路径B| E[串行执行组] D -- D1[任务1] D -- D2[任务2] D -- D3[任务3] D1 -- F[结果聚合] D2 -- F D3 -- F E -- E1[步骤1] E1 -- E2[步骤2] F -- G{审批节点} E2 -- G G --|通过| H[后处理节点] G --|拒绝| I[通知节点] H -- J[输出节点] I -- J style C fill:#fff3e0 style G fill:#fff3e0 style D fill:#e1f5fe二、DAG 引擎的核心抽象节点、边与调度器DAG 引擎的三个核心抽象是节点Node、边Edge和调度器Scheduler。节点是工作流中的执行单元。每个节点有明确的输入输出类型定义以及执行逻辑。节点分为三类计算节点调用 API、执行脚本、控制节点条件判断、循环、等待节点人工审批、外部事件。节点的输入输出通过数据通道传递而非共享内存——这确保了节点间的解耦。边定义了节点间的数据依赖和执行顺序。边分为两类数据边上游输出传递给下游输入和控制边上游状态决定下游是否执行。控制边实现了条件分支和并行执行的语义。调度器负责根据 DAG 拓扑和节点状态决定下一个要执行的节点。调度器的核心算法是拓扑排序找到所有入度为零所有上游已完成的节点提交到执行队列。对于并行节点调度器同时提交多个节点由执行器并发执行。三、DAG 编排引擎的代码实现以下实现展示了极简 DAG 引擎的核心逻辑重点体现拓扑调度、条件分支和错误恢复。from dataclasses import dataclass, field from enum import Enum from typing import Optional, Any, Callable from datetime import datetime import asyncio class NodeStatus(Enum): PENDING pending RUNNING running SUCCESS success FAILED failed SKIPPED skipped # 条件不满足时跳过 WAITING waiting # 等待外部事件如审批 class EdgeType(Enum): DATA data # 数据依赖上游输出传递给下游 CONTROL control # 控制依赖上游状态决定下游是否执行 dataclass class Node: 工作流节点 node_id: str node_type: str # compute, condition, wait, output handler: Optional[Callable] None timeout: float 30.0 # 超时时间秒 retry_count: int 0 # 最大重试次数 status: NodeStatus NodeStatus.PENDING result: Optional[Any] None error: Optional[str] None dataclass class Edge: 工作流边 source_id: str target_id: str edge_type: EdgeType EdgeType.DATA condition: Optional[Callable] None # 控制边的条件函数 dataclass class WorkflowContext: 工作流执行上下文节点间数据传递的通道 workflow_id: str inputs: dict field(default_factorydict) # 工作流输入参数 node_outputs: dict[str, Any] field(default_factorydict) # 节点输出 variables: dict field(default_factorydict) # 工作流级变量 def get_input(self, node_id: str, param_name: str) - Any: 获取节点输入从上游输出或工作流输入中读取 if param_name in self.node_outputs.get(node_id, {}): return self.node_outputs[node_id][param_name] if param_name in self.inputs: return self.inputs[param_name] if param_name in self.variables: return self.variables[param_name] return None def set_output(self, node_id: str, output: Any): 设置节点输出 self.node_outputs[node_id] output class DAGScheduler: DAG 调度器拓扑排序 并行调度 def __init__(self, nodes: list[Node], edges: list[Edge]): self.nodes {n.node_id: n for n in nodes} self.edges edges self.incoming: dict[str, list[Edge]] {} # 入边映射 self.outgoing: dict[str, list[Edge]] {} # 出边映射 # 构建邻接表 for node_id in self.nodes: self.incoming[node_id] [] self.outgoing[node_id] [] for edge in self.edges: self.incoming[edge.target_id].append(edge) self.outgoing[edge.source_id].append(edge) def get_ready_nodes(self) - list[Node]: 获取当前可执行的节点所有上游已完成且条件满足 ready [] for node_id, node in self.nodes.items(): if node.status ! NodeStatus.PENDING: continue # 检查所有入边对应的上游节点是否已完成 all_upstream_done True should_skip False for edge in self.incoming[node_id]: source self.nodes[edge.source_id] # 上游未完成当前节点不可执行 if source.status not in (NodeStatus.SUCCESS, NodeStatus.SKIPPED): all_upstream_done False break # 控制边检查条件是否满足 if edge.edge_type EdgeType.CONTROL and edge.condition: if not edge.condition(source.result): should_skip True break if should_skip: node.status NodeStatus.SKIPPED continue if all_upstream_done: ready.append(node) return ready async def execute(self, ctx: WorkflowContext) - WorkflowContext: 执行工作流循环调度直到所有节点完成或失败 max_iterations len(self.nodes) * 2 # 防止死循环 iteration 0 while iteration max_iterations: iteration 1 ready_nodes self.get_ready_nodes() if not ready_nodes: # 检查是否所有节点都已完成 all_done all( n.status in (NodeStatus.SUCCESS, NodeStatus.FAILED, NodeStatus.SKIPPED, NodeStatus.WAITING) for n in self.nodes.values() ) if all_done: break # 存在未完成但不可执行的节点说明存在循环依赖或死锁 pending [n.node_id for n in self.nodes.values() if n.status NodeStatus.PENDING] if pending: raise RuntimeError( fDeadlock detected: nodes {pending} cannot be scheduled ) break # 并行执行所有就绪节点 tasks [self._execute_node(node, ctx) for node in ready_nodes] results await asyncio.gather(*tasks, return_exceptionsTrue) # 检查执行结果 for node, result in zip(ready_nodes, results): if isinstance(result, Exception): node.status NodeStatus.FAILED node.error str(result) # 失败策略标记下游节点为 SKIPPED self._skip_downstream(node.node_id) return ctx async def _execute_node(self, node: Node, ctx: WorkflowContext): 执行单个节点带重试和超时 if not node.handler: node.status NodeStatus.SUCCESS return for attempt in range(node.retry_count 1): try: node.status NodeStatus.RUNNING result await asyncio.wait_for( node.handler(ctx), timeoutnode.timeout, ) node.result result node.status NodeStatus.SUCCESS ctx.set_output(node.node_id, result) return except asyncio.TimeoutError: node.error fTimeout after {node.timeout}s (attempt {attempt 1}) except Exception as e: node.error f{e} (attempt {attempt 1}) # 所有重试都失败 node.status NodeStatus.FAILED def _skip_downstream(self, failed_node_id: str): 跳过失败节点的所有下游节点 for edge in self.outgoing.get(failed_node_id, []): downstream self.nodes.get(edge.target_id) if downstream and downstream.status NodeStatus.PENDING: downstream.status NodeStatus.SKIPPED self._skip_downstream(edge.target_id)四、DAG 引擎的边界与架构权衡DAG 表达力的边界。DAG 天然不支持循环但某些业务场景需要重试或迭代逻辑。解决方案有二将循环展开为固定次数的节点链适用于已知迭代次数或在节点内部实现循环逻辑适用于动态迭代。两种方案都有局限前者导致 DAG 膨胀后者将逻辑隐藏在节点内部降低了可观测性。状态持久化的性能代价。每个节点执行后都需要持久化状态以支持故障恢复。在高频触发的工作流中如每秒数百个实例状态持久化可能成为瓶颈。优化方向是批量写入和异步持久化但这会引入状态丢失的窗口期。条件边的可观测性。控制边的条件函数是运行时动态计算的静态分析无法确定工作流的实际执行路径。这导致调试困难——用户看到的是一个 DAG 图但实际执行路径取决于运行时数据。建议在执行日志中记录每条控制边的条件求值结果支持事后回溯。并行度的资源控制。并行节点同时提交执行如果没有资源限制可能导致下游服务过载。调度器需要支持最大并行度配置但限制并行度会增加工作流总执行时间。设计决策收益代价插件化 Hook核心引擎简洁Hook 接口稳定性要求高数据通道传递节点解耦序列化/反序列化开销拓扑调度自动并行不支持循环异步持久化高吞吐故障恢复可能丢失状态五、总结极简 DAG 编排引擎的设计核心是最小抽象覆盖最大场景。节点、边、调度器三个核心抽象足以表达绝大多数工作流需求条件分支和错误恢复通过控制边和重试机制实现。但 DAG 的表达力有明确边界——不支持循环、条件路径难以静态分析、并行度需要资源控制。落地路线建议第一从线性流程开始验证引擎稳定性再逐步引入条件分支和并行执行第二所有节点执行日志必须持久化支持事后回溯执行路径第三建立工作流执行时间的基线指标当 P99 延迟超过基线时自动告警。