用户关系模块详解:从业务流程到技术实现

场景设定:用户小明关注用户小红

业务流程图

用户点击关注按钮
小明关注小红

RelationController接收请求

调用RelationService.follow
e:\download\zhiguang_be-main\src\main\java\com\tongji\relation\service\impl\RelationServiceImpl.java

Lua令牌桶限流
防刷

限流通过?

返回失败

插入关注关系
RelationMapper.insertFollowing

插入成功?

返回失败

写入Outbox事件
OutboxMapper.insert

返回成功给前端

Canal监听outbox表
CanalKafkaBridge.java

捕获行变更

转发到Kafka主题
canal-outbox

CanalOutboxConsumer消费事件
CanalOutboxConsumer.java

反序列化RelationEvent

调用RelationEventProcessor.process
RelationEventProcessor.java

Redis去重
防重复处理

插入粉丝关系
RelationMapper.insertFollower

更新Redis缓存
ZSet存储

更新用户计数
UserCounterService.increment

一、流程详细说明

1. 用户操作

用户小明在前端点击关注按钮,关注用户小红。

2. 接口接收

前端发送请求到关系控制器,控制器调用 RelationService.follow() 方法。

3. 服务处理

// RelationServiceImpl.java:86-107
@Override
@Transactional
public boolean follow(long fromUserId, long toUserId) {
    // Lua 脚本令牌桶限流
    Long ok = redis.execute(tokenScript, List.of("rl:follow:" + fromUserId), "100", "1");
    if (ok == 0L) {
        return false;
    }

    long id = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    int inserted = mapper.insertFollowing(id, fromUserId, toUserId, 1);

    if (inserted > 0) {
        try {
            Long outId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
            String payload = objectMapper.writeValueAsString(new RelationEvent("FollowCreated", fromUserId, toUserId, id));
            outboxMapper.insert(outId, "following", id, "FollowCreated", payload);
        } catch (Exception ignored) {}

        return true;
    }
    return false;
}

技术点

  • 使用 @Transactional 保证事务一致性
  • 使用 Lua 脚本实现令牌桶限流,防止刷关注
  • 插入关注关系到 following
  • 写入 Outbox 事件,触发后续的异步处理

Outbox 模式详解

好的,我来详细解释什么是 Outbox 模式,以及它在用户关系模块中的作用。

一、什么是 Outbox 模式?

1. 生活中的类比

想象你在一家餐厅点餐:

传统方式(直接调用)

  • 你告诉服务员要什么菜
  • 服务员立刻跑到厨房告诉厨师
  • 厨师开始做菜
  • 服务员站在厨房等菜做好
  • 服务员再把菜端给你

问题:如果厨师很忙,服务员要一直等,其他客人就没人服务了。

Outbox 方式(事件驱动)

  • 你告诉服务员要什么菜
  • 服务员把订单写在便签纸上,贴在厨房的订单板上
  • 服务员立刻回来,可以继续服务其他客人
  • 厨师有空时,从订单板上取订单做菜
  • 菜做好后,服务员端给你

优势:服务员不用等待,可以服务更多客人。

2. 技术定义

Outbox 模式是一种事件驱动架构模式,用于解决分布式系统中的数据一致性可靠性问题。

核心思想

  • 业务操作和事件发布在同一个事务中完成
  • 事件先持久化到数据库(Outbox 表)
  • 再通过 CDC(Change Data Capture)工具捕获变更
  • 最后转发到消息队列(如 Kafka)

二、为什么需要 Outbox 模式?

1. 传统方法的问题

场景:用户关注操作,需要:

  1. 写入关注关系到数据库
  2. 发送事件到 Kafka,通知其他服务更新粉丝关系

传统方法(直接发送 Kafka)

// 伪代码
@Transactional
public void follow(long fromUserId, long toUserId) {
    // 1. 写入数据库
    mapper.insertFollowing(id, fromUserId, toUserId, 1);
    
    // 2. 发送 Kafka 事件
    kafka.send("relation-events", event);  // 可能失败!
}

