1. 项目背景详细介绍

在并发和分布式系统开发中,不同进程之间进行高效的数据交换是常见需求。共享内存(Shared Memory)是一种性能非常高的进程间通信(IPC)手段:它允许多个进程在同一个物理内存区域读写数据,避免了内核复制和频繁系统调用,从而极大提高吞吐和降低延迟。常见场景包括:多进程服务(如 web 工作进程共享缓存)、音视频实时处理(生产者把帧放入共享缓冲区,消费者读取并渲染)、大数据管道(多个进程并行处理同一份大数据)等。

然而,共享内存带来复杂性:内存布局设计、同步(互斥与条件通知)、生命周期管理、跨平台差异(POSIX vs Windows)以及安全性(访问控制与边界检查)都需要慎重处理。本文目标是提供一个教学级但可实用的 C++ 共享内存实现示例,覆盖 POSIX 和 Windows 两大主流平台,包含同步原语、环形缓冲区示例(producer/consumer)、以及完整构建与调试说明,便于在真实项目中直接参考或改造。


2. 项目需求详细介绍

功能需求(必须实现):

  • 提供跨进程的共享内存封装类(SharedMemory),支持在 POSIX (Linux/macOS) 和 Windows 上编译运行(使用条件编译)。

  • 支持在共享内存内放置用户自定义数据结构(通过 placement new 或已有 POD)。

  • 提供进程间同步机制:互斥(mutex)和条件变量(condvar),用于安全地管理访问与通知生产/消费事件。同步原语应放在共享内存中,以支持跨进程。

  • 实现一个环形队列(CircularBuffer)示例用于生产者-消费者场景:支持单生产者-单消费者(SPSC)与可扩展为多生产者/多消费者(MPMC)的基本思路。

  • 提供示例程序:producer 写入若干消息到共享内存;consumer 从共享内存读取并展示。示例要能在不同进程中运行并正确同步。

  • 提供清楚的错误处理与日志输出(便于调试)。

  • 在实现中尽量减少第三方依赖(仅在 Windows 使用 Win32 API,POSIX 使用 pthread / mmap / shm_open 等)。


3. 相关技术详细介绍

  1. POSIX 共享内存(shm_open / mmap)

    • shm_open 创建或打开一个命名共享内存对象(在 /dev/shm 或系统实现下)。返回文件描述符,通过 ftruncate 设置大小,再通过 mmap 将其映射到进程地址空间。

    • 映射可以是读写或只读。多个进程都能 mmap 同一个共享对象,从而访问同一内存区域。

    • 注意权限与生命周期:命名对象需要使用 shm_unlink 删除。

  2. Windows 共享内存(CreateFileMapping / MapViewOfFile)

    • 使用 CreateFileMapping(传入 INVALID_HANDLE_VALUE 创建基于内存的文件映射)并指定名称,然后 MapViewOfFile 映射到地址空间。

    • 通过 OpenFileMapping 在第二个进程中打开映射。

  3. 跨进程同步

    • Windows 提供命名互斥体(CreateMutex)、命名事件(CreateEvent)与命名信号量(CreateSemaphore)。Windows 也支持条件变量与 SRWLocks 但跨进程原生支持较弱,通常需要基于互斥+事件实现条件通知。

    • POSIX:pthread_mutex_tpthread_cond_t 支持放在共享内存中,但需设置 pthread_mutexattr_setpsharedpthread_condattr_setpsharedPTHREAD_PROCESS_SHARED,并在属性中配置适当的互斥类型(如 PTHREAD_MUTEX_DEFAULT)。注意不同 libc 版本兼容性。

    • 为简洁与可移植性,本文在 POSIX 中把 pthread_mutex_tpthread_cond_t 放在共享内存中并初始化共享属性;在 Windows 中使用命名互斥与命名事件。

  4. 内存布局与对齐

    • 共享内存中放置结构体时要注意对齐和版本兼容。一般定义一个头结构(metadata)放在内存开头,包含 magic、版本号、容量、读写索引、互斥/条件变量等。

  5. 环形缓冲区(Ring Buffer)

    • 环形队列用作生产者-消费者缓冲区。通常维护读索引(head)与写索引(tail)。要处理满/空判断并避免 ABA 等竞态(对于 SPSC 可以用无锁算法;但本文使用互斥保护以保证简单正确)。


4. 实现思路详细介绍

