select()

由 kernel 監看指定的 fd 是否 active(readable、writable 或有 exceptional condition,例如 out-of-band data),有的話 select() 就 return,讓 application process 對 active 的 fd 做相應的處理。用 select() 可避免 application process 自己去 polling 各個 socket 是否 active 而浪費 CPU 資源。如果沒有 fd active、沒設 timeout、也沒有被 signal 打斷,select() 會一直 block。

正常狀況下 select() 的 return 值是三個 fd_set 中總共有多少個 fd active。timeout 時 return 0。被 signal 打斷時 return -1 且 errno 設為 EINTR,此時 select() 不會測試 fd 也不會修改 fd_set,所以不能用 FD_ISSET() 檢查 fd_set 來判斷 fd 是否 active。select() 之所以在被 signal 打斷時不修改 fd_set,是為了避免 select() 跟 signal handler 不斷修改同一個 flag 造成 infinite loop。例如 select() 發現某 flag 是 0 會將 flag 設為 1,而某個 signal handler 遇到 flag 是 1 又把 flag 設為 0,沒完沒了。

pselect() 可透過 sigmask 參數設定在等待期間要擋住(block)哪些 signal,讓這些 signal 不會打斷 pselect()。

Sample Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <cstdlib>
#include <errno.h>
#include <set>
#include <iostream>

// Size in bytes of the buffer run() reads client data into; run() reads at
// most MAXBUF - 1 bytes so that a terminating 0 byte always fits.
#define MAXBUF 1000

using namespace std;

// Runs the select()-based server loop; both arguments are forwarded to
// CreateListenSock() to obtain the listening socket.
void run(int listenPort, int qlen);
// Creates a TCP socket, binds it to listenPort on INADDR_ANY and calls
// listen() with backlog qlen. Returns the socket descriptor; exits the
// process with status 1 if the socket cannot be created or bound.
int CreateListenSock(int listenPort, int qlen);

/**
 * Program entry point: starts the select()-based sample server on TCP
 * port 8899 with a listen backlog of 5.
 */
int main()
{
    const int kListenPort = 8899;
    const int kBacklog = 5;

    run(kListenPort, kBacklog);

    return 0;
}

/**
 * Close a client connection and stop watching it.
 *
 * @param fd     Descriptor of the connection to drop.
 * @param afdset Master set of watched descriptors; fd is cleared from it.
 * @param fds    Ordered collection of watched descriptors; fd is erased.
 */
static void DropConnection(int fd, fd_set &afdset, std::set<int> &fds)
{
    close(fd);
    FD_CLR(fd, &afdset);
    fds.erase(fd);
}

/**
 * Accept one pending connection on listenfd and start watching it.
 *
 * A failed accept() is reported and otherwise ignored. A descriptor that
 * does not fit in an fd_set is closed immediately, because passing a value
 * >= FD_SETSIZE to FD_SET() writes outside the set.
 *
 * @param listenfd Listening socket that select() reported as readable.
 * @param afdset   Master set of watched descriptors; updated on success.
 * @param fds      Ordered collection of watched descriptors; updated on success.
 */
static void AcceptConnection(int listenfd, fd_set &afdset, std::set<int> &fds)
{
    struct sockaddr_in cliaddr;
    socklen_t cliaddrlen = sizeof(cliaddr);

    std::memset(&cliaddr, 0, sizeof(cliaddr));

    int connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddrlen);
    if (connfd < 0)
    {
        std::cerr << "Accept failed: " << std::strerror(errno) << std::endl;
        return;
    }

    if (connfd >= FD_SETSIZE)
    {
        std::cerr << "Connection rejected: descriptor " << connfd
                  << " does not fit in fd_set" << std::endl;
        close(connfd);
        return;
    }

    fds.insert(connfd);
    FD_SET(connfd, &afdset);
}

/**
 * Read once from a readable client connection.
 *
 * @param fd Descriptor that select() reported as readable.
 * @return true if the connection should stay open, false if the peer closed
 *         it or a non-transient read error occurred.
 */
static bool ReadFromClient(int fd)
{
    char readBuf[MAXBUF];
    ssize_t iRead = read(fd, readBuf, MAXBUF - 1);

    if (iRead > 0)
    {
        // Terminate so the data can be treated as a C string.
        readBuf[iRead] = 0;
        // Application-specific processing of readBuf goes here.
        return true;
    }

    if (iRead == 0)
    {
        // Peer closed the connection.
        return false;
    }

    if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Transient condition: keep the connection and retry on the next pass.
        return true;
    }

    std::cerr << "Read failed on fd " << fd << ": " << std::strerror(errno) << std::endl;
    return false;
}

