【消息队列】RocketMQ 生产和消费中的集群模式和广播模式
集群模式默认一条消息只会被一个消费者消费一次(消费者组内的实例之间进行均衡分配),适用于需要保证消息处理的高可用性和负载均衡性的场景;广播模式默认一条消息可以被整个消费组的消费者消费,适用于需要实时通知所有消费者的场景,如广告推送、实时通知等。转载:1、2、
【消息队列】RocketMQ 生产和消费中的集群模式和广播模式
1.集群模式(Cluster Mode):
在集群模式下,多个相同角色的实例组成一个集群,它们共同协作以提供服务。对于生产者和消费者而言,集群模式有以下特点:
生产者: 生产者将消息发送到整个集群,集群内的任意一个节点都可以接收和处理消息。
消费者: 当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。每个消息只会被消费者组内的一个实例消费。
优势:
高可用性:集群模式提供了高可用性,即使其中一个节点出现故障,其他节点仍然可以继续提供服务。负载均衡:消息在消费者组内的实例之间进行均衡分配,提高系统整体性能。集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。适用场景:
需要保证消息处理的高可用性和负载均衡性的场景。 适用于大规模分布式系统,可以水平扩展。
2、广播模式(Broadcast Mode):
在广播模式下,消息会被发送到所有的订阅者,每个订阅者都会接收到相同的消息。对于生产者和消费者而言,广播模式有以下特点:
生产者: 生产者将消息发送到所有订阅该主题的消费者。
消费者: 消费者组内的每个实例都会接收相同的消息,每个消息都会被所有的消费者实例处理。因此即使扩缩消费者数量也无法提升或降低消费能力。
优势:
实时性:消息能够被所有消费者实例实时处理,适用于需要广播通知或实时更新的场景。
适用场景:
需要实时通知所有消费者的场景,如广告推送、实时通知等。 不需要负载均衡,每个消费者都需要处理所有消息。
3、代码实现
发送消息
package com.lik; import cn.hutool.core.util.IdUtil; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class ProducerTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void demo1() { for (int i = 0; i < 10; i++) { rocketMQTemplate.syncSend("topic-A", "消息 " + i + " " + IdUtil.getSnowflake(1, 1).nextId()); } } }
消费者1( consumer-group-1)
package com.lik.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(consumerGroup="consumer-group-1", topic="topic-A") public class TestConsumer1 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费者1: " + message); } }
消费者2( consumer-group-1)
package com.lik.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(consumerGroup="consumer-group-1", topic="topic-A") public class TestConsumer2 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费者2: " + message); } }
消费者3( consumer-group-2)
package com.lik.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(consumerGroup="consumer-group-2", topic="topic-A") public class TestConsumer3 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费者3: " + message); } }
结果:
消费者1:消息1 1485262673296887808 消费者3:消息3 1485262673368190976 消费者3:消息9 1485262673535963136 消费者3:消息6 1485262673435299840 消费者1:消息5 1485262673422716928 消费者3:消息5 1485262673422716928 肖费者1:消息2 1485262673334636544 消费者1:消息6 1485262673435299840 消费者1:消息9 1485262673535963136 消费者3:消息4 1485262673397551104 消费者3:消息8 1485262673494020096 消费者3:消息2 1485262673334636544 消费者3:消息0 1485262661867409408 消费者3:消息7 1485262673464659968
集群模式小结
消费者1 和 消费者2同为一个消费组并订阅同一个主题时,默认情况下会平分这个主题的消息进行消费。消费者3 跟 消费者1、2不在同一个消费组,会单独消费这个主题下的所有消息。
广播模式消费
默认情况下同一消费组群集时会分滩来消费一个主题的消息。如果特殊情况需要集群下的所有消费者都消费这个主题的消息时,可将消费模式设置为广播模式 messageModel = MessageModel.BROADCASTING
消费者5
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "consumer-group-3", topic = "topic-B", messageModel = MessageModel.BROADCASTING) public class TestConsumer5 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费者5: " + message); } }
消费者6
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "consumer-group-3", topic = "topic-B", messageModel = MessageModel.BROADCASTING) public class TestConsumer6 implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费者6: " + message); } }
结果:
消费者5:消息9 1485274038556299264 消费者5:消息6 1485274038455635968 消费者5:消息3 1485274038401110016 消费者5:消息2 1485274038380138496 消费者5:消息7 1485274038468218880 消费者5:消息5 1485274038434664448 消费者5:消息0 1485274026405400576 消费者5:消息4 1485274038422081536 消费者5:消息8 1485274038489190400 消费者6:消息0 1485274026405400576 消费者6:消息3 1485274038401110016 消费者6:消息5 1485274038434664448 消费者6:消息8 1485274038489190400 消费者6:消息6 1485274038455635968 消费者6:消息2 1485274038380138496 消费者6:消息4 1485274038422081536 消费者6:消息7 1485274038468218880 消费者6:消息1 1485274038354972672
广播模式小结
消费者5 和 消费者6都同为一个消费组(consumer-group-3),并订阅同一个主题(topic-B),只需要在@RocketMQMessageListener注解中添加 messageModel = MessageModel.BROADCASTING 参数设置为广播模式,即可实现每个消费者都消费这个主题的所有消息。
总结
集群模式默认一条消息只会被一个消费者消费一次(消费者组内的实例之间进行均衡分配),适用于需要保证消息处理的高可用性和负载均衡性的场景;广播模式默认一条消息可以被整个消费组的消费者消费,适用于需要实时通知所有消费者的场景,如广告推送、实时通知等。
转载:
更多推荐
所有评论(0)