调用频率控制(Rate Limits)
调用频率控制(Rate Limits)是API服务的重要机制,用于保护系统稳定性、确保公平使用,并防止滥用。了解和合理处理频率限制对于构建稳定的应用至关重要。
概述
频率控制的目的:
- 保护服务器资源
- 确保服务质量
- 防止恶意攻击
- 实现公平使用
- 控制成本
限制类型
1. 时间窗口限制
- 每分钟请求数(RPM):每分钟允许的最大请求数
- 每秒请求数(RPS):每秒允许的最大请求数
- 每天请求数(RPD):每天允许的最大请求数
2. 并发限制
- 同时连接数:允许的并发连接数量
- 队列长度:等待处理的请求队列长度
3. 令牌限制
- 输入令牌数:每分钟/天的输入令牌限制
- 输出令牌数:每分钟/天的输出令牌限制
基本用法
检查限制状态
python
import openai
import time
import requests
from typing import Dict, Any
client = openai.OpenAI(
api_key="your_api_key",
base_url="https://realmrouter.cn/v1"
)
def get_rate_limits() -> Dict[str, Any]:
    """Return current rate-limit info read from the API response headers.

    Sends a minimal probe request and extracts the x-ratelimit-* headers.
    Returns an empty dict if the probe fails.
    """
    try:
        # Bug fix: a plain .create() returns a parsed ChatCompletion which
        # has no ".response" attribute; HTTP headers are only exposed via
        # the SDK's with_raw_response variant.
        raw = client.chat.completions.with_raw_response.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "test"}],
            max_tokens=1
        )
        headers = raw.headers
        # Exact header names may vary between providers.
        return {
            "requests_remaining": headers.get("x-ratelimit-remaining-requests"),
            "requests_limit": headers.get("x-ratelimit-limit-requests"),
            "tokens_remaining": headers.get("x-ratelimit-remaining-tokens"),
            "tokens_limit": headers.get("x-ratelimit-limit-tokens"),
            "reset_time": headers.get("x-ratelimit-reset")
        }
    except Exception as e:
        print(f"获取限制信息失败: {e}")
        return {}
# 使用示例
limits = get_rate_limits()
print(f"剩余请求数: {limits.get('requests_remaining')}")
print(f"请求限制: {limits.get('requests_limit')}")
基本的频率控制
python
class RateLimiter:
    """Simple sliding-window rate limiter.

    Keeps timestamps of requests made inside the window and blocks the
    caller until a new request is allowed.
    """
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # window length in seconds
        self.requests = []  # timestamps of requests inside the window

    def wait_if_needed(self):
        """Block until a request may be sent, then record its timestamp."""
        now = time.time()
        # Drop timestamps that have aged out of the window.
        self.requests = [req_time for req_time in self.requests
                         if now - req_time < self.time_window]
        if len(self.requests) >= self.max_requests:
            # Wait until the oldest recorded request leaves the window.
            oldest_request = min(self.requests)
            wait_time = self.time_window - (now - oldest_request)
            if wait_time > 0:
                print(f"达到频率限制,等待 {wait_time:.1f} 秒...")
                time.sleep(wait_time)
                # Bug fix: refresh the clock after sleeping so the
                # recorded timestamp matches when the request is sent,
                # not when we started waiting.
                now = time.time()
        self.requests.append(now)
# 使用频率限制器
rate_limiter = RateLimiter(max_requests=60, time_window=60) # 每分钟60次请求
def safe_api_call(messages: list) -> str:
    """Rate-limited API call with bounded retries on RateLimitError.

    Bug fix: the original retried via unbounded recursion, risking
    RecursionError under sustained throttling; this uses a capped loop
    and re-raises after the final attempt.
    """
    max_attempts = 5
    for attempt in range(max_attempts):
        rate_limiter.wait_if_needed()
        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=1000
            )
            return response.choices[0].message.content
        except openai.RateLimitError as e:
            print(f"频率限制错误: {e}")
            if attempt == max_attempts - 1:
                raise
            # Back off for a full minute before the next attempt.
            time.sleep(60)
        except Exception as e:
            print(f"API调用失败: {e}")
            raise
