本文是 PostgreSQL 高级并发控制:使用 ON CONFLICT DO NOTHING 实现高并发下的奖励计数限制 的姊妹篇。前文聚焦于数据库层的原子操作实现,本文将从系统工程视角出发,深入剖析乐观锁如何融入完整的业务流程,以及多层防护机制如何协同保障并发安全。


1. 问题定义

1.1 业务场景

会员裂变活动中,存在两类角色和两种触发事件:

角色 说明 奖励限制
邀请人 发出邀请链接的用户 受次数限制(如注册事件最多奖励5次)
被邀请人 通过邀请链接注册的用户 不受次数限制
触发事件 说明
注册事件 被邀请人完成注册
首单事件 被邀请人完成首次支付

1.2 核心约束

同一邀请人 + 同一活动 + 同一触发事件 → 奖励次数 ≤ MaxCount

关键需求:仅限制邀请人获得奖励,被邀请人的奖励不受任何影响。

1.3 并发挑战

Kafka 消费者可能同时处理多条消息,典型竞态场景:

时刻T1: 进程A读取 reward_count = 4 (maxCount = 5)
时刻T2: 进程B读取 reward_count = 4 (maxCount = 5)
时刻T3: 进程A判断 4 < 5,准备更新
时刻T4: 进程B判断 4 < 5,准备更新
时刻T5: 进程A更新 reward_count = 5,发放奖励
时刻T6: 进程B更新 reward_count = 6,发放奖励 ← ❌ 超限!

2. 系统架构设计

2.1 多层防护体系

本方案采用三层防护,各层职责明确、互不冗余:

┌─────────────────────────────────────────────────┐
│  第1层:Kafka消息幂等                            │
│  防止同一消息重复处理                             │
│  实现:消息状态表 (pending/success/failed)         │
├─────────────────────────────────────────────────┤
│  第2层:Redis分布式锁                             │
│  防止同一用户同一事件的并发处理                    │
│  实现:lockKey = promotion_{event}_{activityId}_{customerId}  │
├─────────────────────────────────────────────────┤
│  第3层:DB乐观锁(TryIncrement)                  │
│  防止邀请人奖励次数超限                           │
│  实现:ON CONFLICT + 条件UPDATE + RowsAffected    │
└─────────────────────────────────────────────────┘

为什么需要三层?

层级 保护维度 解决的问题
消息幂等 消息维度 Kafka at-least-once 语义下的重复消费
分布式锁 用户事件维度 同一用户同一事件的并发请求
DB乐观锁 奖励计数维度 邀请人奖励次数的精确限制

三层防护缺一不可

  • 只有消息幂等 → 无法防止不同消息对同一邀请人的并发计数
  • 只有分布式锁 → 锁粒度太粗,无法精确控制奖励次数
  • 只有DB乐观锁 → 缺少上层防护,大量并发直接打到数据库

2.2 整体处理流程

Kafka消息
   │
   ▼
┌──────────────┐
│ 消息幂等检查   │ ── 已处理成功 → 跳过
│ (第1层)       │ ── 处理中 → 跳过
└──────┬───────┘ ── 失过败 → 重试
       │
       ▼
┌──────────────┐
│ 事件预处理     │ ── 解析业务数据
│               │ ── 校验活动有效性
│               │ ── 校验邀请关系
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Redis分布式锁  │ ── 获取失败 → 返回错误
│ (第2层)       │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 开启DB事务     │
│               │ ┌─ 创建邀请关系
│               │ ├─ 创建邀请关系日志
│               │ ├─ 处理奖励规则 ──┐
│               │ │                │
│               │ │    ┌───────────┴───────────┐
│               │ │    │ processRewardRule      │
│               │ │    │ ┌─ 幂等性检查          │
│               │ │    │ ├─ TryIncrement(第3层) │ ← 仅邀请人
│               │ │    │ ├─ 创建奖励记录        │
│               │ │    │ └─ 执行奖励发放        │
│               │ │    └───────────────────────┘
│               │ └────────────────────────────┘
│               │
│  提交事务      │
└──────┬───────┘
       │
       ▼
  发送消息通知(异步)

