快速上手 Kafka 安装部署运行(以阿里云为例单机版)
本文详细介绍了Kafka单机环境的安装部署流程。首先需要安装JDK17并配置环境变量,然后分别部署ZooKeeper和Kafka服务。ZooKeeper配置了数据存储路径并设置环境变量,Kafka则修改了日志目录、监听地址等关键配置。文章提供了完整的启动命令,包括后台启动和日志查看方法,并通过创建测试主题、发送/消费消息验证了安装的正确性。整个安装过程涵盖了从基础环境准备到服务测试的全套步骤,适合
·
1. Kafka 核心概念
1.1 基础组件
- Broker:Kafka 服务器节点,负责消息存储与转发,多个 Broker 组成集群。
- Topic:消息的分类逻辑容器,生产者发消息到 Topic,消费者从 Topic 消费。
- Partition:Topic 的物理分区,一个 Topic 可分多个 Partition,提升并行处理能力,消息在 Partition 内有序。
- Offset:消息在 Partition 中的唯一序号,消费者通过 Offset 定位消息。
- Producer:消息生产者,向 Kafka Topic 发送消息。
- Consumer:消息消费者,从 Kafka Topic 拉取消息。
- Consumer Group:消费者组,组内消费者共同消费一个 Topic,每个 Partition 只能被组内一个消费者消费,实现负载均衡。
1.2 核心特性
- 高吞吐:批量发送、零拷贝技术,支持每秒百万级消息。
- 持久化:消息持久化到磁盘,可配置保留时间 / 大小。
- 高可用:多副本机制,Leader 故障时 Follower 自动切换。
- 可扩展:集群可水平扩展,动态增加 Broker。
2. Kafka 环境安装
前置依赖
- Java 8+(Kafka 运行依赖 JVM)
- ZooKeeper(Kafka 2.8+ 可内置 KRaft,无需 ZooKeeper,本文用传统 ZooKeeper 模式)
Kafka 的运行环境需要 JVM 虚拟机,所以在安装之前要先安装好虚拟机环境及配置。
安装步骤如下:
2.1 JDK 安装
# 查询包管理器中是否有可用的OpenJDK 17包
sudo yum list available | grep java-17
# 安装 JDK
sudo yum install java-17-openjdk.x86_64
# 添加环境变量
echo -e "export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-17.0.18.0.8-1.0.2.1.al8.x86_64\nexport PATH=\$JAVA_HOME/bin:\$PATH" >> ~/.bashrc
# 刷新配置文件
source ~/.bashrc
2.2 Kafka 环境
2.2.1 下载 ZooKeeper
虽然 Kafka 2.8+版本支持 KRaft 模式(去ZooKeeper化),但单机测试环境仍推荐使用嵌入式ZooKeeper以简化部署,下载 ZooKeeper

# 将下载好 ZooKeeper 的压缩包上传 并解压移动到自习习惯存放的目录
tar -xzf apache-zookeeper-3.8.6-bin.tar.gz && mv apache-zookeeper-3.8.6-bin /opt/
配置 ZooKeeper :
# 进入 ZooKeeper 解压后的文件目录
cd /opt/apache-zookeeper-3.8.6-bin/conf
# 配置初始化文件
cp zoo_sample.cfg zoo.cfg
# 配置相关信息
vi zoo.cfg
# 修改 zookeeper 数据存放地址,不要放在临时文件夹
dataDir=/opt/zookeeper/data
配置 ZooKeeper 环境变量
echo 'export ZOOKEEPER_HOME=/opt/apache-zookeeper-3.8.6-bin' >> ~/.bashrc
&& echo 'export PATH=$ZOOKEEPER_HOME/bin:$PATH' >> ~/.bashrc
# 刷新生效
source ~/.bashrc
# 启动
zkServer.sh start
# 查看状态
zkServer.sh status
# 停止
zkServer.sh stop
2.2.2 下载 Kafka

