1 tcp三次握手连接基本步骤

        1 server端bind()、listen(),进入LISTEN状态。此时内核开始为该监听socket维护半连接队列(SYN队列)和全连接队列(accept队列);如果使用epoll,也在此时创建epoll并把监听socket加入其中。

        2 第一次握手:client端connect(),发送SYN+seq,进入SYN_SENT状态

        3 第二次握手:server端接收到client的SYN,返回SYN+ACK+seq+ack,进入SYN_RECV状态。此时内核把该连接加入半连接队列(SYN队列)。

        4 第三次握手:client端发送ACK+seq+ack,进入ESTABLISHED状态;server端收到ACK后也进入ESTABLISHED状态,内核把该连接从半连接队列移入全连接队列(accept队列),epoll通知监听socket可读,server端调用accept()取出该连接,返回新的socket。

2 什么是p2p?

        即只有两个客户端、没有服务端(也就是TCP的“同时打开”)。一端发出SYN(第一次握手)后,在还没有接收到对端的第二次握手(SYN+ACK)期间,就先接收到了对端发来的SYN(对端的第一次握手),此时它直接从SYN_SENT进入SYN_RECV状态并回复SYN+ACK;之后再接收到对端的第二次握手(SYN+ACK),直接进入ESTABLISHED状态。

3 代码实现p2p

        1 两个客户端同时循环connect,直到满足上方p2p的条件。

        2 创建两个线程:一个服务端reactor接收数据线程,一个客户端发送线程。

# 编译运行代码

# 注意:自行修改命令行ip、port和代码中的端口号

gcc -o p2p_r_cli p2p_r_cli.c -pthread

./p2p_r_cli 192.168.96.114 8000

gcc -o p2p_r_ser p2p_r_ser.c -pthread

./p2p_r_ser 192.168.96.114 8010

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>


/*
 * Handler signature used by the reactor: reactor_loop() calls it with the
 * descriptor and the opaque argument stored in the matching struct fd_ctx.
 */
typedef void (*reactor_cb)(int fd, void *arg);

/* State of one epoll-based event loop. */
struct reactor {
    int epfd;                   /* epoll descriptor obtained from epoll_create1() */
    int maxevents;              /* number of slots in events; passed to epoll_wait() */
    struct epoll_event *events; /* heap buffer that epoll_wait() fills with ready events */
};

/*
 * Allocate a reactor together with its epoll instance and event buffer.
 *
 * maxevents is the capacity of the buffer later handed to epoll_wait() and
 * must be greater than zero.
 *
 * Returns a reactor that the caller releases with reactor_destroy(), or NULL
 * on failure with errno set: EINVAL for a non-positive maxevents, otherwise
 * the value left by calloc() or epoll_create1().
 */
struct reactor *reactor_create(int maxevents) {
    /* epoll_wait() rejects a non-positive maxevents, so such a reactor could
     * never run; a negative value would also wrap into a huge calloc() size. */
    if (maxevents <= 0) {
        errno = EINVAL;
        return NULL;
    }

    struct reactor *r = calloc(1, sizeof(*r));
    if (!r) return NULL;

    r->epfd = epoll_create1(0);
    if (r->epfd == -1) {
        int saved = errno;      /* keep the epoll_create1() error for the caller */
        free(r);
        errno = saved;
        return NULL;
    }

    r->maxevents = maxevents;
    r->events = calloc((size_t)maxevents, sizeof(*r->events));
    if (!r->events) {
        int saved = errno;      /* close() may overwrite errno */
        close(r->epfd);
        free(r);
        errno = saved;
        return NULL;
    }
    return r;
}

/*
 * Register fd with the reactor's epoll instance.
 *
 * events is the epoll event mask to watch; data is stored as the event's user
 * pointer (reactor_loop() reads it back as a struct fd_ctx *).
 * Returns the result of epoll_ctl(): 0 on success, -1 with errno set.
 */
