Skip to content

调用频率控制(Rate Limits)

调用频率控制(Rate Limits)是API服务的重要机制,用于保护系统稳定性、确保公平使用,并防止滥用。了解和合理处理频率限制对于构建稳定的应用至关重要。

概述

频率控制的目的:

  • 保护服务器资源
  • 确保服务质量
  • 防止恶意攻击
  • 实现公平使用
  • 控制成本

限制类型

1. 时间窗口限制

  • 每分钟请求数(RPM):每分钟允许的最大请求数
  • 每秒请求数(RPS):每秒允许的最大请求数
  • 每天请求数(RPD):每天允许的最大请求数

2. 并发限制

  • 同时连接数:允许的并发连接数量
  • 队列长度:等待处理的请求队列长度

3. 令牌限制

  • 输入令牌数:每分钟/天的输入令牌限制
  • 输出令牌数:每分钟/天的输出令牌限制

基本用法

检查限制状态

python
import openai
import time
import requests
from typing import Dict, Any

# Initialize the OpenAI-compatible client pointing at the RealmRouter gateway.
client = openai.OpenAI(
    base_url="https://realmrouter.cn/v1",
    api_key="your_api_key"
)

def get_rate_limits() -> Dict[str, Any]:
    """Fetch the current rate-limit headers with a minimal probe request.

    Returns a dict of rate-limit header values (all may be None if the
    gateway does not send them), or an empty dict on failure.

    Fix vs. original: ``client.chat.completions.create`` returns a parsed
    ChatCompletion object that has no ``.response`` attribute; the HTTP
    headers are only exposed through the SDK's ``with_raw_response`` variant.
    """
    try:
        # with_raw_response gives access to the underlying HTTP response,
        # including its headers, while still performing the same request.
        raw = client.chat.completions.with_raw_response.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "test"}],
            max_tokens=1
        )

        # Header names follow the common x-ratelimit-* convention; the exact
        # set may differ per provider — verify against the gateway's docs.
        headers = raw.headers

        return {
            "requests_remaining": headers.get("x-ratelimit-remaining-requests"),
            "requests_limit": headers.get("x-ratelimit-limit-requests"),
            "tokens_remaining": headers.get("x-ratelimit-remaining-tokens"),
            "tokens_limit": headers.get("x-ratelimit-limit-tokens"),
            "reset_time": headers.get("x-ratelimit-reset")
        }

    except Exception as e:
        print(f"获取限制信息失败: {e}")
        return {}

# Usage example
limits = get_rate_limits()
print(f"剩余请求数: {limits.get('requests_remaining')}")
print(f"请求限制: {limits.get('requests_limit')}")

基本的频率控制

python
class RateLimiter:
    """Sliding-window rate limiter.

    Allows at most ``max_requests`` calls within any trailing
    ``time_window``-second window, blocking the caller when the budget
    is exhausted. Not thread-safe.
    """

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # window length in seconds
        self.requests: list = []  # timestamps of requests inside the window

    def wait_if_needed(self) -> None:
        """Block until a request may be sent, then record its timestamp.

        Fix vs. original: after sleeping we re-read the clock and re-prune,
        so the recorded timestamp is the actual send time rather than the
        stale time measured before the sleep.
        """
        now = time.time()

        # Drop timestamps that have aged out of the window.
        self.requests = [t for t in self.requests
                         if now - t < self.time_window]

        if len(self.requests) >= self.max_requests:
            # Wait until the oldest recorded request leaves the window.
            oldest_request = min(self.requests)
            wait_time = self.time_window - (now - oldest_request)

            if wait_time > 0:
                print(f"达到频率限制,等待 {wait_time:.1f} 秒...")
                time.sleep(wait_time)
                # Refresh the clock and prune again after the pause.
                now = time.time()
                self.requests = [t for t in self.requests
                                 if now - t < self.time_window]

        # Record the request that is about to be sent.
        self.requests.append(now)

# 使用频率限制器
rate_limiter = RateLimiter(max_requests=60, time_window=60)  # 每分钟60次请求