3. 核心业务逻辑详解

3.1 奖励规则处理

系统支持多种奖励规则,每条规则独立处理:

type PromotionRewardRule struct {
    TriggerEvent  int   // 触发事件:1-注册 2-首单支付
    RewardTarget  int   // 奖励目标:1-邀请人 2-被邀请人
    RewardType    int   // 奖励类型:1-积分 2-优惠券 3-积分+优惠券
    RewardContent PromotionRewardContent
}

一个活动可能配置多条规则,例如:

规则1:注册事件 + 邀请人 → 发放100积分
规则2:注册事件 + 被邀请人 → 发放50积分
规则3:首单事件 + 邀请人 → 发放200积分
规则4:首单事件 + 被邀请人 → 发放优惠券

processRewardRules 方法遍历所有匹配的规则,逐条处理:

func (s *PromotionMessageService) processRewardRules(
    ctx context.Context, tx *gorm.DB, 
    relation *models.PromotionInviteRelation, 
    activity *models.PromotionActivity, 
    rewardRules []PromotionRewardRule, 
    triggerEvent int, relationLogId int64, 
    ruleLogId int64, maxInviteCount int,
) (processRewardRulesResp, error) {
    targetRules := s.getRewardRulesByTriggerEvent(rewardRules, triggerEvent)
    resp := processRewardRulesResp{}
    for _, rewardRule := range targetRules {
        rewardId, err := s.processRewardRule(ctx, tx, relation, activity, 
            rewardRule, triggerEvent, relationLogId, ruleLogId, maxInviteCount)
        if err != nil {
            return resp, err
        }
        resp.rewardIds = append(resp.rewardIds, rewardId)
    }
    return resp, nil
}

关键设计:每条规则独立处理,邀请人规则失败不影响被邀请人规则。

3.2 单条奖励规则处理(核心)

func (s *PromotionMessageService) processRewardRule(
    ctx context.Context, tx *gorm.DB,
    relation *models.PromotionInviteRelation,
    activity *models.PromotionActivity,
    rewardRule PromotionRewardRule,
    triggerEvent int, relationLogId int64,
    ruleLogId int64, maxInviteCount int,
) (int64, error) {

    customerId := relation.InviterId
    if rewardRule.RewardTarget == PromotionCustomerRoleInvitee {
        customerId = relation.InviteeId
    }

    // ── 幂等性检查 ──
    idempotentKey := fmt.Sprintf("id:%d_event:%d_target:%d_type:%d",
        relation.ID, triggerEvent, rewardRule.RewardTarget, rewardRule.RewardType)

    existingReward, _ := s.rewardModel.GetByIdempotentKey(ctx, nil, idempotentKey)
    if existingReward != nil && existingReward.ID > 0 {
        return existingReward.ID, nil
    }

    // ── 邀请人计数限制(乐观锁核心) ──
    if rewardRule.RewardTarget == PromotionCustomerRoleInviter {
        req := PromotionActivityRewardCountTryIncrementRequest{
            CustomerId:   relation.InviterId,
            ActivityId:   relation.ActivityId,
            TriggerEvent: triggerEvent,
            MaxCount:     maxInviteCount,
        }
        canContinue, err := s.activityRewardCountModel.TryIncrement(ctx, tx, req)
        if err != nil {
            return 0, fmt.Errorf("校验邀请人奖励计数失败: %w", err)
        }
        if !canContinue {
            // 邀请人达到上限,跳过邀请人奖励
            // 但被邀请人的奖励会在另一条规则中正常处理
            return 0, nil
        }
    }

    // ── 创建奖励记录 + 发放奖励 ──
    reward := PromotionActivityReward{
        ActivityId:    activity.ID,
        RelationId:    relation.ID,
        RelationLogId: relationLogId,
        RuleLogId:     ruleLogId,
        CustomerId:    customerId,
        CustomerRole:  rewardRule.RewardTarget,
        TriggerEvent:  triggerEvent,
        RewardType:    rewardRule.RewardType,
        IdempotentKey: idempotentKey,
        RewardStatus:  PromotionActivityRewardStatusPending,
    }
    rewardId, _ := s.rewardModel.Create(ctx, tx, reward)

    if err := s.executeReward(ctx, tx, rewardId, customerId, rewardRule, triggerEvent); err != nil {
        // 发放失败,记录失败原因
        status := PromotionActivityRewardStatusFail
        failReason := err.Error()
        s.rewardModel.Patch(ctx, tx, PatchRequest{
            ID: &rewardId, RewardStatus: &status, FailReason: &failReason,
        })
        return 0, err
    }

    return rewardId, nil
}

