【消息队列】RocketMQ 生产和消费中的集群模式和广播模式

1.集群模式(Cluster Mode):

在集群模式下,多个相同角色的实例组成一个集群,它们共同协作以提供服务。对于生产者和消费者而言,集群模式有以下特点:

生产者: 生产者将消息发送到整个集群,集群内的任意一个节点都可以接收和处理消息。

消费者: 当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。每个消息只会被消费者组内的一个实例消费。

优势:

高可用性:集群模式提供了高可用性,即使其中一个节点出现故障,其他节点仍然可以继续提供服务。负载均衡:消息在消费者组内的实例之间进行均衡分配,提高系统整体性能。集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。适用场景:

需要保证消息处理的高可用性和负载均衡性的场景。 适用于大规模分布式系统,可以水平扩展。

2、广播模式(Broadcast Mode):

在广播模式下,消息会被发送到所有的订阅者,每个订阅者都会接收到相同的消息。对于生产者和消费者而言,广播模式有以下特点:

生产者: 生产者将消息发送到所有订阅该主题的消费者。

消费者: 消费者组内的每个实例都会接收相同的消息,每个消息都会被所有的消费者实例处理。因此即使扩缩消费者数量也无法提升或降低消费能力。

优势:

实时性:消息能够被所有消费者实例实时处理,适用于需要广播通知或实时更新的场景。

适用场景:

需要实时通知所有消费者的场景,如广告推送、实时通知等。 不需要负载均衡,每个消费者都需要处理所有消息。

3、代码实现

发送消息

package com.lik;
​
import cn.hutool.core.util.IdUtil;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerTest {
​
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
​
    @Test
    public void demo1() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.syncSend("topic-A", "消息  " + i + "  " + IdUtil.getSnowflake(1, 1).nextId());
        }
    }
}
​

消费者1( consumer-group-1)

package com.lik.listener;
​
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
​
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup="consumer-group-1", topic="topic-A")
public class TestConsumer1 implements RocketMQListener<String> {
​
    @Override
    public void onMessage(String message) {
        log.info("消费者1: " + message);
    }
}
​
​

消费者2( consumer-group-1)

package com.lik.listener;
​
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
​
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup="consumer-group-1", topic="topic-A")
public class TestConsumer2 implements RocketMQListener<String> {
​
    @Override
    public void onMessage(String message) {
        log.info("消费者2: " + message);
    }
}
​

消费者3( consumer-group-2)

package com.lik.listener;
​
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
​
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup="consumer-group-2", topic="topic-A")
public class TestConsumer3 implements RocketMQListener<String> {
​
    @Override
    public void onMessage(String message) {
        log.info("消费者3: " + message);
    }
}
​

结果:

消费者1:消息1 1485262673296887808
消费者3:消息3 1485262673368190976
消费者3:消息9 1485262673535963136
消费者3:消息6 1485262673435299840
消费者1:消息5 1485262673422716928
消费者3:消息5 1485262673422716928
肖费者1:消息2 1485262673334636544
消费者1:消息6 1485262673435299840
消费者1:消息9 1485262673535963136
消费者3:消息4 1485262673397551104
消费者3:消息8 1485262673494020096
消费者3:消息2 1485262673334636544
消费者3:消息0 1485262661867409408
消费者3:消息7 1485262673464659968

集群模式小结

消费者1 和 消费者2同为一个消费组并订阅同一个主题时,默认情况下会平分这个主题的消息进行消费。消费者3 跟 消费者1、2不在同一个消费组,会单独消费这个主题下的所有消息。

广播模式消费

默认情况下同一消费组群集时会分滩来消费一个主题的消息。如果特殊情况需要集群下的所有消费者都消费这个主题的消息时,可将消费模式设置为广播模式 messageModel = MessageModel.BROADCASTING

消费者5

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-3", topic = "topic-B", messageModel = MessageModel.BROADCASTING)
public class TestConsumer5 implements RocketMQListener<String> {
​
    @Override
    public void onMessage(String message) {
        log.info("消费者5: " + message);
    }
}
​

消费者6

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-3", topic = "topic-B", messageModel = MessageModel.BROADCASTING)
public class TestConsumer6 implements RocketMQListener<String> {
​
    @Override
    public void onMessage(String message) {
        log.info("消费者6: " + message);
    }
}
​

结果:

消费者5:消息9 1485274038556299264
消费者5:消息6 1485274038455635968
消费者5:消息3 1485274038401110016
消费者5:消息2 1485274038380138496
消费者5:消息7 1485274038468218880
消费者5:消息5 1485274038434664448
消费者5:消息0 1485274026405400576
消费者5:消息4 1485274038422081536
消费者5:消息8 1485274038489190400
​
消费者6:消息0 1485274026405400576
消费者6:消息3 1485274038401110016
消费者6:消息5 1485274038434664448
消费者6:消息8 1485274038489190400
消费者6:消息6 1485274038455635968
消费者6:消息2 1485274038380138496
消费者6:消息4 1485274038422081536
消费者6:消息7 1485274038468218880
消费者6:消息1 1485274038354972672
​

广播模式小结

消费者5 和 消费者6都同为一个消费组(consumer-group-3),并订阅同一个主题(topic-B),只需要在@RocketMQMessageListener注解中添加 messageModel = MessageModel.BROADCASTING 参数设置为广播模式,即可实现每个消费者都消费这个主题的所有消息。

总结

集群模式默认一条消息只会被一个消费者消费一次(消费者组内的实例之间进行均衡分配),适用于需要保证消息处理的高可用性和负载均衡性的场景;广播模式默认一条消息可以被整个消费组的消费者消费,适用于需要实时通知所有消费者的场景,如广告推送、实时通知等。

转载:

1、http://t.csdnimg.cn/SthbM

2、http://t.csdnimg.cn/IyQRF

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