int reactor_add(struct reactor *r, int fd, uint32_t events, void *data) {
    struct epoll_event registration = { .events = events, .data = { .ptr = data } };

    return epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &registration);
}

/*
 * Replace the event mask and user pointer of an fd that is already registered
 * with the reactor's epoll instance.
 *
 * Returns the result of epoll_ctl(): 0 on success, -1 with errno set.
 */
int reactor_mod(struct reactor *r, int fd, uint32_t events, void *data) {
    struct epoll_event updated = { .events = events };

    updated.data.ptr = data;
    return epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &updated);
}

/*
 * Remove fd from the reactor's epoll instance.
 *
 * Returns the result of epoll_ctl(): 0 on success, -1 with errno set.
 */
int reactor_del(struct reactor *r, int fd) {
    int rc = epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, NULL);

    return rc;
}

/*
 * Release everything owned by a reactor: the event buffer, the epoll
 * descriptor and the reactor itself. A NULL reactor is ignored.
 */
void reactor_destroy(struct reactor *r) {
    if (r == NULL) {
        return;
    }

    free(r->events);        /* free(NULL) is a no-op, so no guard is needed */
    if (r->epfd != -1) {
        close(r->epfd);
    }
    free(r);
}

/*
 * Per-descriptor dispatch record. A pointer to it is stored as the epoll user
 * data when the descriptor is registered; reactor_loop() reads it back and
 * calls cb(fd, arg).
 */
struct fd_ctx {
    int fd;         /* descriptor handed to cb */
    reactor_cb cb;  /* handler invoked when epoll reports the descriptor */
    void *arg;      /* opaque argument forwarded to cb */
};

/*
 * Thread entry point that runs the event loop.
 *
 * arg is the struct reactor to drive. The loop blocks in epoll_wait() without
 * a timeout and, for every reported event whose user pointer is non-NULL,
 * calls the struct fd_ctx callback with its fd and argument. EINTR restarts
 * the wait; any other epoll_wait() failure is reported with perror() and ends
 * the loop. Always returns NULL.
 */
void *reactor_loop(void *arg) {
    struct reactor *reactor = arg;

    for (;;) {
        int ready = epoll_wait(reactor->epfd, reactor->events, reactor->maxevents, -1);

        if (ready == -1 && errno == EINTR) {
            continue;
        }
        if (ready == -1) {
            perror("epoll_wait");
            break;
        }

        struct epoll_event *end = reactor->events + ready;
        for (struct epoll_event *ev = reactor->events; ev < end; ++ev) {
            struct fd_ctx *ctx = ev->data.ptr;

            if (ctx != NULL) {
                ctx->cb(ctx->fd, ctx->arg);
            }
        }
    }
    return NULL;
}

/* ---------- 客户端连接 ---------- */


/* State shared by the receive callback and the stdin sender thread. */
struct client_arg {
    int sockfd;         /* connected socket; stdin_thread() sends on it */
    struct reactor *r;  /* reactor the socket is registered with; on_recv() deregisters through it */
    struct fd_ctx ctx;  /* dispatch record registered with the reactor for sockfd */
};

/*
 * Connect fd to the IPv4 endpoint ip:port, retrying until it succeeds.
 *
 * fd   - TCP socket, normally already bound to the local port.
 * ip   - peer address in dotted-decimal notation.
 * port - peer TCP port, 1..65535.
 *
 * The call keeps retrying every millisecond while the attempt is refused,
 * interrupted or still in progress: in the peer-to-peer setup both sides loop
 * on connect() until their SYNs cross, so a refusal only means the peer has
 * not started yet. There is deliberately no retry limit.
 *
 * Returns 0 once the socket is connected, or -1 with errno set (EINVAL for a
 * malformed address or out-of-range port, otherwise the connect() error).
 */
