Browse Source

deps: upgrade libuv to 5b9c451

v0.9.1-release
Ben Noordhuis 13 years ago
parent
commit
1358bac6d1
  1. 23
      deps/uv/include/uv-private/uv-unix.h
  2. 225
      deps/uv/src/cares.c
  3. 48
      deps/uv/src/unix/core.c
  4. 16
      deps/uv/src/unix/internal.h
  5. 18
      deps/uv/src/unix/linux/inotify.c
  6. 5
      deps/uv/src/unix/loop.c
  7. 19
      deps/uv/src/unix/pipe.c
  8. 58
      deps/uv/src/unix/poll.c
  9. 102
      deps/uv/src/unix/stream.c
  10. 5
      deps/uv/src/unix/tcp.c
  11. 46
      deps/uv/src/unix/udp.c
  12. 77
      deps/uv/test/test-tcp-writealot.c

23
deps/uv/include/uv-private/uv-unix.h

@ -70,6 +70,16 @@ typedef struct {
char* errmsg; char* errmsg;
} uv_lib_t; } uv_lib_t;
struct uv__io_s;
struct uv_loop_s;
typedef struct uv__io_s uv__io_t;
typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events);
struct uv__io_s {
ev_io io_watcher;
};
#define UV_REQ_TYPE_PRIVATE /* empty */ #define UV_REQ_TYPE_PRIVATE /* empty */
#if __linux__ #if __linux__
@ -78,7 +88,7 @@ typedef struct {
struct uv__inotify_watchers { \ struct uv__inotify_watchers { \
struct uv_fs_event_s* rbh_root; \ struct uv_fs_event_s* rbh_root; \
} inotify_watchers; \ } inotify_watchers; \
ev_io inotify_read_watcher; \ uv__io_t inotify_read_watcher; \
int inotify_fd; int inotify_fd;
#elif defined(PORT_SOURCE_FILE) #elif defined(PORT_SOURCE_FILE)
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS \ # define UV_LOOP_PRIVATE_PLATFORM_FIELDS \
@ -142,8 +152,8 @@ typedef struct {
#define UV_STREAM_PRIVATE_FIELDS \ #define UV_STREAM_PRIVATE_FIELDS \
uv_connect_t *connect_req; \ uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \ uv_shutdown_t *shutdown_req; \
ev_io read_watcher; \ uv__io_t read_watcher; \
ev_io write_watcher; \ uv__io_t write_watcher; \
ngx_queue_t write_queue; \ ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \ ngx_queue_t write_completed_queue; \
int delayed_error; \ int delayed_error; \
@ -160,8 +170,8 @@ typedef struct {
#define UV_UDP_PRIVATE_FIELDS \ #define UV_UDP_PRIVATE_FIELDS \
uv_alloc_cb alloc_cb; \ uv_alloc_cb alloc_cb; \
uv_udp_recv_cb recv_cb; \ uv_udp_recv_cb recv_cb; \
ev_io read_watcher; \ uv__io_t read_watcher; \
ev_io write_watcher; \ uv__io_t write_watcher; \
ngx_queue_t write_queue; \ ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \ ngx_queue_t write_completed_queue; \
@ -173,7 +183,7 @@ typedef struct {
/* UV_POLL */ /* UV_POLL */
#define UV_POLL_PRIVATE_FIELDS \ #define UV_POLL_PRIVATE_FIELDS \
ev_io io_watcher; uv__io_t io_watcher;
/* UV_PREPARE */ \ /* UV_PREPARE */ \
@ -238,7 +248,6 @@ typedef struct {
struct uv_fs_event_s* rbe_parent; \ struct uv_fs_event_s* rbe_parent; \
int rbe_color; \ int rbe_color; \
} node; \ } node; \
ev_io read_watcher; \
uv_fs_event_cb cb; uv_fs_event_cb cb;
#elif defined(__APPLE__) \ #elif defined(__APPLE__) \

225
deps/uv/src/cares.c

@ -0,0 +1,225 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "tree.h"
#include "uv-common.h"
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
struct uv_ares_task_s {
UV_HANDLE_FIELDS
ares_socket_t sock;
uv_poll_t poll_watcher;
RB_ENTRY(uv_ares_task_s) node;
};
static int cmp_ares_tasks(const uv_ares_task_t* a, const uv_ares_task_t* b) {
if (a->sock < b->sock) return -1;
if (a->sock > b->sock) return 1;
return 0;
}
RB_GENERATE_STATIC(uv__ares_tasks, uv_ares_task_s, node, cmp_ares_tasks)
/* Add ares handle to list. */
static void uv_add_ares_handle(uv_loop_t* loop, uv_ares_task_t* handle) {
assert(loop == handle->loop);
RB_INSERT(uv__ares_tasks, &loop->ares_handles, handle);
}
/* Find matching ares handle in list. */
static uv_ares_task_t* uv_find_ares_handle(uv_loop_t* loop, ares_socket_t sock) {
uv_ares_task_t handle;
handle.sock = sock;
return RB_FIND(uv__ares_tasks, &loop->ares_handles, &handle);
}
/* Remove ares handle from list. */
static void uv_remove_ares_handle(uv_ares_task_t* handle) {
RB_REMOVE(uv__ares_tasks, &handle->loop->ares_handles, handle);
}
/* Returns 1 if the ares_handles list is empty, 0 otherwise. */
static int uv_ares_handles_empty(uv_loop_t* loop) {
return RB_EMPTY(&loop->ares_handles);
}
/* This is called once per second by loop->timer. It is used to constantly */
/* call back into c-ares for possibly processing timeouts. */
static void uv__ares_timeout(uv_timer_t* handle, int status) {
assert(!uv_ares_handles_empty(handle->loop));
ares_process_fd(handle->loop->channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
}
static void uv__ares_poll_cb(uv_poll_t* watcher, int status, int events) {
uv_loop_t* loop = watcher->loop;
uv_ares_task_t* task = container_of(watcher, uv_ares_task_t, poll_watcher);
/* Reset the idle timer */
uv_timer_again(&loop->ares_timer);
if (status < 0) {
/* An error happened. Just pretend that the socket is both readable and */
/* writable. */
ares_process_fd(loop->channel, task->sock, task->sock);
return;
}
/* Process DNS responses */
ares_process_fd(loop->channel,
events & UV_READABLE ? task->sock : ARES_SOCKET_BAD,
events & UV_WRITABLE ? task->sock : ARES_SOCKET_BAD);
}
static void uv__ares_poll_close_cb(uv_handle_t* watcher) {
uv_ares_task_t* task = container_of(watcher, uv_ares_task_t, poll_watcher);
free(task);
}
/* Allocates and returns a new uv_ares_task_t */
static uv_ares_task_t* uv__ares_task_create(uv_loop_t* loop, ares_socket_t sock) {
uv_ares_task_t* task = (uv_ares_task_t*) malloc(sizeof *task);
if (task == NULL) {
/* Out of memory. */
return NULL;
}
task->loop = loop;
task->sock = sock;
if (uv_poll_init_socket(loop, &task->poll_watcher, sock) < 0) {
/* This should never happen. */
free(task);
return NULL;
}
return task;
}
/* Callback from ares when socket operation is started */
static void uv__ares_sockstate_cb(void* data, ares_socket_t sock,
int read, int write) {
uv_loop_t* loop = (uv_loop_t*) data;
uv_ares_task_t* task;
task = uv_find_ares_handle(loop, sock);
if (read || write) {
if (!task) {
/* New socket */
/* If this is the first socket then start the timer. */
if (!uv_is_active((uv_handle_t*) &loop->ares_timer)) {
assert(uv_ares_handles_empty(loop));
uv_timer_start(&loop->ares_timer, uv__ares_timeout, 1000, 1000);
}
task = uv__ares_task_create(loop, sock);
if (task == NULL) {
/* This should never happen unless we're out of memory or something */
/* is seriously wrong. The socket won't be polled, but the the query */
/* will eventually time out. */
return;
}
uv_add_ares_handle(loop, task);
}
/* This should never fail. If it fails anyway, the query will eventually */
/* time out. */
uv_poll_start(&task->poll_watcher,
(read ? UV_READABLE : 0) | (write ? UV_WRITABLE : 0),
uv__ares_poll_cb);
} else {
/* read == 0 and write == 0 this is c-ares's way of notifying us that */
/* the socket is now closed. We must free the data associated with */
/* socket. */
assert(task &&
"When an ares socket is closed we should have a handle for it");
uv_remove_ares_handle(task);
uv_close((uv_handle_t*) &task->poll_watcher, uv__ares_poll_close_cb);
if (uv_ares_handles_empty(loop)) {
uv_timer_stop(&loop->ares_timer);
}
}
}
/* C-ares integration initialize and terminate */
int uv_ares_init_options(uv_loop_t* loop, ares_channel *channelptr,
struct ares_options *options, int optmask) {
int rc;
/* only allow single init at a time */
if (loop->channel != NULL) {
uv__set_artificial_error(loop, UV_EALREADY);
return -1;
}
/* set our callback as an option */
options->sock_state_cb = uv__ares_sockstate_cb;
options->sock_state_cb_data = loop;
optmask |= ARES_OPT_SOCK_STATE_CB;
/* We do the call to ares_init_option for caller. */
rc = ares_init_options(channelptr, options, optmask);
/* if success, save channel */
if (rc == ARES_SUCCESS) {
loop->channel = *channelptr;
}
/* Initialize the timeout timer. The timer won't be started until the */
/* first socket is opened. */
uv_timer_init(loop, &loop->ares_timer);
return rc;
}
void uv_ares_destroy(uv_loop_t* loop, ares_channel channel) {
/* Only allow destroy if did init. */
if (loop->channel) {
uv_timer_stop(&loop->ares_timer);
ares_destroy(channel);
loop->channel = NULL;
}
}

48
deps/uv/src/unix/core.c

@ -592,3 +592,51 @@ uv_err_t uv_chdir(const char* dir) {
return uv__new_sys_error(errno); return uv__new_sys_error(errno);
} }
} }
static void uv__io_set_cb(uv__io_t* handle, uv__io_cb cb) {
union { void* data; uv__io_cb cb; } u;
u.cb = cb;
handle->io_watcher.data = u.data;
}
static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) {
union { void* data; uv__io_cb cb; } u;
uv_loop_t* loop = ev_userdata(ev);
uv__io_t* handle = container_of(w, uv__io_t, io_watcher);
u.data = handle->io_watcher.data;
u.cb(loop, handle, events & (EV_READ|EV_WRITE|EV_ERROR));
}
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
ev_io_init(&handle->io_watcher, uv__io_rw, fd, events & (EV_READ|EV_WRITE));
uv__io_set_cb(handle, cb);
}
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
ev_io_set(&handle->io_watcher, fd, events);
uv__io_set_cb(handle, cb);
}
void uv__io_start(uv_loop_t* loop, uv__io_t* handle) {
ev_io_start(loop->ev, &handle->io_watcher);
}
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) {
ev_io_stop(loop->ev, &handle->io_watcher);
}
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) {
ev_feed_event(loop->ev, &handle->io_watcher, event);
}
int uv__io_active(uv__io_t* handle) {
return ev_is_active(&handle->io_watcher);
}

