注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。

一、proreactor 与 reactor的区别

reactor:

  • poll + io
  • EPOLLIN:数据可以读
  • 一个事件对应一个动作

proactor:

  • io_uring
  • EVENT_READ:数据已经读出来了
  • 一个事件对应一个结果

二、测试维度:

  1. 建链时间(建立100万个连接的耗时)
  2. qps:每秒处理的响应数:(测试数据大小依次为128,512,1k,2k的回显响应)
  3. io并发 (能否建立100万个连接)

三、 测试不同连接数情况下的qps(客户端一线程一个连接)

1. reactor

连接数:1

连接数:2

连接数:3

连接数:4

连接数:5

连接数:6

连接数:7

在高并发下极限是6万多

  • 为什么随着连接越来越多的时候,qps逐渐增多。
  • 是因为,单连接的时候,限制qps的是用户发送请求的数量,而非服务器处理本身。
  • 当连接数增加到6之后(每个连接都分配一个线程发送请求,即6个连接同时发送请求)qps不在增加,而是稳定到6万多,说明这是服务器的处理极限
  • 当然也不是很严谨,毕竟我这个是单个客户端创建6个线程来模拟6条连接同时发送请求,但是我的CPU处理器只有4个,CPU是轮询来处理的,严格来讲也不是同时发送的;

2. proactor

连接数:1

连接数:2

连接数:3

连接数:4

连接数:5

连接数:6

连接数:7

最终测得的也是6万多左右,说明异步io的性能媲美epoll+同步io。

三、测试字节数变化qps的对比

在连接数位7的情况下

1. reactor的测试:epoll+io

字节:128

字节:512

字节:1024

字节:2048

2. proactor的测试:

字节:128

字节:512

字节:1024

字节:2048

五、测试代码:

1. epoll_tcp_server.c (reactor模式)

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>

#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>

#define BUFFER_LENGTH 2048
#define CONNECTIONS 1000 // 1048576
#define PORT 8000
#define PORT_COUNT 1 // 100

typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);

struct conn_item
{
	int fd;

	char rbuffer[BUFFER_LENGTH];
	int rlen;
	char wbuffer[BUFFER_LENGTH];
	int wlen;

	union
	{
		RCALLBACK accept_callback;
		RCALLBACK recv_callback;
	} recv_t;
	RCALLBACK send_callback;
};

int epfd = 0;
struct conn_item connlist[CONNECTIONS] = {0};

struct timeval tv;

#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int set_event(int fd, int event, int flag)
{

	if (flag)
	{
		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
	}
	else
	{

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
	}
}

int accept_cb(int fd)
{

	struct sockaddr_in clientaddr;
	socklen_t len = sizeof(clientaddr);

	int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
	if (clientfd < 0)
	{
		return -1;
	}
	set_event(clientfd, EPOLLIN, 1);

	connlist[clientfd].fd = clientfd;
	memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);
	connlist[clientfd].rlen = 0;
	memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);
	connlist[clientfd].wlen = 0;

	connlist[clientfd].recv_t.recv_callback = recv_cb;
	connlist[clientfd].send_callback = send_cb;

	if ((clientfd % 1000) == 999)
	{
		struct timeval tv_cur;
		gettimeofday(&tv_cur, NULL);
		int time_used = TIME_SUB_MS(tv_cur, tv);

		memcpy(&tv, &tv_cur, sizeof(struct timeval));

		printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
	}

	return clientfd;
}

int recv_cb(int fd)
{

	char *buffer = connlist[fd].rbuffer;
	int idx = connlist[fd].rlen;

	int count = recv(fd, buffer + idx, BUFFER_LENGTH - idx, 0);
	if (count == 0)
	{
		// printf("disconnect\n");

		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
		close(fd);
		connlist[fd].fd = 0;

		return -1;
	}
	connlist[fd].rlen += count;

	memcpy(connlist[fd].wbuffer, connlist[fd].rbuffer, connlist[fd].rlen);
	connlist[fd].wlen = connlist[fd].rlen;
	connlist[fd].rlen -= connlist[fd].rlen;

	set_event(fd, EPOLLOUT, 0);

	return count;
}

int send_cb(int fd)
{

	char *buffer = connlist[fd].wbuffer;
	int idx = connlist[fd].wlen;

	int count = send(fd, buffer, idx, 0);

	set_event(fd, EPOLLIN, 0);

	return count;
}

