大语言模型监控
大语言模型监控是确保API服务质量、优化性能和成本控制的重要手段。通过全面的监控指标，可以及时发现问题、优化使用策略并提升用户体验。
概述
监控的重要性:
- 实时了解API性能
- 及时发现异常情况
- 优化成本和资源使用
- 提升用户体验
- 支持业务决策
核心监控指标
1. 性能指标
- 响应时间:API调用的延迟
- 吞吐量:单位时间内的请求数
- 可用性:服务的可用时间比例
- 错误率:失败请求的比例
2. 使用指标
- 请求次数:总调用次数
- 令牌使用量:输入/输出令牌数
- 模型使用分布:各模型的使用比例
- 用户活跃度:活跃用户数量
3. 成本指标
- API费用:总消费金额
- 单次调用成本:平均每次调用费用
- 成本趋势:费用变化趋势
- ROI分析:投入产出比
基本监控实现
简单监控器
python
import json
import os
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import openai
# Shared API client. The key is read from the environment so real credentials
# never have to be committed to source; the placeholder is only a fallback so
# the example still constructs a client when the variable is unset.
client = openai.OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY", "your_api_key"),
    base_url="https://realmrouter.cn/v1",
)
@dataclass()
class APIMetrics:
    """Measurements recorded for a single chat-completion call.

    Failed calls are recorded with ``success=False``, zero token counts and
    the exception text in ``error_message``. ``cost`` stays 0.0 unless the
    code that records the call prices it.
    """

    # Time at which the call was recorded.
    timestamp: datetime
    # Model name sent to the API.
    model: str
    # Latency of the call, in seconds.
    response_time: float
    # Prompt-side token count.
    input_tokens: int
    # Completion-side token count.
    output_tokens: int
    # Prompt plus completion tokens.
    total_tokens: int
    # True when the API call returned without raising.
    success: bool
    # Exception text for failed calls; empty string otherwise.
    error_message: str = ""
    # Price of the call; 0.0 when not computed.
    cost: float = 0.0
class BasicMonitor:
    """In-memory monitor that records API calls and summarizes them.

    Only the most recent ``max_records`` calls are retained. Each call is
    priced from ``model_costs``, a table of dollar prices per 1K tokens.
    """

    def __init__(self, max_records: int = 1000):
        """Create an empty monitor.

        Args:
            max_records: Number of most recent calls to keep; older records
                are discarded first. Defaults to 1000.
        """
        self.max_records = max_records
        self.metrics: List[APIMetrics] = []
        # Price per 1K tokens, split into prompt ("input") and completion ("output").
        self.model_costs = {
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        }

    def record_call(self, model: str, response_time: float,
                    input_tokens: int, output_tokens: int,
                    success: bool, error_message: str = ""):
        """Record one API call, timestamped with the current local time.

        Args:
            model: Model name used for the call (also selects the price).
            response_time: Call latency in seconds.
            input_tokens: Prompt tokens consumed.
            output_tokens: Completion tokens produced.
            success: Whether the call succeeded.
            error_message: Error text for failed calls.
        """
        self.metrics.append(APIMetrics(
            timestamp=datetime.now(),
            model=model,
            response_time=response_time,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens,
            success=success,
            error_message=error_message,
            cost=self._calculate_cost(model, input_tokens, output_tokens),
        ))
        # Trim the oldest records in place. Computing the excess (rather than
        # slicing with -max_records) also behaves correctly for max_records == 0.
        excess = len(self.metrics) - self.max_records
        if excess > 0:
            del self.metrics[:excess]

    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the price of a call; models missing from the table cost 0.0."""
        rates = self.model_costs.get(model)
        if rates is None:
            return 0.0
        input_cost = (input_tokens / 1000) * rates["input"]
        output_cost = (output_tokens / 1000) * rates["output"]
        return input_cost + output_cost

    def get_summary(self, hours: int = 24) -> Dict[str, Any]:
        """Summarize the calls recorded within the last ``hours`` hours.

        Returns:
            ``{"message": "没有数据"}`` when no call falls in the window.
            Otherwise: call counts, overall success rate, average latency and
            total cost as pre-formatted strings, total tokens, and a per-model
            breakdown (calls, tokens, cost, success_rate).
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics if m.timestamp >= cutoff_time]
        if not recent_metrics:
            return {"message": "没有数据"}

        total_calls = len(recent_metrics)
        successful_calls = sum(1 for m in recent_metrics if m.success)
        avg_response_time = sum(m.response_time for m in recent_metrics) / total_calls
        total_tokens = sum(m.total_tokens for m in recent_metrics)
        total_cost = sum(m.cost for m in recent_metrics)

        # One pass accumulates per-model totals and success counts, instead of
        # re-scanning all metrics once per model.
        model_stats: Dict[str, Dict[str, Any]] = {}
        model_successes: Dict[str, int] = {}
        for metric in recent_metrics:
            stats = model_stats.setdefault(metric.model, {
                "calls": 0,
                "tokens": 0,
                "cost": 0.0,
                "success_rate": 0.0,
            })
            stats["calls"] += 1
            stats["tokens"] += metric.total_tokens
            stats["cost"] += metric.cost
            if metric.success:
                model_successes[metric.model] = model_successes.get(metric.model, 0) + 1
        for model, stats in model_stats.items():
            stats["success_rate"] = model_successes.get(model, 0) / stats["calls"]

        return {
            "time_range_hours": hours,
            "total_calls": total_calls,
            "successful_calls": successful_calls,
            "failed_calls": total_calls - successful_calls,
            "success_rate": successful_calls / total_calls,
            "avg_response_time": f"{avg_response_time:.2f}s",
            "total_tokens": total_tokens,
            "total_cost": f"${total_cost:.4f}",
            "model_breakdown": model_stats,
        }