问题

  • 如果 Kafka 发送失败,数据库已经写入,数据不一致
  • 如果 Kafka 发送成功,但事务回滚,事件已经发出,数据不一致
  • 网络抖动可能导致消息丢失

2. Outbox 模式的解决方案

核心思想:将事件和业务操作放在同一个事务中

// 伪代码
@Transactional
public void follow(long fromUserId, long toUserId) {
    // 1. 写入业务数据
    mapper.insertFollowing(id, fromUserId, toUserId, 1);
    
    // 2. 写入 Outbox 表(同一个事务)
    outboxMapper.insert(outId, "following", id, "FollowCreated", payload);
    // 事务提交:业务数据和事件一起提交
}

后续处理

  • Canal 监听 outbox 表
  • 捕获 INSERT 事件
  • 转发到 Kafka
  • 消费者处理事件

三、项目中的 Outbox 实现

1. Outbox 表结构

CREATE TABLE outbox (
    id BIGINT PRIMARY KEY,           -- 事件ID
    aggregate_type VARCHAR(255),     -- 聚合类型,如"following"
    aggregate_id BIGINT,             -- 聚合ID,如关系记录ID
    type VARCHAR(255),               -- 事件类型,如"FollowCreated"
    payload TEXT,                    -- 事件负载(JSON)
    created_at TIMESTAMP(3)          -- 创建时间
);

2. 写入 Outbox

// RelationServiceImpl.java:97-102
if (inserted > 0) {
    try {
        Long outId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        String payload = objectMapper.writeValueAsString(new RelationEvent("FollowCreated", fromUserId, toUserId, id));
        outboxMapper.insert(outId, "following", id, "FollowCreated", payload);
    } catch (Exception ignored) {}
    return true;
}

关键点

  • 和业务操作在同一个事务中
  • 使用 @Transactional 注解
  • 如果业务操作失败,Outbox 也不会写入

3. OutboxMapper 实现

// OutboxMapper.java:21-25
int insert(@Param("id") Long id,
           @Param("aggregateType") String aggregateType,
           @Param("aggregateId") Long aggregateId,
           @Param("type") String type,
           @Param("payload") String payload);
<!-- OutboxMapper.xml:4-7 -->
<insert id="insert">
    INSERT INTO outbox (id, aggregate_type, aggregate_id, type, payload, created_at)
    VALUES (#{id}, #{aggregateType}, #{aggregateId}, #{type}, #{payload}, NOW(3))
</insert>

4. Canal 监听与转发

// CanalKafkaBridge.java:106-161
while (running) {
    // 拉取 Canal 消息
    Message message = connector.getWithoutAck(batchSize);
    
    for (CanalEntry.Entry entry : message.getEntries()) {
        // 解析行变更
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        
        // 只处理 INSERT/UPDATE
        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
            continue;
        }
        
        // 提取 payload 字段
        for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
            if ("payload".equalsIgnoreCase(col.getName())) {
                rowNode.put("payload", col.getValue());
            }
        }
        
        // 发送到 Kafka
        String json = objectMapper.writeValueAsString(msgNode);
        kafka.send(OutboxTopics.CANAL_OUTBOX, json);
    }
    
    // 确认位点
    connector.ack(batchId);
}

5. 消费 Kafka 事件

// CanalOutboxConsumer.java:39-58
@KafkaListener(topics = OutboxTopics.CANAL_OUTBOX, groupId = "relation-outbox-consumer")
public void onMessage(String message, Acknowledgment ack) {
    List<JsonNode> rows = OutboxMessageUtil.extractRows(objectMapper, message);
    for (JsonNode row : rows) {
        JsonNode payloadNode = row.get("payload");
        RelationEvent evt = objectMapper.readValue(payloadNode.asText(), RelationEvent.class);
        processor.process(evt);  // 处理事件
    }
    ack.acknowledge();  // 确认消费
}

