Python速率限制实现
Python 速率限制实现 —— 令牌桶与滑动窗口算法涵盖内存实现、Redis 分布式限流和装饰器封装# 安装依赖pip install redis# 速率限制是保护 API 免受滥用和 DoS 攻击的核心手段import timeimport threadingfrom typing import Dict, Optional, Callable, Tuplefrom collections import defaultdict, dequefrom functools import wraps# 第一部分令牌桶算法内存实现class TokenBucket:令牌桶算法 —— 一种经典流量整形算法。原理桶中存放令牌请求需消耗令牌。桶以固定速率填充令牌。优点允许短暂的突发流量同时保证长期平均速率。def __init__(self, rate: float, capacity: int):初始化令牌桶。rate: 令牌填充速率个/秒capacity: 桶容量最大令牌数self.rate rateself.capacity capacityself.tokens capacity # 当前令牌数self.last_refill time.time() # 上次填充时间self._lock threading.Lock() # 线程安全def _refill(self) - None:根据时间间隔补充令牌。now time.time()elapsed now - self.last_refill# 计算应补充的令牌数new_tokens elapsed * self.rateself.tokens min(self.capacity, self.tokens new_tokens)self.last_refill nowdef consume(self, tokens: int 1) - bool:消耗令牌。如果有足够的令牌则返回 True否则返回 False。with self._lock:self._refill()if self.tokens tokens:self.tokens - tokensreturn Truereturn Falsedef get_wait_time(self) - float:获取需要等待的时间秒以便有足够的令牌。with self._lock:self._refill()if self.tokens 1:return 0.0return (1 - self.tokens) / self.rateclass MultiKeyTokenBucket:多键令牌桶 —— 为每个用户或 IP 维护独立的令牌桶。适用于多租户场景的速率限制。def __init__(self, rate: float, capacity: int):self.rate rateself.capacity capacityself._buckets: Dict[str, TokenBucket] {}self._lock threading.Lock()def get_bucket(self, key: str) - TokenBucket:获取或创建指定键的令牌桶。with self._lock:if key not in self._buckets:self._buckets[key] TokenBucket(self.rate, self.capacity)return self._buckets[key]def consume(self, key: str, tokens: int 1) - bool:消耗指定键的令牌。return self.get_bucket(key).consume(tokens)def cleanup(self, max_idle_seconds: float 3600) - None:清理长时间未使用的桶释放内存。now time.time()with self._lock:stale_keys [k for k, v in self._buckets.items()if now - v.last_refill max_idle_seconds]for k in stale_keys:del self._buckets[k]# 第二部分滑动窗口算法Redis 实现class SlidingWindowRateLimiter:滑动窗口限流器 —— 基于 Redis 的有序集合Sorted Set。原理维护一个时间窗口内的请求时间戳统计请求数量。优点精确控制时间窗口内的请求数量无突发问题。def __init__(self, redis_clientNone):# 如果没有 Redis使用内存存储仅用于演示self._redis redis_clientself._memory_store: Dict[str, deque] defaultdict(deque)self._memory_mode redis_client is Nonedef is_allowed(self, key: str, max_requests: int,window_seconds: int 60) - Tuple[bool, int]:检查请求是否被允许。返回 (是否允许, 当前窗口内请求数)。now time.time()window_start now - window_secondsif self._memory_mode:return self._check_memory(key, max_requests, window_start)return self._check_redis(key, max_requests, window_start, now)def _check_memory(self, key: str, max_requests: int,window_start: float) - Tuple[bool, int]:内存模式的滑动窗口检查。# 移除窗口外的记录while (self._memory_store[key] andself._memory_store[key][0] window_start):self._memory_store[key].popleft()# 检查是否超出限制if len(self._memory_store[key]) max_requests:return False, len(self._memory_store[key])# 记录本次请求self._memory_store[key].append(time.time())return True, len(self._memory_store[key])def _check_redis(self, key: str, max_requests: int,window_start: float,now: float) - Tuple[bool, int]:Redis 模式的滑动窗口检查。redis_key fratelimit:{key}try:# 移除过期记录self._redis.zremrangebyscore(redis_key, -inf, window_start)# 统计当前窗口内请求数current_count self._redis.zcard(redis_key)if current_count max_requests:return False, current_count# 添加新请求self._redis.zadd(redis_key, {str(now): now})# 设置过期时间self._redis.expire(redis_key, 120)return True, current_count 1except Exception as e:print(fRedis 操作失败降级为放行{e})return True, 0def get_remaining(self, key: str, max_requests: int,window_seconds: int 60) - int:获取当前窗口内剩余的请求额度。now time.time()window_start now - window_secondsif self._memory_mode:count len([t for t in self._memory_store[key]if t window_start])else:try:count self._redis.zcount(fratelimit:{key}, window_start, now)except Exception:return max_requestsreturn max(0, max_requests - count)# 第三部分装饰器实现 def rate_limit(max_per_second: int 10, max_burst: int 20):速率限制装饰器 —— 使用令牌桶算法限制函数调用频率。可以装饰同步和异步函数。# 为每个被装饰函数创建独立的令牌桶bucket TokenBucket(ratemax_per_second, capacitymax_burst)def decorator(func: Callable) - Callable:wraps(func)def wrapper(*args, **kwargs):if not bucket.consume():wait_time bucket.get_wait_time()raise RateLimitExceeded(f超出速率限制请在 {wait_time:.1f} 秒后重试)return func(*args, **kwargs)return wrapperreturn decoratordef per_user_rate_limit(max_per_minute: int 60):按用户限制速率 —— 从函数参数中提取 user_id 进行独立限流。buckets MultiKeyTokenBucket(ratemax_per_minute / 60.0,capacitymax_per_minute)def decorator(func: Callable) - Callable:wraps(func)def wrapper(*args, **kwargs):# 尝试从 kwargs 或 args 中获取 user_iduser_id kwargs.get(user_id) or (args[0] if args else anonymous)if not buckets.consume(str(user_id)):raise RateLimitExceeded(f用户 {user_id} 超出速率限制)return func(*args, **kwargs)return wrapperreturn decoratorclass RateLimitExceeded(Exception):速率限制超限异常pass# 第四部分演示 rate_limit(max_per_second2, max_burst2)def demo_api_call(request_id: str) - str:受速率限制的演示 API 调用。每秒最多 2 次突发最多 2 次。return f请求 {request_id} 处理完成def demo_rate_limiting():演示各种速率限制算法的使用。print( 速率限制实现演示 \n)# 1. 令牌桶演示print(--- 令牌桶算法 ---)bucket TokenBucket(rate2, capacity3)for i in range(5):allowed bucket.consume()print(f 请求 {i1}: {通过 if allowed else 拒绝})# 2. 滑动窗口演示print(\n--- 滑动窗口算法 ---)limiter SlidingWindowRateLimiter()for i in range(5):allowed, count limiter.is_allowed(user_alice, max_requests3, window_seconds60)print(f 请求 {i1}: {通过 if allowed else 拒绝}窗口内{count})# 3. 装饰器演示print(\n--- 速率限制装饰器 ---)for i in range(4):try:result demo_api_call(str(i))print(f {result})except RateLimitExceeded as e:print(f 请求 {i} 被限制{e})# 4. API 限流建议print(\n--- 生产环境建议 ---)print(选择限流策略的参考- 令牌桶适合允许突发流量的 API- 滑动窗口适合需要精确计费的 API- Redis 实现适合分布式系统的统一限流- 多键限流适用于多租户或多用户分级限流)if __name__ __main__:demo_rate_limiting()