# Use the monitor: one shared BasicMonitor instance for the calls below.
monitor = BasicMonitor()
def monitored_api_call(model: str, messages: list, **kwargs) -> str:
    """Call the chat-completions API and record the outcome on ``monitor``.

    Args:
        model: Model name forwarded to the API.
        messages: Chat messages forwarded to the API.
        **kwargs: Extra arguments forwarded to ``client.chat.completions.create``.

    Returns:
        The content of the first choice's message.

    Raises:
        Whatever the API client raises; the failure is recorded before re-raising.
    """
    # perf_counter is monotonic, so latency is not skewed by wall-clock changes.
    start_time = time.perf_counter()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs,
        )
    except Exception as e:
        # Record the failed call, then propagate the original error.
        monitor.record_call(
            model=model,
            response_time=time.perf_counter() - start_time,
            input_tokens=0,
            output_tokens=0,
            success=False,
            error_message=str(e),
        )
        raise
    response_time = time.perf_counter() - start_time

    # ``response.usage`` is Optional in the OpenAI SDK; count zero tokens
    # rather than misreporting a successful call as a failure.
    usage = getattr(response, "usage", None)
    monitor.record_call(
        model=model,
        response_time=response_time,
        input_tokens=getattr(usage, "prompt_tokens", 0) or 0,
        output_tokens=getattr(usage, "completion_tokens", 0) or 0,
        success=True,
    )
    return response.choices[0].message.content