# 将下载好 kafka 的压缩包上传 并解压移动到自习习惯存放的目录
tar -xzf kafka_2.13-3.8.0.tgz && mv kafka_2.13-3.8.0 /opt
编辑 Kafka 配置
cd /opt/kafka_2.13-3.8.0/config
# 编辑配置
vi server.properties
# 修改日志记录位置
log.dirs=/opt/kafka/kraft-combined-logs
# 直接复制替换这 2 行(最关键)
listeners=PLAINTEXT://0.0.0.0:9092
# 公网IP X.X.X.X
advertised.listeners=PLAINTEXT://X.X.X.X:9092
开放服务器防火墙,允许外网客户端连接
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
阿里云安全组放行 9092端口(最重要!)
- 登录 阿里云控制台
- 找到你的 ECS 服务器
- 进入 安全组 → 入方向
- 添加规则:
端口:9092
协议:TCP
授权对象:0.0.0.0/0
这个不开,永远连不上!
启动 Kafka 服务
# 打开 Kafka 目录
cd /opt/kafka_2.13-3.8.0
# 启动 zookeeper 并后台输出日志
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
# 后台启动,同时输出日志
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
# 查看日志输出情况
tail -f kafka.log
# 停止 Kafka,在同一个 bin 文件夹下
./kafka-server-stop.sh
2.2.3 测试 Kafka
# 打开
cd /opt/kafka_2.13-3.8.0/bin
# 创建名为 test 的主题
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
# 查看
./kafka-topics.sh --bootstrap-server localhost:9092 --list

# 给创建的主题发送消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 消费消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test


