Linux C/C++ 学习日记(46):io_uring(二):reactor 与proactor的性能测试对比
本文对比了Reactor和Proactor两种I/O模型的性能差异。测试结果显示,Proactor(基于io_uring)在处理不同数据包大小时QPS更高,特别是处理2KB数据时Proactor的QPS达到175,000,而Reactor(基于epoll)为148,000。文章包含完整的测试代码:epoll_tcp_server.c实现Reactor模式,uring_tcp_server.c实现P
·
注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。
一、proreactor 与 reactor的区别
reactor:
- poll + io
- EPOLLIN:数据可以读
- 一个事件对应一个动作
proactor:
- io_uring
- EVENT_READ:数据已经读出来了
- 一个事件对应一个结果
二、测试维度:
- 建链时间(建立100万个连接的耗时)
- qps:每秒处理的响应数:(测试数据大小依次为128,512,1k,2k的回显响应)
- io并发 (能否建立100万个连接)
三、 测试不同连接数情况下的qps(客户端一线程一个连接)
1. reactor
连接数:1

连接数:2

连接数:3

连接数:4

连接数:5

连接数:6

连接数:7

在高并发下极限是6万多
- 为什么随着连接越来越多的时候,qps逐渐增多。
- 是因为,单连接的时候,限制qps的是用户发送请求的数量,而非服务器处理本身。
- 当连接数增加到6之后(每个连接都分配一个线程发送请求,即6个连接同时发送请求)qps不在增加,而是稳定到6万多,说明这是服务器的处理极限
- 当然也不是很严谨,毕竟我这个是单个客户端创建6个线程来模拟6条连接同时发送请求,但是我的CPU处理器只有4个,CPU是轮询来处理的,严格来讲也不是同时发送的;
2. proactor
连接数:1

连接数:2

连接数:3

连接数:4

连接数:5

连接数:6

连接数:7

最终测得的也是6万多左右,说明异步io的性能媲美epoll+同步io。
三、测试字节数变化qps的对比
在连接数位7的情况下
1. reactor的测试:epoll+io
字节:128

字节:512

字节:1024

字节:2048

2. proactor的测试:
字节:128

字节:512

字节:1024

字节:2048