# Usage example: make one monitored call and print the reply.
messages = [{"role": "user", "content": "请解释什么是人工智能"}]
response = monitored_api_call("gpt-3.5-turbo", messages)
print(response)
# Print the monitoring summary for the last 24 hours.
summary = monitor.get_summary(24)
print(json.dumps(summary, indent=2, ensure_ascii=False))高级监控功能
实时监控仪表板
python
import threading
import queue
from collections import defaultdict, deque
class RealTimeMonitor:
    """Aggregate API metrics on a background thread.

    Producers call :meth:`record_metric` from any thread; metrics go through
    a ``queue.Queue``. The worker started by :meth:`start` drains the queue
    every ``update_interval`` seconds and updates running totals. The shared
    statistics are guarded by a lock, so :meth:`get_current_stats` and
    :meth:`print_dashboard` may be called from other threads.
    """

    def __init__(self, update_interval: int = 5):
        """Create an idle monitor.

        Args:
            update_interval: Seconds between queue drains / stat refreshes.
        """
        self.update_interval = update_interval
        self.metrics_queue = queue.Queue()
        self.current_stats = defaultdict(int)
        # Latencies of the 100 most recent successful calls (avg/min/max).
        self.response_times = deque(maxlen=100)
        self.error_counts = defaultdict(int)
        self.running = False
        self.update_thread = None
        # Guards current_stats, response_times and error_counts.
        self._lock = threading.Lock()
        # Lets stop() wake the worker immediately instead of waiting out the interval.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background worker; does nothing if it is already running."""
        if self.update_thread is not None and self.update_thread.is_alive():
            return
        self.running = True
        self._stop_event.clear()
        self.update_thread = threading.Thread(target=self._update_loop, daemon=True)
        self.update_thread.start()
        print("实时监控已启动")

    def stop(self):
        """Stop the worker and wait until it has flushed queued metrics."""
        self.running = False
        self._stop_event.set()
        if self.update_thread:
            self.update_thread.join()
        print("实时监控已停止")

    def record_metric(self, metric: "APIMetrics"):
        """Queue *metric* for the worker; safe to call from any thread."""
        self.metrics_queue.put(metric)

    def _update_loop(self):
        """Worker body: refresh stats every ``update_interval`` seconds until stopped."""
        while self.running:
            self._refresh()
            # Waiting happens outside the error handler, so a persistent
            # failure cannot turn this loop into a busy spin.
            self._stop_event.wait(self.update_interval)
        # Final flush so metrics recorded just before stop() are still counted.
        self._refresh()

    def _refresh(self):
        """Drain the queue and recompute derived stats; errors are printed, not raised."""
        try:
            with self._lock:
                while True:
                    try:
                        metric = self.metrics_queue.get_nowait()
                    except queue.Empty:
                        break
                    self._process_metric(metric)
                self._update_stats()
        except Exception as e:
            print(f"监控更新错误: {e}")

    def _process_metric(self, metric: "APIMetrics"):
        """Fold one metric into the running totals (caller holds ``self._lock``)."""
        self.current_stats["total_calls"] += 1
        if metric.success:
            self.current_stats["successful_calls"] += 1
            self.response_times.append(metric.response_time)
            self.current_stats["total_tokens"] += metric.total_tokens
            self.current_stats["total_cost"] += metric.cost
        else:
            self.current_stats["failed_calls"] += 1
            self.error_counts[metric.error_message] += 1
        self.current_stats[f"model_{metric.model}_calls"] += 1

    def _update_stats(self):
        """Recompute latency and success-rate stats (caller holds ``self._lock``)."""
        if self.response_times:
            self.current_stats["avg_response_time"] = sum(self.response_times) / len(self.response_times)
            self.current_stats["max_response_time"] = max(self.response_times)
            self.current_stats["min_response_time"] = min(self.response_times)
        total = self.current_stats["total_calls"]
        if total > 0:
            self.current_stats["success_rate"] = self.current_stats["successful_calls"] / total

    def get_current_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the stats plus human-readable formatted copies."""
        with self._lock:
            stats = dict(self.current_stats)
        if "avg_response_time" in stats:
            stats["avg_response_time_formatted"] = f"{stats['avg_response_time']:.2f}s"
        if "total_cost" in stats:
            stats["total_cost_formatted"] = f"${stats['total_cost']:.4f}"
        if "success_rate" in stats:
            stats["success_rate_formatted"] = f"{stats['success_rate']:.2%}"
        return stats

    def print_dashboard(self):
        """Print a text dashboard of the current stats and error counts."""
        stats = self.get_current_stats()
        # Copy under the lock so the worker cannot resize the dict mid-iteration.
        with self._lock:
            error_counts = dict(self.error_counts)
        print("\n" + "="*50)
        print("API监控仪表板")
        print("="*50)
        print(f"总调用次数: {stats.get('total_calls', 0)}")
        print(f"成功调用: {stats.get('successful_calls', 0)}")
        print(f"失败调用: {stats.get('failed_calls', 0)}")
        print(f"成功率: {stats.get('success_rate_formatted', 'N/A')}")
        print(f"平均响应时间: {stats.get('avg_response_time_formatted', 'N/A')}")
        print(f"总令牌数: {stats.get('total_tokens', 0)}")
        print(f"总成本: {stats.get('total_cost_formatted', 'N/A')}")
        print("\n模型使用情况:")
        for key, value in stats.items():
            if key.startswith("model_") and key.endswith("_calls"):
                model_name = key[6:-6]  # strip the "model_" prefix and "_calls" suffix
                print(f" {model_name}: {value} 次调用")
        if error_counts:
            print("\n错误统计:")
            for error, count in error_counts.items():
                print(f" {error}: {count} 次")
        print("="*50)
# Use real-time monitoring: create the shared monitor and start its
# background update thread (stats refresh every 5 seconds).
realtime_monitor = RealTimeMonitor(update_interval=5)
realtime_monitor.start()
def realtime_monitored_call(model: str, messages: list, **kwargs) -> str:
    """Call the chat-completions API and feed the outcome to ``realtime_monitor``.

    Args:
        model: Model name forwarded to the API.
        messages: Chat messages forwarded to the API.
        **kwargs: Extra arguments forwarded to ``client.chat.completions.create``.

    Returns:
        The content of the first choice's message.

    Raises:
        Whatever the API client raises; the failure is recorded before re-raising.
    """
    start_time = time.perf_counter()  # monotonic latency timer
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs,
        )
    except Exception as e:
        realtime_monitor.record_metric(APIMetrics(
            timestamp=datetime.now(),
            model=model,
            response_time=time.perf_counter() - start_time,
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
            success=False,
            error_message=str(e),
        ))
        raise
    response_time = time.perf_counter() - start_time

    # ``response.usage`` is Optional in the OpenAI SDK; fall back to zero tokens.
    usage = getattr(response, "usage", None)
    input_tokens = getattr(usage, "prompt_tokens", 0) or 0
    output_tokens = getattr(usage, "completion_tokens", 0) or 0
    realtime_monitor.record_metric(APIMetrics(
        timestamp=datetime.now(),
        model=model,
        response_time=response_time,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=getattr(usage, "total_tokens", 0) or input_tokens + output_tokens,
        success=True,
        # Price the call with the shared BasicMonitor rate table; without this
        # the dashboard's total cost would always read $0.
        cost=monitor._calculate_cost(model, input_tokens, output_tokens),
    ))
    return response.choices[0].message.content
# Periodically print the dashboard.
def dashboard_printer():
    """Print ``realtime_monitor``'s dashboard every 10 seconds, forever."""
    pause_seconds = 10  # delay before each dashboard refresh
    while True:
        time.sleep(pause_seconds)
        realtime_monitor.print_dashboard()
