【c++】基于reactor实现的简单p2p对话功能
即只有两个客户端没有服务端。当第二次握手“client端”还未接收期间,就接受到了“server端”的第一次握手,此时直接进入SYN_RECV状态,接受到第二次握手后,直接进入ESTABLISHED状态。
1 tcp三次握手连接基本步骤
1 server端bind(),listen(),进入LISTEN状态。此时创建了epoll此时epoll开始维护半就绪队列和就绪队列。
2 第一次握手:client端connect(),发送SYN+seq,进入SYN_SENT状态
3 第二次握手:server端接收到connect,返回SYN+ACK+seq+ack,进入SYN_RECV状态。此时更新半就绪队列。
4 第三次握手:client端发送ACK+seq+ack,进入ESTABLISHED,server端accept(),返回进的socket。服务端更新就绪队列,epoll取出对应io。
2 什么是p2p?
即只有两个客户端没有服务端。当第二次握手“client端”还未接收期间,就接受到了“server端”的第一次握手,此时直接进入SYN_RECV状态,接受到第二次握手后,直接进入ESTABLISHED状态。
3 代码实现p2p
1 两个客户端同时循环connect,直到满足上方p2p的条件。
2 创建两个线程:一个服务端reactor接受数据线程,一个客户端发送线程。
# 编译运行代码
# 注意:自行修改命令行ip、port和代码中的端口号
gcc -o p2p_r_cli p2p_r_cli.c
./p2p_r_cli 192.168.96.114 8000
gcc -o p2p_r_ser p2p_r_ser.c
./p2p_r_ser 192.168.96.114 8010
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
typedef void (*reactor_cb)(int fd, void *arg);
struct reactor {
int epfd;
int maxevents;
struct epoll_event *events;
};
struct reactor *reactor_create(int maxevents) {
struct reactor *r = calloc(1, sizeof(*r));
if (!r) return NULL;
r->epfd = epoll_create1(0);
if (r->epfd == -1) {
free(r);
return NULL;
}
r->maxevents = maxevents;
r->events = calloc(maxevents, sizeof(struct epoll_event));
if (!r->events) {
close(r->epfd);
free(r);
return NULL;
}
return r;
}
int reactor_add(struct reactor *r, int fd, uint32_t events, void *data) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = data;
return epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev);
}
int reactor_mod(struct reactor *r, int fd, uint32_t events, void *data) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = data;
return epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev);
}
int reactor_del(struct reactor *r, int fd) {
return epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, NULL);
}
void reactor_destroy(struct reactor *r) {
if (!r) return;
if (r->events) free(r->events);
if (r->epfd != -1) close(r->epfd);
free(r);
}
struct fd_ctx {
int fd;
reactor_cb cb;
void *arg;
};
void *reactor_loop(void *arg) {
struct reactor *r = arg;
while (1) {
int n = epoll_wait(r->epfd, r->events, r->maxevents, -1);
if (n == -1) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
for (int i = 0; i < n; ++i) {
struct epoll_event *e = &r->events[i];
struct fd_ctx *ctx = e->data.ptr;
if (!ctx) continue;
ctx->cb(ctx->fd, ctx->arg);
}
}
return NULL;
}
/* ---------- 客户端连接 ---------- */
struct client_arg {
int sockfd;
struct reactor *r;
struct fd_ctx ctx;
};
int connect_client(int fd, const char *ip, int port) {
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
while (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
if (errno == EINPROGRESS) { usleep(1000); continue; }
if (errno == EISCONN) break;
if (errno == ECONNREFUSED || errno == EINTR) { usleep(1000); continue; }
return -1;
}
return 0;
}
void on_recv(int fd, void *arg) {
char buf[512];
ssize_t n = recv(fd, buf, sizeof(buf)-1, 0);
if (n > 0) {
buf[n] = '\0';
printf("recv: %s\n", buf);
fflush(stdout);
} else if (n == 0) {
printf("peer closed\n");
close(fd);
struct client_arg *c = arg;
reactor_del(c->r, fd);
} else {
if (errno == EINTR || errno == EAGAIN) return;
perror("recv");
close(fd);
struct client_arg *c = arg;
reactor_del(c->r, fd);
}
}
void *stdin_thread(void *arg) {
struct client_arg *c = arg;
char buf[512];
while (1) {
printf("client > ");
if (fgets(buf, sizeof(buf), stdin) == NULL) break;
size_t len = strcspn(buf, "\n");
buf[len] = '\0';
if (strcmp(buf, "quit") == 0) break;
ssize_t s = send(c->sockfd, buf, len, 0);
if (s <= 0) {
perror("send");
break;
}
}
shutdown(c->sockfd, SHUT_WR);
return NULL;
}
int bind_localaddr(int port) {
int localfd = socket(AF_INET, SOCK_STREAM, 0);
if (localfd == -1) return -1;
struct sockaddr_in localaddr;
memset(&localaddr, 0, sizeof(localaddr));
localaddr.sin_family = AF_INET;
localaddr.sin_port = htons(port);
localaddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(localfd, (struct sockaddr *)&localaddr, sizeof(localaddr)) == -1) {
close(localfd);
return -1;
}
return localfd;
}
int main(int argc, char **argv) {
if (argc < 3) {
fprintf(stderr, "Usage: %s <server-ip> <server-port>\n", argv[0]);
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
// 创建本地socket并绑定端口
int sockfd = bind_localaddr(8010);
if (sockfd < 0) {
perror("bind_localaddr");
return 1;
}
if (connect_client(sockfd, ip, port) < 0) {
perror("connect_client");
close(sockfd);
return 1;
}
printf("connected to %s:%d, fd=%d\n", ip, port, sockfd);
struct reactor *r = reactor_create(64);
if (!r) {
perror("reactor_create");
close(sockfd);
return 1;
}
struct client_arg c;
c.sockfd = sockfd;
c.r = r;
c.ctx.fd = sockfd;
c.ctx.cb = on_recv;
c.ctx.arg = &c;
if (reactor_add(r, sockfd, EPOLLIN | EPOLLRDHUP | EPOLLHUP, &c.ctx) == -1) {
perror("reactor_add");
reactor_destroy(r);
close(sockfd);
return 1;
}
pthread_t rtid, tid;
pthread_create(&rtid, NULL, reactor_loop, r);
pthread_create(&tid, NULL, stdin_thread, &c);
pthread_join(tid, NULL);
shutdown(sockfd, SHUT_RDWR);
close(sockfd);
sleep(1);
reactor_destroy(r);
pthread_cancel(rtid);
pthread_join(rtid, NULL);
return 0;
}
更多推荐
所有评论(0)