16
deps/uv/src/unix/internal.h

@ -78,6 +78,10 @@
} \ } \
while (0) while (0)
#define UV__IO_READ EV_READ
#define UV__IO_WRITE EV_WRITE
#define UV__IO_ERROR EV_ERROR
/* flags */ /* flags */
enum { enum {
UV_CLOSING = 0x01, /* uv_close() called but not finished. */ UV_CLOSING = 0x01, /* uv_close() called but not finished. */
@ -127,6 +131,13 @@ int uv__socket(int domain, int type, int protocol);
int uv__dup(int fd); int uv__dup(int fd);
int uv_async_stop(uv_async_t* handle); int uv_async_stop(uv_async_t* handle);
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_start(uv_loop_t* loop, uv__io_t* handle);
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle);
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event);
int uv__io_active(uv__io_t* handle);
/* loop */ /* loop */
int uv__loop_init(uv_loop_t* loop, int default_loop); int uv__loop_init(uv_loop_t* loop, int default_loop);
void uv__loop_delete(uv_loop_t* loop); void uv__loop_delete(uv_loop_t* loop);
@ -143,8 +154,7 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
uv_handle_type type); uv_handle_type type);
int uv__stream_open(uv_stream_t*, int fd, int flags); int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream); void uv__stream_destroy(uv_stream_t* stream);
void uv__stream_io(EV_P_ ev_io* watcher, int revents); void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
void uv__server_io(EV_P_ ev_io* watcher, int revents);
int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len); int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
socklen_t addrlen, uv_connect_cb cb); socklen_t addrlen, uv_connect_cb cb);
@ -156,11 +166,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
/* pipe */ /* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
/* poll */ /* poll */
void uv__poll_close(uv_poll_t* handle); void uv__poll_close(uv_poll_t* handle);
int uv__poll_active(const uv_poll_t* handle);
/* various */ /* various */
void uv__async_close(uv_async_t* handle); void uv__async_close(uv_async_t* handle);

