最近在折腾一个个人项目,需要存点文件,但直接用本地磁盘感觉太low,用云盘又有点大材小用。我就琢磨,能不能自己搭一个稳定、安全、易用的迷你私有文件存储服务呢?

一番调研下来,我把目光锁定在了 RustFS 上。不过,在动手之前,咱们得先搞清楚一个核心概念——这直接决定了后续的开发方向,避免走弯路:

注意:RustFS 本身是一个分布式对象存储系统,类似于 MinIO。它是一个“服务端”软件,而不是一个让你用来“构建”文件系统的“库”。我们今天要做的,是利用 RustFS 作为后端存储,编写一个具备高可靠性、易用性、安全性的客户端程序,这个程序将实现迷你文件系统的核心功能(上传、下载、列表、批量操作等)。

明白了这一点,我们就可以开始动手了!这就像我们用一块高性能的硬盘(RustFS),打造一个带“进度条、指纹校验、安全锁”的移动U盘(我们的客户端程序)。

第一步:环境准备 - 安全启动 RustFS 后端

既然要用 RustFS 当后端,第一步当然是把它部署好。最简单且通用的方式,就是用 Docker Compose。

前置条件

确保你的机器上安装了 Docker 和 Docker Compose(建议使用最新稳定版)。

部署步骤

  1. 拉取项目并进入目录

    git clone https://gitcode.com/GitHub_Trending/rus/rustfs.git
    cd rustfs
    
  2. 修改默认配置(安全第一步)
    打开项目自带的 docker-compose.yml​ 文件,做两个关键修改:

    environment:
      - RUSTFS_ACCESS_KEY=mycustomaccesskey
      - RUSTFS_SECRET_KEY=mycustomsecretkey123
    ports:
      - "9001:9000"
    
    • 端口映射:将默认端口改成 9001​(避免和 MinIO 等服务冲突);

    • 密钥修改:替换默认的 minioadmin:minioadmin​ 密钥(防止未授权访问)。
      示例修改片段:

  3. 一键启动并验证

    # 后台启动核心服务
    docker-compose up -d
    
    # 验证服务是否存活
    curl http://localhost:9001/minio/health/live
    

    如果返回 OK​,恭喜你,你的私有对象存储后端已经就绪!
    此时 RustFS 会自动创建一个名为 my-bucket​ 的存储桶,后续客户端将基于这个桶进行文件操作。

第二步:创建客户端项目 - 集成工程化最佳实践

后端就绪后,我们开始编写客户端程序。这次我们不做“能用就行”的 demo,而是直接按照生产级标准构建,集成错误处理、日志、配置管理等核心能力。

1. 初始化项目

cargo new my-mini-fs
cd my-mini-fs

2. 添加核心依赖

打开 Cargo.toml​,添加以下依赖(覆盖可靠性、易用性、安全性需求):

[package]
name = "my-mini-fs"
version = "0.1.0"
edition = "2021"

[dependencies]
# 异步运行时
tokio = { version = "1", features = ["full"] }
# HTTP 客户端
reqwest = { version = "0.11", features = ["json", "stream"] }
# 序列化/反序列化
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_xml_rs = "0.9"
# CLI 构建
clap = { version = "4", features = ["derive", "cargo"] }
# 错误处理
thiserror = "1.0"
# 结构化日志
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# 进度条
indicatif = "0.17"
# 哈希校验(文件完整性)
sha2 = "0.10"
hex = "0.4"
# 重试机制
backoff = { version = "0.4", features = ["tokio", "futures"] }
# 配置管理
config = { version = "0.13", features = ["toml"] }
# 密钥安全存储
keyring = "2.0"
# 并发控制
tokio-util = { version = "0.7", features = ["io"] }

3. 配置管理 - 告别硬编码参数

每次执行命令都传 --endpoint​ --access-key​ 太繁琐?我们实现 配置文件 + 环境变量 + 命令行参数 三级配置,优先级:命令行参数 > 环境变量 > 配置文件​。

(1)定义配置结构

在 src​ 目录下创建 config.rs​ 文件:

use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Deserialize, Clone)]
pub struct FsConfig {
    pub endpoint: String,
    pub access_key: String,
    pub secret_key: String,
    pub bucket: String,
}

