多 Agent 协作系统:从任务分解到冲突消解的编排架构
多 Agent 协作系统从任务分解到冲突消解的编排架构一、多 Agent 协作的现实困境单 Agent 的能力天花板大模型应用落地到复杂业务场景时单 Agent 架构很快暴露出瓶颈。一个 Agent 同时承担意图识别、工具调用、结果校验和异常恢复上下文窗口被撑满指令遵循率急剧下降。生产环境中客服系统需要同时处理意图路由、知识检索、工单创建和情绪安抚数据分析平台需要同时完成 SQL 生成、结果校验、图表渲染和报告撰写。这些任务的认知负荷差异巨大强行塞进一个 Agent 的 Prompt 里只会导致每个环节都做不好。多 Agent 协作的核心动机不是分工好看而是降低单次推理的认知复杂度让每个 Agent 的 Prompt 聚焦在一个明确的职责边界内。但协作本身引入了新的工程难题任务如何分解、Agent 间如何通信、冲突如何消解、全局状态如何一致。本文从架构层面拆解这些问题给出可落地的工程方案。二、编排模式与通信机制的原理剖析多 Agent 系统的编排模式主要分为三类顺序编排、层级编排和去中心化编排。不同模式适用于不同的业务场景选择错误会导致不必要的复杂度或能力缺失。graph TB subgraph 顺序编排 A1[Agent A] -- A2[Agent B] -- A3[Agent C] end subgraph 层级编排 O[Orchestrator] -- W1[Worker Agent 1] O -- W2[Worker Agent 2] O -- W3[Worker Agent 3] W1 -- O W2 -- O W3 -- O end subgraph 去中心化编排 D1[Agent 1] -- D2[Agent 2] D2 -- D3[Agent 3] D1 -- D3 end顺序编排最简单Agent 按固定链路依次处理前一个的输出是后一个的输入。适合流水线型任务如文档翻译→校对→排版但不支持条件分支和并行。层级编排引入 Orchestrator 角色负责任务分解和结果聚合。Worker Agent 之间不直接通信所有交互通过 Orchestrator 中转。这是目前生产环境中最常见的模式因为 Orchestrator 掌握全局状态便于实现冲突检测和优先级调度。去中心化编排中所有 Agent 地位对等通过共享消息总线或黑板系统通信。灵活度最高但一致性和死锁问题难以控制目前仅在学术原型中验证。通信机制上Agent 间传递的消息需要结构化。推荐使用 JSON Schema 约束消息格式包含task_id、sender、receiver、payload、status五个必选字段。这比自然语言传递更可靠也便于 Orchestrator 做消息路由和状态追踪。三、生产级多 Agent 编排框架实现以下实现基于层级编排模式使用 Python 和 OpenAI Function Calling 构建import json import uuid from typing import Any from dataclasses import dataclass, field from enum import Enum class AgentStatus(Enum): IDLE idle RUNNING running WAITING waiting COMPLETED completed FAILED failed dataclass class AgentMessage: Agent 间通信的结构化消息 task_id: str sender: str receiver: str payload: dict[str, Any] status: str pending msg_id: str field(default_factorylambda: str(uuid.uuid4())) class BaseAgent: Agent 基类封装 LLM 调用与消息处理 def __init__(self, name: str, system_prompt: str, tools: list | None None): self.name name self.system_prompt system_prompt self.tools tools or [] self.status AgentStatus.IDLE self.message_queue: list[AgentMessage] [] def receive(self, message: AgentMessage) - None: self.message_queue.append(message) async def execute(self, client) - dict: 执行当前任务返回结构化结果 self.status AgentStatus.RUNNING try: messages [{role: system, content: self.system_prompt}] for msg in self.message_queue: messages.append({ role: user, content: json.dumps(msg.payload, ensure_asciiFalse) }) # 调用 LLM约束输出格式 response await client.chat.completions.create( modelgpt-4o, messagesmessages, toolsself.tools if self.tools else None, tool_choiceauto if self.tools else None, temperature0.1, # 低温度保证输出稳定性 ) self.status AgentStatus.COMPLETED result response.choices[0].message return {content: result.content, tool_calls: result.tool_calls} except Exception as e: self.status AgentStatus.FAILED return {error: str(e)} finally: self.message_queue.clear() class Orchestrator: 层级编排器负责任务分解、分发与结果聚合 def __init__(self, agents: dict[str, BaseAgent]): self.agents agents self.task_results: dict[str, list[dict]] {} def decompose_task(self, task: str) - list[dict]: 将复杂任务分解为子任务列表 实际生产中这里也由 LLM 完成此处简化为规则分解 # 示例数据分析任务分解 subtasks [ {agent: sql_writer, payload: {action: generate_sql, query: task}}, {agent: validator, payload: {action: validate, depends_on: sql_writer}}, {agent: reporter, payload: {action: summarize, depends_on: validator}}, ] return subtasks async def run(self, task: str, client) - dict: task_id str(uuid.uuid4()) self.task_results[task_id] [] subtasks self.decompose_task(task) for subtask in subtasks: agent_name subtask[agent] if agent_name not in self.agents: return {error: fAgent {agent_name} not found} agent self.agents[agent_name] # 构造消息将前序结果注入 payload payload {**subtask[payload]} if depends_on in subtask: prev_results self.task_results[task_id] payload[previous_results] prev_results msg AgentMessage( task_idtask_id, senderorchestrator, receiveragent_name, payloadpayload, ) agent.receive(msg) result await agent.execute(client) self.task_results[task_id].append({ agent: agent_name, result: result }) return { task_id: task_id, results: self.task_results[task_id] }关键设计决策说明Orchestrator 持有全局状态每个 Worker Agent 的 Prompt 只需关注自身职责消息通过previous_results字段传递前序输出避免 Agent 间直接耦合temperature0.1保证编排链路的输出稳定性。四、编排架构的 Trade-offs 分析延迟叠加问题顺序执行时N 个 Agent 的端到端延迟是各 Agent 推理延迟之和。3 个 Agent 各需 2 秒推理总延迟 6 秒。缓解方案是对无依赖的子任务并行执行但并行引入了结果合并的复杂度。上下文丢失风险每个 Agent 只看到 Orchestrator 传递的精简消息可能丢失原始任务的隐含语义。例如帮我分析上季度销售数据并给出建议SQL Agent 只收到生成 SQL的指令丢失了给出建议的意图。需要在任务分解时保留原始意图的摘要。Orchestrator 单点故障层级编排中 Orchestrator 是全局调度中心一旦崩溃整个系统停摆。生产环境需要为 Orchestrator 实现重试、超时和降级逻辑。如果 Orchestrator 自身也由 LLM 驱动还需要处理其推理失败的情况。成本倍增每个 Agent 独立调用 LLMN 个 Agent 意味着 N 倍的 Token 消耗。对于简单任务多 Agent 的收益不足以覆盖额外成本。建议设置复杂度阈值单步可完成的任务走单 Agent多步且职责差异大的任务才启用多 Agent 编排。五、总结多 Agent 协作系统的核心价值在于降低单次推理的认知复杂度通过职责隔离提升每个环节的输出质量。层级编排是目前生产环境最成熟的模式Orchestrator 掌握全局状态Worker Agent 聚焦单一职责。落地时需要重点关注延迟叠加、上下文丢失、单点故障和成本倍增四个 Trade-off。建议从顺序编排起步验证任务分解的合理性后再引入并行和冲突消解机制。对于 Token 成本敏感的场景优先评估单 Agent 工具调用是否已满足需求避免过度架构。