# 使用示例
for i in range(10):
messages = [{"role": "user", "content": f"请生成第{i+1}个创意句子"}]
response = safe_api_call(messages)
print(f"响应 {i+1}: {response[:50]}...")
高级功能
令牌桶算法
python
import threading
from collections import deque
class TokenBucket:
    """Token-bucket rate limiter.

    The bucket holds at most ``capacity`` tokens and is topped up
    continuously at ``refill_rate`` tokens per second; a request may
    proceed when it can consume the tokens it needs.
    """
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity        # maximum tokens the bucket holds
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = capacity          # bucket starts full
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Top the bucket up in proportion to elapsed time."""
        now = time.time()
        gained = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        """Atomically try to take ``tokens``; return False if short."""
        with self.lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    def wait_for_token(self, tokens: int = 1):
        """Poll until ``tokens`` can be consumed."""
        while True:
            if self.consume(tokens):
                return
            time.sleep(0.1)
# 使用令牌桶
token_bucket = TokenBucket(capacity=10, refill_rate=1.0) # 容量10,每秒填充1个
def token_bucket_api_call(messages: list) -> str:
    """API call gated by the shared token bucket."""
    token_bucket.wait_for_token()
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    # Bug fix: the section heading had been fused onto this line,
    # turning the attribute into the nonexistent ".content自适应频率控制".
    return response.choices[0].message.content
自适应频率控制
python
class AdaptiveRateLimiter:
    """Sliding-window limiter that tunes its RPM target from error rates."""
    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10                  # floor for the adaptive RPM
        self.max_rpm = 300                 # ceiling for the adaptive RPM
        self.success_count = 0
        self.error_count = 0
        self.last_adjustment = time.time()
        self.adjustment_interval = 300     # re-tune at most every 5 minutes
        self.request_times = deque()       # timestamps within the last minute

    def wait_if_needed(self):
        """Block until sending a request fits the current RPM budget."""
        now = time.time()
        one_minute_ago = now - 60
        # Evict timestamps older than the sliding 60-second window.
        while self.request_times and self.request_times[0] < one_minute_ago:
            self.request_times.popleft()
        current_rpm = len(self.request_times)
        if current_rpm >= self.current_rpm:
            if self.request_times:
                # Wait until the oldest request leaves the window.
                oldest_request = self.request_times[0]
                wait_time = 60 - (now - oldest_request)
                if wait_time > 0:
                    time.sleep(wait_time)
                    # Bug fix: refresh the clock after sleeping so the
                    # recorded timestamp matches the actual send time.
                    now = time.time()
        self.request_times.append(now)
        # Periodically re-tune the RPM target.
        self._adjust_rpm()

    def record_success(self):
        """Count one successful request."""
        self.success_count += 1

    def record_error(self):
        """Count one failed or rate-limited request."""
        self.error_count += 1

    def _adjust_rpm(self):
        """Re-tune current_rpm from the error rate since the last adjustment."""
        now = time.time()
        if now - self.last_adjustment < self.adjustment_interval:
            return
        total_requests = self.success_count + self.error_count
        if total_requests == 0:
            return
        error_rate = self.error_count / total_requests
        if error_rate > 0.1:  # error rate above 10% -> back off
            self.current_rpm = max(self.min_rpm, int(self.current_rpm * 0.8))
            print(f"错误率过高 ({error_rate:.2%}),降低RPM至 {self.current_rpm}")
        elif error_rate < 0.02:  # error rate below 2% -> speed up
            self.current_rpm = min(self.max_rpm, int(self.current_rpm * 1.2))
            print(f"错误率较低 ({error_rate:.2%}),提高RPM至 {self.current_rpm}")
        # Reset counters for the next observation window.
        self.success_count = 0
        self.error_count = 0
        self.last_adjustment = now
# 使用自适应限制器
adaptive_limiter = AdaptiveRateLimiter(initial_rpm=60)
def adaptive_api_call(messages: list) -> str:
    """API call guarded by the shared adaptive rate limiter."""
    adaptive_limiter.wait_if_needed()
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages
        )
        adaptive_limiter.record_success()
        return response.choices[0].message.content
    except openai.RateLimitError as e:
        adaptive_limiter.record_error()
        print(f"频率限制: {e}")
        # Back off, then retry (recursive, matching the surrounding examples).
        time.sleep(30)
        return adaptive_api_call(messages)
    except Exception as e:
        adaptive_limiter.record_error()
        print(f"API错误: {e}")
        # Bug fix: the section heading had been fused onto "raise",
        # producing the undefined bare name "raise分布式频率控制" and
        # swallowing the exception instead of re-raising it.
        raise
分布式频率控制
python
import redis
import json
class DistributedRateLimiter:
    """Redis-backed sliding-window rate limiter shared across processes."""
    def __init__(self, redis_client: redis.Redis, key_prefix: str = "rate_limit"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def is_allowed(self, identifier: str, limit: int, window: int) -> bool:
        """Record one request for identifier and report whether it fits.

        Note: this records the attempt even when it is rejected.
        """
        key = f"{self.key_prefix}:{identifier}"
        current_time = int(time.time())
        window_start = current_time - window
        # Bug fix: the sorted-set member must be unique per request.
        # Using str(current_time) alone collapses every request made in
        # the same second into a single entry, undercounting the window.
        member = f"{current_time}-{time.perf_counter_ns()}"
        # Pipeline the four commands into one round trip.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop expired entries
        pipe.zadd(key, {member: current_time})       # record this request
        pipe.zcard(key)                              # count entries in window
        pipe.expire(key, window)                     # let idle keys expire
        results = pipe.execute()
        current_requests = results[2]
        return current_requests <= limit

    def wait_if_needed(self, identifier: str, limit: int, window: int):
        """Poll until is_allowed grants the request."""
        while not self.is_allowed(identifier, limit, window):
            time.sleep(1)
# 使用分布式限制器
redis_client = redis.Redis(host='localhost', port=6379, db=0)
distributed_limiter = DistributedRateLimiter(redis_client)
def distributed_api_call(messages: list, user_id: str) -> str:
    """API call limited per user via the shared distributed limiter."""
    # 60 requests per user per 60-second window.
    distributed_limiter.wait_if_needed(user_id, 60, 60)
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    # Bug fix: the section heading had been fused onto this attribute
    # access, producing the nonexistent ".content实际应用场景".
    return response.choices[0].message.content
实际应用场景
1. 批量处理
python
class BatchProcessor:
    """Processes items in fixed-size batches under a rate limit."""
    def __init__(self, batch_size: int = 10, rpm: int = 60):
        self.batch_size = batch_size
        self.rate_limiter = RateLimiter(max_requests=rpm, time_window=60)
        self.results = []

    def process_batch(self, items: list, process_func) -> list:
        """Run process_func over items chunk by chunk, rate-limited.

        Failed batches are reported and skipped; successful results are
        collected in order and returned.
        """
        collected = []
        total_batches = (len(items) - 1) // self.batch_size + 1
        for batch_index in range(total_batches):
            start = batch_index * self.batch_size
            chunk = items[start:start + self.batch_size]
            # Respect the shared rate limit before each batch.
            self.rate_limiter.wait_if_needed()
            try:
                collected.extend(process_func(chunk))
                print(f"已处理批次 {batch_index + 1}/{total_batches}")
            except Exception as e:
                print(f"批次处理失败: {e}")
                # Skip this batch and carry on with the next one.
                continue
        return collected
def process_text_batch(text_batch: list) -> list:
    """Summarize each text in the batch via the chat API."""
    summaries = []
    for text in text_batch:
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"请总结以下内容:{text}"}],
            max_tokens=200
        )
        summaries.append(completion.choices[0].message.content)
    return summaries
# 使用示例
texts = ["文本1内容", "文本2内容", "文本3内容"] * 20 # 60个文本
processor = BatchProcessor(batch_size=5, rpm=30)
results = processor.process_batch(texts, process_text_batch)
2. 实时聊天应用
python
class ChatRateLimiter:
    """Per-user plus global rate limiting for a chat application."""
    def __init__(self):
        self.user_limiters = {}  # user_id -> RateLimiter
        self.global_limiter = RateLimiter(max_requests=1000, time_window=60)  # app-wide cap

    def get_user_limiter(self, user_id: str) -> RateLimiter:
        """Return (creating on first use) the limiter for user_id."""
        if user_id not in self.user_limiters:
            # 20 requests per user per minute.
            self.user_limiters[user_id] = RateLimiter(max_requests=20, time_window=60)
        return self.user_limiters[user_id]

    @staticmethod
    def _prune(limiter: RateLimiter):
        """Drop request timestamps that fell out of the limiter's window."""
        now = time.time()
        limiter.requests = [t for t in limiter.requests
                            if now - t < limiter.time_window]

    def can_send_message(self, user_id: str) -> bool:
        """Check both the per-user and the global limit."""
        user_limiter = self.get_user_limiter(user_id)
        # Bug fix: expired timestamps must be pruned before counting;
        # the original inspected the raw lists, so a user who once hit
        # the limit stayed blocked forever.
        self._prune(user_limiter)
        self._prune(self.global_limiter)
        return (len(user_limiter.requests) < user_limiter.max_requests and
                len(self.global_limiter.requests) < self.global_limiter.max_requests)

    def record_message(self, user_id: str):
        """Record one sent message against both limiters."""
        now = time.time()
        self.get_user_limiter(user_id).requests.append(now)
        self.global_limiter.requests.append(now)
