一、libaio 原理概述

1.1 libaio 介绍

libaio(Linux Asynchronous I/O)是 Linux 内核提供的异步 I/O 库,其核心原理是:

  1. 异步提交:应用程序通过 io_submit 提交 I/O 请求后立即返回,不阻塞进程
  2. 事件通知:内核通过完成队列(completion queue)通知应用程序 I/O 操作结果
  3. 零拷贝:结合 O_DIRECT 标志绕过内核缓冲区,实现直接磁盘访问
  4. 批量处理:单次系统调用可提交/完成多个 I/O 请求,减少上下文切换
io_submit
中断
io_getevents
应用程序
内核提交队列
I/O调度层
块设备驱动
存储设备
完成事件环

1.2 O_DIRECT 打开方式

在Linux系统中,使用O_DIRECT标志打开文件时,会绕过操作系统的缓存机制,直接与存储设备进行数据交互。这种模式具有以下特点:

绕过系统缓存

  • 不使用页缓存(Page Cache)
    正常文件操作会将数据先存入系统缓存,再异步写入磁盘。而O_DIRECT会跳过这一步,直接将数据写入磁盘或从磁盘读取,减少了数据在缓存中的拷贝开销。
  • 应用程序需自行管理缓存
    由于系统不再自动缓存数据,应用程序需要自己处理数据的缓存逻辑(如预读、缓存淘汰等)。
DMA
被绕过
用户缓冲区
磁盘控制器
物理磁盘
页缓存

在这里插入图片描述

性能特点

  • 减少数据拷贝次数
    传统I/O流程(如read()/write())会经历“用户空间→内核空间缓存→磁盘”的多次拷贝,而O_DIRECT直接操作磁盘,降低了CPU和内存带宽的消耗。
  • 适合大尺寸连续I/O
    对数据库、大数据处理等场景中频繁的大文件顺序读写(如日志写入、数据加载)性能提升明显。
  • 随机I/O可能更慢
    若操作小文件或随机读写,由于缺乏系统缓存的预读和合并优化,性能可能反而低于普通模式。

对齐要求

  • 数据缓冲区需按块对齐
    使用O_DIRECT时,数据缓冲区的地址、长度必须与磁盘块大小(通常为4KB)对齐,否则会导致I/O错误。例如:
    // 错误示例(缓冲区未对齐)
    char buf[1024];
    write(fd, buf, 1024);
    
    // 正确示例(使用malloc对齐或posix_memalign)
    char *buf;
    posix_memalign((void**)&buf, 4096, 1024);
    
  • 偏移量需对齐
    文件读写的偏移量也需是块大小的整数倍,否则可能触发部分写入或读取错误。

同步特性

  • 更接近同步I/O行为
    O_DIRECT下的write()操作会直接将数据写入磁盘,类似fsync()的效果,确保数据持久化。但需注意,这并不完全等同于同步I/O,仍需配合fsync()/fdatasync()保证元数据同步。
  • 降低缓存不一致风险
    多进程或多节点访问同一文件时,避免了因系统缓存未刷新导致的数据不一致问题(如数据库事务的持久性需求)。

适用场景

  • 数据库系统(如MySQL、PostgreSQL):通过O_DIRECT减少缓存干扰,自行管理数据页缓存。
  • 大数据存储与处理:处理TB级数据时,大尺寸连续I/O可提升吞吐量。
  • 高性能计算(HPC):科学计算中对I/O延迟和带宽敏感的场景。
  • 存储设备测试工具:如dd命令使用oflag=direct测试磁盘裸性能。

1.3 libaio 数据结构

  1. I/O 上下文(io_context_t)

    • 每个异步 I/O 操作都需要关联一个上下文,用于管理 I/O 请求队列和完成事件。
  2. 1. _I/O 请求(io_prep__ 系列函数)

    • 通过io_prep_pread()/io_prep_pwrite()等函数准备 I/O 请求,填充请求参数(如文件描述符、缓冲区、偏移量等)。
  3. 提交请求(io_submit)

    • 将准备好的请求批量提交到内核,由内核异步执行。
  4. 获取完成事件(io_getevents)

    • 应用程序通过轮询或阻塞方式检查哪些 I/O 请求已完成,并获取结果。
