频率限制
了解频率限制的工作原理以及如何处理限流错误
OpenModel 实施频率限制以确保公平使用和平台稳定性。了解这些限制的运作方式,有助于你构建能够优雅处理限流的应用。
限制维度
频率限制从两个维度进行衡量:
| 维度 | 说明 |
|---|---|
| RPM | 每分钟请求数 — 每分钟允许的 API 请求数量 |
| TPM | 每分钟 Token 数 — 每分钟允许的 Token 总量(输入 + 输出) |
数值为 0 表示不对该维度施加限制。
限制层级
限制在两个独立层级上生效,请求必须同时通过两个层级的检查:
用户级别限制
你的频率限制由账户所属的 用户组 决定。管理员为每个用户组配置 RPM 和 TPM 限制,从而为不同用户群体提供差异化的访问额度。
你可以在 控制台 中查看当前的用户组和限制配置。
通道级别限制
每个上游通道(提供商端点)也有独立的 RPM 和 TPM 限制,由平台管理员配置。这些限制保护上游提供商免受过多并发请求的冲击。
当某个通道的频率限制被触发时,网关可能会自动将请求路由到该模型的其他可用通道。
限制执行机制
OpenModel 使用 固定窗口 算法执行频率限制:
RPM 执行流程
- 在处理请求之前,网关递增你的 RPM 计数器
- 如果计数器超过限制,请求将被拒绝并返回
429 Too Many Requests - 计数器在每个分钟窗口的起始时刻重置
TPM 执行流程
- 在处理之前,网关 预估 你的输入 Token 数量(使用 Tokenizer 服务,或回退到
长度 / 4的启发式估算) - 将预估值与你的 TPM 限制进行比较
- 在响应完成后,使用提供商返回的实际 Token 数量 修正 计数器
这意味着预检阶段的 TPM 执行是近似的,但长期来看是准确的。
频率限制响应
当你超过频率限制时,API 返回 429 Too Many Requests 响应,格式与你使用的 API 格式匹配:
OpenAI 格式( /v1/responses):
{
"error": {
"message": "rate limit exceeded: user RPM limit",
"type": "rate_limit_error"
}
}Anthropic 格式( /v1/messages):
{
"type": "error",
"error": {
"type": "rate_limit_error",
"message": "rate limit exceeded: user RPM limit"
}
}Gemini 格式( /v1beta/models/*):
{
"error": {
"code": 429,
"message": "rate limit exceeded: user RPM limit",
"status": "RESOURCE_EXHAUSTED"
}
}处理频率限制
指数退避重试
收到 429 响应时,等待一段时间后再重试。使用指数退避配合随机抖动:
import time
import random
from openai import OpenAI, APIStatusError
client = OpenAI(
base_url="https://api.openmodel.ai/v1",
api_key="your-api-key",
)
def call_with_backoff(max_retries=5):
for attempt in range(max_retries):
try:
return client.responses.create(
model="gpt-4o",
input="你好!",
)
except APIStatusError as e:
if e.status_code == 429 and attempt < max_retries - 1:
delay = (2 ** attempt) + random.random()
print(f"触发频率限制,将在 {delay:.1f}s 后重试...")
time.sleep(delay)
else:
raise均匀分布请求
与其集中发送大量请求,不如将请求均匀分布在整个分钟窗口内。例如,如果你的 RPM 限制为 60,应该每秒发送约 1 个请求,而非在前几秒内发出全部 60 个请求。
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://api.openmodel.ai/v1",
api_key="your-api-key",
)
async def process_batch(prompts: list[str], rps: float = 1.0):
"""按照受控速率处理提示词(每秒请求数)。"""
results = []
interval = 1.0 / rps
for prompt in prompts:
result = await client.responses.create(
model="gpt-4o",
input=prompt,
)
results.append(result)
await asyncio.sleep(interval)
return results使用流式响应提升吞吐
流式响应在第一个 Token 生成后即开始返回数据。虽然总请求时长相似,但感知延迟显著降低,并且你可以立即开始处理输出内容。
实现请求队列
对于高吞吐量应用,使用带有并发控制的请求队列以保持在频率限制之内:
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://api.openmodel.ai/v1",
api_key="your-api-key",
)
async def process_with_concurrency(
prompts: list[str],
max_concurrent: int = 5
):
"""使用有界并发处理提示词。"""
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_request(prompt: str):
async with semaphore:
return await client.responses.create(
model="gpt-4o",
input=prompt,
)
return await asyncio.gather(
*[limited_request(p) for p in prompts]
)监控使用情况
控制台 提供 API 使用情况的实时可视化:
- 仪表盘 — 查看请求数量、Token 用量和消费趋势
- 用量 — 按 API Key 分类的用量明细,支持日期范围筛选
- 日志 — 每条请求的详细消费记录
监控这些指标以了解你的使用模式,合理规划频率限制需求。
最佳实践
- 从一开始就为频率限制做设计 — 不要假设无限吞吐量,在应用架构中内置队列和退避机制。
- 为不同的服务或环境使用独立的 API Key,以隔离使用量。
- 尽可能缓存响应,减少不必要的 API 调用。
- 使用批量操作(如果 API 支持),而非发送大量小请求。
- 建立监控和告警 — 对 429 错误率设置监控,在接近限制时及时发现。