# 使用示例
chat_limiter = ChatRateLimiter()
def send_chat_message(user_id: str, message: str) -> str:
    """Send one chat message for user_id, enforcing the chat limits.

    Raises Exception when the user's or the global rate limit is hit.
    """
    if not chat_limiter.can_send_message(user_id):
        raise Exception("发送频率过高,请稍后再试")
    messages = [{"role": "user", "content": message}]
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    # Record only after a successful call.
    chat_limiter.record_message(user_id)
    # Bug fix: the section heading had been fused onto this attribute
    # access, producing the nonexistent ".content错误处理和重试".
    return response.choices[0].message.content
错误处理和重试
指数退避重试
python
import random
def exponential_backoff_retry(func=None, max_retries: int = 5, base_delay: float = 1.0):
    """Retry decorator with exponential backoff on RateLimitError.

    Bug fix: the original took ``func`` as a required positional
    parameter, so the parameterized form used later in this document
    (``@exponential_backoff_retry(max_retries=5)``) raised TypeError at
    decoration time.  This version works both as a plain decorator
    (``@exponential_backoff_retry``) and as a decorator factory.

    Backoff per attempt: base_delay * 2**attempt plus random jitter in
    [0, 1).  Non-rate-limit exceptions are re-raised immediately.
    """
    def decorate(f):
        import functools

        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return f(*args, **kwargs)
                except openai.RateLimitError:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"频率限制,{delay:.1f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(delay)
                except Exception as e:
                    print(f"其他错误: {e}")
                    raise
            return None
        return wrapper

    if func is None:
        return decorate        # used as @exponential_backoff_retry(...)
    return decorate(func)      # used as @exponential_backoff_retry
