Goravel连接kafka
核心流程:文件上传 → 保存到服务器 → 封装任务推送到 Kafka → 消费者异步处理文件,实现了接口和业务解耦。关键要点配置解耦:Kafka 连接信息通过配置文件 + 环境变量管理,便于环境切换;错误处理:上传失败 / 消息发送失败有回滚逻辑,消费失败有日志和偏移量提交机制;可扩展性:生产者 / 消费者封装为独立模块,便于后续扩展多 topic、多消费者。生产建议:消费者建议用 systemd
·
前置准备
- 已搭建 Goravel 项目(
go install github.com/goravel/framework@latest→goravel new project) - 安装 Kafka Go 客户端:
go get github.com/segmentio/kafka-go - Kafka 服务已部署并可访问(获取 broker 地址、可创建 topic)
第一步:配置 Kafka 连接信息
在config目录下新建kafka.go,统一管理 Kafka 配置:
go
运行
package config
import "github.com/goravel/framework/facades"
func init() {
config := facades.Config
// 添加Kafka配置
config.Add("kafka", map[string]interface{}{
"brokers": config.Env("KAFKA_BROKERS", "127.0.0.1:9092"), // Kafka地址
"topic": config.Env("KAFKA_FILE_TOPIC", "file_upload"), // 任务topic
"group_id": config.Env("KAFKA_GROUP_ID", "file_consumer"), // 消费者组ID
"timeout": 10, // 超时时间(秒)
})
}
在.env文件补充配置:
env
KAFKA_BROKERS=127.0.0.1:9092
KAFKA_FILE_TOPIC=file_upload
KAFKA_GROUP_ID=file_consumer
第二步:封装 Kafka 生产者工具
在app/utils目录创建kafka_producer.go,封装消息发送逻辑:
go
运行
package utils
import (
"context"
"encoding/json"
"fmt"
"github.com/goravel/framework/facades"
"github.com/segmentio/kafka-go"
)
// FileUploadTask 定义Kafka消息结构体(文件任务信息)
type FileUploadTask struct {
FilePath string `json:"file_path"` // 文件保存路径
FileName string `json:"file_name"` // 文件名
FileSize int64 `json:"file_size"` // 文件大小(字节)
MimeType string `json:"mime_type"` // 文件类型
}
var kafkaWriter *kafka.Writer
// InitKafkaProducer 初始化Kafka生产者
func InitKafkaProducer() {
brokers := facades.Config.GetString("kafka.brokers")
topic := facades.Config.GetString("kafka.topic")
kafkaWriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{brokers},
Topic: topic,
Balancer: &kafka.LeastBytes{}, // 负载均衡策略
})
facades.Log.Info("Kafka生产者初始化成功")
}
// SendFileTask 发送文件任务到Kafka队列
func SendFileTask(task FileUploadTask) error {
if kafkaWriter == nil {
InitKafkaProducer()
}
// 序列化任务为JSON
msgBytes, err := json.Marshal(task)
if err != nil {
facades.Log.Error("序列化文件任务失败", err)
return fmt.Errorf("序列化失败: %v", err)
}
// 发送消息
err = kafkaWriter.WriteMessages(context.Background(),
kafka.Message{Value: msgBytes},
)
if err != nil {
facades.Log.Error("发送Kafka消息失败", err)
return fmt.Errorf("发送失败: %v", err)
}
return nil
}
第三步:初始化 Kafka 生产者
修改bootstrap/app.go,在项目启动时初始化生产者:
go
运行
package bootstrap
import (
"your-project/app/utils" // 替换为实际模块名
"github.com/goravel/framework/foundation"
)
func Boot(app *foundation.Application) {
// 其他初始化逻辑...
// 初始化Kafka生产者
utils.InitKafkaProducer()
}
第四步:实现文件上传接口
在app/http/controllers创建FileController.go:
go
运行
package controllers
import (
"net/http"
"path/filepath"
"time"
"github.com/goravel/framework/contracts/http"
"github.com/goravel/framework/facades"
"github.com/goravel/framework/http/validate"
"your-project/app/utils" // 替换为实际模块名
)
type FileController struct{}
// Upload 文件上传接口
func (c *FileController) Upload(ctx http.Context) http.Response {
// 1. 获取上传文件
file, err := ctx.Request().File("file")
if err != nil {
return ctx.Response().Json(http.StatusBadRequest, http.Json{
"code": 400,
"message": "获取文件失败",
})
}
// 2. 验证文件(50MB上限、允许的扩展名)
validateErr := validate.File(ctx, validate.FileRules{
Field: "file",
Rules: []validate.FileRule{
validate.FileSize(1024 * 1024 * 50), // 50MB
validate.FileExts([]string{"jpg", "png", "pdf", "txt"}),
},
})
if validateErr != nil {
return ctx.Response().Json(http.StatusBadRequest, http.Json{
"code": 400,
"message": validateErr.Error(),
})
}
// 3. 生成唯一文件名,避免重复
fileName := fmt.Sprintf("%d_%s", time.Now().UnixMicro(), file.GetClientOriginalName())
savePath := filepath.Join("storage", "uploads", fileName)
// 4. 保存文件到服务器
if err := file.StoreAs(savePath); err != nil {
return ctx.Response().Json(http.StatusInternalServerError, http.Json{
"code": 500,
"message": "保存文件失败",
})
}
// 5. 封装任务并发送到Kafka
task := utils.FileUploadTask{
FilePath: savePath,
FileName: fileName,
FileSize: file.GetSize(),
MimeType: file.GetMimeType(),
}
if err := utils.SendFileTask(task); err != nil {
// 发送失败,回滚文件保存
facades.Storage().Delete(savePath)
return ctx.Response().Json(http.StatusInternalServerError, http.Json{
"code": 500,
"message": "任务入队失败",
})
}
// 6. 返回成功响应
return ctx.Response().Json(http.StatusOK, http.Json{
"code": 200,
"message": "文件上传成功,任务已入队",
"data": map[string]string{
"file_name": fileName,
"file_path": savePath,
},
})
}
第五步:注册上传路由
修改routes/api.go:
go
运行
package routes
import (
"github.com/goravel/framework/facades"
"your-project/app/http/controllers" // 替换为实际模块名
)
func Api() {
router := facades.Route
// 文件上传路由
router.Post("/file/upload", controllers.FileController{}.Upload)
}
第六步:编写 Kafka 消费者
在app/consumers创建file_consumer.go,处理队列任务:
go
运行
package consumers
import (
"context"
"encoding/json"
"github.com/goravel/framework/facades"
"github.com/segmentio/kafka-go"
"your-project/app/utils" // 替换为实际模块名
)
// StartFileConsumer 启动文件任务消费者
func StartFileConsumer() {
brokers := facades.Config.GetString("kafka.brokers")
topic := facades.Config.GetString("kafka.topic")
groupID := facades.Config.GetString("kafka.group_id")
// 创建Kafka消费者
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokers},
Topic: topic,
GroupID: groupID,
MinBytes: 1024, // 最小读取字节
MaxBytes: 10240, // 最大读取字节
})
defer reader.Close()
facades.Log.Info("Kafka消费者已启动,监听topic: " + topic)
// 循环消费消息
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
facades.Log.Error("读取Kafka消息失败", err)
continue
}
// 反序列化消息
var task utils.FileUploadTask
if err := json.Unmarshal(msg.Value, &task); err != nil {
facades.Log.Error("反序列化消息失败", err)
// 提交偏移量,避免重复消费
_ = reader.CommitMessages(context.Background(), msg)
continue
}
// 核心业务处理(替换为你的实际逻辑)
handleFileTask(task)
// 提交偏移量,确认消费完成
if err := reader.CommitMessages(context.Background(), msg); err != nil {
facades.Log.Error("提交偏移量失败", err)
} else {
facades.Log.Info("文件任务处理完成", "file", task.FileName)
}
}
}
// handleFileTask 实际文件处理逻辑(示例)
func handleFileTask(task utils.FileUploadTask) {
facades.Log.Info("开始处理文件",
"name", task.FileName,
"path", task.FilePath,
"size", task.FileSize,
)
// 示例逻辑:检查文件是否存在
exists, _ := facades.Storage().Exists(task.FilePath)
if !exists {
facades.Log.Error("文件不存在", "path", task.FilePath)
return
}
// 你的业务逻辑:解析文件、存储到数据库、格式转换等...
}
第七步:创建消费者启动入口
在项目根目录创建cmd/consumer/main.go:
go
运行
package main
import (
"your-project/app/consumers" // 替换为实际模块名
"your-project/bootstrap" // 替换为实际模块名
)
func main() {
// 初始化Goravel应用
bootstrap.Application()
// 启动消费者
consumers.StartFileConsumer()
// 阻塞主线程
select {}
}
测试流程
- 创建 Kafka Topic:
bash
运行
kafka-topics.sh --create --topic file_upload --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1 - 启动 Goravel 服务:
bash
运行
go run main.go serve - 启动 Kafka 消费者:
bash
运行
go run cmd/consumer/main.go - 测试上传接口:
bash
运行
curl -X POST -F "file=@/本地文件路径/test.jpg" http://127.0.0.1:3000/file/upload
总结
- 核心流程:文件上传 → 保存到服务器 → 封装任务推送到 Kafka → 消费者异步处理文件,实现了接口和业务解耦。
- 关键要点:
- 配置解耦:Kafka 连接信息通过配置文件 + 环境变量管理,便于环境切换;
- 错误处理:上传失败 / 消息发送失败有回滚逻辑,消费失败有日志和偏移量提交机制;
- 可扩展性:生产者 / 消费者封装为独立模块,便于后续扩展多 topic、多消费者。
- 生产建议:消费者建议用 systemd/supervisor 守护进程,Kafka 配置消息确认机制(
RequiredAcks)确保消息不丢失。
更多推荐

所有评论(0)