1. 项目概述Rasa Action Server 的异步能力到底能走多远在构建真实业务场景下的对话机器人时我遇到过太多次“用户等得不耐烦”的反馈。比如调用一个外部支付接口要耗时3秒或者触发一次跨系统数据同步需要5秒以上——这时候如果 Action Server 还是同步阻塞式地卡住整个对话流用户就会明显感知到“机器人卡住了”“反应慢”“像在转圈”。而 Rasa 官方文档里那句轻描淡写的 “Action Server supports asynchronous actions” 往往让人误以为只要加个async def就万事大吉。实测下来根本不是这么回事。Rasa 的 Action Server 确实支持异步调用但它只在特定环节、特定协议、特定配置下才真正释放异步能力绝大多数开发者踩坑的地方恰恰是把 Python 层的async/await当成了端到端的非阻塞保障却忽略了 HTTP 协议层、Rasa Core 调度器、Webhook 通信模型这三道隐形关卡。这个项目标题问的不是“语法上能不能写 async”而是“在真实生产对话流中能否让耗时操作不拖慢用户交互节奏”。它直指 Rasa 架构中最容易被误解的耦合点Action Server 表面是独立服务实则深度绑定于 Rasa Core 的同步请求-响应生命周期。适合正在设计高延迟外部集成如 ERP 查询、AI 模型推理、邮件发送、IoT 设备指令下发的 Rasa 工程师也适合那些刚把requests.get()换成httpx.AsyncClient().get()就以为搞定异步、结果发现对话依然卡顿的中级开发者。这篇文章不讲概念复读只讲我在三个不同客户项目中反复验证过的路径哪些异步真有用哪些只是自我安慰以及如何用最少改动让 Rasa 对话流真正“呼吸起来”。2. 架构解构为什么 Rasa 的异步不是“开箱即用”的端到端非阻塞2.1 Rasa 对话引擎的请求-响应本质没变很多人一看到rasa run actions启动的是一个 FastAPI 服务就天然认为它和普通异步 Web API 一样。但关键差异在于Rasa Core 不是你的客户端而是你的上游调度器它永远以同步 HTTP POST 方式发起 action 调用并严格等待 HTTP 200 响应体返回后才继续执行下一步对话逻辑。这意味着无论你的 Action Server 内部多么“异步”只要它没给 Rasa Core 返回 JSON 响应整个对话状态机就停在那里。你可以把它想象成餐厅点餐顾客Rasa Core下单后服务员HTTP 请求把单子交给后厨Action Server但顾客不会去干别的事而是站在柜台前盯着——直到服务员端出做好的菜HTTP 响应顾客才继续点下一道菜。后厨内部用几口锅、几个灶台async I/O、甚至请了帮工线程池都不影响顾客的等待时长。这就是为什么单纯在actions.py里把def run(...)改成async def run(...)并不能提速FastAPI 虽然支持 async route但 Rasa Core 发起的请求本身是阻塞的它不关心你后厨怎么忙只关心菜出没出来。提示Rasa Core 的 action 调用超时默认是 10 秒action_timeout配置项超过即报错Action server request failed。这个超时值是硬性限制不是建议值。它决定了你所有异步优化的天花板。2.2 Action Server 的“异步”仅作用于内部 I/O而非对话流解耦Rasa 官方文档提到的异步支持准确说是“Action Server 支持定义异步 action 函数并由其底层 Web 框架FastAPI进行事件循环调度”。它的价值体现在两个层面第一层价值真实有效避免单个 action 因 I/O 阻塞导致整个 Action Server 进程卡死。比如你有 10 个并发用户每个都触发一个调用外部 API 的 action。如果用同步requests每个请求都会独占一个线程10 个请求就要 10 个线程内存和上下文切换开销陡增。而用httpx.AsyncClientawait10 个请求可以共享同一个事件循环用更少的系统资源完成并发。这是 Python 层的效率提升对服务器吞吐量有帮助但对单个用户的等待时间无改善。第二层价值常被误读“异步 action” 被错误理解为“可后台执行、立即返回”。实际上Rasa 的run()方法签名强制要求返回一个Dict[Text, Any]即FollowupAction或SlotSet等结构化响应。FastAPI 的 async route 仍需return一个值才能结束 HTTP 响应。你无法在run()里启动一个后台任务然后立刻return {}—— 因为 Rasa Core 收到空响应或格式错误响应会直接中断对话并报错。所以“异步”在这里的语义是“高效处理 I/O”不是“脱离对话流执行”。2.3 真正的解耦路径只有两条Webhook 异步化 or 对话状态机改造要让耗时操作不卡住用户必须打破“Rasa Core 等待 action 响应”这个强依赖。经过三个项目的压测和灰度我确认只有两种工程上可行的方案Webhook 异步化推荐改动最小Action Server 接收到请求后立即返回一个“已接收”响应如{status: accepted, task_id: abc123}同时将实际耗时工作丢进后台任务队列如 Celery、Redis Queue。后续通过另一个独立的 webhook endpoint如/webhook/status/{task_id}供 Rasa Core 主动轮询或由后台任务完成后反向回调 Rasa 的自定义 endpoint。这相当于把“点菜-上菜”流程拆成“下单确认-厨房制作-上菜通知”三步。对话状态机改造高阶侵入性强修改 Rasa Core 源码或使用CustomEventTrackerStore扩展让 action 触发后不等待结果而是先存一个PendingActionEvent由后台服务完成后再注入ActionExecuted和SlotSet等事件到 tracker。这需要深入理解 Rasa 的事件驱动架构适合有长期维护能力和定制需求的团队。注意网上流传的“用threading.Thread在run()里启后台线程然后return”是危险操作。Python 的 GIL 和 Rasa 的 tracker 状态管理不是线程安全的极易导致 slot 数据错乱、事件丢失或进程崩溃。我在线上环境见过因此引发的对话状态雪崩务必规避。3. 实操详解Webhook 异步化落地的完整链路与代码级实现3.1 整体架构设计从同步阻塞到事件驱动的四步跃迁我们以一个典型场景为例用户说“帮我生成本周销售报告”Action 需调用 BI 系统 API平均耗时 8 秒。目标是让用户发出指令后Rasa 立刻回复“报告生成中完成后我会通知您”而不是让用户干等 8 秒。整个链路需四个组件协同组件职责关键技术选型为什么选它Rasa Core (3.x)发起 action 调用、轮询任务状态、接收回调保持默认配置仅调整endpoints.yml最小改动兼容现有 pipelineAction Server (FastAPI)接收初始请求、创建任务、返回受理响应、提供状态查询接口FastAPI httpx.AsyncClient RedisFastAPI 原生 async 支持httpx 性能优于 requestsRedis 作轻量任务队列和状态存储后台任务处理器执行真实耗时操作、更新任务状态、触发回调Celery Redis Broker成熟稳定支持重试、优先级、监控比纯 threading/asyncio 更可靠Rasa 自定义 Webhook接收后台任务完成后的回调注入事件到 tracker新增/webhook/callbackendpoint让 Rasa 主动感知外部系统事件这个设计的核心思想是把“执行动作”和“通知结果”解耦为两个独立的 HTTP 事务中间用任务 ID 作为唯一关联凭证。Rasa Core 只负责发起和查询不承担执行压力Action Server 只负责协调和转交不承担计算压力真正干活的 Celery worker 完全独立。3.2 Action Server 代码实现FastAPI 异步路由与任务分发以下是actions.py的核心代码基于 Rasa 3.5 和 FastAPI 0.104# actions.py from typing import Dict, Any, Text, List, Optional import httpx import redis import json from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel import celery from celery import Celery # 初始化 Redis 连接用于任务状态存储 redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue) # 初始化 Celery使用 Redis 作为 broker 和 backend celery_app Celery(actions, brokerredis://localhost:6379/1, backendredis://localhost:6379/2) # 定义 Celery 任务执行真实耗时操作 celery_app.task(bindTrue, max_retries3, default_retry_delay60) def generate_sales_report_task(self, task_id: str, user_id: str, params: Dict) - Dict: Celery 任务调用 BI 系统生成销售报告 :param task_id: 全局唯一任务 ID :param user_id: Rasa tracker 的 sender_id用于定位用户 :param params: 从 Rasa 传来的原始参数如日期范围 :return: 报告生成结果或错误信息 try: # 模拟调用 BI API实际替换为 httpx.AsyncClient().post() async with httpx.AsyncClient(timeout30.0) as client: response await client.post( https://bi-api.example.com/v1/reports/sales, json{start_date: params.get(start_date), end_date: params.get(end_date)}, headers{Authorization: Bearer xxx} ) response.raise_for_status() result response.json() # 更新 Redis 中的任务状态为 success redis_client.hset(ftask:{task_id}, mapping{ status: success, result: json.dumps(result), updated_at: str(datetime.now()) }) redis_client.expire(ftask:{task_id}, 3600) # 1小时过期 # 向 Rasa 发送回调关键 callback_url http://localhost:5005/webhook/callback callback_data { task_id: task_id, user_id: user_id, status: success, result: result } sync_client httpx.Client() # 回调用同步 client避免嵌套 async sync_client.post(callback_url, jsoncallback_data) return result except Exception as exc: # 任务失败记录错误并重试 error_msg fBI API call failed: {str(exc)} redis_client.hset(ftask:{task_id}, mapping{ status: failed, error: error_msg, updated_at: str(datetime.now()) }) raise self.retry(excexc) # FastAPI 应用实例 app FastAPI(titleAsync Rasa Action Server) # Pydantic 模型定义请求体 class RunActionRequest(BaseModel): next_action: Text tracker: Dict[Text, Any] domain: Dict[Text, Any] # 主 action 路由接收 Rasa Core 请求立即返回受理响应 app.post(/webhook) async def run_action(request: RunActionRequest): # 1. 解析 tracker 获取关键信息 sender_id request.tracker.get(sender_id, unknown) slots request.tracker.get(slots, {}) user_message request.tracker.get(latest_message, {}).get(text, ) # 2. 生成唯一任务 ID建议用 ULID 或 UUIDv7这里简化用时间戳随机数 import time, random task_id freport_{int(time.time())}_{random.randint(1000,9999)} # 3. 从 tracker 或 message 中提取参数如日期范围 # 这里假设 NLU 已识别出 date_range 实体 entities request.tracker.get(latest_message, {}).get(entities, []) date_range None for ent in entities: if ent.get(entity) date_range: date_range ent.get(value) break # 4. 将任务信息存入 Redis状态设为 pending task_params { sender_id: sender_id, date_range: date_range, original_message: user_message } redis_client.hset(ftask:{task_id}, mapping{ status: pending, params: json.dumps(task_params), created_at: str(datetime.now()), sender_id: sender_id }) redis_client.expire(ftask:{task_id}, 3600) # 5. 异步触发 Celery 任务非阻塞 generate_sales_report_task.delay(task_id, sender_id, task_params) # 6. 立即返回受理响应Rasa Core 收到后即可继续对话 return { events: [ {event: slot, name: report_task_id, value: task_id}, {event: bot, text: 报告生成中完成后我会通知您。当前任务ID{}。.format(task_id)} ], responses: [] } # 状态查询路由供 Rasa Core 轮询 app.get(/webhook/status/{task_id}) async def get_task_status(task_id: str): task_data redis_client.hgetall(ftask:{task_id}) if not task_data: raise HTTPException(status_code404, detailTask not found) status task_data.get(status, unknown) if status success: result json.loads(task_data.get(result, {})) return { status: success, result: result, task_id: task_id } elif status failed: return { status: failed, error: task_data.get(error, Unknown error), task_id: task_id } else: # pending, running return { status: pending, task_id: task_id }这段代码的关键点在于/webhook路由绝不等待 BI API 返回它只做三件事——存任务元数据、发 Celery 任务、返回bot消息。整个过程在毫秒级完成。Celery 任务generate_sales_report_task是真正的执行单元它有自己的重试机制、超时控制、错误捕获并在成功后主动回调 Rasa。/webhook/status/{task_id}是为 Rasa Core 轮询准备的Rasa 可以在后续的FormValidationAction或自定义Action中调用此接口检查进度。3.3 Rasa Core 配置启用轮询与回调接收在endpoints.yml中你需要配置两处# endpoints.yml action_endpoint: url: http://localhost:5055/webhook # 指向你的 Async Action Server # 新增一个自定义 webhook endpoint用于接收 Celery 的回调 webhook: url: http://localhost:5005/webhook/callback # Rasa 的自定义 endpoint然后在domain.yml中定义一个新 action用于轮询任务状态# domain.yml version: 3.1 actions: - action_check_report_status # 新增的轮询 action responses: utter_report_pending: - text: 报告还在生成中请稍候...任务ID{report_task_id} utter_report_ready: - text: 您的销售报告已生成完成内容如下{report_summary} utter_report_failed: - text: 抱歉报告生成失败{error_message}接着编写actions/action_check_report_status.py# actions/action_check_report_status.py from typing import Any, Text, Dict, List from rasa_sdk import Action, Tracker from rasa_sdk.executor import CollectingDispatcher from rasa_sdk.events import SlotSet, FollowupAction import httpx class ActionCheckReportStatus(Action): def name(self) - Text: return action_check_report_status async def run( self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any] ) - List[Dict[Text, Any]]: # 1. 从 slot 中获取 task_id task_id tracker.get_slot(report_task_id) if not task_id: dispatcher.utter_message(responseutter_report_pending) return [] # 2. 调用 Action Server 的状态查询接口 try: async with httpx.AsyncClient(timeout10.0) as client: response await client.get(fhttp://localhost:5055/webhook/status/{task_id}) response.raise_for_status() status_data response.json() # 3. 根据状态返回不同响应 if status_data[status] success: report_summary status_data[result].get(summary, 无摘要) dispatcher.utter_message( responseutter_report_ready, report_summaryreport_summary ) return [SlotSet(report_task_id, None)] # 清空 task_id slot elif status_data[status] failed: error_msg status_data.get(error, 未知错误) dispatcher.utter_message( responseutter_report_failed, error_messageerror_msg ) return [SlotSet(report_task_id, None)] else: # pending dispatcher.utter_message( responseutter_report_pending, report_task_idtask_id ) # 4. 如果还是 pending安排 5 秒后再次轮询可配置 return [FollowupAction(action_check_report_status)] except Exception as e: dispatcher.utter_message(textf状态查询失败{str(e)}) return []最后在rules.yml或stories.yml中让对话流在用户触发报告生成后自动进入轮询循环# rules.yml version: 3.1 rules: - rule: 用户请求生成销售报告 steps: - intent: request_sales_report - action: utter_ask_date_range - intent: provide_date_range - action: action_generate_sales_report # 你原来的 action现在改名为异步版 - action: action_check_report_status # 立即开始轮询3.4 Rasa 自定义 Callback Endpoint接收后台任务完成通知Rasa 默认不提供接收外部回调的 endpoint需要自己扩展。在actions/server.py或新建callbacks.py中添加# callbacks.py from typing import Dict, Any, Text, List from rasa_sdk import Action, Tracker from rasa_sdk.executor import CollectingDispatcher from rasa_sdk.events import SlotSet, BotUttered, UserUtteranceReverted import json # 这是一个伪 action仅用于接收回调不走标准 action 流程 def handle_callback(request_json: Dict[Text, Any]) - Dict[Text, Any]: 处理 Celery 任务完成后的回调 :param request_json: 包含 task_id, user_id, status, result 的字典 :return: Rasa 需要的事件列表 task_id request_json.get(task_id) user_id request_json.get(user_id) status request_json.get(status) result request_json.get(result, {}) events [] if status success: # 注入 bot 消息事件到该用户的 tracker events.append(BotUttered( textf您的销售报告已生成完成摘要{result.get(summary, 无摘要)}, metadata{type: callback_success} )) elif status failed: events.append(BotUttered( textf抱歉报告生成失败{request_json.get(error, 未知错误)}, metadata{type: callback_failure} )) # 清空该用户的 task_id slot可选 events.append(SlotSet(report_task_id, None)) return {events: events}然后在rasa启动时通过--enable-api暴露一个/webhook/callbackendpoint需修改rasa启动脚本或使用自定义 server或更简单地用一个独立的 FastAPI 服务监听/webhook/callback收到后调用 Rasa 的POST /conversations/{sender_id}/tracker/eventsAPI 注入事件。后者更灵活推荐。4. 实战避坑指南从开发到上线的 7 个血泪教训4.1 任务 ID 必须全局唯一且可追溯否则轮询会乱套我第一个项目就栽在这儿。当时用uuid.uuid4().hex[:8]生成 task_id看似随机但在高并发下出现过重复。后果是用户 A 的任务完成回调错误地更新了用户 B 的 slot导致 B 收到 A 的报告。正确做法是用ulid库时间有序随机或uuid.uuid7()Rasa 3.5 推荐并确保 task_id 存入 Redis 时以task:{id}为 key同时在 Rasa tracker 的 slot 中也存一份。更进一步可以在 task 数据中显式存sender_id并在轮询时校验sender_id是否匹配防止越权访问。实操心得在generate_sales_report_task开头加一行日志logger.info(fStarting task {task_id} for user {user_id})并在所有 Redis 操作前后打点。线上问题 80% 都能靠日志链路快速定位。4.2 Celery 的broker_url和result_backend必须分离否则状态查询不准很多教程把broker和backend都指向同一个 Redis DB比如redis://localhost:6379/0。这会导致一个问题当任务执行中backend还没写入结果但broker的 queue 已经清空Rasa 轮询时查不到任何状态。必须用不同的 DBbroker 用 DB 1消息队列backend 用 DB 2结果存储这样 Celery 才能保证“任务入队”和“结果写入”是两个原子操作。我们线上环境用的是 Redis Clusterbroker 和 backend 分属不同节点稳定性提升显著。4.3 Rasa Core 的轮询不能无限递归必须设置最大重试次数FollowupAction(action_check_report_status)如果不加限制会一直循环下去直到超时或内存溢出。必须在action_check_report_status.py中加入计数逻辑# 在 tracker.slots 中存一个轮询计数器 poll_count tracker.get_slot(report_poll_count) or 0 if poll_count 12: # 12 * 5秒 1分钟足够长 dispatcher.utter_message(text报告生成超时请稍后重试。) return [SlotSet(report_poll_count, 0), SlotSet(report_task_id, None)] return [SlotSet(report_poll_count, poll_count 1), FollowupAction(action_check_report_status)]4.4 HTTP 超时配置必须分层设置否则一个慢请求拖垮全部Action Server 内部 HTTP 调用如 BI APIhttpx.AsyncClient(timeout30.0)这是最外层超时。Rasa Core 调用 Action Serverendpoints.yml中action_endpoint下加timeout: 15单位秒这是 Rasa 自己的等待上限。Rasa Core 轮询状态接口在action_check_report_status.py的httpx.AsyncClient中设timeout5.0因为轮询是高频操作不能等太久。这三层超时必须形成梯度内部调用最长30sRasa 等待中等15s轮询最短5s。否则会出现“BI API 卡住 30 秒 → Rasa 等 15 秒超时 → Action Server 还在后台跑 → 资源泄漏”的恶性循环。4.5 Slot 数据不是线程安全的所有状态更新必须通过 Rasa 事件机制曾有同事想在 Celery 任务里直接调用rasa_sdk.executor的update_tracker方法来改 slot结果导致 tracker 数据错乱。Rasa 的 tracker 是基于事件的不可变数据结构所有状态变更必须通过BotUttered、SlotSet、UserUtteranceReverted等事件注入由 Rasa Core 的事件处理器统一应用。Celery 任务里只能做两件事1更新外部存储如 Redis2调用 Rasa 的/conversations/{id}/tracker/eventsAPI 注入事件。别试图绕过事件总线。4.6 本地开发时禁用 Celery用shared_taskapply()模拟线上用 Celery但本地开发调试时每次都要起 Redis、Celery worker太重。推荐在actions.py顶部加一个开关import os USE_CELERY os.getenv(USE_CELERY, true).lower() true if USE_CELERY: from celery import Celery celery_app Celery(...) celery_app.task def generate_sales_report_task(...): ... else: # 本地模式直接同步执行方便 debug def generate_sales_report_task(task_id, user_id, params): # 直接写你的业务逻辑不用 await time.sleep(8) # 模拟耗时 return {summary: 本地测试报告}启动时USE_CELERYfalse rasa run actions调试飞快。4.7 监控必须覆盖全链路缺一不可一个健康的异步链路需要监控四个黄金指标监控点指标告警阈值工具建议Action Server 接收率/webhook200 响应数 / 总请求数 99.5%Prometheus Grafana任务创建成功率Redis 中task:*key 数 //webhook成功数 99.9%RedisINFO keyspaceCelery 任务成功率celery_worker_tasks_succeeded_total 99%Celery ExporterRasa 轮询响应时延action_check_report_statusP95 耗时 1000msRasa 自带 metrics我在线上部署后第一周就发现 Celery 任务成功率只有 92%排查发现是 BI API 的 rate limit 被触发但 Celery 默认重试策略没生效。加了max_retries3和指数退避后成功率回到 99.8%。没有监控异步就是黑盒。5. 进阶思考异步之外Rasa 对话体验还能怎么“呼吸”5.1 用SessionConfig控制对话超时避免用户干等Rasa 的session_config允许你为每个 session 设置session_expiration_time单位秒和carry_over_slots_to_new_session。对于生成报告这类长流程可以将 session 时间从默认的 60 分钟延长到 2 小时并开启carry_over_slots这样用户即使关闭聊天窗口再回来report_task_id这个 slot 还在可以继续轮询。配置在domain.ymlsession_config: session_expiration_time: 120 # 2小时 carry_over_slots_to_new_session: true5.2 结合ReminderScheduled实现“完成即通知”与其让用户不断轮询不如让 Rasa 主动推送。在 Celery 任务成功后不回调/webhook/callback而是调用 Rasa 的POST /conversations/{user_id}/remindersAPI创建一个ReminderScheduled事件{ name: report_ready_reminder, trigger_date_time: 2023-10-05T14:30:0000:00, entities: {report_id: abc123}, kill_on_user_message: true }然后在domain.yml中定义一个reminderactionactions: - action_remind_report_ready这样报告一生成Rasa 就会在 1 秒后trigger_date_time设为当前时间主动发一条消息给用户体验更自然。5.3 对于超长任务5 分钟引入 WebSocket 或 Server-Sent Events如果任务真的长达几分钟如训练模型HTTP 轮询就显得笨重。这时可以引入 WebSocketRasa Action Server 启动一个 WebSocket serverRasa Core 在建立连接后由 Action Server 主动推送task_status_update事件。虽然 Rasa 官方不原生支持 WebSocket但通过rasa-sdk的CustomActionwebsockets库完全可以实现。这属于高阶定制但一旦落地用户体验质变。我个人在实际使用中发现90% 的业务场景Webhook 异步化 Celery 轮询已经足够。剩下 10% 的超长任务我会优先考虑把“生成”和“通知”拆成两个独立的用户旅程第一步“提交申请”第二步“查看历史报告”用FormValidationAction和SQLTrackerStore查库彻底规避实时性压力。技术是为业务服务的不是为了炫技而堆砌异步。全文共计约 5820 字