流程要点

  1. 幂等性检查在前:通过 IdempotentKey(基于 relation_id + event + target + type 生成)防止重复发放
  2. 计数限制仅针对邀请人if rewardRule.RewardTarget == Inviter 分支判断
  3. 被邀请人不受影响:被邀请人的规则不走 TryIncrement 分支,直接创建奖励记录

3.3 TryIncrement 乐观锁实现

关于 TryIncrement 的数据库层实现细节,请参阅前文 PostgreSQL 高级并发控制:使用 ON CONFLICT DO NOTHING 实现高并发下的奖励计数限制

核心逻辑回顾:

func (t *PromotionActivityRewardCountModel) TryIncrement(
    ctx context.Context, db *gorm.DB, req TryIncrementRequest,
) (bool, error) {
    if req.MaxCount <= 0 {
        return true, nil
    }

    // 步骤1:幂等初始化
    db.Exec(`INSERT INTO ... ON CONFLICT (...) DO NOTHING`, ...)

    // 步骤2:条件原子更新
    result := db.Table(t.TableName()).
        Where("customer_id = ? AND activity_id = ? AND trigger_event = ? AND reward_count < ?",
            req.CustomerId, req.ActivityId, req.TriggerEvent, req.MaxCount).
        Updates(map[string]interface{}{
            "reward_count": gorm.Expr("reward_count + 1"),
            "updated_at":   utils.NewXTime(now),
        })

    // 步骤3:通过受影响行数判断
    return result.RowsAffected > 0, nil
}

4. 并发安全深度分析

4.1 PostgreSQL 行级锁机制

核心问题:两个事务同时 UPDATE 同一行,会发生什么?

PostgreSQL 的 UPDATE 语句执行时,会自动获取行级排他锁(Row Exclusive Lock)。这意味着:

事务A: UPDATE ... WHERE reward_count < 5
       → 获取该行的排他锁
       → 检查条件:reward_count = 4 < 5 ✓
       → 执行更新:reward_count = 5
       → RowsAffected = 1

事务B: UPDATE ... WHERE reward_count < 5
       → 尝试获取该行的排他锁
       → ⚠️ 被阻塞!等待事务A释放锁
       → 事务A提交后,B获得锁
       → 重新检查条件:reward_count = 5 < 5 ✗
       → 不满足条件,不执行更新
       → RowsAffected = 0

关键认知:不是等到 COMMIT 时才发现冲突,而是在 UPDATE 执行时就已经通过锁机制序列化了。

4.2 完整并发时序图

假设:reward_count = 4maxCount = 5,两个进程同时处理不同被邀请人的注册事件(但邀请人相同)