iocb - I/O控制块
包含 fd/buffer/offset/size
io_event - 完成事件
包含 iocb指针/结果码
io_context_t - AIO上下文
管理请求队列和完成队列

1.4 libaio 核心 API 完整介绍

io_setup - 创建 AIO 上下文
int io_setup(int maxevents, io_context_t *ctx);
  • 参数详解
    • maxevents:指定完成队列(Completion Queue)的最大容量,即同时能处理的最大异步事件数。该值需根据应用并发需求设置(如设置为 8192 表示最多缓存 8192 个完成事件)。
    • ctx:输出参数,用于存储创建的异步上下文句柄(本质为 unsigned long long 类型)。
  • 返回值
    • 0:成功。
    • 负数:错误码(如 -ENOMEM 表示内存不足,-EINVAL 表示参数无效)。
  • 注意事项
    • maxevents 需大于 0,且通常设为 2 的幂次(如 1024、8192)以优化内核队列管理。
    • 上下文创建后需通过 io_destroy() 释放资源。
io_submit - 提交异步请求
int io_submit(io_context_t ctx, long nr, struct iocb *cb[]);
  • 参数详解
    • ctx:异步上下文句柄,由 io_setup() 创建。
    • nr:提交的 I/O 控制块(iocb)数量,即 cb 数组的长度。
    • cb:指向 iocb 结构体的指针数组,每个元素对应一个待提交的 I/O 请求。
  • 返回值
    • 非负数:成功提交到内核的请求数量(可能小于 nr,如因内核资源不足)。
    • 负数:错误码(如 -EAGAIN 表示资源临时不可用)。
  • 关键行为
    • 内核接收请求后立即返回,应用程序无需阻塞等待 I/O 完成。
    • 提交的请求会被内核放入队列,按调度策略执行(如合并相邻的读写请求)。
io_getevents - 获取完成事件
int io_getevents(io_context_t ctx, long min_nr, long nr, 
                struct io_event *events, struct timespec *timeout);
  • 参数详解
    • ctx:异步上下文句柄。
    • min_nr:期望获取的最小完成事件数。若当前事件数不足,函数会阻塞(除非 timeout 非零)。
    • nr:最多获取的事件数,即 events 数组的最大长度。
    • events:输出参数,存储完成事件的数组,每个元素为 struct io_event 类型。
    • timeout:超时时间,NULL 表示无限等待;{0, 0} 表示非阻塞模式。
  • 返回值
    • 正数:实际获取的事件数(范围:min_nr ≤ 返回值 ≤ nr)。
    • 0:超时且无事件(仅当 timeout 非零时可能)。
    • 负数:错误码(如 -EBADF 表示无效上下文)。
  • 事件结构体 struct io_event
    struct io_event {
        void *data;       // 对应 iocb 中的 data 字段(用户自定义数据,如块索引)
        int res;          // I/O 操作结果(成功为字节数,失败为负错误码)
        int res2;         // 保留字段(通常为 0)
        struct iocb *obj; // 指向发起请求的 iocb 结构体
    };
    
io_prep_pwrite - 准备写请求
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, 
                   size_t count, long long offset);
  • 参数详解
    • iocb:待初始化的 I/O 控制块结构体指针。
    • fd:目标文件的描述符(需以 O_RDWR 打开)。
    • buf:写入数据的缓冲区,需按扇区大小对齐(如 4KB),否则配合 O_DIRECT 时会失败。
    • count:写入的字节数,需为块大小的整数倍(若使用 O_DIRECT)。
    • offset:写入的文件偏移量,需与块大小对齐(若使用 O_DIRECT)。
  • 关联操作
    • 写请求完成后,可通过 io_getevents() 获取 res 字段确认写入字节数。
    • 若需确保数据持久化,需配合 fsync()fdatasync()
io_prep_pread - 准备读请求
void io_prep_pread(struct iocb *iocb, int fd, void *buf, 
                   size_t count, long long offset);
  • 参数与 io_prep_pwrite 类似,区别在于功能为读取数据:
    • buf:用于存储读取数据的缓冲区(需对齐)。
    • 读取完成后,res 字段返回实际读取的字节数(失败时为负错误码)。
