从API到数据库,手把手教你打造一个能干活、能背锅的Agent

为什么Agent需要学这些?

你养的AI如果只会聊闲天,那它就是个“嘴炮王者”。真让它帮你干活——查订单、发消息、记用户偏好……它立马变人工智障

别慌!今天我们就给Agent装上三件套:API遥控器(RESTful/GraphQL)、实时通知系统(Webhook/SSE)、记忆仓库(SQL+Redis)。学完之后,你的Agent能从“键盘侠”进化成“全能打工人”。

准备好了吗?系好安全带,老司机要飙车了!

一、RESTful API 与 GraphQL 集成——让Agent学会“点外卖”和“收快递”

1.1 RESTful API:点套餐的快递小哥

通俗类比

  • GET:你打电话问快递员“我的包裹到哪了?”(只查不修改)

  • POST:你寄一个新包裹(创建资源)

  • PUT:你改收件地址(整体更新)

  • DELETE:你退货(删除资源)

  • 路径参数:快递单号 https://api.com/order/123456

  • 查询参数:筛选条件 ?page=1&size=10

  • 请求体:包裹里的东西(JSON数据)

  • Token认证:小区门禁卡,没有卡连门都进不去。

实战场景:电商商品查询 + 创建订单

代码示例1:GET请求(查商品列表 + 商品详情)
# -*- coding: utf-8 -*-
"""
场景:电商API对接 - 查询商品
API提供商:这里使用免费的FakeStore API(https://fakestoreapi.com)
不用注册,直接调,良心!
"""
import requests  # 专门发HTTP请求的库,相当于快递员的小三轮

# ------------------- 1. 查询商品列表(带查询参数) -------------------
def get_products(category=None, limit=10):
    """
    获取商品列表,支持按分类筛选和数量限制
    :param category: 分类名,如 "electronics", "jewelry"
    :param limit: 最多返回多少个商品
    """
    base_url = "https://fakestoreapi.com/products"
    
    # 构造查询参数:相当于在快递单上写“只要电子产品,只要10件”
    params = {}
    if category:
        # 如果指定分类,API路径变成 /products/category/xxx
        url = f"{base_url}/category/{category}"
    else:
        url = base_url
        params["limit"] = limit   # 查询参数加到问号后面
    
    # 发送GET请求:headers可以加User-Agent,有些API会检查
    headers = {"User-Agent": "MyAgent/1.0"}  # 假装自己是浏览器
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=10)
        # timeout=10 表示最多等10秒,防止卡死(就像快递小哥等太久就不等了)
        
        # 检查HTTP状态码:200表示成功,404表示没找到,500表示服务器炸了
        if resp.status_code == 200:
            # .json() 自动把返回的JSON字符串解析成Python列表/字典
            products = resp.json()
            print(f"✅ 查到了 {len(products)} 个商品")
            return products
        else:
            print(f"❌ 请求失败,状态码:{resp.status_code}")
            return []
    except requests.exceptions.Timeout:
        print("⏰ 请求超时,快递小哥可能堵车了")
        return []
    except Exception as e:
        print(f"💥 未知错误:{e}")
        return []

# ------------------- 2. 查询单个商品(路径参数) -------------------
def get_product_detail(product_id):
    """路径参数直接拼在URL里,就像快递单号"""
    url = f"https://fakestoreapi.com/products/{product_id}"
    resp = requests.get(url, timeout=10)
    if resp.status_code == 200:
        product = resp.json()
        print(f"商品名:{product['title']}")
        print(f"价格:${product['price']}")
        return product
    else:
        print(f"商品{product_id}不存在或已下架")
        return None

