RocketMQ报错:invokeAsync call timeout
rocketmq生产和消费时报错:invokeAsync call the addr[null] timeout
·
参考: https://blog.csdn.net/heihaozi/article/details/119145266
首先去看了一下RocketMQ的issue,已关闭(closed)的issue里面有相关的报错。于是我就把RocketMQ和rocketmq-spring-boot-starter都升了级:
<!-- NOTE(review): the accompanying text mentions upgrading rocketmq-spring-boot-starter, -->
<!-- while this snippet declares rocketmq-spring-boot; confirm which artifactId is intended. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
但是这次还是报错,只是稍微变了一下:
invokeAsync call the addr[null] timeout
然后跟踪堆栈,发现和之前版本不同,这个超时时间不再是写死的了,可以通过get和set方法获取或者设置它的值:
// MQClientInstance#updateTopicRouteInfoFromNameServer, line 623:
// the timeout argument is read from clientConfig.getMqClientApiTimeout().
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
于是去找这个值怎么设置,最终发现RocketMQAutoConfiguration类中注入defaultMQProducer和defaultLitePullConsumer的方法里可以设置。但是这个类是rocketmq-spring-boot-starter包里面的类,无法直接修改,只好在我们自己的配置类里面重新注入它们,代码如下:
// Custom property whose value is supplied through Nacos configuration;
// falls back to 10000 when rocketmq.mq-client-api-timeout is not set.
@Value("${rocketmq.mq-client-api-timeout:10000}")
private Integer mqClientApiTimeout;
/**
 * Creates the {@link DefaultMQProducer} bean from the given RocketMQ properties and,
 * in addition to the standard producer settings, applies the custom client API timeout.
 *
 * @param rocketMQProperties RocketMQ configuration; the name server and the producer
 *                           group must both contain text
 * @return the configured producer
 */
@Bean(PRODUCER_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    RocketMQProperties.Producer settings = rocketMQProperties.getProducer();
    String namesrvAddr = rocketMQProperties.getNameServer();
    String producerGroup = settings.getGroup();

    // Fail fast when the mandatory settings are missing.
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(producerGroup, "[rocketmq.producer.group] must not be null");

    DefaultMQProducer mqProducer = RocketMQUtil.createDefaultMQProducer(
        producerGroup,
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.isEnableMsgTrace(),
        settings.getCustomizedTraceTopic());
    mqProducer.setNamesrvAddr(namesrvAddr);

    // The access channel is optional; only set it when one is configured.
    String channelName = rocketMQProperties.getAccessChannel();
    if (!StringUtils.isEmpty(channelName)) {
        mqProducer.setAccessChannel(AccessChannel.valueOf(channelName));
    }

    // Send and retry behaviour.
    mqProducer.setSendMsgTimeout(settings.getSendMessageTimeout());
    mqProducer.setRetryTimesWhenSendFailed(settings.getRetryTimesWhenSendFailed());
    mqProducer.setRetryTimesWhenSendAsyncFailed(settings.getRetryTimesWhenSendAsyncFailed());
    mqProducer.setRetryAnotherBrokerWhenNotStoreOK(settings.isRetryNextServer());

    // Message size and compression.
    mqProducer.setMaxMessageSize(settings.getMaxMessageSize());
    mqProducer.setCompressMsgBodyOverHowmuch(settings.getCompressMessageBodyThreshold());

    // Transport and namespace.
    mqProducer.setUseTLS(settings.isTlsEnable());
    mqProducer.setNamespace(settings.getNamespace());

    // Custom setting: client API timeout taken from the injected property.
    mqProducer.setMqClientApiTimeout(mqClientApiTimeout);
    return mqProducer;
}
/**
 * Creates the {@link DefaultLitePullConsumer} bean from the given RocketMQ properties and
 * applies the custom client API timeout on top of the standard consumer settings.
 *
 * @param rocketMQProperties RocketMQ configuration; the name server, consumer group and
 *                           consumer topic must all contain text
 * @return the configured lite pull consumer
 * @throws MQClientException declared by this method; propagated from consumer creation
 */
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
        throws MQClientException {
    RocketMQProperties.Consumer settings = rocketMQProperties.getConsumer();
    String namesrvAddr = rocketMQProperties.getNameServer();
    String consumerGroup = settings.getGroup();
    String topic = settings.getTopic();

    // Fail fast when the mandatory settings are missing.
    Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
    Assert.hasText(consumerGroup, "[rocketmq.consumer.group] must not be null");
    Assert.hasText(topic, "[rocketmq.consumer.topic] must not be null");

    String channelName = rocketMQProperties.getAccessChannel();
    // Convert the textual settings into their enum counterparts.
    MessageModel model = MessageModel.valueOf(settings.getMessageModel());
    SelectorType selector = SelectorType.valueOf(settings.getSelectorType());

    DefaultLitePullConsumer pullConsumer = RocketMQUtil.createDefaultLitePullConsumer(
        namesrvAddr,
        channelName,
        consumerGroup,
        topic,
        model,
        selector,
        settings.getSelectorExpression(),
        settings.getAccessKey(),
        settings.getSecretKey(),
        settings.getPullBatchSize(),
        settings.isTlsEnable());

    // Message trace and namespace.
    pullConsumer.setEnableMsgTrace(settings.isEnableMsgTrace());
    pullConsumer.setCustomizedTraceTopic(settings.getCustomizedTraceTopic());
    pullConsumer.setNamespace(settings.getNamespace());

    // Custom setting: client API timeout taken from the injected property.
    pullConsumer.setMqClientApiTimeout(mqClientApiTimeout);
    return pullConsumer;
}
以上步骤做了之后,重启还是报错,跟踪发现,这次是一个自定义的DefaultMQPushConsumer 消费者报错,于是找到消费者设置参数的地方,再设置一遍:
// Injected from the rocketmq.mq-client-api-timeout property; falls back to 10000 when unset.
@Value("${rocketmq.mq-client-api-timeout:10000}")
private Integer mqClientApiTimeout;
/**
 * Customizes the push consumer before it starts: optionally overrides the client IP,
 * makes it consume from the current timestamp, and applies the custom client API timeout.
 *
 * @param consumer the push consumer to configure
 */
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
    // Only override the client IP when one has been configured.
    final boolean hasCustomClientIp = StrUtil.isNotEmpty(clientIp);
    if (hasCustomClientIp) {
        consumer.setClientIP(clientIp);
    }

    // Start consuming from "now", formatted the way the consumer expects.
    final long nowMillis = System.currentTimeMillis();
    final String startTimestamp = UtilAll.timeMillisToHumanString3(nowMillis);
    consumer.setConsumeFromWhere(CONSUME_FROM_TIMESTAMP);
    consumer.setConsumeTimestamp(startTimestamp);

    // Custom client API timeout taken from the injected property.
    consumer.setMqClientApiTimeout(mqClientApiTimeout);
}
}
这次之后,终于不报错了。断点发现,超时的地方执行了20s,不清楚具体是怎么回事,后面如果找到具体原因再更新一下。先用上面的方法治标拖延点时间。
更多推荐
已为社区贡献1条内容
所有评论(0)