int init_server(unsigned short port)
{

	int sockfd = socket(AF_INET, SOCK_STREAM, 0);

	int reuse = 1;

	if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
	{
		return -1;
	}

	struct sockaddr_in serveraddr;
	memset(&serveraddr, 0, sizeof(struct sockaddr_in));

	serveraddr.sin_family = AF_INET;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons(port);

	if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
	{
		perror("bind");
		return -1;
	}

	listen(sockfd, 10);

	return sockfd;
}

int close_server()
{
	for (int i = 0; i < PORT_COUNT + 100; i++)
	{
		if (connlist[i].fd != 0)
		{
			close(connlist[i].fd);
		}
	}
}

// tcp
int main()
{

	int i = 0;

	epfd = epoll_create(1); // int size

	for (i = 0; i < PORT_COUNT; i++)
	{
		int sockfd = init_server(PORT + i);
		connlist[sockfd].fd = sockfd;
		connlist[sockfd].recv_t.accept_callback = accept_cb;
		set_event(sockfd, EPOLLIN, 1);
		printf("listen at: %d\n", PORT + i);
	}

	gettimeofday(&tv, NULL);

	struct epoll_event events[CONNECTIONS] = {0};

	while (1)
	{

		int nready = epoll_wait(epfd, events, CONNECTIONS, -1); //

		int i = 0;
		for (i = 0; i < nready; i++)
		{

			int connfd = events[i].data.fd;
			if (events[i].events & EPOLLIN)
			{ //

				int count = connlist[connfd].recv_t.recv_callback(connfd);
				// printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer);
			}
			else if (events[i].events & EPOLLOUT)
			{
				// printf("send --> buffer: %s\n",  connlist[connfd].wbuffer);

				int count = connlist[connfd].send_callback(connfd);
			}
		}
	}

	close_server();
}

2. uring_tcp_server

#include <stdio.h>      // 标准输入输出(打印日志、错误信息)
#include <liburing.h>   // io_uring库(Linux异步IO框架,替代epoll实现高效IO)
#include <netinet/in.h> // 网络地址结构(sockaddr_in等IPv4相关定义)
#include <string.h>     // 内存操作(memset、memcpy等)
#include <unistd.h>     // 系统调用(close等)
#include <stdlib.h>     // 动态内存管理(malloc、free等)

// 编译:gcc -o uring_tcp_server uring_tcp_server.c -luring

// 事件类型宏定义:用于标识io_uring中待处理的事件类型
#define EVENT_ACCEPT 0 // 接受客户端连接事件(监听socket专用)
#define EVENT_READ 1   // 从客户端读取数据事件(客户端连接专用)
#define EVENT_WRITE 2  // 向客户端写入数据事件(客户端连接专用)
#define PORT 8000
#define BUFFER_LENGTH 2048

/**
 * 连接上下文结构体:每个客户端连接的专属数据载体
 * 作用:绑定连接的文件描述符、缓冲区及事件状态,避免多连接数据冲突
 */
struct conn_ctx
{
    int fd;                      // 连接的socket文件描述符(监听socket/客户端socket)
    int event;                   // 当前等待的事件类型(对应EVENT_*宏)
    char rbuffer[BUFFER_LENGTH]; // 专属读缓冲区:存储从客户端读取的数据(1024字节)
    char wbuffer[BUFFER_LENGTH]; // 专属写缓冲区:存储待发送给客户端的数据(1024字节)
    ssize_t rlen;                // 实际读取的字节数(记录rbuffer中有效数据长度)
    ssize_t wlen;                // 待发送的字节数(记录wbuffer中有效数据长度)
};

/**
 * 初始化TCP服务器监听socket
 * @param port 服务器监听的端口号
 * @return 成功返回监听socket的文件描述符,失败返回-1
 */
