前言

  • 同步I/O模型通常用于实现Reactor模式
  • 异步I/O模型则用于实现Proactor模式
  • 最后我们会使用同步I/O方式模拟出Proactor模式

一、Reactor模式

  • Reactor 释义“反应堆”,是一种事件驱动机制
  • Reactor的回调函数:和普通函数调用的不同之处在于,应用程序不是主动的调用某个 API 完成处理,而是恰恰 相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上, 如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”

  • Reactor 模式是处理并发I/O比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的I/O 事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
  • Reactor 模型有三个重要的组件:
    • 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
    • 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中
    • 事件处理器:负责处理特定事件的处理函数
  • 具体流程如下:
    • 注册读就绪事件和相应的事件处理器
    • 事件分离器等待事件
    • 事件到来,激活分离器,分离器调用事件对应的处理器
    • 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制 权

多线程Reactor模式

  • 多线程Reactor模式特点:
    • 它要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将时间通知工作线程(逻辑单元)。除此之外,主线程不做任何其他实质性的工作
    • 读写数据,接受新的连接,以及处理客户请求均在工作线程中完成
  • 工作流程:
    • ①主线程往epoll内核事件表中注册socket上有数据可读
    • ②主线程调用epoll_wait等待socket上有数据可读
    • ③当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列
    • ④睡眠在请求请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪时间
    • ⑤主线程调用epoll_wait等到socket可写
    • ⑥当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列
    • ⑦睡眠在请求队列上的某个工作线程被唤醒,它向socket上写入服务器处理客户请求的结果

单线程Reactor模式

  • 单线程Reactor模式与多线程Reactor模式原理相同。但是工作都是在同一个线程中完成的
  • 单线程优缺点:
    • 优点:Reactor模型开发效率上比起直接使用IO复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源。优点为每个事件处理中很多时候可以 不考虑共享资源的互斥访问
    • 缺点:可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定 律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力
  • 单线程Reactor使用多核:
    • 如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆(Reactor),每个反应堆对应一颗CPU核心
    • 这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如Nginx这样的http静态服务器
  • 下面是单线程Reactor模式的实现代码,下载下来之后可以直接编译运行:
// reactor.c
// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c
// gcc -o reactor reactor.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <errno.h>
#include <time.h>
#include <libgen.h>
#include <fcntl.h>

#define MAX_EPOLL_EVENTS    1024
#define MAX_BUFFER_SIZE     4096

typedef int NCALLBACK(int, int, void*);

// 事件结构体, 每个套接字都会被封装为一个事件
struct  ntyevent {
    int fd;           // 事件对应的fd
    int events;       // 事件类型(  本代码中我们只处理EPOLL_IN和EPOLL_OUT)

    void *arg;        // 事件回调函数的参数3, 实际传入的是一个struct ntyreactor结构体指针
    int (*callback)(int fd, int events, void *arg); //事件回调函数

    int status;       // 当前事件是否位于epoll集合中: 1表示在, 0表示不在

    char buffer[MAX_BUFFER_SIZE]; // 读写缓冲区
    int length;       //缓冲区数据的长度
    
    long last_active; // 最后一次活跃的时间
};


// Reactor主体
struct ntyreactor {
    int epoll_fd;             // epoll套接字
    struct ntyevent *events; // reactor当前处理的事件集
};

// 创建一个Tcp Server
int init_server(char *ip, short port);
// 向reactor中添加一个服务器监听事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);


/***下面这3个函数是用来对reactor操作的***/
// 初始化reactor
struct ntyreactor *ntyreactor_init();
// 销毁reactor
int ntyreactor_destroy(struct ntyreactor *reactor);
// reactor运行函数
int ntyreactor_run(struct ntyreactor *reactor);



/***下面这3个函数是用来对ntyevent事件结构操作的***/
// 将一个fd封装到事件结构中
int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);
// 将一个事件添加/更新到epoll的事件表中
int nty_event_add(int epoll_fd, struct ntyevent* ev);
// 将一个事件移出epoll事件表
int nty_event_del(int epoll_fd, struct ntyevent* event);


/***下面这3个函数是ntyevent事件可以使用的回调函数***/
int accept_callback(int fd, int events, void *arg);
int recv_callback(int fd, int events, void *arg);
int send_callback(int fd, int events, void *arg);