四、Outbox 模式的优势

1. 数据一致性

  • 业务操作和事件写入在同一个事务中
  • 要么都成功,要么都失败
  • 不会出现"业务成功但事件丢失"的情况

2. 可靠性

  • 事件持久化到数据库,不会丢失
  • Canal 捕获 binlog,保证至少一次语义
  • 支持重试和重放

3. 解耦系统

  • 业务服务不需要直接调用 Kafka
  • 通过 Outbox 表解耦
  • 消费者可以独立部署和扩展

4. 可追溯性

  • 所有事件都有记录
  • 支持审计和问题排查
  • 可以重放历史事件

五、Outbox 模式 vs 传统方法

特性 传统方法(直接发送 Kafka) Outbox 模式
数据一致性 难以保证 强一致性(同一事务)
可靠性 可能丢失消息 持久化到数据库,不会丢失
事务管理 复杂(跨系统事务) 简单(单数据库事务)
系统耦合 高(直接依赖 Kafka) 低(通过数据库解耦)
可追溯性 好(有事件记录)
性能 快(直接发送) 稍慢(多一层数据库)

六、Outbox 模式的适用场景

  1. 需要强一致性的场景:业务操作和事件必须一起成功或失败
  2. 高可靠性要求:事件不能丢失
  3. 分布式事务:跨多个服务的操作
  4. 事件溯源:需要记录所有变更历史

七、总结

Outbox 模式是一种解决分布式系统中数据一致性和可靠性问题的有效方案。

核心流程

  1. 业务操作和事件写入在同一个事务中
  2. 事件持久化到 Outbox 表
  3. Canal 捕获 binlog,转发到 Kafka
  4. 消费者处理事件

关键优势

  • 强一致性
  • 高可靠性
  • 系统解耦
  • 可追溯性

在用户关系模块中,Outbox 模式保证了关注操作和后续的粉丝关系更新、缓存更新、计数更新的一致性,是整个系统可靠性的重要保障。

4. 限流处理

-- RelationServiceImpl.java:411-429
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = redis.call('TIME')[1]
local last = redis.call('HGET', key, 'last')
local tokens = redis.call('HGET', key, 'tokens')
if not last then last = now; tokens = capacity end
local elapsed = tonumber(now) - tonumber(last)
local add = elapsed * rate
tokens = math.min(capacity, tonumber(tokens) + add)
if tokens < 1 then redis.call('HSET', key, 'last', now); redis.call('HSET', key, 'tokens', tokens); return 0 end
tokens = tokens - 1
redis.call('HSET', key, 'last', now)
redis.call('HSET', key, 'tokens', tokens)
redis.call('PEXPIRE', key, 60000)
return 1

技术点

  • 令牌桶算法:控制关注操作的频率
  • 容量 100,速率 1:最多 100 次关注,每秒恢复 1 个令牌
  • 防止恶意用户刷关注,保护系统

5. 写入关注关系

// RelationMapper.java:24-27
int insertFollowing(@Param("id") Long id,
                    @Param("fromUserId") Long fromUserId,
                    @Param("toUserId") Long toUserId,
                    @Param("relStatus") Integer relStatus);