int init_server(unsigned short port)
{
    // 1. 创建TCP socket(IPv4协议族,字节流套接字)
    // AF_INET:IPv4地址族;SOCK_STREAM:TCP协议(面向连接);0:默认协议(TCP)
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1)
    {
        perror("socket创建失败"); // 打印错误原因(如权限不足、系统资源耗尽)
        return -1;
    }

    int reuse = 1;

    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
    {
        return -1;
    }

    // 2. 初始化服务器地址结构
    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in)); // 清空结构体(避免随机值干扰)
    serveraddr.sin_family = AF_INET;                    // 使用IPv4协议
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);     // 监听所有本地网卡(0.0.0.0)
    serveraddr.sin_port = htons(port);                  // 端口号转换为网络字节序(大端)

    // 3. 绑定socket到指定端口(将socket与服务器地址关联)
    if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
    {
        perror("bind绑定端口失败"); // 错误可能:端口被占用、无权限使用特权端口
        close(sockfd);              // 绑定失败需关闭socket释放资源
        return -1;
    }

    // 4. 开始监听连接(将主动socket转为被动socket,准备接受客户端连接)
    // 第二个参数1024:未完成连接队列(三次握手未完成)的最大长度(高并发场景需调大)
    listen(sockfd, 1024);
    return sockfd; // 返回监听socket的文件描述符
}

// io_uring提交队列(SQ)的最大长度:最多同时等待处理1024个IO事件
#define ENTRIES_LENGTH 1024

/**
 * 向io_uring注册"异步读"事件
 * 作用:告知内核"当该socket有数据可读时,读取数据到专属缓冲区"
 * @param ring io_uring实例(管理提交队列和完成队列)
 * @param ctx 连接上下文(包含socket fd、读缓冲区等)
 * @return 0成功,-1失败
 */
int set_event_recv(struct io_uring *ring, struct conn_ctx *ctx)
{
    // 从io_uring的提交队列(SQ)中获取一个空闲的队列项(SQE)
    // SQE(Submission Queue Entry):用户态描述待执行的IO操作(此处为读操作)
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) // 若提交队列已满,获取失败
    {
        perror("获取SQE失败");
        return -1;
    }

    // 填充SQE:设置异步读操作(底层对应recv系统调用)
    // 参数:sqe队列项、目标socket fd、读缓冲区(专属rbuffer)、最大读取长度、 flags(0为默认)
    io_uring_prep_recv(sqe, ctx->fd, ctx->rbuffer, sizeof(ctx->rbuffer), 0);
    // 将连接上下文指针存入SQE的user_data:事件完成时通过该字段定位到对应的连接
    sqe->user_data = (unsigned long)ctx; // user_data是个64位的无符号整数,我们这里做个强转

    return 0;
}

/**
 * 向io_uring注册"异步写"事件
 * 作用:告知内核"当该socket可写时,将专属缓冲区的数据发送给客户端"
 * @param ring io_uring实例
 * @param ctx 连接上下文(包含socket fd、写缓冲区及待发送长度)
 * @return 0成功,-1失败
 */
int set_event_send(struct io_uring *ring, struct conn_ctx *ctx)
{
    // 获取一个空闲的SQE(提交队列项)
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
    {
        perror("获取SQE失败");
        return -1;
    }

    // 填充SQE:设置异步写操作(底层对应send系统调用)
    // 参数:sqe队列项、目标socket fd、写缓冲区(专属wbuffer)、待发送长度(rlen记录的读取长度)
    io_uring_prep_send(sqe, ctx->fd, ctx->wbuffer, ctx->rlen, 0);
    // 存储连接上下文指针:事件完成时定位到对应的连接
    sqe->user_data = (unsigned long)ctx;

    return 0;
}

/**
 * 向io_uring注册"异步接受连接"事件
 * 作用:告知内核"当有新客户端连接时,接受连接并返回新的socket fd"
 * @param ring io_uring实例
 * @param sockfd 监听socket的文件描述符
 * @return 0成功,-1失败
 */
