从一次凌晨三点的事故说起凌晨三点手机震得我直接从床上弹起来。监控告警Agent 服务集群的 CPU 全线飙到 95%响应时间从 50ms 暴涨到 12 秒。登录 Kibana 一看日志里全是同一个 IP 发来的请求——某个爬虫脚本对着我们的 Agent 接口疯狂调用/agent/chat每秒 2000 次直接把后端的 LLM 推理服务打挂了。更讽刺的是这个接口连最基本的认证都没做。因为当时赶项目上线想着“Agent 接口只给内部用”结果内部测试的 API Key 被某个前端同事不小心提交到了公开仓库。那次之后我花了整整一周重构网关层。今天这篇笔记就是把当时踩过的坑、改过的代码、以及后来在多个 Agent 项目中沉淀下来的经验掰开揉碎了讲清楚。认证别信“内部使用”这种鬼话Token 校验的坑很多人觉得 Agent 的 API 网关认证就是简单校验一个 Bearer Token。但实际跑起来你会发现LLM 推理动辄几十秒Token 在请求过程中过期了怎么办# 别这样写请求开始时校验一次后面就不管了defauthenticate(request):tokenextract_token(request)ifnotverify_token(token):# 这里踩过坑token 可能在长请求中过期raiseUnauthorized()returndecode_token(token)正确的做法是在网关层做短生命周期 Token 刷新机制。对于 Agent 的长连接或流式响应建议用 JWT 但把过期时间设长一些比如 30 分钟同时配合 Redis 做黑名单。# 推荐做法网关层做两级校验classAgentAuthMiddleware:def__init__(self):self.blacklistRedisClient()# 存被撤销的 tokenasyncdefauthenticate(self,request):tokenrequest.headers.get(X-Agent-Token)ifnottoken:# 这里踩过坑直接返回 401 会让客户端困惑# 应该返回 401 具体错误码raiseAuthError(codeMISSING_TOKEN,msg请提供 X-Agent-Token 头)# 先查黑名单再验签名ifawaitself.blacklist.exists(token):raiseAuthError(codeTOKEN_REVOKED,msgToken 已被撤销)try:payloadjwt.decode(token,self.secret,algorithms[HS256])exceptjwt.ExpiredSignatureError:# 别直接返回 401给客户端一个刷新机会raiseAuthError(codeTOKEN_EXPIRED,msgToken 已过期请刷新)exceptjwt.InvalidTokenError:raiseAuthError(codeINVALID_TOKEN,msgToken 格式错误)# 把用户信息注入到请求上下文request.state.userpayloadreturnTrueAPI Key vs OAuth2对于 Agent 服务我建议对内用 API Key对外用 OAuth2。别混用。API Key 的生成要带前缀标识来源比如sk-agent-prod-xxxx这样在日志里一眼就能看出是哪个环境、哪个服务的 Key 在调用。defgenerate_api_key(env:str,service:str)-str:# 这里踩过坑直接用 uuid 生成出了问题根本查不到来源prefixfsk-{env}-{service}random_partsecrets.token_hex(16)returnf{prefix}-{random_part}限流别等被打挂了才想起来限流粒度要细很多人的限流就是简单粗暴的“每个 IP 每秒 10 次”。但 Agent 场景下一个用户可能同时发起多个流式请求每个请求持续几十秒。这时候按 IP 限流完全没用——爬虫换个 IP 池就绕过去了。# 别这样写只按 IP 限流classRateLimiter:def__init__(self):self.redisRedis()self.window1# 1秒窗口self.max_requests10asyncdefcheck(self,request):iprequest.client.host keyfratelimit:ip:{ip}countawaitself.redis.incr(key)ifcountself.max_requests:raiseRateLimitError()正确的做法是多维限流classAgentRateLimiter:def__init__(self):self.redisRedis()asyncdefcheck(self,request):userrequest.state.user iprequest.client.host api_keyrequest.headers.get(X-Agent-Token,anonymous)# 三层限流用户级别、API Key 级别、全局级别limits[(fratelimit:user:{user.id},100,60),# 每分钟100次(fratelimit:apikey:{api_key},1000,60),# 每分钟1000次(fratelimit:global,10000,60),# 全局每分钟10000次]forkey,max_req,windowinlimits:countawaitself.sliding_window(key,window)ifcountmax_req:# 这里踩过坑直接返回 429 不够要告诉客户端什么时候能重试retry_afterself.calculate_retry_after(key,window)raiseRateLimitError(retry_afterretry_after,limitmax_req,windowwindow)流式请求的特殊处理Agent 的流式响应SSE是个大坑。一个 SSE 连接可能持续几分钟如果按请求次数限流一个连接就算一次请求完全起不到保护作用。# 推荐做法对 SSE 连接按 token 消耗量限流classSSERateLimiter:def__init__(self):self.token_bucket{}# 每个连接一个令牌桶asyncdefcheck_token(self,connection_id:str,tokens_consumed:int):bucketself.token_bucket.get(connection_id)ifnotbucket:bucketTokenBucket(capacity10000,refill_rate100)# 每秒补充100个tokenself.token_bucket[connection_id]bucketifnotbucket.consume(tokens_consumed):# 这里踩过坑直接断开连接太粗暴应该降速awaitself.slow_down(connection_id)日志别等出事了才后悔没打结构化日志是底线Agent 的调用链路特别长用户请求 - 网关 - Agent 编排 - LLM 调用 - 工具调用 - 返回。任何一个环节出问题没有结构化日志根本查不了。# 别这样写用 print 或者简单的 logging.infologging.info(fUser{user_id}called agent{agent_id})# 推荐做法结构化日志带上 trace_idclassAgentLogger:def__init__(self):self.loggerstructlog.get_logger()deflog_request(self,request,response,duration_ms):self.logger.info(agent_request,trace_idrequest.state.trace_id,user_idrequest.state.user.id,agent_idrequest.path_params.get(agent_id),modelrequest.state.model,input_tokensrequest.state.input_tokens,output_tokensresponse.output_tokens,duration_msduration_ms,status_coderesponse.status_code,# 这里踩过坑忘了记录错误类型排查时只能猜error_coderesponse.error_codeifhasattr(response,error_code)elseNone,)敏感信息脱敏Agent 的请求里经常包含用户隐私数据。日志里直接打明文等合规找上门就晚了。defsanitize_request(request_body:dict)-dict:# 这里踩过坑只脱敏了 password忘了脱敏 API Key 和 tokensensitive_fields[password,api_key,token,credit_card,phone]sanitized{}forkey,valueinrequest_body.items():ifkeyinsensitive_fields:sanitized[key]***elifisinstance(value,dict):sanitized[key]sanitize_request(value)else:sanitized[key]valuereturnsanitized错误处理让调用方少骂你两句统一错误响应格式Agent 的调用方可能是前端、其他微服务、或者第三方开发者。错误响应格式不统一每个调用方都得写一堆 if-else。# 别这样写每个接口返回不同的错误格式# /agent/chat 返回 {error: rate limit exceeded}# /agent/stream 返回 {code: 429, message: too many requests}# 推荐做法统一的错误响应结构classAgentErrorResponse(BaseModel):code:str# 机器可读的错误码message:str# 人类可读的错误信息details:Optional[dict]# 调试用的详细信息request_id:str# 方便调用方报错时提供timestamp:intclassConfig:# 这里踩过坑忘了加 schema 示例调用方看不懂schema_extra{example:{code:RATE_LIMIT_EXCEEDED,message:请求过于频繁请稍后重试,details:{retry_after:30,limit:100,window:60},request_id:req_abc123,timestamp:1700000000}}错误码设计错误码要分层HTTP 状态码 业务错误码。别只用 HTTP 状态码400 和 401 根本不够用。classErrorCodes:# 认证相关MISSING_TOKENAUTH_001INVALID_TOKENAUTH_002TOKEN_EXPIREDAUTH_003TOKEN_REVOKEDAUTH_004# 限流相关RATE_LIMIT_USERRATE_001RATE_LIMIT_GLOBALRATE_002RATE_LIMIT_TOKENRATE_003# Agent 相关AGENT_NOT_FOUNDAGENT_001AGENT_TIMEOUTAGENT_002AGENT_OVERLOADAGENT_003# LLM 相关LLM_TIMEOUTLLM_001LLM_OVERLOADLLM_002LLM_CONTENT_FILTEREDLLM_003重试策略Agent 调用 LLM 经常超时但直接重试可能让情况更糟。网关层要做智能重试。classRetryStrategy:def__init__(self):self.max_retries3self.backoffExponentialBackoff(base1,max_delay30)asyncdefshould_retry(self,error:AgentError,attempt:int):# 这里踩过坑所有错误都重试结果把数据库也打挂了retryable_codes[LLM_TIMEOUT,LLM_OVERLOAD,AGENT_TIMEOUT,AGENT_OVERLOAD]iferror.codenotinretryable_codes:returnFalseifattemptself.max_retries:returnFalse# 检查是否还有配额ifnotawaitself.check_quota():returnFalsedelayself.backoff.get_delay(attempt)awaitasyncio.sleep(delay)returnTrue个人经验性建议网关层不要做业务逻辑。我见过有人把 Agent 的 prompt 组装逻辑写在网关里结果每次改 prompt 都要重启网关。网关只做认证、限流、日志、路由业务逻辑全扔给下游。限流阈值要动态调整。别写死 100 次/分钟。根据后端 LLM 的负载、当前排队数、甚至时间白天 vs 凌晨动态调整。我后来加了个自适应限流模块根据 P99 延迟自动降级。日志要能快速定位到具体请求。每个请求生成一个 trace_id贯穿网关、Agent 编排、LLM 调用、工具调用。出问题时一个 trace_id 就能拉出整条链路。错误信息要区分内外。对内部日志把堆栈、参数、环境变量全打出来对客户端只返回友好的错误信息别把内部 IP、数据库表名暴露出去。压测要带上网关。很多人只压测 Agent 核心服务忽略了网关。结果上线后发现网关成了瓶颈——认证的 Redis 查询、限流的计数器、日志的写入这些都会增加延迟。那次凌晨三点的事故之后我养成了一个习惯每次上线新 Agent 接口第一件事不是测功能而是先跑一遍压测看看网关能不能扛住 10 倍流量。网关是系统的第一道防线这道防线垮了后面再好的 Agent 逻辑都是白搭。