<!-- RelationMapper.xml:4-8 -->
<insert id="insertFollowing">
    INSERT INTO following (id, from_user_id, to_user_id, rel_status, created_at, updated_at)
    VALUES (#{id}, #{fromUserId}, #{toUserId}, #{relStatus}, NOW(3), NOW(3))
    ON DUPLICATE KEY UPDATE rel_status=VALUES(rel_status), updated_at=VALUES(updated_at)
</insert>

技术点

  • 使用 ON DUPLICATE KEY UPDATE 处理重复关注
  • 逻辑删除:rel_status=1 表示有效,0 表示无效
  • 记录创建时间和更新时间

6. 写入 Outbox 事件

// OutboxMapper.java:21-25
int insert(@Param("id") Long id,
           @Param("aggregateType") String aggregateType,
           @Param("aggregateId") Long aggregateId,
           @Param("type") String type,
           @Param("payload") String payload);
<!-- OutboxMapper.xml:4-7 -->
<insert id="insert">
    INSERT INTO outbox (id, aggregate_type, aggregate_id, type, payload, created_at)
    VALUES (#{id}, #{aggregateType}, #{aggregateId}, #{type}, #{payload}, NOW(3))
</insert>

技术点

  • 事件持久化:将关系事件写入 outbox 表
  • 包含事件类型、聚合ID、负载等信息
  • 为后续的异步处理做准备

7. Canal 监听与转发

// CanalKafkaBridge.java:106-161
while (running) {
    // 拉取一批未确认消息
    Message message = connector.getWithoutAck(batchSize);
    long batchId = message.getId();
    // 空批次或心跳时,按间隔休眠并继续轮询
    if (batchId == -1 || message.getEntries() == null || message.getEntries().isEmpty()) {
        try {
            Thread.sleep(intervalMs);
        } catch (InterruptedException ignored) {}
        continue;
    }
    for (CanalEntry.Entry entry : message.getEntries()) {
        // 仅处理行级数据变更事件
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            continue;
        }
        CanalEntry.RowChange rowChange;

        try {
            // 解析二进制为 RowChange
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            continue;
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        // 仅转发 INSERT/UPDATE 事件
        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
            continue;
        }
        ArrayNode dataArray = objectMapper.createArrayNode();

        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            ObjectNode rowNode = objectMapper.createObjectNode();
            for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
                // 提取 payload 字段值
                if ("payload".equalsIgnoreCase(col.getName())) {
                    rowNode.put("payload", col.getValue());
                }
            }
            dataArray.add(rowNode);
        }

        ObjectNode msgNode = objectMapper.createObjectNode();
        msgNode.put("table", entry.getHeader().getTableName());
        msgNode.put("type", eventType == CanalEntry.EventType.INSERT ? "INSERT" : "UPDATE");
        msgNode.set("data", dataArray);

        try {
            // 序列化并发送到 Kafka 主题
            String json = objectMapper.writeValueAsString(msgNode);
            kafka.send(OutboxTopics.CANAL_OUTBOX, json);
        } catch (Exception ignored) {}
    }
    // 批次确认(推进位点)
    connector.ack(batchId);
}

技术点

  • Canal 监听 MySQL binlog,捕获 outbox 表的变更
  • 只处理 INSERT/UPDATE 事件
  • 提取 payload 字段,转发到 Kafka 主题
  • 批量确认位点,保证至少一次语义

8. 消费 Kafka 事件

// CanalOutboxConsumer.java:39-58
@KafkaListener(topics = OutboxTopics.CANAL_OUTBOX, groupId = "relation-outbox-consumer")
public void onMessage(String message, Acknowledgment ack) {
    try {
        List<JsonNode> rows = OutboxMessageUtil.extractRows(objectMapper, message);
        if (rows.isEmpty()) {
            ack.acknowledge();
            return;
        }
        for (JsonNode row : rows) {
            JsonNode payloadNode = row.get("payload");
            if (payloadNode == null) {
                continue;
            }
            
            RelationEvent evt = objectMapper.readValue(payloadNode.asText(), RelationEvent.class);
            processor.process(evt);
        }
        ack.acknowledge();
    } catch (Exception ignored) {}
}

技术点

  • 监听 canal-outbox 主题
  • 提取 payload 字段,反序列化为 RelationEvent
  • 调用 RelationEventProcessor.process() 处理事件
  • 手动提交位点,确保处理成功

9. 处理关系事件

