libuv 源码分析(4) - libuv的工作队列(线程池)

by gngshn [email protected]

libuv通过uv_work_queue来交付任务给工作队列的, 这个api也是libuv实现文件异步操作的基础:

1
2
3
4
UV_EXTERN int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb);

这个工作队列实现方式就是把任务(work_cb)交给线程池来处理, 并且任务完成后, 调用相应的回调函数(after_work_cb).

我们先来看看这个函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
return UV_EINVAL;

uv__req_init(loop, req, UV_WORK);
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
return 0;
}

可以看到函数首先初始化这个请求(req), 然后就调用uv__work_submit完成剩余的工作.

下面我们主要就来分析一下uv__work_submit的操作过程. 看看libuv是如何调用work_cb来完成任务并调用到after_work_cb回调函数的.

1
2
3
4
5
6
7
8
9
10
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);
}

uv_once(&once, init_once);用来初始化libuv的线程池, 只会被调用一次. 线程池中的线程都是执行这样一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;

(void) arg;

for (;;) {
uv_mutex_lock(&mutex);

while (QUEUE_EMPTY(&wq)) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
......
}
}

可以看到每个线程都是等待在uv_cond_wait(&cond, &mutex);.

初始化完成后, 会调用post(&w->wq):

1
2
3
4
5
6
7
static void post(QUEUE* q) {
uv_mutex_lock(&mutex);
QUEUE_INSERT_TAIL(&wq, q);
if (idle_threads > 0)
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}

可以看到post(&w->wq)就是把w挂到全局的wq上面, 然后调用uv_cond_signal, 这就会唤醒一个前面的正在等待的线程来处理这个任务, 一个线程唤醒后, 就会执行work函数的后续部分.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;

(void) arg;

for (;;) {
uv_mutex_lock(&mutex);

while (QUEUE_EMPTY(&wq)) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}

q = QUEUE_HEAD(&wq);

if (q == &exit_message)
uv_cond_signal(&cond);
else {
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
executing. */
}

uv_mutex_unlock(&mutex);

if (q == &exit_message)
break;

w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);

uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
}

线程先取出一个链表的元素q.

exit_message相关的操作是用来退出所有线程的, 在libuv退出时, post(&exit_message);被调用, 这会让libuv的所有线程都退出.

如果没有exit_message, 线程正常往后执行. 先把qwq链表中删除, 然后调用w->work(就是最开始的work_cb, 相当于执行工作), 然后把w放到loop->wq中并调用uv_async_send向event loop发送信号. event loop会注意这个信号并作出相应的处理.

为了弄清楚event loop是如何注意到这个信号的, 我们先来看看uv_async_send都干了什么

1
2
3
4
5
6
7
8
9
10
int uv_async_send(uv_async_t* handle) {
/* Do a cheap read first. */
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;

if (cmpxchgi(&handle->pending, 0, 1) == 0)
uv__async_send(handle->loop);

return 0;
}

这里async事件在还没有被处理时(penging=1)多次发送也只有一次生效

判断handle->pending如果是1, 表示已经发送过并且还没处理, 所以直接返回.

如果是0就表明没有pending事件, 原子的设置pending为1, 并调用uv__async_send, 这个函数会往loop->async_io_watcher.fd(一个eventfd)里面写入’\n’, 从而event_loop会在epoll中发现. 发现后会调用相应的回调函数, 那么回调函数是什么呢?

uv_loop_init中会调用uv_async_init(loop, &loop->wq_async, uv__work_done)来指定回调函数是uv__work_done:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;

err = uv__async_start(loop);
if (err)
return err;

uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;

QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);

return 0;
}

uv__async_start中创建eventfd, 并将其POLLIN事件加入event loop, 事件发生时会调用uv__async_io

1
2
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);

然后把handle初始化并配置async_cb和pending加入到event loop的async_handles中, 并启动handle.

事件发生后调用uv__async_io:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;

assert(w == &loop->async_io_watcher);

for (;;) {
r = read(w->fd, buf, sizeof(buf));

if (r == sizeof(buf))
continue;

if (r != -1)
break;

if (errno == EAGAIN || errno == EWOULDBLOCK)
break;

if (errno == EINTR)
continue;

abort();
}

QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);

QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);

if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;

if (h->async_cb == NULL)
continue;

h->async_cb(h);
}
}

可以看到这个函数先把eventfd的内容读空, 然后一次对async_handles中的元素判断其pending, 如果为1就原子的将其至0, 这也表示handle有待处理的异步通知, 因此就会调用h->async_cb(h)

前面说过(uv_async_init(loop, &loop->wq_async, uv__work_done))对于我们的线程池来说这个回调是uv__work_done:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;

loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);

while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);

w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}

函数把loop->wq的元素挨个删除并调用done回调函数. 这个done回调函数就是我们前面说的after_work_cb回调.
至此, libuv的工作队列的实现就说完了.