by gngshn gngshn@gmail.com
上一篇, 我们讲到了libuv的工作队列, 这一篇我们讲到的文件操作刚好就用到了工作队列. 刚好复习一下. 先来看一段libuv文件操作的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <assert.h> #include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <uv.h> void on_read (uv_fs_t *req) ;uv_fs_t open_req;uv_fs_t read_req;uv_fs_t write_req;static char buffer[1024 ];static uv_buf_t iov;void on_write (uv_fs_t *req) { if (req->result < 0 ) { fprintf (stderr , "Write error: %s\n" , uv_strerror((int )req->result)); } else { uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1 , -1 , on_read); } } void on_read (uv_fs_t *req) { if (req->result < 0 ) { fprintf (stderr , "Read error: %s\n" , uv_strerror(req->result)); } else if (req->result == 0 ) { uv_fs_t close_req; uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL ); } else if (req->result > 0 ) { iov.len = req->result; uv_fs_write(uv_default_loop(), &write_req, 1 , &iov, 1 , -1 , on_write); } } void on_open (uv_fs_t *req) { assert(req == &open_req); if (req->result >= 0 ) { iov = uv_buf_init(buffer, sizeof (buffer)); uv_fs_read(uv_default_loop(), &read_req, req->result, &iov, 1 , -1 , on_read); } else { fprintf (stderr , "error opening file: %s\n" , uv_strerror((int )req->result)); } } int main (int argc, char **argv) { uv_fs_open(uv_default_loop(), &open_req, argv[1 ], O_RDONLY, 0 , on_open); uv_run(uv_default_loop(), UV_RUN_DEFAULT); uv_fs_req_cleanup(&open_req); uv_fs_req_cleanup(&read_req); uv_fs_req_cleanup(&write_req); return 0 ; }
可以看到, 首先调用uv_fs_open
, 然后在open的回调函数on_open
中调用uv_fs_read
读取文件, 之后在read的回调函数on_read
中调用uv_fs_close
(同步)或uv_fs_write
(异步), 在write的回调函数中继续调用uv_fs_read
从事时间将文件全部都出来然后写入到标准输出(1)中.
下面我们就一步一步跟踪libuv的api来看看libuv是如何处理文件操作的.
uv_fs_open uv_fs_open
的定义如下
1 2 3 4 5 6 7 8 9 10 11 12 int uv_fs_open (uv_loop_t * loop, uv_fs_t * req, const char * path, int flags, int mode, uv_fs_cb cb) { INIT(OPEN); PATH; req->flags = flags; req->mode = mode; POST; }
其中INIT
, PATH
, POST
宏是为了减少重复代码, 后面将看见它在多个地方都有用到 先来看看INIT
宏:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #define INIT(subtype) \ do { \ req->type = UV_FS; \ if (cb != NULL) \ uv__req_init(loop, req, UV_FS); \ req->fs_type = UV_FS_ ## subtype; \ req->result = 0; \ req->ptr = NULL; \ req->loop = loop; \ req->path = NULL; \ req->new_path = NULL; \ req->cb = cb; \ } \ while (0)
初始化req的属性, 值得注意的是, 当cb不为NULL(表示异步操作时), 需要调用uv__req_init
把req注册到loop中.PATH
宏主要是处理path
变量是否需要复制一份, 因为如果在异步回调时, path是否在存在都不知道了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #define PATH \ do { \ assert(path != NULL); \ if (cb == NULL) { \ req->path = path; \ } else { \ req->path = uv__strdup(path); \ if (req->path == NULL) { \ uv__req_unregister(loop, req); \ return -ENOMEM; \ } \ } \ } \ while (0)
POST宏也是根据同步和异步操作来进行不同的处理, 如果是同步操作, 直接调用uv__fs_work
并返回结果. 如果是异步操作, 那么调用uv__work_submit
将任务uv__fs_work
交给工作队列(线程池)来做. 根据上一篇所讲, 工作队列完成工作后, 最终会调用uv__fs_done
回调.
1 2 3 4 5 6 7 8 9 10 11 12 #define POST \ do { \ if (cb != NULL) { \ uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \ return 0; \ } \ else { \ uv__fs_work(&req->work_req); \ return req->result; \ } \ } \ while (0)
uv_fs_open
就是上面的这些操作. 我们再来看看uv_fs_read
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 int uv_fs_read (uv_loop_t * loop, uv_fs_t * req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t off, uv_fs_cb cb) { if (bufs == NULL || nbufs == 0 ) return -EINVAL; INIT(READ); req->file = file; req->nbufs = nbufs; req->bufs = req->bufsml; if (nbufs > ARRAY_SIZE(req->bufsml)) req->bufs = uv__malloc(nbufs * sizeof (*bufs)); if (req->bufs == NULL ) { if (cb != NULL ) uv__req_unregister(loop, req); return -ENOMEM; } memcpy (req->bufs, bufs, nbufs * sizeof (*bufs)); req->off = off; POST; }
可以看到, 又一次用到了INIT
和POST
宏, 我们关注一下中间的部分. 这段代码就是调整一下bufs的大小并将bufs描述信息复制到req中. 如果调整失败就取消这个read请求. 后面的操作就跟open类似了, 根据是否有回调来决定用线程或同步的调用read相关的系统调用来处理以此read请求. 对于write跟read类似, 我们就不说了 除了用直接的request来处理文件的操作, 我们还可以用stream来处理, 接下来, 我们就来讲讲这部份.
先来直接看一段使用stream的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <uv.h> typedef struct { uv_write_t req; uv_buf_t buf; } write_req_t ; uv_loop_t *loop;uv_pipe_t stdin_pipe;uv_pipe_t stdout_pipe;uv_pipe_t file_pipe;void alloc_buffer (uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { *buf = uv_buf_init((char *) malloc (suggested_size), suggested_size); } void free_write_req (uv_write_t *req) { write_req_t *wr = (write_req_t *) req; free (wr->buf.base); free (wr); } void on_stdout_write (uv_write_t *req, int status) { free_write_req(req); } void on_file_write (uv_write_t *req, int status) { free_write_req(req); } void write_data (uv_stream_t *dest, size_t size, uv_buf_t buf, uv_write_cb cb) { write_req_t *req = (write_req_t *) malloc (sizeof (write_req_t )); req->buf = uv_buf_init((char *) malloc (size), size); memcpy (req->buf.base, buf.base, size); uv_write((uv_write_t *) req, (uv_stream_t *)dest, &req->buf, 1 , cb); } void read_stdin (uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { if (nread < 0 ){ if (nread == UV_EOF){ uv_close((uv_handle_t *)&stdin_pipe, NULL ); uv_close((uv_handle_t *)&stdout_pipe, NULL ); uv_close((uv_handle_t *)&file_pipe, NULL ); } } else if (nread > 0 ) { write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write); write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write); } if (buf->base) free (buf->base); } int main (int argc, char **argv) { loop = uv_default_loop(); uv_pipe_init(loop, &stdin_pipe, 0 ); uv_pipe_open(&stdin_pipe, 0 ); uv_pipe_init(loop, &stdout_pipe, 0 ); uv_pipe_open(&stdout_pipe, 1 ); uv_fs_t file_req; int fd = uv_fs_open(loop, &file_req, argv[1 ], O_CREAT | O_RDWR, 0644 , NULL ); uv_pipe_init(loop, &file_pipe, 0 ); uv_pipe_open(&file_pipe, fd); uv_read_start((uv_stream_t *)&stdin_pipe, alloc_buffer, read_stdin); uv_run(loop, UV_RUN_DEFAULT); return 0 ; }
可以看到程序首先初始化三个uv pipe, 分别打开标准输入, 标准输出和参数表中的文件. 然后调用uv_read_start
注册读事件, 会有两个回调函数: alloc_buffer
(从而允许用户自己进行内存管理)和read_stdin
(读完成后的回调). 然后调用uv_run
启动event loop, 后面会在read_stdin
中注册其他事件来完成输入的显示和保存. 我们先来看看uv_pipe_init
:
1 2 3 4 5 6 7 8 int uv_pipe_init (uv_loop_t * loop, uv_pipe_t * handle, int ipc) { uv__stream_init(loop, (uv_stream_t *)handle, UV_NAMED_PIPE); handle->shutdown_req = NULL ; handle->connect_req = NULL ; handle->pipe_fname = NULL ; handle->ipc = ipc; return 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void uv__stream_init (uv_loop_t * loop, uv_stream_t * stream, uv_handle_type type) { int err; uv__handle_init(loop, (uv_handle_t *)stream, type); stream->read_cb = NULL ; stream->alloc_cb = NULL ; stream->close_cb = NULL ; stream->connection_cb = NULL ; stream->connect_req = NULL ; stream->shutdown_req = NULL ; stream->accepted_fd = -1 ; stream->queued_fds = NULL ; stream->delayed_error = 0 ; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); stream->write_queue_size = 0 ; if (loop->emfile_fd == -1 ) { err = uv__open_cloexec("/dev/null" , O_RDONLY); if (err < 0 ) err = uv__open_cloexec("/" , O_RDONLY); if (err >= 0 ) loop->emfile_fd = err; } #if defined(__APPLE__) stream->select = NULL ; #endif uv__io_init(&stream->io_watcher, uv__stream_io, -1 ); }
init部分还是比较简单的, 主要有uv__handle_init
和uv__io_init
. 前面说过了 然后uv_pipe_open
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int uv_pipe_open (uv_pipe_t * handle, uv_file fd) { int err; err = uv__nonblock(fd, 1 ); if (err) return err; #if defined(__APPLE__) err = uv__stream_try_select((uv_stream_t *) handle, &fd); if (err) return err; #endif return uv__stream_open((uv_stream_t *)handle, fd, UV_STREAM_READABLE | UV_STREAM_WRITABLE); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int uv__stream_open (uv_stream_t * stream, int fd, int flags) {#if defined(__APPLE__) int enable; #endif if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) return -EBUSY; assert(fd >= 0 ); stream->flags |= flags; if (stream->type == UV_TCP) { if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1 )) return -errno; if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1 , 60 )) return -errno; } #if defined(__APPLE__) enable = 1 ; if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof (enable)) && errno != ENOTSOCK && errno != EINVAL) { return -errno; } #endif stream->io_watcher.fd = fd; return 0 ; }
非常简单, 除了对TCP进行特殊处理就是把fd保存到io_watcher中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int uv_read_start (uv_stream_t * stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); if (stream->flags & UV_CLOSING) return -EINVAL; stream->flags |= UV_STREAM_READING; assert(uv__stream_fd(stream) >= 0 ); assert(alloc_cb); stream->read_cb = read_cb; stream->alloc_cb = alloc_cb; uv__io_start(stream->loop, &stream->io_watcher, POLLIN); uv__handle_start(stream); uv__stream_osx_interrupt_select(stream); return 0 ; }
主要是uv__io_start
和uv__handle_start
, 前者把事件放到watcher_queue, 这里要注意event是先放在pevents中的, 后面poll的时候才放到events中, 后者启动handle(stream)
那么alloc_cb
和read_cb
是如何调用到的呢? 在uv__pipe_init
被调用时, 调用了uv__stream_init
, 进而调用uv__io_init(&stream->io_watcher, uv__stream_io, -1)
, 这里设定了poll事件的回调为io__stream_io
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 static void uv__stream_io (uv_loop_t * loop, uv__io_t * w, unsigned int events) { uv_stream_t * stream; stream = container_of(w, uv_stream_t , io_watcher); assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); assert(!(stream->flags & UV_CLOSING)); if (stream->connect_req) { uv__stream_connect(stream); return ; } assert(uv__stream_fd(stream) >= 0 ); if (events & (POLLIN | POLLERR | POLLHUP)) uv__read(stream); if (uv__stream_fd(stream) == -1 ) return ; if ((events & POLLHUP) && (stream->flags & UV_STREAM_READING) && (stream->flags & UV_STREAM_READ_PARTIAL) && !(stream->flags & UV_STREAM_READ_EOF)) { uv_buf_t buf = { NULL , 0 }; uv__stream_eof(stream, &buf); } if (uv__stream_fd(stream) == -1 ) return ; if (events & (POLLOUT | POLLERR | POLLHUP)) { uv__write(stream); uv__write_callbacks(stream); if (QUEUE_EMPTY(&stream->write_queue)) uv__drain(stream); } }
connect_req等到tcp再说. 这里会调用uv__read
, 在这里面会调用alloc_cb
分配内存, 对于ipc用uv__recvmsg
读数据, 否则用read
来读数据.可以看到stream根据不同的结果(如需要重读, 读出错, 读完成, 正常读)来用不同的方式处理并调用read_cb
, 对于数据没有读到要求那么多是会置起UV_STREAM_READ_PARTIAL
.
write部分我就不分析了, 其实也比较类似.
后面就会进入libuv网络部分的分析了.