1.rocketmq如何保证消息不被重复消费

1.1 为什么消息会被重复消费

1.1.1 生产者发送消息重试

生产者发送失败会重试,导致重试。

生产者发送消息的时候,通过负载均衡算法选择到合适的broker的queue,然后通过网络将消息发送到对应的queue上,在发送的过程中,如果网络超时等原因,导致发送失败的话,生产者会进行2次重试。这里有可能生产者已经将消息投递到了broker上面,但是因为网络原因,生产者没有收到broker的响应,此时就会产生重复消费。

1.1.2 消费失败重试

消费失败后重试是一批次的重试,所以会导致重复消费.

rocketmq的消费方式有两种,一种是并发消费,实现 MessageListenerConcurrently监听器,一种是顺序消费,实现 MessageListenerOrderly接口,如果是顺序消费的话,可以启用多个消费实例,并发的消费一个messagequeue里面的消息,主要是为了增大吞吐量。如果是顺序消费的话,一个messagequeue在一个时刻只能有一个消费者实例消费。
在消费过程中,有一个consumeMessageBatchMaxSize的参数,标识消费者一次会消费多少条数据。如果consumeMessageBatchMaxSize>1的话,并且是并发消费的情况下,可能某个线程已经将其中一条数据消费成功,但是另一个线程却消费失败了,这个时候会进行消费重试,会导致整批消息都会被重新消费,导致重复消费问题。

1.1.3 消费者的offset提交失败

消费者消费消息过后,需要告诉broker消费的offset。但是消费者在消费过后,会将offset存储在本地缓存中,让后启动一个5秒的定时任务将offset发送给broker。所以当消费完成过后实际消费了8条消息,但是给broker返回了offset=5过后,consumer便停机了,这个时候,下次便会从offset=5开始消费。

1.1.4 主节点持久化offset失败

consumer同步offset到broker过后,broker会启动一个线程去5秒钟去持久化offset。所以当没有持久化的时候,broker挂掉后可能会丢掉这5秒钟的数据。

1.1.5 主从同步从节点持久化offset失败

consumer同步offset到broker过后,从节点会启动一个时间为10秒的线程,将offset同步并持久化到从broker中,如果这个时候从节点挂掉的话,也可能丢失offset。

1.1.6 重平衡

重平衡是针对消费者的概念,假设最开始一个消费者消费3个queue,但是新加入了一个消费者过后,这个消费者便会分担消费其中一个queue。这里有一个概念,消费者的offset的存储是以消费者组消费了messagequeue多少数据存储在broker中的,现假设有消费者1和消费者2,消费者1消费了queue1,但是由于消费者2的加入,被重平衡给了消费者2,消费者1已经拉取了部分数据到consumer的消费队列中去,但是由于重平衡,消费者1把queue1的数据消费完成过后并不能提交offset给broker,导致offset的丢失,造成重新消费。

1.1.7 长时间消息未被消费导致

rocketmq中有一个机制,就是会启动一个定时任务,去检查消费时间15分钟的消息,并且会将其清理掉(如果是顺序消息不能被清理,如果是普通消息,会将该消息作为一条延迟消息发送,等后面再消费),此时可能该条消息在客户端中已经消费完成,但是后面会重新发送导致重复消费。

为什么需要这个机制,假设线程consumer是并行消费,并且一次从broker中拉取了5条数据到本地队列中,线程1消费了1-2条数据,线程2消费3-5条数据,线程1被阻塞了,线程2在返回offset的时候,发现本地队列还有数据未被消费,所以返回的是offset1,会从头开始消费。这样便可以保证1-5条数据都至少被消费一遍,但也会造成重复消费的问题。

1.1.8 常见场景总结

1.2 怎么解决重复消费的方法

保证消息不被重复消费的主要解决方案主要是保证消费者端的幂等性。主要解决思路是更加messageId作为未Id,保证它的唯一性。所以我们可以用redis或者mysql的唯一索引来存储messageId,当消费的时候根据messageId查询一下redis或者mysql,如果查到有数据,表示已经被重复消费。在插入数据的时候我们也可以采用分布式锁防止并发插入问题。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