impl FsConfig {
    // 加载配置:优先读取 ~/.mini-fs/config.toml,支持环境变量覆盖
    pub fn load() -> Result<Self, Box<dyn std::error::Error>> {
        let mut settings = config::Config::default();
        let config_dir = dirs::home_dir()
            .map(|dir| dir.join(".mini-fs").join("config.toml"))
            .unwrap_or_else(|| PathBuf::from("config.toml"));

        // 加载配置文件
        if config_dir.exists() {
            settings.merge(config::File::from(config_dir).format(config::FileFormat::Toml))?;
        }

        // 加载环境变量(前缀 MINI_FS_)
        settings.merge(config::Environment::with_prefix("MINI_FS"))?;

        // 解析为配置结构体
        let mut config: FsConfig = settings.try_into()?;

        // 填充默认值
        if config.bucket.is_empty() {
            config.bucket = "my-bucket".to_string();
        }

        Ok(config)
    }
}
(2)创建默认配置文件

在用户主目录下创建配置文件 ~/.mini-fs/config.toml​:

endpoint = "http://localhost:9001"
access_key = "mycustomaccesskey"
secret_key = "mycustomsecretkey123"
bucket = "my-bucket"

4. 错误处理 - 精准定位问题根源

在 src​ 目录下创建 error.rs​ 文件,用 thiserror​ 定义自定义错误类型,告别模糊的 Box<dyn Error>​:

use thiserror::Error;