/**
 * Run a single-threaded TCP server multiplexed with select().
 *
 * Creates the listening socket via CreateListenSock(), then loops: waits up
 * to 3 seconds for activity, accepts new connections, reads from readable
 * clients and drops clients that closed or failed. The function only returns
 * if the listening descriptor cannot be used with select() or select() fails
 * with an error other than EINTR; all watched descriptors are closed first.
 *
 * @param listenPort TCP port to listen on (forwarded to CreateListenSock()).
 * @param qlen       Listen backlog (forwarded to CreateListenSock()).
 */
void run(int listenPort, int qlen)
{
    fd_set afdset, rfdset, wfdset, efdset;
    int listenfd = CreateListenSock(listenPort, qlen);
    std::set<int> fds;
    // Placeholder: set to true when there is pending output so that select()
    // also watches for writability.
    bool bNeedWrite = false;

    if (listenfd < 0 || listenfd >= FD_SETSIZE)
    {
        std::cerr << "Listening descriptor " << listenfd
                  << " cannot be used with select()" << std::endl;
        if (listenfd >= 0)
        {
            close(listenfd);
        }
        return;
    }

    FD_ZERO(&afdset);
    FD_SET(listenfd, &afdset);
    fds.insert(listenfd);

    while (true)
    {
        // select() overwrites its fd_set arguments, so start every pass from
        // a fresh copy of the master set.
        rfdset = afdset;
        efdset = afdset;

        if (bNeedWrite)
        {
            wfdset = afdset;
        }
        else
        {
            FD_ZERO(&wfdset);
        }

        // Re-initialised on every pass because select() may modify it.
        struct timeval timeout;
        timeout.tv_sec = 3;
        timeout.tv_usec = 0;

        // fds is ordered and always contains listenfd, so its last element
        // is the highest watched descriptor (and shrinks when clients leave).
        int maxfd = *fds.rbegin();

        int iActive = select(maxfd + 1, &rfdset, &wfdset, &efdset, &timeout);
        if (iActive == -1)
        {
            if (errno == EINTR)
            {
                // Interrupted by a signal: the sets are not meaningful, retry.
                continue;
            }

            // Any other failure would repeat on every pass; stop instead of
            // spinning.
            std::cerr << "select failed: " << std::strerror(errno) << std::endl;
            break;
        }

        int iHandled = 0;
        std::set<int>::iterator fdIter = fds.begin();
        while (fdIter != fds.end() && iHandled < iActive)
        {
            // Advance before handling: DropConnection() erases fd from fds,
            // which invalidates only the iterator to that element.
            int fd = *fdIter++;

            if (FD_ISSET(fd, &rfdset))
            {
                iHandled++;

                if (fd == listenfd)
                {
                    AcceptConnection(listenfd, afdset, fds);
                }
                else if (!ReadFromClient(fd))
                {
                    DropConnection(fd, afdset, fds);
                    // fd is closed; do not look at its write/except state.
                    continue;
                }
            }

            if (FD_ISSET(fd, &wfdset))
            {
                // handle write
                iHandled++;
            }

            if (FD_ISSET(fd, &efdset))
            {
                // handle exceptional condition (e.g. out-of-band data)
                iHandled++;
            }
        }
    }

    // Only reached after a fatal select() error: release every descriptor.
    for (std::set<int>::iterator it = fds.begin(); it != fds.end(); ++it)
    {
        close(*it);
    }
}

/**
 * Create a TCP socket bound to INADDR_ANY:listenPort and start listening.
 *
 * @param listenPort TCP port to bind, in host byte order (0-65535).
 * @param qlen       Backlog passed to listen().
 * @return The listening socket descriptor. On any failure a message is
 *         written to cerr and the process exits with status 1.
 */
int CreateListenSock(int listenPort, int qlen)
{
    // htons() takes a 16-bit value; reject ports it would silently truncate.
    if (listenPort < 0 || listenPort > 65535)
    {
        std::cerr << "Invalid listen port" << std::endl;
        exit(1);
    }

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        std::cerr << "Create socket failed" << std::endl;
        exit(1);
    }

    struct sockaddr_in servAddr;
    std::memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAddr.sin_port = htons(listenPort);

    if (bind(listenfd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
    {
        std::cerr << "Bind socket failed" << std::endl;
        close(listenfd);
        exit(1);
    }

    if (listen(listenfd, qlen) < 0)
    {
        std::cerr << "Listen on socket failed" << std::endl;
        close(listenfd);
        exit(1);
    }

    return listenfd;
}