3. Java 开发环境准备
3.1 Maven 依赖
在 pom.xml 中添加 Kafka 客户端依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
<!-- Jackson 核心包(包含 ObjectMapper) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- 配套依赖(必须一起加,否则会报错) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
</project>
3.2 Producer(生产者)代码示例
3.2.1 生产者(异步发送)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Kafka 基础生产者(异步发送)
*/
public class KafkaProducerAsync {
// Kafka 集群地址
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
// 目标 Topic
private static final String TOPIC = "test-topic";
public static void main(String[] args) {
// 1. 配置生产者参数
Properties props = new Properties();
// 必选:集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 必选:Key 序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 必选:Value 序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 可选:消息确认机制(all=所有副本确认,最安全)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 可选:重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 可选:批量发送大小(16KB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 可选:批量延迟(100ms 未达批量则发送)
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
// 2. 创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 3. 发送消息(异步 + 回调)
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "Hello Kafka " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
// 异步发送,带回调(获取发送结果)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
// 发送成功:打印元数据
System.out.printf("发送成功 -> Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
metadata.topic(), metadata.partition(), metadata.offset(), key, value);
} else {
// 发送失败:打印异常
System.err.println("发送失败:" + exception.getMessage());
}
});
}
} finally {
// 4. 关闭生产者(必须,否则资源泄漏)
producer.close();
}
}
}
3.2.2 生产者(同步发送,阻塞等待结果)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Kafka 同步生产者(阻塞等待发送结果)
*/
public class KafkaProducerSync {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "sync-key-" + i, "sync-value-" + i);
// 同步发送:get() 阻塞直到返回结果
RecordMetadata metadata = producer.send(record).get();
System.out.printf("同步发送成功 -> Partition: %d, Offset: %d%n", metadata.partition(), metadata.offset());
}
} finally {
producer.close();
}
}
}
3.3 Consumer(消费者)代码示例
3.3.1 Consumer(自动提交 Offset)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka 基础消费者(自动提交 Offset)
*/
public class KafkaConsumerAutoCommit {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
// 消费者组 ID(必须,同一组内消费者负载均衡)
private static final String GROUP_ID = "test-group-1";
public static void main(String[] args) {
// 1. 配置消费者参数
Properties props = new Properties();
// 必选:集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 必选:消费者组 ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 必选:Key 反序列化器(与生产者序列化器一致)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 必选:Value 反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 可选:自动提交 Offset(默认 true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 可选:自动提交间隔(5000ms)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// 可选:无 Offset 时从头消费(earliest)/从最新消费(latest)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅 Topic(可订阅多个:Arrays.asList("topic1", "topic2"))
consumer.subscribe(Collections.singletonList(TOPIC));
// 4. 循环拉取消息
try {
while (true) {
// 拉取消息,超时 100ms
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 遍历消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消费成功 -> Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
// 5. 关闭消费者
consumer.close();
}
}
}
3.3.2 Consumer(手动提交 Offset,生产环境推荐,保证消息不丢失)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka 消费者(手动提交 Offset,保证可靠性)
*/
public class KafkaConsumerManualCommit {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group-2";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
// 处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("处理消息 -> Partition: %d, Offset: %d, Value: %s%n",
record.partition(), record.offset(), record.value());
}
// 手动同步提交 Offset(处理完再提交,失败则不提交,下次重试)
consumer.commitSync();
System.out.println("Offset 提交成功");
}
}
} catch (Exception e) {
System.err.println("消息处理失败:" + e.getMessage());
// 异常时不提交 Offset,下次 poll 会重新拉取
} finally {
consumer.close();
}
}
}
3.4 自定义序列化 / 反序列化(对象传输)
3.4.1 定义实体类
// 用户实体类
public class User {
private String id;
private String name;
private int age;
// 构造方法、getter/setter、toString
public User() {}
public User(String id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{id='" + id + "', name='" + name + "', age=" + age + "}";
}
// getter/setter 省略
}
3.4.2 自定义 JSON 序列化器
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
/**
* 自定义 JSON 序列化器(对象转字节数组)
*/
public class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, T data) {
if (data == null) return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("JSON 序列化失败", e);
}
}
@Override
public void close() {}
}
3.4.3 自定义 JSON 反序列化器
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* 自定义 JSON 反序列化器(字节数组转对象)
*/
public class JsonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> clazz;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 获取反序列化目标类
clazz = (Class<T>) configs.get("value.class");
}
@Override
public T deserialize(String topic, byte[] data) {
if (data == null) return null;
try {
return objectMapper.readValue(data, clazz);
} catch (Exception e) {
throw new RuntimeException("JSON 反序列化失败", e);
}
}
@Override
public void close() {}
}
3.4.4 对象生产者
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class UserProducer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "user-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 使用自定义 JSON 序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
try {
User user = new User("1", "张三", 25);
ProducerRecord<String, User> record = new ProducerRecord<>(TOPIC, user.getId(), user);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("用户对象发送成功:" + user);
} else {
exception.printStackTrace();
}
});
} finally {
producer.close();
}
}
}
3.4.5 对象消费者
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class UserConsumer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "user-topic";
private static final String GROUP_ID = "user-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 使用自定义 JSON 反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
// 指定反序列化目标类
props.put("value.class", User.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
System.out.println("消费用户对象:" + record.value());
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
4. 生产环境核心配置
4.1 Producer 可靠性配置
# 所有副本确认(最安全)
acks=all
# 无限重试
retries=Integer.MAX_VALUE
# 开启幂等性(防止重复发送)
enable.idempotence=true
# 事务超时(事务模式)
transaction.timeout.ms=30000
4.2 Consumer 可靠性配置
# 关闭自动提交
enable.auto.commit=false
# 手动提交(同步/异步)
# 最大拉取记录数
max.poll.records=500
# 消费者会话超时(10s)
session.timeout.ms=10000
# 重平衡超时(30s)
max.poll.interval.ms=300000
4.3 Consumer 可靠性配置
# 创建 3 分区 2 副本的 Topic(生产环境推荐)
bin/kafka-topics.sh --create --topic prod-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
5. Kafka Kraft 模式启动(单机)
该模式启动不再依赖 zookeeper,直接配置 config/kraft 目录下的文件即可
-
配置 kraft/server.properties 文件
vi /opt/kafka-3.8.0/config/kraft/server.properties单台服务器快速启动,直接复制覆盖
# ==================== Kafka 3.8.0 KRaft 单机配置 ==================== process.roles=broker,controller node.id=1 cluster.id= listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 # 云服务器需要换成公网IP advertised.listeners=PLAINTEXT://127.0.0.1:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT controller.quorum.voters=1@127.0.0.1:9093 # 日志存储目录(可自行修改) log.dirs=/tmp/kafka380-logs metadata.log.dir=/tmp/kafka380-metadata # 基础参数 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1 default.replication.factor=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 -
生成集群 ID(全局唯一)
bin/kafka-storage.sh random-uuid会输出一串 UUID,例如:
J-x9J5-QRPmfqgMjKo-usA
把上面生成的 UUID 填入 kraft/server.properties 配置文件的:cluster.id=J-x9J5-QRPmfqgMjKo-usA -
格式化存储(首次启动必须执行)
bin/kafka-storage.sh format -t J-x9J5-QRPmfqgMjKo-usA -c config/kraft/server.properties -
启动 Kafka
# 前台启动 bin/kafka-server-start.sh config/kraft/server.properties # 后台守护启动(生产用) bin/kafka-server-start.sh -daemon config/kraft/server.properties
更多推荐
所有评论(0)