五、测试代码:
1. epoll_tcp_server.c (reactor模式)
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#define BUFFER_LENGTH 2048
#define CONNECTIONS 1000 // 1048576
#define PORT 8000
#define PORT_COUNT 1 // 100
typedef int (*RCALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
struct conn_item
{
int fd;
char rbuffer[BUFFER_LENGTH];
int rlen;
char wbuffer[BUFFER_LENGTH];
int wlen;
union
{
RCALLBACK accept_callback;
RCALLBACK recv_callback;
} recv_t;
RCALLBACK send_callback;
};
int epfd = 0;
struct conn_item connlist[CONNECTIONS] = {0};
struct timeval tv;
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int set_event(int fd, int event, int flag)
{
if (flag)
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int accept_cb(int fd)
{
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
if (clientfd < 0)
{
return -1;
}
set_event(clientfd, EPOLLIN, 1);
connlist[clientfd].fd = clientfd;
memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);
connlist[clientfd].rlen = 0;
memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);
connlist[clientfd].wlen = 0;
connlist[clientfd].recv_t.recv_callback = recv_cb;
connlist[clientfd].send_callback = send_cb;
if ((clientfd % 1000) == 999)
{
struct timeval tv_cur;
gettimeofday(&tv_cur, NULL);
int time_used = TIME_SUB_MS(tv_cur, tv);
memcpy(&tv, &tv_cur, sizeof(struct timeval));
printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
}
return clientfd;
}
int recv_cb(int fd)
{
char *buffer = connlist[fd].rbuffer;
int idx = connlist[fd].rlen;
int count = recv(fd, buffer + idx, BUFFER_LENGTH - idx, 0);
if (count == 0)
{
// printf("disconnect\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
connlist[fd].fd = 0;
return -1;
}
connlist[fd].rlen += count;
memcpy(connlist[fd].wbuffer, connlist[fd].rbuffer, connlist[fd].rlen);
connlist[fd].wlen = connlist[fd].rlen;
connlist[fd].rlen -= connlist[fd].rlen;
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd)
{
char *buffer = connlist[fd].wbuffer;
int idx = connlist[fd].wlen;
int count = send(fd, buffer, idx, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
int init_server(unsigned short port)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
{
return -1;
}
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);
if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
{
perror("bind");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
int close_server()
{
for (int i = 0; i < PORT_COUNT + 100; i++)
{
if (connlist[i].fd != 0)
{
close(connlist[i].fd);
}
}
}
// tcp
int main()
{
int i = 0;
epfd = epoll_create(1); // int size
for (i = 0; i < PORT_COUNT; i++)
{
int sockfd = init_server(PORT + i);
connlist[sockfd].fd = sockfd;
connlist[sockfd].recv_t.accept_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
printf("listen at: %d\n", PORT + i);
}
gettimeofday(&tv, NULL);
struct epoll_event events[CONNECTIONS] = {0};
while (1)
{
int nready = epoll_wait(epfd, events, CONNECTIONS, -1); //
int i = 0;
for (i = 0; i < nready; i++)
{
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN)
{ //
int count = connlist[connfd].recv_t.recv_callback(connfd);
// printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer);
}
else if (events[i].events & EPOLLOUT)
{
// printf("send --> buffer: %s\n", connlist[connfd].wbuffer);
int count = connlist[connfd].send_callback(connfd);
}
}
}
close_server();
}
2. uring_tcp_server
#include <stdio.h> // 标准输入输出(打印日志、错误信息)
#include <liburing.h> // io_uring库(Linux异步IO框架,替代epoll实现高效IO)
#include <netinet/in.h> // 网络地址结构(sockaddr_in等IPv4相关定义)
#include <string.h> // 内存操作(memset、memcpy等)
#include <unistd.h> // 系统调用(close等)
#include <stdlib.h> // 动态内存管理(malloc、free等)
// 编译:gcc -o uring_tcp_server uring_tcp_server.c -luring
// 事件类型宏定义:用于标识io_uring中待处理的事件类型
#define EVENT_ACCEPT 0 // 接受客户端连接事件(监听socket专用)
#define EVENT_READ 1 // 从客户端读取数据事件(客户端连接专用)
#define EVENT_WRITE 2 // 向客户端写入数据事件(客户端连接专用)
#define PORT 8000
#define BUFFER_LENGTH 2048
/**
* 连接上下文结构体:每个客户端连接的专属数据载体
* 作用:绑定连接的文件描述符、缓冲区及事件状态,避免多连接数据冲突
*/
struct conn_ctx
{
int fd; // 连接的socket文件描述符(监听socket/客户端socket)
int event; // 当前等待的事件类型(对应EVENT_*宏)
char rbuffer[BUFFER_LENGTH]; // 专属读缓冲区:存储从客户端读取的数据(1024字节)
char wbuffer[BUFFER_LENGTH]; // 专属写缓冲区:存储待发送给客户端的数据(1024字节)
ssize_t rlen; // 实际读取的字节数(记录rbuffer中有效数据长度)
ssize_t wlen; // 待发送的字节数(记录wbuffer中有效数据长度)
};
/**
* 初始化TCP服务器监听socket
* @param port 服务器监听的端口号
* @return 成功返回监听socket的文件描述符,失败返回-1
*/
int init_server(unsigned short port)
{
// 1. 创建TCP socket(IPv4协议族,字节流套接字)
// AF_INET:IPv4地址族;SOCK_STREAM:TCP协议(面向连接);0:默认协议(TCP)
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
perror("socket创建失败"); // 打印错误原因(如权限不足、系统资源耗尽)
return -1;
}
int reuse = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
{
return -1;
}
// 2. 初始化服务器地址结构
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in)); // 清空结构体(避免随机值干扰)
serveraddr.sin_family = AF_INET; // 使用IPv4协议
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有本地网卡(0.0.0.0)
serveraddr.sin_port = htons(port); // 端口号转换为网络字节序(大端)
// 3. 绑定socket到指定端口(将socket与服务器地址关联)
if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
{
perror("bind绑定端口失败"); // 错误可能:端口被占用、无权限使用特权端口
close(sockfd); // 绑定失败需关闭socket释放资源
return -1;
}
// 4. 开始监听连接(将主动socket转为被动socket,准备接受客户端连接)
// 第二个参数1024:未完成连接队列(三次握手未完成)的最大长度(高并发场景需调大)
listen(sockfd, 1024);
return sockfd; // 返回监听socket的文件描述符
}
// io_uring提交队列(SQ)的最大长度:最多同时等待处理1024个IO事件
#define ENTRIES_LENGTH 1024
/**
* 向io_uring注册"异步读"事件
* 作用:告知内核"当该socket有数据可读时,读取数据到专属缓冲区"
* @param ring io_uring实例(管理提交队列和完成队列)
* @param ctx 连接上下文(包含socket fd、读缓冲区等)
* @return 0成功,-1失败
*/
int set_event_recv(struct io_uring *ring, struct conn_ctx *ctx)
{
// 从io_uring的提交队列(SQ)中获取一个空闲的队列项(SQE)
// SQE(Submission Queue Entry):用户态描述待执行的IO操作(此处为读操作)
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) // 若提交队列已满,获取失败
{
perror("获取SQE失败");
return -1;
}
// 填充SQE:设置异步读操作(底层对应recv系统调用)
// 参数:sqe队列项、目标socket fd、读缓冲区(专属rbuffer)、最大读取长度、 flags(0为默认)
io_uring_prep_recv(sqe, ctx->fd, ctx->rbuffer, sizeof(ctx->rbuffer), 0);
// 将连接上下文指针存入SQE的user_data:事件完成时通过该字段定位到对应的连接
sqe->user_data = (unsigned long)ctx; // user_data是个64位的无符号整数,我们这里做个强转
return 0;
}
/**
* 向io_uring注册"异步写"事件
* 作用:告知内核"当该socket可写时,将专属缓冲区的数据发送给客户端"
* @param ring io_uring实例
* @param ctx 连接上下文(包含socket fd、写缓冲区及待发送长度)
* @return 0成功,-1失败
*/
int set_event_send(struct io_uring *ring, struct conn_ctx *ctx)
{
// 获取一个空闲的SQE(提交队列项)
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe)
{
perror("获取SQE失败");
return -1;
}
// 填充SQE:设置异步写操作(底层对应send系统调用)
// 参数:sqe队列项、目标socket fd、写缓冲区(专属wbuffer)、待发送长度(rlen记录的读取长度)
io_uring_prep_send(sqe, ctx->fd, ctx->wbuffer, ctx->rlen, 0);
// 存储连接上下文指针:事件完成时定位到对应的连接
sqe->user_data = (unsigned long)ctx;
return 0;
}
/**
* 向io_uring注册"异步接受连接"事件
* 作用:告知内核"当有新客户端连接时,接受连接并返回新的socket fd"
* @param ring io_uring实例
* @param sockfd 监听socket的文件描述符
* @return 0成功,-1失败
*/
int set_event_accept(struct io_uring *ring, int sockfd)
{
// 获取一个空闲的SQE(提交队列项)
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe)
{
perror("获取SQE失败");
return -1;
}
// 为接受连接操作分配临时上下文(仅用于存储监听socket信息)
struct conn_ctx *listen_ctx = malloc(sizeof(struct conn_ctx));
if (!listen_ctx)
{
perror("malloc listen_ctx失败");
return -1;
}
listen_ctx->fd = sockfd; // 绑定监听socket fd
listen_ctx->event = EVENT_ACCEPT; // 标记为接受连接事件
// 分配客户端地址结构(用于存储新连接的客户端IP和端口)
struct sockaddr_in *clientaddr = malloc(sizeof(struct sockaddr_in));
socklen_t *len = malloc(sizeof(socklen_t)); // 地址长度变量
*len = sizeof(struct sockaddr_in); // 初始化长度为IPv4地址结构大小
// 填充SQE:设置异步接受连接操作(底层对应accept系统调用)
// 参数:sqe队列项、监听socket fd、客户端地址指针、地址长度指针、flags(0为默认)
io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)clientaddr, len, 0);
// 存储临时上下文指针:事件完成时通过该字段获取监听信息
sqe->user_data = (unsigned long)listen_ctx;
// 临时存储客户端地址和长度的指针(避免使用全局变量,保证线程安全)
// 利用listen_ctx的缓冲区暂存指针,后续从这里恢复
memcpy(listen_ctx->wbuffer, &clientaddr, sizeof(void *));
memcpy(listen_ctx->rbuffer, &len, sizeof(void *));
return 0;
}
int main(int argc, char *argv[])
{
// 初始化服务器:创建并返回监听socket
int sockfd = init_server(PORT);
if (sockfd == -1) // 初始化失败则退出
{
return -1;
}
printf("服务器启动,监听端口 %d\n", PORT);
// 初始化io_uring
struct io_uring_params params; // io_uring初始化参数(使用默认配置)
memset(¶ms, 0, sizeof(params)); // 清空参数
struct io_uring ring; // io_uring实例(管理提交队列SQ和完成队列CQ)
// 初始化io_uring:创建容量为ENTRIES_LENGTH的队列
// 内部通过io_uring_setup系统调用创建内核队列,并通过mmap映射到用户态(减少数据拷贝)
if (io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms) != 0)
{
perror("io_uring初始化失败");
close(sockfd); // 失败需释放监听socket
return -1;
}
// 注册初始事件:向io_uring提交"接受连接"事件(开始监听新连接)
set_event_accept(&ring, sockfd);
// 主循环:持续处理io_uring中的事件(接受连接、读数据、写数据)
while (1)
{
// 1. 提交所有已准备好的SQE(提交队列中的事件)到内核
// 内核会异步处理这些IO操作,完成后将结果放入完成队列CQ(无需用户主动轮询)
io_uring_submit(&ring);
// 2. 批量获取完成队列(CQ)中的事件(最多128个)
// 相比单次获取,批量处理减少用户态与内核态交互,提升效率
struct io_uring_cqe *cqes[128]; // 存储完成的事件
// 从完成队列中获取事件,返回实际获取的数量(nready)
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
if (nready < 0) // 获取失败(如系统调用错误)
{
perror("io_uring获取完成事件失败");
break;
}
// 3. 遍历处理每个完成的事件
for (int i = 0; i < nready; i++)
{
struct io_uring_cqe *cqe = cqes[i]; // 当前完成的事件
// 从事件的user_data中恢复连接上下文(之前注册时存入的指针)
struct conn_ctx *ctx = (struct conn_ctx *)(cqe->user_data);
// 处理"接受连接"事件(EVENT_ACCEPT)
if (ctx->event == EVENT_ACCEPT)
{
// 恢复客户端地址和长度的指针(从临时上下文的缓冲区中)
struct sockaddr_in **clientaddr = (struct sockaddr_in **)ctx->wbuffer;
socklen_t **len = (socklen_t **)ctx->rbuffer;
// 重新注册"接受连接"事件(一次accept只能处理一个连接,需持续监听新连接)
set_event_accept(&ring, sockfd);
// 从完成事件中获取新连接的socket fd(cqe->res是accept的返回值)
int connfd = cqe->res;
if (connfd == -1) // 接受连接失败(如队列满)
{
perror("接受连接失败");
}
else // 接受连接成功
{
// 为新连接分配专属上下文(含独立缓冲区,避免多连接数据冲突)
struct conn_ctx *new_ctx = malloc(sizeof(struct conn_ctx));
if (!new_ctx) // 内存分配失败
{
perror("malloc new_ctx失败");
close(connfd); // 关闭新连接,避免资源泄漏
}
else // 初始化新连接上下文
{
memset(new_ctx, 0, sizeof(struct conn_ctx)); // 清空缓冲区
new_ctx->fd = connfd; // 绑定新连接的fd
new_ctx->event = EVENT_READ; // 初始事件为"读"(等待客户端发数据)
// 注册读事件:告知内核"当该连接有数据时,读取到专属rbuffer"
set_event_recv(&ring, new_ctx);
}
}
// 释放临时资源(监听上下文、客户端地址和长度)
free(*clientaddr); // 释放客户端地址结构
free(*len); // 释放地址长度变量
free(ctx); // 释放监听上下文
}
// 处理"读数据"事件(EVENT_READ):客户端发送数据后触发
else if (ctx->event == EVENT_READ)
{
ssize_t ret = cqe->res; // ret是recv的返回值:>0为读取字节数;0为客户端关闭;-1为错误
if (ret <= 0) // 客户端关闭连接或读取失败
{
close(ctx->fd); // 关闭连接socket
free(ctx); // 释放连接上下文(避免内存泄漏)
}
else // 成功读取到数据(ret为实际读取的字节数)
{
ctx->rlen = ret; // 记录读取长度(用于后续回显)
// 将读缓冲区的数据复制到写缓冲区(准备回显,使用专属缓冲区避免冲突)
memcpy(ctx->wbuffer, ctx->rbuffer, ret);
// 切换事件类型为"写",并注册写事件(告知内核"可写时发送数据")
ctx->event = EVENT_WRITE;
set_event_send(&ring, ctx);
}
}
// 处理"写数据"事件(EVENT_WRITE):数据发送完成后触发
else if (ctx->event == EVENT_WRITE)
{
ssize_t ret = cqe->res; // ret是send的返回值:>0为发送字节数;<=0为失败
if (ret > 0) // 数据发送成功
{
// 切换事件类型为"读",重新注册读事件(等待客户端下一次发送数据)
ctx->event = EVENT_READ;
set_event_recv(&ring, ctx);
}
else // 发送失败(如客户端已关闭)
{
close(ctx->fd); // 关闭连接socket
free(ctx); // 释放连接上下文
}
}
// 标记事件已处理:告知内核该CQE可被复用(推进完成队列指针)
io_uring_cqe_seen(&ring, cqe);
}
}
// 资源清理(理论上主循环不会退出,实际应用中需在信号处理中调用)
io_uring_queue_exit(&ring); // 销毁io_uring实例(释放内核资源)
close(sockfd); // 关闭监听socket
return 0;
}
3. test_qps_tcpclient.c
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/errno.h>
#define RBUFFER_LENGTH 128 // 128 512 1024 2048
#define WBUFFER_LENGTH 128
// ./test_qps_tcpclient -s 192.168.248.130 -p 8000 -t 50 -c 100 -n 10000
// 宏定义:1=非阻塞模式(有超时、休眠时间,测得不准确,好处是不会卡死线程),0=阻塞模式(测得准确,阻塞时CPU会切换到别的线程,但是可能会一直阻塞导致该线程卡死,结果一直出不来)
// 建议使用阻塞模式,测得更准确,遇到线程卡死的情况再选择非阻塞
#define USE_NONBLOCK 0
typedef struct test_context_s
{
char serverip[16];
int port;
int threadnum;
int connection;
int requestion;
char buffer[RBUFFER_LENGTH];
int conns_per_thread;
int failed;
pthread_mutex_t mutex; // 线程安全锁
} test_context_t;
// 设置非阻塞模式(仅非阻塞模式使用)
int SetNoblock(int fd)
{
int flag = fcntl(fd, F_GETFL);
return fcntl(fd, F_SETFL, flag | O_NONBLOCK) == 0;
}
// 计算时间差(毫秒)
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
// 连接服务器(阻塞/非阻塞模式通用)
int connect_tcpserver(const char *ip, unsigned short port, int timeout_ms)
{
int connfd = socket(AF_INET, SOCK_STREAM, 0);
if (connfd < 0)
{
perror("socket");
return -1;
}
struct sockaddr_in tcpserver_addr;
memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));
tcpserver_addr.sin_family = AF_INET;
tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
tcpserver_addr.sin_port = htons(port);
// 阻塞模式:直接阻塞连接
int ret = connect(connfd, (struct sockaddr *)&tcpserver_addr, sizeof(struct sockaddr_in));
if (ret != 0)
{
perror("connect (block)");
close(connfd);
return -1;
}
#if USE_NONBLOCK
SetNoblock(connfd);
#endif
return connfd;
}
// 发送函数(阻塞/非阻塞模式通用)
int Send(int fd, char *wbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK
// 非阻塞模式:带超时循环发送
struct timeval start, now;
gettimeofday(&start, NULL);
size_t total = 0;
while (total < len)
{
gettimeofday(&now, NULL);
int elapsed = TIME_SUB_MS(now, start);
if (elapsed >= timeout_ms)
{
printf("Send timeout (elapsed: %d ms)\n", elapsed);
return -1;
}
ssize_t sent = send(fd, wbuffer + total, len - total, 0);
if (sent < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
usleep(1); // 短暂休眠,减少CPU占用
continue;
}
else
{
printf("send errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
}
if (sent == 0)
{
return -1; // 连接关闭
}
total += sent;
}
#else
// 阻塞模式:循环发送直到完成(无超时,依赖系统默认超时)
size_t total = 0;
while (total < len)
{
ssize_t sent = send(fd, wbuffer + total, len - total, 0);
if (sent <= 0)
{
printf("send errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
total += sent;
}
#endif
return 0;
}
// 接收函数(阻塞/非阻塞模式通用)
int Recv(int fd, char *rbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK
// 非阻塞模式:带超时循环接收
struct timeval start, now;
gettimeofday(&start, NULL);
size_t total = 0;
while (total < len)
{
gettimeofday(&now, NULL);
int elapsed = TIME_SUB_MS(now, start);
if (elapsed >= timeout_ms)
{
printf("Recv timeout (elapsed: %d ms)\n", elapsed);
return -1;
}
ssize_t ret = recv(fd, rbuffer + total, len - total, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
usleep(1); // 短暂休眠,减少CPU占用
continue;
}
else
{
printf("recv errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
}
if (ret == 0)
{
return -1; // 连接关闭
}
total += ret;
}
#else
// 阻塞模式:循环接收直到完成(无超时,依赖系统默认超时)
size_t total = 0;
while (total < len)
{
ssize_t ret = recv(fd, rbuffer + total, len - total, 0);
if (ret <= 0)
{
printf("recv errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
total += ret;
}
#endif
return 0;
}
// 发送接收并校验
int send_recv_tcppkt(int fd, test_context_t *ctx)
{
// 发送超时:5秒,接收超时:5秒(可根据需求调整)
int res = Send(fd, ctx->buffer, WBUFFER_LENGTH, 5000);
if (res < 0)
{
return -1;
}
char rbuffer[WBUFFER_LENGTH] = {0};
res = Recv(fd, rbuffer, WBUFFER_LENGTH, 5000);
if (res < 0)
{
return -1;
}
if (memcmp(rbuffer, ctx->buffer, WBUFFER_LENGTH) != 0)
{
printf("data mismatch\n");
return -1;
}
return 0;
}
// 线程入口:按连接数分摊请求
static void *test_qps_entry(void *arg)
{
test_context_t *pctx = (test_context_t *)arg;
// 计算当前线程需创建的连接数
int conns_per_thread = pctx->conns_per_thread;
// 计算每个连接需处理的请求数
int reqs_per_conn = pctx->requestion / pctx->connection;
int remainder_reqs = pctx->requestion % pctx->connection;
for (int conn_idx = 0; conn_idx < conns_per_thread; conn_idx++)
{
// 连接超时:3秒(非阻塞模式生效,阻塞模式依赖系统默认)
int connfd = connect_tcpserver(pctx->serverip, pctx->port, 3000);
if (connfd < 0)
{
// 连接失败,标记该连接的所有请求为失败
int reqs = reqs_per_conn;
if (conn_idx < remainder_reqs)
{
reqs += 1;
}
pthread_mutex_lock(&pctx->mutex);
pctx->failed += reqs;
pthread_mutex_unlock(&pctx->mutex);
continue;
}
// 处理该连接的所有请求
int reqs = reqs_per_conn;
if (conn_idx < remainder_reqs)
{
reqs += 1;
}
for (int i = 0; i < reqs; i++)
{
int res = send_recv_tcppkt(connfd, pctx);
if (res != 0)
{
pthread_mutex_lock(&pctx->mutex);
pctx->failed++;
pthread_mutex_unlock(&pctx->mutex);
}
}
close(connfd);
}
return NULL;
}
int main(int argc, char *argv[])
{
int ret = 0;
test_context_t ctx = {0};
pthread_mutex_init(&ctx.mutex, NULL); // 初始化互斥锁
int opt;
while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1)
{
switch (opt)
{
case 's': // ip
printf("-s: %s\n", optarg);
strcpy(ctx.serverip, optarg);
break;
case 'p': // port
printf("-p: %s\n", optarg);
ctx.port = atoi(optarg);
break;
case 't': // 线程数
printf("-t: %s\n", optarg);
ctx.threadnum = atoi(optarg);
break;
case 'c': // 连接数
printf("-c: %s\n", optarg);
ctx.connection = atoi(optarg);
break;
case 'n': // 请求数
printf("-n: %s\n", optarg);
ctx.requestion = atoi(optarg);
break;
default:
return -1;
}
}
// 初始化发送缓冲区(二进制数据)
for (int i = 0; i < WBUFFER_LENGTH; i++)
{
ctx.buffer[i] = 'D';
}
ctx.buffer[WBUFFER_LENGTH - 1] = '\0'; // 保持与服务器回显一致
pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));
if (!ptid)
{
perror("malloc ptid failed");
ret = -1;
goto clean;
}
int remainder_conns = ctx.connection % ctx.threadnum;
if (remainder_conns == 0)
{
ctx.conns_per_thread = ctx.connection / ctx.threadnum;
printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",
ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);
}
else
{
printf("无法均匀分配连接数,这里增加 %d 个连接\n", ctx.threadnum - remainder_conns);
ctx.connection += ctx.threadnum - remainder_conns;
ctx.conns_per_thread = ctx.connection / ctx.threadnum;
printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",
ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);
}
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
for (int i = 0; i < ctx.threadnum; i++)
{
if (pthread_create(&ptid[i], NULL, test_qps_entry, &ctx) != 0)
{
perror("pthread_create failed");
ret = -1;
for (int j = 0; j < i; j++)
{
pthread_join(ptid[j], NULL);
}
goto clean;
}
}
for (int i = 0; i < ctx.threadnum; i++)
{
pthread_join(ptid[i], NULL);
}
struct timeval tv_end;
gettimeofday(&tv_end, NULL);
int time_used = TIME_SUB_MS(tv_end, tv_begin);
printf("模式: %s, 请求数:total: %d, success: %d, failed: %d, time_used: %d ms, qps: %d\n",
USE_NONBLOCK ? "非阻塞" : "阻塞",
ctx.requestion,
ctx.requestion - ctx.failed,
ctx.failed, time_used,
(time_used > 0) ? ((ctx.requestion - ctx.failed) * 1000 / time_used) : 0);
clean:
free(ptid);
pthread_mutex_destroy(&ctx.mutex);
return ret;
}
更多推荐
所有评论(0)