前置准备

  1. 已搭建 Goravel 项目(go install github.com/goravel/framework@latest → goravel new project
  2. 安装 Kafka Go 客户端:go get github.com/segmentio/kafka-go
  3. Kafka 服务已部署并可访问(获取 broker 地址、可创建 topic)

第一步:配置 Kafka 连接信息

config目录下新建kafka.go,统一管理 Kafka 配置:

go

运行

package config

import "github.com/goravel/framework/facades"

func init() {
    config := facades.Config
    // 添加Kafka配置
    config.Add("kafka", map[string]interface{}{
        "brokers":   config.Env("KAFKA_BROKERS", "127.0.0.1:9092"), // Kafka地址
        "topic":     config.Env("KAFKA_FILE_TOPIC", "file_upload"), // 任务topic
        "group_id":  config.Env("KAFKA_GROUP_ID", "file_consumer"), // 消费者组ID
        "timeout":   10, // 超时时间(秒)
    })
}

.env文件补充配置:

env

KAFKA_BROKERS=127.0.0.1:9092
KAFKA_FILE_TOPIC=file_upload
KAFKA_GROUP_ID=file_consumer

第二步:封装 Kafka 生产者工具

app/utils目录创建kafka_producer.go,封装消息发送逻辑:

go

运行

package utils

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/goravel/framework/facades"
    "github.com/segmentio/kafka-go"
)

// FileUploadTask 定义Kafka消息结构体(文件任务信息)
type FileUploadTask struct {
    FilePath string `json:"file_path"` // 文件保存路径
    FileName string `json:"file_name"` // 文件名
    FileSize int64  `json:"file_size"` // 文件大小(字节)
    MimeType string `json:"mime_type"` // 文件类型
}

var kafkaWriter *kafka.Writer

// InitKafkaProducer 初始化Kafka生产者
func InitKafkaProducer() {
    brokers := facades.Config.GetString("kafka.brokers")
    topic := facades.Config.GetString("kafka.topic")

    kafkaWriter = kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{brokers},
        Topic:    topic,
        Balancer: &kafka.LeastBytes{}, // 负载均衡策略
    })
    facades.Log.Info("Kafka生产者初始化成功")
}

// SendFileTask 发送文件任务到Kafka队列
func SendFileTask(task FileUploadTask) error {
    if kafkaWriter == nil {
        InitKafkaProducer()
    }

    // 序列化任务为JSON
    msgBytes, err := json.Marshal(task)
    if err != nil {
        facades.Log.Error("序列化文件任务失败", err)
        return fmt.Errorf("序列化失败: %v", err)
    }

    // 发送消息
    err = kafkaWriter.WriteMessages(context.Background(),
        kafka.Message{Value: msgBytes},
    )
    if err != nil {
        facades.Log.Error("发送Kafka消息失败", err)
        return fmt.Errorf("发送失败: %v", err)
    }

    return nil
}

第三步:初始化 Kafka 生产者

修改bootstrap/app.go,在项目启动时初始化生产者:

go

运行

package bootstrap

import (
    "your-project/app/utils" // 替换为实际模块名
    "github.com/goravel/framework/foundation"
)

func Boot(app *foundation.Application) {
    // 其他初始化逻辑...
    
    // 初始化Kafka生产者
    utils.InitKafkaProducer()
}

第四步:实现文件上传接口

app/http/controllers创建FileController.go

go

运行

package controllers

import (
    "net/http"
    "path/filepath"
    "time"

    "github.com/goravel/framework/contracts/http"
    "github.com/goravel/framework/facades"
    "github.com/goravel/framework/http/validate"
    "your-project/app/utils" // 替换为实际模块名
)

type FileController struct{}

// Upload 文件上传接口
func (c *FileController) Upload(ctx http.Context) http.Response {
    // 1. 获取上传文件
    file, err := ctx.Request().File("file")
    if err != nil {
        return ctx.Response().Json(http.StatusBadRequest, http.Json{
            "code":    400,
            "message": "获取文件失败",
        })
    }

    // 2. 验证文件(50MB上限、允许的扩展名)
    validateErr := validate.File(ctx, validate.FileRules{
        Field: "file",
        Rules: []validate.FileRule{
            validate.FileSize(1024 * 1024 * 50), // 50MB
            validate.FileExts([]string{"jpg", "png", "pdf", "txt"}),
        },
    })
    if validateErr != nil {
        return ctx.Response().Json(http.StatusBadRequest, http.Json{
            "code":    400,
            "message": validateErr.Error(),
        })
    }

    // 3. 生成唯一文件名,避免重复
    fileName := fmt.Sprintf("%d_%s", time.Now().UnixMicro(), file.GetClientOriginalName())
    savePath := filepath.Join("storage", "uploads", fileName)

    // 4. 保存文件到服务器
    if err := file.StoreAs(savePath); err != nil {
        return ctx.Response().Json(http.StatusInternalServerError, http.Json{
            "code":    500,
            "message": "保存文件失败",
        })
    }

    // 5. 封装任务并发送到Kafka
    task := utils.FileUploadTask{
        FilePath: savePath,
        FileName: fileName,
        FileSize: file.GetSize(),
        MimeType: file.GetMimeType(),
    }
    if err := utils.SendFileTask(task); err != nil {
        // 发送失败,回滚文件保存
        facades.Storage().Delete(savePath)
        return ctx.Response().Json(http.StatusInternalServerError, http.Json{
            "code":    500,
            "message": "任务入队失败",
        })
    }

    // 6. 返回成功响应
    return ctx.Response().Json(http.StatusOK, http.Json{
        "code":    200,
        "message": "文件上传成功,任务已入队",
        "data": map[string]string{
            "file_name": fileName,
            "file_path": savePath,
        },
    })
}