总体设计:

  • 共享内存文件包含三部分:

    1. 固定头(SharedHeader):包含 magic、版本、容量、读写索引、用于同步的原语(或用于存放句柄/名称)。

    2. 同步原语(在 POSIX 中直接放在 header 内;在 Windows 中 header 保存命名对象名以便打开)。

    3. 数据区(环形缓冲区的原始字节区),用于存放消息帧或固定大小记录。

  • 提供 SharedMemory C++ 类(RAII 风格):

    • 构造:创建或打开命名共享内存,设置大小,映射地址,初始化 header(仅在创建者时)。

    • 方法:write 写入一条消息(字节数据),read 读取一条消息;close 卸载映射并可选择 unlink。

    • 锁与条件:在写入前获得互斥锁,检查是否有足够空间;若满则等待条件变量;写入后通知消费者;消费者读取后通知生产者。

    • 兼容性:支持在 POSIX 上使用 pthread_mutex_tpthread_cond_t;在 Windows 上,使用命名互斥体与命名事件(或使用 fence 模式)。

  • 示例程序:

    • producer:打开/创建共享内存,循环写入字符串消息(例如 "msg 1", "msg 2"...),每写一条通知消费者,写完可退出或持续运行。

    • consumer:打开共享内存,循环读取消息并输出。可以在读取到特殊消息(如 "quit")时退出。

  • 错误与边界处理:

    • 如果进程异常退出,互斥以及条件对象的状态可能受影响。POSIX 的 pthread mutex 在进程崩溃时可能进入不确定状态(但支持 robust mutex 可检测)。为了教学简单,示例使用普通互斥并在遇到错误时打印并重建(生产环境推荐 robust mutex)。

    • 对消息长度进行限制(例如单条消息最大 4096 字节),并写入长度前缀以便读取。

  • 生命周期:

    • 提供选项:创建者可以在退出时选择 unlink(删除命名共享内存),或保留以供后续进程使用。示例默认由创建者在退出时保留共享内存(更安全)并提供命令行参数决定是否删除。


5. 完整实现代码

// === file: shared_memory_common.h ===
/*
  跨平台共享内存与同步封装(教学示例)
  支持 POSIX (Linux/macOS) 与 Windows。
  注意:示例追求清晰可读用于教学,生产请根据需求加强错误处理、安全与兼容性。
*/
#ifndef SHARED_MEMORY_COMMON_H
#define SHARED_MEMORY_COMMON_H

#include <string>
#include <cstdint>
#include <cstring>
#include <iostream>

// 最大消息大小与队列容量(字节级别设定,教学用)
constexpr size_t MAX_MESSAGE_SIZE = 4096;   // 单条消息最大 4KB
constexpr size_t DATA_REGION_SIZE = 1024 * 1024; // 共享数据区总大小 1MB

// 共享内存头部魔数,用于快速检测内存格式正确性
constexpr uint32_t SHM_MAGIC = 0x53484D31; // "SHM1"

#pragma pack(push, 1)
struct SharedHeader {
    uint32_t magic;        // magic
    uint32_t version;      // 结构版本
    size_t   capacity;     // 数据区容量(字节)
    size_t   head;         // 读索引(字节偏移)
    size_t   tail;         // 写索引(字节偏移)
    // 在 POSIX 中,我们会把 pthread_mutex_t / pthread_cond_t 放在 header 之后(或此处通过 flexible array)
    // 为简单起见,header 后紧接着放同步对象或保留区域。
    char reserved[256];    // 预留用于放同步数据或命名信息
};
#pragma pack(pop)

#endif // SHARED_MEMORY_COMMON_H

// === file: shared_memory_posix.h ===
/*
 POSIX 实现:使用 shm_open + ftruncate + mmap
 同步:把 pthread_mutex_t 和 pthread_cond_t 放入共享内存(设置 pshared 属性)
*/
#ifndef SHARED_MEMORY_POSIX_H
#define SHARED_MEMORY_POSIX_H

#ifdef __unix__
#include "shared_memory_common.h"
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h>    /* For O_* constants */
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <system_error>

class SharedMemoryPosix {
public:
    SharedMemoryPosix(const std::string& name, size_t dataRegionSize = DATA_REGION_SIZE, bool create = false);
    ~SharedMemoryPosix();

    bool isOpen() const { return fd_ >= 0 && base_ != nullptr; }