def safe_api_call(messages: list) -> str:
    """带频率控制的API调用"""
    rate_limiter.wait_if_needed()
    
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=1000
        )
        return response.choices[0].message.content
        
    except openai.RateLimitError as e:
        print(f"频率限制错误: {e}")
        # 等待更长时间后重试
        time.sleep(60)
        return safe_api_call(messages)
    except Exception as e:
        print(f"API调用失败: {e}")
        raise

# 使用示例
for i in range(10):
    messages = [{"role": "user", "content": f"请生成第{i+1}个创意句子"}]
    response = safe_api_call(messages)
    print(f"响应 {i+1}: {response[:50]}...")

高级功能

令牌桶算法

python
import threading
from collections import deque

class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    The bucket holds at most ``capacity`` tokens and is refilled
    continuously at ``refill_rate`` tokens per second; each request
    consumes tokens from it.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity        # maximum tokens the bucket can hold
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = capacity          # start with a full bucket
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Try to take ``tokens`` from the bucket; False if insufficient."""
        with self.lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    def _refill(self):
        """Top up the bucket according to elapsed time, capped at capacity."""
        now = time.time()
        gained = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now

    def wait_for_token(self, tokens: int = 1):
        """Poll every 100 ms until ``tokens`` can be consumed."""
        while not self.consume(tokens):
            time.sleep(0.1)

# Shared bucket: capacity 10, refilled at 1 token per second
token_bucket = TokenBucket(capacity=10, refill_rate=1.0)

def token_bucket_api_call(messages: list) -> str:
    """Chat-completion call gated by the shared token bucket."""
    token_bucket.wait_for_token()

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    return completion.choices[0].message.content

自适应频率控制

python
class AdaptiveRateLimiter:
    """Rate limiter that tunes its RPM budget from the observed error rate.

    Every ``adjustment_interval`` seconds the budget is lowered by 20% when
    the error rate exceeds 10%, or raised by 20% when it is below 2%,
    clamped to [min_rpm, max_rpm].
    """

    def __init__(self, initial_rpm: int = 60):
        self.current_rpm = initial_rpm
        self.min_rpm = 10                   # floor when backing off
        self.max_rpm = 300                  # ceiling when ramping up
        self.success_count = 0
        self.error_count = 0
        self.last_adjustment = time.time()
        self.adjustment_interval = 300      # re-tune every 5 minutes
        self.request_times = deque()        # timestamps within the last minute

    def wait_if_needed(self):
        """Block until the current RPM budget allows another request.

        Fix vs. original: after sleeping, the clock is re-read and the
        window re-pruned so that the recorded timestamp is the actual
        send time rather than a stale pre-sleep value.
        """
        now = time.time()

        # Drop requests older than one minute.
        one_minute_ago = now - 60
        while self.request_times and self.request_times[0] < one_minute_ago:
            self.request_times.popleft()

        if len(self.request_times) >= self.current_rpm:
            if self.request_times:
                # Wait until the oldest request leaves the one-minute window.
                wait_time = 60 - (now - self.request_times[0])
                if wait_time > 0:
                    time.sleep(wait_time)
                    # Refresh the clock and prune again after the pause.
                    now = time.time()
                    one_minute_ago = now - 60
                    while (self.request_times
                           and self.request_times[0] < one_minute_ago):
                        self.request_times.popleft()

        self.request_times.append(now)

        # Periodically re-tune the budget.
        self._adjust_rpm()

    def record_success(self):
        """Count one successful request toward the next adjustment."""
        self.success_count += 1

    def record_error(self):
        """Count one failed request toward the next adjustment."""
        self.error_count += 1

    def _adjust_rpm(self):
        """Re-tune current_rpm from the error rate; resets the counters."""
        now = time.time()
        if now - self.last_adjustment < self.adjustment_interval:
            return

        total_requests = self.success_count + self.error_count
        if total_requests == 0:
            return

        error_rate = self.error_count / total_requests

        if error_rate > 0.1:  # error rate above 10%: back off
            self.current_rpm = max(self.min_rpm, int(self.current_rpm * 0.8))
            print(f"错误率过高 ({error_rate:.2%}),降低RPM至 {self.current_rpm}")
        elif error_rate < 0.02:  # error rate below 2%: ramp up
            self.current_rpm = min(self.max_rpm, int(self.current_rpm * 1.2))
            print(f"错误率较低 ({error_rate:.2%}),提高RPM至 {self.current_rpm}")

        # Start a fresh measurement window.
        self.success_count = 0
        self.error_count = 0
        self.last_adjustment = now

# Shared adaptive limiter
adaptive_limiter = AdaptiveRateLimiter(initial_rpm=60)

def adaptive_api_call(messages: list, max_retries: int = 3) -> str:
    """Adaptive-rate-limited chat-completion call with bounded retries.

    Fix vs. original: retries on RateLimitError iteratively with a bounded
    attempt count instead of unbounded recursion. Raises the last
    RateLimitError once the retry budget is exhausted.
    """
    for attempt in range(max_retries + 1):
        adaptive_limiter.wait_if_needed()

        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages
            )
            adaptive_limiter.record_success()
            return response.choices[0].message.content

        except openai.RateLimitError as e:
            adaptive_limiter.record_error()
            print(f"频率限制: {e}")
            if attempt == max_retries:
                raise
            # Cool down before the next attempt.
            time.sleep(30)
        except Exception as e:
            adaptive_limiter.record_error()
            print(f"API错误: {e}")
            raise

分布式频率控制

python
import redis
import json

class DistributedRateLimiter:
    """Redis-backed sliding-window rate limiter shared across processes.

    Requests are stored in a sorted set scored by timestamp; entries older
    than the window are pruned before counting.

    Fixes vs. original:
    - Members are made unique (timestamp + uuid). The original used the
      whole-second timestamp string as the member, so all requests arriving
      within the same second collapsed into one sorted-set entry and the
      limiter drastically undercounted.
    - A request is only recorded when it is admitted, so rejected retry
      attempts no longer consume the caller's budget.

    NOTE(review): the check-then-add sequence is not atomic across clients;
    a Lua script (EVAL) would be needed for a strict guarantee.
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "rate_limit"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def is_allowed(self, identifier: str, limit: int, window: int) -> bool:
        """Return True (and record the request) if ``identifier`` has made
        fewer than ``limit`` requests in the trailing ``window`` seconds."""
        import uuid  # local import keeps this snippet self-contained

        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - window

        # Prune expired entries and count the remainder in one round trip.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        current_requests = pipe.execute()[1]

        if current_requests >= limit:
            return False

        # Admit: record with a unique member so same-second requests do not
        # overwrite each other.
        member = f"{now}-{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zadd(key, {member: now})
        pipe.expire(key, window)  # let idle keys expire on their own
        pipe.execute()
        return True

    def wait_if_needed(self, identifier: str, limit: int, window: int):
        """Poll once per second until the request is admitted."""
        while not self.is_allowed(identifier, limit, window):
            time.sleep(1)

