你想在 Goravel 框架中实现文件上传后先存储到服务器本地磁盘(作为 Kafka 队列消费前的临时存储),核心是利用 Goravel 内置的Filesystem文件系统组件完成本地文件的保存、读取和清理,同时仅将文件元数据发送到 Kafka 队列。

前置准备

  1. 确保 Goravel 框架已安装(推荐 v1.10+),并配置好基础的 HTTP 服务;
  2. 安装 Goravel 的 Kafka 扩展(推荐官方维护的goravel/kafka):

    bash

    运行

    go get github.com/goravel/kafka
    
  3. 确认服务器本地有可读写的临时目录(如/data/temp-files/),并给 Go 进程赋读写权限:

    bash

    运行

    mkdir -p /data/temp-files && chmod 755 /data/temp-files
    

核心实现步骤

步骤 1:配置 Goravel 本地存储磁盘

Goravel 默认已内置本地存储配置,我们新增一个专门的temp磁盘用于存放待消费的临时文件,修改config/filesystems.php

go

运行

<?php

return [
    'default' => env('FILESYSTEM_DISK', 'local'),

    'disks' => [
        // 原有local磁盘(可保留)
        'local' => [
            'driver' => 'local',
            'root' => base_path('storage/app'),
        ],
        // 新增temp磁盘:专门存待消费的临时文件
        'temp' => [
            'driver' => 'local',
            'root' => '/data/temp-files', // 临时文件存储目录
            'url' => env('APP_URL').'/temp-files',
            'visibility' => 'private', // 私有,仅服务端可访问
        ],
    ],
];
步骤 2:实现文件上传接口(保存到本地 + 发送 Kafka 元数据)

创建文件上传控制器app/http/controllers/FileUploadController.go,核心逻辑是:接收文件→生成唯一文件名→保存到本地临时目录→发送元数据到 Kafka。

go

运行

package controllers

import (
	"encoding/json"
	"fmt"
	"path/filepath"
	"time"

	"github.com/goravel/framework/contracts/http"
	"github.com/goravel/framework/facades"
	"github.com/goravel/kafka"
	"github.com/google/uuid"
)

// 文件元数据结构体(仅存关键信息,发送到Kafka)
type FileMeta struct {
	FileId     string `json:"file_id"`     // 文件唯一标识
	FilePath   string `json:"file_path"`   // 本地临时存储路径
	FileName   string `json:"file_name"`   // 原始文件名
	FileSize   int64  `json:"file_size"`   // 文件大小
	UploadTime string `json:"upload_time"` // 上传时间
}

type FileUploadController struct{}

func NewFileUploadController() *FileUploadController {
	return &FileUploadController{}
}

// Upload 处理文件上传
func (c *FileUploadController) Upload(ctx http.Context) http.Response {
	// 1. 获取上传的文件
	file, err := ctx.Request().File("file")
	if err != nil {
		return ctx.Response().Json(http.StatusBadRequest, http.Json{
			"code": 400,
			"msg":  "获取文件失败:" + err.Error(),
		})
	}

	// 2. 生成唯一文件名(避免重复)
	fileId := uuid.NewString()
	ext := filepath.Ext(file.GetClientOriginalName()) // 获取文件后缀
	tempFileName := fmt.Sprintf("%s%s", fileId, ext)  // 唯一文件名:UUID+后缀

	// 3. 保存文件到本地temp磁盘
	_, err = facades.Storage().Disk("temp").PutFileAs("", file, tempFileName)
	if err != nil {
		return ctx.Response().Json(http.StatusInternalServerError, http.Json{
			"code": 500,
			"msg":  "保存文件失败:" + err.Error(),
		})
	}

	// 4. 构造文件元数据(仅发送到Kafka,不发文件本体)
	fileMeta := FileMeta{
		FileId:     fileId,
		FilePath:   facades.Storage().Disk("temp").Path(tempFileName), // 本地绝对路径
		FileName:   file.GetClientOriginalName(),
		FileSize:   file.GetSize(),
		UploadTime: time.Now().Format("2006-01-02 15:04:05"),
	}
	metaJson, _ := json.Marshal(fileMeta)

	// 5. 发送元数据到Kafka队列(file-upload-topic)
	err = facades.Kafka().Producer().Topic("file-upload-topic").Send(string(metaJson))
	if err != nil {
		// 发送失败:删除已保存的临时文件,避免垃圾文件
		facades.Storage().Disk("temp").Delete(tempFileName)
		return ctx.Response().Json(http.StatusInternalServerError, http.Json{
			"code": 500,
			"msg":  "发送Kafka消息失败:" + err.Error(),
		})
	}

	// 6. 返回成功响应
	return ctx.Response().Json(http.StatusOK, http.Json{
		"code": 200,
		"msg":  "文件上传成功,等待处理",
		"data": fileMeta,
	})
}
步骤 3:注册上传路由

修改routes/api.go,添加文件上传路由:

go

运行

package routes

import (
	"github.com/goravel/framework/contracts/routing"

	"goravel/app/http/controllers"
)

func Api() routing.Route {
	return func(route routing.Router) {
		// 文件上传接口
		route.Post("/file/upload", controllers.NewFileUploadController().Upload)
	}
}
步骤 4:实现 Kafka 消费端(读取本地文件 + 处理 + 清理)

创建 Kafka 消费者app/jobs/FileConsumeJob.go,消费 Kafka 元数据后读取本地文件处理,完成后删除临时文件:

go

运行

package jobs

