引言:

在《深入理解kafka》一书当中, 4.2.2 主题合法性验证小结中,里面提到
Kafka broker 端有一个这样的参数:create.topic.policy.class.name,默认值为null,它提供了一个入口用来验证主题创建的合法性。使用方式很简单,只需要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口,比如下面示例中的 PolicyDemo。然后在broker 端的配置文件 config/server.properties 中配置参数 create.topic.policy.class.name的值为org.apache.kafka.server.policy.PolicyDemo,最后启动服务

于是乎,我就好奇,既然要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口难道是要重写kafka的代码?

带着好奇的心情,展开了一番查找,发现Kafka 设计了插件化的扩展机制,你只需单独编写自定义 Policy 类、打包成独立 JAR,再通过配置让 Broker 加载即可,完全不需要改 Kafka 核心源码。

实践

我的本地版本是kafka2.13-4.0.1

1.创建自定义的CreateTopicPolicy 实现类

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

import java.util.Map;

public class CustomCreateTopicPolicy implements CreateTopicPolicy, Configurable {
    // 主题名称前缀要求
    private static final String TOPIC_PREFIX = "biz-";

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        // ====================== 修复点:跳过系统主题 ======================
        //之前没跳过系统主题导致消费有问题
        if (topicName.startsWith("__") || topicName.startsWith("_confluent-")) {
            // 系统内部主题,不校验
            return;
        }
        String topicName = requestMetadata.topic();
        // 验证主题名称是否以指定前缀开头
        if (!topicName.startsWith(TOPIC_PREFIX)) {
            throw new PolicyViolationException(
                    "主题创建失败:名称必须以'" + TOPIC_PREFIX + "'开头,当前名称:" + topicName
            );
        }
        // 可选:验证分区数/副本数
        if (requestMetadata.numPartitions() < 3) {
            throw new PolicyViolationException(
                    "主题创建失败:分区数不能少于3,当前分区数:" + requestMetadata.numPartitions()
            );
        }
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可读取Broker配置的自定义参数(可选)
    }

    @Override
    public void close() throws Exception {
        // 资源释放(可选)
    }
}

maven依赖

<dependencies>
            <!-- Kafka 核心依赖(版本与你的 Broker 一致) -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-server</artifactId>
                <version>4.0.1</version>
                <scope>provided</scope> <!-- 运行时由Broker提供,避免包冲突 -->
            </dependency>
    </dependencies>

随后添加打包的插件到pom文件

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>

打包:

mvn clean package

然后得到一个jar包

kafka的libs目录下

并在对应的server.properties配置create.topic.policy.class.name={jar包自定义类的完整目录}

如org.apache.kafka.server.policy.CustomCreateTopicPolicy
 

然后启动kafka

然后执行我准备好的kafkaadminTest脚本创建topic,当然也可以用kafka-topics.sh的方式创建

然后就可以看到提示报错信息了

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