kafka插件化实现自定义CreateTopicPolicy
在《深入理解kafka》一书当中, 4.2.2 主题合法性验证小结中,里面提到Kafka broker 端有一个这样的参数:create.topic.policy.class.name,默认值为null,它提供了一个入口用来验证主题创建的合法性。使用方式很简单,只需要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口,比如下面示例中的
引言:
在《深入理解kafka》一书当中, 4.2.2 主题合法性验证小结中,里面提到
Kafka broker 端有一个这样的参数:create.topic.policy.class.name,默认值为null,它提供了一个入口用来验证主题创建的合法性。使用方式很简单,只需要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口,比如下面示例中的 PolicyDemo。然后在broker 端的配置文件 config/server.properties 中配置参数 create.topic.policy.class.name的值为org.apache.kafka.server.policy.PolicyDemo,最后启动服务
于是乎,我就好奇,既然要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口难道是要重写kafka的代码?
带着好奇的心情,展开了一番查找,发现Kafka 设计了插件化的扩展机制,你只需单独编写自定义 Policy 类、打包成独立 JAR,再通过配置让 Broker 加载即可,完全不需要改 Kafka 核心源码。
实践
我的本地版本是kafka2.13-4.0.1
1.创建自定义的CreateTopicPolicy 实现类
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import java.util.Map;
public class CustomCreateTopicPolicy implements CreateTopicPolicy, Configurable {
// 主题名称前缀要求
private static final String TOPIC_PREFIX = "biz-";
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
// ====================== 修复点:跳过系统主题 ======================
//之前没跳过系统主题导致消费有问题
if (topicName.startsWith("__") || topicName.startsWith("_confluent-")) {
// 系统内部主题,不校验
return;
}
String topicName = requestMetadata.topic();
// 验证主题名称是否以指定前缀开头
if (!topicName.startsWith(TOPIC_PREFIX)) {
throw new PolicyViolationException(
"主题创建失败:名称必须以'" + TOPIC_PREFIX + "'开头,当前名称:" + topicName
);
}
// 可选:验证分区数/副本数
if (requestMetadata.numPartitions() < 3) {
throw new PolicyViolationException(
"主题创建失败:分区数不能少于3,当前分区数:" + requestMetadata.numPartitions()
);
}
}
@Override
public void configure(Map<String, ?> configs) {
// 可读取Broker配置的自定义参数(可选)
}
@Override
public void close() throws Exception {
// 资源释放(可选)
}
}
maven依赖
<dependencies>
<!-- Kafka 核心依赖(版本与你的 Broker 一致) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server</artifactId>
<version>4.0.1</version>
<scope>provided</scope> <!-- 运行时由Broker提供,避免包冲突 -->
</dependency>
</dependencies>
随后添加打包的插件到pom文件
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
打包:
mvn clean package
然后得到一个jar包

kafka的libs目录下

并在对应的server.properties配置create.topic.policy.class.name={jar包自定义类的完整目录}
如org.apache.kafka.server.policy.CustomCreateTopicPolicy
然后启动kafka
然后执行我准备好的kafkaadminTest脚本创建topic,当然也可以用kafka-topics.sh的方式创建

然后就可以看到提示报错信息了

更多推荐
所有评论(0)