最近开发一个类似于校园社区的功能,上大学的时候就卡在了内容审核这一步😫——现在一起看看我是怎么同时处理文字和文件的内容审核!

一个资源贡献型社区的内容审核设计实录:为什么文字和文件要走完全不同的审核路径,以及如何保证异步审核的可靠性。

技术栈:Python · FastAPI · PostgreSQL · Azure Content Safety · Docker


关于 Azure Content Safety (Azure 内容安全)

  1. 多维度的评分体系

不同于简单的“黑名单”匹配,Azure 采用的是 严重程度级别 (Severity Levels) 评分模型(0, 2, 4, 6):

  • Hate (仇恨内容)
  • Self-Harm (自我伤害)
  • Sexual (色情内容)
  • Violence (暴力内容)

优势:你可以针对不同版块设置不同的阈值。例如,“资源下载区”对暴力的容忍度可以设为 2,而“吐槽闲聊区”可以放宽到 4。

  1. 多语言原生支持
    对于校园社区,学生可能会中英混发(中英夹杂)。Azure 的模型是多语言原生的,不需要先翻译再审核,这大大降低了语义丢失的风险。

  2. 实时分析与批量分析
    Text Analysis API:适用于你的同步文字审核。

Image Analysis API:如果以后你的社区支持发图,Azure 同样可以检测图片中的成人内容、文字(OCR)甚至是违规标识。

但当我真正开始设计的时候,发现一件事:"内容审核"不是一个问题,它至少是两个完全不同的问题。


核心判断:文字和文件不能用同一套策略

发一篇帖子,涉及两类内容:

  • 文字内容:标题(title)和正文(body)
  • 文件附件:PDF、PPTX、DOCX、XLSX、TXT、MD 等

表面上都是"审核",但它们有一个本质的差异:

文字内容 文件附件
内容获取 直接可读 需要先解析提取
处理耗时 毫秒级 不确定,可能数秒
审核策略 同步,发帖前拦截 异步,发帖后 hidden

如果把文件也做成同步审核,用户点击发布之后要等文件解析完、传给 AI 审核、拿到结果,整个过程可能要好几秒。这对发帖体验是不可接受的。

所以这是一个主动的设计决策,而不是妥协:文字快,就同步拦截;文件慢,就异步处理,发帖成功但文件暂时不可见。


文字审核:同步,发帖前拦截

用户点击发布时,FastAPI 后端在写库之前先调用 Azure Content Safety:

# 发帖接口,简化版
async def create_post(post_data: PostCreate, db: Session):
    # 第一步:先审核文字内容
    text_to_check = f"{post_data.title}\n{post_data.body}"
    result = await content_safety.analyze_text(text_to_check)

    if result.severity > THRESHOLD:
        # 审核不通过,直接拦截,不写库
        raise HTTPException(status_code=400, detail="内容包含违规信息,请修改后重新发布")

    # 审核通过,正常写库
    post = Post(**post_data.dict())
    db.add(post)
    db.commit()
    return post

逻辑很直接:审核不通过就返回错误,用户看到提示修改内容再发。没有中间状态,没有异步,用户体验上就是"发失败了"。

这套流程能成立,前提是文字审核的延迟足够低。Azure Content Safety 的文本分析通常在几百毫秒以内,加在发帖流程里用户基本感知不到。


文件审核:异步,发帖后 hidden

文件走的是完全不同的路径。

用户发帖时,文件先上传到对象存储,帖子正常发布成功,但文件处于 hidden 状态,其他用户看不到。同时,一条审核任务写入 task_queue,由后台 Worker 异步处理。

async def create_post_with_files(post_data: PostCreate, files: List[UploadFile], db: Session):
    # 文字审核(同步,同上)
    await check_text_content(post_data.title, post_data.body)

    # 帖子正常写库
    post = Post(**post_data.dict())
    db.add(post)

    # 文件上传,初始状态为 hidden
    for file in files:
        file_key = await blob_store.upload(file)
        attachment = Attachment(
            post_id=post.id,
            file_key=file_key,
            status="hidden"  # 审核完成前不可见
        )
        db.add(attachment)

        # 写入审核任务
        task = TaskQueue(
            task_type="file_moderation",
            payload={"attachment_id": attachment.id}
        )
        db.add(task)

    db.commit()
    # 发帖成功,文件审核在后台进行
    return post

用户发帖成功,文件显示为"审核中",审核通过后自动变为可见。这套逻辑对用户是透明的,体验上比卡住等待要好得多。


文件解析管道:格式归一化是关键

文件审核比文字审核复杂的地方,不只是异步,还有格式的多样性

Azure Content Safety 的文本分析接口只接受纯文本,但用户上传的是 PDF、PPTX、DOCX、XLSX 等各种格式。所以 Worker 要做的第一件事,是把这些文件统一提取成文本,再送给 AI 审核。