int set_event_accept(struct io_uring *ring, int sockfd)
{
    // 获取一个空闲的SQE(提交队列项)
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
    {
        perror("获取SQE失败");
        return -1;
    }

    // 为接受连接操作分配临时上下文(仅用于存储监听socket信息)
    struct conn_ctx *listen_ctx = malloc(sizeof(struct conn_ctx));
    if (!listen_ctx)
    {
        perror("malloc listen_ctx失败");
        return -1;
    }
    listen_ctx->fd = sockfd;          // 绑定监听socket fd
    listen_ctx->event = EVENT_ACCEPT; // 标记为接受连接事件

    // 分配客户端地址结构(用于存储新连接的客户端IP和端口)
    struct sockaddr_in *clientaddr = malloc(sizeof(struct sockaddr_in));
    socklen_t *len = malloc(sizeof(socklen_t)); // 地址长度变量
    *len = sizeof(struct sockaddr_in);          // 初始化长度为IPv4地址结构大小

    // 填充SQE:设置异步接受连接操作(底层对应accept系统调用)
    // 参数:sqe队列项、监听socket fd、客户端地址指针、地址长度指针、flags(0为默认)
    io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)clientaddr, len, 0);
    // 存储临时上下文指针:事件完成时通过该字段获取监听信息
    sqe->user_data = (unsigned long)listen_ctx;

    // 临时存储客户端地址和长度的指针(避免使用全局变量,保证线程安全)
    // 利用listen_ctx的缓冲区暂存指针,后续从这里恢复
    memcpy(listen_ctx->wbuffer, &clientaddr, sizeof(void *));
    memcpy(listen_ctx->rbuffer, &len, sizeof(void *));

    return 0;
}

int main(int argc, char *argv[])
{

    // 初始化服务器:创建并返回监听socket
    int sockfd = init_server(PORT);
    if (sockfd == -1) // 初始化失败则退出
    {
        return -1;
    }
    printf("服务器启动,监听端口 %d\n", PORT);

    // 初始化io_uring
    struct io_uring_params params;      // io_uring初始化参数(使用默认配置)
    memset(&params, 0, sizeof(params)); // 清空参数

    struct io_uring ring; // io_uring实例(管理提交队列SQ和完成队列CQ)
    // 初始化io_uring:创建容量为ENTRIES_LENGTH的队列
    // 内部通过io_uring_setup系统调用创建内核队列,并通过mmap映射到用户态(减少数据拷贝)
    if (io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params) != 0)
    {
        perror("io_uring初始化失败");
        close(sockfd); // 失败需释放监听socket
        return -1;
    }

    // 注册初始事件:向io_uring提交"接受连接"事件(开始监听新连接)
    set_event_accept(&ring, sockfd);

    // 主循环:持续处理io_uring中的事件(接受连接、读数据、写数据)
    while (1)
    {
        // 1. 提交所有已准备好的SQE(提交队列中的事件)到内核
        // 内核会异步处理这些IO操作,完成后将结果放入完成队列CQ(无需用户主动轮询)
        io_uring_submit(&ring);

        // 2. 批量获取完成队列(CQ)中的事件(最多128个)
        // 相比单次获取,批量处理减少用户态与内核态交互,提升效率
        struct io_uring_cqe *cqes[128]; // 存储完成的事件
        // 从完成队列中获取事件,返回实际获取的数量(nready)
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
        if (nready < 0) // 获取失败(如系统调用错误)
        {
            perror("io_uring获取完成事件失败");
            break;
        }

        // 3. 遍历处理每个完成的事件
        for (int i = 0; i < nready; i++)
        {
            struct io_uring_cqe *cqe = cqes[i]; // 当前完成的事件
            // 从事件的user_data中恢复连接上下文(之前注册时存入的指针)
            struct conn_ctx *ctx = (struct conn_ctx *)(cqe->user_data);

            // 处理"接受连接"事件(EVENT_ACCEPT)
            if (ctx->event == EVENT_ACCEPT)
            {
                // 恢复客户端地址和长度的指针(从临时上下文的缓冲区中)
                struct sockaddr_in **clientaddr = (struct sockaddr_in **)ctx->wbuffer;
                socklen_t **len = (socklen_t **)ctx->rbuffer;

                // 重新注册"接受连接"事件(一次accept只能处理一个连接,需持续监听新连接)
                set_event_accept(&ring, sockfd);

                // 从完成事件中获取新连接的socket fd(cqe->res是accept的返回值)
                int connfd = cqe->res;
                if (connfd == -1) // 接受连接失败(如队列满)
                {
                    perror("接受连接失败");
                }
                else // 接受连接成功
                {
                    // 为新连接分配专属上下文(含独立缓冲区,避免多连接数据冲突)
                    struct conn_ctx *new_ctx = malloc(sizeof(struct conn_ctx));
                    if (!new_ctx) // 内存分配失败
                    {
                        perror("malloc new_ctx失败");
                        close(connfd); // 关闭新连接,避免资源泄漏
                    }
                    else // 初始化新连接上下文
                    {
                        memset(new_ctx, 0, sizeof(struct conn_ctx)); // 清空缓冲区
                        new_ctx->fd = connfd;                        // 绑定新连接的fd
                        new_ctx->event = EVENT_READ;                 // 初始事件为"读"(等待客户端发数据)
                        // 注册读事件:告知内核"当该连接有数据时,读取到专属rbuffer"
                        set_event_recv(&ring, new_ctx);
                    }
                }

                // 释放临时资源(监听上下文、客户端地址和长度)
                free(*clientaddr); // 释放客户端地址结构
                free(*len);        // 释放地址长度变量
                free(ctx);         // 释放监听上下文
            }

            // 处理"读数据"事件(EVENT_READ):客户端发送数据后触发
            else if (ctx->event == EVENT_READ)
            {
                ssize_t ret = cqe->res; // ret是recv的返回值:>0为读取字节数;0为客户端关闭;-1为错误

                if (ret <= 0) // 客户端关闭连接或读取失败
                {
                    close(ctx->fd); // 关闭连接socket
                    free(ctx);      // 释放连接上下文(避免内存泄漏)
                }
                else // 成功读取到数据(ret为实际读取的字节数)
                {
                    ctx->rlen = ret; // 记录读取长度(用于后续回显)
                    // 将读缓冲区的数据复制到写缓冲区(准备回显,使用专属缓冲区避免冲突)
                    memcpy(ctx->wbuffer, ctx->rbuffer, ret);
                    // 切换事件类型为"写",并注册写事件(告知内核"可写时发送数据")
                    ctx->event = EVENT_WRITE;
                    set_event_send(&ring, ctx);
                }
            }

            // 处理"写数据"事件(EVENT_WRITE):数据发送完成后触发
            else if (ctx->event == EVENT_WRITE)
            {
                ssize_t ret = cqe->res; // ret是send的返回值:>0为发送字节数;<=0为失败

                if (ret > 0) // 数据发送成功
                {
                    // 切换事件类型为"读",重新注册读事件(等待客户端下一次发送数据)
                    ctx->event = EVENT_READ;
                    set_event_recv(&ring, ctx);
                }
                else // 发送失败(如客户端已关闭)
                {
                    close(ctx->fd); // 关闭连接socket
                    free(ctx);      // 释放连接上下文
                }
            }

            // 标记事件已处理:告知内核该CQE可被复用(推进完成队列指针)
            io_uring_cqe_seen(&ring, cqe);
        }
    }

    // 资源清理(理论上主循环不会退出,实际应用中需在信号处理中调用)
    io_uring_queue_exit(&ring); // 销毁io_uring实例(释放内核资源)
    close(sockfd);              // 关闭监听socket
    return 0;
}

