Skip to content

兼容 Anthropic SDK

本平台提供了对 Anthropic Claude SDK 的兼容支持,使开发者可以轻松地将现有的 Anthropic 应用迁移到本平台,享受更优质的服务和更优惠的价格。

概述

兼容性特点:

  • 完全兼容 Anthropic SDK API
  • 支持所有 Claude 模型
  • 保持相同的接口和参数
  • 无缝迁移现有代码
  • 提供增强功能

支持的模型

Claude 3 系列

  • claude-3-opus - 最强大的模型
  • claude-3-sonnet - 平衡性能和速度
  • claude-3-haiku - 快速响应模型

Claude 3.5 系列

  • claude-3.5-sonnet - 最新高性能模型

基本用法

安装和配置

bash
# Install the Anthropic SDK
pip install anthropic
python
import anthropic

# Configure the client to talk to RealmRouter instead of the official endpoint.
client = anthropic.Anthropic(
    api_key="your_realmrouter_api_key",  # use your RealmRouter API key
    base_url="https://realmrouter.cn/v1"  # point at the RealmRouter endpoint
)

基本消息发送

python
# Send a simple single-turn message.
response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    messages=[
        {
            "role": "user",
            "content": "请解释什么是人工智能"
        }
    ]
)

# Print the text of the first content block in the reply.
print(response.content[0].text)

流式响应

python
# Stream the response incrementally instead of waiting for the full reply.
with client.messages.stream(
    model="claude-3-sonnet",
    max_tokens=1000,
    messages=[
        {
            "role": "user",
            "content": "请写一首关于春天的诗"
        }
    ]
) as stream:
    # Print each text fragment as soon as it arrives.
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()  # trailing newline

高级功能

多轮对话

python
# Multi-turn conversation example: the full history is re-sent on every call.
conversation = [
    {
        "role": "user",
        "content": "我想学习Python编程,请给我一些建议"
    }
]

response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    messages=conversation
)

print("Claude:", response.content[0].text)

# Append Claude's reply to the conversation history.
conversation.append({
    "role": "assistant",
    "content": response.content[0].text
})

# Continue the conversation with a follow-up question.
conversation.append({
    "role": "user",
    "content": "能推荐一些好的学习资源吗?"
})

response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    messages=conversation
)

print("Claude:", response.content[0].text)

系统提示词

python
# Use a system prompt: it is a top-level parameter, not a message in the list.
response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    system="你是一个专业的Python编程导师,请用简单易懂的语言解释概念。",
    messages=[
        {
            "role": "user",
            "content": "什么是装饰器?"
        }
    ]
)

print(response.content[0].text)

图像处理

python
import base64

def encode_image(image_path):
    """Read a file from disk and return its contents as a base64-encoded string."""
    with open(image_path, "rb") as fh:
        raw = fh.read()
    return base64.b64encode(raw).decode("utf-8")

# Image analysis: encode the file and send it alongside a text prompt.
image_base64 = encode_image("example.jpg")

response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "请描述这张图片的内容"
                },
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        # media_type must match the actual file format
                        "media_type": "image/jpeg",
                        "data": image_base64
                    }
                }
            ]
        }
    ]
)

print(response.content[0].text)

工具使用(Function Calling)

python
# Declare the tools the model may call; input_schema is JSON Schema.
tools = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "城市名称"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "温度单位"
                }
            },
            # only "city" is mandatory; "unit" is optional
            "required": ["city"]
        }
    }
]

# Invoke the model with the tool definitions.
# NOTE: `get_weather` is defined further down in this document; in a real
# flat script it must be defined before this code runs.
import json  # bug fix: json.dumps is used below but was never imported

response = client.messages.create(
    model="claude-3-sonnet",
    max_tokens=1000,
    tools=tools,
    messages=[
        {
            "role": "user",
            "content": "北京今天天气怎么样?"
        }
    ]
)

# Check whether the model decided to call a tool.
if response.stop_reason == "tool_use":
    for content in response.content:
        if content.type == "tool_use":
            tool_use = content
            print(f"调用工具: {tool_use.name}")
            print(f"参数: {tool_use.input}")

            # Execute the tool (a mock implementation in this document).
            if tool_use.name == "get_weather":
                result = get_weather(tool_use.input["city"])

                # Continue the conversation, feeding the tool result back as a
                # `tool_result` block referencing the tool_use id.
                response = client.messages.create(
                    model="claude-3-sonnet",
                    max_tokens=1000,
                    messages=[
                        {
                            "role": "user",
                            "content": "北京今天天气怎么样?"
                        },
                        {
                            "role": "assistant",
                            "content": response.content
                        },
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "tool_result",
                                    "tool_use_id": tool_use.id,
                                    "content": json.dumps(result)
                                }
                            ]
                        }
                    ]
                )

                print(response.content[0].text)