    // 写入一条消息(阻塞直到写入成功或出错)
    bool writeMessage(const void* data, size_t len, std::string& err);

    // 读取一条消息(阻塞直到读取到或出错)
    bool readMessage(std::string& outMsg, std::string& err);

    // 选择性删除共享内存对象
    bool unlinkShm();

private:
    bool initSharedRegion(bool create);
    bool initSyncPrimitives(bool initialize); // initialize==true 时对 mutex/cond 做第一次初始化(仅创建者)

private:
    std::string name_;
    int fd_;
    size_t totalSize_; // header + dataRegion
    void* base_;       // mmap 映射起始地址
    SharedHeader* hdr_;
    char* dataRegion_;

    // 指针到同步对象放置位置(放在 header->reserved 区之后,或直接映射在 reserved 中)
    pthread_mutex_t* mutex_;
    pthread_cond_t*  canProduce_;
    pthread_cond_t*  canConsume_;
};

#endif // __unix__
#endif // SHARED_MEMORY_POSIX_H

// === file: shared_memory_posix.cpp ===
#ifdef __unix__
#include "shared_memory_posix.h"

SharedMemoryPosix::SharedMemoryPosix(const std::string& name, size_t dataRegionSize, bool create)
    : name_(name), fd_(-1), totalSize_(sizeof(SharedHeader) + dataRegionSize + 1024),
      base_(nullptr), hdr_(nullptr),
      dataRegion_(nullptr), mutex_(nullptr), canProduce_(nullptr), canConsume_(nullptr) {
    if (!initSharedRegion(create)) {
        // 报错信息在 err 打印
    } else {
        // 初始化同步原语(仅当创建时)
        bool rc = initSyncPrimitives(create);
        if (!rc) {
            std::cerr << "initSyncPrimitives failed\n";
        }
    }
}

SharedMemoryPosix::~SharedMemoryPosix() {
    if (base_) {
        munmap(base_, totalSize_);
        base_ = nullptr;
    }
    if (fd_ >= 0) close(fd_);
}

bool SharedMemoryPosix::initSharedRegion(bool create) {
    int flags = O_RDWR;
    if (create) flags |= O_CREAT;
    // 权限为用户读写
    fd_ = shm_open(name_.c_str(), flags, 0600);
    if (fd_ < 0) {
        std::cerr << "shm_open failed: " << strerror(errno) << "\n";
        return false;
    }
    if (create) {
        if (ftruncate(fd_, (off_t)totalSize_) != 0) {
            std::cerr << "ftruncate failed: " << strerror(errno) << "\n";
            return false;
        }
    }
    base_ = mmap(NULL, totalSize_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
    if (base_ == MAP_FAILED) {
        std::cerr << "mmap failed: " << strerror(errno) << "\n";
        base_ = nullptr;
        return false;
    }
    hdr_ = (SharedHeader*)base_;
    dataRegion_ = (char*)base_ + sizeof(SharedHeader) + 512; // 给 reserved 留下空间
    // 指定 mutex/cond 放在 reserved 区(aligned)
    void* syncBase = (char*)base_ + offsetof(SharedHeader, reserved) + 0;
    mutex_ = (pthread_mutex_t*)syncBase;
    canProduce_ = (pthread_cond_t*)((char*)syncBase + sizeof(pthread_mutex_t));
    canConsume_ = (pthread_cond_t*)((char*)syncBase + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t));
    return true;
}