# Shared distributed limiter backed by a local Redis instance
redis_client = redis.Redis(host='localhost', port=6379, db=0)
distributed_limiter = DistributedRateLimiter(redis_client)

def distributed_api_call(messages: list, user_id: str) -> str:
    """Chat-completion call limited per user across all worker processes."""
    # Allow each user at most 60 requests per 60-second window.
    distributed_limiter.wait_if_needed(user_id, 60, 60)

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    return completion.choices[0].message.content

实际应用场景

1. 批量处理

python
class BatchProcessor:
    """Processes items in fixed-size batches under a shared rate limit."""

    def __init__(self, batch_size: int = 10, rpm: int = 60):
        self.batch_size = batch_size
        self.rate_limiter = RateLimiter(max_requests=rpm, time_window=60)
        self.results = []  # NOTE(review): never populated by process_batch — confirm it is needed

    def process_batch(self, items: list, process_func) -> list:
        """Run ``process_func`` over ``items`` batch by batch.

        Failed batches are logged and skipped; successful batch results are
        concatenated and returned.
        """
        collected = []

        for start in range(0, len(items), self.batch_size):
            chunk = items[start:start + self.batch_size]

            # Respect the shared rate limit before each batch.
            self.rate_limiter.wait_if_needed()

            try:
                collected.extend(process_func(chunk))
                print(f"已处理批次 {start//self.batch_size + 1}/{(len(items)-1)//self.batch_size + 1}")
            except Exception as e:
                # Skip the failed batch and keep going.
                print(f"批次处理失败: {e}")
                continue

        return collected