int connect_client(int fd, const char *ip, int port) {
    if (ip == NULL || port <= 0 || port > 65535) {
        errno = EINVAL;
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((uint16_t)port);
    /* inet_pton() reports malformed input; inet_addr() would silently turn it
     * into 255.255.255.255. */
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
        errno = EINVAL;
        return -1;
    }

    while (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        switch (errno) {
        case EISCONN:
            /* An earlier attempt completed in the meantime. */
            return 0;
        case EINTR:
        case EINPROGRESS:
        case EALREADY:      /* retry after EINTR: the first attempt is still running */
        case ECONNREFUSED:
            usleep(1000);
            continue;       /* back to the while loop */
        default:
            return -1;
        }
    }
    return 0;
}

/*
 * Reactor callback for the connected socket.
 *
 * fd  - descriptor reported ready by the reactor.
 * arg - the struct client_arg that owns the registration.
 *
 * Reads up to 511 bytes and prints them as text. When the peer has closed the
 * connection or recv() fails with a non-transient error, the descriptor is
 * removed from the reactor and then closed.
 *
 * NOTE(review): fd is the same descriptor stdin_thread() sends on and main()
 * closes at exit, so after this teardown those calls operate on a closed
 * descriptor - confirm who should own the final close().
 */
void on_recv(int fd, void *arg) {
    struct client_arg *c = arg;
    char buf[512];

    ssize_t n = recv(fd, buf, sizeof(buf) - 1, 0);
    if (n > 0) {
        buf[n] = '\0';
        printf("recv: %s\n", buf);
        fflush(stdout);
        return;
    }

    if (n == 0) {
        printf("peer closed\n");
    } else {
        /* Transient conditions: leave the registration alone and wait for the
         * next readiness notification. */
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) return;
        perror("recv");
    }

    /* Deregister while the descriptor is still open; epoll_ctl() on an
     * already-closed descriptor fails with EBADF. */
    reactor_del(c->r, fd);
    close(fd);
}

/*
 * Thread entry point that forwards lines typed on stdin to the peer.
 *
 * arg is the struct client_arg holding the connected socket. Each line is
 * sent without its trailing newline; empty lines are skipped. The loop ends
 * on end-of-file, on the command "quit", or when send() fails. Before
 * returning, the write side of the socket is shut down. Always returns NULL.
 */
void *stdin_thread(void *arg) {
    struct client_arg *c = arg;
    char buf[512];
    int failed = 0;

    while (!failed) {
        printf("client > ");
        fflush(stdout);     /* the prompt has no newline, so flush it explicitly */
        if (fgets(buf, sizeof(buf), stdin) == NULL) break;

        size_t len = strcspn(buf, "\n");
        buf[len] = '\0';
        if (strcmp(buf, "quit") == 0) break;
        /* A zero-length send() returns 0, which is not an error; there is
         * simply nothing to transmit for an empty line. */
        if (len == 0) continue;

        /* send() may accept only part of the buffer, so loop until done. */
        size_t off = 0;
        while (off < len) {
            /* MSG_NOSIGNAL: report a closed peer as EPIPE instead of letting
             * SIGPIPE terminate the whole process. */
            ssize_t s = send(c->sockfd, buf + off, len - off, MSG_NOSIGNAL);
            if (s < 0) {
                if (errno == EINTR) continue;
                perror("send");
                failed = 1;
                break;
            }
            off += (size_t)s;
        }
    }
    shutdown(c->sockfd, SHUT_WR);
    return NULL;
}

/*
 * Create a TCP/IPv4 socket bound to the given local port on all interfaces.
 *
 * port - local TCP port, 0..65535 (0 lets the kernel choose one).
 *
 * SO_REUSEADDR is enabled so the fixed local port can be bound again right
 * after a previous run, while old connections are still in TIME_WAIT.
 *
 * Returns the socket descriptor, which the caller must close(), or -1 with
 * errno set (EINVAL for an out-of-range port, otherwise the error from
 * socket(), setsockopt() or bind()).
 */
