大家好,我是苍何。

顺序消息是业务中常用的功能之一,而 RocketMQ 默认发送的事普通无序的消息,那该如何发送顺序消息呢?

要保证消息的顺序,要从生产端到 broker 消息存储,再到消费消息都要保证链路的顺序,才可以做到真正的顺序消息。

何为顺序

那苍何首先抛出一个「玄学」问题,何为顺序?

你肯定会用你聪明的大脑瓜子说"顺序不就是有序吗?有啥好说的"。不,那你就浅了,我从顺序的严格划分程度来说,可分为:

  • 普通顺序
  • 严格顺序

普通顺序是指的相同队列收到的消息是有序的(有时也叫局部有序)。这有个前提条件,必须消息在同一个队列,我们知道队列本身就是先进先出嘛,故而自带顺序。

普通顺序场景

严格顺序就好理解啦,我可不管你是哪个队列,哪个 topic,不管你在天涯海角的哪一台服务器,都要满足顺序(有时也叫全局有序)。

严格顺序消费

这一看就很难对不对,在地球上天涯海角的 2 台服务器,硬是要来保证顺序。

那如果是业务来分呢,顺序还可以分为:因果顺序时间顺序

凡事有因必有果,比如交易系统中订单创建、支付、退款等流程,先要创建订单才能支付,支付完成才能退款。这些步骤间有因果顺序

因果顺序

对于时间顺序步骤之间没有因果联系,只要满足先进先出的顺序,比如股票交易中,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

股票交易中的时间顺序

所以一个简单的玄学问题硬是可以整出好几个概念,哈哈,这就是计算机的魅力。总结下何为顺序吧:

何为顺序总结

顺序消息的使用场景

其实顺序消息的使用场景比较多,只要你的业务需要用消息来保证顺序就都用到顺序消息,你肯定会说"苍何,你这说了和没说一样😂"

别急,除了刚刚的股票交易场景,在 MySQL 数据库利用 binlog 消息来进行数据实时增量同步也是需要顺序消息的。

增量同步是指的两个不同数据库,当有一方增删改时,另一方也要同步进行增删改。

增量同步的目的是为了保证数据数据一致性,如果是普通消息,数据可能就乱了。

数据实时增量同步

就这两场景啊,也不多啊,别急,以下这些业务场景也都可用到顺序消息:

  • 库存管理:保证商品入库、出库、盘点等操作的顺序性,避免超卖或库存错误。
  • 事件溯源:在事件驱动架构中,保证事件按照发生的顺序被处理和记录。
  • 消息重试机制:在处理失败需要重试的场景中,保证重试消息的顺序。

顺序发送技术原理

关关难过,关关过。要保证顺序消息,第一关,当然是,先让生产者发送的消息是顺序的吧

在之前文章 [[图解RocketMQ之生产者如何进行消息重试]]中,说到了在 RocketMQ 中消息发送的 3 种方式:同步、异步、单向。

要保证发送消息的顺序,只能保证同步发送,且必须是单个生产者。这 2 个条件缺一不可。

究其原因还是因为顺序发送的技术原理,其技术原理也比较简单,就是要将同一类消息发送到相同队列即可。

RocketMQ 顺序消息的顺序关系是通过消息组(MessageGroup)判定和识别。发送消息的时候需要为每条消息设置 MessageGroup。那如何保证同一 MessageGroup 下同一业务 ID 的消息发送到指定相同的队列呢?

我们不妨看看 RocketMQ 的源码,在 SelectMessageQueueByHash 类中:

public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }
        value = value % mqs.size();
        return mqs.get(value);,
    }
}

在发送消息的时候,生产者可以使用这个选择器,来选择指定队列的消息;

SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderId);
  • orderId 指的就是业务 ID,取 orderId 的 hash 值(想通的业务 ID 具有想通的 hash 值)
  • 用哈希值和队列数 mqs.size() 取摸,得到一个索引值,结果会小于队列数
  • 根据索引值从队列列表中取出一个队列,那 hash 值相同,取出的队列也就相同。

通过发送消息的时候指定队列方式就保证了同一个业务 ID 发往相同的队列,也即保证消息发送的顺序性。

生产者保证消息顺序原理

队列列表的获取,Producer 会从 Nameserver 中获取对应 Topic 的 Broker 列表,并将结果缓存到本地,下次将直接从缓存中拿到结果。

顺序存储技术原理

我们在之前的[[图解RocketMQ之消息如何存储]]中有介绍通过 commitlog 存储全量的消息,且会按照 topic 和队列分配到 consumerQueue 中。

由于我们发送消息的时候指定了队列,那对于相同业务 ID 的消息,也会被存储到相同的 consumerQueue 中,且通常在实际项目中,同一个业务位于同一个消息组。

这样相同一笔订单,无论是创建、支付还是退款消息,都按顺序会被发送到相同的队列,不同的订单会被分配到不同的队列中。且消息存储也是按照顺序的。

顺序存储技术原理