def get_weather(city):
    """Mock weather API: return canned data for a known city.

    Unknown cities yield an error dict instead of raising.
    """
    known = {
        "北京": {"temp": 25, "condition": "晴朗", "humidity": 60},
        "上海": {"temp": 28, "condition": "多云", "humidity": 70},
    }
    if city in known:
        return known[city]
    return {"error": "城市未找到"}

迁移指南

从 Anthropic 官方 API 迁移

1. 更改端点

python
# Original code (official Anthropic endpoint)
client = anthropic.Anthropic(
    api_key="your_anthropic_key"
)

# Migrated code: same SDK, only the key and base_url change
client = anthropic.Anthropic(
    api_key="your_realmrouter_api_key",
    base_url="https://realmrouter.cn/v1"
)

2. 模型名称映射

python
# Mapping table: Anthropic official model IDs -> RealmRouter model IDs.
MODEL_MAPPING = {
    "claude-3-opus-20240229": "claude-3-opus",
    "claude-3-sonnet-20240229": "claude-3-sonnet",
    "claude-3-haiku-20240307": "claude-3-haiku",
    "claude-3-5-sonnet-20240620": "claude-3.5-sonnet",
}

def get_realmrouter_model(anthropic_model):
    """Translate an Anthropic model name to its RealmRouter equivalent.

    Names not present in the mapping are passed through unchanged.
    """
    try:
        return MODEL_MAPPING[anthropic_model]
    except KeyError:
        return anthropic_model

# Usage example: translate the official name before calling the API.
original_model = "claude-3-sonnet-20240229"
realmrouter_model = get_realmrouter_model(original_model)

response = client.messages.create(
    model=realmrouter_model,  # use the mapped model name
    max_tokens=1000,
    messages=[...]  # placeholder: supply real messages here
)

3. 错误处理适配

python
import anthropic

