参考: https://blog.csdn.net/heihaozi/article/details/119145266

首先去看了一下roketmq的issue,close里面有相关的报错。于是我就把roketmq和rocketmq-spring-boot-starter都升了级:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot</artifactId>
            <version>2.2.2</version>
        </dependency>

但是这次还是报错,只是稍微变了一下:

invokeAsync call the addr[null] timeout

在这里插入图片描述

然后跟踪堆栈,发现和之前版本不同,这个超时时间不再是写死了,可以通过get和set方法获取或者设置它的值:

// MQClientInstance类updateTopicRouteInfoFromNameServer方法623行
 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());

于是找这个怎么设置,最终找到RocketMQAutoConfiguration方法中注入defaultMQProducer和defaultLitePullConsumer时可以设置,但是这个类是rocketmq-spring-boot-starter包里面的类,没办法只有在我们类里面重新注入它们了,代码如下:

     //自定义了一个变量,通过nacos赋值
    @Value("${rocketmq.mq-client-api-timeout:10000}")
    private Integer mqClientApiTimeout;

    /**
     * 自定DefaultMQProducer
     * @param rocketMQProperties
     * @return
     */
    @Bean(PRODUCER_BEAN_NAME)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
    public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = producerConfig.getGroup();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

        String accessChannel = rocketMQProperties.getAccessChannel();

        String ak = rocketMQProperties.getProducer().getAccessKey();
        String sk = rocketMQProperties.getProducer().getSecretKey();
        boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
        String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();

        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);

        producer.setNamesrvAddr(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
        producer.setUseTLS(producerConfig.isTlsEnable());
        producer.setNamespace(producerConfig.getNamespace());
        //自定义属性值
        producer.setMqClientApiTimeout(mqClientApiTimeout);
        return producer;
    }
    /**
     * 自定DefaultMQConsumer
     * @param rocketMQProperties
     * @return
     */
    @Bean(CONSUMER_BEAN_NAME)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
            throws MQClientException {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");

        String accessChannel = rocketMQProperties.getAccessChannel();
        MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
        SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
        String selectorExpression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        int pullBatchSize = consumerConfig.getPullBatchSize();
        boolean useTLS = consumerConfig.isTlsEnable();

        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
        litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
        litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
        litePullConsumer.setNamespace(consumerConfig.getNamespace());
        //自定义参数设置
        litePullConsumer.setMqClientApiTimeout(mqClientApiTimeout);
        return litePullConsumer;
    }

以上步骤做了之后,重启还是报错,跟踪发现,这次是一个自定义的DefaultMQPushConsumer 消费者报错,于是找到消费者设置参数的地方,再设置一遍:

    @Value("${rocketmq.mq-client-api-timeout:10000}")
    private Integer mqClientApiTimeout;

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        if (StrUtil.isNotEmpty(clientIp)) {
            consumer.setClientIP(clientIp);
        }
        consumer.setConsumeFromWhere(CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        //自定义api超时时间
        consumer.setMqClientApiTimeout(mqClientApiTimeout);
    }
}

这次之后,终于不报错了。断点发现,超时的地方执行了20s,不清楚具体是怎么回事,后面如果找到具体原因再更新一下。先用上面的方法治标拖延点时间。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