0%

IO复用

Select系统调用

select系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

select系统调用的原型如下:

1
2
#include <sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
  • nfds参数指定被监听的文件描述符的总数。它通常被设置为select监听的所有文件描述符中的最大值加1因为文件描述符是从0开始计数的。
  • readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。应用程序调用select函数时,通过这三个参数传入自己感兴趣的文件描述符。select调用返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。fd_set结构体仅包含一个整型数组,该数组的每个元素的每一位bit标记一个文件描述符。
  • timeout参数用来设置select函数的超时时间。它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。不过我们不能完全信任select调用返回后的timeout值,比如调用失败时timeout值是不确定的。如果给timeout参数传递NULL,则select将一直阻塞,直到某个文件描述符就绪

select成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没有任何文件描述符就绪,select将返回0。select失败时返回-1并设置errno为EINTR。

如果觉得自己手动使用位操作去处理fd_set,可以使用下面的一系列宏和函数来访问fd_set结构体中的位:

1
2
3
4
5
#include <sys/select.h>
FD_ZERO(fd_set *fdset); /*清零fdset的所有位*/
FD_SET(int fd, fd_set *fdset); /*设置fdset的位fd*/
FD_CLR(int fd, fd_set *fdset); /*清除fdset的位fd*/
int FD_ISSET(int fd, fd_set *fdset); /*测试fdset的位fd是否被设置*/

文件描述符就绪条件

哪些情况下文件描述符可以被认为是可读、可写或者出现异常,对于select的使用非常关键。在网络编程中,下列情况可认为socket可读:

  • socket内核接受缓冲区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。
  • socket通信的对方关闭连接。此时对该socket的读操作将返回0。
  • 监听scoket上有新的连接请求。
  • socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除错误。

下列情况下socket可写:

  • socket内核发送缓存区中的可用字节数小于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
  • socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
  • socket使用非阻塞connect连接成功或者失败(超时)之后。
  • socket上有未处理的错误。此时我们可以使用getsocketopt来读取和清除该错误。

网络程序中,select能处理的异常情况只有一种:socket上接受到带外数据。

下面是使用select函数处理socket带外数据的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <unistd.h>

int main() {
sockaddr_in sockAddress{};
sockAddress.sin_family = AF_INET;
inet_pton(AF_INET, "localhost", &sockAddress.sin_addr);
sockAddress.sin_port = htons(9000);

int sockFD = socket(PF_INET, SOCK_STREAM, 0);
if (sockFD < 0) {
std::cerr << "create socket fail" << std::endl;
return sockFD;
}

int ret = bind(sockFD, (sockaddr *)&sockAddress, sizeof(sockAddress));
if (ret < 0) {
std::cerr << "socket bind fail" << std::endl;
return ret;
}

ret = listen(sockFD, 5);
if (ret < 0) {
std::cerr << "socket listen fail" << std::endl;
return ret;
}

sockaddr_in clientSock{};
socklen_t clientSockLen;
int clientSockFD = accept(sockFD, (sockaddr *)&clientSock, &clientSockLen);
if (clientSockFD < 0) {
std::cerr << "socket accept fail" << std::endl;
return clientSockFD;
}

char buffer[1024];
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);

while (true) {
memset(buffer, 0, sizeof(buffer));
/*每次调用select 前都要重新在read_fds和exception_fds中设置文件描述符clientSockFD,因为事件发生之后,文件描述符集合将被内核修改*/
FD_SET(clientSockFD, &read_fds);
FD_SET(clientSockFD, &exception_fds);
ret = select(clientSockFD + 1, &read_fds, nullptr, &exception_fds, nullptr);
if (ret < 0) {
std::cerr << "select fail " << std::endl;
break;
}
/*对于可读事件,采用普通的recv函数读取数据*/
if (FD_ISSET(clientSockFD, &read_fds)) {
ret = recv(clientSockFD, buffer, sizeof(buffer) - 1, 0);
if (ret <= 0) {
break;
}
std::cout << "get " << ret << " bytes of normal data: " << buffer << std::endl;
} else if (FD_ISSET(clientSockFD, &exception_fds)) {
/*对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据*/
ret = recv(clientSockFD, buffer, sizeof(buffer) - 1, MSG_OOB);
if (ret <= 0) {
break;
}
std::cout << "get " << ret << " bytes of oob data: " << buffer << std::endl;
}
close(clientSockFD);
}