# ------------------- 3. 创建订单(POST请求 + 请求体 + Token认证) -------------------
def create_order(user_token, product_id, quantity):
    """
    模拟创建订单,需要用户Token(就像门禁卡)
    真实场景中Token从登录接口获得,放在Header的Authorization里
    """
    url = "https://fakestoreapi.com/carts"  # 这是个假接口,仅演示结构
    
    # 请求体:要寄的“包裹”内容,格式通常是JSON
    order_data = {
        "userId": 1,           # 假设用户ID
        "date": "2025-04-16",
        "products": [{"productId": product_id, "quantity": quantity}]
    }
    
    # 请求头:告诉服务器我发的是JSON,并且带上Token
    headers = {
        "Content-Type": "application/json",   # 告诉服务器,我给你的是JSON格式
        "Authorization": f"Bearer {user_token}"   # Token认证标准写法
    }
    
    resp = requests.post(url, json=order_data, headers=headers, timeout=10)
    # 注意:用 json= 参数会自动序列化字典并设置Content-Type,比手动data=json.dumps()方便
    
    if resp.status_code == 201:   # 201表示资源创建成功
        print("🎉 订单创建成功!订单号:", resp.json().get("id"))
        return resp.json()
    else:
        print(f"❌ 创建失败:{resp.status_code} - {resp.text}")
        return None

# ------------------- 运行示例 -------------------
if __name__ == "__main__":
    # 先查商品列表
    products = get_products(category="electronics", limit=3)
    if products:
        first_id = products[0]["id"]
        # 再查详情
        get_product_detail(first_id)
    
    # 模拟Token(实际应从登录获取)
    fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
    create_order(fake_token, product_id=1, quantity=2)

避坑指南

  • 路径参数和查询参数别搞混:/products/123 是路径参数;/products?category=shoes 是查询参数。

  • Token不要写死在代码里!应该从配置或登录响应中获取。

  • 生产环境必须用环境变量存Token,千万别提交到Git。

1.2 GraphQL:点外卖只点自己爱吃的

类比:RESTful是套餐(服务器给你一堆字段,哪怕你只要一个名字);GraphQL是自助餐(你想吃啥就拿啥,只拿需要的,绝不浪费)。

核心原理:客户端发一个查询字符串(描述想要哪些字段),服务器返回精确的JSON。

对比

  • RESTful:多个端点(/users/123/users/123/posts)→ 可能需要多次请求。

  • GraphQL:一个端点(/graphql) → 一次请求拿所有想要的数据。

代码示例2:调用GraphQL API(查询用户信息 + 文章列表)

我们使用GitHub的GraphQL API(需要Token,但可以申请免费)。另一个示例用公共的“国家信息”API。

# -*- coding: utf-8 -*-
"""
场景1:使用GraphQL查询国家信息(公开API,无需Token)
API地址:https://countries.trevorblades.com
"""
import requests

def graphql_query_countries(continent_code="AS"):
    """
    查询亚洲国家列表,只返回国家名称和首都
    GraphQL查询像点菜:你明确写出要哪些字段
    """
    url = "https://countries.trevorblades.com/"
    
    # GraphQL查询字符串:花括号里就是你想要的“菜”
    query = """
    {
        continent(code: "%s") {
            name
            countries {
                name
                capital
            }
        }
    }
    """ % continent_code
    
    # 标准的GraphQL请求体格式
    payload = {
        "query": query,
        "variables": None   # 变量(如果有)可以放这里
    }
    
    headers = {"Content-Type": "application/json"}
    resp = requests.post(url, json=payload, headers=headers, timeout=10)
    
    if resp.status_code == 200:
        data = resp.json()
        # 结构:data.continent.countries
        countries = data["data"]["continent"]["countries"]
        for c in countries[:5]:
            print(f"国家:{c['name']},首都:{c.get('capital', '无')}")
        return data
    else:
        print(f"GraphQL查询失败:{resp.text}")
        return None