// RelationEventProcessor.java:30-65
public void process(RelationEvent evt) {
    String dk = "dedup:rel:" + evt.type() + ":" + evt.fromUserId() + ":" + evt.toUserId() + ":" + (evt.id() == null ? "0" : String.valueOf(evt.id()));
    Boolean first = redis.opsForValue().setIfAbsent(dk, "1", Duration.ofMinutes(10));

    // 非首次(存在去重键)直接返回,保证消息幂等
    if (first == null || !first) {
        return;
    }
    if ("FollowCreated".equals(evt.type())) {
        // 异步插入粉丝表
        mapper.insertFollower(evt.id(), evt.toUserId(), evt.fromUserId(), 1);
        long now = System.currentTimeMillis();

        // 更新关注表与粉丝表缓存:ZSet 按时间分数维护最近项,设置短 TTL 减少陈旧数据
        redis.opsForZSet().add("uf:flws:" + evt.fromUserId(), String.valueOf(evt.toUserId()), now);
        redis.opsForZSet().add("uf:fans:" + evt.toUserId(), String.valueOf(evt.fromUserId()), now);
        redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2));
        redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2));

        // 更新关注数与粉丝数
        userCounterService.incrementFollowings(evt.fromUserId(), 1);
        userCounterService.incrementFollowers(evt.toUserId(), 1);
    } else if ("FollowCanceled".equals(evt.type())) {
        mapper.cancelFollower(evt.toUserId(), evt.fromUserId());

        // 更新关注表与粉丝表缓存:移除 ZSet 项并刷新 TTL
        redis.opsForZSet().remove("uf:flws:" + evt.fromUserId(), String.valueOf(evt.toUserId()));
        redis.opsForZSet().remove("uf:fans:" + evt.toUserId(), String.valueOf(evt.fromUserId()));
        redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2));
        redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2));

        // 更新关注数与粉丝数
        userCounterService.incrementFollowings(evt.fromUserId(), -1);
        userCounterService.incrementFollowers(evt.toUserId(), -1);
    }
}

技术点

  • Redis 去重:防止重复处理事件
  • 插入/取消粉丝关系
  • 更新 Redis 缓存:使用 ZSet 按时间排序
  • 更新用户计数:调用 UserCounterService

Kafka、Canal、MySQL、Outbox 的关系与流程详解

一、四个组件的基本概念

1. MySQL(数据库)

作用:存储所有业务数据

类比:就像一个账本,记录所有的交易。

例子

  • 用户表:存储用户信息
  • 关注表:存储关注关系
  • Outbox 表:存储事件(特殊表)

2. Outbox(事件表)

作用:一个特殊的数据库表,用于存储事件

类比:就像账本旁边的一个"通知本",记录需要通知其他人的事情。

表结构

outbox 表:
- id: 事件ID
- aggregate_type: 聚合类型(如"following")
- aggregate_id: 聚合ID(如关系记录ID)
- type: 事件类型(如"FollowCreated")
- payload: 事件内容(JSON 格式)
- created_at: 创建时间

3. Canal(监听器)

作用:监听 MySQL 的变更,自动捕获数据库操作

类比:就像一个摄像头,对着账本录像,记录每一笔交易。

工作方式

  • 伪装成 MySQL 的从库
  • 读取 MySQL 的 binlog(操作日志)
  • 解析 binlog,发现数据库变更
  • 将变更转发到 Kafka

4. Kafka(消息队列)

作用:存储和转发消息,连接不同的系统

类比:就像一个公告板,所有人都可以在上面贴通知,所有人都可以看通知。

特点

  • 持久化存储:消息不会丢失
  • 支持多个消费者:多个系统可以同时消费
  • 高吞吐量:支持大量消息

二、四个组件之间的关系

关系图

MySQL(数据库)
    ↓ binlog(操作日志)
Canal(监听器)
    ↓ 解析变更
    ↓ 提取事件
Kafka(消息队列)
    ↓ 消费消息
其他服务(粉丝服务、计数服务、缓存服务)