第五步:注册上传路由

修改routes/api.go

go

运行

package routes

import (
    "github.com/goravel/framework/facades"
    "your-project/app/http/controllers" // 替换为实际模块名
)

func Api() {
    router := facades.Route
    // 文件上传路由
    router.Post("/file/upload", controllers.FileController{}.Upload)
}

第六步:编写 Kafka 消费者

app/consumers创建file_consumer.go,处理队列任务:

go

运行

package consumers

import (
    "context"
    "encoding/json"

    "github.com/goravel/framework/facades"
    "github.com/segmentio/kafka-go"
    "your-project/app/utils" // 替换为实际模块名
)

// StartFileConsumer 启动文件任务消费者
func StartFileConsumer() {
    brokers := facades.Config.GetString("kafka.brokers")
    topic := facades.Config.GetString("kafka.topic")
    groupID := facades.Config.GetString("kafka.group_id")

    // 创建Kafka消费者
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{brokers},
        Topic:    topic,
        GroupID:  groupID,
        MinBytes: 1024,  // 最小读取字节
        MaxBytes: 10240, // 最大读取字节
    })
    defer reader.Close()

    facades.Log.Info("Kafka消费者已启动,监听topic: " + topic)

    // 循环消费消息
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            facades.Log.Error("读取Kafka消息失败", err)
            continue
        }

        // 反序列化消息
        var task utils.FileUploadTask
        if err := json.Unmarshal(msg.Value, &task); err != nil {
            facades.Log.Error("反序列化消息失败", err)
            // 提交偏移量,避免重复消费
            _ = reader.CommitMessages(context.Background(), msg)
            continue
        }

        // 核心业务处理(替换为你的实际逻辑)
        handleFileTask(task)

        // 提交偏移量,确认消费完成
        if err := reader.CommitMessages(context.Background(), msg); err != nil {
            facades.Log.Error("提交偏移量失败", err)
        } else {
            facades.Log.Info("文件任务处理完成", "file", task.FileName)
        }
    }
}

// handleFileTask 实际文件处理逻辑(示例)
func handleFileTask(task utils.FileUploadTask) {
    facades.Log.Info("开始处理文件", 
        "name", task.FileName,
        "path", task.FilePath,
        "size", task.FileSize,
    )

    // 示例逻辑:检查文件是否存在
    exists, _ := facades.Storage().Exists(task.FilePath)
    if !exists {
        facades.Log.Error("文件不存在", "path", task.FilePath)
        return
    }

    // 你的业务逻辑:解析文件、存储到数据库、格式转换等...
}

第七步:创建消费者启动入口

在项目根目录创建cmd/consumer/main.go

go

运行

package main

import (
    "your-project/app/consumers" // 替换为实际模块名
    "your-project/bootstrap"     // 替换为实际模块名
)

func main() {
    // 初始化Goravel应用
    bootstrap.Application()
    // 启动消费者
    consumers.StartFileConsumer()

    // 阻塞主线程
    select {}
}

测试流程

  1. 创建 Kafka Topic

    bash

    运行

    kafka-topics.sh --create --topic file_upload --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1
    
  2. 启动 Goravel 服务

    bash

    运行

    go run main.go serve
    
  3. 启动 Kafka 消费者

    bash

    运行

    go run cmd/consumer/main.go
    
  4. 测试上传接口

    bash

    运行

    curl -X POST -F "file=@/本地文件路径/test.jpg" http://127.0.0.1:3000/file/upload
    

总结

  1. 核心流程:文件上传 → 保存到服务器 → 封装任务推送到 Kafka → 消费者异步处理文件,实现了接口和业务解耦。
  2. 关键要点
    • 配置解耦:Kafka 连接信息通过配置文件 + 环境变量管理,便于环境切换;
    • 错误处理:上传失败 / 消息发送失败有回滚逻辑,消费失败有日志和偏移量提交机制;
    • 可扩展性:生产者 / 消费者封装为独立模块,便于后续扩展多 topic、多消费者。
  3. 生产建议:消费者建议用 systemd/supervisor 守护进程,Kafka 配置消息确认机制(RequiredAcks)确保消息不丢失。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