@exponential_backoff_retry(max_retries=5)
def robust_api_call(messages: list) -> str:
    """API call wrapped with exponential-backoff retry handling."""
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    return completion.choices[0].message.content
# 使用示例
messages = [{"role": "user", "content": "测试消息"}]
response = robust_api_call(messages)
监控和日志
频率限制监控
python
import logging
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class RateLimitMetrics:
    """Counters describing rate-limit behaviour of API calls."""
    total_requests: int = 0         # every call attempted
    successful_requests: int = 0    # calls that returned normally
    rate_limited_requests: int = 0  # calls rejected by rate limiting
    failed_requests: int = 0        # calls that failed for other reasons
    average_wait_time: float = 0.0  # mean of recent recorded wait times, seconds
class RateLimitMonitor:
    """Collects and reports rate-limit related metrics for API calls."""
    def __init__(self):
        self.metrics = RateLimitMetrics()
        # deque(maxlen=100) keeps only the last 100 wait times and evicts
        # in O(1), replacing the original list + pop(0) which shifts the
        # whole list on every eviction.
        self.wait_times = deque(maxlen=100)
        self.logger = logging.getLogger(__name__)

    def record_request(self, success: bool, rate_limited: bool = False, wait_time: float = 0.0):
        """Record one request outcome and, if positive, its wait time."""
        self.metrics.total_requests += 1
        if rate_limited:
            self.metrics.rate_limited_requests += 1
        elif success:
            self.metrics.successful_requests += 1
        else:
            self.metrics.failed_requests += 1
        if wait_time > 0:
            self.wait_times.append(wait_time)
            self.metrics.average_wait_time = sum(self.wait_times) / len(self.wait_times)

    def get_metrics(self) -> Dict:
        """Return a display-ready snapshot of the collected metrics."""
        success_rate = (self.metrics.successful_requests / self.metrics.total_requests
                        if self.metrics.total_requests > 0 else 0)
        return {
            "total_requests": self.metrics.total_requests,
            "success_rate": f"{success_rate:.2%}",
            "rate_limited_requests": self.metrics.rate_limited_requests,
            "failed_requests": self.metrics.failed_requests,
            "average_wait_time": f"{self.metrics.average_wait_time:.2f}s"
        }

    def log_metrics(self):
        """Log the current metrics snapshot at INFO level."""
        metrics = self.get_metrics()
        self.logger.info(f"频率限制监控: {metrics}")