io_destroy - 销毁上下文
int io_destroy(io_context_t ctx);
  • 参数详解
    • ctx:待销毁的异步上下文句柄。
  • 返回值
    • 0:成功释放上下文及关联资源(如事件队列)。
    • 负数:错误码(如 -EBUSY 表示仍有未完成的请求)。
  • 注意事项
    • 调用前需确保所有提交的请求已通过 io_getevents() 获取完成事件,否则可能导致资源泄漏。
    • 若存在未完成请求,可先通过 io_cancel() 取消请求再销毁上下文。
补充:iocb 结构体与常用标志
struct iocb {
    uint64_t aio_fildes;      // 文件描述符
    uint64_t aio_offset;      // 偏移量
    void *aio_buf;            // 数据缓冲区
    uint64_t aio_nbytes;      // 操作字节数
    int aio_lio_opcode;       // 操作类型(如 IOCB_CMD_PREAD/IOCB_CMD_PWRITE)
    uint64_t aio_flags;       // 标志位(常用如下)
    void *aio_data;           // 用户自定义数据(可通过 io_event.data 访问)
};
  • 常用 aio_flags 标志
    • IOCB_FLAG_NOWAIT:非阻塞提交(内核忙时立即返回错误)。
    • IOCB_FLAG_RESFD:将完成事件写入指定文件描述符(替代轮询)。
    • IOCB_FLAG_NOFSYNC:不自动执行 fsync(需手动调用)。

核心 API 工作流程示例

  1. 初始化io_setup() → 分配对齐缓冲区 → 构建 iocb 数组
  2. 提交请求io_submit() 批量发送请求到内核
  3. 事件处理io_getevents() 阻塞获取完成事件,处理 res 结果
  4. 资源释放io_destroy() 销毁上下文,释放缓冲区

二、libaio实现异步读写文件

API补充

posix_memalign

posix_memalign是一个用于在 C 语言中分配内存对齐空间的函数,主要用于满足某些硬件或 API(如libaio)对内存地址对齐的特殊要求。以下是其详细介绍:

函数原型

int posix_memalign(void **memptr, size_t alignment, size_t size);

参数说明

参数 含义
memptr 输出参数,指向分配内存的指针地址
alignment 对齐字节数,必须是 2 的幂(如5124096),且通常需大于等于sizeof(void*)
size 分配的内存大小(字节)

返回值

  • 0:内存分配成功,memptr指向对齐后的内存起始地址。
  • 非 0 错误码:分配失败,常见错误包括:
    • ENOMEM:内存不足
    • EINVALalignment不是 2 的幂或无效值

分批异步写入文件

这段代码实现了使用 libaio 库进行批量异步写操作的核心逻辑,主要功能是将数据分批次提交给内核进行异步写入,并等待每批次完成后再处理下一批。