# Start the dashboard-printing thread. daemon=True means it will not keep the
# interpreter alive once the main thread exits.
# NOTE(review): threading is already imported earlier in the file; this
# re-import is redundant but harmless.
import threading
dashboard_thread = threading.Thread(target=dashboard_printer)
dashboard_thread.daemon = True
dashboard_thread.start()告警系统
python
import smtplib
from email.mime.text import MIMEText
from typing import Callable, List
class AlertManager:
    """Evaluate alert rules against a stats dict and dispatch fired alerts.

    A rule is a callable ``rule(stats)`` returning either a falsy value (no
    alert) or an alert dict, which should contain at least ``"message"``.
    """

    def __init__(self, cooldown_seconds: float = 0.0):
        """Create a manager with no rules.

        Args:
            cooldown_seconds: Minimum time before an alert with the same
                ``"type"`` may fire again, to avoid alert storms. The default
                0 never suppresses, i.e. every firing is dispatched.
        """
        self.alert_rules: List[Callable] = []
        # The 100 most recently dispatched alerts; the oldest is dropped first.
        self.alert_history = deque(maxlen=100)
        self.cooldown_seconds = cooldown_seconds
        # Last dispatch time per alert key, used to enforce the cooldown.
        self._last_sent: Dict[Any, datetime] = {}

    def add_rule(self, rule_func: Callable):
        """Register a rule to run on every :meth:`check_alerts` call."""
        self.alert_rules.append(rule_func)

    def check_alerts(self, stats: Dict[str, Any]):
        """Run every rule on *stats*; a failing rule is reported and skipped."""
        for rule in self.alert_rules:
            try:
                alert = rule(stats)
                if alert:
                    self._send_alert(alert)
            except Exception as e:
                print(f"告警检查失败: {e}")

    def _send_alert(self, alert: Dict[str, Any]):
        """Timestamp, record and print *alert* unless it is within its cooldown."""
        now = datetime.now()
        key = alert.get("type", alert.get("message"))
        last = self._last_sent.get(key)
        if last is not None and (now - last).total_seconds() < self.cooldown_seconds:
            return  # fired recently; suppress the repeat
        self._last_sent[key] = now
        alert["timestamp"] = now
        self.alert_history.append(alert)
        print(f"🚨 告警: {alert['message']}")
        # Further channels (email, SMS, ...) can be dispatched from here.
# Predefined alert rules
def high_error_rate_alert(stats: Dict[str, Any], threshold: float = 0.9) -> Optional[Dict[str, Any]]:
    """Fire a warning when the success rate falls below *threshold*.

    Args:
        stats: Stats dict; reads ``"success_rate"`` (a missing value is
            treated as 1.0, i.e. no alert).
        threshold: Minimum acceptable success rate; default 0.9 (90%).

    Returns:
        An alert dict, or ``None`` when the success rate is acceptable.
    """
    success_rate = stats.get("success_rate", 1.0)
    if success_rate < threshold:
        return {
            "type": "high_error_rate",
            "message": f"错误率过高: {(1-success_rate):.2%}",
            "severity": "warning",
        }
    return None
