spring boot使用jms对接activeMQ(二)
当mq相关的功能只是项目的一部分独立功能,进行开发的时候可能不总是有mq的环境,这时如果使用jms,控制台就会一直输出mq连接失败的信息——如果可以控制mq连接的开启和关闭,就不会一直弹出连接失败了。这在spring boot里要怎么做?想当初研究了一阵spring jms的源码,了解spring是如何通过一个注解来启动一个consumer,具体过程已经不记得了,指出要点不使用JmsLis...
当mq相关的功能只是项目的一部分独立功能,进行开发的时候可能不总是有mq的环境,这时如果使用jms,控制台就会一直输出mq连接失败的信息——如果可以控制mq连接的开启和关闭,就不会一直弹出连接失败了。这在spring boot里要怎么做?
想当初研究了一阵spring jms的源码,了解spring是如何通过一个注解来启动一个consumer,具体过程已经不记得了,指出要点
不使用JmsListener注解来注册消费者,自行注入MessageListenerContainer来控制consumer的启停
第一步,consumer的注册类
外部只需要调用injectConsumers方法及完成MessageListenerContainer的注册,每个consumer都在唯一的一个container中
@Component
@Slf4j
public class ConsumerRegister {
private static SimpleMessageConverter jmsConverter = new SimpleMessageConverter();
private ConfigurableApplicationContext applicationContext;
private ConnectionFactory connectionFactory;
public ConsumerRegister(ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
this.applicationContext = applicationContext;
this.connectionFactory = connectionFactory;
}
public Map<String, DefaultMessageListenerContainer> injectConsumers(Map<ActiveMQDestination, CustomConsumer> input) {
if (null == input || input.size() <= 0) {
return null;
}
Map<String, DefaultMessageListenerContainer> result = new HashMap<>();
Random rnd = new Random();
for (Map.Entry<ActiveMQDestination, CustomConsumer> destinationCustomConsumerEntry : input.entrySet()) {
ActiveMQDestination destination = destinationCustomConsumerEntry.getKey();
CustomConsumer consumer = destinationCustomConsumerEntry.getValue();
BeanDefinitionBuilder definitionBuilder = getListenerContainerBeanBuilder(destination, consumer);
String beanName = ("consumer_" + System.currentTimeMillis()) + "_" + rnd.nextInt(32);
registeListenerContainerBean(beanName, definitionBuilder);
result.put(destination.getPhysicalName(), applicationContext.getBean(beanName, DefaultMessageListenerContainer.class));
}
return result;
}
/**
* bean注入到容器
* @author chenbihui 2019/6/23 18:43
*/
private void registeListenerContainerBean(String beanName, BeanDefinitionBuilder listenerContainerBeanBuilder) {
try {
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(beanName, listenerContainerBeanBuilder.getRawBeanDefinition());
log.info("bean register: {}", beanName);
} catch (Throwable throwable) {
log.error("bean register failed.", throwable);
}
}
/**
* 创建DefaultMessageListenerContainer的beanbuilder
* @author chenbihui 2019/6/23 18:43
*/
private BeanDefinitionBuilder getListenerContainerBeanBuilder(Destination destination, CustomConsumer consumer) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(consumer);
messageListenerAdapter.setDefaultListenerMethod("receiveObject");
messageListenerAdapter.setMessageConverter(jmsConverter);
BeanDefinitionBuilder definitionBuilder =
BeanDefinitionBuilder.genericBeanDefinition(DefaultMessageListenerContainer.class);
definitionBuilder.addPropertyValue("connectionFactory", connectionFactory);
definitionBuilder.addPropertyValue("destination", destination);
definitionBuilder.addPropertyValue("messageListener", messageListenerAdapter);
definitionBuilder.addPropertyValue("errorHandler", new ErrorHandler(){
@Override
public void handleError(Throwable throwable) {
log.error("listener error caught", throwable);
}
});
return definitionBuilder;
}
}
第二步,consumer的监控类
外部只需要调用startWatch方法即启动监控,每watchDelayMS毫秒检查一次
@Slf4j
public class ConsumerWatcher {
private long watchDelayMS;
private ConnectionFactory connectionFactory;
private ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private Map<String, DefaultMessageListenerContainer> containerMap;
public ConsumerWatcher(long watchDelayMS,
ConnectionFactory connectionFactory,
Map<String, DefaultMessageListenerContainer> containerMap) {
this.watchDelayMS = watchDelayMS;
this.connectionFactory = connectionFactory;
this.containerMap = containerMap;
}
public void startWatch() {
service.scheduleWithFixedDelay(new MQOnlineChecker(), 5L, watchDelayMS, TimeUnit.MILLISECONDS);
}
protected class MQOnlineChecker implements Runnable {
@Override
public void run() {
try {
Connection connection = connectionFactory.createConnection();
connection.start();
connection.getMetaData();
connection.close();
startListener();
} catch (JMSException e) {
// log.error("error in connecting mq.", e);
log.error("error in connecting mq. stop listeners");
closeListener();
}
}
private void closeListener() {
Collection<DefaultMessageListenerContainer> containerCollection = containerMap.values();
for (DefaultMessageListenerContainer defaultMessageListenerContainer : containerCollection) {
if (defaultMessageListenerContainer.isRunning()) {
defaultMessageListenerContainer.stop();
log.info("listener container stopped. {}", defaultMessageListenerContainer.toString());
}
}
}
private void startListener() {
Collection<DefaultMessageListenerContainer> containerCollection = containerMap.values();
for (DefaultMessageListenerContainer defaultMessageListenerContainer : containerCollection) {
try {
if (!defaultMessageListenerContainer.isRunning()) {
defaultMessageListenerContainer.start();
log.info("listener container started. {}", defaultMessageListenerContainer.toString());
}
} catch (Throwable throwable) {
log.error("listener container failed.", throwable);
}
}
}
}
}
至此,测试一下,日志输出如下。activemq重启过程中,只有监测器的日志输出
2019-06-25 22:28:12.927 INFO 15232 --- [nio-7890-exec-2] q.p.s.j.consumer.ConsumerRegister : bean register: consumer_1561472892927_26
2019-06-25 22:28:15.540 INFO 15232 --- [nio-7890-exec-2] q.p.s.jmswatch.controller.MQController : listener container injected
2019-06-25 22:28:15.543 INFO 15232 --- [nio-7890-exec-2] q.p.s.jmswatch.controller.MQController : watcher started
2019-06-25 22:28:16.759 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:28:20.762 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:28:24.767 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:28:28.782 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:28:32.786 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:28:35.911 INFO 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : listener container started. org.springframework.jms.listener.DefaultMessageListenerContainer@16b04f3b
2019-06-25 22:29:10.267 INFO 15232 --- [1472892927_26-1] q.p.s.jmswatch.consumer.LogConsumer : receive: send message: gogogo1561472950205
2019-06-25 22:29:23.780 WARN 15232 --- [1472892927_26-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'queue://local_test_queue' - trying to recover. Cause: java.io.EOFException
2019-06-25 22:29:24.795 ERROR 15232 --- [1472892927_26-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'queue://local_test_queue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://127.0.0.1:61616. Reason: java.net.ConnectException: Connection refused: connect
2019-06-25 22:29:25.088 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:29:25.088 INFO 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : listener container stopped. org.springframework.jms.listener.DefaultMessageListenerContainer@16b04f3b
2019-06-25 22:29:29.093 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:29:33.097 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
2019-06-25 22:29:37.099 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher : error in connecting mq. stop listeners
相当于是给出手动维护消费者同mq的连接
拖更,一拖就是半年……下次还是不开这种连续文了
---------------------------------20190701补充-------------------------------------------------
ActiveMQConnentionFactory使用的brokerUrl加上failover开头,也能断线重连
更多推荐
所有评论(0)