bool SharedMemoryPosix::initSyncPrimitives(bool initialize) {
    // 如果是创建者,需要初始化 header 与互斥/条件变量属性
    if (initialize) {
        // 初始化 header
        hdr_->magic = SHM_MAGIC;
        hdr_->version = 1;
        hdr_->capacity = totalSize_ - sizeof(SharedHeader) - 512;
        hdr_->head = 0;
        hdr_->tail = 0;
        memset(hdr_->reserved, 0, sizeof(hdr_->reserved));
        // 初始化 mutex 与 cond 并设置进程共享属性
        pthread_mutexattr_t mattr;
        pthread_condattr_t  cattr;
        if (pthread_mutexattr_init(&mattr) != 0) { std::cerr << "pthread_mutexattr_init failed\n"; return false; }
        if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { std::cerr << "setpshared mutex failed\n"; return false; }
#ifdef __linux__
        // Optional: robust mutex to recover from dead process (production建议开启)
        // pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
#endif
        if (pthread_mutex_init(mutex_, &mattr) != 0) { std::cerr << "pthread_mutex_init failed\n"; return false; }
        pthread_mutexattr_destroy(&mattr);

        if (pthread_condattr_init(&cattr) != 0) { std::cerr << "pthread_condattr_init failed\n"; return false; }
        if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) != 0) { std::cerr << "setpshared cond failed\n"; return false; }
        if (pthread_cond_init(canProduce_, &cattr) != 0) { std::cerr << "pthread_cond_init canProduce failed\n"; return false; }
        if (pthread_cond_init(canConsume_, &cattr) != 0) { std::cerr << "pthread_cond_init canConsume failed\n"; return false; }
        pthread_condattr_destroy(&cattr);
        return true;
    } else {
        // 非创建者:可在此检查 magic 等
        if (hdr_->magic != SHM_MAGIC) {
            std::cerr << "magic not match or uninitialized shared memory\n";
            return false;
        }
        return true;
    }
}

bool SharedMemoryPosix::writeMessage(const void* data, size_t len, std::string& err) {
    if (!isOpen()) { err = "not open"; return false; }
    if (len == 0 || len > MAX_MESSAGE_SIZE) { err = "invalid length"; return false; }

    // 写入时加锁
    if (pthread_mutex_lock(mutex_) != 0) { err = "mutex lock failed"; return false; }

    // 可用空间计算(留一个字节避免 head==tail 时空/满混淆)
    size_t cap = hdr_->capacity;
    size_t head = hdr_->head;
    size_t tail = hdr_->tail;
    size_t used = (tail >= head) ? (tail - head) : (cap - (head - tail));
    size_t free_space = cap - used - 1;
    size_t need = len + sizeof(uint32_t); // 存放长度前缀
    while (free_space < need) {
        // 等待消费者消费
        // 为避免无限等待,可以添加超时或返回错误。此处简单等待。
        pthread_cond_wait(canProduce_, mutex_);
        head = hdr_->head;
        tail = hdr_->tail;
        used = (tail >= head) ? (tail - head) : (cap - (head - tail));
        free_space = cap - used - 1;
    }
    // 写入长度前缀与数据,考虑环绕
    uint32_t l = (uint32_t)len;
    size_t pos = tail;
    auto writeBytes = [&](const void* src, size_t n) {
        size_t first = std::min(n, cap - pos);
        memcpy(dataRegion_ + pos, src, first);
        pos = (pos + first) % cap;
        if (n > first) {
            memcpy(dataRegion_ + pos, (char*)src + first, n - first);
            pos = (pos + (n - first)) % cap;
        }
    };
    writeBytes(&l, sizeof(l));
    writeBytes(data, len);
    hdr_->tail = pos;
    // 通知消费者
    pthread_cond_signal(canConsume_);
    pthread_mutex_unlock(mutex_);
    return true;
}

bool SharedMemoryPosix::readMessage(std::string& outMsg, std::string& err) {
    if (!isOpen()) { err = "not open"; return false; }

    if (pthread_mutex_lock(mutex_) != 0) { err = "mutex lock failed"; return false; }

    size_t cap = hdr_->capacity;
    size_t head = hdr_->head;
    size_t tail = hdr_->tail;
    // 判断是否为空
    while (head == tail) {
        // 等待生产者
        pthread_cond_wait(canConsume_, mutex_);
        head = hdr_->head;
        tail = hdr_->tail;
    }
    // 读取长度前缀
    uint32_t l = 0;
    size_t pos = head;
    auto readBytes = [&](void* dst, size_t n) {
        size_t first = std::min(n, cap - pos);
        memcpy(dst, dataRegion_ + pos, first);
        pos = (pos + first) % cap;
        if (n > first) {
            memcpy((char*)dst + first, dataRegion_ + pos, n - first);
            pos = (pos + (n - first)) % cap;
        }
    };
    readBytes(&l, sizeof(l));
    if (l == 0 || l > MAX_MESSAGE_SIZE) {
        pthread_mutex_unlock(mutex_);
        err = "invalid message length";
        return false;
    }
    outMsg.resize(l);
    readBytes(&outMsg[0], l);
    hdr_->head = pos;
    // 通知生产者(有空间)
    pthread_cond_signal(canProduce_);
    pthread_mutex_unlock(mutex_);
    return true;
}