def process_text_batch(text_batch: list) -> list:
    """Summarize each text in the batch via the chat API, in order."""
    summaries = []
    for item in text_batch:
        prompt = [{"role": "user", "content": f"请总结以下内容:{item}"}]

        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=prompt,
            max_tokens=200
        )
        summaries.append(completion.choices[0].message.content)

    return summaries

# Usage example: 60 texts processed 5 at a time at 30 RPM
texts = ["文本1内容", "文本2内容", "文本3内容"] * 20
processor = BatchProcessor(batch_size=5, rpm=30)
results = processor.process_batch(texts, process_text_batch)

2. 实时聊天应用

python
class ChatRateLimiter:
    """Per-user plus global rate limiting for a chat application.

    Fix vs. original: ``can_send_message`` now prunes timestamps that have
    aged out of the window before counting. The original only read the raw
    request lists (it never called the limiter's pruning path), so a user
    who hit the limit once stayed blocked forever and the lists grew
    without bound.
    """

    def __init__(self):
        self.user_limiters = {}  # user_id -> RateLimiter
        self.global_limiter = RateLimiter(max_requests=1000, time_window=60)  # app-wide cap

    def get_user_limiter(self, user_id: str):
        """Return (creating on first use) the limiter for ``user_id``."""
        if user_id not in self.user_limiters:
            # 20 requests per minute per user
            self.user_limiters[user_id] = RateLimiter(max_requests=20, time_window=60)
        return self.user_limiters[user_id]

    @staticmethod
    def _prune(limiter, now: float) -> None:
        """Drop timestamps older than the limiter's window."""
        limiter.requests = [t for t in limiter.requests
                            if now - t < limiter.time_window]

    def can_send_message(self, user_id: str) -> bool:
        """True if both the user's and the global budget have room."""
        now = time.time()
        user_limiter = self.get_user_limiter(user_id)
        self._prune(user_limiter, now)
        self._prune(self.global_limiter, now)

        return (len(user_limiter.requests) < user_limiter.max_requests and
                len(self.global_limiter.requests) < self.global_limiter.max_requests)

    def record_message(self, user_id: str):
        """Charge one message against the user's and the global budget."""
        now = time.time()  # single timestamp so both records agree
        self.get_user_limiter(user_id).requests.append(now)
        self.global_limiter.requests.append(now)

# Usage example
chat_limiter = ChatRateLimiter()

def send_chat_message(user_id: str, message: str) -> str:
    """Send one chat message for ``user_id``, enforcing per-user limits.

    Raises Exception when the user is over their rate budget.
    """
    if not chat_limiter.can_send_message(user_id):
        raise Exception("发送频率过高,请稍后再试")

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": message}]
    )

    chat_limiter.record_message(user_id)
    return completion.choices[0].message.content

错误处理和重试

指数退避重试

python
import random

def exponential_backoff_retry(func=None, max_retries: int = 5, base_delay: float = 1.0):
    """Decorator retrying on openai.RateLimitError with exponential backoff.

    Fix vs. original: the original required ``func`` as the first positional
    argument, so the documented usage ``@exponential_backoff_retry(max_retries=5)``
    raised TypeError at decoration time. This version works both as a bare
    decorator (``@exponential_backoff_retry``) and as a decorator factory
    with options. Also preserves the wrapped function's metadata via
    ``functools.wraps``.

    Delay grows as ``base_delay * 2**attempt`` plus up to 1 second of
    random jitter; the last RateLimitError is re-raised when retries are
    exhausted. Other exceptions propagate immediately.
    """
    from functools import wraps  # local import keeps the snippet self-contained

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return f(*args, **kwargs)
                except openai.RateLimitError:
                    if attempt == max_retries - 1:
                        raise
                    # Backoff: base_delay * 2^attempt plus random jitter.
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"频率限制,{delay:.1f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(delay)
                except Exception as e:
                    print(f"其他错误: {e}")
                    raise
            return None
        return wrapper

    # Bare-decorator usage: func is the decorated function itself.
    return decorator if func is None else decorator(func)

