高并发营销活动系统:乐观锁在奖励次数限制中的完整工程实践
本文探讨了在高并发会员裂变活动中如何实现精确的奖励计数限制。系统采用三层防护机制:1) Kafka消息幂等防止重复消费;2) Redis分布式锁控制用户事件并发;3) PostgreSQL乐观锁(ON CONFLICT)确保邀请人奖励次数不超限。文章详细分析了业务场景、并发挑战,并给出了包含消息预处理、分布式锁获取、事务处理等环节的完整流程设计。核心逻辑通过独立处理每条奖励规则,确保邀请人和被邀请
本文是 PostgreSQL 高级并发控制:使用 ON CONFLICT DO NOTHING 实现高并发下的奖励计数限制 的姊妹篇。前文聚焦于数据库层的原子操作实现,本文将从系统工程视角出发,深入剖析乐观锁如何融入完整的业务流程,以及多层防护机制如何协同保障并发安全。
1. 问题定义
1.1 业务场景
会员裂变活动中,存在两类角色和两种触发事件:
| 角色 | 说明 | 奖励限制 |
|---|---|---|
| 邀请人 | 发出邀请链接的用户 | 受次数限制(如注册事件最多奖励5次) |
| 被邀请人 | 通过邀请链接注册的用户 | 不受次数限制 |
| 触发事件 | 说明 |
|---|---|
| 注册事件 | 被邀请人完成注册 |
| 首单事件 | 被邀请人完成首次支付 |
1.2 核心约束
同一邀请人 + 同一活动 + 同一触发事件 → 奖励次数 ≤ MaxCount
关键需求:仅限制邀请人获得奖励,被邀请人的奖励不受任何影响。
1.3 并发挑战
Kafka 消费者可能同时处理多条消息,典型竞态场景:
时刻T1: 进程A读取 reward_count = 4 (maxCount = 5)
时刻T2: 进程B读取 reward_count = 4 (maxCount = 5)
时刻T3: 进程A判断 4 < 5,准备更新
时刻T4: 进程B判断 4 < 5,准备更新
时刻T5: 进程A更新 reward_count = 5,发放奖励
时刻T6: 进程B更新 reward_count = 6,发放奖励 ← ❌ 超限!
2. 系统架构设计
2.1 多层防护体系
本方案采用三层防护,各层职责明确、互不冗余:
┌─────────────────────────────────────────────────┐
│ 第1层:Kafka消息幂等 │
│ 防止同一消息重复处理 │
│ 实现:消息状态表 (pending/success/failed) │
├─────────────────────────────────────────────────┤
│ 第2层:Redis分布式锁 │
│ 防止同一用户同一事件的并发处理 │
│ 实现:lockKey = promotion_{event}_{activityId}_{customerId} │
├─────────────────────────────────────────────────┤
│ 第3层:DB乐观锁(TryIncrement) │
│ 防止邀请人奖励次数超限 │
│ 实现:ON CONFLICT + 条件UPDATE + RowsAffected │
└─────────────────────────────────────────────────┘
为什么需要三层?
| 层级 | 保护维度 | 解决的问题 |
|---|---|---|
| 消息幂等 | 消息维度 | Kafka at-least-once 语义下的重复消费 |
| 分布式锁 | 用户事件维度 | 同一用户同一事件的并发请求 |
| DB乐观锁 | 奖励计数维度 | 邀请人奖励次数的精确限制 |
三层防护缺一不可:
- 只有消息幂等 → 无法防止不同消息对同一邀请人的并发计数
- 只有分布式锁 → 锁粒度太粗,无法精确控制奖励次数
- 只有DB乐观锁 → 缺少上层防护,大量并发直接打到数据库
2.2 整体处理流程
Kafka消息
│
▼
┌──────────────┐
│ 消息幂等检查 │ ── 已处理成功 → 跳过
│ (第1层) │ ── 处理中 → 跳过
└──────┬───────┘ ── 失过败 → 重试
│
▼
┌──────────────┐
│ 事件预处理 │ ── 解析业务数据
│ │ ── 校验活动有效性
│ │ ── 校验邀请关系
└──────┬───────┘
│
▼
┌──────────────┐
│ Redis分布式锁 │ ── 获取失败 → 返回错误
│ (第2层) │
└──────┬───────┘
│
▼
┌──────────────┐
│ 开启DB事务 │
│ │ ┌─ 创建邀请关系
│ │ ├─ 创建邀请关系日志
│ │ ├─ 处理奖励规则 ──┐
│ │ │ │
│ │ │ ┌───────────┴───────────┐
│ │ │ │ processRewardRule │
│ │ │ │ ┌─ 幂等性检查 │
│ │ │ │ ├─ TryIncrement(第3层) │ ← 仅邀请人
│ │ │ │ ├─ 创建奖励记录 │
│ │ │ │ └─ 执行奖励发放 │
│ │ │ └───────────────────────┘
│ │ └────────────────────────────┘
│ │
│ 提交事务 │
└──────┬───────┘
│
▼
发送消息通知(异步)
3. 核心业务逻辑详解
3.1 奖励规则处理
系统支持多种奖励规则,每条规则独立处理:
type PromotionRewardRule struct {
TriggerEvent int // 触发事件:1-注册 2-首单支付
RewardTarget int // 奖励目标:1-邀请人 2-被邀请人
RewardType int // 奖励类型:1-积分 2-优惠券 3-积分+优惠券
RewardContent PromotionRewardContent
}
一个活动可能配置多条规则,例如:
规则1:注册事件 + 邀请人 → 发放100积分
规则2:注册事件 + 被邀请人 → 发放50积分
规则3:首单事件 + 邀请人 → 发放200积分
规则4:首单事件 + 被邀请人 → 发放优惠券
processRewardRules 方法遍历所有匹配的规则,逐条处理:
func (s *PromotionMessageService) processRewardRules(
ctx context.Context, tx *gorm.DB,
relation *models.PromotionInviteRelation,
activity *models.PromotionActivity,
rewardRules []PromotionRewardRule,
triggerEvent int, relationLogId int64,
ruleLogId int64, maxInviteCount int,
) (processRewardRulesResp, error) {
targetRules := s.getRewardRulesByTriggerEvent(rewardRules, triggerEvent)
resp := processRewardRulesResp{}
for _, rewardRule := range targetRules {
rewardId, err := s.processRewardRule(ctx, tx, relation, activity,
rewardRule, triggerEvent, relationLogId, ruleLogId, maxInviteCount)
if err != nil {
return resp, err
}
resp.rewardIds = append(resp.rewardIds, rewardId)
}
return resp, nil
}
关键设计:每条规则独立处理,邀请人规则失败不影响被邀请人规则。
3.2 单条奖励规则处理(核心)
func (s *PromotionMessageService) processRewardRule(
ctx context.Context, tx *gorm.DB,
relation *models.PromotionInviteRelation,
activity *models.PromotionActivity,
rewardRule PromotionRewardRule,
triggerEvent int, relationLogId int64,
ruleLogId int64, maxInviteCount int,
) (int64, error) {
customerId := relation.InviterId
if rewardRule.RewardTarget == PromotionCustomerRoleInvitee {
customerId = relation.InviteeId
}
// ── 幂等性检查 ──
idempotentKey := fmt.Sprintf("id:%d_event:%d_target:%d_type:%d",
relation.ID, triggerEvent, rewardRule.RewardTarget, rewardRule.RewardType)
existingReward, _ := s.rewardModel.GetByIdempotentKey(ctx, nil, idempotentKey)
if existingReward != nil && existingReward.ID > 0 {
return existingReward.ID, nil
}
// ── 邀请人计数限制(乐观锁核心) ──
if rewardRule.RewardTarget == PromotionCustomerRoleInviter {
req := PromotionActivityRewardCountTryIncrementRequest{
CustomerId: relation.InviterId,
ActivityId: relation.ActivityId,
TriggerEvent: triggerEvent,
MaxCount: maxInviteCount,
}
canContinue, err := s.activityRewardCountModel.TryIncrement(ctx, tx, req)
if err != nil {
return 0, fmt.Errorf("校验邀请人奖励计数失败: %w", err)
}
if !canContinue {
// 邀请人达到上限,跳过邀请人奖励
// 但被邀请人的奖励会在另一条规则中正常处理
return 0, nil
}
}
// ── 创建奖励记录 + 发放奖励 ──
reward := PromotionActivityReward{
ActivityId: activity.ID,
RelationId: relation.ID,
RelationLogId: relationLogId,
RuleLogId: ruleLogId,
CustomerId: customerId,
CustomerRole: rewardRule.RewardTarget,
TriggerEvent: triggerEvent,
RewardType: rewardRule.RewardType,
IdempotentKey: idempotentKey,
RewardStatus: PromotionActivityRewardStatusPending,
}
rewardId, _ := s.rewardModel.Create(ctx, tx, reward)
if err := s.executeReward(ctx, tx, rewardId, customerId, rewardRule, triggerEvent); err != nil {
// 发放失败,记录失败原因
status := PromotionActivityRewardStatusFail
failReason := err.Error()
s.rewardModel.Patch(ctx, tx, PatchRequest{
ID: &rewardId, RewardStatus: &status, FailReason: &failReason,
})
return 0, err
}
return rewardId, nil
}
流程要点:
- 幂等性检查在前:通过
IdempotentKey(基于 relation_id + event + target + type 生成)防止重复发放 - 计数限制仅针对邀请人:
if rewardRule.RewardTarget == Inviter分支判断 - 被邀请人不受影响:被邀请人的规则不走 TryIncrement 分支,直接创建奖励记录
3.3 TryIncrement 乐观锁实现
关于
TryIncrement的数据库层实现细节,请参阅前文 PostgreSQL 高级并发控制:使用 ON CONFLICT DO NOTHING 实现高并发下的奖励计数限制
核心逻辑回顾:
func (t *PromotionActivityRewardCountModel) TryIncrement(
ctx context.Context, db *gorm.DB, req TryIncrementRequest,
) (bool, error) {
if req.MaxCount <= 0 {
return true, nil
}
// 步骤1:幂等初始化
db.Exec(`INSERT INTO ... ON CONFLICT (...) DO NOTHING`, ...)
// 步骤2:条件原子更新
result := db.Table(t.TableName()).
Where("customer_id = ? AND activity_id = ? AND trigger_event = ? AND reward_count < ?",
req.CustomerId, req.ActivityId, req.TriggerEvent, req.MaxCount).
Updates(map[string]interface{}{
"reward_count": gorm.Expr("reward_count + 1"),
"updated_at": utils.NewXTime(now),
})
// 步骤3:通过受影响行数判断
return result.RowsAffected > 0, nil
}
4. 并发安全深度分析
4.1 PostgreSQL 行级锁机制
核心问题:两个事务同时 UPDATE 同一行,会发生什么?
PostgreSQL 的 UPDATE 语句执行时,会自动获取行级排他锁(Row Exclusive Lock)。这意味着:
事务A: UPDATE ... WHERE reward_count < 5
→ 获取该行的排他锁
→ 检查条件:reward_count = 4 < 5 ✓
→ 执行更新:reward_count = 5
→ RowsAffected = 1
事务B: UPDATE ... WHERE reward_count < 5
→ 尝试获取该行的排他锁
→ ⚠️ 被阻塞!等待事务A释放锁
→ 事务A提交后,B获得锁
→ 重新检查条件:reward_count = 5 < 5 ✗
→ 不满足条件,不执行更新
→ RowsAffected = 0
关键认知:不是等到 COMMIT 时才发现冲突,而是在 UPDATE 执行时就已经通过锁机制序列化了。
4.2 完整并发时序图
假设:reward_count = 4,maxCount = 5,两个进程同时处理不同被邀请人的注册事件(但邀请人相同)
进程A(被邀请人X注册) 进程B(被邀请人Y注册) 数据库
│ │ count=4
│ ┌─ 获取Redis锁(X) ─┐ │ │
│ │ 创建邀请关系(X) │ │ ┌─ 获取Redis锁(Y) ─┐ │
│ │ 创建关系日志(X) │ │ │ 创建邀请关系(Y) │ │
│ │ │ │ │ 创建关系日志(Y) │ │
│ └────────────────────┘ │ └────────────────────┘ │
│ │ │
│ ┌─ 处理邀请人奖励规则 ─┐ │ │
│ │ TryIncrement: │ │ │
│ │ INSERT ON CONFLICT │ │ │
│ │ UPDATE WHERE < 5 │─────────│───────────────────────────│→ 获取锁
│ │ → count=5, OK ✓ │ │ │ count=5
│ └──────────────────────┘ │ │
│ │ │
│ ┌─ 处理被邀请人奖励规则 ─┐ │ ┌─ 处理邀请人奖励规则 ─┐ │
│ │ 直接创建奖励记录 │ │ │ TryIncrement: │ │
│ │ 发放被邀请人X奖励 ✓ │ │ │ INSERT ON CONFLICT │ │
│ └──────────────────────┘ │ │ UPDATE WHERE < 5 │──│→ 等待锁
│ │ │ → 等待A释放锁... │ │
│ ┌─ COMMIT ─────────────┐ │ │ │ │
│ └──────────────────────┘ │ │ → 获得锁 │──│→ 锁释放
│ │ │ → count=5 < 5 ✗ │ │
│ │ │ → RowsAffected=0 │ │
│ │ │ → canContinue=false │ │
│ │ └──────────────────────┘ │
│ │ │
│ │ ┌─ 处理被邀请人奖励规则 ─┐│
│ │ │ 直接创建奖励记录 ││
│ │ │ 发放被邀请人Y奖励 ✓ ││
│ │ └──────────────────────┘ │
│ │ │
│ │ ┌─ COMMIT ─────────────┐│
│ │ └──────────────────────┘│
│ │ count=5
▼ ▼
结果:
- ✅ 邀请人只获得1次奖励(count从4→5)
- ✅ 被邀请人X获得奖励
- ✅ 被邀请人Y获得奖励
- ✅ 邀请人因被邀请人Y的奖励被跳过(达到上限)
4.3 不同场景下的行为验证
| 场景 | TryIncrement结果 | 邀请人奖励 | 被邀请人奖励 |
|---|---|---|---|
| 正常(count < max) | RowsAffected=1, true | ✅ 发放 | ✅ 发放 |
| 临界并发-A先到 | RowsAffected=1, true | ✅ 发放 | ✅ 发放 |
| 临界并发-B后到 | RowsAffected=0, false | ❌ 跳过 | ✅ 发放 |
| 已达上限 | RowsAffected=0, false | ❌ 跳过 | ✅ 发放 |
| maxCount=0(无限制) | 直接返回true | ✅ 发放 | ✅ 发放 |
5. 事务一致性保障
5.1 事务边界设计
func (h *RegisterEventHandler) processInTransaction(ctx context.Context, data *registerEventData) error {
tx := models.Db.Begin()
defer tx.RollbackUnlessCommitted()
// 以下所有操作在同一事务中:
relation, _ := h.createInvitation(ctx, tx, data) // 创建邀请关系
relationLogId, _ := h.createInvitationLog(ctx, tx, data) // 创建关系日志
resp, _ := h.service.processRewardRules(ctx, tx, ...) // 处理奖励(含TryIncrement)
err := tx.Commit().Error
if err != nil {
return err
}
go h.service.sendMessage(ctx, resp) // 事务提交后异步发通知
return nil
}
关键设计:
- TryIncrement 与奖励发放、邀请关系创建在同一事务中
- 如果后续步骤失败回滚,TryIncrement 的 count 也会回滚
- 消息通知放在事务提交后异步执行,避免长事务
5.2 回滚安全性
场景:TryIncrement成功(count+1),但后续发放优惠券失败
事务内:
1. TryIncrement → count 4→5 ✓
2. 创建奖励记录 ✓
3. 发放积分 ✓
4. 发放优惠券 ✗ → 返回error
→ tx.RollbackUnlessCommitted() 自动回滚
→ count 恢复为 4
→ 奖励记录回滚
→ 积分发放回滚
结果:数据一致性完整,下次重试可以再次尝试
6. 幂等性设计
6.1 三重幂等保障
| 层级 | 幂等键 | 保障范围 |
|---|---|---|
| 消息幂等 | Kafka MessageId | 同一消息不重复处理 |
| 奖励幂等 | id:{relationId}_event:{event}_target:{target}_type:{type} |
同一奖励不重复发放 |
| 计数幂等 | (customer_id, activity_id, trigger_event) 唯一索引 |
计数记录不重复创建 |
6.2 幂等键设计
idempotentKey := fmt.Sprintf("id:%d_event:%d_target:%d_type:%d",
relation.ID, // 邀请关系ID(唯一标识一次邀请)
triggerEvent, // 触发事件类型
rewardRule.RewardTarget, // 奖励目标(邀请人/被邀请人)
rewardRule.RewardType, // 奖励类型(积分/优惠券/积分+优惠券)
)
设计考量:
- 基于
relation.ID而非customerId:同一邀请人邀请不同人会产生不同 relation,每条奖励独立 - 包含
RewardTarget:邀请人和被邀请人的奖励是两条独立记录 - 包含
RewardType:同一关系下不同奖励类型独立计算
7. 方案对比与选型
7.1 与其他方案的对比
| 方案 | 实现方式 | 优点 | 缺点 |
|---|---|---|---|
| 悲观锁 | SELECT ... FOR UPDATE |
简单直观 | 锁持有时间长,并发性能差 |
| Redis计数器 | INCR + 判断 |
性能极高 | 非事务性,Redis与DB一致性难保证 |
| 分布式锁+计数 | Redis Lock + DB计数 | 安全性高 | 锁粒度粗,实现复杂 |
| 本方案(DB乐观锁) | 条件UPDATE + RowsAffected | 安全性高、性能好、事务一致 | 极端并发下有少量重试 |
7.2 为什么选择乐观锁
- 业务特性匹配:营销活动的并发量是"中等突发"而非"极端高并发",乐观锁的冲突率低
- 事务一致性:TryIncrement与业务操作在同一事务中,无需处理分布式事务
- 实现简洁:核心逻辑仅20行代码,可维护性高
- 无需额外依赖:不依赖Redis做计数,减少故障点
8. 测试验证策略
8.1 并发测试
func TestTryIncrement_InsertOnConflict_Concurrent(t *testing.T) {
// 20个goroutine竞争5次额度
const concurrent = 20
const maxCount = 5
var wg sync.WaitGroup
var mu sync.Mutex
var successCount int
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
success, err := repo.TryIncrement(ctx, tx, req)
mu.Lock()
if success {
successCount++
}
mu.Unlock()
}()
}
wg.Wait()
assert.Equal(t, maxCount, successCount) // 恰好5次成功
}
8.2 测试覆盖矩阵
| 测试场景 | 验证点 | 预期结果 |
|---|---|---|
| 同一客户并发竞争 | 乐观锁互斥 | 成功次数 = maxCount |
| 不同客户并发 | 互不影响 | 全部成功 |
| 达到上限后调用 | 条件判断 | 返回false |
| 事务回滚 | 数据一致性 | count恢复原值 |
| maxCount=0 | 无限制 | 直接返回true |
9. 总结
本方案通过 PostgreSQL 乐观锁 实现了高并发场景下的奖励次数限制,核心设计要点:
- 三层防护体系:消息幂等 → 分布式锁 → DB乐观锁,各层职责清晰
- 条件原子更新:
UPDATE ... WHERE reward_count < maxCount,一行SQL完成检查+更新 - RowsAffected判断:无需额外查询,通过受影响行数即可判断成功与否
- 事务一致性:所有操作在同一事务中,回滚安全
- 被邀请人不受影响:通过 RewardTarget 分支判断,邀请人限制与被邀请人奖励解耦
该方案在实际生产环境中运行稳定,有效解决了"限制用户领取奖励次数"这一高并发经典难题。
更多推荐
所有评论(0)