bool SharedMemoryPosix::unlinkShm() {
    if (shm_unlink(name_.c_str()) != 0) {
        std::cerr << "shm_unlink failed: " << strerror(errno) << "\n";
        return false;
    }
    return true;
}

#endif // __unix__

// === file: shared_memory_windows.h ===
/*
 Windows 实现(简化教学示例)
 使用 CreateFileMapping / MapViewOfFile
 同步使用命名互斥体 + 命名事件(或命名信号量)
*/
#ifndef SHARED_MEMORY_WINDOWS_H
#define SHARED_MEMORY_WINDOWS_H

#ifdef _WIN32
#include "shared_memory_common.h"
#include <windows.h>
#include <stdexcept>

class SharedMemoryWin {
public:
    SharedMemoryWin(const std::string& name, size_t dataRegionSize = DATA_REGION_SIZE, bool create = false);
    ~SharedMemoryWin();

    bool isOpen() const { return mapping_ != NULL && base_ != nullptr; }

    bool writeMessage(const void* data, size_t len, std::string& err);
    bool readMessage(std::string& outMsg, std::string& err);

    bool unlinkMapping();

private:
    bool initMapping(bool create);

private:
    std::string name_;
    HANDLE mapping_;
    void* base_;
    size_t totalSize_;
    SharedHeader* hdr_;
    char* dataRegion_;

    // 同步对象名
    std::string mutexName_;
    std::string evtCanProduce_;
    std::string evtCanConsume_;

    HANDLE hMutex_;
    HANDLE hEvtCanProduce_;
    HANDLE hEvtCanConsume_;
};

#endif // _WIN32
#endif // SHARED_MEMORY_WINDOWS_H

// === file: shared_memory_windows.cpp ===
#ifdef _WIN32
#include "shared_memory_windows.h"
#include <iostream>

SharedMemoryWin::SharedMemoryWin(const std::string& name, size_t dataRegionSize, bool create)
    : name_(name), mapping_(NULL), base_(nullptr), totalSize_(sizeof(SharedHeader) + dataRegionSize + 1024),
      hdr_(nullptr), dataRegion_(nullptr),
      mutexName_(name + "_mtx"), evtCanProduce_(name + "_prod"), evtCanConsume_(name + "_cons"),
      hMutex_(NULL), hEvtCanProduce_(NULL), hEvtCanConsume_(NULL) {
    if (!initMapping(create)) {
        std::cerr << "initMapping failed\n";
    }
}

SharedMemoryWin::~SharedMemoryWin() {
    if (base_) UnmapViewOfFile(base_);
    if (mapping_) CloseHandle(mapping_);
    if (hMutex_) CloseHandle(hMutex_);
    if (hEvtCanProduce_) CloseHandle(hEvtCanProduce_);
    if (hEvtCanConsume_) CloseHandle(hEvtCanConsume_);
}

bool SharedMemoryWin::initMapping(bool create) {
    DWORD flProtect = PAGE_READWRITE;
    if (create) {
        mapping_ = CreateFileMappingA(INVALID_HANDLE_VALUE, NULL, flProtect, (DWORD)(totalSize_ >> 32), (DWORD)(totalSize_ & 0xffffffff), name_.c_str());
    } else {
        mapping_ = OpenFileMappingA(FILE_MAP_ALL_ACCESS, FALSE, name_.c_str());
    }
    if (!mapping_) {
        std::cerr << "Create/OpenFileMapping failed: " << GetLastError() << "\n";
        return false;
    }
    base_ = MapViewOfFile(mapping_, FILE_MAP_ALL_ACCESS, 0, 0, totalSize_);
    if (!base_) {
        std::cerr << "MapViewOfFile failed: " << GetLastError() << "\n";
        return false;
    }
    hdr_ = (SharedHeader*)base_;
    dataRegion_ = (char*)base_ + sizeof(SharedHeader) + 512;
    // 创建或打开同步对象
    if (create) {
        hMutex_ = CreateMutexA(NULL, FALSE, mutexName_.c_str());
        hEvtCanProduce_ = CreateEventA(NULL, FALSE, TRUE, evtCanProduce_.c_str()); // 生产者初始可写
        hEvtCanConsume_ = CreateEventA(NULL, FALSE, FALSE, evtCanConsume_.c_str());
        // 初始化 header
        hdr_->magic = SHM_MAGIC;
        hdr_->version = 1;
        hdr_->capacity = totalSize_ - sizeof(SharedHeader) - 512;
        hdr_->head = 0;
        hdr_->tail = 0;
    } else {
        hMutex_ = OpenMutexA(MUTEX_ALL_ACCESS, FALSE, mutexName_.c_str());
        hEvtCanProduce_ = OpenEventA(EVENT_ALL_ACCESS, FALSE, evtCanProduce_.c_str());
        hEvtCanConsume_ = OpenEventA(EVENT_ALL_ACCESS, FALSE, evtCanConsume_.c_str());
    }
    return true;
}