import (
	"encoding/json"
	"io"
	"os"

	"github.com/goravel/framework/contracts/queue"
	"github.com/goravel/framework/facades"
)

type FileConsumeJob struct {
	FileMeta FileMeta `json:"file_meta"`
}

// Signature 任务唯一标识
func (receiver *FileConsumeJob) Signature() string {
	return "file:consume"
}

// Handle 消费逻辑:读取本地文件→处理→删除临时文件
func (receiver *FileConsumeJob) Handle(args ...queue.Arg) error {
	// 1. 解析Kafka消息中的元数据(也可在消费者入口解析)
	meta := receiver.FileMeta

	// 2. 检查本地文件是否存在
	if !facades.Storage().Disk("temp").Exists(meta.FilePath) {
		return fmt.Errorf("临时文件不存在:%s", meta.FilePath)
	}

	// 3. 读取本地文件(用于后续业务处理)
	file, err := facades.Storage().Disk("temp").Get(meta.FilePath)
	if err != nil {
		return fmt.Errorf("读取文件失败:%s", err.Error())
	}
	defer file.Close()

	// 4. 执行你的业务逻辑(比如解析文件、转存到正式存储、入库等)
	// 示例:读取文件内容
	content, _ := io.ReadAll(file)
	facades.Log().Info(fmt.Sprintf("处理文件 %s,内容长度:%d", meta.FileName, len(content)))

	// 5. 处理完成:删除本地临时文件
	if err := facades.Storage().Disk("temp").Delete(meta.FilePath); err != nil {
		facades.Log().Error(fmt.Sprintf("删除临时文件失败:%s", err.Error()))
		return err
	}

	return nil
}

// 注册Kafka消费者(在app/providers/kafka_service_provider.go中)
// 示例消费者入口:
func RegisterKafkaConsumer() {
	facades.Kafka().Consumer().Topic("file-upload-topic").Group("file-consume-group").Handle(func(message kafka.Message) error {
		// 解析Kafka消息为FileMeta
		var meta FileMeta
		if err := json.Unmarshal([]byte(message.Value), &meta); err != nil {
			return err
		}

		// 投递到Goravel队列(可选,解耦Kafka消费和文件处理)
		return facades.Queue().Dispatch(&FileConsumeJob{FileMeta: meta})
	})
}
步骤 5:配置临时文件过期清理(关键!防止磁盘满)

Goravel 支持定时任务,创建app/console/commands/CleanTempFilesCommand.go,清理超过 24 小时的未消费临时文件:

go

运行

package commands

import (
	"time"

	"github.com/goravel/framework/console"
	"github.com/goravel/framework/console/command"
	"github.com/goravel/framework/facades"
)

type CleanTempFilesCommand struct {
}

func NewCleanTempFilesCommand() *CleanTempFilesCommand {
	return &CleanTempFilesCommand{}
}

func (receiver *CleanTempFilesCommand) Signature() string {
	return "temp:clean"
}

func (receiver *CleanTempFilesCommand) Description() string {
	return "清理超过24小时的未消费临时文件"
}

func (receiver *CleanTempFilesCommand) Handle(ctx console.Context) error {
	// 1. 获取temp磁盘下的所有文件
	files, err := facades.Storage().Disk("temp").AllFiles()
	if err != nil {
		return err
	}

	// 2. 遍历文件,删除超过24小时的文件
	expireTime := time.Now().Add(-24 * time.Hour)
	cleanCount := 0
	for _, file := range files {
		// 获取文件修改时间
		fileInfo, err := os.Stat(facades.Storage().Disk("temp").Path(file))
		if err != nil {
			continue
		}

		// 超过24小时则删除
		if fileInfo.ModTime().Before(expireTime) {
			if err := facades.Storage().Disk("temp").Delete(file); err == nil {
				cleanCount++
			}
		}
	}

	facades.Log().Info(fmt.Sprintf("清理临时文件完成,共删除 %d 个过期文件", cleanCount))
	return nil
}

然后在app/console/kernel.go中注册定时任务,每天凌晨执行:

go

运行

package console

import (
	"time"

	"github.com/goravel/framework/console"
	"github.com/goravel/framework/facades"

	"goravel/app/console/commands"
)

func Kernel() {
	// 注册命令
	facades.Artisan().Register([]console.Command{
		commands.NewCleanTempFilesCommand(),
	})

	// 配置定时任务:每天0点执行临时文件清理
	facades.Schedule().Command("temp:clean").DailyAt("00:00")
}

关键代码解释

  1. Filesystem 磁盘facades.Storage().Disk("temp") 指定使用我们配置的临时文件磁盘,所有操作都基于该目录,隔离性更好;
  2. 唯一文件名:使用uuid.NewString()生成唯一 ID,避免不同用户上传同名文件导致覆盖;
  3. Kafka 消息内容:仅发送文件元数据(路径、名称、大小等),而非文件本体,符合 Kafka 设计原则;
  4. 异常回滚:Kafka 发送失败时,立即删除已保存的临时文件,避免产生垃圾文件;
  5. 过期清理:定时任务保障即使消费失败,过期文件也会被清理,防止磁盘占满。

总结

  1. Goravel 中通过Filesystem组件配置temp本地磁盘,专门存储待 Kafka 消费的临时文件;
  2. 文件上传时仅将元数据发送到 Kafka,文件本体保存到本地temp目录,消费完成后删除;
  3. 必须配置定时清理任务,删除过期未消费的临时文件,避免磁盘容量耗尽。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