int main(int argc, char *argv[])
{
    if(argc != 3)
    {
        printf("usage: ./%s [ip] [port]\n", basename(argv[0]));
        exit(EXIT_FAILURE);
    }

    char *ip = argv[1];
    short port = atoi(argv[2]);
    
    int sock_fd;

    // 1.初始化一个Tcp Server
    sock_fd = init_server(ip, port);

    // 2.初始化reactor
    struct ntyreactor *reactor = ntyreactor_init();
    if( reactor == NULL)
    {
        printf("Error in %s(), ntyreactor_init: create reactor error\n", __func__);
        exit(EXIT_FAILURE);
    }

    // 3.将Tcp Server添加到reactor事件集中
    ntyreactor_addlistener(reactor, sock_fd, accept_callback);

    // 4.运行reactor
    ntyreactor_run(reactor);

    // 5.销毁
    ntyreactor_destroy(reactor);

    close(sock_fd);
    
    return 0;
}

int init_server(char *ip, short port)
{
    // 1.创建套接字
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(sock_fd == -1)
    {
        printf("Error in %s(), socket: %s\n", __func__, strerror(errno));
        return -1;
    }

    // 2.初始化服务器地址
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1)
    {
        printf("Error in %s(), inet_pton: %s\n", __func__, strerror(errno));
        return -1;
    }
    server_addr.sin_port = htons(port);

    // 3.绑定服务器地址
    if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)
    {
        printf("Error in %s(), bind: %s\n", __func__, strerror(errno));
        return -1;
    }

    // 3.监听
    if(listen(sock_fd, 20) == -1)
    {
        printf("Error in %s(), listen: %s\n", __func__, strerror(errno));
        return -1;
    }

    printf("Listen start [%s:%d]...\n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));
    
    return sock_fd;
}

struct ntyreactor *ntyreactor_init()
{
    // 1.创建一个reactor
    struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    if(reactor == NULL)
        return NULL;
    memset(reactor, 0, sizeof(struct ntyreactor));

    // 2.创建reacotr的epoll_fd
    reactor->epoll_fd = epoll_create(1);
    if(reactor->epoll_fd == -1)
    {
        printf("Error in %s(), epoll_create: %s\n", __func__, strerror(errno));
        free(reactor);
        return NULL;
    }

    // 3.创建reactor的事件集
    reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS);
    if(reactor->events == NULL)
    {
        printf("Error in %s(), malloc: %s\n", __func__, strerror(errno));
        close(reactor->epoll_fd);
        free(reactor);
        return NULL;
    }

    return reactor;
}

int ntyreactor_destroy(struct ntyreactor *reactor)
{
    if(reactor == NULL)
    {
        printf("Error in %s(): %s\n", __func__, "reactor arg is NULL");
        return -1;
    }

    // 关闭epoll_fd、销毁事件集、释放结构
    close(reactor->epoll_fd);
    free(reactor->events);

    free(reactor);
    
    return 0;
}

int ntyreactor_run(struct ntyreactor *reactor)
{
    // 1.判断参数
    if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL)
    {
        printf("Error in %s(): %s\n", __func__, "reactor arg is error");
        return -1;
    }


    struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];

    // 2.进行epoll_wait()
    int nready;
    while(1)
    {
        // 超时检测
        /*
        int checkpos = 0, i;
        long now = time(NULL);
		for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {
			if (checkpos == MAX_EPOLL_EVENTS) {
				checkpos = 0;
			}
            // 如果当前索引处的事件status为0, 则不检测, 进行下一个
			if (reactor->events[checkpos].status != 1) {
				continue;
			}

            // 如果超过60秒, 那么就认定为超时, 超时后关闭移除
			long duration = now - reactor->events[checkpos].last_active;
			if (duration >= 60) {
				close(reactor->events[checkpos].fd);
				printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
				nty_event_del(reactor->epfd, &reactor->events[checkpos]);
			}
		}*/
        
        nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);
        // 3.函数出错
        if(nready == -1)
        {
            // 如果函数在阻塞过程中接收到信号, 那么继续进行epoll_wait()
            if(errno == EAGAIN || errno == EWOULDBLOCK)
                continue;
            printf("Error in %s(), epoll_wait: %s\n", __func__, strerror(errno));
            return -1;
        }
        // 4.函数超时
        else if(nready == 0)
            continue;
        // 5.有事件准备好
        else
        {
            // 遍历处理已就绪的事件
            int i;
            for(i = 0; i < nready; ++i)
            {
                // 获取事件结构体, 保存在struct epoll_event结构的data.ptr中
                struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;

                // 如果事件可读
                if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
                    ev->callback(ev->fd, ev->events, ev->arg);

                // 如果事件可写
                if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
                    ev->callback(ev->fd, ev->events, ev->arg);
            }
        }
    }

    return 0;
}