3. test_qps_tcpclient.c

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/errno.h>

#define RBUFFER_LENGTH 128 // 128 512 1024 2048
#define WBUFFER_LENGTH 128

// ./test_qps_tcpclient -s 192.168.248.130 -p 8000 -t 50 -c 100 -n 10000

// 宏定义:1=非阻塞模式(有超时、休眠时间,测得不准确,好处是不会卡死线程),0=阻塞模式(测得准确,阻塞时CPU会切换到别的线程,但是可能会一直阻塞导致该线程卡死,结果一直出不来)
// 建议使用阻塞模式,测得更准确,遇到线程卡死的情况再选择非阻塞
#define USE_NONBLOCK 0

typedef struct test_context_s
{
	char serverip[16];
	int port;
	int threadnum;
	int connection;
	int requestion;
	char buffer[RBUFFER_LENGTH];

	int conns_per_thread;
	int failed;
	pthread_mutex_t mutex; // 线程安全锁
} test_context_t;

// 设置非阻塞模式(仅非阻塞模式使用)
int SetNoblock(int fd)
{
	int flag = fcntl(fd, F_GETFL);
	return fcntl(fd, F_SETFL, flag | O_NONBLOCK) == 0;
}

// 计算时间差(毫秒)
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

// 连接服务器(阻塞/非阻塞模式通用)
int connect_tcpserver(const char *ip, unsigned short port, int timeout_ms)
{
	int connfd = socket(AF_INET, SOCK_STREAM, 0);
	if (connfd < 0)
	{
		perror("socket");
		return -1;
	}

	struct sockaddr_in tcpserver_addr;
	memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));
	tcpserver_addr.sin_family = AF_INET;
	tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
	tcpserver_addr.sin_port = htons(port);

	// 阻塞模式:直接阻塞连接
	int ret = connect(connfd, (struct sockaddr *)&tcpserver_addr, sizeof(struct sockaddr_in));
	if (ret != 0)
	{
		perror("connect (block)");
		close(connfd);
		return -1;
	}
