每个科班的同学是不是都想做校园社区app?
本文介绍了校园社区内容审核系统的设计实现,重点阐述了文字和文件审核的不同处理策略。作者采用 Azure Content Safety 作为审核引擎,其多维度评分体系(仇恨、自残、色情、暴力内容)支持灵活配置不同版块的容忍阈值。核心创新点在于区分处理:文字内容采用同步审核(发帖前拦截),而文件附件采用异步审核(发帖后隐藏)。系统使用 PostgreSQL 的任务队列表保证异步可靠性,通过格式归一化管
最近开发一个类似于校园社区的功能,上大学的时候就卡在了内容审核这一步😫——现在一起看看我是怎么同时处理文字和文件的内容审核!
一个资源贡献型社区的内容审核设计实录:为什么文字和文件要走完全不同的审核路径,以及如何保证异步审核的可靠性。
技术栈:Python · FastAPI · PostgreSQL · Azure Content Safety · Docker
关于 Azure Content Safety (Azure 内容安全)
- 多维度的评分体系
不同于简单的“黑名单”匹配,Azure 采用的是 严重程度级别 (Severity Levels) 评分模型(0, 2, 4, 6):
- Hate (仇恨内容)
- Self-Harm (自我伤害)
- Sexual (色情内容)
- Violence (暴力内容)
优势:你可以针对不同版块设置不同的阈值。例如,“资源下载区”对暴力的容忍度可以设为 2,而“吐槽闲聊区”可以放宽到 4。
-
多语言原生支持
对于校园社区,学生可能会中英混发(中英夹杂)。Azure 的模型是多语言原生的,不需要先翻译再审核,这大大降低了语义丢失的风险。 -
实时分析与批量分析
Text Analysis API:适用于你的同步文字审核。
Image Analysis API:如果以后你的社区支持发图,Azure 同样可以检测图片中的成人内容、文字(OCR)甚至是违规标识。
但当我真正开始设计的时候,发现一件事:"内容审核"不是一个问题,它至少是两个完全不同的问题。
核心判断:文字和文件不能用同一套策略
发一篇帖子,涉及两类内容:
- 文字内容:标题(title)和正文(body)
- 文件附件:PDF、PPTX、DOCX、XLSX、TXT、MD 等
表面上都是"审核",但它们有一个本质的差异:
| 文字内容 | 文件附件 | |
|---|---|---|
| 内容获取 | 直接可读 | 需要先解析提取 |
| 处理耗时 | 毫秒级 | 不确定,可能数秒 |
| 审核策略 | 同步,发帖前拦截 | 异步,发帖后 hidden |
如果把文件也做成同步审核,用户点击发布之后要等文件解析完、传给 AI 审核、拿到结果,整个过程可能要好几秒。这对发帖体验是不可接受的。
所以这是一个主动的设计决策,而不是妥协:文字快,就同步拦截;文件慢,就异步处理,发帖成功但文件暂时不可见。
文字审核:同步,发帖前拦截
用户点击发布时,FastAPI 后端在写库之前先调用 Azure Content Safety:
# 发帖接口,简化版
async def create_post(post_data: PostCreate, db: Session):
# 第一步:先审核文字内容
text_to_check = f"{post_data.title}\n{post_data.body}"
result = await content_safety.analyze_text(text_to_check)
if result.severity > THRESHOLD:
# 审核不通过,直接拦截,不写库
raise HTTPException(status_code=400, detail="内容包含违规信息,请修改后重新发布")
# 审核通过,正常写库
post = Post(**post_data.dict())
db.add(post)
db.commit()
return post
逻辑很直接:审核不通过就返回错误,用户看到提示修改内容再发。没有中间状态,没有异步,用户体验上就是"发失败了"。
这套流程能成立,前提是文字审核的延迟足够低。Azure Content Safety 的文本分析通常在几百毫秒以内,加在发帖流程里用户基本感知不到。
文件审核:异步,发帖后 hidden
文件走的是完全不同的路径。
用户发帖时,文件先上传到对象存储,帖子正常发布成功,但文件处于 hidden 状态,其他用户看不到。同时,一条审核任务写入 task_queue,由后台 Worker 异步处理。
async def create_post_with_files(post_data: PostCreate, files: List[UploadFile], db: Session):
# 文字审核(同步,同上)
await check_text_content(post_data.title, post_data.body)
# 帖子正常写库
post = Post(**post_data.dict())
db.add(post)
# 文件上传,初始状态为 hidden
for file in files:
file_key = await blob_store.upload(file)
attachment = Attachment(
post_id=post.id,
file_key=file_key,
status="hidden" # 审核完成前不可见
)
db.add(attachment)
# 写入审核任务
task = TaskQueue(
task_type="file_moderation",
payload={"attachment_id": attachment.id}
)
db.add(task)
db.commit()
# 发帖成功,文件审核在后台进行
return post
用户发帖成功,文件显示为"审核中",审核通过后自动变为可见。这套逻辑对用户是透明的,体验上比卡住等待要好得多。
文件解析管道:格式归一化是关键
文件审核比文字审核复杂的地方,不只是异步,还有格式的多样性。
Azure Content Safety 的文本分析接口只接受纯文本,但用户上传的是 PDF、PPTX、DOCX、XLSX 等各种格式。所以 Worker 要做的第一件事,是把这些文件统一提取成文本,再送给 AI 审核。
# 不同格式,用不同的库提取文本
def extract_text(file_path: str, file_type: str) -> str:
if file_type in ("txt", "md"):
return open(file_path).read()
elif file_type == "pdf":
import pdfplumber
with pdfplumber.open(file_path) as pdf:
return "\n".join(page.extract_text() or "" for page in pdf.pages)
elif file_type == "docx":
from docx import Document
doc = Document(file_path)
return "\n".join(p.text for p in doc.paragraphs)
elif file_type == "pptx":
from pptx import Presentation
prs = Presentation(file_path)
texts = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
texts.append(shape.text_frame.text)
return "\n".join(texts)
elif file_type == "xlsx":
import openpyxl
wb = openpyxl.load_workbook(file_path, read_only=True)
texts = []
for ws in wb.worksheets:
for row in ws.iter_rows(values_only=True):
texts.append(" ".join(str(c) for c in row if c))
return "\n".join(texts)
提取出文本之后,剩下的就和文字审核一样了:送给 Azure Content Safety,拿到评分,决定是通过还是拒绝。
这一层格式归一化,是文件审核管道里最容易被忽视、但实际上最容易出问题的地方。不同版本的 DOCX、加密的 PDF、空白的 XLSX 工作表,都可能让提取失败,需要做好异常处理和降级策略。
异步可靠性:task_queue 的设计
异步审核的核心问题是:任务不能丢。
Worker 挂了、重启了、处理到一半崩了,任务应该能被重新捡起来,而不是悄无声息地消失。
我没有引入 Redis 或 RabbitMQ,用 PostgreSQL 的 task_queue 表直接承担这个职责:
CREATE TABLE task_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_type VARCHAR NOT NULL,
payload JSONB NOT NULL,
status VARCHAR NOT NULL DEFAULT 'pending',
-- pending / processing / completed / failed
retry_count INT NOT NULL DEFAULT 0,
next_run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Worker 取任务时用 SKIP LOCKED,确保多个 Worker 并发时不会重复消费同一个任务:
async def dequeue_next_task(db) -> Optional[Task]:
return db.execute("""
SELECT * FROM task_queue
WHERE status IN ('pending', 'failed')
AND next_run_at <= NOW()
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
""").first()
任务失败时,更新 retry_count 并设置 next_run_at 为未来某个时间,实现自动重试:
async def mark_failed(task_id, db):
db.execute("""
UPDATE task_queue
SET status = 'failed',
retry_count = retry_count + 1,
next_run_at = NOW() + INTERVAL '5 minutes'
WHERE id = :id
""", {"id": task_id})
数据库就是任务的唯一真相,任何时候 Worker 重启,未完成的任务都会被重新捡起。
从轮询到 LISTEN/NOTIFY:减少无效等待
最初的 Worker 实现是最简单的轮询:
while running:
task = await dequeue_next_task(db)
if task:
await process(task)
else:
await asyncio.sleep(2) # 没任务就等 2 秒
这带来一个问题:高峰期任务来了要等最多 2 秒,低峰期每 2 秒跑一次空查询。
改进方案是利用 PostgreSQL 原生的 LISTEN/NOTIFY,不引入任何新依赖:
-- 有任务写入时,触发器自动发通知
CREATE OR REPLACE FUNCTION notify_new_task()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('new_task', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER task_queue_notify
AFTER INSERT ON task_queue
FOR EACH ROW EXECUTE FUNCTION notify_new_task();
# Worker 改为监听通知,而不是轮询
listen_conn = await asyncpg.connect(DSN)
await listen_conn.execute("LISTEN new_task;")
async def on_notify(conn, pid, channel, payload):
# 收到通知立刻 drain 队列,加上限防止 listener 被长期占用
for _ in range(MAX_DRAIN):
task = await dequeue_next_task()
if not task:
break
await process(task)
await listen_conn.add_listener('new_task', on_notify)
这里有一个重要原则:NOTIFY 只是唤醒信号,不是任务真相。
Worker 收到通知后,还是要去 task_queue 表里用 SKIP LOCKED 取任务,而不是直接相信 NOTIFY 的 payload。这样即使 NOTIFY 漏发了(listener 短暂断线),任务也不会丢,只是晚一点被 30 秒的兜底轮询捡起来:
# 兜底轮询,防止 listener 断线期间漏任务
async def fallback_poll():
while running:
await asyncio.sleep(30)
await drain_queue()
最终效果:低峰期完全无空转,高峰期毫秒级响应,任务不丢。
完整架构
用户发帖
├── 文字内容(title / body)
│ ↓
│ Azure Content Safety(同步)
│ ↓
│ 通过 → 写库 不通过 → 返回错误给用户
│
└── 文件附件(pdf / docx / pptx / xlsx / txt / md)
↓
上传 Blob Store,状态 = hidden
↓
task_queue 写入审核任务
↓
pg_notify 唤醒 Worker
↓
Worker: 下载文件 → 提取文本 → Azure Content Safety
↓
通过 → 状态改为 visible 不通过 → 状态改为 rejected
↓
实时推送通知用户(Centrifugo)
小结
回到最开始的问题:文件如何审核?。
文字和文件的差异——耗时、格式、用户体验容忍度——决定了它们必须走不同的路径。同步和异步不是技术偏好,是从业务约束推导出来的结论。
异步的复杂性主要在可靠性上:任务不能丢,重试要有机制,唤醒要及时。PostgreSQL 的 task_queue + LISTEN/NOTIFY 在不引入新基础设施的前提下,把这几点都覆盖了。
更多推荐
所有评论(0)