def slow_response_alert(stats: Dict[str, Any], threshold: float = 5.0) -> Optional[Dict[str, Any]]:
    """Fire a warning when the average response time exceeds *threshold*.

    Args:
        stats: Stats dict; reads ``"avg_response_time"`` in seconds (a
            missing value is treated as 0).
        threshold: Maximum acceptable average latency in seconds; default 5.0.

    Returns:
        An alert dict, or ``None`` when latency is acceptable.
    """
    avg_response_time = stats.get("avg_response_time", 0)
    if avg_response_time > threshold:
        return {
            "type": "slow_response",
            "message": f"响应时间过长: {avg_response_time:.2f}s",
            "severity": "warning",
        }
    return None
def high_cost_alert(stats: Dict[str, Any], threshold: float = 100) -> Optional[Dict[str, Any]]:
    """Fire a critical alert when the accumulated cost exceeds *threshold*.

    Args:
        stats: Stats dict; reads ``"total_cost"`` in dollars (a missing value
            is treated as 0).
        threshold: Maximum acceptable total cost in dollars; default 100.

    Returns:
        An alert dict, or ``None`` when the cost is within budget.
    """
    total_cost = stats.get("total_cost", 0)
    if total_cost > threshold:
        return {
            "type": "high_cost",
            "message": f"成本过高: ${total_cost:.2f}",
            "severity": "critical",
        }
    return None
# Integrate the alerting system
class MonitoredAPIClient:
    """API client that records every call on a RealTimeMonitor and runs alert rules.

    :meth:`start_monitoring` starts the monitor plus a daemon thread that
    passes the monitor's current stats to the AlertManager every
    ``alert_interval`` seconds; :meth:`stop_monitoring` stops both.
    """

    def __init__(self, alert_interval: float = 30):
        """Create the monitor and alert manager and register the default rules.

        Args:
            alert_interval: Seconds between alert checks; default 30.
        """
        self.monitor = RealTimeMonitor()
        self.alert_manager = AlertManager()
        self.alert_interval = alert_interval
        # Set by stop_monitoring() to end the alert-checker loop promptly.
        self._alert_stop = threading.Event()
        self._alert_thread = None
        self._setup_alerts()

    def _setup_alerts(self):
        """Register the predefined error-rate, latency and cost rules."""
        self.alert_manager.add_rule(high_error_rate_alert)
        self.alert_manager.add_rule(slow_response_alert)
        self.alert_manager.add_rule(high_cost_alert)

    def start_monitoring(self):
        """Start the monitor and the periodic alert checker.

        Calling this again while the checker thread is alive does nothing,
        so repeated calls never start duplicate checker threads.
        """
        if self._alert_thread is not None and self._alert_thread.is_alive():
            return
        self.monitor.start()
        self._alert_stop.clear()

        def alert_checker():
            while True:
                stats = self.monitor.get_current_stats()
                self.alert_manager.check_alerts(stats)
                # wait() returns True as soon as stop_monitoring() sets the event.
                if self._alert_stop.wait(self.alert_interval):
                    break

        self._alert_thread = threading.Thread(target=alert_checker, daemon=True)
        self._alert_thread.start()

    def stop_monitoring(self):
        """Stop the alert checker thread, then the underlying monitor."""
        self._alert_stop.set()
        if self._alert_thread is not None:
            self._alert_thread.join()
            self._alert_thread = None
        self.monitor.stop()

    def chat_completion(self, model: str, messages: list, **kwargs) -> str:
        """Call the chat-completions API and record the outcome on ``self.monitor``.

        Args:
            model: Model name forwarded to the API.
            messages: Chat messages forwarded to the API.
            **kwargs: Extra arguments forwarded to ``client.chat.completions.create``.

        Returns:
            The content of the first choice's message.

        Raises:
            Whatever the API client raises; the failure is recorded before re-raising.
        """
        start_time = time.perf_counter()  # monotonic latency timer
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs,
            )
        except Exception as e:
            self.monitor.record_metric(APIMetrics(
                timestamp=datetime.now(),
                model=model,
                response_time=time.perf_counter() - start_time,
                input_tokens=0,
                output_tokens=0,
                total_tokens=0,
                success=False,
                error_message=str(e),
            ))
            raise
        response_time = time.perf_counter() - start_time

        # ``response.usage`` is Optional in the OpenAI SDK; fall back to zero tokens.
        usage = getattr(response, "usage", None)
        input_tokens = getattr(usage, "prompt_tokens", 0) or 0
        output_tokens = getattr(usage, "completion_tokens", 0) or 0
        self.monitor.record_metric(APIMetrics(
            timestamp=datetime.now(),
            model=model,
            response_time=response_time,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=getattr(usage, "total_tokens", 0) or input_tokens + output_tokens,
            success=True,
            # Price the call with the shared BasicMonitor rate table; without it
            # total_cost stays 0 and high_cost_alert can never fire.
            cost=monitor._calculate_cost(model, input_tokens, output_tokens),
        ))
        return response.choices[0].message.content