# 不同格式,用不同的库提取文本
def extract_text(file_path: str, file_type: str) -> str:
    if file_type in ("txt", "md"):
        return open(file_path).read()

    elif file_type == "pdf":
        import pdfplumber
        with pdfplumber.open(file_path) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)

    elif file_type == "docx":
        from docx import Document
        doc = Document(file_path)
        return "\n".join(p.text for p in doc.paragraphs)

    elif file_type == "pptx":
        from pptx import Presentation
        prs = Presentation(file_path)
        texts = []
        for slide in prs.slides:
            for shape in slide.shapes:
                if shape.has_text_frame:
                    texts.append(shape.text_frame.text)
        return "\n".join(texts)

    elif file_type == "xlsx":
        import openpyxl
        wb = openpyxl.load_workbook(file_path, read_only=True)
        texts = []
        for ws in wb.worksheets:
            for row in ws.iter_rows(values_only=True):
                texts.append(" ".join(str(c) for c in row if c))
        return "\n".join(texts)

提取出文本之后,剩下的就和文字审核一样了:送给 Azure Content Safety,拿到评分,决定是通过还是拒绝。

这一层格式归一化,是文件审核管道里最容易被忽视、但实际上最容易出问题的地方。不同版本的 DOCX、加密的 PDF、空白的 XLSX 工作表,都可能让提取失败,需要做好异常处理和降级策略。


异步可靠性:task_queue 的设计

异步审核的核心问题是:任务不能丢。

Worker 挂了、重启了、处理到一半崩了,任务应该能被重新捡起来,而不是悄无声息地消失。

我没有引入 Redis 或 RabbitMQ,用 PostgreSQL 的 task_queue 表直接承担这个职责:

CREATE TABLE task_queue (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_type   VARCHAR NOT NULL,
    payload     JSONB NOT NULL,
    status      VARCHAR NOT NULL DEFAULT 'pending',
    -- pending / processing / completed / failed
    retry_count INT NOT NULL DEFAULT 0,
    next_run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Worker 取任务时用 SKIP LOCKED,确保多个 Worker 并发时不会重复消费同一个任务:

async def dequeue_next_task(db) -> Optional[Task]:
    return db.execute("""
        SELECT * FROM task_queue
        WHERE status IN ('pending', 'failed')
          AND next_run_at <= NOW()
        ORDER BY created_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    """).first()

任务失败时,更新 retry_count 并设置 next_run_at 为未来某个时间,实现自动重试:

async def mark_failed(task_id, db):
    db.execute("""
        UPDATE task_queue
        SET status = 'failed',
            retry_count = retry_count + 1,
            next_run_at = NOW() + INTERVAL '5 minutes'
        WHERE id = :id
    """, {"id": task_id})

数据库就是任务的唯一真相,任何时候 Worker 重启,未完成的任务都会被重新捡起。


从轮询到 LISTEN/NOTIFY:减少无效等待

最初的 Worker 实现是最简单的轮询:

while running:
    task = await dequeue_next_task(db)
    if task:
        await process(task)
    else:
        await asyncio.sleep(2)  # 没任务就等 2 秒

这带来一个问题:高峰期任务来了要等最多 2 秒,低峰期每 2 秒跑一次空查询。

改进方案是利用 PostgreSQL 原生的 LISTEN/NOTIFY,不引入任何新依赖:

-- 有任务写入时,触发器自动发通知
CREATE OR REPLACE FUNCTION notify_new_task()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('new_task', NEW.id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_queue_notify
AFTER INSERT ON task_queue
FOR EACH ROW EXECUTE FUNCTION notify_new_task();
# Worker 改为监听通知,而不是轮询
listen_conn = await asyncpg.connect(DSN)
await listen_conn.execute("LISTEN new_task;")

async def on_notify(conn, pid, channel, payload):
    # 收到通知立刻 drain 队列,加上限防止 listener 被长期占用
    for _ in range(MAX_DRAIN):
        task = await dequeue_next_task()
        if not task:
            break
        await process(task)

await listen_conn.add_listener('new_task', on_notify)

这里有一个重要原则:NOTIFY 只是唤醒信号,不是任务真相。

Worker 收到通知后,还是要去 task_queue 表里用 SKIP LOCKED 取任务,而不是直接相信 NOTIFY 的 payload。这样即使 NOTIFY 漏发了(listener 短暂断线),任务也不会丢,只是晚一点被 30 秒的兜底轮询捡起来:

# 兜底轮询,防止 listener 断线期间漏任务
async def fallback_poll():
    while running:
        await asyncio.sleep(30)
        await drain_queue()

最终效果:低峰期完全无空转,高峰期毫秒级响应,任务不丢。


完整架构

用户发帖
  ├── 文字内容(title / body)
  │       ↓
  │   Azure Content Safety(同步)
  │       ↓
  │   通过 → 写库    不通过 → 返回错误给用户
  │
  └── 文件附件(pdf / docx / pptx / xlsx / txt / md)
          ↓
      上传 Blob Store,状态 = hidden
          ↓
      task_queue 写入审核任务
          ↓
      pg_notify 唤醒 Worker
          ↓
      Worker: 下载文件 → 提取文本 → Azure Content Safety
          ↓
      通过 → 状态改为 visible    不通过 → 状态改为 rejected
          ↓
      实时推送通知用户(Centrifugo)

小结

回到最开始的问题:文件如何审核?。

文字和文件的差异——耗时、格式、用户体验容忍度——决定了它们必须走不同的路径。同步和异步不是技术偏好,是从业务约束推导出来的结论。

异步的复杂性主要在可靠性上:任务不能丢,重试要有机制,唤醒要及时。PostgreSQL 的 task_queue + LISTEN/NOTIFY 在不引入新基础设施的前提下,把这几点都覆盖了。


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