return 0;
}

poll 系统调用

poll系统调用和select类似,也是在指定时间内轮训一定数量的文件描述符,以测试其中是否有就绪者。poll的原型如下

1
2
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件。
  • nfds指定被监听事件的fds的数量。它本质是一个整数。
  • timeout参数指定poll的超时值,单位是毫秒。当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。

poll调用的返回值的含义与select相同。

poll函数的第一个参数使用的pollfd结构体的定义如下:

1
2
3
4
5
6
struct pollfd
{
int fd; /*文件描述符*/
short events; /*注册的事件*/
short revents; /*实际发生的事件,由内核补充*/
}
  • fd指定文件描述符
  • events告诉poll监听fd上的那些事件,他是一系列事件的按位或
  • revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件

poll 支持的事件类型如下表所示:

事件 描述 是否可作为输入 是否可作为输出
POLLIN 数据可读(包括普通数据和优先数据) Y Y
POLLRDNORM 普通数据可读 Y Y
POLLRDBAND 优先级带数据可读(Linux不支持) Y Y
POLLPRI 高优先级数据可读,比如TCP带外数据 Y Y
POLLOUT 数据(包括普通数据和优先数据)可写 Y Y
POLLWRNORM 普通数据可写 Y Y
POLLWRBAND 优先级带数据可写 Y Y
POLLRDHUP TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 Y Y
POLLERR 错误 N Y
POLLHUP 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 N Y
POLLNVAL 文件描述符没有打开 N Y

epoll系统调用

内核事件表

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有很大差异。首先,epoll使用一组函数来完成任务,而不是单个函数。其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中。从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用epoll_create函数来创建:
1
2
#include <sys/epoll.h>
int epoll_create(int size);
  • size参数现在并不起作用,只是给内核一个提示。告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指要访问的内核事件表。

下面的函数可以用来操作epoll的内核事件表:

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfd为epoll_create返回的事件表文件描述符
  • fd为要操作的文件描述符
  • op指定操作类型,操作类型有如下3种
    • EPOLL_CTL_ADD 往事件表中注册fd上的事件
    • EPOLL_CTL_MOD 修改fd上的注册事件
    • EPOLL_CTL_DEL 删除fd上对的注册事件
  • event 参数指定事件,它是epoll_event结构体指针类型。

epoll_event结构体的定义如下:

1
2
3
4
5
struct epoll_event 
{
__uint32_t events; /* epoll事件 */
epoll_data_t data; /* 用户数据 */
}
  • event 成员描述事件类型。epoll支持的事件类型和poll基本相同,表示epoll事件类型的宏是在poll对应的宏前加上”E”。但epoll有两个额外的事件类型——EPOLLET和EPOLLONESHOT,它们对于epoll的高效运作非常关键,这两个事件将在后面讨论。

  • data成员用于存储用户数据,其类型epoll_data_t的定义如下:

    1
    2
    3
    4
    5
    6
    7
    typedef union epoll_data
    {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    }epoll_data_t;

    epoll_data_t是一个union,其四个成员中使用最多的是fd,它指定事件所从属的目标文件描述符。ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t是一个union,我们不能同时使用ptr和fd成员,因此如果要将文件描述符和用户数据关联起来,只能使用其他手段,比如放弃使用epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。

epoll_ctl成功时返回0,失败时返回-1并设置errno。

epoll_wait函数

epoll的一系列系统调用的主要接口时epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • timeout参数指定超时时间,单位为毫秒
  • maxevents参数指定最多监听多少个事件,它必须大于0
  • events参数,如果epoll_wait函数检测到事件,就将所有就绪的事件从内核事件表中复制到它events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检查到的就绪事件。
  • epfd参数指定内核事件表的文件描述符

LT和ET模式

epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。LT模式是默认的工作模式,这种模式下epoll相对于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式时epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理事件。这样当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。

而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续epoll_wait调用将不再向应用程序通知这一事件。可见,ET模式在很大程度上降低了epoll事件被重复触发的次数,因此效率要比LT模式高。

LT和ET模式下对事件的不同处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//
// Created by Bytedance-Wall Flower on 2021/1/10.
//

#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cerrno>
#include <arpa/inet.h>


#define MAX_EVENT_NUM 64
#define BUFFER_SIZE 512


int setNonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void monitorEventForFd(int epoll_fd, int fd, bool enable_et) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if (enable_et) {
event.events |= EPOLLET;
}
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
setNonblocking(fd);
}

void lt_mode(epoll_event *events, int number, int epoll_fd, int listen_fd) {
char buf[512];
for (int i = 0; i < number; ++i) {
int sock_fd = events[i].data.fd;
if (sock_fd == listen_fd) {
sockaddr_in client_socket_addr{};
socklen_t sock_len;
int conn_fd = accept(listen_fd, (sockaddr *)&client_socket_addr, &sock_len);
monitorEventForFd(epoll_fd, conn_fd, false); //使用ET模式监听客户端socket fd

} else if (events[i].events & EPOLLIN) {//客户端socket fd有数据可读
printf("client socket fd %d is readable\n", sock_fd);
memset(buf, 0, BUFFER_SIZE);
int ret = recv(sock_fd, buf, BUFFER_SIZE-1, 0);
if (ret <= 0) {
close(sock_fd);
continue;
}
printf("sock fd %d get %d bytes of content: %s\n", sock_fd, ret, buf);
} else {
printf("other event happen in le mode\n");
}
}
}

void et_mode(epoll_event *events, int number, int epoll_fd, int listen_fd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; ++i) {
int sock_fd = events[i].data.fd;
if (sock_fd == listen_fd) {
sockaddr_in client_socket_addr{};
socklen_t sock_len;
int conn_fd = accept(listen_fd, (sockaddr *)&client_socket_addr, &sock_len);
monitorEventForFd(epoll_fd, conn_fd, true); //使用ET模式监听客户端socket fd

} else if (events[i].events & EPOLLIN) {
printf("client socket fd %d is readable\n", sock_fd);
while (true) { //循环读,直到无数据可读
memset(buf, 0, BUFFER_SIZE);
int ret = recv(sock_fd, buf, BUFFER_SIZE-1, 0);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
//对于非阻塞io,下面的条件成立表示数据已经全部读取完毕,
//此后epoll 就能再次触发该sock fd上的EPOLLIN事件,以驱动下一次读操作
printf("read later\n");
break;
}
close(sock_fd);
break;
} else if (ret == 0) {
close(sock_fd);
} else {
printf("sock fd %d get %d bytes of content: %s\n", sock_fd, ret, buf);
}
}
} else {
printf("other event happen in et mode\n");
}
}

}

int main() {
sockaddr_in server_sock_addr{};
server_sock_addr.sin_family = AF_INET;
inet_pton(AF_INET, "localhost", &server_sock_addr.sin_addr);
server_sock_addr.sin_port = htons(9000);
int server_fd = socket(PF_INET, SOCK_STREAM, 0);
bind(server_fd, (sockaddr *)&server_sock_addr, sizeof(server_sock_addr));
listen(server_fd, 5);
epoll_event events[MAX_EVENT_NUM];
int epoll_fd = epoll_create(5);
monitorEventForFd(epoll_fd, server_fd, true);//monitor server socket fd use et mode
while (true) {
int ret = epoll_wait(epoll_fd, events, MAX_EVENT_NUM, -1);
if (ret < 0) {
printf("epoll wait fail\n");
break;
}
lt_mode(events, ret, epoll_fd, server_fd); //LT mode to process client connection
// et_mode(events, ret, epoll_fd, server_fd);//ET mode to process client connection
}
close(server_fd);
return 0;
}
注意在ET模式下,当客户端socket有数据可读的时候,需要使用循环一次性把所有数据读完。

EPOLLONESHOT事件

