gravel
Goravel 中通过Filesystem组件配置temp本地磁盘,专门存储待 Kafka 消费的临时文件;文件上传时仅将元数据发送到 Kafka,文件本体保存到本地temp目录,消费完成后删除;必须配置定时清理任务,删除过期未消费的临时文件,避免磁盘容量耗尽。
·
你想在 Goravel 框架中实现文件上传后先存储到服务器本地磁盘(作为 Kafka 队列消费前的临时存储),核心是利用 Goravel 内置的Filesystem文件系统组件完成本地文件的保存、读取和清理,同时仅将文件元数据发送到 Kafka 队列。
前置准备
- 确保 Goravel 框架已安装(推荐 v1.10+),并配置好基础的 HTTP 服务;
- 安装 Goravel 的 Kafka 扩展(推荐官方维护的
goravel/kafka):bash
运行
go get github.com/goravel/kafka - 确认服务器本地有可读写的临时目录(如
/data/temp-files/),并给 Go 进程赋读写权限:bash
运行
mkdir -p /data/temp-files && chmod 755 /data/temp-files
核心实现步骤
步骤 1:配置 Goravel 本地存储磁盘
Goravel 默认已内置本地存储配置,我们新增一个专门的temp磁盘用于存放待消费的临时文件,修改config/filesystems.php:
go
运行
<?php
return [
'default' => env('FILESYSTEM_DISK', 'local'),
'disks' => [
// 原有local磁盘(可保留)
'local' => [
'driver' => 'local',
'root' => base_path('storage/app'),
],
// 新增temp磁盘:专门存待消费的临时文件
'temp' => [
'driver' => 'local',
'root' => '/data/temp-files', // 临时文件存储目录
'url' => env('APP_URL').'/temp-files',
'visibility' => 'private', // 私有,仅服务端可访问
],
],
];
步骤 2:实现文件上传接口(保存到本地 + 发送 Kafka 元数据)
创建文件上传控制器app/http/controllers/FileUploadController.go,核心逻辑是:接收文件→生成唯一文件名→保存到本地临时目录→发送元数据到 Kafka。
go
运行
package controllers
import (
"encoding/json"
"fmt"
"path/filepath"
"time"
"github.com/goravel/framework/contracts/http"
"github.com/goravel/framework/facades"
"github.com/goravel/kafka"
"github.com/google/uuid"
)
// 文件元数据结构体(仅存关键信息,发送到Kafka)
type FileMeta struct {
FileId string `json:"file_id"` // 文件唯一标识
FilePath string `json:"file_path"` // 本地临时存储路径
FileName string `json:"file_name"` // 原始文件名
FileSize int64 `json:"file_size"` // 文件大小
UploadTime string `json:"upload_time"` // 上传时间
}
type FileUploadController struct{}
func NewFileUploadController() *FileUploadController {
return &FileUploadController{}
}
// Upload 处理文件上传
func (c *FileUploadController) Upload(ctx http.Context) http.Response {
// 1. 获取上传的文件
file, err := ctx.Request().File("file")
if err != nil {
return ctx.Response().Json(http.StatusBadRequest, http.Json{
"code": 400,
"msg": "获取文件失败:" + err.Error(),
})
}
// 2. 生成唯一文件名(避免重复)
fileId := uuid.NewString()
ext := filepath.Ext(file.GetClientOriginalName()) // 获取文件后缀
tempFileName := fmt.Sprintf("%s%s", fileId, ext) // 唯一文件名:UUID+后缀
// 3. 保存文件到本地temp磁盘
_, err = facades.Storage().Disk("temp").PutFileAs("", file, tempFileName)
if err != nil {
return ctx.Response().Json(http.StatusInternalServerError, http.Json{
"code": 500,
"msg": "保存文件失败:" + err.Error(),
})
}
// 4. 构造文件元数据(仅发送到Kafka,不发文件本体)
fileMeta := FileMeta{
FileId: fileId,
FilePath: facades.Storage().Disk("temp").Path(tempFileName), // 本地绝对路径
FileName: file.GetClientOriginalName(),
FileSize: file.GetSize(),
UploadTime: time.Now().Format("2006-01-02 15:04:05"),
}
metaJson, _ := json.Marshal(fileMeta)
// 5. 发送元数据到Kafka队列(file-upload-topic)
err = facades.Kafka().Producer().Topic("file-upload-topic").Send(string(metaJson))
if err != nil {
// 发送失败:删除已保存的临时文件,避免垃圾文件
facades.Storage().Disk("temp").Delete(tempFileName)
return ctx.Response().Json(http.StatusInternalServerError, http.Json{
"code": 500,
"msg": "发送Kafka消息失败:" + err.Error(),
})
}
// 6. 返回成功响应
return ctx.Response().Json(http.StatusOK, http.Json{
"code": 200,
"msg": "文件上传成功,等待处理",
"data": fileMeta,
})
}
步骤 3:注册上传路由
修改routes/api.go,添加文件上传路由:
go
运行
package routes
import (
"github.com/goravel/framework/contracts/routing"
"goravel/app/http/controllers"
)
func Api() routing.Route {
return func(route routing.Router) {
// 文件上传接口
route.Post("/file/upload", controllers.NewFileUploadController().Upload)
}
}
步骤 4:实现 Kafka 消费端(读取本地文件 + 处理 + 清理)
创建 Kafka 消费者app/jobs/FileConsumeJob.go,消费 Kafka 元数据后读取本地文件处理,完成后删除临时文件:
go
运行
package jobs
import (
"encoding/json"
"io"
"os"
"github.com/goravel/framework/contracts/queue"
"github.com/goravel/framework/facades"
)
type FileConsumeJob struct {
FileMeta FileMeta `json:"file_meta"`
}
// Signature 任务唯一标识
func (receiver *FileConsumeJob) Signature() string {
return "file:consume"
}
// Handle 消费逻辑:读取本地文件→处理→删除临时文件
func (receiver *FileConsumeJob) Handle(args ...queue.Arg) error {
// 1. 解析Kafka消息中的元数据(也可在消费者入口解析)
meta := receiver.FileMeta
// 2. 检查本地文件是否存在
if !facades.Storage().Disk("temp").Exists(meta.FilePath) {
return fmt.Errorf("临时文件不存在:%s", meta.FilePath)
}
// 3. 读取本地文件(用于后续业务处理)
file, err := facades.Storage().Disk("temp").Get(meta.FilePath)
if err != nil {
return fmt.Errorf("读取文件失败:%s", err.Error())
}
defer file.Close()
// 4. 执行你的业务逻辑(比如解析文件、转存到正式存储、入库等)
// 示例:读取文件内容
content, _ := io.ReadAll(file)
facades.Log().Info(fmt.Sprintf("处理文件 %s,内容长度:%d", meta.FileName, len(content)))
// 5. 处理完成:删除本地临时文件
if err := facades.Storage().Disk("temp").Delete(meta.FilePath); err != nil {
facades.Log().Error(fmt.Sprintf("删除临时文件失败:%s", err.Error()))
return err
}
return nil
}
// 注册Kafka消费者(在app/providers/kafka_service_provider.go中)
// 示例消费者入口:
func RegisterKafkaConsumer() {
facades.Kafka().Consumer().Topic("file-upload-topic").Group("file-consume-group").Handle(func(message kafka.Message) error {
// 解析Kafka消息为FileMeta
var meta FileMeta
if err := json.Unmarshal([]byte(message.Value), &meta); err != nil {
return err
}
// 投递到Goravel队列(可选,解耦Kafka消费和文件处理)
return facades.Queue().Dispatch(&FileConsumeJob{FileMeta: meta})
})
}
步骤 5:配置临时文件过期清理(关键!防止磁盘满)
Goravel 支持定时任务,创建app/console/commands/CleanTempFilesCommand.go,清理超过 24 小时的未消费临时文件:
go
运行
package commands
import (
"time"
"github.com/goravel/framework/console"
"github.com/goravel/framework/console/command"
"github.com/goravel/framework/facades"
)
type CleanTempFilesCommand struct {
}
func NewCleanTempFilesCommand() *CleanTempFilesCommand {
return &CleanTempFilesCommand{}
}
func (receiver *CleanTempFilesCommand) Signature() string {
return "temp:clean"
}
func (receiver *CleanTempFilesCommand) Description() string {
return "清理超过24小时的未消费临时文件"
}
func (receiver *CleanTempFilesCommand) Handle(ctx console.Context) error {
// 1. 获取temp磁盘下的所有文件
files, err := facades.Storage().Disk("temp").AllFiles()
if err != nil {
return err
}
// 2. 遍历文件,删除超过24小时的文件
expireTime := time.Now().Add(-24 * time.Hour)
cleanCount := 0
for _, file := range files {
// 获取文件修改时间
fileInfo, err := os.Stat(facades.Storage().Disk("temp").Path(file))
if err != nil {
continue
}
// 超过24小时则删除
if fileInfo.ModTime().Before(expireTime) {
if err := facades.Storage().Disk("temp").Delete(file); err == nil {
cleanCount++
}
}
}
facades.Log().Info(fmt.Sprintf("清理临时文件完成,共删除 %d 个过期文件", cleanCount))
return nil
}
然后在app/console/kernel.go中注册定时任务,每天凌晨执行:
go
运行
package console
import (
"time"
"github.com/goravel/framework/console"
"github.com/goravel/framework/facades"
"goravel/app/console/commands"
)
func Kernel() {
// 注册命令
facades.Artisan().Register([]console.Command{
commands.NewCleanTempFilesCommand(),
})
// 配置定时任务:每天0点执行临时文件清理
facades.Schedule().Command("temp:clean").DailyAt("00:00")
}
关键代码解释
- Filesystem 磁盘:
facades.Storage().Disk("temp")指定使用我们配置的临时文件磁盘,所有操作都基于该目录,隔离性更好; - 唯一文件名:使用
uuid.NewString()生成唯一 ID,避免不同用户上传同名文件导致覆盖; - Kafka 消息内容:仅发送文件元数据(路径、名称、大小等),而非文件本体,符合 Kafka 设计原则;
- 异常回滚:Kafka 发送失败时,立即删除已保存的临时文件,避免产生垃圾文件;
- 过期清理:定时任务保障即使消费失败,过期文件也会被清理,防止磁盘占满。
总结
- Goravel 中通过
Filesystem组件配置temp本地磁盘,专门存储待 Kafka 消费的临时文件; - 文件上传时仅将元数据发送到 Kafka,文件本体保存到本地
temp目录,消费完成后删除; - 必须配置定时清理任务,删除过期未消费的临时文件,避免磁盘容量耗尽。
更多推荐

所有评论(0)