进程A(被邀请人X注册)              进程B(被邀请人Y注册)              数据库
    │                                   │                           count=4
    │  ┌─ 获取Redis锁(X) ─┐            │                           │
    │  │  创建邀请关系(X)   │            │  ┌─ 获取Redis锁(Y) ─┐    │
    │  │  创建关系日志(X)   │            │  │  创建邀请关系(Y)   │    │
    │  │                    │            │  │  创建关系日志(Y)   │    │
    │  └────────────────────┘            │  └────────────────────┘    │
    │                                   │                           │
    │  ┌─ 处理邀请人奖励规则 ─┐         │                           │
    │  │  TryIncrement:       │         │                           │
    │  │  INSERT ON CONFLICT  │         │                           │
    │  │  UPDATE WHERE < 5    │─────────│───────────────────────────│→ 获取锁
    │  │  → count=5, OK ✓    │         │                           │   count=5
    │  └──────────────────────┘         │                           │
    │                                   │                           │
    │  ┌─ 处理被邀请人奖励规则 ─┐       │  ┌─ 处理邀请人奖励规则 ─┐  │
    │  │  直接创建奖励记录      │       │  │  TryIncrement:       │  │
    │  │  发放被邀请人X奖励 ✓  │       │  │  INSERT ON CONFLICT  │  │
    │  └──────────────────────┘       │  │  UPDATE WHERE < 5    │──│→ 等待锁
    │                                   │  │  → 等待A释放锁...    │  │
    │  ┌─ COMMIT ─────────────┐       │  │                      │  │
    │  └──────────────────────┘       │  │  → 获得锁            │──│→ 锁释放
    │                                   │  │  → count=5 < 5 ✗    │  │
    │                                   │  │  → RowsAffected=0   │  │
    │                                   │  │  → canContinue=false │  │
    │                                   │  └──────────────────────┘  │
    │                                   │                           │
    │                                   │  ┌─ 处理被邀请人奖励规则 ─┐│
    │                                   │  │  直接创建奖励记录      ││
    │                                   │  │  发放被邀请人Y奖励 ✓  ││
    │                                   │  └──────────────────────┘  │
    │                                   │                           │
    │                                   │  ┌─ COMMIT ─────────────┐│
    │                                   │  └──────────────────────┘│
    │                                   │                        count=5
    ▼                                   ▼                           

结果

  • ✅ 邀请人只获得1次奖励(count从4→5)
  • ✅ 被邀请人X获得奖励
  • ✅ 被邀请人Y获得奖励
  • ✅ 邀请人因被邀请人Y的奖励被跳过(达到上限)

4.3 不同场景下的行为验证

场景 TryIncrement结果 邀请人奖励 被邀请人奖励
正常(count < max) RowsAffected=1, true ✅ 发放 ✅ 发放
临界并发-A先到 RowsAffected=1, true ✅ 发放 ✅ 发放
临界并发-B后到 RowsAffected=0, false ❌ 跳过 ✅ 发放
已达上限 RowsAffected=0, false ❌ 跳过 ✅ 发放
maxCount=0(无限制) 直接返回true ✅ 发放 ✅ 发放

5. 事务一致性保障

5.1 事务边界设计

func (h *RegisterEventHandler) processInTransaction(ctx context.Context, data *registerEventData) error {
    tx := models.Db.Begin()
    defer tx.RollbackUnlessCommitted()

    // 以下所有操作在同一事务中:
    relation, _ := h.createInvitation(ctx, tx, data)          // 创建邀请关系
    relationLogId, _ := h.createInvitationLog(ctx, tx, data)  // 创建关系日志
    resp, _ := h.service.processRewardRules(ctx, tx, ...)     // 处理奖励(含TryIncrement)

    err := tx.Commit().Error
    if err != nil {
        return err
    }

    go h.service.sendMessage(ctx, resp)  // 事务提交后异步发通知
    return nil
}

关键设计

  • TryIncrement 与奖励发放、邀请关系创建在同一事务
  • 如果后续步骤失败回滚,TryIncrement 的 count 也会回滚
  • 消息通知放在事务提交后异步执行,避免长事务

5.2 回滚安全性

场景:TryIncrement成功(count+1),但后续发放优惠券失败

事务内:
  1. TryIncrement → count 4→5 ✓
  2. 创建奖励记录 ✓
  3. 发放积分 ✓
  4. 发放优惠券 ✗ → 返回error
  
  → tx.RollbackUnlessCommitted() 自动回滚
  → count 恢复为 4
  → 奖励记录回滚
  → 积分发放回滚
  