#if USE_NONBLOCK
	SetNoblock(connfd);
#endif

	return connfd;
}

// 发送函数(阻塞/非阻塞模式通用)
int Send(int fd, char *wbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK
	// 非阻塞模式:带超时循环发送
	struct timeval start, now;
	gettimeofday(&start, NULL);
	size_t total = 0;
	while (total < len)
	{
		gettimeofday(&now, NULL);
		int elapsed = TIME_SUB_MS(now, start);
		if (elapsed >= timeout_ms)
		{
			printf("Send timeout (elapsed: %d ms)\n", elapsed);
			return -1;
		}

		ssize_t sent = send(fd, wbuffer + total, len - total, 0);
		if (sent < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				usleep(1); // 短暂休眠,减少CPU占用
				continue;
			}
			else
			{
				printf("send errno: %d --> %s\n", errno, strerror(errno));
				return -1;
			}
		}
		if (sent == 0)
		{
			return -1; // 连接关闭
		}
		total += sent;
	}
#else
	// 阻塞模式:循环发送直到完成(无超时,依赖系统默认超时)
	size_t total = 0;
	while (total < len)
	{
		ssize_t sent = send(fd, wbuffer + total, len - total, 0);
		if (sent <= 0)
		{
			printf("send errno: %d --> %s\n", errno, strerror(errno));
			return -1;
		}
		total += sent;
	}
#endif
	return 0;
}

// 接收函数(阻塞/非阻塞模式通用)
int Recv(int fd, char *rbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK
	// 非阻塞模式:带超时循环接收
	struct timeval start, now;
	gettimeofday(&start, NULL);
	size_t total = 0;
	while (total < len)
	{
		gettimeofday(&now, NULL);
		int elapsed = TIME_SUB_MS(now, start);
		if (elapsed >= timeout_ms)
		{
			printf("Recv timeout (elapsed: %d ms)\n", elapsed);
			return -1;
		}

		ssize_t ret = recv(fd, rbuffer + total, len - total, 0);
		if (ret < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				usleep(1); // 短暂休眠,减少CPU占用
				continue;
			}
			else
			{
				printf("recv errno: %d --> %s\n", errno, strerror(errno));
				return -1;
			}
		}
		if (ret == 0)
		{
			return -1; // 连接关闭
		}
		total += ret;
	}
#else
	// 阻塞模式:循环接收直到完成(无超时,依赖系统默认超时)
	size_t total = 0;
	while (total < len)
	{
		ssize_t ret = recv(fd, rbuffer + total, len - total, 0);
		if (ret <= 0)
		{
			printf("recv errno: %d --> %s\n", errno, strerror(errno));
			return -1;
		}
		total += ret;
	}
#endif
	return 0;
}

// 发送接收并校验
int send_recv_tcppkt(int fd, test_context_t *ctx)
{
	// 发送超时:5秒,接收超时:5秒(可根据需求调整)
	int res = Send(fd, ctx->buffer, WBUFFER_LENGTH, 5000);
	if (res < 0)
	{
		return -1;
	}

	char rbuffer[WBUFFER_LENGTH] = {0};
	res = Recv(fd, rbuffer, WBUFFER_LENGTH, 5000);
	if (res < 0)
	{
		return -1;
	}

	if (memcmp(rbuffer, ctx->buffer, WBUFFER_LENGTH) != 0)
	{
		printf("data mismatch\n");
		return -1;
	}
	return 0;
}