# 使用监控器
monitor = RateLimitMonitor()
def monitored_api_call(messages: list) -> str:
    """API call that reports its outcome to the shared monitor."""
    start_time = time.time()
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages
        )
        # NOTE(review): this records total call latency, not time spent
        # specifically waiting on rate limits.
        wait_time = time.time() - start_time
        monitor.record_request(success=True, wait_time=wait_time)
        return response.choices[0].message.content
    except openai.RateLimitError:
        monitor.record_request(success=False, rate_limited=True)
        raise
    except Exception:
        monitor.record_request(success=False)
        # Bug fix: the section heading had been fused onto "raise",
        # producing the undefined bare name "raise最佳实践" and swallowing
        # the exception instead of re-raising it.
        raise
最佳实践
1. 频率限制策略
- 根据API文档设置合理的限制
- 实现多层限制(用户级、应用级、全局)
- 使用令牌桶算法平滑请求
2. 错误处理
- 实现指数退避重试
- 区分不同类型的错误
- 记录详细的错误日志
3. 性能优化
- 使用分布式缓存
- 实现请求批处理
- 优化网络连接
4. 监控告警
- 监控频率限制命中率
- 设置异常告警
- 定期分析性能指标
限制和注意事项
- 延迟影响:频率控制会增加请求延迟
- 复杂性:实现复杂的限制逻辑会增加系统复杂性
- 资源消耗:监控和缓存会消耗额外资源
- 准确性:分布式环境下的限制可能不够精确
- 用户体验:过于严格的限制可能影响用户体验