#[derive(Error, Debug)]
pub enum FsError {
    #[error("网络请求失败: {0}")]
    NetworkError(#[from] reqwest::Error),

    #[error("文件操作失败: {0}")]
    FileError(#[from] tokio::io::Error),

    #[error("配置加载失败: {0}")]
    ConfigError(String),

    #[error("认证失败: 请检查 AccessKey/SecretKey 是否正确")]
    AuthFailed,

    #[error("文件不存在: {0}")]
    FileNotFound(String),

    #[error("哈希校验失败: 本地文件与远程文件不一致")]
    HashMismatch,

    #[error("XML 解析失败: {0}")]
    XmlParseError(#[from] serde_xml_rs::Error),

    #[error("密钥存储操作失败: {0}")]
    KeyringError(#[from] keyring::Error),
}

impl From<config::ConfigError> for FsError {
    fn from(err: config::ConfigError) -> Self {
        FsError::ConfigError(err.to_string())
    }
}

5. 工具函数 - 封装通用能力

在 src​ 目录下创建 utils.rs​ 文件,实现文件哈希计算、进度条创建等通用功能:

use crate::error::FsError;
use indicatif::{ProgressBar, ProgressStyle};
use sha2::{Digest, Sha256};
use std::path::Path;

// 计算文件 SHA256 哈希
pub async fn calculate_file_hash(path: &Path) -> Result<String, FsError> {
    let mut file = tokio::fs::File::open(path).await?;
    let mut hasher = Sha256::new();
    let mut buffer = [0; 4096];

    loop {
        let n = file.read(&mut buffer).await?;
        if n == 0 {
            break;
        }
        hasher.update(&buffer[..n]);
    }

    let hash = hasher.finalize();
    Ok(hex::encode(hash))
}

// 创建上传/下载进度条
pub fn create_progress_bar(len: u64) -> ProgressBar {
    let pb = ProgressBar::new(len);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
            .unwrap()
            .progress_chars("#>-"),
    );
    pb
}

第三步:编写核心功能 - 高可靠、易用的文件操作

打开 src/main.rs​,整合上述模块,实现带进度条、哈希校验、重试机制的上传、下载、列表、批量操作功能。

1. 导入依赖与模块

mod config;
mod error;
mod utils;

use backoff::{backoff::Backoff, ExponentialBackoff};
use clap::{Parser, Subcommand};
use config::FsConfig;
use error::FsError;
use indicatif::ProgressBar;
use reqwest::header;
use serde::Deserialize;
use std::path::Path;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use tracing::{info, warn, error};
use utils::{calculate_file_hash, create_progress_bar};

#[derive(Parser)]
#[command(name = "my-mini-fs")]
#[command(about = "一个基于 RustFS 的高可靠迷你文件系统客户端", long_about = None)]
#[command(author, version)]
struct Cli {
    /// RustFS 服务地址(优先级高于配置文件)
    #[arg(long)]
    endpoint: Option<String>,

    /// Access Key(优先级高于配置文件)
    #[arg(long)]
    access_key: Option<String>,

    /// Secret Key(优先级高于配置文件)
    #[arg(long)]
    secret_key: Option<String>,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// 上传单个文件
    Upload {
        /// 本地文件路径
        local_path: String,
        /// 远程对象名
        object_name: String,
    },

    /// 下载单个文件
    Download {
        /// 远程对象名
        object_name: String,
        /// 本地保存路径
        local_path: String,
    },

    /// 列出桶内所有文件
    List,

    /// 批量上传文件夹
    UploadDir {
        /// 本地文件夹路径
        local_dir: String,
        /// 远程前缀(可选)
        #[arg(default_value = "")]
        prefix: String,
    },

    /// 安全存储密钥到系统密钥环
    SaveKey {
        /// Access Key
        access_key: String,
        /// Secret Key
        secret_key: String,
    },
}

2. 定义列表 XML 解析结构

RustFS 列表接口返回 XML 格式,我们用 serde_xml_rs​ 解析为友好的结构体:

#[derive(Debug, Deserialize)]
#[serde(rename = "ListBucketResult")]
struct ListBucketResult {
    #[serde(rename = "Contents")]
    contents: Vec<ObjectInfo>,
}

#[derive(Debug, Deserialize)]
struct ObjectInfo {
    #[serde(rename = "Key")]
    key: String,
    #[serde(rename = "Size")]
    size: u64,
    #[serde(rename = "LastModified")]
    last_modified: String,
}

3. 核心 HTTP 请求封装(带重试)

实现带指数退避重试的 HTTP 请求函数,提升网络稳定性:

async fn request_with_retry<F, T>(mut f: F) -> Result<T, FsError>
where
    F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, FsError>>>>,
{
    let backoff = ExponentialBackoff::default();
    let mut backoff_iter = backoff.into_iter();

    loop {
        match f().await {
            Ok(res) => return Ok(res),
            Err(e) => match backoff_iter.next() {
                Some(duration) => {
                    warn!("请求失败: {:?},{} 毫秒后重试", e, duration.as_millis());
                    tokio::time::sleep(duration).await;
                }
                None => {
                    error!("请求重试次数耗尽: {:?}", e);
                    return Err(e);
                }
            },
        }
    }
}

4. 实现具体命令逻辑

在 main​ 函数中匹配 CLI 命令,实现所有核心功能:

#[tokio::main]
async fn main() -> Result<(), FsError> {
    // 初始化结构化日志
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // 加载配置
    let mut config = FsConfig::load()?;
    let cli = Cli::parse();

    // 命令行参数覆盖配置文件
    if let Some(endpoint) = cli.endpoint {
        config.endpoint = endpoint;
    }
    if let Some(access_key) = cli.access_key {
        config.access_key = access_key;
    }
    if let Some(secret_key) = cli.secret_key {
        config.secret_key = secret_key;
    }

    // 创建 HTTP 客户端
    let client = reqwest::Client::new();

    match &cli.command {
        Commands::Upload { local_path, object_name } => {
            upload_file(&client, &config, local_path, object_name).await?;
        }
        Commands::Download { object_name, local_path } => {
            download_file(&client, &config, object_name, local_path).await?;
        }
        Commands::List => {
            list_files(&client, &config).await?;
        }
        Commands::UploadDir { local_dir, prefix } => {
            upload_dir(&client, &config, local_dir, prefix).await?;
        }
        Commands::SaveKey { access_key, secret_key } => {
            save_key_to_keyring(access_key, secret_key).await?;
        }
    }

    Ok(())
}

// 上传文件(带进度条、哈希校验、重试)
async fn upload_file(
    client: &reqwest::Client,
    config: &FsConfig,
    local_path: &str,
    object_name: &str,
) -> Result<(), FsError> {
    let path = Path::new(local_path);
    if !path.exists() {
        return Err(FsError::FileNotFound(local_path.to_string()));
    }

    // 计算本地文件哈希
    let local_hash = calculate_file_hash(path).await?;
    info!("本地文件哈希: {}", local_hash);

    // 获取文件大小,创建进度条
    let file_size = tokio::fs::metadata(path).await?.len();
    let pb = create_progress_bar(file_size);
    pb.set_message(format!("正在上传 {}", local_path));

    // 流式读取文件(避免大文件内存溢出)
    let file = File::open(path).await?;
    let stream = ReaderStream::new(file);
    let body = reqwest::Body::wrap_stream(stream);

    // 构造请求 URL
    let url = format!("{}/{}/{}", config.endpoint, config.bucket, object_name);

    // 带重试的上传请求
    let response = request_with_retry(|| {
        let client = client.clone();
        let url = url.clone();
        let config = config.clone();
        Box::pin(async move {
            let response = client
                .put(&url)
                .basic_auth(&config.access_key, Some(&config.secret_key))
                .header(header::CONTENT_LENGTH, file_size)
                .header("x-amz-meta-hash", &local_hash)
                .body(body.clone())
                .send()
                .await?;

            if response.status().is_success() {
                Ok(response)
            } else if response.status() == 401 {
                Err(FsError::AuthFailed)
            } else {
                Err(FsError::NetworkError(response.error_for_status()?))
            }
        })
    }).await?;

    pb.finish_with_message(format!("{} 上传完成", local_path));

    if response.status().is_success() {
        info!("上传成功!");
    } else {
        error!("上传失败: {}", response.text().await?);
    }

    Ok(())
}

// 下载文件(带进度条、哈希校验)
async fn download_file(
    client: &reqwest::Client,
    config: &FsConfig,
    object_name: &str,
    local_path: &str,
) -> Result<(), FsError> {
    let url = format!("{}/{}/{}", config.endpoint, config.bucket, object_name);
    let path = Path::new(local_path);

    // 带重试的下载请求
    let response = request_with_retry(|| {
        let client = client.clone();
        let url = url.clone();
        let config = config.clone();
        Box::pin(async move {
            let response = client
                .get(&url)
                .basic_auth(&config.access_key, Some(&config.secret_key))
                .send()
                .await?;

            if response.status().is_success() {
                Ok(response)
            } else if response.status() == 404 {
                Err(FsError::FileNotFound(object_name.to_string()))
            } else if response.status() == 401 {
                Err(FsError::AuthFailed)
            } else {
                Err(FsError::NetworkError(response.error_for_status()?))
            }
        })
    }).await?;

    // 获取文件大小和远程哈希
    let file_size = response.content_length().unwrap_or(0);
    let remote_hash = response
        .headers()
        .get("x-amz-meta-hash")
        .map(|h| h.to_str().unwrap_or(""))
        .unwrap_or("");
    info!("远程文件哈希: {}", remote_hash);

    // 创建进度条
    let pb = create_progress_bar(file_size);
    pb.set_message(format!("正在下载 {}", object_name));

    // 流式写入文件
    let mut file = File::create(path).await?;
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk.map_err(FsError::NetworkError)?;
        file.write_all(&chunk).await?;
        pb.inc(chunk.len() as u64);
    }
    pb.finish_with_message(format!("{} 下载完成", object_name));

    // 哈希校验
    if !remote_hash.is_empty() {
        let local_hash = calculate_file_hash(path).await?;
        if local_hash != remote_hash {
            tokio::fs::remove_file(path).await?;
            return Err(FsError::HashMismatch);
        }
        info!("哈希校验通过,文件完整");
    }

    Ok(())
}

// 列出文件(解析 XML 为友好格式)
async fn list_files(client: &reqwest::Client, config: &FsConfig) -> Result<(), FsError> {
    info!("正在列出 {} 中的所有文件...", config.bucket);
    let url = format!("{}/{}", config.endpoint, config.bucket);

    let response = request_with_retry(|| {
        let client = client.clone();
        let url = url.clone();
        let config = config.clone();
        Box::pin(async move {
            let response = client
                .get(&url)
                .basic_auth(&config.access_key, Some(&config.secret_key))
                .send()
                .await?;

            if response.status().is_success() {
                Ok(response)
            } else if response.status() == 401 {
                Err(FsError::AuthFailed)
            } else {
                Err(FsError::NetworkError(response.error_for_status()?))
            }
        })
    }).await?;

    let xml_str = response.text().await?;
    let list_result: ListBucketResult = serde_xml_rs::from_str(&xml_str)?;

    // 格式化输出
    println!("\n=== {} 文件列表 ===", config.bucket);
    println!("{:<40} {:<10} {}", "文件名", "大小(字节)", "最后修改时间");
    println!("{}", "-".repeat(80));
    for obj in list_result.contents {
        println!("{:<40} {:<10} {}", obj.key, obj.size, obj.last_modified);
    }

    Ok(())
}

// 批量上传文件夹(带并发控制)
async fn upload_dir(
    client: &reqwest::Client,
    config: &FsConfig,
    local_dir: &str,
    prefix: &str,
) -> Result<(), FsError> {
    let dir = Path::new(local_dir);
    if !dir.is_dir() {
        return Err(FsError::FileNotFound(local_dir.to_string()));
    }

    // 并发控制:最多 8 个文件同时上传
    let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
    let mut tasks = Vec::new();

    // 遍历文件夹
    let entries = tokio::fs::read_dir(dir).await?;
    for entry in entries {
        let entry = entry?;
        let path = entry.path();
        if path.is_file() {
            // 获取文件名称,拼接远程前缀
            let file_name = path.file_name().unwrap().to_str().unwrap().to_string();
            let object_name = if prefix.is_empty() {
                file_name.clone()
            } else {
                format!("{}/{}", prefix, file_name)
            };

            // 申请并发许可
            let permit = semaphore.clone().acquire_owned().await?;
            let client_clone = client.clone();
            let config_clone = config.clone();
            let local_path_str = path.to_str().unwrap().to_string();

            // 异步上传
            let task = tokio::spawn(async move {
                let _permit = permit; // 持有许可直到任务完成
                if let Err(e) = upload_file(&client_clone, &config_clone, &local_path_str, &object_name).await {
                    error!("上传 {} 失败: {:?}", local_path_str, e);
                }
            });
            tasks.push(task);
        }
    }

    // 等待所有任务完成
    for task in tasks {
        task.await?;
    }

    info!("文件夹 {} 批量上传完成", local_dir);
    Ok(())
}

// 保存密钥到系统密钥环(避免明文存储)
async fn save_key_to_keyring(access_key: &str, secret_key: &str) -> Result<(), FsError> {
    let entry = keyring::Entry::new("my-mini-fs", "rustfs-credentials")?;
    entry.set_password(&format!("{}:{}", access_key, secret_key))?;
    info!("密钥已安全存储到系统密钥环");
    Ok(())
}

第四步:编译与测试 - 验证所有功能

代码编写完成后,我们通过一系列测试验证客户端的稳定性和功能完整性。

1. 编译项目

cargo build --release

编译后的可执行文件位于 target/release/my-mini-fs​。

2. 功能测试

(1)保存密钥到系统密钥环(可选)
./target/release/my-mini-fs save-key --access-key mycustomaccesskey --secret-key mycustomsecretkey123
(2)上传单个文件
# 创建测试文件
echo "Hello, RustFS!" > test.txt

# 上传文件
./target/release/my-mini-fs upload ./test.txt hello.txt

此时会看到带进度条的上传过程,终端输出哈希值和成功提示。

(3)列出文件
./target/release/my-mini-fs list

客户端会解析 XML 并输出友好的表格格式文件列表。

(4)下载文件并校验
./target/release/my-mini-fs download hello.txt downloaded.txt

下载完成后会自动校验哈希,确保文件完整。

(5)批量上传文件夹
# 创建测试文件夹和文件
mkdir test_dir
echo "File 1" > test_dir/file1.txt
echo "File 2" > test_dir/file2.txt

# 批量上传
./target/release/my-mini-fs upload-dir ./test_dir test_prefix/

客户端会并发上传文件夹内的所有文件,最多同时上传 8 个。

第五步:进阶优化与拓展方向

我们的客户端已经具备高可靠、易用、安全的核心特性,但还有更多进阶功能可以探索:

  1. 端到端加密:使用 aes-gcm​ 库在上传前加密文件,下载后解密,即使 RustFS 服务被攻破,文件也无法被读取。

  2. 回收站功能:删除文件时不直接删除,而是移动到 trash​ 前缀的路径下,支持恢复和永久删除。

  3. 版本控制:上传同名文件时自动生成版本号,支持下载历史版本。

  4. 运维监控:集成 prometheus​ 库,暴露文件上传/下载量、成功率等指标,对接 Grafana 实现可视化监控。

  5. TUI 界面:使用 ratatui​ 库构建终端图形界面,支持鼠标操作和文件预览。

  6. Web 界面:基于 axum​ 框架搭建 Web 服务,实现跨设备访问。

总结

通过本次实战,我们不仅搭建了基于 RustFS 的迷你文件系统,还融入了生产级工程实践:结构化日志、精准错误处理、配置管理、文件完整性校验、并发控制、密钥安全存储等。这个客户端已经从“能用”的 demo 升级为“好用、稳定、安全”的工具。

更重要的是,我们掌握了利用开源分布式存储构建私有文件服务的核心思路——按需定制功能,兼顾实用性和可靠性。这,就是编程的乐趣所在!


以下是深入学习 RustFS 的推荐资源:RustFS

官方文档: RustFS 官方文档- 提供架构、安装指南和 API 参考。

GitHub 仓库: GitHub 仓库 - 获取源代码、提交问题或贡献代码。

社区支持: GitHub Discussions- 与开发者交流经验和解决方案。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