# Use the monitored client: start real-time monitoring and periodic alert checks.
monitored_client = MonitoredAPIClient()
monitored_client.start_monitoring()
# Make an API call through the monitored client.
response = monitored_client.chat_completion(
"gpt-3.5-turbo",
[{"role": "user", "content": "测试消息"}]
)数据持久化
数据库存储
python
import sqlite3
from contextlib import contextmanager
class DatabaseMonitor:
    """SQLite-backed store for API call metrics.

    Every operation opens its own connection, commits on success, rolls back
    on error and always closes the connection.
    """

    def __init__(self, db_path: str = "api_monitoring.db"):
        """Open (creating if needed) the database and ensure the schema exists.

        Args:
            db_path: SQLite database file path.
        """
        self.db_path = db_path
        self._init_database()

    @staticmethod
    def _to_db_time(value: datetime) -> str:
        """Serialize *value* exactly as sqlite3's default datetime adapter did.

        The implicit adapter is deprecated since Python 3.12; binding the same
        ISO-8601 text explicitly keeps stored values and range comparisons
        identical to what earlier versions of this class wrote.
        """
        return value.isoformat(sep=" ")

    def _init_database(self):
        """Create the ``api_metrics`` table and its indexes if they are missing."""
        with self._get_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS api_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME,
                    model TEXT,
                    response_time REAL,
                    input_tokens INTEGER,
                    output_tokens INTEGER,
                    total_tokens INTEGER,
                    success BOOLEAN,
                    error_message TEXT,
                    cost REAL
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_timestamp
                ON api_metrics(timestamp)
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_model
                ON api_metrics(model)
            ''')

    @contextmanager
    def _get_connection(self):
        """Yield a connection whose rows support name access; commit or roll back, then close."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def store_metric(self, metric: "APIMetrics"):
        """Insert one metric row (parameterized, so field values are never spliced into SQL)."""
        with self._get_connection() as conn:
            conn.execute(
                '''
                INSERT INTO api_metrics
                    (timestamp, model, response_time, input_tokens,
                     output_tokens, total_tokens, success, error_message, cost)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (
                    self._to_db_time(metric.timestamp),
                    metric.model,
                    metric.response_time,
                    metric.input_tokens,
                    metric.output_tokens,
                    metric.total_tokens,
                    metric.success,
                    metric.error_message,
                    metric.cost,
                ),
            )

    def get_metrics(self, hours: int = 24) -> List[Dict]:
        """Return rows from the last *hours* hours as dicts, newest first."""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        with self._get_connection() as conn:
            cursor = conn.execute('''
                SELECT * FROM api_metrics
                WHERE timestamp >= ?
                ORDER BY timestamp DESC
            ''', (self._to_db_time(cutoff_time),))
            return [dict(row) for row in cursor.fetchall()]

    def get_aggregated_stats(self, hours: int = 24) -> Dict:
        """Aggregate the last *hours* hours overall and per model.

        Returns:
            Totals (calls, successful calls, tokens, cost), the success rate
            and call-weighted average response time (both 0 when there are
            no calls), plus ``"by_model"``: one dict per model with the same
            aggregates.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        with self._get_connection() as conn:
            cursor = conn.execute('''
                SELECT
                    COUNT(*) AS total_calls,
                    SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS successful_calls,
                    AVG(response_time) AS avg_response_time,
                    COALESCE(SUM(total_tokens), 0) AS total_tokens,
                    COALESCE(SUM(cost), 0) AS total_cost,
                    model
                FROM api_metrics
                WHERE timestamp >= ?
                GROUP BY model
            ''', (self._to_db_time(cutoff_time),))
            by_model = [dict(row) for row in cursor.fetchall()]

        total_calls = sum(row["total_calls"] for row in by_model)
        successful_calls = sum(row["successful_calls"] for row in by_model)
        total_tokens = sum(row["total_tokens"] for row in by_model)
        total_cost = sum(row["total_cost"] for row in by_model)
        # Weighting each model's average by its call count gives the overall mean.
        weighted_latency = sum((row["avg_response_time"] or 0.0) * row["total_calls"] for row in by_model)
        return {
            "time_range_hours": hours,
            "total_calls": total_calls,
            "successful_calls": successful_calls,
            "success_rate": successful_calls / total_calls if total_calls > 0 else 0,
            "avg_response_time": weighted_latency / total_calls if total_calls > 0 else 0.0,
            "total_tokens": total_tokens,
            "total_cost": total_cost,
            "by_model": by_model,
        }
# Use database-backed monitoring. The default db_path "api_monitoring.db" is a
# relative path, so the file is created in the current working directory.
db_monitor = DatabaseMonitor()
def database_monitored_call(model: str, messages: list, **kwargs) -> str:
    """Call the chat-completions API and persist the outcome via ``db_monitor``.

    Only the API request itself is inside the error handler. A database error
    raised while storing a successful call therefore propagates unchanged,
    instead of being stored as a failed API call and hiding the response.

    Args:
        model: Model name forwarded to the API.
        messages: Chat messages forwarded to the API.
        **kwargs: Extra arguments forwarded to ``client.chat.completions.create``.

    Returns:
        The content of the first choice's message.

    Raises:
        Whatever the API client raises (after the failure is stored), or any
        error raised by ``db_monitor.store_metric``.
    """
    start_time = time.perf_counter()  # monotonic latency timer
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs,
        )
    except Exception as e:
        db_monitor.store_metric(APIMetrics(
            timestamp=datetime.now(),
            model=model,
            response_time=time.perf_counter() - start_time,
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
            success=False,
            error_message=str(e),
        ))
        raise
    response_time = time.perf_counter() - start_time

    # ``response.usage`` is Optional in the OpenAI SDK; fall back to zero tokens.
    usage = getattr(response, "usage", None)
    input_tokens = getattr(usage, "prompt_tokens", 0) or 0
    output_tokens = getattr(usage, "completion_tokens", 0) or 0
    db_monitor.store_metric(APIMetrics(
        timestamp=datetime.now(),
        model=model,
        response_time=response_time,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=getattr(usage, "total_tokens", 0) or input_tokens + output_tokens,
        success=True,
        # Price the call with the shared BasicMonitor rate table so stored
        # cost (and every report built on it) is not always $0.
        cost=monitor._calculate_cost(model, input_tokens, output_tokens),
    ))
    return response.choices[0].message.content
# Query aggregated statistics for the last 24 hours and print them as JSON.
stats = db_monitor.get_aggregated_stats(24)
print(json.dumps(stats, indent=2, ensure_ascii=False))可视化和报告
生成监控报告
python
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime, timedelta
class MonitoringReporter:
    """Build daily text reports and usage-trend charts from stored metrics."""

    def __init__(self, db_monitor: DatabaseMonitor):
        """Args:
            db_monitor: Monitor whose ``api_metrics`` table is reported on.
        """
        self.db_monitor = db_monitor

    def _query(self, sql: str, params: tuple) -> List[Dict]:
        """Run a parameterized query on the monitor's database; return rows as dicts."""
        with self.db_monitor._get_connection() as conn:
            return [dict(row) for row in conn.execute(sql, params).fetchall()]

    def generate_daily_report(self, date: datetime = None) -> str:
        """Return a Markdown report of the calls recorded on *date*.

        Args:
            date: Day to report on; a ``date`` or ``datetime`` (only the
                calendar day is used). Defaults to today in local time.

        Returns:
            The report text, or a short "no data" message for an empty day.
        """
        if date is None:
            date = datetime.now().date()
        start_time = datetime.combine(date, datetime.min.time())
        end_time = start_time + timedelta(days=1)
        # Bind bounds as ISO text: the format sqlite3's (deprecated) default
        # datetime adapter produced, so comparisons against stored rows match.
        data = self._query('''
            SELECT * FROM api_metrics
            WHERE timestamp >= ? AND timestamp < ?
            ORDER BY timestamp
        ''', (start_time.isoformat(sep=" "), end_time.isoformat(sep=" ")))
        if not data:
            return f"日期 {date} 没有数据"

        calls = len(data)
        successes = sum(1 for d in data if d['success'])
        report = "\n".join([
            "",
            f"# API监控日报 - {date}",
            "## 总体概况",
            f"- 总调用次数: {calls}",
            f"- 成功调用: {successes}",
            f"- 失败调用: {calls - successes}",
            f"- 成功率: {successes / calls:.2%}",
            f"- 平均响应时间: {sum(d['response_time'] for d in data) / calls:.2f}s",
            f"- 总令牌数: {sum(d['total_tokens'] for d in data)}",
            f"- 总成本: ${sum(d['cost'] for d in data):.4f}",
            "## 模型使用情况",
            "",
        ])

        # One pass over the rows accumulates per-model totals and success
        # counts, instead of re-filtering all rows once per model.
        model_stats: Dict[str, Dict[str, Any]] = {}
        for d in data:
            s = model_stats.setdefault(d['model'], {'calls': 0, 'successes': 0, 'tokens': 0, 'cost': 0})
            s['calls'] += 1
            if d['success']:
                s['successes'] += 1
            s['tokens'] += d['total_tokens']
            s['cost'] += d['cost']
        for model, s in model_stats.items():
            report += (
                f"\n### {model}\n"
                f"- 调用次数: {s['calls']}\n"
                f"- 成功率: {s['successes'] / s['calls']:.2%}\n"
                f"- 令牌数: {s['tokens']}\n"
                f"- 成本: ${s['cost']:.4f}\n"
            )

        # Error analysis: count failures per distinct error message.
        error_counts: Dict[str, int] = {}
        for d in data:
            if not d['success']:
                error_counts[d['error_message']] = error_counts.get(d['error_message'], 0) + 1
        if error_counts:
            report += "\n## 错误分析\n"
            report += "".join(f"- {msg}: {count}次\n" for msg, count in error_counts.items())
        return report

    def plot_usage_trends(self, days: int = 7, output_path: str = 'api_usage_trends.png'):
        """Plot daily call count, token usage and cost for the last *days* days.

        The figure is saved to *output_path*, shown, and then closed so that
        repeated calls do not accumulate open figures.

        NOTE(review): matplotlib's default font may lack CJK glyphs, in which
        case the Chinese titles render as boxes; configure ``rcParams`` if so.
        """
        start_time = datetime.now() - timedelta(days=days)
        data = self._query('''
            SELECT DATE(timestamp) AS date,
                   COUNT(*) AS calls,
                   SUM(total_tokens) AS tokens,
                   SUM(cost) AS cost
            FROM api_metrics
            WHERE timestamp >= ?
            GROUP BY DATE(timestamp)
            ORDER BY date
        ''', (start_time.isoformat(sep=" "),))
        if not data:
            print("没有数据可绘制")
            return

        df = pd.DataFrame(data)
        df['date'] = pd.to_datetime(df['date'])

        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 10))
        # Call-count trend
        ax1.plot(df['date'], df['calls'], marker='o')
        ax1.set_title('API调用次数趋势')
        ax1.set_ylabel('调用次数')
        ax1.grid(True)
        # Token-usage trend
        ax2.plot(df['date'], df['tokens'], marker='s', color='orange')
        ax2.set_title('令牌使用趋势')
        ax2.set_ylabel('令牌数')
        ax2.grid(True)
        # Cost trend
        ax3.plot(df['date'], df['cost'], marker='^', color='red')
        ax3.set_title('成本趋势')
        ax3.set_ylabel('成本 ($)')
        ax3.grid(True)

        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.show()
        plt.close(fig)
        print(f"趋势图已保存为 {output_path}")