def safe_api_call(client, model, messages, max_retries=3):
    """Call the messages API with retries and backoff.

    Args:
        client: an Anthropic-compatible client instance.
        model: model name to request.
        messages: conversation messages to send.
        max_retries: total number of attempts before giving up.

    Returns:
        The API response on success.

    Raises:
        anthropic.RateLimitError / anthropic.APIError after the retries are
        exhausted, or any unexpected exception immediately.
    """
    import time  # bug fix: time.sleep was used below without importing `time`

    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=1000,
                messages=messages
            )
            return response

        except anthropic.RateLimitError as e:
            print(f"频率限制: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
                continue
            raise

        except anthropic.APIError as e:
            print(f"API错误: {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
                continue
            raise

        except Exception as e:
            print(f"未知错误: {e}")
            raise

# 使用示例
try:
    response = safe_api_call(client, "claude-3-sonnet", messages)
    print(response.content[0].text)
except Exception as e:
    print(f"调用失败: {e}")

性能优化

连接池配置

python
from anthropic import Anthropic
import httpx

# Configure an HTTP connection pool shared by all requests from this client.
http_client = httpx.Client(
    limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
    timeout=30.0  # seconds
)

client = Anthropic(
    api_key="your_realmrouter_api_key",
    base_url="https://realmrouter.cn/v1",
    http_client=http_client  # reuse pooled connections across requests
)

批量处理

python
import asyncio
from anthropic import AsyncAnthropic

# Async client pointed at the RealmRouter endpoint.
async_client = AsyncAnthropic(
    api_key="your_realmrouter_api_key",
    base_url="https://realmrouter.cn/v1"
)

async def batch_process(messages_list):
    """Send every message list concurrently and collect the text replies.

    A failed request yields None in the corresponding result slot instead of
    aborting the whole batch.
    """
    pending = [
        async_client.messages.create(
            model="claude-3-sonnet",
            max_tokens=1000,
            messages=msgs
        )
        for msgs in messages_list
    ]

    # return_exceptions=True keeps one failure from cancelling the rest.
    settled = await asyncio.gather(*pending, return_exceptions=True)

    results = []
    for i, response in enumerate(settled):
        if isinstance(response, Exception):
            print(f"请求 {i} 失败: {response}")
            results.append(None)
        else:
            results.append(response.content[0].text)

    return results

# Usage example
async def main():
    # Build five independent single-message conversations.
    message_batches = [
        [{"role": "user", "content": f"请生成第{i+1}个创意"}]
        for i in range(5)
    ]
    
    results = await batch_process(message_batches)
    for i, result in enumerate(results):
        print(f"结果 {i+1}: {result}")

# Run the async entry point.
asyncio.run(main())

监控和调试

请求日志

python
import logging
from anthropic import Anthropic

# Configure logging for request tracing.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LoggingAnthropic(Anthropic):
    """Anthropic client that logs timing and token usage per request.

    NOTE: `messages_create` is a new method, not an override — calling
    `client.messages.create(...)` directly bypasses the logging. Use
    `client.messages_create(...)` to get logged calls.
    """

    def messages_create(self, **kwargs):
        """Create a message while logging duration and output token count.

        Accepts the same keyword arguments as `messages.create` and returns
        its response; exceptions are logged with timing and re-raised.
        """
        import time  # bug fix: `time` was used here without being imported

        logger.info(f"发送请求: model={kwargs.get('model')}, "
                   f"messages={len(kwargs.get('messages', []))}")

        start_time = time.time()

        try:
            # Bug fix: the original used `super().messages.create(...)`, which
            # fails when `messages` is set per-instance (super() only resolves
            # class attributes); `self.messages` works in either case since
            # this subclass does not shadow it.
            response = self.messages.create(**kwargs)

            end_time = time.time()
            logger.info(f"请求成功: 耗时={end_time - start_time:.2f}s, "
                       f"输出tokens={response.usage.output_tokens}")

            return response

        except Exception as e:
            end_time = time.time()
            logger.error(f"请求失败: 耗时={end_time - start_time:.2f}s, "
                        f"错误={str(e)}")
            raise

# Use the logging client in place of the plain Anthropic client.
client = LoggingAnthropic(
    api_key="your_realmrouter_api_key",
    base_url="https://realmrouter.cn/v1"
)

性能监控

python
import time
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class PerformanceMetrics:
    """Performance measurements for a single API call."""
    model: str
    response_time: float
    input_tokens: int
    output_tokens: int
    total_tokens: int
    success: bool

class PerformanceMonitor:
    """Accumulates per-call metrics and reports aggregate statistics."""
    
    def __init__(self):
        self.metrics: List[PerformanceMetrics] = []
    
    def record_call(self, model: str, response_time: float, 
                   input_tokens: int, output_tokens: int, success: bool):
        """Store the metrics of one call; total_tokens is derived here."""
        self.metrics.append(PerformanceMetrics(
            model=model,
            response_time=response_time,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens,
            success=success,
        ))
    
    def get_stats(self) -> Dict:
        """Return aggregate statistics, or {} when nothing has been recorded."""
        if not self.metrics:
            return {}
        
        count = len(self.metrics)
        succeeded = sum(1 for m in self.metrics if m.success)
        
        return {
            "total_calls": count,
            "success_rate": succeeded / count,
            "avg_response_time": sum(m.response_time for m in self.metrics) / count,
            "total_tokens": sum(m.total_tokens for m in self.metrics),
        }

# Use the monitor.
monitor = PerformanceMonitor()

def monitored_call(client, model, messages):
    """Wrap a messages.create call, recording its metrics on the global monitor."""
    started = time.time()
    
    try:
        resp = client.messages.create(
            model=model,
            max_tokens=1000,
            messages=messages
        )
        
        elapsed = time.time() - started
        
        monitor.record_call(
            model=model,
            response_time=elapsed,
            input_tokens=resp.usage.input_tokens,
            output_tokens=resp.usage.output_tokens,
            success=True
        )
        
        return resp
        
    except Exception:
        elapsed = time.time() - started
        
        # Failed call: token counts unknown, record zeros with success=False.
        monitor.record_call(
            model=model,
            response_time=elapsed,
            input_tokens=0,
            output_tokens=0,
            success=False
        )
        
        raise

# Inspect the aggregate statistics.
stats = monitor.get_stats()
print(f"性能统计: {stats}")

最佳实践

1. 错误处理

  • 实现重试机制
  • 处理频率限制
  • 记录详细错误信息

2. 性能优化

  • 使用连接池
  • 实现批量处理
  • 缓存常用结果

3. 安全考虑

  • 保护 API 密钥
  • 验证输入内容
  • 限制输出长度

4. 成本控制

  • 监控令牌使用
  • 选择合适的模型
  • 优化提示词

常见问题

Q: 如何处理模型名称差异?

A: 使用模型名称映射表,将 Anthropic 官方模型名转换为 RealmRouter 对应的模型名。

Q: 频率限制如何处理?

A: 实现指数退避重试机制,监控请求频率,避免触发限制。

Q: 如何确保兼容性?

A: 使用相同的 SDK 和接口,只更改端点和 API 密钥,其他代码保持不变。

Q: 性能如何优化?

A: 使用连接池、批量处理、异步调用等技术提高性能。

限制和注意事项

  1. 模型可用性:部分最新模型可能需要时间同步
  2. 功能差异:某些高级功能可能有细微差异
  3. 性能表现:响应时间可能因网络环境而异
  4. 计费方式:请确认 RealmRouter 的计费标准
  5. 服务等级:了解 RealmRouter 的 SLA 和服务保障

基于 MIT 许可发布 厦门界云聚算网络科技有限公司