具体关系

  1. MySQL 和 Outbox

    • Outbox 是 MySQL 中的一个表
    • 和其他表(如关注表)在同一个数据库里
    • 业务操作时,同时写入业务表和 Outbox 表
  2. MySQL 和 Canal

    • Canal 监听 MySQL 的 binlog
    • MySQL 每次操作都会记录到 binlog
    • Canal 读取 binlog,发现数据库变更
  3. Canal 和 Kafka

    • Canal 解析 binlog,提取 Outbox 表的变更
    • 将变更事件发送到 Kafka
    • Kafka 存储这些事件
  4. Kafka 和其他服务

    • 其他服务从 Kafka 消费事件
    • 根据事件内容更新自己的数据
    • 多个服务可以同时消费同一个事件

三、完整流程:用户关注操作

场景:用户小明关注用户小红

完整流程图

用户点击关注按钮

RelationController 接收请求

调用 RelationService.follow

开启数据库事务

写入关注表
following

写入 Outbox 表
outbox

提交事务
同时成功或同时失败

返回成功给前端

Canal 监听 MySQL binlog

发现 Outbox 表有新数据

解析 binlog
提取事件

构造消息体

发送到 Kafka
canal-outbox 主题

CanalOutboxConsumer
监听 Kafka

收到消息

反序列化 RelationEvent

调用 RelationEventProcessor.process

Redis 去重
防止重复处理

插入粉丝表
follower

更新 Redis 缓存
ZSet

更新用户计数
UserCounterService

提交 Kafka 位点
确认处理完成

详细步骤

阶段1:用户请求处理

1. 用户点击关注按钮
   ↓
2. 前端发送请求到后端
   ↓
3. RelationController 接收请求
   ↓
4. 调用 RelationService.follow(fromUserId, toUserId)

阶段2:数据库操作(关键!)

// RelationServiceImpl.java:86-107
@Transactional  // 开启事务
public boolean follow(long fromUserId, long toUserId) {
    // 1. 写入关注表
    mapper.insertFollowing(id, fromUserId, toUserId, 1);
    
    // 2. 写入 Outbox 表(同一个事务!)
    String payload = objectMapper.writeValueAsString(new RelationEvent("FollowCreated", fromUserId, toUserId, id));
    outboxMapper.insert(outId, "following", id, "FollowCreated", payload);
    
    // 3. 事务提交:要么都成功,要么都失败
    return true;
}

关键点

  • 两个写入操作在同一个事务里
  • 如果关注表写入失败,Outbox 表也不会写入
  • 如果 Outbox 表写入失败,关注表也会回滚
  • 保证了数据一致性!

阶段3:Canal 监听与转发

5. MySQL 生成 binlog,记录这次操作
   ↓
6. Canal 监听 binlog,发现 Outbox 表有新数据
   ↓
7. Canal 解析 binlog,提取事件内容
   ↓
8. Canal 构造消息体
   ↓
9. Canal 发送消息到 Kafka(canal-outbox 主题)

关键点

  • Canal 是独立运行的,不需要业务代码调用
  • 自动监听,自动转发
  • 即使业务代码重启,Canal 也会继续工作

阶段4:Kafka 消费与处理

// CanalOutboxConsumer.java:39-58
@KafkaListener(topics = OutboxTopics.CANAL_OUTBOX, groupId = "relation-outbox-consumer")
public void onMessage(String message, Acknowledgment ack) {
    // 1. 反序列化消息
    List<JsonNode> rows = OutboxMessageUtil.extractRows(objectMapper, message);
    
    // 2. 提取 payload
    for (JsonNode row : rows) {
        JsonNode payloadNode = row.get("payload");
        RelationEvent evt = objectMapper.readValue(payloadNode.asText(), RelationEvent.class);
        
        // 3. 处理事件
        processor.process(evt);
    }
    
    // 4. 确认消费
    ack.acknowledge();
}

阶段5:事件处理

