Swape.AISwape.AI Docs

频率限制

了解频率限制的工作原理以及如何处理限流错误

OpenModel 实施频率限制以确保公平使用和平台稳定性。了解这些限制的运作方式,有助于你构建能够优雅处理限流的应用。

限制维度

频率限制从两个维度进行衡量:

维度说明
RPM每分钟请求数 — 每分钟允许的 API 请求数量
TPM每分钟 Token 数 — 每分钟允许的 Token 总量(输入 + 输出)

数值为 0 表示不对该维度施加限制。

限制层级

限制在两个独立层级上生效,请求必须同时通过两个层级的检查:

用户级别限制

你的频率限制由账户所属的 用户组 决定。管理员为每个用户组配置 RPM 和 TPM 限制,从而为不同用户群体提供差异化的访问额度。

你可以在 控制台 中查看当前的用户组和限制配置。

通道级别限制

每个上游通道(提供商端点)也有独立的 RPM 和 TPM 限制,由平台管理员配置。这些限制保护上游提供商免受过多并发请求的冲击。

当某个通道的频率限制被触发时,网关可能会自动将请求路由到该模型的其他可用通道。

限制执行机制

OpenModel 使用 固定窗口 算法执行频率限制:

RPM 执行流程

  1. 在处理请求之前,网关递增你的 RPM 计数器
  2. 如果计数器超过限制,请求将被拒绝并返回 429 Too Many Requests
  3. 计数器在每个分钟窗口的起始时刻重置

TPM 执行流程

  1. 在处理之前,网关 预估 你的输入 Token 数量(使用 Tokenizer 服务,或回退到 长度 / 4 的启发式估算)
  2. 将预估值与你的 TPM 限制进行比较
  3. 在响应完成后,使用提供商返回的实际 Token 数量 修正 计数器

这意味着预检阶段的 TPM 执行是近似的,但长期来看是准确的。

频率限制响应

当你超过频率限制时,API 返回 429 Too Many Requests 响应,格式与你使用的 API 格式匹配:

OpenAI 格式/v1/responses):

{
  "error": {
    "message": "rate limit exceeded: user RPM limit",
    "type": "rate_limit_error"
  }
}

Anthropic 格式/v1/messages):

{
  "type": "error",
  "error": {
    "type": "rate_limit_error",
    "message": "rate limit exceeded: user RPM limit"
  }
}

Gemini 格式/v1beta/models/*):

{
  "error": {
    "code": 429,
    "message": "rate limit exceeded: user RPM limit",
    "status": "RESOURCE_EXHAUSTED"
  }
}

处理频率限制

指数退避重试

收到 429 响应时,等待一段时间后再重试。使用指数退避配合随机抖动:

import time
import random
from openai import OpenAI, APIStatusError

client = OpenAI(
    base_url="https://api.openmodel.ai/v1",
    api_key="your-api-key",
)

def call_with_backoff(max_retries=5):
    for attempt in range(max_retries):
        try:
            return client.responses.create(
                model="gpt-4o",
                input="你好!",
            )
        except APIStatusError as e:
            if e.status_code == 429 and attempt < max_retries - 1:
                delay = (2 ** attempt) + random.random()
                print(f"触发频率限制,将在 {delay:.1f}s 后重试...")
                time.sleep(delay)
            else:
                raise

均匀分布请求

与其集中发送大量请求,不如将请求均匀分布在整个分钟窗口内。例如,如果你的 RPM 限制为 60,应该每秒发送约 1 个请求,而非在前几秒内发出全部 60 个请求。

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://api.openmodel.ai/v1",
    api_key="your-api-key",
)

async def process_batch(prompts: list[str], rps: float = 1.0):
    """按照受控速率处理提示词(每秒请求数)。"""
    results = []
    interval = 1.0 / rps
    for prompt in prompts:
        result = await client.responses.create(
            model="gpt-4o",
            input=prompt,
        )
        results.append(result)
        await asyncio.sleep(interval)
    return results

使用流式响应提升吞吐

流式响应在第一个 Token 生成后即开始返回数据。虽然总请求时长相似,但感知延迟显著降低,并且你可以立即开始处理输出内容。

实现请求队列

对于高吞吐量应用,使用带有并发控制的请求队列以保持在频率限制之内:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://api.openmodel.ai/v1",
    api_key="your-api-key",
)

async def process_with_concurrency(
    prompts: list[str],
    max_concurrent: int = 5
):
    """使用有界并发处理提示词。"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_request(prompt: str):
        async with semaphore:
            return await client.responses.create(
                model="gpt-4o",
                input=prompt,
            )

    return await asyncio.gather(
        *[limited_request(p) for p in prompts]
    )

监控使用情况

控制台 提供 API 使用情况的实时可视化:

  • 仪表盘 — 查看请求数量、Token 用量和消费趋势
  • 用量 — 按 API Key 分类的用量明细,支持日期范围筛选
  • 日志 — 每条请求的详细消费记录

监控这些指标以了解你的使用模式,合理规划频率限制需求。

最佳实践

  • 从一开始就为频率限制做设计 — 不要假设无限吞吐量,在应用架构中内置队列和退避机制。
  • 为不同的服务或环境使用独立的 API Key,以隔离使用量。
  • 尽可能缓存响应,减少不必要的 API 调用。
  • 使用批量操作(如果 API 支持),而非发送大量小请求。
  • 建立监控和告警 — 对 429 错误率设置监控,在接近限制时及时发现。

On this page