bool SharedMemoryWin::writeMessage(const void* data, size_t len, std::string& err) {
    if (!isOpen()) { err = "not open"; return false; }
    if (len == 0 || len > MAX_MESSAGE_SIZE) { err = "invalid length"; return false; }
    // 等待 canProduce 事件必须为有信号
    WaitForSingleObject(hEvtCanProduce_, INFINITE);
    // 加互斥
    WaitForSingleObject(hMutex_, INFINITE);
    size_t cap = hdr_->capacity;
    size_t head = hdr_->head;
    size_t tail = hdr_->tail;
    size_t used = (tail >= head) ? (tail - head) : (cap - (head - tail));
    size_t free_space = cap - used - 1;
    size_t need = len + sizeof(uint32_t);
    if (free_space < need) {
        // 没空间,清醒地释放互斥并重置事件为等待
        ReleaseMutex(hMutex_);
        ResetEvent(hEvtCanProduce_);
        err = "no space";
        return false;
    }
    uint32_t l = (uint32_t)len;
    size_t pos = tail;
    auto writeBytes = [&](const void* src, size_t n) {
        size_t first = std::min(n, cap - pos);
        memcpy(dataRegion_ + pos, src, first);
        pos = (pos + first) % cap;
        if (n > first) {
            memcpy(dataRegion_ + pos, (char*)src + first, n - first);
            pos = (pos + (n - first)) % cap;
        }
    };
    writeBytes(&l, sizeof(l));
    writeBytes(data, len);
    hdr_->tail = pos;
    // 通知消费者
    SetEvent(hEvtCanConsume_);
    // 如果依旧有空间则保持 canProduce 为有信号,否则重置
    head = hdr_->head; tail = hdr_->tail;
    used = (tail >= head) ? (tail - head) : (cap - (head - tail));
    free_space = cap - used - 1;
    if (free_space >= sizeof(uint32_t) + 1) SetEvent(hEvtCanProduce_); else ResetEvent(hEvtCanProduce_);
    ReleaseMutex(hMutex_);
    return true;
}

bool SharedMemoryWin::readMessage(std::string& outMsg, std::string& err) {
    if (!isOpen()) { err = "not open"; return false; }
    // 等待消费者事件
    WaitForSingleObject(hEvtCanConsume_, INFINITE);
    WaitForSingleObject(hMutex_, INFINITE);
    size_t cap = hdr_->capacity;
    size_t head = hdr_->head;
    size_t tail = hdr_->tail;
    if (head == tail) {
        ReleaseMutex(hMutex_);
        ResetEvent(hEvtCanConsume_);
        err = "empty";
        return false;
    }
    uint32_t l = 0;
    size_t pos = head;
    auto readBytes = [&](void* dst, size_t n) {
        size_t first = std::min(n, cap - pos);
        memcpy(dst, dataRegion_ + pos, first);
        pos = (pos + first) % cap;
        if (n > first) {
            memcpy((char*)dst + first, dataRegion_ + pos, n - first);
            pos = (pos + (n - first)) % cap;
        }
    };
    readBytes(&l, sizeof(l));
    if (l == 0 || l > MAX_MESSAGE_SIZE) {
        ReleaseMutex(hMutex_);
        err = "invalid length";
        return false;
    }
    outMsg.resize(l);
    readBytes(&outMsg[0], l);
    hdr_->head = pos;
    // 通知生产者
    SetEvent(hEvtCanProduce_);
    // 如果没有更多数据则重置 consumer event
    head = hdr_->head; tail = hdr_->tail;
    if (head == tail) ResetEvent(hEvtCanConsume_);
    ReleaseMutex(hMutex_);
    return true;
}