// RelationEventProcessor.java:30-65
public void process(RelationEvent evt) {
    // 1. Redis 去重
    String dk = "dedup:rel:" + evt.type() + ":" + evt.fromUserId() + ":" + evt.toUserId();
    Boolean first = redis.opsForValue().setIfAbsent(dk, "1", Duration.ofMinutes(10));
    if (first == null || !first) {
        return;  // 非首次,直接返回
    }
    
    // 2. 插入粉丝表
    mapper.insertFollower(evt.id(), evt.toUserId(), evt.fromUserId(), 1);
    
    // 3. 更新 Redis 缓存
    redis.opsForZSet().add("uf:flws:" + evt.fromUserId(), String.valueOf(evt.toUserId()), now);
    redis.opsForZSet().add("uf:fans:" + evt.toUserId(), String.valueOf(evt.fromUserId()), now);
    
    // 4. 更新用户计数
    userCounterService.incrementFollowings(evt.fromUserId(), 1);
    userCounterService.incrementFollowers(evt.toUserId(), 1);
}

四、各自解决了什么问题?

1. MySQL + Outbox:解决数据一致性问题

问题:如何保证业务操作和事件发布的一致性?

解决方案

  • 业务操作和事件写入在同一个事务里
  • 要么都成功,要么都失败
  • 不会出现"业务成功但事件丢失"的情况

例子

传统方法:
- 写入关注表(成功)
- 发送 Kafka 消息(失败)
- 结果:数据库有记录,但其他系统不知道

Outbox 方法:
- 写入关注表(成功)
- 写入 Outbox 表(成功)
- 事务提交:都成功
- 结果:数据库有记录,Canal 会自动转发事件

2. Canal:解决事件可靠性问题

问题:如何保证事件不会丢失?

解决方案

  • Canal 持续监听 MySQL 的 binlog
  • 即使服务重启,Canal 会接着上次的位置继续
  • 即使 Kafka 暂时不可用,Canal 会等它恢复了再继续

例子

传统方法:
- 业务代码发送 Kafka 消息
- 如果 Kafka 挂了,消息丢失
- 结果:其他系统不知道这次操作

Canal 方法:
- 业务代码只写数据库
- Canal 监听 binlog
- 如果 Kafka 挂了,Canal 会等它恢复
- 结果:消息不会丢失

3. Kafka:解决系统解耦问题

问题:如何让多个系统协同工作,又不互相依赖?

解决方案

  • Kafka 作为消息中间件,连接不同的系统
  • 业务代码不需要知道其他系统的存在
  • 其他系统从 Kafka 消费事件,独立运行

例子

传统方法:
- 关注服务调用粉丝服务
- 关注服务调用计数服务
- 关注服务调用缓存服务
- 结果:关注服务依赖所有其他服务

Kafka 方法:
- 关注服务只写数据库
- 粉丝服务从 Kafka 消费
- 计数服务从 Kafka 消费
- 缓存服务从 Kafka 消费
- 结果:系统解耦,易于扩展

4. Redis 去重:解决幂等性问题

问题:如何保证事件只处理一次?

解决方案

  • 使用 Redis 的 setIfAbsent 方法
  • 第一次处理时设置去重键
  • 后续处理时检查去重键,如果存在就跳过

例子

没有去重:
- Kafka 可能重复发送消息
- 事件被处理多次
- 结果:数据错误

有去重:
- 第一次处理:设置去重键,处理事件
- 第二次处理:检查去重键,跳过
- 结果:事件只处理一次

五、总结:完整流程

一句话总结

用户请求 → 写入 MySQL(业务表 + Outbox 表)→ Canal 监听 binlog → 转发到 Kafka → 其他服务消费事件

各自的作用

组件 作用 解决的问题
MySQL 存储业务数据和事件 数据持久化
Outbox 存储事件,和业务数据在同一事务 数据一致性
Canal 监听 MySQL 变更,转发到 Kafka 事件可靠性
Kafka 存储和转发消息 系统解耦