# 场景2:GitHub GraphQL示例(需Token,这里演示结构)
def graphql_github_user(username, token):
    """
    查询GitHub用户信息,只拿登录名、仓库名和星标数
    需要GitHub Personal Access Token(去Settings->Developer settings生成)
    """
    url = "https://api.github.com/graphql"
    query = """
    query($login: String!) {
        user(login: $login) {
            login
            name
            repositories(first: 5) {
                nodes {
                    name
                    stargazerCount
                }
            }
        }
    }
    """
    variables = {"login": username}
    payload = {"query": query, "variables": variables}
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    resp = requests.post(url, json=payload, headers=headers, timeout=15)
    if resp.status_code == 200:
        user_data = resp.json()["data"]["user"]
        print(f"用户:{user_data['login']},姓名:{user_data.get('name')}")
        for repo in user_data["repositories"]["nodes"]:
            print(f"仓库:{repo['name']},⭐ {repo['stargazerCount']}")
        return user_data
    else:
        print(f"错误:{resp.status_code} - {resp.text}")
        return None

if __name__ == "__main__":
    # 测试1:国家信息
    graphql_query_countries("AS")   # AS=亚洲
    
    # 测试2:GitHub(需要真实token,这里仅演示代码结构)
    # graphql_github_user("octocat", "ghp_your_token_here")

GraphQL的优点:一次请求拿多个资源,减少网络往返;字段按需索取,省流量。

缺点:缓存复杂(不像RESTful每个URL可以单独缓存),查询可能很深导致性能问题。

什么时候用GraphQL?

  • 移动端网络环境差,需要减少请求次数。

  • 前端需要灵活控制字段(比如不同页面展示不同信息)。

  • 后端有多个数据源聚合。

二、Webhook 与 SSE 实时响应——让Agent从“轮询”变“被叫”

2.1 Webhook:快递到站了,主动通知你

类比:你寄了个快递,不想一直刷物流页面。你给快递公司留了个电话,快递一到站就给你打电话——这就是Webhook。

流程:第三方系统(比如支付平台)发生事件(订单支付成功),主动向你的服务器(你提供的URL)发一个HTTP POST请求,带上事件数据。你收到后处理业务(比如更新订单状态)。

实战场景:接收支付回调、接收电商订单状态变更。

代码示例3:实现一个Webhook接收端点(使用Flask)
# -*- coding: utf-8 -*-
"""
场景:模拟支付平台回调通知
我们用Flask写一个简单的Webhook接收服务
需要安装:pip install flask
"""
from flask import Flask, request, jsonify
import hashlib
import hmac
import json

app = Flask(__name__)

# 假设这是你与第三方约定的签名密钥(像钥匙,双方都有)
WEBHOOK_SECRET = "my_super_secret_key_123"

def verify_signature(payload_body, signature_header, secret):
    """
    验证请求签名,防止伪造回调(就像快递员对暗号)
    常见方法:HMAC-SHA256
    """
    # 构造签名:用secret对请求体进行HMAC
    computed = hmac.new(
        secret.encode('utf-8'),
        payload_body,
        hashlib.sha256
    ).hexdigest()
    # 比较计算出的签名和头部的签名(应该用安全比较,这里简单演示)
    return hmac.compare_digest(computed, signature_header)

@app.route('/webhook/payment', methods=['POST'])
def payment_webhook():
    """
    支付结果回调接口
    第三方会POST到这里,携带订单信息和签名
    """
    # 1. 获取原始请求体(字节串)—— 签名计算需要原始数据
    raw_body = request.get_data()
    
    # 2. 获取签名头(不同平台头名称不同,常见 X-Signature)
    signature = request.headers.get('X-Signature', '')
    if not signature:
        return jsonify({"error": "Missing signature"}), 401
    
    # 3. 验证签名
    if not verify_signature(raw_body, signature, WEBHOOK_SECRET):
        print("❌ 签名验证失败,可能是伪造请求")
        return jsonify({"error": "Invalid signature"}), 403
    
    # 4. 解析JSON数据
    try:
        data = json.loads(raw_body)
    except:
        return jsonify({"error": "Invalid JSON"}), 400
    
    # 5. 处理业务逻辑(例如更新订单状态)
    order_id = data.get('order_id')
    status = data.get('status')   # 'paid', 'failed'
    print(f"收到支付回调:订单 {order_id} 状态 {status}")
    
    # 这里可以调用你的Agent更新数据库、发邮件等
    # 注意:回调可能重复发送,需要做幂等处理(比如检查订单状态是否已处理)
    
    # 6. 返回成功响应(告诉第三方“我收到了”,通常返回200)
    return jsonify({"code": 0, "message": "OK"}), 200

