上周我在用 OpenClaw 跑批量代码生成任务跑到一半控制台疯狂刷红Error: API rate limit reached。一开始以为是网络问题重启了两次才反应过来——请求频率打满了。搜了一圈发现不少人也踩过这个坑最近 OpenClaw 热度暴涨百度 APP 都接入了用的人一多限流就更狠。折腾了大半天把试过的方案整理出来直接说哪个好使。先说结论OpenClaw API 报 rate limit reached本质是单位时间内请求次数超了账户配额。解决路线有三条方案原理难度适用场景效果指数退避重试遇到限流自动等待后重试⭐偶发限流、请求量不大能用但慢请求队列 令牌桶限速主动控制发送频率⭐⭐批量任务、高并发场景稳定不浪费配额聚合 API 多通道分流请求分散到多个供应商⭐生产环境、不想改业务逻辑最省事基本告别限流我最后选了方案三搭配方案一跑了三天没再报错。下面一个一个说。搞清楚 Rate Limit 的触发机制先别急着写代码得知道限流是怎么算的。OpenClaw 的 rate limit 通常有两层RPMRequests Per Minute每分钟请求次数上限TPMTokens Per Minute每分钟 Token 消耗上限免费账户和付费账户的配额差距很大不同模型的限额也不一样Claude Opus 4.6 这种热门模型通常限得更紧。触发限流时API 会返回 HTTP 429 状态码响应头里通常带retry-after或x-ratelimit-reset字段告诉你多久后可以重试。很多人忽略了这个信息直接无脑重试反而被限得更久。200 OK429 Rate Limited有没有是否500/502发送 API 请求HTTP 状态码?正常处理响应读取 retry-after 头有 retry-after?等待指定秒数指数退避等待重试次数 上限?抛出异常 / 进入降级逻辑服务端错误, 直接重试方案一指数退避重试Exponential Backoff最基础的方案遇到 429 就等一会儿再试每次等待时间翻倍。适合请求量不大、偶尔触发限流的场景。importtimeimportrandomfromopenaiimportOpenAI,RateLimitError clientOpenAI(api_keyyour-api-key,base_urlhttps://api.example.com/v1# 替换为你的 API 地址)defchat_with_retry(messages,modelgpt-5,max_retries5):带指数退避的 API 调用forattemptinrange(max_retries):try:responseclient.chat.completions.create(modelmodel,messagesmessages,timeout30)returnresponseexceptRateLimitErrorase:ifattemptmax_retries-1:raisee# 尝试从错误信息中提取等待时间wait_time(2**attempt)random.uniform(0,1)# 如果响应头有 retry-after优先用它ifhasattr(e,response)ande.responseisnotNone:retry_aftere.response.headers.get(retry-after)ifretry_after:wait_timefloat(retry_after)random.uniform(0,0.5)print(f[Rate Limited] 第{attempt1}次重试等待{wait_time:.1f}s...)time.sleep(wait_time)raiseException(重试次数耗尽)# 使用respchat_with_retry(messages[{role:user,content:解释一下 Python 的 GIL}],modelgpt-5)print(resp.choices[0].message.content)一开始我没加随机抖动jitter结果多个并发请求同时触发限流、同时重试又同时被限——这叫thundering herd问题。加个random.uniform(0, 1)让每个请求的重试时间错开就好了。这个方案的问题是如果本身就是批量任务几百个请求排着队等重试整体耗时会很感人。方案二令牌桶限速 异步队列与其被动等限流再重试不如主动控制发送频率从根上不触发 429。用asyncio加简单令牌桶写了个异步版本importasyncioimporttimefromopenaiimportAsyncOpenAI,RateLimitErrorclassRateLimiter:简易令牌桶限速器def__init__(self,rpm50):self.rpmrpm self.interval60.0/rpm# 每个请求最小间隔self.last_request_time0self._lockasyncio.Lock()asyncdefacquire(self):asyncwithself._lock:nowtime.monotonic()wait_timeself.last_request_timeself.interval-nowifwait_time0:awaitasyncio.sleep(wait_time)self.last_request_timetime.monotonic()classBatchProcessor:def__init__(self,api_key,base_url,rpm50):self.clientAsyncOpenAI(api_keyapi_key,base_urlbase_url)self.limiterRateLimiter(rpmrpm)self.results[]asyncdefsingle_request(self,messages,modelgpt-5,idx0):awaitself.limiter.acquire()forattemptinrange(3):try:respawaitself.client.chat.completions.create(modelmodel,messagesmessages,timeout30)contentresp.choices[0].message.contentprint(f[{idx}] 完成)return{index:idx,content:content,status:ok}exceptRateLimitError:wait(2**attempt)1print(f[{idx}] 限流了等{wait}s 重试...)awaitasyncio.sleep(wait)return{index:idx,content:None,status:failed}asyncdefrun_batch(self,task_list,modelgpt-5,concurrency10):并发执行批量请求semasyncio.Semaphore(concurrency)asyncdefbounded_request(messages,idx):asyncwithsem:returnawaitself.single_request(messages,model,idx)tasks[bounded_request(msgs,i)fori,msgsinenumerate(task_list)]self.resultsawaitasyncio.gather(*tasks)oksum(1forrinself.resultsifr[status]ok)print(f\n完成:{ok}/{len(task_list)}成功)returnself.results# 使用示例asyncdefmain():processorBatchProcessor(api_keyyour-key,base_urlhttps://api.example.com/v1,rpm50# 根据你的账户配额设置)# 构造 100 个任务tasks[[{role:user,content:f用一句话解释概念 #{i}}]foriinrange(100)]resultsawaitprocessor.run_batch(tasks,concurrency10)asyncio.run(main())拿 100 个请求测过rpm 设 50总耗时 2 分钟出头零 429。代价是得准确知道自己的配额上限设低了浪费时间设高了还是会限流。有个地方容易搞混asyncio.Semaphore和RateLimiter是两回事。Semaphore 控制并发数同时在飞的请求数量RateLimiter 控制发送频率每分钟发几个。两个都要用少一个都可能出问题。方案三聚合 API 多通道分流前两个方案我都用了但真正让我安心的是换了个思路——不在一棵树上吊着。单个 API 端点有限流把请求分散到多个供应商通道不就完了手动维护多个 API Key 太麻烦我后来用了 ofox.ai 的聚合接口。ofox.ai 是一个 AI 模型聚合平台一个 API Key 可以调用 GPT-5、Claude Opus 4.6、Gemini 3、DeepSeek V3 等 50 模型后端有多供应商冗余备份Azure、Bedrock、阿里云、火山引擎单个供应商限流了会自动切换通道。代码改动极小就改个base_urlfromopenaiimportOpenAI# 只需要改这两行clientOpenAI(api_keyyour-ofox-key,base_urlhttps://api.ofox.ai/v1# 聚合接口多通道自动分流)# 业务代码完全不用动responseclient.chat.completions.create(modelgpt-5,messages[{role:user,content:写一个快速排序}])print(response.choices[0].message.content)# 切模型也是改个参数的事response_claudeclient.chat.completions.create(modelclaude-opus-4-6,# 无缝切换到 Claude Opus 4.6messages[{role:user,content:review 这段代码的性能问题}])配合方案一的指数退避重试一起用相当于双保险。跑了三天批量任务零 429。踩坑记录坑 1并发数开太大429 变 503异步并发开了 50以为令牌桶能兜住。结果令牌桶只管发送频率50 个请求几乎同时飞出去服务端直接返回 503。后来并发降到 10配合 RPM 限速才稳住。坑 2retry-after 的单位问题有的 API 返回的retry-after是秒数比如2有的是 HTTP 日期格式比如Thu, 20 Mar 2026 12:00:00 GMT。一开始统一按秒数处理碰到日期格式直接float()报错。记得做个判断importemail.utilsfromdatetimeimportdatetime,timezonedefparse_retry_after(value):try:returnfloat(value)exceptValueError:# 尝试解析为 HTTP 日期dtemail.utils.parsedate_to_datetime(value)returnmax(0,(dt-datetime.now(timezone.utc)).total_seconds())坑 3Streaming 模式下的限流更隐蔽用streamTrue的时候429 错误不是在连接时抛出的而是在迭代 chunk 的过程中才报。如果重试逻辑只包在create()外面stream 中途断了是抓不到的。要把整个迭代过程都包进 try-catchdefstream_with_retry(messages,max_retries3):forattemptinrange(max_retries):try:streamclient.chat.completions.create(modelgpt-5,messagesmessages,streamTrue)full_contentforchunkinstream:ifchunk.choices[0].delta.content:contentchunk.choices[0].delta.content full_contentcontentprint(content,end,flushTrue)print()returnfull_contentexceptExceptionase:ifrateinstr(e).lower()or429instr(e):wait2**attemptprint(f\nStream 中断等{wait}s 重试...)time.sleep(wait)else:raiseraiseException(Stream 重试耗尽)小结Rate limit 就三个层次被动应对用指数退避重试简单粗暴个人项目够用批量任务上令牌桶限速主动把频率压在配额以内不想折腾的生产环境直接上聚合 API 多通道分流。方案一几行代码的事必须加。然后根据场景选二或三——批量任务就二加三都上日常开发一加三就够了。最近 OpenClaw 用的人越来越多限流只会更频繁。代码都贴了直接复制能跑有问题评论区聊。