🔄 整体架构

┌─────────────┐     ①启动任务      ┌─────────────┐     ③后台执行     ┌─────────────┐
│   前端      │ ──────────────────▶ │   后端API   │ ──────────────▶  │  后台线程   │
│  (Vue.js)   │ ◀────────────────── │  (FastAPI)  │                   │ (ThreadPool)│
└─────────────┘   ②返回task_id     └─────────────┘                   └─────────────┘
      │                                   ▲                                 │
      │         ④轮询查询状态              │           ⑤更新任务状态          │
      └───────────────────────────────────┴─────────────────────────────────┘

📦 一、批量添加功能的实现

1. 后端部分(backend/main.py

步骤1: 接收请求,立即返回

@app.post("/api/alicloud/rules/batch-add-advanced-execute")
async def alicloud_batch_add_rules_advanced_execute(request, db):
    # 生成唯一任务ID
    task_id = str(uuid.uuid4())
    
    # 初始化任务状态(存入内存字典)
    with batch_add_tasks_lock:
        batch_add_tasks[task_id] = {
            "status": "running",
            "message": "批量添加任务正在执行中...",
            "created_at": datetime.now().isoformat()
        }
    
    # 定义后台执行函数
    def batch_add_in_background():
        # ... 实际的添加逻辑 ...
        # 完成后更新状态
        with batch_add_tasks_lock:
            batch_add_tasks[task_id] = {
                "status": "success",  # 或 "partial_success" / "failed"
                "message": "执行完成",
                "execution_stats": {"success": 10, "failed": 2}
            }
    
    # 启动后台线程(不阻塞当前请求)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, batch_add_in_background)
    
    # 立即返回 task_id(不等待执行完成)
    return {"success": True, "task_id": task_id, "message": "任务已启动"}

步骤2: 任务状态查询API

@app.get("/api/batch-add/task/{task_id}")
async def get_batch_add_task_status(task_id: str):
    with batch_add_tasks_lock:
        task = batch_add_tasks.get(task_id)
        if not task:
            raise HTTPException(status_code=404, detail="任务不存在")
        return {"success": True, **task}

数据存储结构

batch_add_tasks = {
    "uuid-xxx-xxx": {
        "status": "running" | "success" | "partial_success" | "failed",
        "message": "执行中...",
        "execution_stats": {"success": 0, "failed": 0},
        "execution_details": [...],
        "created_at": "2026-01-23T14:00:00",
        "completed_at": "2026-01-23T14:05:00"
    }
}

2. 前端部分(frontend/app_alicloud.js

步骤1: 发起请求,开始轮询

async executeBatchAddAdvanced() {
    // 显示加载动画
    this.batchAddAdvancedDialog.executing = true;
    
    // 调用后端API
    const response = await axios.post('/api/alicloud/rules/batch-add-advanced-execute', data);
    
    if (response.data.success && response.data.task_id) {
        // 获取到 task_id,开始轮询
        await this.pollBatchAddTaskStatus(response.data.task_id);
    }
}

步骤2: 轮询任务状态

async pollBatchAddTaskStatus(taskId, attempt = 0) {
    const maxAttempts = 200;  // 最多200次(10分钟)
    const pollInterval = 3000; // 每3秒查一次
    
    // 第一次延迟2秒再查(给后端初始化时间)
    if (attempt === 0) {
        await new Promise(resolve => setTimeout(resolve, 2000));
    }
    
    // 查询任务状态
    const response = await axios.get(`/api/batch-add/task/${taskId}`);
    const task = response.data;
    
    if (task.status === 'running') {
        // 还在执行,3秒后继续轮询
        setTimeout(() => this.pollBatchAddTaskStatus(taskId, attempt + 1), pollInterval);
    } 
    else if (task.status === 'success' || task.status === 'partial_success') {
        // 执行完成!
        this.batchAddAdvancedDialog.executing = false;  // 关闭加载
        this.batchAddAdvancedDialog.step = 3;           // 显示结果页
        this.batchAddAdvancedDialog.executionStats = task.execution_stats;
        
        ElMessage.success(`成功 ${task.execution_stats.success}`);
        
        // 刷新页面数据
        await this.searchSecurityGroups();
        await this.loadStatistics();
    }
    else if (task.status === 'failed') {
        // 执行失败
        this.batchAddAdvancedDialog.executing = false;
        ElMessage.error(task.message);
    }
}

🔄 二、同步功能的实现

后端部分

@app.post("/api/sync/manual/{region}")
async def sync_manual(region: str, db):
    # 启动后台同步任务
    def sync_in_background():
        bg_db = SessionLocal()  # 新建数据库连接
        try:
            sync_service = get_sync_service(bg_db)
            result = sync_service.sync_region(region, SyncTypeEnum.MANUAL)
            logger.info(f"✅ 同步完成: {result}")
        finally:
            bg_db.close()
    
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, sync_in_background)
    
    # 立即返回
    return {"success": True, "message": "同步任务已启动"}

前端部分

同步功能通过查询同步日志来判断任务是否完成:

async pollSyncStatus(region, syncStartTime, attempt = 0) {
    // 查询该区域最新的同步日志
    const response = await axios.get(`/api/sync/logs/latest?region=${region}`);
    const log = response.data;
    
    // 判断是否是当前任务的日志(通过时间戳)
    const logCreatedTime = new Date(log.created_at).getTime();
    const isNewLog = logCreatedTime >= syncStartTime - 2000;
    
    if (isNewLog && log.status === 'completed') {
        // 同步完成!
        ElMessage.success('同步成功');
        await this.searchSecurityGroups();
    } else {
        // 继续轮询
        setTimeout(() => this.pollSyncStatus(region, syncStartTime, attempt + 1), 3000);
    }
}

🔑 三、关键技术点

技术点 说明
run_in_executor Python异步框架中,将同步阻塞代码放到线程池执行,不阻塞主线程
batch_add_tasks 字典 内存中存储任务状态,前端可随时查询
batch_add_tasks_lock 线程锁,保证多线程访问字典时的数据安全
SessionLocal() 后台线程需要新建数据库会话,不能用主线程的
syncStartTime 前端记录任务开始时间,用于判断日志是否属于当前任务
轮询间隔 3秒 平衡服务器压力和用户体验
最大轮询200次 10分钟超时保护,防止无限轮询

📊 四、时序图

前端                          后端                         后台线程
 │                             │                             │
 │── POST /batch-add-execute ─▶│                             │
 │                             │── 生成 task_id              │
 │                             │── 初始化状态 "running"       │
 │                             │── run_in_executor() ───────▶│
 │◀── 返回 {task_id} ─────────│                             │
 │                             │                             │── 执行添加规则...
 │   (等待2秒)                  │                             │
 │── GET /task/{task_id} ─────▶│                             │
 │◀── {status: "running"} ────│                             │
 │                             │                             │── 执行同步...
 │   (等待3秒)                  │                             │
 │── GET /task/{task_id} ─────▶│                             │
 │◀── {status: "running"} ────│                             │
 │                             │                             │
 │   (等待3秒)                  │                             │── 完成!更新状态
 │── GET /task/{task_id} ─────▶│◀──────────────────────────│
 │◀── {status: "success"} ────│                             │
 │                             │                             │
 │── 显示结果,刷新页面          │                             │

❓ 为什么需要这种架构?

  1. 避免HTTP超时:云API操作可能需要几分钟,HTTP请求默认60秒超时
  2. 避免Pod重启:K8s健康检查会认为长时间无响应的Pod已死亡
  3. 用户体验好:用户立即得到反馈"任务已启动",不用干等
  4. 可追踪:任意时刻都能查询任务进度

轮询的基本概念

什么是轮询?

轮询(Polling)= 定期询问

就像你每隔一段时间问一次:
"任务完成了吗?"
"还没完成?"
"完成了吗?"
"还没完成?"
...
直到得到"完成了!"的答案

为什么需要轮询?

问题:后端任务需要时间

前端发送请求 → 后端立即返回 task_id
              ↓
             后端在后台执行任务(可能需要几分钟)
              ↓
             任务完成

问题:前端如何知道任务何时完成?

解决方案对比

方案 说明 缺点
等待响应 前端一直等待后端完成 HTTP请求超时(60秒)
WebSocket 后端主动推送消息 需要额外配置,复杂度高
轮询 前端定期查询状态 简单可靠,适合你的场景

轮询的工作流程

1. 启动任务

// 前端发送请求
const response = await axios.post('/api/batch-add-execute', data);
// 后端立即返回: {task_id: "abc-123"}

// 开始轮询
pollBatchAddTaskStatus("abc-123");

2. 轮询循环

async pollBatchAddTaskStatus(taskId, attempt = 0) {
    // ① 等待一段时间(第一次等待2秒,之后每3秒)
    if (attempt === 0) {
        await new Promise(resolve => setTimeout(resolve, 2000));
    }
    
    // ② 查询任务状态
    const response = await axios.get(`/api/batch-add/task/${taskId}`);
    const task = response.data;
    
    // ③ 根据状态决定下一步
    if (task.status === 'running') {
        // 还在运行,3秒后继续查询
        setTimeout(() => {
            pollBatchAddTaskStatus(taskId, attempt + 1);
        }, 3000);
    } else if (task.status === 'success') {
        // 完成了!停止轮询,显示结果
        showResults(task);
    }
}

轮询的时间线

时间轴:
─────────────────────────────────────────────────────────
T+0s    前端发送请求,后端返回 task_id
        ↓
T+0s    开始轮询(第1次)
        ↓
T+2s    等待2秒后,查询状态 → "running"
        ↓
T+5s    等待3秒后,查询状态 → "running"
        ↓
T+8s    等待3秒后,查询状态 → "running"
        ↓
T+11s   等待3秒后,查询状态 → "running"
        ↓
T+14s   等待3秒后,查询状态 → "running"
        ↓
...     继续轮询...
        ↓
T+220s  等待3秒后,查询状态 → "success" ✅
        ↓
        停止轮询,显示结果
─────────────────────────────────────────────────────────

你的代码中的轮询实现

完整流程

// 第1步:执行批量添加
async executeBatchAddAdvanced() {
    // 发送请求
    const response = await axios.post('/api/aws/rules/batch-add-advanced-execute', data);
    
    // 获取 task_id,开始轮询
    if (response.data.task_id) {
        await this.pollBatchAddTaskStatus(response.data.task_id);
    }
}

// 第2步:轮询函数
async pollBatchAddTaskStatus(taskId, attempt = 0) {
    const maxAttempts = 200;      // 最多200次
    const pollInterval = 3000;    // 每3秒一次
    
    // ① 第一次延迟2秒
    if (attempt === 0) {
        await new Promise(resolve => setTimeout(resolve, 2000));
    }
    
    // ② 检查是否超时
    if (attempt >= maxAttempts) {
        ElMessage.error('任务执行超时');
        return;
    }
    
    // ③ 查询任务状态
    try {
        const response = await axios.get(`/api/batch-add/task/${taskId}`);
        const task = response.data;
        
        // ④ 根据状态处理
        if (task.status === 'running') {
            // 还在运行,继续轮询
            setTimeout(() => {
                this.pollBatchAddTaskStatus(taskId, attempt + 1);
            }, pollInterval);
            
        } else if (task.status === 'success') {
            // 完成!停止轮询
            this.batchAddAdvancedDialog.executing = false;
            this.batchAddAdvancedDialog.step = 3;
            await this.searchSecurityGroups();
            
        } else if (task.status === 'failed') {
            // 失败!停止轮询
            this.batchAddAdvancedDialog.executing = false;
            ElMessage.error(task.message);
        }
    } catch (error) {
        // 网络错误,继续重试
        setTimeout(() => {
            this.pollBatchAddTaskStatus(taskId, attempt + 1);
        }, pollInterval);
    }
}

轮询的关键参数

1. 轮询间隔(pollInterval)

const pollInterval = 3000; // 每3秒查询一次

为什么是3秒?

  • 太短(1秒):服务器压力大,浪费资源
  • 太长(10秒):用户体验差,感觉卡顿
  • 3秒:平衡用户体验和服务器压力

2. 最大尝试次数(maxAttempts)

const maxAttempts = 200; // 最多200次
// 200次 × 3秒 = 600秒 = 10分钟

为什么是200次?

  • 防止无限轮询
  • 给任务足够时间(10分钟)
  • 超时后提示用户

3. 初始延迟

if (attempt === 0) {
    await new Promise(resolve => setTimeout(resolve, 2000));
}

为什么第一次延迟2秒?

  • 给后端初始化任务的时间
  • 避免立即查询时任务还没创建

轮询的状态判断

状态流转

┌─────────┐
│ running │ ← 任务执行中,继续轮询
└─────────┘
    ↓
┌─────────┐
│ success │ ← 任务成功,停止轮询,显示结果
└─────────┘
    ↓
┌─────────┐
│ failed  │ ← 任务失败,停止轮询,显示错误
└─────────┘

代码中的判断逻辑

if (task.status === 'running') {
    // 继续轮询
    setTimeout(() => pollBatchAddTaskStatus(...), 3000);
    
} else if (task.status === 'success' || task.status === 'partial_success') {
    // 停止轮询,显示结果
    showResults();
    
} else if (task.status === 'failed') {
    // 停止轮询,显示错误
    showError();
}

轮询 vs 其他方案

方案对比

方案 实现难度 实时性 服务器压力 适用场景
轮询 简单 中等(3秒延迟) 中等 你的场景
WebSocket 复杂 高(实时) 聊天、实时通知
Server-Sent Events 中等 高(实时) 实时推送
长轮询 中等 高(实时) 需要实时性

为什么选择轮询?

  • 实现简单
  • 不需要额外配置
  • 3秒延迟可接受
  • 适合你的批量任务场景

轮询的优化技巧

1. 指数退避(你的代码没有用,但可以优化)

// 当前:固定3秒间隔
setTimeout(() => poll(...), 3000);

// 优化:逐渐增加间隔
const delay = Math.min(3000 * Math.pow(1.5, attempt), 30000);
setTimeout(() => poll(...), delay);
// 第1次:3秒
// 第2次:4.5秒
// 第3次:6.75秒
// ...

2. 错误重试(你的代码已有)

catch (error) {
    // 网络错误,继续重试
    setTimeout(() => {
        this.pollBatchAddTaskStatus(taskId, attempt + 1);
    }, pollInterval);
}

3. 超时保护(你的代码已有)

if (attempt >= maxAttempts) {
    ElMessage.error('任务执行超时');
    return;
}

轮询的完整示例

实际执行过程

// T+0s: 开始轮询
pollBatchAddTaskStatus("task-123", 0)
  → 等待2秒
  → GET /api/batch-add/task/task-123
  → 返回: {status: "running"}setTimeout(() => poll("task-123", 1), 3000)

// T+5s: 第2次轮询
pollBatchAddTaskStatus("task-123", 1)GET /api/batch-add/task/task-123
  → 返回: {status: "running"}setTimeout(() => poll("task-123", 2), 3000)

// T+8s: 第3次轮询
pollBatchAddTaskStatus("task-123", 2)GET /api/batch-add/task/task-123
  → 返回: {status: "running"}setTimeout(() => poll("task-123", 3), 3000)

// ... 继续轮询 ...

// T+220s: 第N次轮询
pollBatchAddTaskStatus("task-123", N)GET /api/batch-add/task/task-123
  → 返回: {status: "success", execution_stats: {...}}
  → 停止轮询,显示结果 ✅

总结

轮询的思路:

  1. 定期查询:每3秒查询一次任务状态
  2. 状态判断:根据状态决定继续或停止
  3. 超时保护:最多查询200次(10分钟)
  4. 错误重试:网络错误时继续重试
  5. 结果展示:任务完成时停止轮询并显示结果

优点:

  • 实现简单
  • 可靠性高
  • 适合批量任务场景

缺点:

  • 有延迟(最多3秒)
  • 增加服务器请求数

在你的场景中,轮询是合适的选择。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