APIXO 提供两种接收任务结果的方式:轮询(async 模式)和 webhooks(callback 模式)。本指南帮助你选择合适方式并正确实现。
快速对比
| Aspect | Polling (Async) | Webhooks (Callback) |
|---|
| 配置 | 简单 | 需要公网 endpoint |
| 实时性 | 准实时 | 即时 |
| API 调用 | 每任务多次 | 每任务一次 |
| 适用场景 | 开发、客户端应用 | 生产服务器 |
| 基础设施 | 无 | HTTPS endpoint |
何时使用轮询
以下情况选择轮询:
- 构建客户端应用
- 原型开发或测试
- 无公网服务器
- 任务量较少
基础轮询实现
async function pollForResult(model, taskId, apiKey) {
const interval = 3000; // 3 seconds
const maxAttempts = 60;
for (let i = 0; i < maxAttempts; i++) {
const response = await fetch(
`https://api.apixo.ai/api/v1/statusTask/${model}?taskId=${taskId}`,
{ headers: { 'Authorization': `Bearer ${apiKey}` } }
);
const { data } = await response.json();
if (data.state === 'success') {
return JSON.parse(data.resultJson).resultUrls;
}
if (data.state === 'failed') {
throw new Error(data.failMsg);
}
await new Promise(r => setTimeout(r, interval));
}
throw new Error('Timeout');
}
指数退避
提高效率可使用指数退避:
async function pollWithBackoff(model, taskId, apiKey) {
let interval = 3000; // Start at 3s
const maxInterval = 30000; // Max 30s
const maxTime = 300000; // 5 min timeout
let elapsed = 0;
while (elapsed < maxTime) {
const response = await fetch(
`https://api.apixo.ai/api/v1/statusTask/${model}?taskId=${taskId}`,
{ headers: { 'Authorization': `Bearer ${apiKey}` } }
);
const { data } = await response.json();
if (data.state === 'success') {
return JSON.parse(data.resultJson).resultUrls;
}
if (data.state === 'failed') {
throw new Error(data.failMsg);
}
await new Promise(r => setTimeout(r, interval));
elapsed += interval;
interval = Math.min(interval * 1.5, maxInterval);
}
throw new Error('Timeout');
}
按模型类型的推荐间隔
| Model Type | Initial Wait | Poll Interval | Max Wait |
|---|
| 图像(快速) | 5s | 3s | 2 min |
| 图像(高质量) | 10s | 5s | 3 min |
| 视频 | 60s | 15s | 10 min |
| 音频 | 30s | 10s | 5 min |
何时使用 Webhooks
以下情况选择 Webhooks:
- 运行生产服务器
- 高并发任务
- 需要实时通知
- 希望减少 API 调用
Webhook 配置
1. 创建 endpoint:
Express.js
FastAPI
Next.js
const express = require('express');
const app = express();
app.use(express.json());
app.post('/webhook/apixo', (req, res) => {
const { taskId, state, resultJson, failCode, failMsg } = req.body.data;
if (state === 'success') {
const urls = JSON.parse(resultJson).resultUrls;
console.log('Generated:', urls);
// Process your images/videos
} else if (state === 'failed') {
console.error(`Task ${taskId} failed: ${failCode}`);
}
res.status(200).send('OK');
});
app.listen(3000);
from fastapi import FastAPI, Request
import json
app = FastAPI()
@app.post('/webhook/apixo')
async def handle_webhook(request: Request):
body = await request.json()
data = body['data']
if data['state'] == 'success':
urls = json.loads(data['resultJson'])['resultUrls']
print(f"Generated: {urls}")
elif data['state'] == 'failed':
print(f"Task {data['taskId']} failed: {data['failCode']}")
return {'status': 'ok'}
// app/api/webhook/apixo/route.ts
import { NextRequest, NextResponse } from 'next/server';
export async function POST(request: NextRequest) {
const body = await request.json();
const { taskId, state, resultJson, failCode } = body.data;
if (state === 'success') {
const urls = JSON.parse(resultJson).resultUrls;
console.log('Generated:', urls);
} else if (state === 'failed') {
console.error(`Task ${taskId} failed: ${failCode}`);
}
return NextResponse.json({ status: 'ok' });
}
2. 提交任务时使用 callback:
const response = await fetch('https://api.apixo.ai/api/v1/generateTask/nano-banana', {
method: 'POST',
headers: {
'Authorization': 'Bearer YOUR_API_KEY',
'Content-Type': 'application/json',
},
body: JSON.stringify({
request_type: 'callback',
callback_url: 'https://your-domain.com/webhook/apixo',
input: {
mode: 'text-to-image',
prompt: 'A beautiful landscape',
},
}),
});
Webhook 要求
- 必须通过 HTTPS 公网可访问
- 必须在 30 秒内返回 HTTP 200
- 应能处理重复投递(使用
taskId 实现幂等)
重试策略
Webhook 失败时:
| Attempt | Delay |
|---|
| 1st retry | 30 seconds |
| 2nd retry | 2 minutes |
| 3rd retry | 10 minutes |
重试 3 次后 webhook 将不再投递。你仍可通过 status 接口查询。
混合方案
为兼顾可靠性,可结合两种方式:
class TaskManager {
constructor(apiKey) {
this.apiKey = apiKey;
this.pendingTasks = new Map();
}
async submit(model, input, callbackUrl = null) {
const response = await fetch(
`https://api.apixo.ai/api/v1/generateTask/${model}`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
request_type: callbackUrl ? 'callback' : 'async',
callback_url: callbackUrl,
input,
}),
}
);
const { data } = await response.json();
if (!callbackUrl) {
// Fallback to polling if no webhook
return this.poll(model, data.taskId);
}
// Track task for webhook
this.pendingTasks.set(data.taskId, { model, status: 'pending' });
return data.taskId;
}
handleWebhook(body) {
const { taskId, state, resultJson, failMsg } = body.data;
if (state === 'success') {
this.pendingTasks.delete(taskId);
return JSON.parse(resultJson).resultUrls;
}
if (state === 'failed') {
this.pendingTasks.delete(taskId);
throw new Error(failMsg);
}
}
async poll(model, taskId) {
// Fallback polling implementation
return pollWithBackoff(model, taskId, this.apiKey);
}
}
使用 ngrok 进行本地开发
测试 webhook 的本地环境:
# Install ngrok
npm install -g ngrok
# Start your local server
node server.js # runs on port 3000
# Tunnel to local server
ngrok http 3000
# Returns: https://abc123.ngrok.io
# Use ngrok URL as callback_url
| Scenario | Recommendation |
|---|
| 开发 / 测试 | Polling |
| 客户端应用 | Polling |
| 生产后端 | Webhooks |
| 高并发 | Webhooks |
| 最高可靠性 | 混合(Webhooks + 轮询兜底) |
建议先用轮询简化实现,随着应用规模增长再迁移到 Webhooks。