int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback)
{
    if(reactor == NULL || fd <0 || callback == NULL)
    {
        printf("Error in %s(): %s\n", __func__, "arg error");
        return -1;
    }

    // 初始化ntyevent事件结构, 然后添加到reactor的epoll事件表中即可
    nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);
    nty_event_add(reactor->epoll_fd, &reactor->events[fd]);

    return 0;
}

int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)
{
    if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0)
    {
        printf("Error in %s(): %s\n", __func__, "arg error");
        return -1;
    }

    // 初始化ntyevent结构的相关内容即可
    ev->fd = fd;
    ev->events = event;
    ev->arg = arg;
    ev->callback = callback;
    ev->status = status;
    ev->length = length;
    ev->last_active = time(NULL);

    return 0;
}

int nty_event_add(int epoll_fd, struct ntyevent* ev)
{
    if(epoll_fd <0 || ev == NULL)
    {
        printf("Error in %s(): %s\n", __func__, "arg error");
        return -1;
    }
    
    // 1.创建一个epoll事件结构
    struct epoll_event ep_event;
    memset(&ep_event, 0, sizeof(ep_event));
    ep_event.events = ev->events;
    ep_event.data.ptr = ev;
    //ep_event.data.fd = ev->fd; data成员是一个联合体, 不能同时使用fd和ptr成员

    // 2.如果当前ev已经在epoll事件表中, 那么就修改; 否则就把ev加入到epoll事件表中
    int op;
    if(ev->status == 0)
    {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    } 
    else
        op = EPOLL_CTL_MOD;

    // 3.添加/更新 
    if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1)
    {
        printf("Error in %s(), epoll_ctl: %s\n", __func__, strerror(errno));
        return -1;
    }

    return 0;
}

int nty_event_del(int epoll_fd, struct ntyevent* ev)
{
    if(epoll_fd < 0 || ev == NULL || ev->status != 1)
    {
        printf("Error in %s(): %s\n", __func__, "ev arg is error");
        return -1;
    }

    // 初始要删除的epoll事件结构
    struct epoll_event ep_event;
    memset(&ep_event, 0, sizeof(ep_event));
    ep_event.data.ptr = ev;
    //ep_event.data.fd = ev->fd; data成员是一个枚举, 不能同时使用ptr和fd成员
    ev->status = 0;

    // 从epoll事件表中删除epoll事件
    if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1)
    {
        printf("Error in %s(), epoll_ctl: %s\n", __func__, strerror(errno));
        return -1;
    }
    
    return 0;
}

int accept_callback(int fd, int events, void *arg)
{
    // 1.获得reactor结构
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    // 2.获取该fd对应的事件结构
    struct ntyevent *ev = reactor->events + fd;

    // 3.初始化客户端地址结构
    struct sockaddr_in cli_addr;
    memset(&cli_addr, 0 , sizeof(cli_addr));
    socklen_t len = sizeof(cli_addr);

    // 4.接收客户端
    int cli_fd;
    cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);
    if(cli_fd == -1)
    {
        printf("Error in %s(), accept: %s\n", __func__, strerror(errno));
        return -1;
    }

    int i;
    do {
        // 5.在reactor事件表中找到第一个空位置, 用i表示新事件存放的位置, 也是其套接字的值
        // reactor->events的0、1、2、3、4都被占用了, 客户端第一个可以使用的套接字为5, 因此此处从5开始遍历
        for(i = 5; i< MAX_EPOLL_EVENTS; ++i)
        {
            if(reactor->events[i].status == 0)
                break;
        }

        // 6.如果满了, 就退出
        if(i == MAX_EPOLL_EVENTS)
        {
            printf("Error in %s(): max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
            return -1;
        }

        // 7.将套接字设置为非阻塞
        int flag = 0;
		if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) {
            printf("Error in %s(), fcntl: %s\n", __func__, strerror(errno));
			return -1;
		}
        
        // 8.将新事件添加到reactor事件表中
        // 此处我们将新客户端的回调函数首先设置为recv_callback, 事件类型为EPOLLIN, 因为一般都是客户端向服务器发送数据的
        nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);
        nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);
    } while(0);

    printf("New connect: [%s:%d], [time:%ld], pos[%d]\n", \
        inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);

    return 0;
}