# 另一个Webhook场景:电商订单状态变更(发货通知)
@app.route('/webhook/order_shipped', methods=['POST'])
def order_shipped_webhook():
    """
    接收订单发货通知
    """
    raw_body = request.get_data()
    # 这里签名验证逻辑类似,省略
    data = json.loads(raw_body)
    order_id = data.get('order_id')
    tracking_number = data.get('tracking_number')
    print(f"📦 订单 {order_id} 已发货,运单号:{tracking_number}")
    # 可触发消息推送给用户
    return "OK", 200

if __name__ == '__main__':
    # 启动服务,监听在0.0.0.0:5000,外网可访问(需要内网穿透工具如ngrok)
    app.run(host='0.0.0.0', port=5000, debug=True)

避坑

  • 签名验证必须做!否则任何人都可以伪造回调搞乱你的系统。

  • 幂等性:同一个回调可能发多次(网络抖动),你的处理逻辑要能重复执行而不出错(比如先查订单状态,已处理就忽略)。

  • 异步处理:回调里不要做耗时操作,应该立即返回200,然后丢到消息队列慢慢处理,否则第三方会超时重试。

  • 内网穿透:本地开发时,第三方无法访问你的localhost,需要用ngrok、frp等工具暴露公网URL。

2.2 SSE:实时弹幕,主播主动推

类比:你在直播间,主播说话,你不需要一直刷新页面,主播一开口你就能听到——这就是Server-Sent Events(SSE)
对比WebSocket:WebSocket是双向高速通道(像视频通话),SSE是单向广播(像收音机)。如果只是服务器推送(比如实时股价、通知),SSE更简单。

实战场景:Agent实时接收后台任务进度、新订单提醒。

代码示例4:SSE服务器(Flask) + 客户端(HTML/JS)
# -*- coding: utf-8 -*-
"""
SSE服务器端:实时推送天气更新
使用Flask,需要安装 flask 和 flask_cors(跨域)
"""
from flask import Flask, Response, request
from flask_cors import CORS
import time
import json
import random

app = Flask(__name__)
CORS(app)  # 允许跨域,方便前端测试

def event_stream():
    """生成SSE事件流,这是一个生成器函数,yield每个事件"""
    while True:
        # 模拟实时数据:比如每2秒推送一条新订单通知
        time.sleep(2)
        # 构造一个随机订单消息
        order = {
            "order_id": random.randint(1000, 9999),
            "amount": round(random.uniform(10, 500), 2),
            "status": "new"
        }
        # SSE格式:data: 后面跟JSON字符串,以两个换行结束
        yield f"data: {json.dumps(order)}\n\n"

@app.route('/stream/orders')
def stream_orders():
    """SSE推送端点"""
    # 设置响应头,告诉浏览器这是SSE流
    return Response(
        event_stream(),
        mimetype="text/event-stream",   # 必须的类型
        headers={
            "Cache-Control": "no-cache",   # 禁止缓存
            "X-Accel-Buffering": "no"      # 防止nginx缓冲
        }
    )

# 另一个场景:实时推送Agent思考过程
def agent_thought_stream():
    """模拟Agent一步步思考并推送给前端"""
    steps = ["正在拆解问题...", "查询天气API...", "生成总结...", "写入文件...", "完成!"]
    for step in steps:
        time.sleep(1.5)
        yield f"data: {json.dumps({'step': step})}\n\n"

@app.route('/stream/agent-thought')
def stream_agent_thought():
    return Response(agent_thought_stream(), mimetype="text/event-stream")

if __name__ == '__main__':
    app.run(port=5000, debug=True)

客户端HTML代码(保存为sse_client.html,双击用浏览器打开):