即使使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的时一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但是,反过来,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其他EPOLLIN事件能被触发。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cerrno>
#include <arpa/inet.h>
#include <thread>

#define BUFFER_SIZE 512
#define MAX_EVENT_NUM 64

typedef struct FDPair {
int epoll_fd;
int sock_fd;
} FDPair;

int setNonBlocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void monitorEventForFd(int epoll_fd, int target_fd, bool enable_one_shot) {
epoll_event event{};
event.data.fd = target_fd;
event.events = EPOLLIN | EPOLLET;
if (enable_one_shot) {
event.events |= EPOLLONESHOT;
}
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, target_fd, &event);
setNonBlocking(target_fd);
}


void resetOneShot(int epoll_fd, int target_fd) {
//重置target fd上的事件,这样操作之后,
//尽管target fd上的EPOLLONESHOT事件被注册,但是操作系统仍然会触发target fd上的EPOLLIN事件,且只触发一次
epoll_event event{};
event.data.fd = target_fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, target_fd, &event);//注意这里的操作是modify
}

//处理客户端socket的工作线程
void* worker(void *arg) {
auto *pair = (FDPair *)arg;
printf("start new thread to receive data on fd: %d\n", pair->sock_fd);
char buf[BUFFER_SIZE];
memset(buf, 0, BUFFER_SIZE);
while (true) {
int ret = recv(pair->sock_fd, buf, BUFFER_SIZE - 1, 0);
if (ret == 0) {
close(pair->sock_fd);
printf("foreigner closed the connection\n");
break;
} else if (ret < 0) {
//errno是线程安全的,不用担心
if (errno == EAGAIN) {
resetOneShot(pair->epoll_fd, pair->sock_fd);
printf("read later\n");
break;
}
} else {
printf("get content: %s\n", buf);
//休眠3s,模拟数据处理过程
sleep(3);
}
}
printf("end thread receiving data on socket fd: %d\n", pair->sock_fd);
return nullptr;
}

int main() {
sockaddr_in server_sock_addr{};
server_sock_addr.sin_family = AF_INET;
inet_pton(AF_INET, "localhost", &server_sock_addr.sin_addr);
server_sock_addr.sin_port = htons(9000);
int server_fd = socket(PF_INET, SOCK_STREAM, 0);
bind(server_fd, (sockaddr *)&server_sock_addr, sizeof(server_sock_addr));
listen(server_fd, 5);
epoll_event events[MAX_EVENT_NUM];
int epoll_fd = epoll_create(5);
//注意,监听server fd是不能注册EPOLLONESHOT事件的,
//否则应用程序只能处理一个客户连接,因为后续的客户连接请求将不再触发server_fd上的EPOLLIN事件
monitorEventForFd(epoll_fd, server_fd, false);

while (true) {
int event_count = epoll_wait(epoll_fd, events, MAX_EVENT_NUM, -1);
if (event_count < 0) {
printf("epoll wait fail\n");
break;
}
for (int i = 0; i < event_count; ++i) {
int sock_fd = events[i].data.fd;
if (sock_fd == server_fd) {
sockaddr_in client_sock_addr{};
socklen_t sock_len;
int client_sock_fd = accept(server_fd, (sockaddr *)&client_sock_addr, &sock_len);
monitorEventForFd(epoll_fd, client_sock_fd, true);
} else if (events[i].events & EPOLLIN){
pthread_t thread_handler;
FDPair pair{};
pair.epoll_fd = epoll_fd;
pair.sock_fd = sock_fd;
pthread_create(&thread_handler, nullptr, worker, &pair);
} else {
printf("other event happen\n");
}
}
}
close(server_fd);
return 0;
}
注意在工作线程中,处理完一次client socket fd上的数据后,需要重置该client socket fd

