当mq相关的功能只是项目的一部分独立功能,进行开发的时候可能不总是有mq的环境,这时如果使用jms,控制台就会一直输出mq连接失败的信息——如果可以控制mq连接的开启和关闭,就不会一直弹出连接失败了。这在spring boot里要怎么做?

想当初研究了一阵spring jms的源码,了解spring是如何通过一个注解来启动一个consumer,具体过程已经不记得了,指出要点

不使用JmsListener注解来注册消费者,自行注入MessageListenerContainer来控制consumer的启停

第一步,consumer的注册类

外部只需要调用injectConsumers方法及完成MessageListenerContainer的注册,每个consumer都在唯一的一个container中

@Component
@Slf4j
public class ConsumerRegister {
    private static SimpleMessageConverter jmsConverter = new SimpleMessageConverter();
    private ConfigurableApplicationContext applicationContext;
    private ConnectionFactory connectionFactory;

    public ConsumerRegister(ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;
    }

    public Map<String, DefaultMessageListenerContainer> injectConsumers(Map<ActiveMQDestination, CustomConsumer> input) {
        if (null == input || input.size() <= 0) {
            return null;
        }

        Map<String, DefaultMessageListenerContainer> result = new HashMap<>();
        Random rnd = new Random();
        for (Map.Entry<ActiveMQDestination, CustomConsumer> destinationCustomConsumerEntry : input.entrySet()) {
            ActiveMQDestination destination = destinationCustomConsumerEntry.getKey();
            CustomConsumer consumer = destinationCustomConsumerEntry.getValue();

            BeanDefinitionBuilder definitionBuilder = getListenerContainerBeanBuilder(destination, consumer);
            String beanName = ("consumer_" + System.currentTimeMillis()) + "_" + rnd.nextInt(32);
            registeListenerContainerBean(beanName, definitionBuilder);

            result.put(destination.getPhysicalName(), applicationContext.getBean(beanName, DefaultMessageListenerContainer.class));
        }
        return result;
    }

    /**
     * bean注入到容器
     * @author chenbihui 2019/6/23 18:43
     */
    private void registeListenerContainerBean(String beanName, BeanDefinitionBuilder listenerContainerBeanBuilder) {
        try {
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();

            beanFactory.registerBeanDefinition(beanName, listenerContainerBeanBuilder.getRawBeanDefinition());

            log.info("bean register: {}", beanName);
        } catch (Throwable throwable) {
            log.error("bean register failed.", throwable);
        }
    }

    /**
     * 创建DefaultMessageListenerContainer的beanbuilder
     * @author chenbihui 2019/6/23 18:43
     */
    private BeanDefinitionBuilder getListenerContainerBeanBuilder(Destination destination, CustomConsumer consumer) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setDelegate(consumer);
        messageListenerAdapter.setDefaultListenerMethod("receiveObject");
        messageListenerAdapter.setMessageConverter(jmsConverter);

        BeanDefinitionBuilder definitionBuilder =
                BeanDefinitionBuilder.genericBeanDefinition(DefaultMessageListenerContainer.class);
        definitionBuilder.addPropertyValue("connectionFactory", connectionFactory);
        definitionBuilder.addPropertyValue("destination", destination);
        definitionBuilder.addPropertyValue("messageListener", messageListenerAdapter);
        definitionBuilder.addPropertyValue("errorHandler", new ErrorHandler(){
            @Override
            public void handleError(Throwable throwable) {
                log.error("listener error caught", throwable);
            }
        });
        return definitionBuilder;
    }

}

第二步,consumer的监控类

外部只需要调用startWatch方法即启动监控,每watchDelayMS毫秒检查一次

@Slf4j
public class ConsumerWatcher {

    private long watchDelayMS;
    private ConnectionFactory connectionFactory;
    private ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    private Map<String, DefaultMessageListenerContainer> containerMap;

    public ConsumerWatcher(long watchDelayMS,
                           ConnectionFactory connectionFactory,
                           Map<String, DefaultMessageListenerContainer> containerMap) {
        this.watchDelayMS = watchDelayMS;
        this.connectionFactory = connectionFactory;
        this.containerMap = containerMap;
    }

    public void startWatch() {
        service.scheduleWithFixedDelay(new MQOnlineChecker(), 5L, watchDelayMS, TimeUnit.MILLISECONDS);
    }

    protected class MQOnlineChecker implements Runnable {
        @Override
        public void run() {
            try {
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.getMetaData();
                connection.close();

                startListener();

            } catch (JMSException e) {
//                log.error("error in connecting mq.", e);
                log.error("error in connecting mq. stop listeners");
                closeListener();

            }
        }

        private void closeListener() {
            Collection<DefaultMessageListenerContainer> containerCollection = containerMap.values();
            for (DefaultMessageListenerContainer defaultMessageListenerContainer : containerCollection) {
                if (defaultMessageListenerContainer.isRunning()) {
                    defaultMessageListenerContainer.stop();
                    log.info("listener container stopped. {}", defaultMessageListenerContainer.toString());
                }
            }
        }

        private void startListener() {
            Collection<DefaultMessageListenerContainer> containerCollection = containerMap.values();
            for (DefaultMessageListenerContainer defaultMessageListenerContainer : containerCollection) {
                try {
                    if (!defaultMessageListenerContainer.isRunning()) {
                        defaultMessageListenerContainer.start();
                        log.info("listener container started. {}", defaultMessageListenerContainer.toString());
                    }
                } catch (Throwable throwable) {
                    log.error("listener container failed.", throwable);
                }
            }
        }
    }

}

至此,测试一下,日志输出如下。activemq重启过程中,只有监测器的日志输出

2019-06-25 22:28:12.927  INFO 15232 --- [nio-7890-exec-2] q.p.s.j.consumer.ConsumerRegister        : bean register: consumer_1561472892927_26
2019-06-25 22:28:15.540  INFO 15232 --- [nio-7890-exec-2] q.p.s.jmswatch.controller.MQController   : listener container injected
2019-06-25 22:28:15.543  INFO 15232 --- [nio-7890-exec-2] q.p.s.jmswatch.controller.MQController   : watcher started
2019-06-25 22:28:16.759 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:28:20.762 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:28:24.767 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:28:28.782 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:28:32.786 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:28:35.911  INFO 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : listener container started. org.springframework.jms.listener.DefaultMessageListenerContainer@16b04f3b
2019-06-25 22:29:10.267  INFO 15232 --- [1472892927_26-1] q.p.s.jmswatch.consumer.LogConsumer      : receive: send message: gogogo1561472950205
2019-06-25 22:29:23.780  WARN 15232 --- [1472892927_26-1] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'queue://local_test_queue' - trying to recover. Cause: java.io.EOFException
2019-06-25 22:29:24.795 ERROR 15232 --- [1472892927_26-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'queue://local_test_queue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://127.0.0.1:61616. Reason: java.net.ConnectException: Connection refused: connect
2019-06-25 22:29:25.088 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:29:25.088  INFO 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : listener container stopped. org.springframework.jms.listener.DefaultMessageListenerContainer@16b04f3b
2019-06-25 22:29:29.093 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:29:33.097 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners
2019-06-25 22:29:37.099 ERROR 15232 --- [pool-1-thread-1] q.p.s.jmswatch.consumer.ConsumerWatcher  : error in connecting mq. stop listeners

 

相当于是给出手动维护消费者同mq的连接

拖更,一拖就是半年……下次还是不开这种连续文了

 

---------------------------------20190701补充-------------------------------------------------

ActiveMQConnentionFactory使用的brokerUrl加上failover开头,也能断线重连

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