类比总结

  • MySQL:账本,记录所有交易
  • Outbox:通知本,记录需要通知的事情
  • Canal:摄像头,对着账本录像
  • Kafka:公告板,所有人都可以看通知

完整流程类比

你(用户):
- 告诉会计(业务代码)要关注某人

会计(业务代码):
- 在账本上记录(写入关注表)
- 在通知本上记录(写入 Outbox 表)
- 完成任务

摄像头(Canal):
- 看到会计在通知本上写了东西
- 把内容拍下来,发给公告板

公告板(Kafka):
- 收到通知,贴在公告板上

其他人(其他服务):
- 看到公告板上的通知
- 根据通知做自己的事情

二、技术亮点与传统方法对比

1. Outbox 模式

传统方法

  • 直接调用其他服务,同步处理
  • 事务边界难以管理
  • 容易出现数据不一致

我们的方法

  • 事件持久化到 outbox 表
  • Canal 捕获并转发到 Kafka
  • 异步处理,解耦系统

实现

  • OutboxMapper 写入事件
  • CanalKafkaBridge 转发事件
  • CanalOutboxConsumer 消费事件

业务作用

  • 保证事件不丢失
  • 解耦服务,提高系统可靠性
  • 支持重试和重放

2. 令牌桶限流

传统方法

  • 无限制或简单计数
  • 容易被恶意用户刷操作

我们的方法

  • Lua 脚本实现令牌桶
  • 控制关注频率

实现

  • TOKEN_BUCKET_LUA 脚本
  • 容量 100,速率 1

业务作用

  • 防止恶意刷关注
  • 保护系统不被滥用

3. 双表设计

传统方法

  • 单表存储关注关系
  • 查询粉丝列表需要反向查询,性能差

我们的方法

  • following 表:存储关注关系
  • follower 表:存储粉丝关系

实现

  • RelationMapper.insertFollowing
  • RelationMapper.insertFollower

业务作用

  • 双向存储,提高查询性能
  • 支持快速获取关注列表和粉丝列表

4. Redis 缓存优化

传统方法

  • 每次查询都从数据库读取
  • 性能差,特别是大V用户

我们的方法

  • Redis ZSet 缓存关注/粉丝列表
  • 大V用户本地 Top 缓存
  • 支持偏移和游标分页

实现

  • getListWithOffset 方法
  • getListWithCursor 方法
  • maybeUpdateTopCache 方法

业务作用

  • 提高查询性能
  • 减少数据库压力
  • 支持大V用户的快速访问

5. 幂等处理

传统方法

  • 可能重复处理事件
  • 导致数据不一致

我们的方法

  • Redis 去重键
  • 防重复处理

实现

  • RelationEventProcessor.process 中的去重逻辑

业务作用

  • 保证事件只处理一次
  • 避免重复更新数据

三、核心本质

用户关系模块的核心设计思想是事件驱动 + 缓存优化

  • 事件驱动:通过 Outbox 模式和 Kafka 实现异步处理,解耦系统
  • 缓存优化:使用 Redis ZSet 缓存关注/粉丝列表,提高查询性能
  • 限流保护:使用令牌桶防止恶意操作
  • 幂等处理:保证事件只处理一次

这种设计使得用户关系模块能够在高并发场景下,既快又准地处理用户的关注/取消关注操作,同时保证系统的可靠性和性能。

四、总结

用户关系模块通过以下技术实现了高效、可靠的关注/粉丝功能:

  1. Outbox 模式:保证事件可靠传递
  2. 令牌桶限流:防止恶意刷关注
  3. 双表设计:提高查询性能
  4. Redis 缓存:加速列表查询
  5. 幂等处理:保证数据一致性

这些技术的组合,使得用户关系模块能够应对百万级用户的高并发操作,同时保证数据的准确性和系统的可靠性。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