<!DOCTYPE html>
<html>
<head><title>SSE实时订单</title></head>
<body>
<h2>📦 实时新订单推送</h2>
<ul id="orders"></ul>

<script>
// 创建EventSource对象,指向SSE接口
const evtSource = new EventSource("http://localhost:5000/stream/orders");
// 监听消息事件
evtSource.onmessage = function(event) {
    // event.data 就是服务器发来的字符串
    const order = JSON.parse(event.data);
    const li = document.createElement("li");
    li.textContent = `订单 ${order.order_id},金额 ${order.amount} 元,状态 ${order.status}`;
    document.getElementById("orders").appendChild(li);
};
// 处理错误(比如断线重连)
evtSource.onerror = function(e) {
    console.log("连接出错,尝试重连...");
};
</script>
</body>
</html>

SSE vs WebSocket

  • SSE:简单,基于HTTP,自动重连,只能服务器→客户端。适合通知、股票、日志流。

  • WebSocket:双向,持久连接,协议复杂。适合聊天、游戏。

避坑

  • SSE最大并发连接数受限于浏览器(通常6个),生产环境可以用nginx代理。

  • 如果客户端断开,服务器端的生成器函数会抛出异常,需要捕获并退出循环。

三、数据库对接——给Agent装上“仓库”和“冰箱”

3.1 SQL数据库(MySQL):大仓库,存所有正经东西

类比:数据库就像公司的大仓库,SQL就是仓库管理员的口令。

  • SELECT:你喊“把库存表里数量大于10的货拿出来”

  • INSERT:你喊“存一箱矿泉水到货架C”

  • UPDATE:你喊“把矿泉水价格改成2元”

  • DELETE:你喊“过期商品扔掉”

  • 参数绑定:相当于告诉管理员“我要找名字叫‘张三’的用户”,而不是“我要找名字叫'; DROP TABLE users;的人”,防止SQL注入(仓库管理员乱开门)。

实战场景:用户管理(增删改查) + 订单查询。

代码示例5:对接MySQL(使用pymysql)

首先安装:pip install pymysql(纯Python驱动,免编译)