18
deps/uv/src/unix/linux/inotify.c

@ -51,7 +51,7 @@ static int compare_watchers(const uv_fs_event_t* a, const uv_fs_event_t* b) {
RB_GENERATE_STATIC(uv__inotify_watchers, uv_fs_event_s, node, compare_watchers) RB_GENERATE_STATIC(uv__inotify_watchers, uv_fs_event_s, node, compare_watchers)
static void uv__inotify_read(EV_P_ ev_io* w, int revents); static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents);
static int new_inotify_fd(void) { static int new_inotify_fd(void) {
@ -85,11 +85,11 @@ static int init_inotify(uv_loop_t* loop) {
return -1; return -1;
} }
ev_io_init(&loop->inotify_read_watcher, uv__io_init(&loop->inotify_read_watcher,
uv__inotify_read, uv__inotify_read,
loop->inotify_fd, loop->inotify_fd,
EV_READ); UV__IO_READ);
ev_io_start(loop->ev, &loop->inotify_read_watcher); uv__io_start(loop, &loop->inotify_read_watcher);
return 0; return 0;
} }
@ -112,22 +112,18 @@ static void remove_watcher(uv_fs_event_t* handle) {
} }
static void uv__inotify_read(EV_P_ ev_io* w, int revents) { static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int events) {
const struct uv__inotify_event* e; const struct uv__inotify_event* e;
uv_fs_event_t* handle; uv_fs_event_t* handle;
uv_loop_t* uv_loop;
const char* filename; const char* filename;
ssize_t size; ssize_t size;
int events;
const char *p; const char *p;
/* needs to be large enough for sizeof(inotify_event) + strlen(filename) */ /* needs to be large enough for sizeof(inotify_event) + strlen(filename) */
char buf[4096]; char buf[4096];
uv_loop = container_of(w, uv_loop_t, inotify_read_watcher);
while (1) { while (1) {
do { do {
size = read(uv_loop->inotify_fd, buf, sizeof buf); size = read(loop->inotify_fd, buf, sizeof buf);
} }
while (size == -1 && errno == EINTR); while (size == -1 && errno == EINTR);
@ -148,7 +144,7 @@ static void uv__inotify_read(EV_P_ ev_io* w, int revents) {
if (e->mask & ~(UV__IN_ATTRIB|UV__IN_MODIFY)) if (e->mask & ~(UV__IN_ATTRIB|UV__IN_MODIFY))
events |= UV_RENAME; events |= UV_RENAME;
handle = find_watcher(uv_loop, e->wd); handle = find_watcher(loop, e->wd);
if (handle == NULL) if (handle == NULL)
continue; /* Handle has already been closed. */ continue; /* Handle has already been closed. */

5
deps/uv/src/unix/loop.c

@ -66,10 +66,11 @@ void uv__loop_delete(uv_loop_t* loop) {
uv_ares_destroy(loop, loop->channel); uv_ares_destroy(loop, loop->channel);
ev_loop_destroy(loop->ev); ev_loop_destroy(loop->ev);
#if __linux__ #if __linux__
if (loop->inotify_fd == -1) return; if (loop->inotify_fd != -1) {
ev_io_stop(loop->ev, &loop->inotify_read_watcher); uv__io_stop(loop, &loop->inotify_read_watcher);
close(loop->inotify_fd); close(loop->inotify_fd);
loop->inotify_fd = -1; loop->inotify_fd = -1;
}
#endif #endif
#if HAVE_PORTS_FS #if HAVE_PORTS_FS
if (loop->fs_fd != -1) if (loop->fs_fd != -1)

19
deps/uv/src/unix/pipe.c

@ -29,6 +29,8 @@
#include <unistd.h> #include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events);
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
@ -138,8 +140,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv__set_sys_error(handle->loop, errno); uv__set_sys_error(handle->loop, errno);
} else { } else {
handle->connection_cb = cb; handle->connection_cb = cb;
ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ); uv__io_init(&handle->read_watcher,
ev_io_start(handle->loop->ev, &handle->read_watcher); uv__pipe_accept,
handle->fd,
UV__IO_READ);
uv__io_start(handle->loop, &handle->read_watcher);
} }
out: out:
@ -211,8 +216,8 @@ void uv_pipe_connect(uv_connect_t* req,
uv__stream_open((uv_stream_t*)handle, uv__stream_open((uv_stream_t*)handle,
sockfd, sockfd,
UV_STREAM_READABLE | UV_STREAM_WRITABLE); UV_STREAM_READABLE | UV_STREAM_WRITABLE);
ev_io_start(handle->loop->ev, &handle->read_watcher); uv__io_start(handle->loop, &handle->read_watcher);
ev_io_start(handle->loop->ev, &handle->write_watcher); uv__io_start(handle->loop, &handle->write_watcher);
status = 0; status = 0;
out: out:
@ -235,14 +240,14 @@ out:
/* TODO merge with uv__server_io()? */ /* TODO merge with uv__server_io()? */
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) { static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
struct sockaddr_un saddr; struct sockaddr_un saddr;
uv_pipe_t* pipe; uv_pipe_t* pipe;
int saved_errno; int saved_errno;
int sockfd; int sockfd;
saved_errno = errno; saved_errno = errno;
pipe = watcher->data; pipe = container_of(w, uv_pipe_t, read_watcher);
assert(pipe->type == UV_NAMED_PIPE); assert(pipe->type == UV_NAMED_PIPE);
@ -257,7 +262,7 @@ void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
pipe->connection_cb((uv_stream_t*)pipe, 0); pipe->connection_cb((uv_stream_t*)pipe, 0);
if (pipe->accepted_fd == sockfd) { if (pipe->accepted_fd == sockfd) {
/* The user hasn't called uv_accept() yet */ /* The user hasn't called uv_accept() yet */
ev_io_stop(pipe->loop->ev, &pipe->read_watcher); uv__io_stop(pipe->loop, &pipe->read_watcher);
} }
} }

58
deps/uv/src/unix/poll.c

@ -27,11 +27,13 @@
#include <errno.h> #include <errno.h>
static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) { static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_poll_t* handle = watcher->data; uv_poll_t* handle;
int events; int pevents;
handle = container_of(w, uv_poll_t, io_watcher);
if (ev_events & EV_ERROR) { if (events & UV__IO_ERROR) {
/* An error happened. Libev has implicitly stopped the watcher, but we */ /* An error happened. Libev has implicitly stopped the watcher, but we */
/* need to fix the refcount. */ /* need to fix the refcount. */
uv__handle_stop(handle); uv__handle_stop(handle);
@ -40,16 +42,13 @@ static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
return; return;
} }
assert(ev_events & (EV_READ | EV_WRITE)); pevents = 0;
assert((ev_events & ~(EV_READ | EV_WRITE)) == 0); if (events & UV__IO_READ)
pevents |= UV_READABLE;
if (events & UV__IO_WRITE)
pevents |= UV_WRITABLE;
events = 0; handle->poll_cb(handle, 0, pevents);
if (ev_events & EV_READ)
events |= UV_READABLE;
if (ev_events & EV_WRITE)
events |= UV_WRITABLE;
handle->poll_cb(handle, 0, events);
} }
@ -59,9 +58,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
handle->fd = fd; handle->fd = fd;
handle->poll_cb = NULL; handle->poll_cb = NULL;
uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0);
ev_init(&handle->io_watcher, uv__poll_io);
handle->io_watcher.data = handle;
return 0; return 0;
} }
@ -74,7 +71,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
static void uv__poll_stop(uv_poll_t* handle) { static void uv__poll_stop(uv_poll_t* handle) {
ev_io_stop(handle->loop->ev, &handle->io_watcher); uv__io_stop(handle->loop, &handle->io_watcher);
uv__handle_stop(handle); uv__handle_stop(handle);
} }
@ -86,25 +83,25 @@ int uv_poll_stop(uv_poll_t* handle) {
} }
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) { int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
int ev_events; int events;
assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0); assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED))); assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));
if (events == 0) { if (pevents == 0) {
uv__poll_stop(handle); uv__poll_stop(handle);
return 0; return 0;
} }
ev_events = 0; events = 0;
if (events & UV_READABLE) if (pevents & UV_READABLE)
ev_events |= EV_READ; events |= UV__IO_READ;
if (events & UV_WRITABLE) if (pevents & UV_WRITABLE)
ev_events |= EV_WRITE; events |= UV__IO_WRITE;
ev_io_set(&handle->io_watcher, handle->fd, ev_events); uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
ev_io_start(handle->loop->ev, &handle->io_watcher); uv__io_start(handle->loop, &handle->io_watcher);
handle->poll_cb = poll_cb; handle->poll_cb = poll_cb;
uv__handle_start(handle); uv__handle_start(handle);
@ -116,8 +113,3 @@ int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
void uv__poll_close(uv_poll_t* handle) { void uv__poll_close(uv_poll_t* handle) {
uv__poll_stop(handle); uv__poll_stop(handle);
} }
int uv__poll_active(const uv_poll_t* handle) {
return ev_is_active(&handle->io_watcher);
}

