libuv Source Code Analysis (5) - The File Operation Flow

by gngshn gngshn@gmail.com

In the previous article we covered libuv's work queue. The file operations discussed here happen to use that work queue, so this is a good chance to review it.
Let's start with a piece of libuv file-operation code:

#include <assert.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <uv.h>

void on_read(uv_fs_t *req);

uv_fs_t open_req;
uv_fs_t read_req;
uv_fs_t write_req;

static char buffer[1024];

static uv_buf_t iov;

void on_write(uv_fs_t *req) {
    if (req->result < 0) {
        fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
    }
    else {
        uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
    }
}

void on_read(uv_fs_t *req) {
    if (req->result < 0) {
        fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
    }
    else if (req->result == 0) {
        uv_fs_t close_req;
        // synchronous
        uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
    }
    else if (req->result > 0) {
        iov.len = req->result;
        uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
    }
}

void on_open(uv_fs_t *req) {
    // The request passed to the callback is the same as the one the call setup
    // function was passed.
    assert(req == &open_req);
    if (req->result >= 0) {
        iov = uv_buf_init(buffer, sizeof(buffer));
        uv_fs_read(uv_default_loop(), &read_req, req->result,
                   &iov, 1, -1, on_read);
    }
    else {
        fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
    }
}

int main(int argc, char **argv) {
    uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_fs_req_cleanup(&open_req);
    uv_fs_req_cleanup(&read_req);
    uv_fs_req_cleanup(&write_req);
    return 0;
}

As you can see, the program first calls uv_fs_open. In the open callback on_open it calls uv_fs_read to read from the file; then in the read callback on_read it calls either uv_fs_close (synchronously, on EOF) or uv_fs_write (asynchronously); and the write callback calls uv_fs_read again, so the whole file is eventually read out and written to standard output (fd 1).

Let's now trace the libuv APIs step by step to see how libuv handles file operations.

uv_fs_open

uv_fs_open is defined as follows:

int uv_fs_open(uv_loop_t* loop,
               uv_fs_t* req,
               const char* path,
               int flags,
               int mode,
               uv_fs_cb cb) {
  INIT(OPEN);
  PATH;
  req->flags = flags;
  req->mode = mode;
  POST;
}

The INIT, PATH, and POST macros exist to cut down on repeated code; we will see them in several places below.
Let's look at the INIT macro first:

#define INIT(subtype)                                                         \
  do {                                                                        \
    req->type = UV_FS;                                                        \
    if (cb != NULL)                                                           \
      uv__req_init(loop, req, UV_FS);                                         \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)

It initializes the fields of req. Note that when cb is not NULL (i.e. the operation is asynchronous), uv__req_init is called to register the req with the loop.
The PATH macro decides whether the path string must be copied: in the asynchronous case there is no guarantee the caller's path buffer still exists by the time the callback runs, so libuv duplicates it (the sketch after the macro listing shows the failure mode this prevents).

#define PATH                                                                  \
  do {                                                                        \
    assert(path != NULL);                                                     \
    if (cb == NULL) {                                                         \
      req->path = path;                                                       \
    } else {                                                                  \
      req->path = uv__strdup(path);                                           \
      if (req->path == NULL) {                                                \
        uv__req_unregister(loop, req);                                        \
        return -ENOMEM;                                                       \
      }                                                                       \
    }                                                                         \
  }                                                                           \
  while (0)
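
The copy in the asynchronous branch matters because the caller's path buffer may be gone by the time the worker thread runs. A minimal sketch of the bug this prevents (hypothetical caller code, not from libuv):

/* Hypothetical caller: without the uv__strdup inside PATH, this would be a
 * use-after-scope bug, because `name` lives on the stack and is dead long
 * before the worker thread opens the file. */
void open_config(uv_loop_t *loop, uv_fs_t *req, uv_fs_cb cb) {
    char name[64];
    snprintf(name, sizeof(name), "/etc/%s.conf", "myapp");
    uv_fs_open(loop, req, name, O_RDONLY, 0, cb);  /* async: PATH copies name */
}   /* `name` goes out of scope here; req->path stays valid only because
     * PATH duplicated it with uv__strdup. */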

The POST macro likewise branches on synchronous versus asynchronous. For a synchronous call it invokes uv__fs_work directly and returns the result; for an asynchronous call it submits the uv__fs_work task to the work queue (thread pool) via uv__work_submit. As the previous article explained, the work queue invokes the uv__fs_done callback once the job finishes.

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

That is all uv_fs_open does.
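
Because POST branches on cb, every uv_fs_* call can be used both ways. A hedged usage sketch (error handling trimmed):

/* Asynchronous: cb != NULL, so POST submits the work to the thread pool and
 * uv_fs_open returns 0 immediately; the result arrives later in on_open. */
uv_fs_open(loop, &open_req, "a.txt", O_RDONLY, 0, on_open);

/* Synchronous: cb == NULL, so POST runs uv__fs_work on the calling thread
 * and the return value is the fd (or a negative error code). */
int fd = uv_fs_open(loop, &open_req, "a.txt", O_RDONLY, 0, NULL);
uv_fs_req_cleanup(&open_req);
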
Next, let's look at uv_fs_read:

int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               const uv_buf_t bufs[],
               unsigned int nbufs,
               int64_t off,
               uv_fs_cb cb) {
  if (bufs == NULL || nbufs == 0)
    return -EINVAL;

  INIT(READ);
  req->file = file;

  req->nbufs = nbufs;
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(*bufs));

  if (req->bufs == NULL) {
    if (cb != NULL)
      uv__req_unregister(loop, req);
    return -ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

  req->off = off;
  POST;
}

The INIT and POST macros appear again, so let's focus on the middle part. It sizes the bufs array, using the small inline array req->bufsml when possible and falling back to a heap allocation when nbufs exceeds it, then copies the buffer descriptors into the req; if the allocation fails, the read request is unregistered. The rest works just like open: depending on whether a callback was supplied, the read-related system call runs on a worker thread or synchronously.
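
The bufsml trick is a common small-object optimization: a small fixed array embedded in the struct avoids a heap allocation in the common case. A generic sketch of the pattern under hypothetical names (SMALL_MAX and req_bufs_t are not libuv's):

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

#define SMALL_MAX 4  /* hypothetical size; libuv's bufsml is likewise a small fixed array */

typedef struct {
    uv_buf_t *bufs;             /* points at small[] or at heap memory */
    uv_buf_t small[SMALL_MAX];  /* inline storage for the common case */
} req_bufs_t;

/* Copy the caller's buffer descriptors, avoiding malloc when they fit in
 * the inline array -- the same trick uv_fs_read plays with req->bufsml. */
int req_bufs_set(req_bufs_t *r, const uv_buf_t *bufs, unsigned int nbufs) {
    r->bufs = r->small;
    if (nbufs > SMALL_MAX)      /* rare case: fall back to the heap */
        r->bufs = malloc(nbufs * sizeof(*bufs));
    if (r->bufs == NULL)
        return -ENOMEM;         /* mirrors uv_fs_read's unregister path */
    memcpy(r->bufs, bufs, nbufs * sizeof(*bufs));
    return 0;
}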

write is much the same as read, so we will skip it.

Besides operating on a file through plain requests, we can also use a stream; let's cover that part next.

First, a piece of code that uses streams:

#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

#include <uv.h>

typedef struct {
    uv_write_t req;
    uv_buf_t buf;
} write_req_t;

uv_loop_t *loop;
uv_pipe_t stdin_pipe;
uv_pipe_t stdout_pipe;
uv_pipe_t file_pipe;

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    *buf = uv_buf_init((char*) malloc(suggested_size), suggested_size);
}

void free_write_req(uv_write_t *req) {
    write_req_t *wr = (write_req_t*) req;
    free(wr->buf.base);
    free(wr);
}

void on_stdout_write(uv_write_t *req, int status) {
    free_write_req(req);
}

void on_file_write(uv_write_t *req, int status) {
    free_write_req(req);
}

void write_data(uv_stream_t *dest, size_t size, uv_buf_t buf, uv_write_cb cb) {
    write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
    req->buf = uv_buf_init((char*) malloc(size), size);
    memcpy(req->buf.base, buf.base, size);
    uv_write((uv_write_t*) req, (uv_stream_t*)dest, &req->buf, 1, cb);
}

void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread < 0){
        if (nread == UV_EOF){
            // end of file
            uv_close((uv_handle_t *)&stdin_pipe, NULL);
            uv_close((uv_handle_t *)&stdout_pipe, NULL);
            uv_close((uv_handle_t *)&file_pipe, NULL);
        }
    } else if (nread > 0) {
        write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
        write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
    }

    // OK to free buffer as write_data copies it.
    if (buf->base)
        free(buf->base);
}

int main(int argc, char **argv) {
    loop = uv_default_loop();

    uv_pipe_init(loop, &stdin_pipe, 0);
    uv_pipe_open(&stdin_pipe, 0);

    uv_pipe_init(loop, &stdout_pipe, 0);
    uv_pipe_open(&stdout_pipe, 1);

    uv_fs_t file_req;
    int fd = uv_fs_open(loop, &file_req, argv[1], O_CREAT | O_RDWR, 0644, NULL);
    uv_pipe_init(loop, &file_pipe, 0);
    uv_pipe_open(&file_pipe, fd);

    uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);

    uv_run(loop, UV_RUN_DEFAULT);
    return 0;
}

The program first initializes three uv pipes, opened on standard input, standard output, and the file named on the command line. It then calls uv_read_start to register for read events, with two callbacks: alloc_buffer (which lets the user do their own memory management) and read_stdin (invoked when a read completes). Finally it calls uv_run to start the event loop; read_stdin later issues write requests that echo the input to stdout and save it to the file.
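
One detail worth noting: the sample's write callbacks ignore the status argument. A slightly more defensive version (a sketch, not part of the original example) would check it before freeing:

void on_stdout_write(uv_write_t *req, int status) {
    if (status < 0)  /* e.g. UV_EPIPE if the read end disappeared */
        fprintf(stderr, "Write error: %s\n", uv_strerror(status));
    free_write_req(req);
}
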
Let's look at uv_pipe_init first:

int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

The init part is fairly simple: the main work is done by uv__handle_init and uv__io_init, both of which were covered earlier. (The emfile_fd block reserves a spare descriptor to /dev/null so that, if the process later hits the fd limit while accepting connections, libuv can temporarily close the spare fd to accept and discard the pending connection.)
Next, uv_pipe_open:

int uv_pipe_open(uv_pipe_t* handle, uv_file fd) {
  int err;

  err = uv__nonblock(fd, 1);
  if (err)
    return err;

#if defined(__APPLE__)
  err = uv__stream_try_select((uv_stream_t*) handle, &fd);
  if (err)
    return err;
#endif /* defined(__APPLE__) */

  return uv__stream_open((uv_stream_t*)handle,
                         fd,
                         UV_STREAM_READABLE | UV_STREAM_WRITABLE);
}

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return -EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return -errno;

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
      return -errno;
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return -errno;
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

This is very simple: apart from the special handling for TCP, it just stores the fd in io_watcher.

int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_CLOSING)
    return -EINVAL;

  /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
   * expresses the desired state of the user.
   */
  stream->flags |= UV_STREAM_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

The key calls are uv__io_start and uv__handle_start. The former puts the watcher on the loop's watcher_queue; note that the requested events are first recorded in pevents and only synced into events when the loop actually polls. The latter starts the handle (the stream).
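
For reference, here is an abridged look at uv__io_start (condensed from libuv's src/unix/core.c; assertions and a platform special case for event ports are trimmed, and details vary across versions):

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(w->fd >= 0);

  /* Accumulate the interest set in pevents only; w->events is brought in
   * sync with it by the poll step of the next loop iteration. */
  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

  /* Nothing to do if the kernel already watches this exact event mask. */
  if (w->events == w->pevents)
    return;

  /* Queue the watcher so the next iteration registers it with
   * epoll/kqueue/etc. */
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}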

So how do alloc_cb and read_cb end up being called?
When uv_pipe_init ran, it called uv__stream_init, which in turn called uv__io_init(&stream->io_watcher, uv__stream_io, -1), setting the poll-event callback to uv__stream_io:

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_STREAM_READING) &&
      (stream->flags & UV_STREAM_READ_PARTIAL) &&
      !(stream->flags & UV_STREAM_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

We will leave connect_req for the TCP article. Here the stream calls uv__read, which invokes alloc_cb to allocate memory and then reads the data: uv__recvmsg for IPC streams, a plain read otherwise. The stream then handles each outcome (retry needed, read error, EOF, normal read) appropriately and invokes read_cb; when a read returns fewer bytes than requested, the UV_STREAM_READ_PARTIAL flag is set.
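
From the user's perspective, those outcomes surface as the nread argument of read_cb. A hedged sketch of a callback that handles each case (on_data is a hypothetical name):

#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

/* Hypothetical read callback showing how uv__read's outcomes surface:
 * nread > 0 is a normal (possibly partial) read, nread == 0 is the EAGAIN
 * case ("try again later", not EOF), UV_EOF signals end of stream, and any
 * other negative value is a real error. */
void on_data(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread > 0) {
        fwrite(buf->base, 1, nread, stdout);     /* normal read */
    } else if (nread == UV_EOF) {
        uv_close((uv_handle_t*)stream, NULL);    /* uv__stream_eof was hit */
    } else if (nread < 0) {
        fprintf(stderr, "read error: %s\n", uv_strerror((int)nread));
        uv_close((uv_handle_t*)stream, NULL);
    }
    /* nread == 0: nothing to do; libuv will call us again when data arrives */
    free(buf->base);                             /* we own alloc_cb's buffer */
}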

I won't go through the write path here; it is quite similar.

Next up: the networking part of libuv.