by gngshn [email protected]
libuv通过uv_work_queue
来交付任务给工作队列的, 这个api也是libuv实现文件异步操作的基础:
1 2 3 4 UV_EXTERN int uv_queue_work (uv_loop_t * loop, uv_work_t * req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) ;
这个工作队列实现方式就是把任务(work_cb)交给线程池来处理, 并且任务完成后, 调用相应的回调函数(after_work_cb). 我们先来看看这个函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int uv_queue_work (uv_loop_t * loop, uv_work_t * req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) { if (work_cb == NULL ) return UV_EINVAL; uv__req_init(loop, req, UV_WORK); req->loop = loop; req->work_cb = work_cb; req->after_work_cb = after_work_cb; uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done); return 0 ; }
可以看到函数首先初始化这个请求(req), 然后就调用uv__work_submit
完成剩余的工作. 下面我们主要就来分析一下uv__work_submit
的操作过程. 看看libuv是如何调用work_cb
来完成任务并调用到after_work_cb
回调函数的.
1 2 3 4 5 6 7 8 9 10 void uv__work_submit (uv_loop_t * loop, struct uv__work* w, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq); }
uv_once(&once, init_once);
用来初始化libuv的线程池, 只会被调用一次. 线程池中的线程都是执行这样一个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void worker (void * arg) { struct uv__work * w ; QUEUE* q; (void ) arg; for (;;) { uv_mutex_lock(&mutex); while (QUEUE_EMPTY(&wq)) { idle_threads += 1 ; uv_cond_wait(&cond, &mutex); idle_threads -= 1 ; } ...... } }
可以看到每个线程都是等待在uv_cond_wait(&cond, &mutex);
. 初始化完成后, 会调用post(&w->wq)
:
1 2 3 4 5 6 7 static void post (QUEUE* q) { uv_mutex_lock(&mutex); QUEUE_INSERT_TAIL(&wq, q); if (idle_threads > 0 ) uv_cond_signal(&cond); uv_mutex_unlock(&mutex); }
可以看到post(&w->wq)
就是把w
挂到全局的wq
上面, 然后调用uv_cond_signal
, 这就会唤醒一个前面的正在等待的线程来处理这个任务, 一个线程唤醒后, 就会执行work函数的后续部分.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static void worker (void * arg) { struct uv__work * w ; QUEUE* q; (void ) arg; for (;;) { uv_mutex_lock(&mutex); while (QUEUE_EMPTY(&wq)) { idle_threads += 1 ; uv_cond_wait(&cond, &mutex); idle_threads -= 1 ; } q = QUEUE_HEAD(&wq); if (q == &exit_message) uv_cond_signal(&cond); else { QUEUE_REMOVE(q); QUEUE_INIT(q); } uv_mutex_unlock(&mutex); if (q == &exit_message) break ; w = QUEUE_DATA(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL ; QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); } }
线程先取出一个链表的元素q.exit_message
相关的操作是用来退出所有线程的, 在libuv退出时, post(&exit_message);
被调用, 这会让libuv的所有线程都退出. 如果没有exit_message
, 线程正常往后执行. 先把q
从wq
链表中删除, 然后调用w->work
(就是最开始的work_cb
, 相当于执行工作), 然后把w放到loop->wq
中并调用uv_async_send
向event loop发送信号. event loop会注意这个信号并作出相应的处理. 为了弄清楚event loop是如何注意到这个信号的, 我们先来看看uv_async_send
都干了什么
1 2 3 4 5 6 7 8 9 10 int uv_async_send (uv_async_t * handle) { if (ACCESS_ONCE(int , handle->pending) != 0 ) return 0 ; if (cmpxchgi(&handle->pending, 0 , 1 ) == 0 ) uv__async_send(handle->loop); return 0 ; }
这里async事件在还没有被处理时(penging=1)多次发送也只有一次生效 判断handle->pending
如果是1, 表示已经发送过并且还没处理, 所以直接返回. 如果是0就表明没有pending事件, 原子的设置pending
为1, 并调用uv__async_send
, 这个函数会往loop->async_io_watcher.fd
(一个eventfd)里面写入’\n’, 从而event_loop会在epoll中发现. 发现后会调用相应的回调函数, 那么回调函数是什么呢? 在uv_loop_init
中会调用uv_async_init(loop, &loop->wq_async, uv__work_done)
来指定回调函数是uv__work_done
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 int uv_async_init (uv_loop_t * loop, uv_async_t * handle, uv_async_cb async_cb) { int err; err = uv__async_start(loop); if (err) return err; uv__handle_init(loop, (uv_handle_t *)handle, UV_ASYNC); handle->async_cb = async_cb; handle->pending = 0 ; QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue ); uv__handle_start(handle); return 0 ; }
uv__async_start
中创建eventfd, 并将其POLLIN事件加入event loop, 事件发生时会调用uv__async_io
1 2 uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0 ]); uv__io_start(loop, &loop->async_io_watcher, POLLIN);
然后把handle初始化并配置async_cb和pending加入到event loop的async_handles中, 并启动handle. 事件发生后调用uv__async_io
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 static void uv__async_io (uv_loop_t * loop, uv__io_t * w, unsigned int events) { char buf[1024 ]; ssize_t r; QUEUE queue ; QUEUE* q; uv_async_t * h; assert(w == &loop->async_io_watcher); for (;;) { r = read(w->fd, buf, sizeof (buf)); if (r == sizeof (buf)) continue ; if (r != -1 ) break ; if (errno == EAGAIN || errno == EWOULDBLOCK) break ; if (errno == EINTR) continue ; abort (); } QUEUE_MOVE(&loop->async_handles, &queue ); while (!QUEUE_EMPTY(&queue )) { q = QUEUE_HEAD(&queue ); h = QUEUE_DATA(q, uv_async_t , queue ); QUEUE_REMOVE(q); QUEUE_INSERT_TAIL(&loop->async_handles, q); if (cmpxchgi(&h->pending, 1 , 0 ) == 0 ) continue ; if (h->async_cb == NULL ) continue ; h->async_cb(h); } }
可以看到这个函数先把eventfd的内容读空, 然后一次对async_handles中的元素判断其pending, 如果为1就原子的将其至0, 这也表示handle有待处理的异步通知, 因此就会调用h->async_cb(h)
前面说过(uv_async_init(loop, &loop->wq_async, uv__work_done)
)对于我们的线程池来说这个回调是uv__work_done
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void uv__work_done (uv_async_t * handle) { struct uv__work * w ; uv_loop_t * loop; QUEUE* q; QUEUE wq; int err; loop = container_of(handle, uv_loop_t , wq_async); uv_mutex_lock(&loop->wq_mutex); QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0 ; w->done(w, err); } }
函数把loop->wq
的元素挨个删除并调用done
回调函数. 这个done
回调函数就是我们前面说的after_work_cb
回调. 至此, libuv的工作队列的实现就说完了.