Kafka 2.x 升 3.x,我踩过的 3 个坑

写在前面

公司项目要用 Kafka 3.x,领导让我负责升级。

Java 能看懂(CRUD 工程师),Linux 命令会用(cd、ls、vim 三件套)。说实话,Kafka 这东西我之前也就停留在"会用"的 level,突然要升级版本,心里真没底。

网上的文档要么太深奥,要么太简单。没办法,只能自己一点点试,顺便把踩坑经历记下来。

如果你也是新手,希望这篇笔记能帮到你。


环境信息

  • Spring Boot: 2.7.18(单服务升级2.2->2.7)
  • Spring Cloud: Hoxton.SR9
  • Kafka Clients: 3.0.2(升级目标)
  • Spring Kafka: 由 Spring Boot 管理( 2.8)

坑 1:启动就报错,NoClassDefFoundError

❌ 我当时以为很简单

直接改 pom.xml,把 Kafka 版本改成 3.0.2,然后启动…

# 我当时看到这个配置,心想这不简单吗
kafka.version: 3.0.2

结果启动失败:

java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/IsolationLevel

我当时就懵了。啥?IsolationLevel 是个啥?

🔍 问题根源

后来查资料才明白:

  1. API 变更:Kafka 3.0+ 中 IsolationLevel 类的包路径或者实现发生了重构
  2. 自动配置冲突:Spring Boot 2.7.18 的 KafkaAutoConfiguration 内部调用了已废弃的 Kafka API
  3. 依赖传递冲突:多个外部依赖传递引入了不同版本的 kafka-clients

好家伙,原来是版本冲突 + 自动配置惹的祸。

✅ 解决方案

禁用 Spring Boot 的 Kafka 自动配置 + 手动配置 Bean

// 在主启动类上禁用自动配置
@SpringBootApplication(exclude = {
    org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class
})
@ConfigurationPropertiesScan
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

教训:别以为改个版本号就完事了,自动配置可能会背刺你!


坑 2:依赖排除,一个都不能少

❌ 我只排除了部分依赖

<!-- 我只排除了一个依赖,心想应该够了 -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>some-dependency</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

结果打包后发现,jar 里还是有多个版本的 kafka-clients。

✅ 正确做法

所有可能引入 Kafka 依赖的地方都要排除

<!-- 排除依赖 1 -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>common-lib</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- 排除依赖 2 -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>another-lib</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- 排除依赖 3 -->
<dependency>
    <groupId>com.example</groupId>
    <artifactId>third-lib</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </exclusion>
    </exclusions>
</dependency>

检查命令

# 查看打包后的 kafka-clients 版本
jar tf target/your-app.jar | Select-String "kafka-clients"

预期结果:应该只看到 kafka-clients-3.0.2.jar,没有其他版本。

教训:别手贱!一个一个排除,别偷懒!用 Maven Helper 插件检查依赖树。


坑 3:禁用自动配置后,Bean 不会自动创建

❌ 我以为禁用就完事了

禁用了 KafkaAutoConfiguration,启动…

NoSuchBeanDefinitionException: KafkaListenerEndpointRegistry

又懵了。这又是啥?

🔍 原因分析

后来才明白:

  • 禁用 KafkaAutoConfiguration:阻止 Spring Boot 自动创建 KafkaProperties Bean,避免其初始化时调用 Kafka 3.x 已废弃的 API
  • 使用 @EnableKafka:启用 Kafka 监听器支持,自动创建 KafkaListenerEndpointRegistry 等监听器管理相关的 Bean
  • 手动配置 Bean:因为禁用了自动配置,需要手动创建 ConsumerFactoryProducerFactoryKafkaTemplate 等业务所需的 Bean

✅ 正确做法

1. 添加 @EnableKafka

@Configuration
@EnableKafka  // 这个注解必须有!
public class KafkaConfig {
    // ...
}

2. 手动配置所有 Bean

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id:default-group}")
    private String groupId;

    /**
     * 消费者工厂
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    /**
     * 生产者工厂
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate - 用于发送消息
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * 批量消费监听器工厂
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

重要说明

  • @EnableKafka 会自动创建 KafkaListenerEndpointRegistry,无需手动配置
  • 如果项目只使用生产者(KafkaTemplate),可以不配置 ConsumerFactorybatchFactory
  • 如果使用了 @KafkaListener,必须配置 ConsumerFactory

教训:禁用自动配置后,所有 Bean 都要手动配!一个都不能少!


验证步骤

1. 编译打包

cd your-project
mvn clean package -DskipTests

2. 检查 jar 包中的 Kafka 版本

# 查看打包后的 kafka-clients 版本
jar tf target/your-app.jar | Select-String "kafka-clients"

预期结果:应该只看到 kafka-clients-3.0.2.jar,没有其他版本。

3. 启动测试

启动应用,确认没有 NoClassDefFoundError: IsolationLevel 错误。


配置对比(2.x vs 3.x)

2.x 配置(自动配置)

// 啥都不用配,Spring Boot 自动帮你搞定
@SpringBootApplication
public class Application {
    // ...
}

好处:简单,省事

坏处:版本冲突时没法搞

3.x 配置(手动配置)

// 禁用自动配置 + 手动配置所有 Bean
@SpringBootApplication(exclude = {
    KafkaAutoConfiguration.class
})
@Configuration
@EnableKafka
public class KafkaConfig {
    // 手动配置 ConsumerFactory, ProducerFactory, KafkaTemplate, batchFactory
}

好处:完全可控,版本冲突时能搞定

坏处:配置多,容易漏

我的感受

  • 虽然配置多了,但心里有底
  • 出了问题知道从哪查
  • 推荐大家都这么配(虽然麻烦点)

回滚方案

如果升级后出现问题,可以快速回滚:

  1. 恢复 pom.xml 中的 kafka.version2.8.2
  2. 移除 @SpringBootApplicationexclude 属性
  3. 恢复 KafkaConfig.java 为原始版本(仅保留 batchFactory 方法,移除其他手动配置的 Bean)
  4. 移除 KafkaConfig 上的 @EnableKafka 注解
  5. 重新编译部署

教训:升级前一定要留好退路!


总结

我踩过的坑

  1. 坑 1:以为改个版本号就完事 → 启动报 NoClassDefFoundError
  2. 坑 2:依赖排除不干净 → jar 包里有多个版本的 kafka-clients
  3. 坑 3:禁用自动配置后不手动配 Bean → 报 NoSuchBeanDefinitionException

核心要点

  • 必须禁用 KafkaAutoConfiguration:避免自动配置与 Kafka 3.x 的兼容性问题
  • 必须添加 @EnableKafka:否则 KafkaListenerEndpointRegistry 不会创建
  • 必须手动配置所有 BeanConsumerFactoryProducerFactoryKafkaTemplatebatchFactory
  • 必须排除所有传递依赖:一个都不能少!

最后说两句

其实也没多难,就是配置多了点。

我刚开始也懵,后来一点点试,总算是搞定了。

肯定有理解不对的地方,欢迎大佬指正。

如果你也遇到类似问题,希望这篇笔记能帮到你。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