三个I/O复用函数的比较

  • 每次select和poll调用都返回整个用户注册的事件集合(所有注册的fd,不论是就绪的还是未就绪的),所以应用程序检索就绪文件描述符的时间复杂度为O(n)。而epoll_wait返回的events参数仅用来返回就绪事件,这使得应用程序索引就绪文件描述符的事件复杂度减少到O(1)。
  • poll和epoll_wait分别使用nfds和maxevents参数指定最多监听多少个文件描述符和事件。这两个数值都能到达系统允许打开的最大文件描述符数目。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但这可能导致不可预期的后果。
  • 从实现原理上来说,select和poll采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法时间复杂度为O(n)。而epoll_wait采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将就绪事件队列中的内容拷贝到用户空间。因此epoll_wati的算法时间复杂度为O(1)。但是,当活动连接比较多的时候,epoll_wati的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。所以epoll_wait适用于连接数量多,但是活动连接较少的情况

I/O复用的高级应用一:非阻塞connect

对于非阻塞的socket调用connect,而连接又没有立即建立时,这个时候,connect会设置errno值为EINPROGRESS。这个时候对于非阻塞的socket,我们可以调用select、poll等函数来监听这个连接失败的socket上的可写事件。当select、poll函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。如果错误码为0,表示连接成功建立,否则连接失败。非阻塞connect的应用:可以同时发起多个连接并一起等待。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//
// Created by Bytedance-Wall Flower on 2021/1/15.
//


#include <fcntl.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <cerrno>
#include <arpa/inet.h>

#define BUFFER_SIZE 512

int setNonBlocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

int nonblock_connect(const char *ip, int port, int timeout) {
sockaddr_in server_sock_addr{};
server_sock_addr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &server_sock_addr.sin_addr);
server_sock_addr.sin_port = htons(port);
int sock_fd = socket(PF_INET, SOCK_STREAM, 0);
int old_option = setNonBlocking(sock_fd);
int ret = connect(sock_fd, (sockaddr *)&server_sock_addr, sizeof(server_sock_addr));
if (ret == 0) {
//如果连接成功,则恢复sock fd的属性
printf("connect with server immediately\n");
fcntl(sock_fd, F_SETFL, old_option);
return sock_fd;
} else if (errno == EINPROGRESS) {
//只有当errno是EINPROGRESS时才表示连接还在进行,
fd_set write_fds;
timeval select_timeout{};
FD_ZERO(&write_fds);
FD_SET(sock_fd, &write_fds);
select_timeout.tv_sec = timeout;
ret = select(sock_fd + 1, nullptr, &write_fds, nullptr, &select_timeout);
if (ret < 0) {
printf("select timeout\n");
close(sock_fd);
return -1;
}
if (!FD_ISSET(sock_fd, &write_fds)) {
printf("no write events on sock fd: %d\n", sock_fd);
close(sock_fd);
return -1;
}
int error_in_sock_fd = 0;
socklen_t error_len = sizeof(int);
if (getsockopt(sock_fd, SOL_SOCKET, SO_ERROR, &error_in_sock_fd, &error_len) < 0) {
printf("get socket option failed\n");
close(sock_fd);
return -1;
}
if (error_in_sock_fd != 0) {
//错误不为0,表示连接出错
printf("connection failed after select with the error: %d\n", error_in_sock_fd);
close(sock_fd);
return -1;
} else {
//连接成功
printf("connection ready after select with the socket: %d\n", sock_fd);
fcntl(sock_fd, F_SETFL, old_option);//重置为原来的属性
return sock_fd;
}

} else {
//如果连接没有立即建立,连接又不还在进行,出错返回
printf("non-block connect unsupported\n");
close(sock_fd);
return -1;
}
}

int main() {
int sock_fd = nonblock_connect("localhost", 9000, 10);
if (sock_fd < 0) {
return -1;
}
const char *data = "data from client !!!";
send(sock_fd, data, strlen(data), 0);
close(sock_fd);
return 0;
}
注意,上面的代码有平台移植的问题:
  • 首先,非阻塞的socket可能导致connect始终失败。
  • 其次,select对处于EINPROGRESS状态下的socket可能不起作用
  • 对于出错的socket,getsockopt在有些系统(比如Linux)上返回-1,而在有些系统(比如BSD)上则返回0。

这些问题没有一个统一的解决方法,可能需要针对不同的平台使用宏来分别判断。

I/O复用高级应用二:简单聊天室程序