libuv Source Code Reading (3) - The Poll Process

by gngshn gngshn@gmail.com

In the previous post we walked through libuv's event loop and left one question open. Let's resolve it now by going straight to the function. It's a particularly long one, so as before the commentary is written as comments inside the function itself~

void uv__io_poll(uv_loop_t* loop, int timeout) {
  /* A bug in kernels < 2.6.37 makes timeouts larger than ~30 minutes
   * effectively infinite on 32 bits architectures. To avoid blocking
   * indefinitely, we cap the timeout and poll again if necessary.
   *
   * Note that "30 minutes" is a simplification because it depends on
   * the value of CONFIG_HZ. The magic constant assumes CONFIG_HZ=1200,
   * that being the largest value I have seen in the wild (and only once.)
   */
  static const int max_safe_timeout = 1789569;
  static int no_epoll_pwait;
  static int no_epoll_wait;
  struct uv__epoll_event events[1024];
  struct uv__epoll_event* pe;
  struct uv__epoll_event e;
  int real_timeout;
  QUEUE* q;
  uv__io_t* w;
  sigset_t sigset;
  uint64_t sigmask;
  uint64_t base;
  int have_signals;
  int nevents;
  int count;
  int nfds;
  int fd;
  int op;
  int i;

  /* If no fds are being watched there are no events to wait for, so we can
   * return right away (the watcher_queue must be empty too). */
  if (loop->nfds == 0) {
    assert(QUEUE_EMPTY(&loop->watcher_queue));
    return;
  }

  /* Take the watchers out of `watcher_queue` one by one. */
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    q = QUEUE_HEAD(&loop->watcher_queue);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);

    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
    assert(w->pevents != 0);
    assert(w->fd >= 0);
    assert(w->fd < (int) loop->nwatchers);

    /* Add/modify the events we want to watch on this fd in epoll. */
    e.events = w->pevents;
    e.data = w->fd;

    if (w->events == 0)
      op = UV__EPOLL_CTL_ADD;
    else
      op = UV__EPOLL_CTL_MOD;

    /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
     * events, skip the syscall and squelch the events after epoll_wait().
     */
    if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
      if (errno != EEXIST)
        abort();

      assert(op == UV__EPOLL_CTL_ADD);

      /* We've reactivated a file descriptor that's been watched before. */
      if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e))
        abort();
    }

    w->events = w->pevents;
  }

  /* Blocking of SIGPROF. Note that two mechanisms are used here, epoll_pwait
   * or pthread_sigmask, depending on what the system supports. */
  sigmask = 0;
  if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGPROF);
    sigmask |= 1 << (SIGPROF - 1);
  }

  assert(timeout >= -1);
  base = loop->time;
  count = 48; /* Benchmarks suggest this gives the best throughput. */
  real_timeout = timeout;

  for (;;) {
    /* See the comment for max_safe_timeout for an explanation of why
     * this is necessary. Executive summary: kernel bug workaround.
     */
    if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
      timeout = max_safe_timeout;

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        abort();

    /* Make the epoll call and wait for events. */
    if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
      nfds = uv__epoll_pwait(loop->backend_fd,
                             events,
                             ARRAY_SIZE(events),
                             timeout,
                             sigmask);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_pwait = 1;
    } else {
      nfds = uv__epoll_wait(loop->backend_fd,
                            events,
                            ARRAY_SIZE(events),
                            timeout);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_wait = 1;
    }

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_UNBLOCK, &sigset, NULL))
        abort();

    /* Update loop->time unconditionally. It's tempting to skip the update when
     * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
     * operating system didn't reschedule our process while in the syscall.
     */
    SAVE_ERRNO(uv__update_time(loop));

    if (nfds == 0) {
      assert(timeout != -1);

      if (timeout == 0)
        return;

      /* We may have been inside the system call for longer than |timeout|
       * milliseconds so we need to update the timestamp to avoid drift.
       */
      goto update_timeout;
    }

    if (nfds == -1) {
      if (errno == ENOSYS) {
        /* epoll_wait() or epoll_pwait() failed, try the other system call. */
        assert(no_epoll_wait == 0 || no_epoll_pwait == 0);
        continue;
      }

      if (errno != EINTR)
        abort();

      if (timeout == -1)
        continue;

      if (timeout == 0)
        return;

      /* Interrupted by a signal. Update timeout and poll again. */
      goto update_timeout;
    }

    have_signals = 0;
    nevents = 0;

    /* Stash the polled events array and its length in the last two slots of
     * the watchers array (see uv__platform_invalidate_fd). */
    assert(loop->watchers != NULL);
    loop->watchers[loop->nwatchers] = (void*) events;
    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      fd = pe->data;

      /* Skip invalidated events, see uv__platform_invalidate_fd */
      if (fd == -1)
        continue;

      assert(fd >= 0);
      assert((unsigned) fd < loop->nwatchers);

      w = loop->watchers[fd];

      if (w == NULL) {
        /* File descriptor that we've stopped watching, disarm it.
         *
         * Ignore all errors because we may be racing with another thread
         * when the file descriptor is closed.
         */
        uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe);
        continue;
      }

      /* Give users only events they're interested in. Prevents spurious
       * callbacks when previous callback invocation in this loop has stopped
       * the current watcher. Also, filters out events that the user has not
       * requested us to watch.
       */
      pe->events &= w->pevents | POLLERR | POLLHUP;

      /* Work around an epoll quirk where it sometimes reports just the
       * EPOLLERR or EPOLLHUP event. In order to force the event loop to
       * move forward, we merge in the read/write events that the watcher
       * is interested in; uv__read() and uv__write() will then deal with
       * the error or hangup in the usual fashion.
       *
       * Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user
       * reads the available data, calls uv_read_stop(), then sometime later
       * calls uv_read_start() again. By then, libuv has forgotten about the
       * hangup and the kernel won't report EPOLLIN again because there's
       * nothing left to read. If anything, libuv is to blame here. The
       * current hack is just a quick bandaid; to properly fix it, libuv
       * needs to remember the error/hangup event. We should get that for
       * free when we switch over to edge-triggered I/O.
       */
      if (pe->events == POLLERR || pe->events == POLLHUP)
        pe->events |= w->pevents & (POLLIN | POLLOUT | UV__POLLPRI);

      if (pe->events != 0) {
        /* Run signal watchers last. This also affects child process watchers
         * because those are implemented in terms of signal watchers.
         */
        /* Invoke the callback; signal watchers are deferred and run together
         * below. */
        if (w == &loop->signal_io_watcher)
          have_signals = 1;
        else
          w->cb(loop, w, pe->events);

        nevents++;
      }
    }

    /* Run the signal watcher's callback. From uv_loop_init we can see this
     * callback is uv__signal_event, registered by
     * uv__io_init(&loop->signal_io_watcher, uv__signal_event,
     * loop->signal_pipefd[0]); we'll look at that function below. */
    if (have_signals != 0)
      loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);

    /* Clear the two stashed slots again. */
    loop->watchers[loop->nwatchers] = NULL;
    loop->watchers[loop->nwatchers + 1] = NULL;

    if (have_signals != 0)
      return; /* Event loop should cycle now so don't poll again. */

    if (nevents != 0) {
      if (nfds == ARRAY_SIZE(events) && --count != 0) {
        /* Poll for more events but don't block this time. */
        timeout = 0;
        continue;
      }
      return;
    }

    if (timeout == 0)
      return;

    if (timeout == -1)
      continue;

update_timeout:
    assert(timeout > 0);

    real_timeout -= (loop->time - base);
    if (real_timeout <= 0)
      return;

    timeout = real_timeout;
  }
}

The comments in the libuv source code make it easy to see what this code actually does: it registers the pending watchers with epoll, polls, and invokes the corresponding callbacks when events fire. In addition, max_safe_timeout is introduced to work around a kernel bug, and the poll count (48) is introduced for throughput.
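To make the shape of this loop easier to see outside libuv's internals, here is a minimal sketch of the same add-then-poll-then-dispatch pattern written against the raw epoll API (epoll_create1/epoll_ctl/epoll_wait) instead of libuv's uv__epoll_* wrappers. The fd being watched, the array size and the 1000 ms timeout are arbitrary choices for illustration:

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>

int main(void) {
  struct epoll_event e;
  struct epoll_event events[64];
  char buf[256];
  ssize_t n;
  int epfd;
  int nfds;
  int i;

  epfd = epoll_create1(0);
  if (epfd == -1)
    abort();

  /* Register stdin for readability; the EPOLL_CTL_ADD branch above. */
  e.events = EPOLLIN;
  e.data.fd = STDIN_FILENO;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &e))
    abort();

  for (;;) {
    /* Block for at most 1000 ms, like the timeout argument of uv__io_poll. */
    nfds = epoll_wait(epfd, events, 64, 1000);
    if (nfds == -1) {
      if (errno == EINTR)
        continue;  /* interrupted by a signal, poll again */
      abort();
    }

    if (nfds == 0) {
      printf("timed out, poll again\n");
      continue;
    }

    /* Dispatch: this is where libuv looks up the watcher and calls w->cb. */
    for (i = 0; i < nfds; i++) {
      if (events[i].data.fd == STDIN_FILENO) {
        n = read(STDIN_FILENO, buf, sizeof(buf));
        if (n <= 0)
          return 0;
        printf("read %zd bytes\n", n);
      }
    }
  }
}

The real function differs from this sketch mainly in the bookkeeping discussed above: the watcher queue, the sigmask handling, and the count/timeout logic.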

Finally, let's see what the signal handling function does:

static void uv__signal_event(uv_loop_t* loop,
                             uv__io_t* w,
                             unsigned int events) {
  uv__signal_msg_t* msg;
  uv_signal_t* handle;
  char buf[sizeof(uv__signal_msg_t) * 32];
  size_t bytes, end, i;
  int r;

  bytes = 0;
  end = 0;

  do {
    r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);

    if (r == -1 && errno == EINTR)
      continue;

    if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      /* If there are bytes in the buffer already (which really is extremely
       * unlikely if possible at all) we can't exit the function here. We'll
       * spin until more bytes are read instead.
       */
      if (bytes > 0)
        continue;

      /* Otherwise, there was nothing there. */
      return;
    }

    /* Other errors really should never happen. */
    if (r == -1)
      abort();

    bytes += r;

    /* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
    end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);

    for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
      msg = (uv__signal_msg_t*) (buf + i);
      handle = msg->handle;

      if (msg->signum == handle->signum) {
        assert(!(handle->flags & UV_CLOSING));
        handle->signal_cb(handle, handle->signum);
      }

      handle->dispatched_signals++;

      if (handle->flags & UV__SIGNAL_ONE_SHOT)
        uv__signal_stop(handle);

      /* If uv_close was called while there were caught signals that were not
       * yet dispatched, the uv__finish_close was deferred. Make close pending
       * now if this has happened.
       */
      if ((handle->flags & UV_CLOSING) &&
          (handle->caught_signals == handle->dispatched_signals)) {
        uv__make_close_pending((uv_handle_t*) handle);
      }
    }

    bytes -= end;

    /* If there are any "partial" messages left, move them to the start of
     * the buffer, and spin. This should not happen.
     */
    if (bytes) {
      memmove(buf, buf + end, bytes);
      continue;
    }
  } while (end == sizeof buf);
}

This function reads the messages (uv__signal_msg_t) sent by the signal handler (uv__signal_handler) from the signal_pipefd[0] we discussed earlier. Each message carries the corresponding handle and its signum; the function invokes the handle's callback and performs some cleanup. You can trace through the code for the details, which we won't go into here.
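The mechanism behind this is the classic self-pipe trick: the async-signal-safe handler only write()s a small message into a pipe, and the loop thread read()s it back and dispatches on its own time. Below is a minimal sketch of that pattern under simplified assumptions; msg_t and on_signal here are stand-ins for illustration, not libuv's actual uv__signal_msg_t or uv__signal_handler:

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct {
  int signum;  /* libuv's uv__signal_msg_t also carries a handle pointer */
} msg_t;

static int pipefd[2];

static void on_signal(int signum) {
  msg_t msg;

  msg.signum = signum;
  /* write() is async-signal-safe; this is essentially all the handler does.
   * A production handler would also save and restore errno, as libuv does. */
  while (write(pipefd[1], &msg, sizeof(msg)) == -1 && errno == EINTR)
    ;
}

int main(void) {
  msg_t msg;
  ssize_t r;

  if (pipe(pipefd))
    abort();

  signal(SIGINT, on_signal);
  printf("press Ctrl-C a few times, Ctrl-\\ to quit...\n");

  /* In libuv the read end sits in the epoll set as signal_io_watcher;
   * here we simply block on it directly. */
  for (;;) {
    r = read(pipefd[0], &msg, sizeof(msg));
    if (r == -1 && errno == EINTR)
      continue;  /* interrupted by the signal itself, retry */
    if (r != (ssize_t) sizeof(msg))
      abort();
    printf("dispatching signal %d on the loop thread\n", msg.signum);
  }
}

Note that libuv creates signal_pipefd non-blocking, which is why uv__signal_event above has to deal with EAGAIN/EWOULDBLOCK and with partially-read messages, whereas this sketch can simply block.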

With that, we've finished reading through libuv's event loop.