// 线程入口:按连接数分摊请求
static void *test_qps_entry(void *arg)
{
	test_context_t *pctx = (test_context_t *)arg;

	// 计算当前线程需创建的连接数
	int conns_per_thread = pctx->conns_per_thread;

	// 计算每个连接需处理的请求数
	int reqs_per_conn = pctx->requestion / pctx->connection;
	int remainder_reqs = pctx->requestion % pctx->connection;

	for (int conn_idx = 0; conn_idx < conns_per_thread; conn_idx++)
	{
		// 连接超时:3秒(非阻塞模式生效,阻塞模式依赖系统默认)
		int connfd = connect_tcpserver(pctx->serverip, pctx->port, 3000);
		if (connfd < 0)
		{
			// 连接失败,标记该连接的所有请求为失败
			int reqs = reqs_per_conn;
			if (conn_idx < remainder_reqs)
			{
				reqs += 1;
			}
			pthread_mutex_lock(&pctx->mutex);
			pctx->failed += reqs;
			pthread_mutex_unlock(&pctx->mutex);
			continue;
		}

		// 处理该连接的所有请求
		int reqs = reqs_per_conn;
		if (conn_idx < remainder_reqs)
		{
			reqs += 1;
		}
		for (int i = 0; i < reqs; i++)
		{
			int res = send_recv_tcppkt(connfd, pctx);
			if (res != 0)
			{
				pthread_mutex_lock(&pctx->mutex);
				pctx->failed++;
				pthread_mutex_unlock(&pctx->mutex);
			}
		}

		close(connfd);
	}

	return NULL;
}

int main(int argc, char *argv[])
{
	int ret = 0;
	test_context_t ctx = {0};
	pthread_mutex_init(&ctx.mutex, NULL); // 初始化互斥锁

	int opt;
	while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1)
	{
		switch (opt)
		{
		case 's': // ip
			printf("-s: %s\n", optarg);
			strcpy(ctx.serverip, optarg);
			break;
		case 'p': // port
			printf("-p: %s\n", optarg);
			ctx.port = atoi(optarg);
			break;
		case 't': // 线程数
			printf("-t: %s\n", optarg);
			ctx.threadnum = atoi(optarg);
			break;
		case 'c': // 连接数
			printf("-c: %s\n", optarg);
			ctx.connection = atoi(optarg);
			break;
		case 'n': // 请求数
			printf("-n: %s\n", optarg);
			ctx.requestion = atoi(optarg);
			break;
		default:
			return -1;
		}
	}

	// 初始化发送缓冲区(二进制数据)
	for (int i = 0; i < WBUFFER_LENGTH; i++)
	{
		ctx.buffer[i] = 'D';
	}
	ctx.buffer[WBUFFER_LENGTH - 1] = '\0'; // 保持与服务器回显一致

	pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));
	if (!ptid)
	{
		perror("malloc ptid failed");
		ret = -1;
		goto clean;
	}
	
	int remainder_conns = ctx.connection % ctx.threadnum;
	if (remainder_conns == 0)
	{
		 ctx.conns_per_thread = ctx.connection / ctx.threadnum;
		 printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",
           ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);
	}
	else
	{
		printf("无法均匀分配连接数,这里增加 %d 个连接\n", ctx.threadnum - remainder_conns);
		ctx.connection += ctx.threadnum - remainder_conns;
		ctx.conns_per_thread = ctx.connection / ctx.threadnum;
		 printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",
           ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);

	}
	

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);
	for (int i = 0; i < ctx.threadnum; i++)
	{
		if (pthread_create(&ptid[i], NULL, test_qps_entry, &ctx) != 0)
		{
			perror("pthread_create failed");
			ret = -1;
			for (int j = 0; j < i; j++)
			{
				pthread_join(ptid[j], NULL);
			}
			goto clean;
		}
	}

	for (int i = 0; i < ctx.threadnum; i++)
	{
		pthread_join(ptid[i], NULL);
	}

	struct timeval tv_end;
	gettimeofday(&tv_end, NULL);

	int time_used = TIME_SUB_MS(tv_end, tv_begin);
	printf("模式: %s, 请求数:total: %d, success: %d, failed: %d, time_used: %d ms, qps: %d\n",
		   USE_NONBLOCK ? "非阻塞" : "阻塞",
		   ctx.requestion,
		   ctx.requestion - ctx.failed, 
		   ctx.failed, time_used,
		   (time_used > 0) ? ((ctx.requestion - ctx.failed) * 1000 / time_used) : 0);

clean:
	free(ptid);
	pthread_mutex_destroy(&ctx.mutex);
	return ret;
}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