# -*- coding: utf-8 -*-
"""
场景:用户管理模块(注册、登录、修改信息、删除)
数据库:MySQL(本地或云)
建表SQL(先手动执行):
CREATE DATABASE agent_demo;
USE agent_demo;
CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
import pymysql
from pymysql.cursors import DictCursor   # 让查询结果返回字典,方便用列名访问

# 数据库连接配置(像仓库地址和钥匙)
DB_CONFIG = {
    "host": "localhost",      # 数据库IP
    "port": 3306,             # MySQL默认端口
    "user": "root",           # 用户名
    "password": "your_password",   # 密码,千万别提交到代码!
    "database": "agent_demo",
    "charset": "utf8mb4",
    "cursorclass": DictCursor      # 结果返回字典
}

def get_db_connection():
    """获取数据库连接(每次操作都新建连接,用完记得关)"""
    return pymysql.connect(**DB_CONFIG)

# ------------------- 1. 插入新用户(INSERT) -------------------
def create_user(username, password_hash, email):
    """注册用户,使用参数绑定防注入"""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            # 参数绑定:用 %s 占位,然后传入元组,pymysql会正确转义
            sql = "INSERT INTO users (username, password_hash, email) VALUES (%s, %s, %s)"
            cursor.execute(sql, (username, password_hash, email))
        conn.commit()   # 提交事务(存到硬盘)
        print(f"✅ 用户 {username} 创建成功")
        return True
    except pymysql.err.IntegrityError as e:
        print(f"❌ 用户名已存在或邮箱重复:{e}")
        return False
    finally:
        conn.close()

# ------------------- 2. 查询用户(SELECT) -------------------
def get_user_by_username(username):
    """根据用户名查用户信息"""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            sql = "SELECT id, username, email, created_at FROM users WHERE username = %s"
            cursor.execute(sql, (username,))
            user = cursor.fetchone()   # 获取一条记录,返回字典
            if user:
                print(f"找到用户:{user['username']},邮箱:{user['email']}")
            else:
                print("用户不存在")
            return user
    finally:
        conn.close()

# ------------------- 3. 更新用户邮箱(UPDATE) -------------------
def update_email(username, new_email):
    """修改用户邮箱"""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            sql = "UPDATE users SET email = %s WHERE username = %s"
            affected = cursor.execute(sql, (new_email, username))
        conn.commit()
        if affected:
            print(f"✉️ 用户 {username} 邮箱已更新为 {new_email}")
        else:
            print("用户不存在,更新失败")
        return affected > 0
    finally:
        conn.close()

# ------------------- 4. 删除用户(DELETE) -------------------
def delete_user(username):
    """删除用户(慎用)"""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            sql = "DELETE FROM users WHERE username = %s"
            affected = cursor.execute(sql, (username,))
        conn.commit()
        if affected:
            print(f"🗑️ 用户 {username} 已删除")
        else:
            print("用户不存在")
        return affected > 0
    finally:
        conn.close()

# ------------------- 5. 事务示例:批量插入订单(要么全成功,要么全回滚) -------------------
def batch_create_orders(orders):
    """
    orders: 列表,每个元素是 (user_id, product, amount)
    模拟批量下单,其中一个失败就全部取消
    """
    conn = get_db_connection()
    try:
        conn.begin()   # 开始事务
        with conn.cursor() as cursor:
            for order in orders:
                sql = "INSERT INTO orders (user_id, product, amount) VALUES (%s, %s, %s)"
                cursor.execute(sql, order)
        conn.commit()   # 全部成功才提交
        print("🎉 批量下单成功")
    except Exception as e:
        conn.rollback()  # 出错就回滚,就像没发生过
        print(f"💥 批量下单失败,已回滚:{e}")
    finally:
        conn.close()

# 运行示例
if __name__ == "__main__":
    # 注意:先确保数据库和表存在
    create_user("alice", "hashed_pw_123", "alice@example.com")
    user = get_user_by_username("alice")
    if user:
        update_email("alice", "newalice@example.com")
        # delete_user("alice")

避坑

  • SQL注入:永远不要拼接字符串!用参数绑定(%s 占位)。

  • 连接管理:用完必须close(),或者用with上下文自动关。

  • 事务:多个相关操作要包在事务里,保证原子性。

  • 密码存储:绝对不要存明文,要用bcrypt等哈希。

3.2 NoSQL(Redis):冰箱,放经常用的东西

类比:Redis就像你家的冰箱,牛奶、鸡蛋这些天天用的东西放冰箱,不用每次去大仓库(SQL)翻。冰箱还有“过期自动扔”的功能(TTL)。

实战场景:缓存商品详情、存储用户会话、Agent短期记忆。

代码示例6:对接Redis(缓存商品数据 + 存储用户Token)

安装:pip install redis

# -*- coding: utf-8 -*-
"""
场景1:缓存数据库查询结果(避免频繁查SQL)
场景2:存储用户登录状态(Session)
需要先启动Redis:redis-server
"""
import redis
import json
import time

# 连接Redis(默认本地,端口6379)
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,                # 数据库索引,默认0-15
    decode_responses=True  # 自动将返回的字节串解码为字符串
)

# ------------------- 场景1:缓存商品详情 -------------------
def get_product_from_db(product_id):
    """模拟从MySQL查询商品(很慢)"""
    print("🐢 查询数据库...")
    time.sleep(1)   # 假装慢查询
    return {"id": product_id, "name": "手机", "price": 2999, "stock": 100}

def get_product_with_cache(product_id):
    """先查Redis缓存,没有再去数据库,并写入缓存"""
    cache_key = f"product:{product_id}"
    # 从Redis取
    cached = r.get(cache_key)
    if cached:
        print("🎯 命中缓存,秒开!")
        return json.loads(cached)   # 存的是JSON字符串,要解析
    else:
        print("😭 缓存未命中,去数据库查")
        product = get_product_from_db(product_id)
        # 存入Redis,并设置过期时间(比如600秒)
        r.setex(cache_key, 600, json.dumps(product))
        print("📦 已写入缓存,有效期10分钟")
        return product

# ------------------- 场景2:存储用户Token(会话) -------------------
def store_user_session(user_id, token, expire_seconds=3600):
    """用户登录成功,将token存入Redis,设置过期时间"""
    session_key = f"session:{token}"
    # 可以用Hash存储更多信息,这里简单存user_id
    r.setex(session_key, expire_seconds, user_id)
    print(f"🔑 用户{user_id}的会话已存入,有效期{expire_seconds}秒")

def get_user_id_by_token(token):
    """根据token获取用户ID(验证登录)"""
    user_id = r.get(f"session:{token}")
    if user_id:
        print(f"✅ 有效token,用户ID:{user_id}")
        return user_id
    else:
        print("❌ token无效或已过期")
        return None

# ------------------- 场景3:Redis做分布式锁(防止重复处理) -------------------
def process_order(order_id):
    """模拟处理订单,用Redis锁防止并发重复处理"""
    lock_key = f"lock:order:{order_id}"
    # setnx(set if not exists)加锁,设置10秒自动释放(防止死锁)
    acquired = r.setnx(lock_key, "locked")
    if acquired:
        r.expire(lock_key, 10)   # 设置过期时间,防止死锁
        try:
            print(f"🔒 获得锁,处理订单{order_id}...")
            # 实际业务逻辑
            time.sleep(2)
            print(f"✅ 订单{order_id}处理完成")
        finally:
            r.delete(lock_key)   # 释放锁
            print("🔓 锁已释放")
    else:
        print(f"⚠️ 订单{order_id}正在被其他线程处理,稍后重试")

# ------------------- 运行 -------------------
if __name__ == "__main__":
    # 测试缓存
    print(get_product_with_cache(1))
    print(get_product_with_cache(1))   # 第二次应该命中缓存
    
    # 测试会话
    store_user_session(123, "token_abc")
    get_user_id_by_token("token_abc")
    get_user_id_by_token("fake_token")
    
    # 测试锁(多线程环境下,这里简单模拟)
    process_order(1001)
    process_order(1001)   # 第二次会失败

Redis常用数据类型

  • String:缓存值、计数器

  • Hash:存储对象(如用户信息)

  • List:消息队列

  • Set:标签、去重

  • Sorted Set:排行榜

缓存策略

  • 过期时间:必须设置,否则内存会爆。

  • 缓存更新:数据变更时要主动删除或更新缓存(比如修改商品价格后,r.delete("product:1"))。

  • 缓存穿透:查询不存在的数据,每次都穿透到DB。解决方案:缓存空值并设置短过期时间。

  • 缓存击穿:热点key过期瞬间大量请求打到DB。解决方案:互斥锁(如上面的分布式锁)。

四、总结与作业

恭喜你!你的Agent现在会:

  • 优雅地调用RESTful和GraphQL API(点外卖收快递)

  • 接收Webhook回调并验证签名(等快递通知)

  • 实时推送SSE消息(当主播)

  • 把重要数据存MySQL(大仓库)

  • 把热点数据放Redis(冰箱)

课后作业

  1. 用RESTful API对接一个真实电商接口(比如京东联盟),实现商品搜索。

  2. 用GraphQL查询GitHub的issues列表,只返回标题和作者。

  3. 实现一个Webhook接收GitHub的push事件,自动拉取代码(假装一下)。

  4. 用SSE做一个实时股票价格展示页面。

  5. 把用户登录状态同时存MySQL和Redis,实现带缓存的认证。

最后一句:代码就像乐高,照着图纸拼出来只是第一步。试着改参数、加功能、甚至故意搞破坏,才能真正理解它怎么工作的。遇到bug别慌,那是代码在教你成长。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