@exponential_backoff_retry
def robust_api_call(messages: list) -> str:
    """Chat-completion call with exponential-backoff retries.

    Fix vs. original: the decorator's signature takes the function as its
    first positional argument, so ``@exponential_backoff_retry(max_retries=5)``
    raised TypeError at import time. Applying it bare uses its default of
    5 retries, which is what was intended.
    """
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    return response.choices[0].message.content

# Usage example
messages = [{"role": "user", "content": "测试消息"}]
response = robust_api_call(messages)

监控和日志

频率限制监控

python
import logging
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class RateLimitMetrics:
    # Aggregate counters for rate-limit monitoring.
    total_requests: int = 0
    successful_requests: int = 0
    rate_limited_requests: int = 0
    failed_requests: int = 0
    average_wait_time: float = 0.0

class RateLimitMonitor:
    """Collects and reports rate-limit-related request metrics."""

    def __init__(self):
        self.metrics = RateLimitMetrics()
        self.wait_times = []  # rolling window of the most recent wait times
        self.logger = logging.getLogger(__name__)

    def record_request(self, success: bool, rate_limited: bool = False, wait_time: float = 0.0):
        """Record one request outcome and, optionally, how long it waited."""
        self.metrics.total_requests += 1

        if rate_limited:
            self.metrics.rate_limited_requests += 1
        elif success:
            self.metrics.successful_requests += 1
        else:
            self.metrics.failed_requests += 1

        if wait_time > 0:
            self.wait_times.append(wait_time)
            # Keep only the most recent 100 samples.
            del self.wait_times[:-100]
            self.metrics.average_wait_time = sum(self.wait_times) / len(self.wait_times)

    def get_metrics(self) -> Dict:
        """Return a human-readable snapshot of the counters."""
        total = self.metrics.total_requests
        success_rate = self.metrics.successful_requests / total if total > 0 else 0

        return {
            "total_requests": total,
            "success_rate": f"{success_rate:.2%}",
            "rate_limited_requests": self.metrics.rate_limited_requests,
            "failed_requests": self.metrics.failed_requests,
            "average_wait_time": f"{self.metrics.average_wait_time:.2f}s"
        }

    def log_metrics(self):
        """Emit the current snapshot through the module logger."""
        metrics = self.get_metrics()
        self.logger.info(f"频率限制监控: {metrics}")

# Shared monitor instance
monitor = RateLimitMonitor()

def monitored_api_call(messages: list) -> str:
    """Chat-completion call that reports its outcome to the shared monitor."""
    started = time.time()

    try:
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages
        )
    except openai.RateLimitError:
        monitor.record_request(success=False, rate_limited=True)
        raise
    except Exception:
        monitor.record_request(success=False)
        raise

    # NOTE(review): this records total call latency as "wait_time" — confirm
    # that is the metric the monitor is meant to track.
    monitor.record_request(success=True, wait_time=time.time() - started)
    return completion.choices[0].message.content

最佳实践

1. 频率限制策略

  • 根据API文档设置合理的限制
  • 实现多层限制(用户级、应用级、全局)
  • 使用令牌桶算法平滑请求

2. 错误处理

  • 实现指数退避重试
  • 区分不同类型的错误
  • 记录详细的错误日志

3. 性能优化

  • 使用分布式缓存
  • 实现请求批处理
  • 优化网络连接

4. 监控告警

  • 监控频率限制命中率
  • 设置异常告警
  • 定期分析性能指标

限制和注意事项

  1. 延迟影响:频率控制会增加请求延迟
  2. 复杂性:实现复杂的限制逻辑会增加系统复杂性
  3. 资源消耗:监控和缓存会消耗额外资源
  4. 准确性:分布式环境下的限制可能不够精确
  5. 用户体验:过于严格的限制可能影响用户体验

基于 MIT 许可发布 厦门界云聚算网络科技有限公司