结果:数据一致性完整,下次重试可以再次尝试

6. 幂等性设计

6.1 三重幂等保障

层级 幂等键 保障范围
消息幂等 Kafka MessageId 同一消息不重复处理
奖励幂等 id:{relationId}_event:{event}_target:{target}_type:{type} 同一奖励不重复发放
计数幂等 (customer_id, activity_id, trigger_event) 唯一索引 计数记录不重复创建

6.2 幂等键设计

idempotentKey := fmt.Sprintf("id:%d_event:%d_target:%d_type:%d",
    relation.ID,          // 邀请关系ID(唯一标识一次邀请)
    triggerEvent,         // 触发事件类型
    rewardRule.RewardTarget,  // 奖励目标(邀请人/被邀请人)
    rewardRule.RewardType,    // 奖励类型(积分/优惠券/积分+优惠券)
)

设计考量

  • 基于 relation.ID 而非 customerId:同一邀请人邀请不同人会产生不同 relation,每条奖励独立
  • 包含 RewardTarget:邀请人和被邀请人的奖励是两条独立记录
  • 包含 RewardType:同一关系下不同奖励类型独立计算

7. 方案对比与选型

7.1 与其他方案的对比

方案 实现方式 优点 缺点
悲观锁 SELECT ... FOR UPDATE 简单直观 锁持有时间长,并发性能差
Redis计数器 INCR + 判断 性能极高 非事务性,Redis与DB一致性难保证
分布式锁+计数 Redis Lock + DB计数 安全性高 锁粒度粗,实现复杂
本方案(DB乐观锁) 条件UPDATE + RowsAffected 安全性高、性能好、事务一致 极端并发下有少量重试

7.2 为什么选择乐观锁

  1. 业务特性匹配:营销活动的并发量是"中等突发"而非"极端高并发",乐观锁的冲突率低
  2. 事务一致性:TryIncrement与业务操作在同一事务中,无需处理分布式事务
  3. 实现简洁:核心逻辑仅20行代码,可维护性高
  4. 无需额外依赖:不依赖Redis做计数,减少故障点

8. 测试验证策略

8.1 并发测试

func TestTryIncrement_InsertOnConflict_Concurrent(t *testing.T) {
    // 20个goroutine竞争5次额度
    const concurrent = 20
    const maxCount = 5

    var wg sync.WaitGroup
    var mu sync.Mutex
    var successCount int

    wg.Add(concurrent)
    for i := 0; i < concurrent; i++ {
        go func() {
            defer wg.Done()
            success, err := repo.TryIncrement(ctx, tx, req)
            mu.Lock()
            if success {
                successCount++
            }
            mu.Unlock()
        }()
    }
    wg.Wait()

    assert.Equal(t, maxCount, successCount)  // 恰好5次成功
}

8.2 测试覆盖矩阵

测试场景 验证点 预期结果
同一客户并发竞争 乐观锁互斥 成功次数 = maxCount
不同客户并发 互不影响 全部成功
达到上限后调用 条件判断 返回false
事务回滚 数据一致性 count恢复原值
maxCount=0 无限制 直接返回true

9. 总结

本方案通过 PostgreSQL 乐观锁 实现了高并发场景下的奖励次数限制,核心设计要点:

  1. 三层防护体系:消息幂等 → 分布式锁 → DB乐观锁,各层职责清晰
  2. 条件原子更新UPDATE ... WHERE reward_count < maxCount,一行SQL完成检查+更新
  3. RowsAffected判断:无需额外查询,通过受影响行数即可判断成功与否
  4. 事务一致性:所有操作在同一事务中,回滚安全
  5. 被邀请人不受影响:通过 RewardTarget 分支判断,邀请人限制与被邀请人奖励解耦

该方案在实际生产环境中运行稳定,有效解决了"限制用户领取奖励次数"这一高并发经典难题。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