diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index 29b052002a..71253bc9ad 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -70,6 +70,16 @@ typedef struct { char* errmsg; } uv_lib_t; +struct uv__io_s; +struct uv_loop_s; + +typedef struct uv__io_s uv__io_t; +typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events); + +struct uv__io_s { + ev_io io_watcher; +}; + #define UV_REQ_TYPE_PRIVATE /* empty */ #if __linux__ @@ -78,7 +88,7 @@ typedef struct { struct uv__inotify_watchers { \ struct uv_fs_event_s* rbh_root; \ } inotify_watchers; \ - ev_io inotify_read_watcher; \ + uv__io_t inotify_read_watcher; \ int inotify_fd; #elif defined(PORT_SOURCE_FILE) # define UV_LOOP_PRIVATE_PLATFORM_FIELDS \ @@ -142,8 +152,8 @@ typedef struct { #define UV_STREAM_PRIVATE_FIELDS \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ - ev_io read_watcher; \ - ev_io write_watcher; \ + uv__io_t read_watcher; \ + uv__io_t write_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t write_completed_queue; \ int delayed_error; \ @@ -160,8 +170,8 @@ typedef struct { #define UV_UDP_PRIVATE_FIELDS \ uv_alloc_cb alloc_cb; \ uv_udp_recv_cb recv_cb; \ - ev_io read_watcher; \ - ev_io write_watcher; \ + uv__io_t read_watcher; \ + uv__io_t write_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t write_completed_queue; \ @@ -173,7 +183,7 @@ typedef struct { /* UV_POLL */ #define UV_POLL_PRIVATE_FIELDS \ - ev_io io_watcher; + uv__io_t io_watcher; /* UV_PREPARE */ \ @@ -238,7 +248,6 @@ typedef struct { struct uv_fs_event_s* rbe_parent; \ int rbe_color; \ } node; \ - ev_io read_watcher; \ uv_fs_event_cb cb; #elif defined(__APPLE__) \ diff --git a/deps/uv/src/cares.c b/deps/uv/src/cares.c new file mode 100644 index 0000000000..c0acbec843 --- /dev/null +++ b/deps/uv/src/cares.c @@ -0,0 +1,225 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "tree.h" +#include "uv-common.h" + +#include +#include +#include +#include + + +struct uv_ares_task_s { + UV_HANDLE_FIELDS + ares_socket_t sock; + uv_poll_t poll_watcher; + RB_ENTRY(uv_ares_task_s) node; +}; + + +static int cmp_ares_tasks(const uv_ares_task_t* a, const uv_ares_task_t* b) { + if (a->sock < b->sock) return -1; + if (a->sock > b->sock) return 1; + return 0; +} + + +RB_GENERATE_STATIC(uv__ares_tasks, uv_ares_task_s, node, cmp_ares_tasks) + + +/* Add ares handle to list. */ +static void uv_add_ares_handle(uv_loop_t* loop, uv_ares_task_t* handle) { + assert(loop == handle->loop); + RB_INSERT(uv__ares_tasks, &loop->ares_handles, handle); +} + + +/* Find matching ares handle in list. */ +static uv_ares_task_t* uv_find_ares_handle(uv_loop_t* loop, ares_socket_t sock) { + uv_ares_task_t handle; + handle.sock = sock; + return RB_FIND(uv__ares_tasks, &loop->ares_handles, &handle); +} + + +/* Remove ares handle from list. */ +static void uv_remove_ares_handle(uv_ares_task_t* handle) { + RB_REMOVE(uv__ares_tasks, &handle->loop->ares_handles, handle); +} + + +/* Returns 1 if the ares_handles list is empty, 0 otherwise. */ +static int uv_ares_handles_empty(uv_loop_t* loop) { + return RB_EMPTY(&loop->ares_handles); +} + + +/* This is called once per second by loop->timer. It is used to constantly */ +/* call back into c-ares for possibly processing timeouts. */ +static void uv__ares_timeout(uv_timer_t* handle, int status) { + assert(!uv_ares_handles_empty(handle->loop)); + ares_process_fd(handle->loop->channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD); +} + + +static void uv__ares_poll_cb(uv_poll_t* watcher, int status, int events) { + uv_loop_t* loop = watcher->loop; + uv_ares_task_t* task = container_of(watcher, uv_ares_task_t, poll_watcher); + + /* Reset the idle timer */ + uv_timer_again(&loop->ares_timer); + + if (status < 0) { + /* An error happened. Just pretend that the socket is both readable and */ + /* writable. */ + ares_process_fd(loop->channel, task->sock, task->sock); + return; + } + + /* Process DNS responses */ + ares_process_fd(loop->channel, + events & UV_READABLE ? task->sock : ARES_SOCKET_BAD, + events & UV_WRITABLE ? task->sock : ARES_SOCKET_BAD); +} + + +static void uv__ares_poll_close_cb(uv_handle_t* watcher) { + uv_ares_task_t* task = container_of(watcher, uv_ares_task_t, poll_watcher); + free(task); +} + + +/* Allocates and returns a new uv_ares_task_t */ +static uv_ares_task_t* uv__ares_task_create(uv_loop_t* loop, ares_socket_t sock) { + uv_ares_task_t* task = (uv_ares_task_t*) malloc(sizeof *task); + + if (task == NULL) { + /* Out of memory. */ + return NULL; + } + + task->loop = loop; + task->sock = sock; + + if (uv_poll_init_socket(loop, &task->poll_watcher, sock) < 0) { + /* This should never happen. */ + free(task); + return NULL; + } + + return task; +} + + +/* Callback from ares when socket operation is started */ +static void uv__ares_sockstate_cb(void* data, ares_socket_t sock, + int read, int write) { + uv_loop_t* loop = (uv_loop_t*) data; + uv_ares_task_t* task; + + task = uv_find_ares_handle(loop, sock); + + if (read || write) { + if (!task) { + /* New socket */ + + /* If this is the first socket then start the timer. */ + if (!uv_is_active((uv_handle_t*) &loop->ares_timer)) { + assert(uv_ares_handles_empty(loop)); + uv_timer_start(&loop->ares_timer, uv__ares_timeout, 1000, 1000); + } + + task = uv__ares_task_create(loop, sock); + if (task == NULL) { + /* This should never happen unless we're out of memory or something */ + /* is seriously wrong. The socket won't be polled, but the the query */ + /* will eventually time out. */ + return; + } + + uv_add_ares_handle(loop, task); + } + + /* This should never fail. If it fails anyway, the query will eventually */ + /* time out. */ + uv_poll_start(&task->poll_watcher, + (read ? UV_READABLE : 0) | (write ? UV_WRITABLE : 0), + uv__ares_poll_cb); + + } else { + /* read == 0 and write == 0 this is c-ares's way of notifying us that */ + /* the socket is now closed. We must free the data associated with */ + /* socket. */ + assert(task && + "When an ares socket is closed we should have a handle for it"); + + uv_remove_ares_handle(task); + uv_close((uv_handle_t*) &task->poll_watcher, uv__ares_poll_close_cb); + + if (uv_ares_handles_empty(loop)) { + uv_timer_stop(&loop->ares_timer); + } + } +} + + +/* C-ares integration initialize and terminate */ +int uv_ares_init_options(uv_loop_t* loop, ares_channel *channelptr, + struct ares_options *options, int optmask) { + int rc; + + /* only allow single init at a time */ + if (loop->channel != NULL) { + uv__set_artificial_error(loop, UV_EALREADY); + return -1; + } + + /* set our callback as an option */ + options->sock_state_cb = uv__ares_sockstate_cb; + options->sock_state_cb_data = loop; + optmask |= ARES_OPT_SOCK_STATE_CB; + + /* We do the call to ares_init_option for caller. */ + rc = ares_init_options(channelptr, options, optmask); + + /* if success, save channel */ + if (rc == ARES_SUCCESS) { + loop->channel = *channelptr; + } + + /* Initialize the timeout timer. The timer won't be started until the */ + /* first socket is opened. */ + uv_timer_init(loop, &loop->ares_timer); + + return rc; +} + + +void uv_ares_destroy(uv_loop_t* loop, ares_channel channel) { + /* Only allow destroy if did init. */ + if (loop->channel) { + uv_timer_stop(&loop->ares_timer); + ares_destroy(channel); + loop->channel = NULL; + } +} diff --git a/deps/uv/src/unix/core.c b/deps/uv/src/unix/core.c index db0cc48aba..49785da035 100644 --- a/deps/uv/src/unix/core.c +++ b/deps/uv/src/unix/core.c @@ -592,3 +592,51 @@ uv_err_t uv_chdir(const char* dir) { return uv__new_sys_error(errno); } } + + +static void uv__io_set_cb(uv__io_t* handle, uv__io_cb cb) { + union { void* data; uv__io_cb cb; } u; + u.cb = cb; + handle->io_watcher.data = u.data; +} + + +static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) { + union { void* data; uv__io_cb cb; } u; + uv_loop_t* loop = ev_userdata(ev); + uv__io_t* handle = container_of(w, uv__io_t, io_watcher); + u.data = handle->io_watcher.data; + u.cb(loop, handle, events & (EV_READ|EV_WRITE|EV_ERROR)); +} + + +void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) { + ev_io_init(&handle->io_watcher, uv__io_rw, fd, events & (EV_READ|EV_WRITE)); + uv__io_set_cb(handle, cb); +} + + +void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) { + ev_io_set(&handle->io_watcher, fd, events); + uv__io_set_cb(handle, cb); +} + + +void uv__io_start(uv_loop_t* loop, uv__io_t* handle) { + ev_io_start(loop->ev, &handle->io_watcher); +} + + +void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) { + ev_io_stop(loop->ev, &handle->io_watcher); +} + + +void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) { + ev_feed_event(loop->ev, &handle->io_watcher, event); +} + + +int uv__io_active(uv__io_t* handle) { + return ev_is_active(&handle->io_watcher); +} diff --git a/deps/uv/src/unix/internal.h b/deps/uv/src/unix/internal.h index 25213cc4da..328d76d77b 100644 --- a/deps/uv/src/unix/internal.h +++ b/deps/uv/src/unix/internal.h @@ -78,6 +78,10 @@ } \ while (0) +#define UV__IO_READ EV_READ +#define UV__IO_WRITE EV_WRITE +#define UV__IO_ERROR EV_ERROR + /* flags */ enum { UV_CLOSING = 0x01, /* uv_close() called but not finished. */ @@ -127,6 +131,13 @@ int uv__socket(int domain, int type, int protocol); int uv__dup(int fd); int uv_async_stop(uv_async_t* handle); +void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events); +void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events); +void uv__io_start(uv_loop_t* loop, uv__io_t* handle); +void uv__io_stop(uv_loop_t* loop, uv__io_t* handle); +void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event); +int uv__io_active(uv__io_t* handle); + /* loop */ int uv__loop_init(uv_loop_t* loop, int default_loop); void uv__loop_delete(uv_loop_t* loop); @@ -143,8 +154,7 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type); int uv__stream_open(uv_stream_t*, int fd, int flags); void uv__stream_destroy(uv_stream_t* stream); -void uv__stream_io(EV_P_ ev_io* watcher, int revents); -void uv__server_io(EV_P_ ev_io* watcher, int revents); +void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events); int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len); int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, socklen_t addrlen, uv_connect_cb cb); @@ -156,11 +166,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay); /* pipe */ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -void uv__pipe_accept(EV_P_ ev_io* watcher, int revents); /* poll */ void uv__poll_close(uv_poll_t* handle); -int uv__poll_active(const uv_poll_t* handle); /* various */ void uv__async_close(uv_async_t* handle); diff --git a/deps/uv/src/unix/linux/inotify.c b/deps/uv/src/unix/linux/inotify.c index 564e47a547..54525bc7f1 100644 --- a/deps/uv/src/unix/linux/inotify.c +++ b/deps/uv/src/unix/linux/inotify.c @@ -51,7 +51,7 @@ static int compare_watchers(const uv_fs_event_t* a, const uv_fs_event_t* b) { RB_GENERATE_STATIC(uv__inotify_watchers, uv_fs_event_s, node, compare_watchers) -static void uv__inotify_read(EV_P_ ev_io* w, int revents); +static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents); static int new_inotify_fd(void) { @@ -85,11 +85,11 @@ static int init_inotify(uv_loop_t* loop) { return -1; } - ev_io_init(&loop->inotify_read_watcher, - uv__inotify_read, - loop->inotify_fd, - EV_READ); - ev_io_start(loop->ev, &loop->inotify_read_watcher); + uv__io_init(&loop->inotify_read_watcher, + uv__inotify_read, + loop->inotify_fd, + UV__IO_READ); + uv__io_start(loop, &loop->inotify_read_watcher); return 0; } @@ -112,22 +112,18 @@ static void remove_watcher(uv_fs_event_t* handle) { } -static void uv__inotify_read(EV_P_ ev_io* w, int revents) { +static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int events) { const struct uv__inotify_event* e; uv_fs_event_t* handle; - uv_loop_t* uv_loop; const char* filename; ssize_t size; - int events; const char *p; /* needs to be large enough for sizeof(inotify_event) + strlen(filename) */ char buf[4096]; - uv_loop = container_of(w, uv_loop_t, inotify_read_watcher); - while (1) { do { - size = read(uv_loop->inotify_fd, buf, sizeof buf); + size = read(loop->inotify_fd, buf, sizeof buf); } while (size == -1 && errno == EINTR); @@ -148,7 +144,7 @@ static void uv__inotify_read(EV_P_ ev_io* w, int revents) { if (e->mask & ~(UV__IN_ATTRIB|UV__IN_MODIFY)) events |= UV_RENAME; - handle = find_watcher(uv_loop, e->wd); + handle = find_watcher(loop, e->wd); if (handle == NULL) continue; /* Handle has already been closed. */ diff --git a/deps/uv/src/unix/loop.c b/deps/uv/src/unix/loop.c index 1373db027e..08b34994fc 100644 --- a/deps/uv/src/unix/loop.c +++ b/deps/uv/src/unix/loop.c @@ -66,10 +66,11 @@ void uv__loop_delete(uv_loop_t* loop) { uv_ares_destroy(loop, loop->channel); ev_loop_destroy(loop->ev); #if __linux__ - if (loop->inotify_fd == -1) return; - ev_io_stop(loop->ev, &loop->inotify_read_watcher); - close(loop->inotify_fd); - loop->inotify_fd = -1; + if (loop->inotify_fd != -1) { + uv__io_stop(loop, &loop->inotify_read_watcher); + close(loop->inotify_fd); + loop->inotify_fd = -1; + } #endif #if HAVE_PORTS_FS if (loop->fs_fd != -1) diff --git a/deps/uv/src/unix/pipe.c b/deps/uv/src/unix/pipe.c index 91afb0811e..fb40e9622a 100644 --- a/deps/uv/src/unix/pipe.c +++ b/deps/uv/src/unix/pipe.c @@ -29,6 +29,8 @@ #include #include +static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events); + int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); @@ -138,8 +140,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv__set_sys_error(handle->loop, errno); } else { handle->connection_cb = cb; - ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ); - ev_io_start(handle->loop->ev, &handle->read_watcher); + uv__io_init(&handle->read_watcher, + uv__pipe_accept, + handle->fd, + UV__IO_READ); + uv__io_start(handle->loop, &handle->read_watcher); } out: @@ -211,8 +216,8 @@ void uv_pipe_connect(uv_connect_t* req, uv__stream_open((uv_stream_t*)handle, sockfd, UV_STREAM_READABLE | UV_STREAM_WRITABLE); - ev_io_start(handle->loop->ev, &handle->read_watcher); - ev_io_start(handle->loop->ev, &handle->write_watcher); + uv__io_start(handle->loop, &handle->read_watcher); + uv__io_start(handle->loop, &handle->write_watcher); status = 0; out: @@ -235,14 +240,14 @@ out: /* TODO merge with uv__server_io()? */ -void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) { +static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) { struct sockaddr_un saddr; uv_pipe_t* pipe; int saved_errno; int sockfd; saved_errno = errno; - pipe = watcher->data; + pipe = container_of(w, uv_pipe_t, read_watcher); assert(pipe->type == UV_NAMED_PIPE); @@ -257,7 +262,7 @@ void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) { pipe->connection_cb((uv_stream_t*)pipe, 0); if (pipe->accepted_fd == sockfd) { /* The user hasn't called uv_accept() yet */ - ev_io_stop(pipe->loop->ev, &pipe->read_watcher); + uv__io_stop(pipe->loop, &pipe->read_watcher); } } diff --git a/deps/uv/src/unix/poll.c b/deps/uv/src/unix/poll.c index 45def2c15a..f0a4344e18 100644 --- a/deps/uv/src/unix/poll.c +++ b/deps/uv/src/unix/poll.c @@ -27,11 +27,13 @@ #include -static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) { - uv_poll_t* handle = watcher->data; - int events; +static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) { + uv_poll_t* handle; + int pevents; + + handle = container_of(w, uv_poll_t, io_watcher); - if (ev_events & EV_ERROR) { + if (events & UV__IO_ERROR) { /* An error happened. Libev has implicitly stopped the watcher, but we */ /* need to fix the refcount. */ uv__handle_stop(handle); @@ -40,16 +42,13 @@ static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) { return; } - assert(ev_events & (EV_READ | EV_WRITE)); - assert((ev_events & ~(EV_READ | EV_WRITE)) == 0); + pevents = 0; + if (events & UV__IO_READ) + pevents |= UV_READABLE; + if (events & UV__IO_WRITE) + pevents |= UV_WRITABLE; - events = 0; - if (ev_events & EV_READ) - events |= UV_READABLE; - if (ev_events & EV_WRITE) - events |= UV_WRITABLE; - - handle->poll_cb(handle, 0, events); + handle->poll_cb(handle, 0, pevents); } @@ -59,9 +58,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { handle->fd = fd; handle->poll_cb = NULL; - - ev_init(&handle->io_watcher, uv__poll_io); - handle->io_watcher.data = handle; + uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0); return 0; } @@ -74,7 +71,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, static void uv__poll_stop(uv_poll_t* handle) { - ev_io_stop(handle->loop->ev, &handle->io_watcher); + uv__io_stop(handle->loop, &handle->io_watcher); uv__handle_stop(handle); } @@ -86,25 +83,25 @@ int uv_poll_stop(uv_poll_t* handle) { } -int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) { - int ev_events; +int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) { + int events; - assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0); + assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0); assert(!(handle->flags & (UV_CLOSING | UV_CLOSED))); - if (events == 0) { + if (pevents == 0) { uv__poll_stop(handle); return 0; } - ev_events = 0; - if (events & UV_READABLE) - ev_events |= EV_READ; - if (events & UV_WRITABLE) - ev_events |= EV_WRITE; + events = 0; + if (pevents & UV_READABLE) + events |= UV__IO_READ; + if (pevents & UV_WRITABLE) + events |= UV__IO_WRITE; - ev_io_set(&handle->io_watcher, handle->fd, ev_events); - ev_io_start(handle->loop->ev, &handle->io_watcher); + uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events); + uv__io_start(handle->loop, &handle->io_watcher); handle->poll_cb = poll_cb; uv__handle_start(handle); @@ -116,8 +113,3 @@ int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) { void uv__poll_close(uv_poll_t* handle) { uv__poll_stop(handle); } - - -int uv__poll_active(const uv_poll_t* handle) { - return ev_is_active(&handle->io_watcher); -} diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index 76e2581908..f441c9b945 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -38,6 +38,7 @@ static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); +static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events); static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { @@ -71,15 +72,8 @@ void uv__stream_init(uv_loop_t* loop, ngx_queue_init(&stream->write_completed_queue); stream->write_queue_size = 0; - ev_init(&stream->read_watcher, uv__stream_io); - stream->read_watcher.data = stream; - - ev_init(&stream->write_watcher, uv__stream_io); - stream->write_watcher.data = stream; - - assert(ngx_queue_empty(&stream->write_queue)); - assert(ngx_queue_empty(&stream->write_completed_queue)); - assert(stream->write_queue_size == 0); + uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0); + uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0); } @@ -111,13 +105,9 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { } } - /* Associate the fd with each ev_io watcher. */ - ev_io_set(&stream->read_watcher, fd, EV_READ); - ev_io_set(&stream->write_watcher, fd, EV_WRITE); - - /* These should have been set up by uv_tcp_init or uv_pipe_init. */ - assert(stream->read_watcher.cb == uv__stream_io); - assert(stream->write_watcher.cb == uv__stream_io); + /* Associate the fd with each watcher. */ + uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ); + uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE); return 0; } @@ -174,19 +164,16 @@ void uv__stream_destroy(uv_stream_t* stream) { } -void uv__server_io(EV_P_ ev_io* watcher, int revents) { +void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) { int fd; struct sockaddr_storage addr; - uv_stream_t* stream = watcher->data; - - assert(watcher == &stream->read_watcher || - watcher == &stream->write_watcher); - assert(revents == EV_READ); + uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher); + assert(events == UV__IO_READ); assert(!(stream->flags & UV_CLOSING)); if (stream->accepted_fd >= 0) { - ev_io_stop(EV_A, &stream->read_watcher); + uv__io_stop(loop, &stream->read_watcher); return; } @@ -216,7 +203,7 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { stream->connection_cb((uv_stream_t*)stream, 0); if (stream->accepted_fd >= 0) { /* The user hasn't yet accepted called uv_accept() */ - ev_io_stop(stream->loop->ev, &stream->read_watcher); + uv__io_stop(stream->loop, &stream->read_watcher); return; } } @@ -252,7 +239,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { goto out; } - ev_io_start(streamServer->loop->ev, &streamServer->read_watcher); + uv__io_start(streamServer->loop, &streamServer->read_watcher); streamServer->accepted_fd = -1; status = 0; @@ -312,7 +299,7 @@ static void uv__drain(uv_stream_t* stream) { assert(!uv_write_queue_head(stream)); assert(stream->write_queue_size == 0); - ev_io_stop(stream->loop->ev, &stream->write_watcher); + uv__io_stop(stream->loop, &stream->write_watcher); /* Shutdown? */ if ((stream->flags & UV_STREAM_SHUTTING) && @@ -366,7 +353,7 @@ static void uv__write_req_finish(uv_write_t* req) { * callback called in the near future. */ ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); - ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE); + uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE); } @@ -517,7 +504,7 @@ start: assert(!stream->blocking); /* We're not done. */ - ev_io_start(stream->loop->ev, &stream->write_watcher); + uv__io_start(stream->loop, &stream->write_watcher); } @@ -576,7 +563,6 @@ static void uv__read(uv_stream_t* stream) { struct msghdr msg; struct cmsghdr* cmsg; char cmsg_space[64]; - struct ev_loop* ev = stream->loop->ev; /* XXX: Maybe instead of having UV_STREAM_READING we just test if * tcp->read_cb is NULL or not? @@ -619,7 +605,7 @@ static void uv__read(uv_stream_t* stream) { if (errno == EAGAIN || errno == EWOULDBLOCK) { /* Wait for the next one. */ if (stream->flags & UV_STREAM_READING) { - ev_io_start(ev, &stream->read_watcher); + uv__io_start(stream->loop, &stream->read_watcher); } uv__set_sys_error(stream->loop, EAGAIN); @@ -647,7 +633,7 @@ static void uv__read(uv_stream_t* stream) { } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); - ev_io_stop(ev, &stream->read_watcher); + uv__io_stop(stream->loop, &stream->read_watcher); if (!ev_is_active(&stream->write_watcher)) uv__handle_stop(stream); @@ -728,40 +714,43 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { stream->shutdown_req = req; stream->flags |= UV_STREAM_SHUTTING; - ev_io_start(stream->loop->ev, &stream->write_watcher); + uv__io_start(stream->loop, &stream->write_watcher); return 0; } void uv__stream_pending(uv_stream_t* handle) { - uv__stream_io(handle->loop->ev, &handle->write_watcher, EV_WRITE); + uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE); } -void uv__stream_io(EV_P_ ev_io* watcher, int revents) { - uv_stream_t* stream = watcher->data; +static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) { + uv_stream_t* stream; - assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || - stream->type == UV_TTY); - assert(watcher == &stream->read_watcher || - watcher == &stream->write_watcher); + /* either UV__IO_READ or UV__IO_WRITE but not both */ + assert(!!(events & UV__IO_READ) ^ !!(events & UV__IO_WRITE)); + + if (events & UV__IO_READ) + stream = container_of(w, uv_stream_t, read_watcher); + else + stream = container_of(w, uv_stream_t, write_watcher); + + assert(stream->type == UV_TCP || + stream->type == UV_NAMED_PIPE || + stream->type == UV_TTY); assert(!(stream->flags & UV_CLOSING)); - if (stream->connect_req) { + if (stream->connect_req) uv__stream_connect(stream); - } else { - assert(revents & (EV_READ | EV_WRITE)); + else if (events & UV__IO_READ) { assert(stream->fd >= 0); - - if (revents & EV_READ) { - uv__read((uv_stream_t*)stream); - } - - if (revents & EV_WRITE) { - uv__write(stream); - uv__write_callbacks(stream); - } + uv__read(stream); + } + else { + assert(stream->fd >= 0); + uv__write(stream); + uv__write_callbacks(stream); } } @@ -796,7 +785,7 @@ static void uv__stream_connect(uv_stream_t* stream) { return; if (error == 0) - ev_io_start(stream->loop->ev, &stream->read_watcher); + uv__io_start(stream->loop, &stream->read_watcher); stream->connect_req = NULL; uv__req_unregister(stream->loop, req); @@ -869,8 +858,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, } } - assert(stream->write_watcher.data == stream); - ev_io_start(stream->loop->ev, &stream->write_watcher); + uv__io_start(stream->loop, &stream->write_watcher); if (stream->delayed_error) uv__make_pending(stream); @@ -930,9 +918,6 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, ngx_queue_insert_tail(&stream->write_queue, &req->queue); assert(!ngx_queue_empty(&stream->write_queue)); - assert(stream->write_watcher.cb == uv__stream_io); - assert(stream->write_watcher.data == stream); - assert(stream->write_watcher.fd == stream->fd); /* If the queue was empty when this function began, we should attempt to * do the write immediately. Otherwise start the write_watcher and wait @@ -948,7 +933,7 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, */ assert(!stream->blocking); - ev_io_start(stream->loop->ev, &stream->write_watcher); + uv__io_start(stream->loop, &stream->write_watcher); } return 0; @@ -990,10 +975,7 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; - /* These should have been set by uv_tcp_init. */ - assert(stream->read_watcher.cb == uv__stream_io); - - ev_io_start(stream->loop->ev, &stream->read_watcher); + uv__io_start(stream->loop, &stream->read_watcher); uv__handle_start(stream); return 0; @@ -1013,7 +995,7 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, int uv_read_stop(uv_stream_t* stream) { - ev_io_stop(stream->loop->ev, &stream->read_watcher); + uv__io_stop(stream->loop, &stream->read_watcher); uv__handle_stop(stream); stream->flags &= ~UV_STREAM_READING; stream->read_cb = NULL; @@ -1035,7 +1017,7 @@ int uv_is_writable(const uv_stream_t* stream) { void uv__stream_close(uv_stream_t* handle) { uv_read_stop(handle); - ev_io_stop(handle->loop->ev, &handle->write_watcher); + uv__io_stop(handle->loop, &handle->write_watcher); close(handle->fd); handle->fd = -1; diff --git a/deps/uv/src/unix/tcp.c b/deps/uv/src/unix/tcp.c index a28db550ef..07ad2d9eca 100644 --- a/deps/uv/src/unix/tcp.c +++ b/deps/uv/src/unix/tcp.c @@ -201,9 +201,8 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { tcp->connection_cb = cb; /* Start listening for connections. */ - ev_io_set(&tcp->read_watcher, tcp->fd, EV_READ); - ev_set_cb(&tcp->read_watcher, uv__server_io); - ev_io_start(tcp->loop->ev, &tcp->read_watcher); + uv__io_set(&tcp->read_watcher, uv__server_io, tcp->fd, UV__IO_READ); + uv__io_start(tcp->loop, &tcp->read_watcher); return 0; } diff --git a/deps/uv/src/unix/udp.c b/deps/uv/src/unix/udp.c index c67dcc2c9d..a9c03066ad 100644 --- a/deps/uv/src/unix/udp.c +++ b/deps/uv/src/unix/udp.c @@ -31,33 +31,31 @@ static void uv__udp_run_completed(uv_udp_t* handle); static void uv__udp_run_pending(uv_udp_t* handle); -static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents); -static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents); +static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents); +static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb); static void uv__udp_start_watcher(uv_udp_t* handle, - ev_io* w, - void (*cb)(EV_P_ ev_io*, int), - int flags) { - if (ev_is_active(w)) return; - ev_set_cb(w, cb); - ev_io_set(w, handle->fd, flags); - ev_io_start(handle->loop->ev, w); + uv__io_t* w, + uv__io_cb cb, + int events) { + if (uv__io_active(w)) return; + uv__io_init(w, cb, handle->fd, events); + uv__io_start(handle->loop, w); uv__handle_start(handle); } -static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) { - if (!ev_is_active(w)) return; - ev_io_stop(handle->loop->ev, w); - ev_io_set(w, -1, 0); - ev_set_cb(w, NULL); +static void uv__udp_stop_watcher(uv_udp_t* handle, uv__io_t* w) { + if (!uv__io_active(w)) return; + uv__io_stop(handle->loop, w); - if (!ev_is_active(&handle->read_watcher) && - !ev_is_active(&handle->write_watcher)) { + if (!uv__io_active(&handle->read_watcher) && + !uv__io_active(&handle->write_watcher)) + { uv__handle_stop(handle); } } @@ -67,7 +65,7 @@ static void uv__udp_start_read_watcher(uv_udp_t* handle) { uv__udp_start_watcher(handle, &handle->read_watcher, uv__udp_recvmsg, - EV_READ); + UV__IO_READ); } @@ -75,7 +73,7 @@ static void uv__udp_start_write_watcher(uv_udp_t* handle) { uv__udp_start_watcher(handle, &handle->write_watcher, uv__udp_sendmsg, - EV_WRITE); + UV__IO_WRITE); } @@ -101,8 +99,8 @@ void uv__udp_finish_close(uv_udp_t* handle) { uv_udp_send_t* req; ngx_queue_t* q; - assert(!ev_is_active(&handle->write_watcher)); - assert(!ev_is_active(&handle->read_watcher)); + assert(!uv__io_active(&handle->write_watcher)); + assert(!uv__io_active(&handle->read_watcher)); assert(handle->fd == -1); uv__udp_run_completed(handle); @@ -216,7 +214,7 @@ static void uv__udp_run_completed(uv_udp_t* handle) { } -static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) { +static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { struct sockaddr_storage peer; struct msghdr h; uv_udp_t* handle; @@ -278,7 +276,7 @@ static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) { } -static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) { +static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) { uv_udp_t* handle; handle = container_of(w, uv_udp_t, write_watcher); @@ -296,7 +294,7 @@ static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) { if (!ngx_queue_empty(&handle->write_completed_queue)) { /* Schedule completion callbacks. */ - ev_feed_event(handle->loop->ev, &handle->write_watcher, EV_WRITE); + uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE); } else if (ngx_queue_empty(&handle->write_queue)) { /* Pending queue and completion queue empty, stop watcher. */ @@ -649,7 +647,7 @@ int uv_udp_recv_start(uv_udp_t* handle, return -1; } - if (ev_is_active(&handle->read_watcher)) { + if (uv__io_active(&handle->read_watcher)) { uv__set_artificial_error(handle->loop, UV_EALREADY); return -1; } diff --git a/deps/uv/test/test-tcp-writealot.c b/deps/uv/test/test-tcp-writealot.c index a8c28bff49..09c96f1645 100644 --- a/deps/uv/test/test-tcp-writealot.c +++ b/deps/uv/test/test-tcp-writealot.c @@ -31,10 +31,8 @@ #define TOTAL_BYTES (WRITES * CHUNKS_PER_WRITE * CHUNK_SIZE) - static char* send_buffer; - static int shutdown_cb_called = 0; static int connect_cb_called = 0; static int write_cb_called = 0; @@ -43,20 +41,18 @@ static int bytes_sent = 0; static int bytes_sent_done = 0; static int bytes_received_done = 0; +static uv_connect_t connect_req; +static uv_shutdown_t shutdown_req; +static uv_write_t write_reqs[WRITES]; + static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { - uv_buf_t buf; - buf.base = (char*)malloc(size); - buf.len = size; - return buf; + return uv_buf_init(malloc(size), size); } static void close_cb(uv_handle_t* handle) { ASSERT(handle != NULL); - - free(handle); - close_cb_called++; } @@ -64,7 +60,7 @@ static void close_cb(uv_handle_t* handle) { static void shutdown_cb(uv_shutdown_t* req, int status) { uv_tcp_t* tcp; - ASSERT(req); + ASSERT(req == &shutdown_req); ASSERT(status == 0); tcp = (uv_tcp_t*)(req->handle); @@ -77,28 +73,21 @@ static void shutdown_cb(uv_shutdown_t* req, int status) { /* We should have had all the writes called already. */ ASSERT(write_cb_called == WRITES); - - free(req); } static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { ASSERT(tcp != NULL); - if (nread < 0) { + if (nread >= 0) { + bytes_received_done += nread; + } + else { ASSERT(uv_last_error(uv_default_loop()).code == UV_EOF); printf("GOT EOF\n"); - - if (buf.base) { - free(buf.base); - } - uv_close((uv_handle_t*)tcp, close_cb); - return; } - bytes_received_done += nread; - free(buf.base); } @@ -114,71 +103,55 @@ static void write_cb(uv_write_t* req, int status) { bytes_sent_done += CHUNKS_PER_WRITE * CHUNK_SIZE; write_cb_called++; - - free(req); } static void connect_cb(uv_connect_t* req, int status) { uv_buf_t send_bufs[CHUNKS_PER_WRITE]; - uv_tcp_t* tcp; - uv_write_t* write_req; - uv_shutdown_t* shutdown_req; + uv_stream_t* stream; int i, j, r; - ASSERT(req != NULL); + ASSERT(req == &connect_req); ASSERT(status == 0); - tcp = (uv_tcp_t*)req->handle; - + stream = req->handle; connect_cb_called++; - free(req); /* Write a lot of data */ for (i = 0; i < WRITES; i++) { + uv_write_t* write_req = write_reqs + i; + for (j = 0; j < CHUNKS_PER_WRITE; j++) { - send_bufs[j].len = CHUNK_SIZE; - send_bufs[j].base = send_buffer + bytes_sent; + send_bufs[j] = uv_buf_init(send_buffer + bytes_sent, CHUNK_SIZE); bytes_sent += CHUNK_SIZE; } - write_req = malloc(sizeof(uv_write_t)); - ASSERT(write_req != NULL); - - r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs, - CHUNKS_PER_WRITE, write_cb); + r = uv_write(write_req, stream, send_bufs, CHUNKS_PER_WRITE, write_cb); ASSERT(r == 0); } - /* Shutdown on drain. FIXME: dealloc req? */ - shutdown_req = malloc(sizeof(uv_shutdown_t)); - ASSERT(shutdown_req != NULL); - r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb); + /* Shutdown on drain. */ + r = uv_shutdown(&shutdown_req, stream, shutdown_cb); ASSERT(r == 0); /* Start reading */ - r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb); + r = uv_read_start(stream, alloc_cb, read_cb); ASSERT(r == 0); } TEST_IMPL(tcp_writealot) { struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT); - uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client); - uv_connect_t* connect_req = malloc(sizeof(uv_connect_t)); + uv_tcp_t client; int r; - ASSERT(client != NULL); - ASSERT(connect_req != NULL); - - send_buffer = (char*)malloc(TOTAL_BYTES + 1); - + send_buffer = malloc(TOTAL_BYTES); ASSERT(send_buffer != NULL); - r = uv_tcp_init(uv_default_loop(), client); + r = uv_tcp_init(uv_default_loop(), &client); ASSERT(r == 0); - r = uv_tcp_connect(connect_req, client, addr, connect_cb); + r = uv_tcp_connect(&connect_req, &client, addr, connect_cb); ASSERT(r == 0); uv_run(uv_default_loop()); @@ -191,5 +164,7 @@ TEST_IMPL(tcp_writealot) { ASSERT(bytes_sent_done == TOTAL_BYTES); ASSERT(bytes_received_done == TOTAL_BYTES); + free(send_buffer); + return 0; }