兼容 Anthropic SDK
本平台提供了对 Anthropic Claude SDK 的兼容支持,使开发者可以轻松地将现有的 Anthropic 应用迁移到本平台,享受更优质的服务和更优惠的价格。
概述
兼容性特点:
- 完全兼容 Anthropic SDK API
- 支持所有 Claude 模型
- 保持相同的接口和参数
- 无缝迁移现有代码
- 提供增强功能
支持的模型
Claude 3 系列
claude-3-opus- 最强大的模型claude-3-sonnet- 平衡性能和速度claude-3-haiku- 快速响应模型
Claude 3.5 系列
claude-3.5-sonnet- 最新高性能模型
基本用法
安装和配置
bash
# 安装 Anthropic SDK
pip install anthropicpython
import anthropic
# 配置客户端
client = anthropic.Anthropic(
api_key="your_realmrouter_api_key", # 使用 RealmRouter 的 API Key
base_url="https://realmrouter.cn/v1" # 指向 RealmRouter 的端点
)基本消息发送
python
# 发送简单消息
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=[
{
"role": "user",
"content": "请解释什么是人工智能"
}
]
)
print(response.content[0].text)流式响应
python
# 流式响应
with client.messages.stream(
model="claude-3-sonnet",
max_tokens=1000,
messages=[
{
"role": "user",
"content": "请写一首关于春天的诗"
}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # 换行高级功能
多轮对话
python
# 多轮对话示例
conversation = [
{
"role": "user",
"content": "我想学习Python编程,请给我一些建议"
}
]
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=conversation
)
print("Claude:", response.content[0].text)
# 添加 Claude 的回复到对话历史
conversation.append({
"role": "assistant",
"content": response.content[0].text
})
# 继续对话
conversation.append({
"role": "user",
"content": "能推荐一些好的学习资源吗?"
})
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=conversation
)
print("Claude:", response.content[0].text)系统提示词
python
# 使用系统提示词
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
system="你是一个专业的Python编程导师,请用简单易懂的语言解释概念。",
messages=[
{
"role": "user",
"content": "什么是装饰器?"
}
]
)
print(response.content[0].text)图像处理
python
import base64
# 读取并编码图像
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
# 图像分析
image_base64 = encode_image("example.jpg")
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": "请描述这张图片的内容"
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": image_base64
}
}
]
}
]
)
print(response.content[0].text)工具使用(Function Calling)
python
# 定义工具
tools = [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"input_schema": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["city"]
}
}
]
# 使用工具
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
tools=tools,
messages=[
{
"role": "user",
"content": "北京今天天气怎么样?"
}
]
)
# 检查是否使用了工具
if response.stop_reason == "tool_use":
for content in response.content:
if content.type == "tool_use":
tool_use = content
print(f"调用工具: {tool_use.name}")
print(f"参数: {tool_use.input}")
# 执行工具(这里用模拟函数)
if tool_use.name == "get_weather":
result = get_weather(tool_use.input["city"])
# 继续对话,提供工具结果
response = client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=[
{
"role": "user",
"content": "北京今天天气怎么样?"
},
{
"role": "assistant",
"content": response.content
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": json.dumps(result)
}
]
}
]
)
print(response.content[0].text)
def get_weather(city):
"""模拟天气API"""
weather_data = {
"北京": {"temp": 25, "condition": "晴朗", "humidity": 60},
"上海": {"temp": 28, "condition": "多云", "humidity": 70}
}
return weather_data.get(city, {"error": "城市未找到"})迁移指南
从 Anthropic 官方 API 迁移
1. 更改端点
python
# 原来的代码
client = anthropic.Anthropic(
api_key="your_anthropic_key"
)
# 迁移后的代码
client = anthropic.Anthropic(
api_key="your_realmrouter_api_key",
base_url="https://realmrouter.cn/v1"
)2. 模型名称映射
python
# 模型名称映射表
MODEL_MAPPING = {
# Anthropic 官方 -> RealmRouter
"claude-3-opus-20240229": "claude-3-opus",
"claude-3-sonnet-20240229": "claude-3-sonnet",
"claude-3-haiku-20240307": "claude-3-haiku",
"claude-3-5-sonnet-20240620": "claude-3.5-sonnet"
}
def get_realmrouter_model(anthropic_model):
"""获取对应的 RealmRouter 模型名"""
return MODEL_MAPPING.get(anthropic_model, anthropic_model)
# 使用示例
original_model = "claude-3-sonnet-20240229"
realmrouter_model = get_realmrouter_model(original_model)
response = client.messages.create(
model=realmrouter_model, # 使用映射后的模型名
max_tokens=1000,
messages=[...]
)3. 错误处理适配
python
import anthropic
def safe_api_call(client, model, messages, max_retries=3):
"""安全的 API 调用,适配 RealmRouter 错误格式"""
for attempt in range(max_retries):
try:
response = client.messages.create(
model=model,
max_tokens=1000,
messages=messages
)
return response
except anthropic.RateLimitError as e:
print(f"频率限制: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
continue
raise
except anthropic.APIError as e:
print(f"API错误: {e}")
if attempt < max_retries - 1:
time.sleep(1)
continue
raise
except Exception as e:
print(f"未知错误: {e}")
raise
# 使用示例
try:
response = safe_api_call(client, "claude-3-sonnet", messages)
print(response.content[0].text)
except Exception as e:
print(f"调用失败: {e}")性能优化
连接池配置
python
from anthropic import Anthropic
import httpx
# 配置连接池
http_client = httpx.Client(
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
timeout=30.0
)
client = Anthropic(
api_key="your_realmrouter_api_key",
base_url="https://realmrouter.cn/v1",
http_client=http_client
)批量处理
python
import asyncio
from anthropic import AsyncAnthropic
async_client = AsyncAnthropic(
api_key="your_realmrouter_api_key",
base_url="https://realmrouter.cn/v1"
)
async def batch_process(messages_list):
"""批量处理消息"""
tasks = []
for messages in messages_list:
task = async_client.messages.create(
model="claude-3-sonnet",
max_tokens=1000,
messages=messages
)
tasks.append(task)
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
print(f"请求 {i} 失败: {response}")
results.append(None)
else:
results.append(response.content[0].text)
return results
# 使用示例
async def main():
message_batches = [
[{"role": "user", "content": f"请生成第{i+1}个创意"}]
for i in range(5)
]
results = await batch_process(message_batches)
for i, result in enumerate(results):
print(f"结果 {i+1}: {result}")
# 运行异步任务
asyncio.run(main())监控和调试
请求日志
python
import logging
from anthropic import Anthropic
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LoggingAnthropic(Anthropic):
"""带日志记录的 Anthropic 客户端"""
def messages_create(self, **kwargs):
"""重写消息创建方法,添加日志"""
logger.info(f"发送请求: model={kwargs.get('model')}, "
f"messages={len(kwargs.get('messages', []))}")
start_time = time.time()
try:
response = super().messages.create(**kwargs)
end_time = time.time()
logger.info(f"请求成功: 耗时={end_time - start_time:.2f}s, "
f"输出tokens={response.usage.output_tokens}")
return response
except Exception as e:
end_time = time.time()
logger.error(f"请求失败: 耗时={end_time - start_time:.2f}s, "
f"错误={str(e)}")
raise
# 使用日志客户端
client = LoggingAnthropic(
api_key="your_realmrouter_api_key",
base_url="https://realmrouter.cn/v1"
)性能监控
python
import time
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class PerformanceMetrics:
"""性能指标"""
model: str
response_time: float
input_tokens: int
output_tokens: int
total_tokens: int
success: bool
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics: List[PerformanceMetrics] = []
def record_call(self, model: str, response_time: float,
input_tokens: int, output_tokens: int, success: bool):
"""记录调用指标"""
metric = PerformanceMetrics(
model=model,
response_time=response_time,
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=input_tokens + output_tokens,
success=success
)
self.metrics.append(metric)
def get_stats(self) -> Dict:
"""获取统计信息"""
if not self.metrics:
return {}
total_calls = len(self.metrics)
successful_calls = sum(1 for m in self.metrics if m.success)
avg_response_time = sum(m.response_time for m in self.metrics) / total_calls
total_tokens = sum(m.total_tokens for m in self.metrics)
return {
"total_calls": total_calls,
"success_rate": successful_calls / total_calls,
"avg_response_time": avg_response_time,
"total_tokens": total_tokens
}
# 使用监控器
monitor = PerformanceMonitor()
def monitored_call(client, model, messages):
"""带监控的调用"""
start_time = time.time()
try:
response = client.messages.create(
model=model,
max_tokens=1000,
messages=messages
)
response_time = time.time() - start_time
monitor.record_call(
model=model,
response_time=response_time,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
success=True
)
return response
except Exception as e:
response_time = time.time() - start_time
monitor.record_call(
model=model,
response_time=response_time,
input_tokens=0,
output_tokens=0,
success=False
)
raise
# 查看性能统计
stats = monitor.get_stats()
print(f"性能统计: {stats}")最佳实践
1. 错误处理
- 实现重试机制
- 处理频率限制
- 记录详细错误信息
2. 性能优化
- 使用连接池
- 实现批量处理
- 缓存常用结果
3. 安全考虑
- 保护 API 密钥
- 验证输入内容
- 限制输出长度
4. 成本控制
- 监控令牌使用
- 选择合适的模型
- 优化提示词
常见问题
Q: 如何处理模型名称差异?
A: 使用模型名称映射表,将 Anthropic 官方模型名转换为 RealmRouter 对应的模型名。
Q: 频率限制如何处理?
A: 实现指数退避重试机制,监控请求频率,避免触发限制。
Q: 如何确保兼容性?
A: 使用相同的 SDK 和接口,只更改端点和 API 密钥,其他代码保持不变。
Q: 性能如何优化?
A: 使用连接池、批量处理、异步调用等技术提高性能。
限制和注意事项
- 模型可用性:部分最新模型可能需要时间同步
- 功能差异:某些高级功能可能有细微差异
- 性能表现:响应时间可能因网络环境而异
- 计费方式:请确认 RealmRouter 的计费标准
- 服务等级:了解 RealmRouter 的 SLA 和服务保障