# Use the report generator backed by the shared database monitor.
reporter = MonitoringReporter(db_monitor)
# Generate today's daily report and print it.
daily_report = reporter.generate_daily_report()
print(daily_report)
# Plot the last 7 days of usage trends (the chart is also saved as a PNG file).
reporter.plot_usage_trends(7)最佳实践
1. 监控策略
- 设置合理的监控指标
- 实现分层监控(实时、小时、天)
- 建立告警阈值和规则
- 定期分析监控数据
2. 数据管理
- 定期清理历史数据
- 实现数据备份和恢复
- 优化数据库查询性能
- 确保数据一致性
3. 告警管理
- 设置合理的告警阈值
- 避免告警风暴
- 实现告警升级机制
- 记录告警处理过程
4. 性能优化
- 使用异步处理
- 实现批量数据写入
- 优化数据库索引
- 缓存常用查询结果
限制和注意事项
- 存储成本:大量监控数据需要存储空间
- 性能影响:在调用路径上同步执行的监控逻辑(如写入数据库)会增加每次API调用的延迟
- 数据准确性:网络问题可能导致数据丢失;仅保存在内存中的指标(如BasicMonitor、RealTimeMonitor)会在进程重启后丢失
- 隐私保护:监控数据可能包含敏感信息
- 维护成本:监控系统需要持续维护