c++之epoll原理解析与Client Server功能实现
epoll详解
系列服务器开发
前言
从事服务端开发,少不了要接触网络编程。Epoll 作为 Linux 下高性能网络服务器的必备技术至关重要,Nginx、Redis、Skynet 和大部分游戏服务器都使用到这一多路复用技术。select,poll,epoll都是用来实现IO多路复用的机制,在Linux网络模型中对应着IO复用模型Unix上的IO模型。阻塞是进程调度的关键一环,指的是进程在等待某事件(如接收到网络数据)发生之前的等待状态,Recv、Select 和 Epoll 都是阻塞方法。
内核接收网络数据全过程
网卡接收数据的过程:
在 1 阶段网卡收到网线传来的数据。
经过 2 阶段的硬件电路的传输。
最终 3 阶段将数据写入到内存中的某个地址上,操作系统就可以去读取它们。
总结为:网卡会把接收到的数据写入内存。一般而言,由硬件产生的信号需要 CPU 立马做出回应,不然数据可能就丢失了,所以它的优先级很高。CPU 理应中断掉正在执行的程序,去做出响应;当 CPU 完成对硬件的响应后,再重新执行用户程序。
工作队列
操作系统为了支持多任务,实现了进程调度的功能,会把进程分为“运行”和“等待”等几种状态。
运行状态是进程获得 CPU 使用权,正在执行代码的状态;等待状态是阻塞状态,比如上述程序运行到 Recv 时,程序会从运行状态变为等待状态,接收到数据后又变回运行状态。
多路复用:
多路复用是一种同步I/O模型,实现一个线程可以监视多个文件句柄;
一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;
没有文件句柄就绪就会阻塞应用程序,交出CPU。
同步是在Linux下进行网络编程时,同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式之一。
同步/异步:
同步(sync)和异步(async)的概念描述的是用户线程与内核的交互方式。
同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;
异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数
阻塞/非阻塞:
阻塞和非阻塞的概念描述的是用户线程调用内核操作的方式。
阻塞是指IO操作在没有接收完数据或者没有得到结果之前不会返回,需要彻底完成后才返回到用户空间;
非阻塞是指IO操作后立即返回给用户一个状态值,无需等到IO操作彻底完成。
常见的IO模型有五种:
(1)同步阻塞IO(Blocking IO):即传统的IO模型,在linux中默认情况下所有的socket都是阻塞模式。发送方发送请求之后一直等待响应,接收方处理请求时进行的的IO操作如果不能马上等到返回结果,就一直等到返回结果后,才响应发送方,期间不能进行其它工作。listen()、read()、write() 等接口都是阻塞型的,一个简单的改进方案是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
(2)同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK,这个可以使用ioctl()系统调用设置。这样做发送方可以在发起IO请求后可以立即返回,如果该次读操作并未读取到任何数据,需要不断地发起IO请求,直到数据到达后,才真正读取到数据,继续执行。
(3)IO多路复用(IO Multiplexing):建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题,此外poll、epoll都是这种模型。用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。
(4) 信号驱动IO(signal driven IO):调用sigaltion系统调用,当内核中IO数据就绪时以SIGIO信号通知请求进程,请求进程再把数据从内核读入到用户空间,这一步是阻塞的。
(5)异步IO(Asynchronous IO):也称为异步非阻塞IO。在IO多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。
基本代码流程如下:
//创建socket
int s = socket(AF_INET, SOCK_STREAM, 0);
//绑定
bind(s, ...)
//监听
listen(s, ...)
//接受客户端连接
int c = accept(s, ...)
//接收客户端数据
recv(c, ...);
//将数据打印出来
printf(...)
一、epoll是什么?
SELECT
select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写,接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。select 使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个是有限制的,在 Linux 系统中,有内核中的额FD_SETSIZE 限制,默认最大值为 1024,只能监听 0~1023 的文件描述符。select 需要进行 2 次遍历文件描述符集合,一次是在内核态里,一次是在用户态里,而且还会发生 2 次拷贝文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
int fds[] = 存放需要监听的socket
while(1){
int n = select(..., fds, ...)
for(int i=0; i < fds.count; i++){
if(FD_ISSET(fds[i], ...)){
//fds[i]的数据处理
}
}
}
这种简单方式行之有效,在几乎所有操作系统都有对应的实现。
缺点主要是:每次调用 Select 都需要将进程加入到所有监视 Socket 的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个 FDS 列表传递给内核,有一定的开销。
POLL
poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符的限制。
poll 和 select 并没有太大的本质区别,都是使用线性结构存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能损耗会呈指数级增长。
具体不细讲解了。
EPOLL
Select 低效的原因之一是将“维护等待队列”和“阻塞进程”两个步骤合二为一,Epoll 将这两个操作分开,先用 epoll_ctl 维护等待队列,再调用 epoll_wait 阻塞进程。显而易见epoll 通过两个方面,很好的解决了 select/poll 的问题。另一个原因在于程序不知道哪些 Socket 收到数据,只能一个个遍历。如果内核维护一个“就绪列表”,引用收到数据的 Socket,就能避免遍历。当进程被唤醒后,只要获取 Rdlist 的内容,就能够知道哪些 Socket 收到数据。
a)epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,把需要监控的 socket 通过 epoll_ctl()函数加入到内核中的红黑树里,红黑树是个高效的数据结构,增删查一般时间复杂度都是 O(logn),通过对这颗红黑树进行操作,这样就不需要像 select/poll 每次操作时都传入整个 socket 集合,只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。
b)epoll 使用事件驱动的机制,内核里面维护了一个链表来记录就绪事件,当某个 socket 有事件发生时,通过回调函数内核会将其将入到这个就绪事件列表中,当用户调用epoll_wait()时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。
epoll 的方式监听的 Socket 数量越多的时候,效率不会大幅度降低,能够同时监听的 Socket 的数目也非常多,上限就为系统定义的进程打开的最大文件描述符个数。
epoll 支持两种事件触发模式,分别是边缘触发和水平触发。
a)使用边缘触发时,当被监控的 Socket 描述符有可读事件发生时,服务器端只会从 epoll_wait 中苏醒一次,即时进程没有调用 read 函数从内核读取数据,也依然只苏醒一次,因此我们程序要保证一次性将内核缓冲区的数据读取完。
b)使用水平触发模式时,当被监控的 Socket 上有可读事件发生时,服务端不断地从 epoll_wait 中苏醒,直到内核缓冲区数据被 read 函数读完才结束,目的是告诉我们有数据需要读取。
select/poll 只支持水平触发模式,epoll 默认的触发模式是水平触发,但是可以根据应用场景设置为边缘触发模式。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //将所有需要监听的socket添加到epfd中
while(1){
int n = epoll_wait(...)
for(接收到数据的socket){
//处理
}
}
二、epoll相关函数解析
Epoll 在 Select 和 Poll 的基础上引入了 eventpoll 作为中间层,使用了先进的数据结构,是一种高效的多路复用技术。
1. int epoll_create(int size)
内核会产生一个epoll实例数据结构并返回一个文件描述符,这个特殊的描述符就是epoll实例的句柄,后面的两个接口都以它为中心(即epfd形参)。size参数表示所要监视文件描述符的最大值,不过在后来的Linux版本中已经被弃用(同时,size不要传0,会报invalid argument错误)
更新为
int epoll_create1(int flags);
0:如果这个参数是0,这个函数等价于poll_create(0)
EPOLL_CLOEXEC:这是这个参数唯一的有效值,如果这个参数设置为这个。
一般子进程会调用exec执行另一个程序,此时会用全新的程序替换子进程的正文,数据,堆和栈等。此时保存文件描述符的变量当然也不存在了,导致无法关闭无用的文件描述符了。所以会fork子进程后在子进程中直接执行close关掉无用的文件描述符,然后再执行exec。
有时我们fork子进程时已经不知道打开了多少个文件描述符(包括socket句柄等),这此时进行逐一清理确实有很大难度。我们期望的是能在fork子进程前打开某个文件句柄时就指定好:“这个句柄我在fork子进程后执行exec时就关闭”。即所谓的 close-on-exec。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
将被监听的描述符添加到红黑树或从红黑树中删除或者对监听事件进行修改,对于需要监视的文件描述符集合,epoll_ctl对红黑树进行管理,红黑树中每个成员由描述符值和所要监控的文件描述符指向的文件表项的引用等组成。
typedef union epoll_data {
void *ptr; /* 指向用户自定义数据 */
int fd; /* 注册的文件描述符 */
uint32_t u32; /* 32-bit integer */
uint64_t u64; /* 64-bit integer */
} epoll_data_t;
struct epoll_event {
uint32_t events; /* 描述epoll事件 */
epoll_data_t data; /* 见上面的结构体 */
};
op参数说明操作类型:
EPOLL_CTL_ADD:向interest list添加一个需要监视的描述符
EPOLL_CTL_DEL:从interest list中删除一个描述符
EPOLL_CTL_MOD:修改interest list中一个描述符
struct epoll_event结构描述一个文件描述符的epoll行为。在使用epoll_wait函数返回处于ready状态的描述符列表时,
data域是唯一能给出描述符信息的字段,所以在调用epoll_ctl加入一个需要检测的描述符时,一定要在此域写入描述符相关信息events域是bitmask,描述一组epoll事件,在epoll_ctl调用中解释为:描述符所期望的epoll事件,可多选。
常用的epoll事件描述如下:
EPOLLIN:描述符处于可读状态
EPOLLOUT:描述符处于可写状态
EPOLLET:将epoll event通知模式设置成edge triggered
EPOLLONESHOT:第一次进行通知,之后不再监测
EPOLLHUP:本端描述符产生一个挂断事件,默认监测事件
EPOLLRDHUP:对端描述符产生一个挂断事件
EPOLLPRI:由带外数据触发
EPOLLERR:描述符产生错误时触发,默认检测事件
3. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
功能:阻塞等待注册的事件发生,返回事件的数目,并将触发的事件写入events数组中。
events: 用来记录被触发的events,其大小应该和maxevents一致
maxevents: 返回的events的最大个数
处于ready状态的那些文件描述符会被复制进ready list中,epoll_wait用于向用户进程返回ready list。events和maxevents两个参数描述一个由用户分配的struct epoll event数组,调用返回时,内核将ready list复制到这个数组中,并将实际复制的个数作为返回值。注意,如果ready list比maxevents长,则只能复制前maxevents个成员;反之,则能够完全复制ready list。
另外,struct epoll event结构中的events域在这里的解释是:在被监测的文件描述符上实际发生的事件。
参数timeout描述在函数调用中阻塞时间上限,单位是ms:
timeout = -1 表示调用将一直阻塞,直到有文件描述符进入ready状态或者捕获到信号才返回;
timeout = 0 用于非阻塞检测是否有描述符处于ready状态,不管结果怎么样,调用都立即返回;
timeout > 0 表示调用将最多持续timeout时间,如果期间有检测对象变为ready状态或者捕获到信号则返回,否则直到超时。
4. int eventfd(unsigned int initval , int flags )
eventfd () 返回一个新的文件描述符,可以用来引用 eventfd 对象。
该对象包含一个由内核维护的无符号 64 位整数 ( uint64_t ) 计数器。此计数器使用参数 initval 中指定的值进行初始化。
initval:创建eventfd时它所对应的64位计数器的初始值;
flags: eventfd文件描述符的标志,用以改变 eventfd 的行为,可以使用以下三个标志位进行或的结果:
EFD_CLOEXEC:在新的文件描述符上设置 close-on-exec ( FD_CLOEXEC ) 标志,简单说就是 fork 子进程时不继承,对于多线程的程序设上这个值不会有错的。
EFD_NONBLOCK:文件会被设置成 O_NONBLOCK,读操作不阻塞。若不设置,一直阻塞直到计数器中的值大于0。
EFD_SEMAPHORE:(自2.6.30起支持)支持 semophore语义的 read,每次读操作,计数器的值自减1。
5. epoll_event
epoll_event 结构体的定义如上所示,分为 events 和 data 两个部分。events 是 epoll 注册的事件,比如EPOLLIN、EPOLLOUT等等,这个参数在epoll_ctl注册事件时,可以明确告知注册事件的类型。
void* ptr指针主要是为了自定义的数据结构,fd主要用于存放文件描述符。
struct epoll_event
{
uint32_t events;
epoll_data_t data;
} __attribute__ ((__packed__));
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
三、epoll服务端样例
/**********************server***********************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <sys/syscall.h>
#define SERVER_CONSUMER "./server_consumer"
#define CLIENT_SERVER "./client_server"
#define N_THREAD 3 //线程数
#define DATA_SIZE 5 //数据大小
#define BUFFER_SIZE 20 //缓冲区大小
int fd[N_THREAD]; //server-consumer-pipe描述符
//定义循环队列缓冲区
typedef struct Queue{
int rear;
int front;
int elem[BUFFER_SIZE];
}Queue;
void initQueue(Queue* q)
{
memset(q, 0, sizeof(Queue));
}
int isEmpty(Queue* q)
{
return q->rear == q->front;
}
int isFull(Queue* q)
{
return (q->rear + 1) % BUFFER_SIZE == q->front;
}
int push(Queue* q, int data)
{
if(isFull(q))
return 0;
q->elem[q->rear] = data;
q->rear = (q->rear + 1) % BUFFER_SIZE;
return 1;
}
int pop(Queue* q, int* data)
{
if(isEmpty(q))
return 0;
*data = q->elem[q->front];
q->front = (q->front + 1) % BUFFER_SIZE;
return 1;
}
//创建消费者任务,其消费者通过id区分
void* consumer(void *arg)
{
int id = *((int*)arg);
char buf[DATA_SIZE] = {0};
while(1){
memset(buf, 0, sizeof(buf));
read(fd[id], buf, sizeof(buf));
sleep(rand() % 3 + 1);
int data;
sscanf(buf, "%d", &data);
printf("id:%d data:%d\n", id, data);
}
return nullptr;
}
int epollserver()
{
//初始化环形队列
Queue buffer;
initQueue(&buffer);
//创建并打开server-consumer-pipe
for(int i = 0; i < N_THREAD; ++i){
char path[128] = {0};
sprintf(path, "%s%d", SERVER_CONSUMER, i);
mkfifo(path, 0666);
fd[i] = open(path, O_RDWR);
}
//打开client-server-pipe(由client创建)
int cs = open(CLIENT_SERVER, O_RDONLY);
//创建N个消费者子线程
pthread_t tid[N_THREAD];
int id[N_THREAD]; //线程标识
for(int i = 0; i < N_THREAD; ++i){
id[i] = i;
pthread_create(&tid[i], NULL, consumer, id + i);
}
//创建epoll实例
int epfd = epoll_create(N_THREAD + 1);
struct epoll_event event[N_THREAD + 1];
for(int i = 0; i < N_THREAD; ++i){
event[i].data.fd = fd[i];
event[i].events = EPOLLOUT; //监听写事件
epoll_ctl(epfd, EPOLL_CTL_ADD, fd[i], event + i);
}
event[N_THREAD].data.fd = cs;
event[N_THREAD].events = EPOLLIN; //监听读事件
epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
//监听epoll,等待事件可读可写的事件返回
struct epoll_event wait_event[N_THREAD + 1];
while(1){
int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
char buf[DATA_SIZE] = {0};
for(int i = 0; i < n; ++i){
if(wait_event[i].data.fd == cs){
memset(buf, 0, sizeof(buf));
read(cs, buf, sizeof(buf));
if(!isFull(&buffer)){
int data;
sscanf(buf, "%d", &data);
push(&buffer, data);
}
}
else{
if(!isEmpty(&buffer)){
int data;
pop(&buffer, &data);
memset(buf, 0, sizeof(buf));
sprintf(buf, "%d", data);
write(wait_event[i].data.fd, buf, sizeof(buf));
}
}
}
}
//等待线程退出
for(int i = 0; i < N_THREAD; ++i){
pthread_join(tid[i], NULL);
}
//关闭文件句柄
for(int i = 0; i < N_THREAD; ++i){
close(fd[i]);
}
return 0;
}
四、epoll客户端样例
/**********************client***********************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <sys/syscall.h>
#define PRODUCER_CLIENT "./producer_client"
#define CLIENT_SERVER "./client_server"
#define N_THREAD 3 //线程数
#define DATA_SIZE 5 //数据大小
#define BUFFER_SIZE 20//缓冲区大小
int cp[N_THREAD]; //client-producer-pipe描述符
//定义缓冲区
typedef struct Queue{
int rear;
int front;
int elem[BUFFER_SIZE];
}Queue;
void* producer(void *arg)
{
int id = *((int*)arg);
char buf[DATA_SIZE] = {0};
while(1){
sleep(rand() % 3 + 1);
int data = rand() % 1000;
sprintf(buf, "%d", data);
write(cp[id], buf, sizeof(buf));
printf("id:%d data:%d\n", id, data);
memset(buf, 0, sizeof(buf));
}
}
int isEmpty(Queue* q)
{
return q->rear == q->front;
}
int isFull(Queue* q)
{
return (q->rear + 1) % BUFFER_SIZE == q->front;
}
void initQueue(Queue* q)
{
memset(q, 0, sizeof(Queue));
}
int push(Queue* q, int data)
{
if(isFull(q))
return 0;
q->elem[q->rear] = data;
q->rear = (q->rear + 1) % BUFFER_SIZE;
return 1;
}
int pop(Queue* q, int* data)
{
if(isEmpty(q))
return 0;
*data = q->elem[q->front];
q->front = (q->front + 1) % BUFFER_SIZE;
return 1;
}
int epoolclient(int argc, char *argv[])
{
Queue buffer;
initQueue(&buffer);
//创建并打开client-producer-pipe
for(int i = 0; i < N_THREAD; ++i){
char path[128] = {0};
sprintf(path, "%s%d", PRODUCER_CLIENT, i);
mkfifo(path, 0666);
cp[i] = open(path, O_RDWR);
}
//创建并打开client-server-pipe
mkfifo(CLIENT_SERVER, 0666);
int cs = open(CLIENT_SERVER, O_WRONLY);
//创建生产者子线程
pthread_t tid[N_THREAD];
int id[N_THREAD]; //线程标识
for(int i = 0; i < N_THREAD; ++i){
id[i] = i;
pthread_create(&tid[i], NULL, producer, id + i);
}
//创建epoll
int epfd = epoll_create(N_THREAD + 1);
struct epoll_event event[N_THREAD + 1];
for(int i = 0; i < N_THREAD; ++i){
event[i].data.fd = cp[i];
event[i].events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, cp[i], event + i);
}
event[N_THREAD].data.fd = cs;
event[N_THREAD].events = EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
//监听epoll
struct epoll_event wait_event[N_THREAD + 1];
char buf[DATA_SIZE] = {0};
while(1){
int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
for(int i = 0; i < n; ++i){
if(wait_event[i].data.fd == cs){
if(!isEmpty(&buffer)){
int data;
pop(&buffer, &data);
memset(buf, 0, sizeof(buf));
sprintf(buf, "%d", data);
write(cs, buf, sizeof(buf));
}
}
else{
memset(buf, 0, sizeof(buf));
read(wait_event[i].data.fd, buf, sizeof(buf));
if(!isFull(&buffer)){
int data;
sscanf(buf, "%d", &data);
push(&buffer, data);
}
}
}
}
for(int i = 0; i < N_THREAD; ++i){
pthread_join(tid[i], NULL);
}
for(int i = 0; i < N_THREAD; ++i){
close(cp[i]);
}
close(cs);
return 0;
}
总结
Epoll的优点:
1.支持一个进程打开大数目的socket描述符(FD)。(它所支持的FD上限是最大可以打开文件的数目,一般远大于2048,当,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max)
2.IO效率不随FD数目增加而线性下降。(只会对"活跃"的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核)
3.使用mmap加速内核与用户空间的消息传递。(epoll是通过内核于用户空间mmap同一块内存实现的)
参考:https://blog.csdn.net/armlinuxww/article/details/92803381
参考:https://blog.csdn.net/qq_52670477/article/details/126707016
参考:https://blog.csdn.net/weixin_53331739/article/details/126285124
更多推荐
所有评论(0)