1. 批次循环控制
for (int batch = 0; batch < NUM_BLOCKS; batch += BATCH_SIZE) {
    // 计算当前批次实际处理的块数
    int current_batch = (batch + BATCH_SIZE > NUM_BLOCKS) ? NUM_BLOCKS - batch : BATCH_SIZE;
  • 动态批次大小:处理最后一批时自动调整大小,避免越界(如总块数不是 BATCH_SIZE 整数倍时)。
  • 循环步进:每次处理完一批后,batch += BATCH_SIZE跳转到下一批。
2. 构建 I/O 请求
for (i = 0; i < current_batch; i++) {
    int idx = batch + i;
    // 初始化写请求控制块
    io_prep_pwrite(&iocbs[i], fd, buffers[idx], BLOCK_SIZE, idx * BLOCK_SIZE);
    // 存储块索引到iocb.data,用于事件处理时关联
    iocbs[i].data = (void*)(long)idx;
    iocb_ptrs[i] = &iocbs[i];
}
  • io_prep_pwrite:设置写请求参数(文件描述符fd、缓冲区buffers[idx]、块大小BLOCK_SIZE、偏移量idx * BLOCK_SIZE)。
  • 上下文关联:通过iocbs[i].data = (void*)(long)idx将块索引存入 iocb,以便事件处理时知道哪个块完成了写入。
3. 提交请求到内核
ret = io_submit(ctx, current_batch, iocb_ptrs);
if (ret != current_batch) {
    fprintf(stderr, "io_submit write batch %d failed: %d/%d, %s\n", 
            batch/BATCH_SIZE, ret, current_batch, strerror(-ret));
    goto cleanup;
}
  • 批量提交io_submit一次性提交current_batch个请求,返回成功提交的数量。
  • 错误检查:若提交数量不等于请求数量,输出错误信息并跳转清理资源。
4. 等待事件完成
completed = 0;
while (completed < current_batch) {
    ret = io_getevents(ctx, 1, current_batch, events, NULL);
    if (ret < 0) {
        perror("io_getevents write");
        verify_ok = 0;
        goto cleanup;
    }
    
    completed += ret;
    for (i = 0; i < ret; i++) {
        int block_idx = (int)(long)events[i].data;
        printf("Write completed for block %d/%d (batch %d/%d)\r", 
               block_idx + 1, NUM_BLOCKS, batch/BATCH_SIZE + 1, (NUM_BLOCKS+BATCH_SIZE-1)/BATCH_SIZE);
        fflush(stdout);
    }
}
  • 阻塞等待事件io_geteventsmin_nr=1表示至少等待 1 个事件完成才返回,timeout=NULL表示无限等待。
  • 进度更新:每次获取到完成事件后,通过events[i].data获取块索引,打印进度信息(使用\r实现行内刷新)。
  • 循环完成条件:当completed等于current_batch时,说明当前批次所有请求已完成。
   for (int batch = 0; batch < NUM_BLOCKS; batch += BATCH_SIZE) {
        int current_batch = (batch + BATCH_SIZE > NUM_BLOCKS) ? NUM_BLOCKS - batch : BATCH_SIZE;
        
        for (i = 0; i < current_batch; i++) {
            int idx = batch + i;
            io_prep_pwrite(&iocbs[i], fd, buffers[idx], BLOCK_SIZE, idx * BLOCK_SIZE);
            iocbs[i].data = (void*)(long)idx;
            iocb_ptrs[i] = &iocbs[i];
        }
        
        ret = io_submit(ctx, current_batch, iocb_ptrs);
        if (ret != current_batch) {
            fprintf(stderr, "io_submit write batch %d failed: %d/%d, %s\n", 
                    batch/BATCH_SIZE, ret, current_batch, strerror(-ret));
            goto cleanup;
        }
        
        // 等待当前批次完成
        completed = 0;
        while (completed < current_batch) {
            ret = io_getevents(ctx, 1, current_batch, events, NULL);
            if (ret < 0) {
                perror("io_getevents write");
                verify_ok = 0;
                goto cleanup;
            }
            
            completed += ret;
            for (i = 0; i < ret; i++) {
                int block_idx = (int)(long)events[i].data;
                printf("Write completed for block %d/%d (batch %d/%d)\r", 
                       block_idx + 1, NUM_BLOCKS, batch/BATCH_SIZE + 1, (NUM_BLOCKS+BATCH_SIZE-1)/BATCH_SIZE);
                fflush(stdout);
            }
        }
    }

分批异步读取文件

代码通过批处理机制分批次提交异步读请求,每批处理 1024 个 4KB 块(共 4MB),利用 libaio 的异步特性实现非阻塞读取,同时通过字符串比对验证数据准确性。主要流程包括:

1. 批次循环与当前批次计算
for (int batch = 0; batch < NUM_BLOCKS; batch += BATCH_SIZE) {
    int current_batch = (batch + BATCH_SIZE > NUM_BLOCKS) ? NUM_BLOCKS - batch : BATCH_SIZE;
  • 动态批次大小:处理最后一批时自动调整大小(如总块数 262144,批次 1024,最后一批仍为 1024;若总块数 262145,最后一批为 1)。
  • 循环步进:每次处理完一批后,batch += BATCH_SIZE跳转到下一批,确保所有块被读取。
2. 构建异步读请求
for (i = 0; i < current_batch; i++) {
    int idx = batch + i;
    // 初始化读请求控制块(与写请求的区别:io_prep_pread)
    io_prep_pread(&iocbs[i], fd, buffers[idx], BLOCK_SIZE, idx * BLOCK_SIZE);
    iocbs[i].data = (void*)(long)idx;
    iocb_ptrs[i] = &iocbs[i];
}
  • io_prep_pread:设置读请求参数(文件描述符fd、缓冲区buffers[idx]、块大小BLOCK_SIZE、偏移量idx * BLOCK_SIZE)。
  • 与写操作的区别:读请求从文件读取数据到buffers[idx],而写请求是从缓冲区写入文件。
3. 提交读请求到内核
ret = io_submit(ctx, current_batch, iocb_ptrs);
if (ret != current_batch) {
    fprintf(stderr, "io_submit read batch %d failed: %d/%d, %s\n", 
            batch/BATCH_SIZE, ret, current_batch, strerror(-ret));
    goto cleanup;
}
  • 批量提交io_submit一次性提交current_batch个读请求,返回成功提交数。
  • 错误处理:若提交数不等于请求数,输出错误并跳转清理资源(如释放内存、关闭文件)。
4. 等待读事件完成并验证数据
completed = 0;
while (completed < current_batch) {
    ret = io_getevents(ctx, 1, current_batch, events, NULL);
    if (ret < 0) {
        perror("io_getevents read");
        verify_ok = 0;
        goto cleanup;
    }
    
    completed += ret;
    for (i = 0; i < ret; i++) {
        int block_idx = (int)(long)events[i].data;
        printf("Read completed for block %d/%d (batch %d/%d)\r", 
               block_idx + 1, NUM_BLOCKS, batch/BATCH_SIZE + 1, (NUM_BLOCKS+BATCH_SIZE-1)/BATCH_SIZE);
        fflush(stdout);
        
        // 数据验证核心逻辑
        char expected[BLOCK_SIZE];
        sprintf(expected, "This is block %d of %d\n", block_idx + 1, NUM_BLOCKS);
        if (strncmp(buffers[block_idx], expected, BLOCK_SIZE) != 0) {
            printf("\nBlock %d verification failed!\n", block_idx + 1);
            printf("Expected: %s", expected);
            printf("Actual:   %s", buffers[block_idx]);
            verify_ok = 0;
        }
    }
}
  • 阻塞等待事件io_geteventsmin_nr=1确保至少等待 1 个事件完成,timeout=NULL无限等待。
  • 进度显示:通过\r实现行内刷新,实时显示当前完成的块编号和批次。
  • 数据验证逻辑
    1. sprintf生成预期数据(如"This is block 1 of 262144\n")。
    2. strncmp对比实际读取数据与预期数据。
    3. 若不一致,标记verify_ok=0并输出差异,确保数据完整性。

完整代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <libaio.h>
#include <sys/stat.h>
#include <errno.h>
#include <time.h>

#define BLOCK_SIZE 4096       // 4KB块大小
#define ALIGN_SIZE 4096       // 内存对齐大小
#define NUM_BLOCKS 262144     // 1GB文件 = 4KB * 262144
#define FILE_SIZE (BLOCK_SIZE * NUM_BLOCKS) // 测试文件大小
#define BATCH_SIZE 1024       // 每批处理1024个请求

// 记录时间点
typedef struct {
    struct timespec start;
    struct timespec write_start;
    struct timespec write_end;
    struct timespec read_start;
    struct timespec read_end;
    struct timespec end;
} TimeRecorder;

// 初始化时间记录器
void init_time_recorder(TimeRecorder *recorder) {
    memset(recorder, 0, sizeof(TimeRecorder));
    clock_gettime(CLOCK_MONOTONIC, &recorder->start);
}

// 记录时间点
void record_time(struct timespec *timepoint) {
    clock_gettime(CLOCK_MONOTONIC, timepoint);
}

// 计算时间差(纳秒)
long long calculate_time_diff(struct timespec *start, struct timespec *end) {
    return (end->tv_sec - start->tv_sec) * 1e9 + (end->tv_nsec - start->tv_nsec);
}

// 打印性能统计
void print_performance(TimeRecorder *recorder) {
    long long total_time_ns = calculate_time_diff(&recorder->start, &recorder->end);
    long long write_time_ns = calculate_time_diff(&recorder->write_start, &recorder->write_end);
    long long read_time_ns = calculate_time_diff(&recorder->read_start, &recorder->read_end);
    
    double total_time_s = total_time_ns / 1e9;
    double write_time_s = write_time_ns / 1e9;
    double read_time_s = read_time_ns / 1e9;
    
    double total_size_mb = FILE_SIZE / (1024.0 * 1024.0);
    double total_speed = total_size_mb / total_time_s;
    double write_speed = total_size_mb / write_time_s;
    double read_speed = total_size_mb / read_time_s;
    
    printf("\n===== Performance Statistics =====\n");
    printf("Total operation time: %.2f seconds\n", total_time_s);
    printf("Write operation time: %.2f seconds\n", write_time_s);
    printf("Read operation time: %.2f seconds\n", read_time_s);
    printf("Total data size: %.2f MB (%.2f GB)\n", total_size_mb, total_size_mb/1024);
    printf("Overall transfer speed: %.2f MB/s\n", total_speed);
    printf("Write speed: %.2f MB/s\n", write_speed);
    printf("Read speed: %.2f MB/s\n", read_speed);
    printf("================================\n");
}

int main() {
    io_context_t ctx = 0;
    int fd;
    char **buffers = NULL;
    struct iocb **iocb_ptrs = NULL;
    struct iocb iocbs[BATCH_SIZE];  // 改为批处理大小
    struct io_event events[BATCH_SIZE];
    const char *filename = "aio_test.txt";
    int i, j, ret;
    int completed = 0;
    int verify_ok = 1;
    TimeRecorder time_rec;
    
    // 初始化时间记录器
    init_time_recorder(&time_rec);
    
    // 1. 初始化异步I/O上下文,最多缓存8192个异步事件
    if (io_setup(8192, &ctx) < 0) { 
        perror("io_setup");
        return 1;
    }
    
    // 2. 创建测试文件,使用O_DIRECT方式打开
    fd = open(filename, O_RDWR | O_CREAT | O_TRUNC | O_DIRECT, 0644);
    if (fd < 0) {
        perror("open");
        io_destroy(ctx);
        return 1;
    }
    
    // 3. 分配对齐的内存缓冲区 4K大小
    buffers = (char**)malloc(NUM_BLOCKS * sizeof(char *));
    if (!buffers) {
        perror("malloc");
        close(fd);
        io_destroy(ctx);
        return 1;
    }
    
    printf("Allocating memory buffers...\n");
    for (i = 0; i < NUM_BLOCKS; i++) {
        if (posix_memalign((void**)&buffers[i], ALIGN_SIZE, BLOCK_SIZE) != 0) { //以4K大小对齐
            perror("posix_memalign");
            for (j = 0; j < i; j++) free(buffers[j]);
            free(buffers);
            close(fd);
            io_destroy(ctx);
            return 1;
        }
        memset(buffers[i], 0, BLOCK_SIZE);
        
        // 显示内存分配进度
        if (i % 10000 == 0) {
            printf("Allocated %d/%d buffers\r", i, NUM_BLOCKS);
            fflush(stdout);
        }
    }
    printf("Allocated %d buffers successfully\n", NUM_BLOCKS);
    
    // 4. 准备测试数据
    printf("Preparing test data...\n");
    for (i = 0; i < NUM_BLOCKS; i++) {
        sprintf(buffers[i], "This is block %d of %d\n", i + 1, NUM_BLOCKS);
        
        // 显示数据准备进度
        if (i % 10000 == 0) {
            printf("Prepared %d/%d data blocks\r", i, NUM_BLOCKS);
            fflush(stdout);
        }
    }
    printf("Prepared %d data blocks successfully\n", NUM_BLOCKS);
    
    // 5. 分配I/O控制块指针数组,每个数组对应的数量为BATCH_SZIE
    iocb_ptrs = (struct iocb**)malloc(BATCH_SIZE * sizeof(struct iocb*));
    if (!iocb_ptrs) {
        perror("malloc iocb_ptrs");
        goto cleanup;
    }
    
    // 6. 提交异步写请求 (分批)
    record_time(&time_rec.write_start);
    for (int batch = 0; batch < NUM_BLOCKS; batch += BATCH_SIZE) {
        int current_batch = (batch + BATCH_SIZE > NUM_BLOCKS) ? NUM_BLOCKS - batch : BATCH_SIZE;
        
        for (i = 0; i < current_batch; i++) {
            int idx = batch + i;
            io_prep_pwrite(&iocbs[i], fd, buffers[idx], BLOCK_SIZE, idx * BLOCK_SIZE);
            iocbs[i].data = (void*)(long)idx;
            iocb_ptrs[i] = &iocbs[i];
        }
        
        ret = io_submit(ctx, current_batch, iocb_ptrs);
        if (ret != current_batch) {
            fprintf(stderr, "io_submit write batch %d failed: %d/%d, %s\n", 
                    batch/BATCH_SIZE, ret, current_batch, strerror(-ret));
            goto cleanup;
        }
        
        // 等待当前批次完成
        completed = 0;
        while (completed < current_batch) {
            ret = io_getevents(ctx, 1, current_batch, events, NULL);
            if (ret < 0) {
                perror("io_getevents write");
                verify_ok = 0;
                goto cleanup;
            }
            
            completed += ret;
            for (i = 0; i < ret; i++) {
                int block_idx = (int)(long)events[i].data;
                printf("Write completed for block %d/%d (batch %d/%d)\r", 
                       block_idx + 1, NUM_BLOCKS, batch/BATCH_SIZE + 1, (NUM_BLOCKS+BATCH_SIZE-1)/BATCH_SIZE);
                fflush(stdout);
            }
        }
    }
    record_time(&time_rec.write_end);
    printf("\nAll write operations completed\n");
    
    // 7. 提交异步读请求 (分批)
    record_time(&time_rec.read_start);
    for (int batch = 0; batch < NUM_BLOCKS; batch += BATCH_SIZE) {
        int current_batch = (batch + BATCH_SIZE > NUM_BLOCKS) ? NUM_BLOCKS - batch : BATCH_SIZE;
        
        for (i = 0; i < current_batch; i++) {
            int idx = batch + i;
            io_prep_pread(&iocbs[i], fd, buffers[idx], BLOCK_SIZE, idx * BLOCK_SIZE); //准备异步读请求
            iocbs[i].data = (void*)(long)idx;
            iocb_ptrs[i] = &iocbs[i];
        }
        
        ret = io_submit(ctx, current_batch, iocb_ptrs); //提交异步读请求
        if (ret != current_batch) {
            fprintf(stderr, "io_submit read batch %d failed: %d/%d, %s\n", 
                    batch/BATCH_SIZE, ret, current_batch, strerror(-ret));
            goto cleanup;
        }
        
        // 等待当前批次完成并验证
        completed = 0;
        while (completed < current_batch) {
            ret = io_getevents(ctx, 1, current_batch, events, NULL);
            if (ret < 0) {
                perror("io_getevents read");
                verify_ok = 0;
                goto cleanup;
            }
            
            completed += ret;
            for (i = 0; i < ret; i++) {
                int block_idx = (int)(long)events[i].data;
                printf("Read completed for block %d/%d (batch %d/%d)\r", 
                       block_idx + 1, NUM_BLOCKS, batch/BATCH_SIZE + 1, (NUM_BLOCKS+BATCH_SIZE-1)/BATCH_SIZE);
                fflush(stdout);
                
                // 验证数据
                char expected[BLOCK_SIZE];
                sprintf(expected, "This is block %d of %d\n", block_idx + 1, NUM_BLOCKS);
                if (strncmp(buffers[block_idx], expected, BLOCK_SIZE) != 0) {
                    printf("\nBlock %d verification failed!\n", block_idx + 1);
                    printf("Expected: %s", expected);
                    printf("Actual:   %s", buffers[block_idx]);
                    verify_ok = 0;
                }
            }
        }
    }
    record_time(&time_rec.read_end);
    printf("\nAll read operations completed\n");
    
    // 9. 输出验证结果
    if (verify_ok) {
        printf("All blocks verified successfully!\n");
    } else {
        printf("Verification failed for some blocks\n");
    }
    
cleanup:
    // 10. 记录总结束时间
    record_time(&time_rec.end);
    
    // 11. 打印性能统计
    print_performance(&time_rec);
    
    // 12. 清理资源
    printf("Cleaning up resources...\n");
    for (i = 0; i < NUM_BLOCKS; i++) {
        if (buffers[i]) free(buffers[i]);
        
        // 显示清理进度
        if (i % 10000 == 0) {
            printf("Freed %d/%d buffers\r", i, NUM_BLOCKS);
            fflush(stdout);
        }
    }
    if (buffers) free(buffers);
    if (iocb_ptrs) free(iocb_ptrs);
    if (fd >= 0) close(fd);
    if (ctx) io_destroy(ctx);
    
    printf("Cleanup completed\n");
    return verify_ok ? 0 : 1;
}

编译

编译的时候需要链接aio

 g++ main.cpp -o main -laio -O2

运行结果

运行结果如下

在这里插入图片描述

更多资料:https://github.com/0voice

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