102
deps/uv/src/unix/stream.c

@ -38,6 +38,7 @@
static void uv__stream_connect(uv_stream_t*); static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream); static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream); static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events);
static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
@ -71,15 +72,8 @@ void uv__stream_init(uv_loop_t* loop,
ngx_queue_init(&stream->write_completed_queue); ngx_queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0; stream->write_queue_size = 0;
ev_init(&stream->read_watcher, uv__stream_io); uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0);
stream->read_watcher.data = stream; uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0);
ev_init(&stream->write_watcher, uv__stream_io);
stream->write_watcher.data = stream;
assert(ngx_queue_empty(&stream->write_queue));
assert(ngx_queue_empty(&stream->write_completed_queue));
assert(stream->write_queue_size == 0);
} }
@ -111,13 +105,9 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
} }
} }
/* Associate the fd with each ev_io watcher. */ /* Associate the fd with each watcher. */
ev_io_set(&stream->read_watcher, fd, EV_READ); uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ);
ev_io_set(&stream->write_watcher, fd, EV_WRITE); uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE);
/* These should have been set up by uv_tcp_init or uv_pipe_init. */
assert(stream->read_watcher.cb == uv__stream_io);
assert(stream->write_watcher.cb == uv__stream_io);
return 0; return 0;
} }
@ -174,19 +164,16 @@ void uv__stream_destroy(uv_stream_t* stream) {
} }
void uv__server_io(EV_P_ ev_io* watcher, int revents) { void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
int fd; int fd;
struct sockaddr_storage addr; struct sockaddr_storage addr;
uv_stream_t* stream = watcher->data; uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher);
assert(watcher == &stream->read_watcher ||
watcher == &stream->write_watcher);
assert(revents == EV_READ);
assert(events == UV__IO_READ);
assert(!(stream->flags & UV_CLOSING)); assert(!(stream->flags & UV_CLOSING));
if (stream->accepted_fd >= 0) { if (stream->accepted_fd >= 0) {
ev_io_stop(EV_A, &stream->read_watcher); uv__io_stop(loop, &stream->read_watcher);
return; return;
} }
@ -216,7 +203,7 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
stream->connection_cb((uv_stream_t*)stream, 0); stream->connection_cb((uv_stream_t*)stream, 0);
if (stream->accepted_fd >= 0) { if (stream->accepted_fd >= 0) {
/* The user hasn't yet accepted called uv_accept() */ /* The user hasn't yet accepted called uv_accept() */
ev_io_stop(stream->loop->ev, &stream->read_watcher); uv__io_stop(stream->loop, &stream->read_watcher);
return; return;
} }
} }
@ -252,7 +239,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
goto out; goto out;
} }
ev_io_start(streamServer->loop->ev, &streamServer->read_watcher); uv__io_start(streamServer->loop, &streamServer->read_watcher);
streamServer->accepted_fd = -1; streamServer->accepted_fd = -1;
status = 0; status = 0;
@ -312,7 +299,7 @@ static void uv__drain(uv_stream_t* stream) {
assert(!uv_write_queue_head(stream)); assert(!uv_write_queue_head(stream));
assert(stream->write_queue_size == 0); assert(stream->write_queue_size == 0);
ev_io_stop(stream->loop->ev, &stream->write_watcher); uv__io_stop(stream->loop, &stream->write_watcher);
/* Shutdown? */ /* Shutdown? */
if ((stream->flags & UV_STREAM_SHUTTING) && if ((stream->flags & UV_STREAM_SHUTTING) &&
@ -366,7 +353,7 @@ static void uv__write_req_finish(uv_write_t* req) {
* callback called in the near future. * callback called in the near future.
*/ */
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE); uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);
} }
@ -517,7 +504,7 @@ start:
assert(!stream->blocking); assert(!stream->blocking);
/* We're not done. */ /* We're not done. */
ev_io_start(stream->loop->ev, &stream->write_watcher); uv__io_start(stream->loop, &stream->write_watcher);
} }
@ -576,7 +563,6 @@ static void uv__read(uv_stream_t* stream) {
struct msghdr msg; struct msghdr msg;
struct cmsghdr* cmsg; struct cmsghdr* cmsg;
char cmsg_space[64]; char cmsg_space[64];
struct ev_loop* ev = stream->loop->ev;
/* XXX: Maybe instead of having UV_STREAM_READING we just test if /* XXX: Maybe instead of having UV_STREAM_READING we just test if
* tcp->read_cb is NULL or not? * tcp->read_cb is NULL or not?
@ -619,7 +605,7 @@ static void uv__read(uv_stream_t* stream) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Wait for the next one. */ /* Wait for the next one. */
if (stream->flags & UV_STREAM_READING) { if (stream->flags & UV_STREAM_READING) {
ev_io_start(ev, &stream->read_watcher); uv__io_start(stream->loop, &stream->read_watcher);
} }
uv__set_sys_error(stream->loop, EAGAIN); uv__set_sys_error(stream->loop, EAGAIN);
@ -647,7 +633,7 @@ static void uv__read(uv_stream_t* stream) {
} else if (nread == 0) { } else if (nread == 0) {
/* EOF */ /* EOF */
uv__set_artificial_error(stream->loop, UV_EOF); uv__set_artificial_error(stream->loop, UV_EOF);
ev_io_stop(ev, &stream->read_watcher); uv__io_stop(stream->loop, &stream->read_watcher);
if (!ev_is_active(&stream->write_watcher)) if (!ev_is_active(&stream->write_watcher))
uv__handle_stop(stream); uv__handle_stop(stream);
@ -728,41 +714,44 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
stream->shutdown_req = req; stream->shutdown_req = req;
stream->flags |= UV_STREAM_SHUTTING; stream->flags |= UV_STREAM_SHUTTING;
ev_io_start(stream->loop->ev, &stream->write_watcher); uv__io_start(stream->loop, &stream->write_watcher);
return 0; return 0;
} }
void uv__stream_pending(uv_stream_t* handle) { void uv__stream_pending(uv_stream_t* handle) {
uv__stream_io(handle->loop->ev, &handle->write_watcher, EV_WRITE); uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE);
} }
void uv__stream_io(EV_P_ ev_io* watcher, int revents) { static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_stream_t* stream = watcher->data; uv_stream_t* stream;
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || /* either UV__IO_READ or UV__IO_WRITE but not both */
assert(!!(events & UV__IO_READ) ^ !!(events & UV__IO_WRITE));
if (events & UV__IO_READ)
stream = container_of(w, uv_stream_t, read_watcher);
else
stream = container_of(w, uv_stream_t, write_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY); stream->type == UV_TTY);
assert(watcher == &stream->read_watcher ||
watcher == &stream->write_watcher);
assert(!(stream->flags & UV_CLOSING)); assert(!(stream->flags & UV_CLOSING));
if (stream->connect_req) { if (stream->connect_req)
uv__stream_connect(stream); uv__stream_connect(stream);
} else { else if (events & UV__IO_READ) {
assert(revents & (EV_READ | EV_WRITE));
assert(stream->fd >= 0); assert(stream->fd >= 0);
uv__read(stream);
if (revents & EV_READ) {
uv__read((uv_stream_t*)stream);
} }
else {
if (revents & EV_WRITE) { assert(stream->fd >= 0);
uv__write(stream); uv__write(stream);
uv__write_callbacks(stream); uv__write_callbacks(stream);
} }
}
} }
@ -796,7 +785,7 @@ static void uv__stream_connect(uv_stream_t* stream) {
return; return;
if (error == 0) if (error == 0)
ev_io_start(stream->loop->ev, &stream->read_watcher); uv__io_start(stream->loop, &stream->read_watcher);
stream->connect_req = NULL; stream->connect_req = NULL;
uv__req_unregister(stream->loop, req); uv__req_unregister(stream->loop, req);
@ -869,8 +858,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
} }
} }
assert(stream->write_watcher.data == stream); uv__io_start(stream->loop, &stream->write_watcher);
ev_io_start(stream->loop->ev, &stream->write_watcher);
if (stream->delayed_error) if (stream->delayed_error)
uv__make_pending(stream); uv__make_pending(stream);
@ -930,9 +918,6 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
ngx_queue_insert_tail(&stream->write_queue, &req->queue); ngx_queue_insert_tail(&stream->write_queue, &req->queue);
assert(!ngx_queue_empty(&stream->write_queue)); assert(!ngx_queue_empty(&stream->write_queue));
assert(stream->write_watcher.cb == uv__stream_io);
assert(stream->write_watcher.data == stream);
assert(stream->write_watcher.fd == stream->fd);
/* If the queue was empty when this function began, we should attempt to /* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait * do the write immediately. Otherwise start the write_watcher and wait
@ -948,7 +933,7 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
*/ */
assert(!stream->blocking); assert(!stream->blocking);
ev_io_start(stream->loop->ev, &stream->write_watcher); uv__io_start(stream->loop, &stream->write_watcher);
} }
return 0; return 0;
@ -990,10 +975,7 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
stream->read2_cb = read2_cb; stream->read2_cb = read2_cb;
stream->alloc_cb = alloc_cb; stream->alloc_cb = alloc_cb;
/* These should have been set by uv_tcp_init. */ uv__io_start(stream->loop, &stream->read_watcher);
assert(stream->read_watcher.cb == uv__stream_io);
ev_io_start(stream->loop->ev, &stream->read_watcher);
uv__handle_start(stream); uv__handle_start(stream);
return 0; return 0;
@ -1013,7 +995,7 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
int uv_read_stop(uv_stream_t* stream) { int uv_read_stop(uv_stream_t* stream) {
ev_io_stop(stream->loop->ev, &stream->read_watcher); uv__io_stop(stream->loop, &stream->read_watcher);
uv__handle_stop(stream); uv__handle_stop(stream);
stream->flags &= ~UV_STREAM_READING; stream->flags &= ~UV_STREAM_READING;
stream->read_cb = NULL; stream->read_cb = NULL;
@ -1035,7 +1017,7 @@ int uv_is_writable(const uv_stream_t* stream) {
void uv__stream_close(uv_stream_t* handle) { void uv__stream_close(uv_stream_t* handle) {
uv_read_stop(handle); uv_read_stop(handle);
ev_io_stop(handle->loop->ev, &handle->write_watcher); uv__io_stop(handle->loop, &handle->write_watcher);
close(handle->fd); close(handle->fd);
handle->fd = -1; handle->fd = -1;

5
deps/uv/src/unix/tcp.c

@ -201,9 +201,8 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
tcp->connection_cb = cb; tcp->connection_cb = cb;
/* Start listening for connections. */ /* Start listening for connections. */
ev_io_set(&tcp->read_watcher, tcp->fd, EV_READ); uv__io_set(&tcp->read_watcher, uv__server_io, tcp->fd, UV__IO_READ);
ev_set_cb(&tcp->read_watcher, uv__server_io); uv__io_start(tcp->loop, &tcp->read_watcher);
ev_io_start(tcp->loop->ev, &tcp->read_watcher);
return 0; return 0;
} }

46
deps/uv/src/unix/udp.c

@ -31,33 +31,31 @@
static void uv__udp_run_completed(uv_udp_t* handle); static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle); static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents); static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents);
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents); static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb); int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);
static void uv__udp_start_watcher(uv_udp_t* handle, static void uv__udp_start_watcher(uv_udp_t* handle,
ev_io* w, uv__io_t* w,
void (*cb)(EV_P_ ev_io*, int), uv__io_cb cb,
int flags) { int events) {
if (ev_is_active(w)) return; if (uv__io_active(w)) return;
ev_set_cb(w, cb); uv__io_init(w, cb, handle->fd, events);
ev_io_set(w, handle->fd, flags); uv__io_start(handle->loop, w);
ev_io_start(handle->loop->ev, w);
uv__handle_start(handle); uv__handle_start(handle);
} }
static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) { static void uv__udp_stop_watcher(uv_udp_t* handle, uv__io_t* w) {
if (!ev_is_active(w)) return; if (!uv__io_active(w)) return;
ev_io_stop(handle->loop->ev, w); uv__io_stop(handle->loop, w);
ev_io_set(w, -1, 0);
ev_set_cb(w, NULL);
if (!ev_is_active(&handle->read_watcher) && if (!uv__io_active(&handle->read_watcher) &&
!ev_is_active(&handle->write_watcher)) { !uv__io_active(&handle->write_watcher))
{
uv__handle_stop(handle); uv__handle_stop(handle);
} }
} }
@ -67,7 +65,7 @@ static void uv__udp_start_read_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle, uv__udp_start_watcher(handle,
&handle->read_watcher, &handle->read_watcher,
uv__udp_recvmsg, uv__udp_recvmsg,
EV_READ); UV__IO_READ);
} }
@ -75,7 +73,7 @@ static void uv__udp_start_write_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle, uv__udp_start_watcher(handle,
&handle->write_watcher, &handle->write_watcher,
uv__udp_sendmsg, uv__udp_sendmsg,
EV_WRITE); UV__IO_WRITE);
} }
@ -101,8 +99,8 @@ void uv__udp_finish_close(uv_udp_t* handle) {
uv_udp_send_t* req; uv_udp_send_t* req;
ngx_queue_t* q; ngx_queue_t* q;
assert(!ev_is_active(&handle->write_watcher)); assert(!uv__io_active(&handle->write_watcher));
assert(!ev_is_active(&handle->read_watcher)); assert(!uv__io_active(&handle->read_watcher));
assert(handle->fd == -1); assert(handle->fd == -1);
uv__udp_run_completed(handle); uv__udp_run_completed(handle);
@ -216,7 +214,7 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
} }
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) { static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
struct sockaddr_storage peer; struct sockaddr_storage peer;
struct msghdr h; struct msghdr h;
uv_udp_t* handle; uv_udp_t* handle;
@ -278,7 +276,7 @@ static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) {
} }
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) { static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
uv_udp_t* handle; uv_udp_t* handle;
handle = container_of(w, uv_udp_t, write_watcher); handle = container_of(w, uv_udp_t, write_watcher);
@ -296,7 +294,7 @@ static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) {
if (!ngx_queue_empty(&handle->write_completed_queue)) { if (!ngx_queue_empty(&handle->write_completed_queue)) {
/* Schedule completion callbacks. */ /* Schedule completion callbacks. */
ev_feed_event(handle->loop->ev, &handle->write_watcher, EV_WRITE); uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
} }
else if (ngx_queue_empty(&handle->write_queue)) { else if (ngx_queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */ /* Pending queue and completion queue empty, stop watcher. */
@ -649,7 +647,7 @@ int uv_udp_recv_start(uv_udp_t* handle,
return -1; return -1;
} }
if (ev_is_active(&handle->read_watcher)) { if (uv__io_active(&handle->read_watcher)) {
uv__set_artificial_error(handle->loop, UV_EALREADY); uv__set_artificial_error(handle->loop, UV_EALREADY);
return -1; return -1;
} }

77
deps/uv/test/test-tcp-writealot.c

@ -31,10 +31,8 @@
#define TOTAL_BYTES (WRITES * CHUNKS_PER_WRITE * CHUNK_SIZE) #define TOTAL_BYTES (WRITES * CHUNKS_PER_WRITE * CHUNK_SIZE)
static char* send_buffer; static char* send_buffer;
static int shutdown_cb_called = 0; static int shutdown_cb_called = 0;
static int connect_cb_called = 0; static int connect_cb_called = 0;
static int write_cb_called = 0; static int write_cb_called = 0;
@ -43,20 +41,18 @@ static int bytes_sent = 0;
static int bytes_sent_done = 0; static int bytes_sent_done = 0;
static int bytes_received_done = 0; static int bytes_received_done = 0;
static uv_connect_t connect_req;
static uv_shutdown_t shutdown_req;
static uv_write_t write_reqs[WRITES];
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf; return uv_buf_init(malloc(size), size);
buf.base = (char*)malloc(size);
buf.len = size;
return buf;
} }
static void close_cb(uv_handle_t* handle) { static void close_cb(uv_handle_t* handle) {
ASSERT(handle != NULL); ASSERT(handle != NULL);
free(handle);
close_cb_called++; close_cb_called++;
} }
@ -64,7 +60,7 @@ static void close_cb(uv_handle_t* handle) {
static void shutdown_cb(uv_shutdown_t* req, int status) { static void shutdown_cb(uv_shutdown_t* req, int status) {
uv_tcp_t* tcp; uv_tcp_t* tcp;
ASSERT(req); ASSERT(req == &shutdown_req);
ASSERT(status == 0); ASSERT(status == 0);
tcp = (uv_tcp_t*)(req->handle); tcp = (uv_tcp_t*)(req->handle);
@ -77,28 +73,21 @@ static void shutdown_cb(uv_shutdown_t* req, int status) {
/* We should have had all the writes called already. */ /* We should have had all the writes called already. */
ASSERT(write_cb_called == WRITES); ASSERT(write_cb_called == WRITES);
free(req);
} }
static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
ASSERT(tcp != NULL); ASSERT(tcp != NULL);
if (nread < 0) { if (nread >= 0) {
bytes_received_done += nread;
}
else {
ASSERT(uv_last_error(uv_default_loop()).code == UV_EOF); ASSERT(uv_last_error(uv_default_loop()).code == UV_EOF);
printf("GOT EOF\n"); printf("GOT EOF\n");
if (buf.base) {
free(buf.base);
}
uv_close((uv_handle_t*)tcp, close_cb); uv_close((uv_handle_t*)tcp, close_cb);
return;
} }
bytes_received_done += nread;
free(buf.base); free(buf.base);
} }
@ -114,71 +103,55 @@ static void write_cb(uv_write_t* req, int status) {
bytes_sent_done += CHUNKS_PER_WRITE * CHUNK_SIZE; bytes_sent_done += CHUNKS_PER_WRITE * CHUNK_SIZE;
write_cb_called++; write_cb_called++;
free(req);
} }
static void connect_cb(uv_connect_t* req, int status) { static void connect_cb(uv_connect_t* req, int status) {
uv_buf_t send_bufs[CHUNKS_PER_WRITE]; uv_buf_t send_bufs[CHUNKS_PER_WRITE];
uv_tcp_t* tcp; uv_stream_t* stream;
uv_write_t* write_req;
uv_shutdown_t* shutdown_req;
int i, j, r; int i, j, r;
ASSERT(req != NULL); ASSERT(req == &connect_req);
ASSERT(status == 0); ASSERT(status == 0);
tcp = (uv_tcp_t*)req->handle; stream = req->handle;
connect_cb_called++; connect_cb_called++;
free(req);
/* Write a lot of data */ /* Write a lot of data */
for (i = 0; i < WRITES; i++) { for (i = 0; i < WRITES; i++) {
uv_write_t* write_req = write_reqs + i;
for (j = 0; j < CHUNKS_PER_WRITE; j++) { for (j = 0; j < CHUNKS_PER_WRITE; j++) {
send_bufs[j].len = CHUNK_SIZE; send_bufs[j] = uv_buf_init(send_buffer + bytes_sent, CHUNK_SIZE);
send_bufs[j].base = send_buffer + bytes_sent;
bytes_sent += CHUNK_SIZE; bytes_sent += CHUNK_SIZE;
} }
write_req = malloc(sizeof(uv_write_t)); r = uv_write(write_req, stream, send_bufs, CHUNKS_PER_WRITE, write_cb);
ASSERT(write_req != NULL);
r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs,
CHUNKS_PER_WRITE, write_cb);
ASSERT(r == 0); ASSERT(r == 0);
} }
/* Shutdown on drain. FIXME: dealloc req? */ /* Shutdown on drain. */
shutdown_req = malloc(sizeof(uv_shutdown_t)); r = uv_shutdown(&shutdown_req, stream, shutdown_cb);
ASSERT(shutdown_req != NULL);
r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb);
ASSERT(r == 0); ASSERT(r == 0);
/* Start reading */ /* Start reading */
r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb); r = uv_read_start(stream, alloc_cb, read_cb);
ASSERT(r == 0); ASSERT(r == 0);
} }
TEST_IMPL(tcp_writealot) { TEST_IMPL(tcp_writealot) {
struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT); struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client); uv_tcp_t client;
uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
int r; int r;
ASSERT(client != NULL); send_buffer = malloc(TOTAL_BYTES);
ASSERT(connect_req != NULL);
send_buffer = (char*)malloc(TOTAL_BYTES + 1);
ASSERT(send_buffer != NULL); ASSERT(send_buffer != NULL);
r = uv_tcp_init(uv_default_loop(), client); r = uv_tcp_init(uv_default_loop(), &client);
ASSERT(r == 0); ASSERT(r == 0);
r = uv_tcp_connect(connect_req, client, addr, connect_cb); r = uv_tcp_connect(&connect_req, &client, addr, connect_cb);
ASSERT(r == 0); ASSERT(r == 0);
uv_run(uv_default_loop()); uv_run(uv_default_loop());
@ -191,5 +164,7 @@ TEST_IMPL(tcp_writealot) {
ASSERT(bytes_sent_done == TOTAL_BYTES); ASSERT(bytes_sent_done == TOTAL_BYTES);
ASSERT(bytes_received_done == TOTAL_BYTES); ASSERT(bytes_received_done == TOTAL_BYTES);
free(send_buffer);
return 0; return 0;
} }

Loading…
Cancel
Save