int recv_callback(int fd, int events, void *arg)
{
    // 1.获得reactor结构
    struct ntyreactor *reactor =(struct ntyreactor*)arg;
    // 2.获取该fd对应的事件结构
    struct ntyevent *ev = reactor->events + fd;

    // 3.先将事件从epoll事件集移除
    nty_event_del(reactor->epoll_fd, ev);
    
    // 3.接收数据
    int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0);
    if(rc < 0)        //recv出错
    {
        //if(errno == EAGAIN || errno == EWOULDBLOCK)
        //    return rc;
        
        printf("Error in %s(), recv: %s\n", __func__, strerror(errno));

        // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了
        close(ev->fd);
    }
    else if(rc == 0)  //对方关闭了
    {
        printf("Client closed the connection, fd = %d\n", ev->fd);

        // 此处我们也当做错误处理
        // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了
        close(ev->fd);
    } 
    else              //接收到数据
    {
        ev->buffer[rc] = '\0';
        printf("Recv[fd = %d]: %s\n", ev->fd, ev->buffer);

        // 将事件变为可读, 然后加入到epoll事件表中
        nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);
        nty_event_add(reactor->epoll_fd, ev);
    }

    return rc;
}

int send_callback(int fd, int events, void *arg)
{
    // 1.获得reactor结构
    struct ntyreactor *reactor =(struct ntyreactor*)arg;
    // 2.获取该fd对应的事件结构
    struct ntyevent *ev = reactor->events + fd;

    // 3.此处我们把接收的内容再回送给对象, 因此使用的是ev->buffer
    int rc = send(ev->fd, ev->buffer, ev->length, 0);
    if(rc > 0) //send成功
    {
        printf("Send[fd = %d]: %s\n", ev->fd, ev->buffer);

        // 移除、添加: 将其变为可读
        nty_event_del(reactor->epoll_fd, ev);
        nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);
        nty_event_add(reactor->epoll_fd, ev);
    }
    else //send失败
    {
        printf("Error in %s(), send: %s\n", __func__, strerror(errno));

        // 关闭、移除
        close(ev->fd);
        nty_event_del(reactor->epoll_fd, ev);
    }

    return rc;
}

 

二、Proactor模式

Proactor模式特点

  • 与Reactor不同,Proactor模式将所有的I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑

Proactor模式的工作流程

  • ①主线程调用aio_read函数向内核注册socket上读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例)
  • ②主线程继续处理其他逻辑
  • ③当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用
  • ④应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(这里以信号为例)
  • ⑤主线程继续处理其他逻辑
  • ⑥当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕
  • ⑦应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket

  • 在上图中,连接socket上的读写事件是通过aio_read/aio_write向内核注册的,因此内核将通过信号来向应用程序报告连接socket上的读写事件。所以,主线程的epoll_wait调用仅能用来检测监听socket上的连接请求事件,而不能用来检测连接socket的读写事件

三、使用同步I/O模拟Proactor模式

原理:

  • 主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一“完成事件”。那么从工作线程的角度来看,它们就直接获得了数据读写的结果,接下来要做的只是对读写的结果进行逻辑处理

工作流程:

  • ①主线程往epoll内核事件表中注册socket上的读就绪事件
  • ②主线程调用epoll_wait等待socket上有数据可读
  • ③当socket上有数据可读时,epoll_wait通知主线程。主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列
  • ④睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册socket上的写就绪事件
  • ⑤主线程调用epoll_wait等到socket可写
  • ⑥当socket可写时,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果

四、几种开源库

  • 下面是几种使用到上面技术的开源库:
    • libevent:名气最大,应用最广泛,历史悠久的跨平台事件库
    • libev:较 libevent 而言,设计更简练,性能更好,但对 Windows 支持不够好;
    • libuv:开发 node 的过程中需要一个跨平台的事件库,他们首选了 libev,但又要支持 Windows,故重新封装了一套,linux 下用 libev 实现,Windows 下用 IOCP 实现

优先级

  • libevent:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,可以通过设置 事件的优先级使其优先被处理
  • libev:也是通过优先级队列来管理激活的时间,也可以设置事件优先级
  • libuv:也是通过优先级队列来管理激活的时间,也可以设置事件优先级

事件循环

  • libevent:event_base 用于管理事件
  • libev:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,
  • libuv:可以通 过设置事件的优先级 使其优先被处理

线程安全

  • event_base 和 loop 都不是线程安全的,一个 event_base 或 loop 实例只能在用户的一个线程 内访问(一般是主线程),注册到 event_base 或者 loop 的 event 都是串行访问的,即每个执 行过程中,会按照优先级顺序访问已经激活的事件,执行其回调函数。所以在仅使用一个 event_base 或 loop 的情况下,回调函数的执行不存在并行关系
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