800行代码实现 Open Claw 的 Tool、消息总线、子Agent管理架构
本文想说明的技术观点是对于 Tool 调用、消息分发、子 Agent 管理这三类 Agent 系统里的核心组件优先采用薄抽象、显式控制流和贴近模型 API 的实现方式往往比引入多层中间件更容易获得工程上的确定性。系统边界更清晰运行路径更容易追踪问题更容易定位也更适合作为后续扩展 Memory、调度和持久化能力的基础。引言这是一个基于 Anthropic Claude API 的 Agent 框架用 TypeScript 编写运行在单进程 Node.js 环境中。本文记录其中四个核心模块的实现工具系统Tool layer、消息总线MessageBus、子 Agent 管理SubagentManager、REPL 主循环。不涉及上层 Bot 接入层、持久化、Context / Memory 系统。框架不依赖 LangChain 或其他 Agent 框架直接基于 Anthropic SDK 构建。选择这条路的原因很简单中间层越薄调试越容易对 API 行为的控制越精确。基础设施Tool 抽象与 ToolRegistry▐****Tool 抽象类一个工具由四个要素组成name、description、input_schema、execute。export abstract class Tool { abstract readonly name: string; abstract readonly description: string; abstract readonly input_schema: AnthropicTool[input_schema]; abstract execute(args: Recordstring, unknown): Promiseunknown; toSchema(): AnthropicTool { return { name: this.name, description: this.description, input_schema: this.input_schema, }; } }input_schema的类型直接取自anthropic-ai/sdk的Tool类型定义。toSchema()将实例转换为 Anthropic API 要求的 function calling schema。没有中间层转换SDK 类型就是唯一的 schema 定义。这里有一个刻意的取舍schema 使用运行时普通对象定义而非 Zod 等库。好处是零额外依赖、直接对齐 SDK 类型。代价是没有运行时参数校验——LLM 传入的参数如果类型不对只能靠execute内部的as断言和实际调用时的错误来兜底。对于当前规模这个取舍可以接受。▐ToolRegistry注册表本身是一个Mapstring, Toolexport class ToolRegistry { private tools new Mapstring, Tool(); register(tool: Tool) { this.tools.set(tool.name, tool); } async execute(name: string, args: Recordstring, unknown) { const tool this.tools.get(name); if (!tool) throw new Error(Tool ${name} not found); return tool.execute(args); } getToolDefinition(): AnthropicTool[] { return Array.from(this.tools.values()).map((tool) tool.toSchema()); } exclude(names: string[]): ToolRegistry { const excludeSet new Set(names); const filtered new ToolRegistry(); for (const [name, tool] of this.tools) { if (!excludeSet.has(name)) { filtered.register(tool); } } return filtered; } }exclude()是为子 Agent 设计的。子 Agent 不应该持有spawn避免递归创建子 Agent、message避免直接向用户发消息等工具所以需要从主 Agent 的工具集中排除特定工具生成一个受限子集。exclude()返回新的ToolRegistry实例不修改原注册表。内置工具一览▐****文件操作ReadFileTool— 读取文件内容动态import(node:fs/promises)加载模块。WriteFileTool— 写入文件。写入前调用mkdir(dirname(path), { recursive: true })自动创建父目录避免因目录不存在而失败。EditFileTool— 精确文本替换。核心逻辑const occurrences content.split(oldText).length - 1; if (occurrences 0) { return Error: old_text not found in ${filePath}; } if (occurrences 1) { return Warning: old_text found ${occurrences} times in ${filePath}. Please provide a more unique text snippet. No changes made.; } const updated content.replace(oldText, newText);强制唯一匹配出现 0 次报错超过 1 次拒绝写入并要求提供更精确的文本片段。这个设计是为了防止 LLM 给出模糊的替换目标导致意外修改多处代码。ListDirTool— 列出目录内容对每个条目做stat用[folder]和[file]前缀区分类型。▐命令执行ExecTool— 执行 shell 命令。三层防护第一层危险命令正则黑名单const DANGEROUS_PATTERNS: RegExp[] [ /rm\s(-[a-zA-Z]*f[a-zA-Z]*\s)?(-[a-zA-Z]*r[a-zA-Z]*\s)?\/($|\s)/, /rm\s-[a-zA-Z]*rf?\s~($|\/|\s)/, /mkfs\b/, /dd\sif/, /:\(\)\s*\{\s*:\|:\s*\s*\}\s*;/, // fork bomb /\s*\/dev\/[sh]d[a-z]/, // 写入裸设备 /chmod\s-R\s777\s\//, ];覆盖rm -rf /、fork bomb、写裸设备等高危模式。需要说明的是正则黑名单是最低限度的防线不能替代沙箱隔离。第二层资源限制默认30 秒超时2MBmaxBuffer。超时后进程被 kill返回超时提示。第三层输出截断。超过 10,000 字符时取首尾各 5,000 字符中间用截断标记连接function truncateOutput(text: string): string { if (text.length MAX_OUTPUT_LENGTH) return text; const half Math.floor(MAX_OUTPUT_LENGTH / 2); return ( text.slice(0, half) \n\n--- truncated (${text.length} chars total) ---\n\n text.slice(-half) ); }保留首尾而非只取前 N 字符是因为命令输出的末尾通常包含最有价值的信息错误信息、统计摘要等。▐****Web 能力WebSearchTool— 封装 Brave Search API返回结构化搜索结果。参数count可选默认 5 条上限 10 条。WebFetchTool— 抓取 URL 内容。针对 HTML 页面内置了纯正则实现的htmlToText转换function htmlToText(html: string): string { return html .replace(/script[\s\S]*?\/script/gi, ) .replace(/style[\s\S]*?\/style/gi, ) .replace(/(br|\/p|\/div|\/li|\/tr|\/h[1-6])[^]*/gi, \n) .replace(/[^]/g, ) // HTML 实体解码... .replace(/[ \t]/g, ) .replace(/\n{3,}/g, \n\n) .trim(); }没有使用 DOM 解析库如 cheerio、jsdom纯正则处理。对于大多数常规网页足够用但对复杂嵌套结构可能丢失语义。内容超过 20,000 字符时截断。▐****通信与调度MessageTool— 出站方向的消息通道。通过构造时注入的sendCallback向外部发送消息export class MessageTool extends Tool { constructor(private sendCallback: SendCallback) { super(); } async execute(args: Recordstring, unknown): Promisestring { const content args.content as string; const channel (args.channel as string) ?? repl; const chatId (args.chat_id as string) ?? default; await this.sendCallback({ channel, chatId, content }); return Message sent to ${channel}:${chatId}; } }REPL 场景下sendCallback就是console.logBot 场景下替换为向 Telegram、Discord 等平台发送消息的函数。工具本身不关心消息最终去向。CronTool CronService— 定时任务管理分为服务层和工具层。CronService 基于setInterval实现支持两种定时方式every_seconds直接转换为毫秒间隔和cron_expr解析 cron 表达式为近似间隔。cron 表达式的解析是简化版本private parseCronInterval(expr: string): number { const parts expr.trim().split(/\s/); if (parts.length ! 5) return 60_000; const [minute, hour] parts; // */N * * * * → 每 N 分钟 if (minute?.startsWith(*/) hour *) { const n parseInt(minute.slice(2), 10); if (!isNaN(n) n 0) return n * 60_000; } // 0 */N * * * → 每 N 小时 if (minute 0 hour?.startsWith(*/)) { const n parseInt(hour.slice(2), 10); if (!isNaN(n) n 0) return n * 3600_000; } if (minute * hour *) return 60_000; // 每分钟 if (minute 0 hour *) return 3600_000; // 每小时 if (minute 0 hour 0) return 86400_000; // 每天 return 60_000; // 复杂表达式降级为每分钟 }只处理*/N、每小时、每天等常见模式。不支持每周三 14:30这类精确时间点的 cron 语义——setInterval本身也做不到这件事。复杂表达式静默降级为每分钟执行一次这是一个已知的精度妥协。CronTool 对外暴露add、list、remove三个 action作为 CronService 的 function calling 接口。MessageBus入站消息总线MessageBus 处理入站方向的消息流——从子系统或外部流向主 Agent。export class MessageBus { private listeners new Mapstring, SetMessageHandler(); private queue: InboundMessage[] []; subscribe(channel: string, handler: MessageHandler): () void { if (!this.listeners.has(channel)) { this.listeners.set(channel, new Set()); } this.listeners.get(channel)!.add(handler); return () { this.listeners.get(channel)?.delete(handler); }; } async publish(message: InboundMessage): Promisevoid { const handlers this.listeners.get(message.channel); if (handlers handlers.size 0) { for (const handler of handlers) { await handler(message); } } else { this.queue.push(message); } } drain(channel?: string): InboundMessage[] { if (!channel) { const msgs [...this.queue]; this.queue []; return msgs; } const matched this.queue.filter((m) m.channel channel); this.queue this.queue.filter((m) m.channel ! channel); return matched; } }数据结构InboundMessage包含四个字段channel消息通道、senderId发送者标识、chatId关联会话格式为originChannel:originChatId、content消息内容。两种消费模式subscribe— 注册实时回调。消息到达时立即调用 handler。适合常驻服务场景。drain— 从队列中取出并清空消息。适合轮询式的同步消费场景。路由规则有订阅者走回调无订阅者入队列。消息只走一条路径不会同时触发回调和入队。与 MessageTool 的关系需要明确MessageTool 负责出站Agent → 外部MessageBus 负责入站外部/子系统 → Agent。两者没有直接的代码耦合方向相反。SubagentManager后台子 Agent▐****架构单进程并发模型。每个子 Agent 是一个 Promise共享同一个 Node.js 事件循环。没有多进程、没有 Worker。每个子 Agent 拥有独立的AgentLoop实例有自己的 ReAct 循环没有历史上下文——每次从零开始处理完一个任务就结束。子 Agent 的工具集是主 Agent 的受限子集。在index.ts中通过exclude排除了spawn、message、edit_file、cronconst subagentTools tools.exclude([spawn, message, edit_file, cron]);排除spawn防止子 Agent 递归创建子 Agent排除message防止子 Agent 直接向用户发消息应该通过 MessageBus 回传给主 Agent 处理排除edit_file限制子 Agent 的写入能力排除cron避免子 Agent 创建定时任务。▐****生命周期spawn(params: { task: string; label?: string; ... }): string { const id subagent-${this.counter}; const label params.label ?? Task ${this.counter}; const promise this.runSubagent(id, params.task, label, ...); promise.finally(() { this.runningTasks.delete(id); }); this.runningTasks.set(id, { id, label, promise }); return id; }流程spawn( )分配自增 ID → 启动runSubagent()返回 Promise → 立即返回 ID。调用方不需要等待子 Agent 完成。runSubagent创建一个独立的AgentLoopbuildMessages只传入当前任务不带历史buildMessages: (_history, userMessage) [ { role: user as const, content: userMessage }, ],子 Agent 最大迭代 15 次主 Agent 是 10 次。完成后通过bus.publish( )将结果发送到systemchannelawait this.bus.publish({ channel: system, senderId: subagent, chatId: ${originChannel}:${originChatId}, content: [Subagent ${label} (${id}) completed]\n\n${result}, });成功和失败都走这条路径只是 content 不同。promise.finally()负责从runningTasksMap 中自动清理已完成的任务。▐****SpawnToolSpawnTool 是主 Agent 触发子 Agent 的接口。LLM 通过 function calling 调用它传入task任务描述和可选的label。返回值包含子 Agent ID 和当前运行中的子 Agent 数量让 LLM 对并发状态有感知。REPL入口与主循环REPLRead-Eval-Print Loop是整个 Agent 的终端交互入口。用户在终端输入文本Agent 处理后输出回复循环往复。▐****启动流程index.ts的初始化按以下顺序执行创建 Anthropic 客户端和 MessageBus 实例。注册所有工具到 ToolRegistry区分主 Agent 工具集和子 Agent 受限工具集。初始化 CronService触发回调通过bus.publish()写入 system channel。创建 SubagentManager注册 SpawnTool最后注册因为依赖 SubagentManager 实例。构建 ContextBuilder加载 skills 和 memory。创建 AgentLoop 实例使用readline/promises启动交互循环。▐****并发控制核心问题用户输入和子 Agent 回传结果都会触发agent.run()但history数组是共享的不能并发修改。解决方式——布尔互斥锁 暂存队列let processing false; const pendingSubagentResults: InboundMessage[] []; async function drainPendingResults(): Promisevoid { while (pendingSubagentResults.length 0) { const msg pendingSubagentResults.shift()!; const systemContent [ [SYSTEM NOTIFICATION - Subagent Result], msg.content, Please summarize the above subagent result for the user., ].join(\n\n); const reply await agent.run(systemContent, history); history.push({ role: user, content: systemContent }); history.push({ role: assistant, content: reply }); console.log(\nBot ${reply}\n); } } async function tryDrainPending(): Promisevoid { if (processing) return; processing true; try { await drainPendingResults(); } finally { processing false; } rl.prompt(); }processing布尔标志充当互斥锁。同一时刻只有一个agent.run()在执行。子 Agent 结果到达时先 push 到pendingSubagentResults数组。tryDrainPending只在!processing时进入避免并发写入 history。用户输入的处理流程rl.on(line, async (input) { // ... processing true; try { const reply await agent.run(trimmed, history); history.push({ role: user, content: trimmed }); history.push({ role: assistant, content: reply }); await drainPendingResults(); } finally { processing false; } rl.prompt(); });用户交互完成后在finally释放锁之前先调用drainPendingResults()处理期间积攒的子 Agent 结果。这保证了子 Agent 结果不会无限滞后。▐****消息订阅bus.subscribe(system, (msg) { pendingSubagentResults.push(msg); void tryDrainPending().catch(console.error); });systemchannel 是内部消息的统一入口。子 Agent 完成、CronService 触发都通过bus.publish()发送到这个 channel。handler 只做两件事入队、尝试消费。每条子 Agent 结果被包装为[SYSTEM NOTIFICATION]格式注入 history由主 Agent 总结后输出给用户。▐****MessageTool 的接线REPL 场景下sendCallback直接输出到终端tools.register( new MessageTool((msg) { console.log(\n[Message → ${msg.channel}:${msg.chatId}] ${msg.content}\n); }), );如果切换到 Bot 场景只需要替换这个回调为向 Telegram、Discord 等平台发送消息的函数。工具系统和 Agent 逻辑不需要任何改动。模块协作全景终端 stdin │ ▼ REPL 主循环 (index.ts) │ ┌──────────────────────── history (共享互斥访问) │ │ ▼ ▼ AgentLoop.run() ──→ Tool 调用 │ ├── 文件/命令/网络工具 → 直接返回结果 │ ├── SpawnTool → SubagentManager.spawn() │ │ └── 子 AgentLoop (独立 ReAct) │ │ └── bus.publish(system, 结果) │ ├── MessageTool → sendCallback → stdout │ └── CronTool → CronService │ └── setInterval → bus.publish(system, 触发通知) │ │ ◄── bus.subscribe(system) ◄── pendingSubagentResults 队列 │ ▼ stdout 输出数据流有两条主线同步路径用户输入 → REPL →agent.run()→ 工具调用 → 结果回传模型 → 最终回复 → stdout。这是标准的 ReAct 循环。异步路径SpawnTool / CronService →bus.publish(system)→ handler 入队 →tryDrainPending()→agent.run()处理系统通知 → stdout。异步结果通过 MessageBus 汇入主循环由互斥锁保证不与同步路径冲突。设计选择与局限零框架依赖。不依赖 LangChain 等 Agent 框架直接基于 Anthropic SDK 构建。好处是完全控制 API 交互细节调试时不需要穿透框架抽象层。代价是部分基础能力需要自己实现。schema 定义方式。运行时对象而非 Zod / JSON Schema 库。降低了复杂度和依赖数量但缺乏运行时校验。如果 LLM 传入了格式错误的参数错误只能在执行阶段暴露。子 Agent 无持久记忆。每次 spawn 从零开始适合一次性的并行任务搜索、分析、计算。不适合需要跨任务积累上下文的场景。CronService 的 cron 表达式。近似实现只支持常见的等间隔模式。复杂表达式会静默降级为每分钟执行不会报错。如果需要精确的 cron 语义应该引入 cron 解析库。MessageBus 无持久化。纯内存队列进程重启后队列消息丢失。对于 REPL 场景足够Bot 场景如果需要消息可靠性需要接入持久化存储。ExecTool 安全边界。正则黑名单只是最低防线。LLM 可以通过变量展开、别名、管道组合等方式绕过正则检测。生产环境应该使用容器沙箱或受限用户执行。REPL 并发模型。布尔锁在单用户场景下足够。Node.js 的单线程模型保证了processing标志不会出现竞态。但如果扩展到多用户如 Bot 同时处理多个会话需要每个会话独立的 history 和更完整的队列/锁机制。总结这个框架的核心设计可以概括为四个部分Tool 抽象 Registry 模式— 统一的工具注册和调用接口通过 exclude() 实现能力隔离。双通道消息机制— 出站走 sendCallbackMessageTool入站走 MessageBus方向明确互不耦合。Promise 并发的子 Agent— 共享事件循环独立 ReAct 循环通过 MessageBus 回传结果。互斥锁驱动的 REPL 主循环— 布尔标志 暂存队列保证 history 的一致性。这些模块组合成一个可扩展的 Agent 运行时。扩展新工具只需继承Tool并注册。切换接入层REPL → Bot只需替换sendCallback和输入源。子 Agent 的能力边界通过exclude()控制。学AI大模型的正确顺序千万不要搞错了2026年AI风口已来各行各业的AI渗透肉眼可见超多公司要么转型做AI相关产品要么高薪挖AI技术人才机遇直接摆在眼前有往AI方向发展或者本身有后端编程基础的朋友直接冲AI大模型应用开发转岗超合适就算暂时不打算转岗了解大模型、RAG、Prompt、Agent这些热门概念能上手做简单项目也绝对是求职加分王给大家整理了超全最新的AI大模型应用开发学习清单和资料手把手帮你快速入门学习路线:✅大模型基础认知—大模型核心原理、发展历程、主流模型GPT、文心一言等特点解析✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑✅开发基础能力—Python进阶、API接口调用、大模型开发框架LangChain等实操✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经以上6大模块看似清晰好上手实则每个部分都有扎实的核心内容需要吃透我把大模型的学习全流程已经整理好了抓住AI时代风口轻松解锁职业新可能希望大家都能把握机遇实现薪资/职业跃迁这份完整版的大模型 AI 学习资料已经上传CSDN朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】