本周小组完成了用户体系和错题本的部分基础开发,实现了数据的存储与一些基本功能。其中,我主要负责 Python 智能后端服务*的搭建。

目前的 AI 不仅仅是一个聊天框,我们采用了 “Java 业务层 + Python AI 层”的异构架构,通过 “RabbitMQ 消息队列”进行解耦,确保系统在处理复杂的 AI 分析时依然保持高并发和低延迟。

一、AI 服务核心与异步通信

为了支撑未来的“错题分析”和“个性化推荐”,我在 Python 端实现了以下核心组件:

  • 大模型接入:使用 `openai` 库兼容模式接入通义千问(Qwen),实现智能对话。
  • 消息队列消费者:通过 pika库监听 RabbitMQ,实时接收 Java 端发来的分析任务。
  • 会话记忆管理:在 Python 内存中维护多用户会话上下文,保证对话连贯性。

1. AI 核心服务类 

这是 Python 服务的“大脑”,负责与大模型交互。我封装了系统提示词(System Prompt),让 AI 始终扮演“学习助手”的角色。

from openai import OpenAI
from typing import Dict, List
from config.settings import settings

class AIService:
    def __init__(self):
        # 初始化兼容模式客户端
        self.client = OpenAI(
            api_key=settings.ai.api_key,
            base_url=settings.ai.base_url
        )
        # 设定 AI 角色:磨坊错题本智能助手
        self.system_prompt = """你是磨坊错题本的智能学习小助手。
        你可以为用户提供错题分析、知识点讲解以及个性化学习建议。"""
        
        # 简单的内存会话管理
        self.chat_memory: Dict[str, List[Dict]] = {}

    def chat(self, user_id: str, prompt: str) -> str:
        """处理用户对话请求"""
        messages = [{"role": "system", "content": self.system_prompt}]
        
        # 加载历史上下文(保留最近 10 轮)
        if user_id in self.chat_memory:
            messages.extend(self.chat_memory[user_id][-10:])
        
        messages.append({"role": "user", "content": prompt})
        
        # 调用大模型
        response = self.client.chat.completions.create(
            model=settings.ai.model,
            messages=messages,
            temperature=0.8
        )
        
        answer = response.choices[0].message.content
        
        # 更新记忆
        if user_id not in self.chat_memory:
            self.chat_memory[user_id] = []
        self.chat_memory[user_id].append({"role": "user", "content": prompt})
        self.chat_memory[user_id].append({"role": "assistant", "content": answer})
        
        return answer
```


2. RabbitMQ 消费者

这是连接 Java 和 Python 的桥梁。Python 服务启动后会一直监听队列,一旦 Java 端有错题数据需要同步或分析,Python 就会立即消费并处理。

import pika
import json
from config.settings import settings
from core.ai_service import ai_service

class MQConsumer:
    def __init__(self):
        # 建立 RabbitMQ 连接
        credentials = pika.PlainCredentials(settings.rabbitmq.username, settings.rabbitmq.password)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=settings.rabbitmq.host, credentials=credentials)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='ai_task_queue', durable=True)

    def on_message(self, ch, method, properties, body):
        """接收到任务时的回调函数"""
        try:
            task = json.loads(body)
            task_type = task.get("type")
            user_id = str(task.get("user_id"))
            
            # 根据任务类型分发给不同的 AI 逻辑
            if task_type == "chat":
                result = ai_service.chat(user_id, task["prompt"])
            
            # 这里未来会扩展 analyze_errors (错题分析) 等任务
            # elif task_type == "analyze_errors": ...
            
            # 处理完成后,通过生产者将结果返回给 Java 端
            self._send_result(task.get("request_id"), result)
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"任务处理失败:{str(e)}")

    def start(self):
        print("Python AI 服务已启动,正在监听任务队列...")
        self.channel.basic_consume(queue='ai_task_queue', on_message_callback=self.on_message)
        self.channel.start_consuming()
```

二、架构设计思路:为什么要用 Python + 消息队列?

在开发过程中,我们选择了Python而不是继续在 Java 中写 AI 逻辑,原因如下:

1.  生态优势:

Python 拥有 "LangChain"、"FAISS" 等强大的 AI 生态库。未来我们要实现的 "RAG"(检索增强生成)和 "向量数据库",在 Python 中可以较为简单的做到。


2.  异步解耦(RabbitMQ):

AI 分析(如分析全班同学的错题薄弱点)通常很耗时。如果 Java 同步等待,用户界面会卡死。而通过 RabbitMQ,Java 只需要把任务扔进队列就立即响应用户“分析已开始”。Python 在后台慢慢算,算完后通过另一个队列通知 Java。这种方式极大地提升了系统的吞吐量和用户体验。


3.  数据共享:

Python 服务通过配置直接连接与 Java 共享的 MySQL 数据库。这意味着 AI 可以实时读取最新的错题数据,无需额外的数据同步接口。

三、规划与总结

1.规划

下周我将重点推进 RAG (Retrieval-Augmented Generation)技术的落地:

  • 大模型接入:使用openai库兼容模式接入通义千问(Qwen),实现智能对话。构建本地知识库,利用 Python 将我们的教材、笔记切片并向量化,存入FAISS向量数据库。
  • 精准问答::当用户提问时,Python 会先在本地知识库中检索相关片段,再结合大模型生成答案。

下周即将实现的 RAG 检索逻辑示例

def rag_query(self, query: str):
    # 1. 从向量库检索最相关的 3 个知识片段
    docs = self.vector_store.similarity_search(query, k=3)
    context = "\n".join([doc.page_content for doc in docs])
    
    # 2. 将检索到的知识作为背景传给 AI
    prompt = f"参考资料:{context}\n问题:{query}"
    return ai_service.chat("system", prompt)
```

2. 总结

本周我完成了 Python AI 微服务的 搭建,打通了 RabbitMQ 消息通道和大模型对话链路。目前的架构已经具备了极强的扩展性,下周我们将正式进入“深度智能”阶段,让错题本真正做到智能分析和推题。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