int bind_localaddr(int port) {
    if (port < 0 || port > 65535) {
        errno = EINVAL;
        return -1;
    }

    int localfd = socket(AF_INET, SOCK_STREAM, 0);
    if (localfd == -1) return -1;

    struct sockaddr_in localaddr;
    memset(&localaddr, 0, sizeof(localaddr));
    localaddr.sin_family = AF_INET;
    localaddr.sin_port = htons((uint16_t)port);
    localaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    if (setsockopt(localfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1 ||
        bind(localfd, (struct sockaddr *)&localaddr, sizeof(localaddr)) == -1) {
        int saved = errno;  /* close() may overwrite the error the caller reports */
        close(localfd);
        errno = saved;
        return -1;
    }
    return localfd;
}

/* Parse a decimal TCP port in 1..65535; returns -1 when text is not one. */
static int parse_port_arg(const char *text) {
    char *end = NULL;

    errno = 0;
    long value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value < 1 || value > 65535) {
        return -1;
    }
    return (int)value;
}

/*
 * Peer-to-peer TCP client.
 *
 * Usage: prog <server-ip> <server-port> [local-port]
 *
 * Binds a socket to the local port (8010 unless given as the third argument),
 * connects it to the peer, then runs two threads: the reactor loop that
 * prints received data and the stdin thread that sends typed lines. When the
 * stdin thread ends, the reactor thread is stopped and everything is released.
 *
 * Returns 0 on a normal shutdown and 1 on any setup failure.
 */
int main(int argc, char **argv) {
    if (argc < 3) {
        fprintf(stderr, "Usage: %s <server-ip> <server-port> [local-port]\n", argv[0]);
        return 1;
    }

    const char *ip = argv[1];
    int port = parse_port_arg(argv[2]);
    int local_port = (argc > 3) ? parse_port_arg(argv[3]) : 8010;
    if (port < 0 || local_port < 0) {
        fprintf(stderr, "invalid port (expected 1-65535)\n");
        return 1;
    }

    /* Create the local socket and bind it to the local port. */
    int sockfd = bind_localaddr(local_port);
    if (sockfd < 0) {
        perror("bind_localaddr");
        return 1;
    }

    if (connect_client(sockfd, ip, port) < 0) {
        perror("connect_client");
        close(sockfd);
        return 1;
    }
    printf("connected to %s:%d, fd=%d\n", ip, port, sockfd);

    struct reactor *r = reactor_create(64);
    if (!r) {
        perror("reactor_create");
        close(sockfd);
        return 1;
    }

    /* c lives on main's stack; both threads are joined before main returns,
     * so the pointers handed to them stay valid. */
    struct client_arg c;
    c.sockfd = sockfd;
    c.r = r;
    c.ctx.fd = sockfd;
    c.ctx.cb = on_recv;
    c.ctx.arg = &c;

    if (reactor_add(r, sockfd, EPOLLIN | EPOLLRDHUP | EPOLLHUP, &c.ctx) == -1) {
        perror("reactor_add");
        reactor_destroy(r);
        close(sockfd);
        return 1;
    }

    pthread_t rtid, tid;
    int rc = pthread_create(&rtid, NULL, reactor_loop, r);
    if (rc != 0) {
        /* pthread functions return the error number instead of setting errno. */
        fprintf(stderr, "pthread_create(reactor_loop): %s\n", strerror(rc));
        reactor_destroy(r);
        close(sockfd);
        return 1;
    }

    int status = 0;
    rc = pthread_create(&tid, NULL, stdin_thread, &c);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(stdin_thread): %s\n", strerror(rc));
        status = 1;
    } else {
        pthread_join(tid, NULL);
    }

    /* Stop the reactor thread BEFORE releasing anything it uses: it reads
     * r->epfd and r->events and may still be running a callback on sockfd. */
    pthread_cancel(rtid);
    pthread_join(rtid, NULL);

    /* on_recv() may already have closed sockfd after the peer hung up; these
     * calls then fail with EBADF, which is ignored. */
    shutdown(sockfd, SHUT_RDWR);
    close(sockfd);

    reactor_destroy(r);

    return status;
}

技术支持:https://github.com/0voice

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