这个时候问题很多的小明又问了,如果不同消息组的消息都发往 Topic 中的同一个队列,那这个时候存储的 consumerQueue 中也会有多个消息组的消息,如何保证顺序呢?

多个消息组发送同一个队列如何保证顺序性

这其实就扯到一个概念了,我们这里所说的顺序其实还只是分区顺序,也就是同一个消息组的消息在队列中是能保证顺序的,不同消息组的消息在同一个队列中无法保证顺序。

上图中的例子就是对于消息组 1 中的订单消息在 order-队列 1 中的消息是按照顺序存储的,同理消息组 3 中的消息 msg-1 和 msg-2 在 order-队列 1 中消息也是顺序的。

但消息组 1 和消息组 3 虽然都放在了同一个队列,但并不涉及到顺序。

顺序消费技术原理

RocketMQ 支持 2 种消费模式,集群消费广播消费

集群消费模式下每一条消息只会被 ConsumerGroup 分组下的一个 Consumer 消费,而广播消费模式下,每个 Consumer 都会消费这条消息。

多数场景下用的都是集群消费,也就是一次消费代表一次业务处理,每一条消息都将由集群中的一个实例来对应处理。

而顺序消费也叫有序消费,如果消息是顺序发送,且顺序存储,那理应消费也是一条条消费,这个用屁股想也知道,但实际却没这么简单。

在 Consumer 中不止一个线程在那消费,因为同一个消费者可能会处理不同的队列消息。如果只有一个线程。那不得慢死,实际上会有多个线程同时消费,对应的是 Consumer 中的消费线程池

多个线程消费同一条消息,如何防止消息被重复消费又是一大问题。

如果是你,会怎么做呢?

没错,就是加锁,在 RocketMQ 中用了 3 把锁来保证,分别是分布式锁SynchronizedReentrantLock

我们先来看看第一把锁:分布式锁。

顺序消费用的是 MessageListenerOrderly 来保证顺序消费,RocketMQ 默认已经提供了一个实现类 ConsumeMessageOrderlyService 。

这个 service 在启动的时候就会向 Broker 申请当前消费者负责的队列锁,会将自己的消费组、自己的客户端ID、以及负责的队列发往 Broker,Broker 就把对应的队列与这个消费者绑定,将这个关系存储在了本地。

分布式锁源码

加了这分布式锁,就可以保证在同一个消费组内,一个队列只能被一个 Consumer 消费。

这个分布式锁在 broker 中会过期,默认消费者每 20 s 去续签这把锁。

在 Consumer 中消费线程池会并发消费,分布式锁可管不到这,那就需要另外一把本地锁,那就是 Synchronized。

Synchronized 其实是为了保证同一个队列的消息只会被 Consumer 线程池中的一个线程所消费

最后终于费了九牛二虎之力获得了消费的资格,还不够,在消费内部逻辑中又加了一把更细粒度的 ReentrantLock 锁来标记队列还有消费者在消费。

特别注意在顺序消费时,如果有线程消费发生异常,会阻塞该队列中的其他消息,因为他拿着锁不妨,别的消费者依然也无法获取,之前说过有重试机制可以重试,直到超出最大重试次数,在这段期间内,该队列的消息都将会被阻塞。

实际顺序消息中最大重试次数要谨慎设置,防止消息大量堆积。

实战——如何发送和消费顺序消息

以 PmHub 项目中的顺序发和消费消息为例,我们来实战一波。

顺序发消息

public class OrderMessageProducer {
    private DefaultMQProducer producer;

    public OrderMessageProducer() throws Exception {
        producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
    }

    public void sendOrderMessage(String orderId, String status) throws Exception {
        Message msg = new Message("OrderTopic", status, orderId.getBytes());
        
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                String orderId = (String) arg;
                int index = Math.abs(orderId.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("Send order message: %s, status: %s, result: %s\n", orderId, status, sendResult);
    }

    public void shutdown() {
        producer.shutdown();
    }
}

在发消息时候,OrderMessageProducer 使用 MessageQueueSelector 来确保同一订单的消息被发送到同一个队列。且需要注意设置发送消息为同步发送(默认)。

顺序消费

public class OrderMessageConsumer {
    private DefaultMQPushConsumer consumer;

    public OrderMessageConsumer() throws Exception {
        consumer = new DefaultMQPushConsumer("order_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String orderId = msg.getKeys();
                    String status = new String(msg.getBody());
                    System.out.printf("Consume order message: %s, status: %s\n", orderId, status);
                    // 在这里处理订单状态更新的业务逻辑
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Order message consumer started.");
    }

    public void shutdown() {
        consumer.shutdown();
    }
}

OrderMessageConsumer 使用 MessageListenerOrderly 来保证消息的顺序消费。

这样我们就实现了顺序生产和消费消息。

实际上企业级项目中,实现顺序消息需要考虑更为复杂,稍微一不注意就无法保证顺序性,且顺序消息的性能和队列数有很大关系,一般实际项目中都只会分区顺序即可

好啦,今天的分享结束。

我是苍何,这是图解 RocketMQ 教程的第 9 篇,我们下篇见~

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