bool SharedMemoryWin::unlinkMapping() {
    // Windows 不需显式 unlink 文件映射名;通过 CloseHandle 释放即可。但若需要可以 CloseHandle 并在设计上删除命名对象。
    return true;
}

#endif // _WIN32

// === file: producer.cpp ===
/*
  生产者示例:向共享内存写入若干消息
  使用方法示例(POSIX): ./producer /my_shm create
  使用方法示例(Windows): producer.exe my_shm create
*/
#include <iostream>
#include <thread>
#include <chrono>
#ifdef __unix__
#include "shared_memory_posix.h"
int main(int argc, char** argv) {
    if (argc < 2) { std::cout << "usage: producer <shm_name> [create]\n"; return 1; }
    std::string name = argv[1];
    bool create = (argc >= 3 && std::string(argv[2]) == "create");
    SharedMemoryPosix shm(name, DATA_REGION_SIZE, create);
    if (!shm.isOpen()) { std::cerr << "open shm failed\n"; return 1; }
    for (int i = 0; i < 100; ++i) {
        std::string msg = "msg " + std::to_string(i);
        std::string err;
        if (!shm.writeMessage(msg.data(), msg.size(), err)) {
            std::cerr << "write failed: " << err << "\n";
        } else {
            std::cout << "wrote: " << msg << "\n";
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}
#elif defined(_WIN32)
#include "shared_memory_windows.h"
int main(int argc, char** argv) {
    if (argc < 2) { std::cout << "usage: producer <shm_name> [create]\n"; return 1; }
    std::string name = argv[1];
    bool create = (argc >= 3 && std::string(argv[2]) == "create");
    SharedMemoryWin shm(name, DATA_REGION_SIZE, create);
    if (!shm.isOpen()) { std::cerr << "open shm failed\n"; return 1; }
    for (int i = 0; i < 100; ++i) {
        std::string msg = "msg " + std::to_string(i);
        std::string err;
        if (!shm.writeMessage(msg.data(), msg.size(), err)) {
            std::cerr << "write failed: " << err << "\n";
        } else {
            std::cout << "wrote: " << msg << "\n";
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}
#endif

// === file: consumer.cpp ===
/*
  消费者示例:从共享内存读取并打印消息
  使用方法示例(POSIX): ./consumer /my_shm
*/
#include <iostream>
#ifdef __unix__
#include "shared_memory_posix.h"
int main(int argc, char** argv) {
    if (argc < 2) { std::cout << "usage: consumer <shm_name>\n"; return 1; }
    std::string name = argv[1];
    SharedMemoryPosix shm(name, DATA_REGION_SIZE, false);
    if (!shm.isOpen()) { std::cerr << "open shm failed\n"; return 1; }
    while (true) {
        std::string msg; std::string err;
        if (!shm.readMessage(msg, err)) {
            std::cerr << "read failed: " << err << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }
        std::cout << "read: " << msg << "\n";
        if (msg == "quit") break;
    }
    return 0;
}
#elif defined(_WIN32)
#include "shared_memory_windows.h"
int main(int argc, char** argv) {
    if (argc < 2) { std::cout << "usage: consumer <shm_name>\n"; return 1; }
    std::string name = argv[1];
    SharedMemoryWin shm(name, DATA_REGION_SIZE, false);
    if (!shm.isOpen()) { std::cerr << "open shm failed\n"; return 1; }
    while (true) {
        std::string msg; std::string err;
        if (!shm.readMessage(msg, err)) {
            std::cerr << "read failed: " << err << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }
        std::cout << "read: " << msg << "\n";
        if (msg == "quit") break;
    }
    return 0;
}
#endif

// === Build / 使用说明(在代码块内以注释形式提供) ===
/*
编译说明(POSIX Linux):
  g++ -std=c++17 -pthread -o producer producer.cpp shared_memory_posix.cpp -lrt
  g++ -std=c++17 -pthread -o consumer consumer.cpp shared_memory_posix.cpp -lrt
  注意:某些系统需要 -lrt(实时库)以使用 shm_open;在现代 glibc 上可能不需要。

执行示例(POSIX):
  终端1: ./producer /my_shm create
  终端2: ./consumer /my_shm

Windows 编译请在 Visual Studio 中建立工程,包含相关源文件并链接 kernel32.lib 等。
*/

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