From 130be31cfff34a2c6ccb477dbcfead97b3e09646 Mon Sep 17 00:00:00 2001
From: Ryan Dahl
Date: Fri, 15 Jul 2011 11:15:02 -0700
Subject: [PATCH] Upgrade libuv to 1be48f12a0 and bindings for new req
 interface

---
 deps/uv/include/uv-unix.h           |  16 +-
 deps/uv/include/uv-win.h            |  45 +--
 deps/uv/include/uv.h                |  99 +++--
 deps/uv/src/uv-unix.c               | 132 ++++---
 deps/uv/src/uv-win.c                | 570 +++++++++++-----------------
 deps/uv/test/benchmark-ping-pongs.c |  31 +-
 deps/uv/test/benchmark-pump.c       |  32 +-
 deps/uv/test/dns-server.c           |  22 +-
 deps/uv/test/echo-server.c          |  20 +-
 deps/uv/test/test-callback-stack.c  |  21 +-
 deps/uv/test/test-connection-fail.c |  14 +-
 deps/uv/test/test-delayed-accept.c  |   7 +-
 deps/uv/test/test-getsockname.c     |  31 +-
 deps/uv/test/test-ping-pong.c       |  31 +-
 deps/uv/test/test-shutdown-eof.c    |  17 +-
 deps/uv/test/test-tcp-writealot.c   |  32 +-
 src/tcp_wrap.cc                     |  71 ++--
 17 files changed, 553 insertions(+), 638 deletions(-)

diff --git a/deps/uv/include/uv-unix.h b/deps/uv/include/uv-unix.h
index d17ebc3b27..50b29c7e32 100644
--- a/deps/uv/include/uv-unix.h
+++ b/deps/uv/include/uv-unix.h
@@ -41,14 +41,20 @@ typedef struct {
 
 #define UV_REQ_BUFSML_SIZE (4)
 
-#define UV_REQ_PRIVATE_FIELDS \
-  int write_index; \
-  ev_timer timer; \
+#define UV_REQ_PRIVATE_FIELDS /* empty */
+
+#define UV_WRITE_PRIVATE_FIELDS \
   ngx_queue_t queue; \
+  int write_index; \
   uv_buf_t* bufs; \
   int bufcnt; \
   uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];
 
+#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS \
+  ngx_queue_t queue;
+
 
 /* TODO: union or classes please! */
 #define UV_HANDLE_PRIVATE_FIELDS \
@@ -67,8 +73,8 @@ typedef struct {
   int delayed_error; \
   uv_connection_cb connection_cb; \
   int accepted_fd; \
-  uv_req_t *connect_req; \
-  uv_req_t *shutdown_req; \
+  uv_connect_t *connect_req; \
+  uv_shutdown_t *shutdown_req; \
   ev_io read_watcher; \
   ev_io write_watcher; \
   ngx_queue_t write_queue; \
diff --git a/deps/uv/include/uv-win.h b/deps/uv/include/uv-win.h
index 2d6093aa4f..e0bfa848b7 100644
--- a/deps/uv/include/uv-win.h
+++ b/deps/uv/include/uv-win.h
@@ -42,22 +42,6 @@ typedef struct uv_buf_t {
   char* base;
 } uv_buf_t;
 
-/*
- * Private uv_pipe_instance state.
- */
-typedef enum {
-  UV_PIPEINSTANCE_CONNECTED = 0,
-  UV_PIPEINSTANCE_DISCONNECTED,
-  UV_PIPEINSTANCE_ACTIVE
-} uv_pipeinstance_state;
-
-/* Used to store active pipe instances inside a linked list. */
-typedef struct uv_pipe_instance_s {
-  HANDLE handle;
-  uv_pipeinstance_state state;
-  struct uv_pipe_instance_s* next;
-} uv_pipe_instance_t;
-
 #define UV_REQ_PRIVATE_FIELDS \
   union { \
     /* Used by I/O operations */ \
     struct { \
       OVERLAPPED overlapped; \
       size_t queued_bytes; \
     }; \
   }; \
-  int flags; \
   uv_err_t error; \
   struct uv_req_s* next_req;
 
+#define UV_WRITE_PRIVATE_FIELDS \
+  /* empty */
+
+#define UV_CONNECT_PRIVATE_FIELDS \
+  /* empty */
+
+#define UV_SHUTDOWN_PRIVATE_FIELDS \
+  /* empty */
+
 #define uv_stream_connection_fields \
   unsigned int write_reqs_pending; \
-  uv_req_t* shutdown_req;
+  uv_shutdown_t* shutdown_req;
 
 #define uv_stream_server_fields \
   uv_connection_cb connection_cb;
 
@@ -81,7 +73,7 @@ typedef struct uv_pipe_instance_s {
   unsigned int reqs_pending; \
   uv_alloc_cb alloc_cb; \
   uv_read_cb read_cb; \
-  struct uv_req_s read_req; \
+  uv_req_t read_req; \
   union { \
     struct { uv_stream_connection_fields }; \
     struct { uv_stream_server_fields }; \
@@ -94,17 +86,19 @@
   }; \
   SOCKET accept_socket; \
   char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
-  struct uv_req_s accept_req;
+  struct uv_req_s accept_req; \
 
 #define uv_pipe_server_fields \
   char* name; \
-  uv_pipe_instance_t* connections; \
-  struct uv_req_s accept_reqs[4];
+  struct uv_pipe_accept_s { \
+    UV_REQ_FIELDS \
+    HANDLE pipeHandle; \
+    struct uv_pipe_accept_s* next_pending; \
+  } accept_reqs[4]; \
+  struct uv_pipe_accept_s* pending_accepts;
 
 #define uv_pipe_connection_fields \
-  uv_pipe_t* server; \
-  uv_pipe_instance_t* connection; \
-  uv_pipe_instance_t clientConnection;
+  HANDLE handle;
 
 #define UV_PIPE_PRIVATE_FIELDS \
   union { \
@@ -120,6 +114,7 @@
 #define UV_ASYNC_PRIVATE_FIELDS \
   struct uv_req_s async_req; \
+  uv_async_cb async_cb; \
   /* char to avoid alignment issues */ \
   char volatile async_sent;
diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h
index d470a259d5..1f8c1cec13 100644
--- a/deps/uv/include/uv.h
+++ b/deps/uv/include/uv.h
@@ -48,10 +48,13 @@ typedef struct uv_timer_s uv_timer_t;
 typedef struct uv_prepare_s uv_prepare_t;
 typedef struct uv_check_s uv_check_t;
 typedef struct uv_idle_s uv_idle_t;
-typedef struct uv_req_s uv_req_t;
 typedef struct uv_async_s uv_async_t;
 typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
-
+/* Request types */
+typedef struct uv_req_s uv_req_t;
+typedef struct uv_shutdown_s uv_shutdown_t;
+typedef struct uv_write_s uv_write_t;
+typedef struct uv_connect_s uv_connect_t;
 
 #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
 # include "uv-unix.h"
@@ -70,9 +73,9 @@ typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
  */
 typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
 typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
-typedef void (*uv_write_cb)(uv_req_t* req, int status);
-typedef void (*uv_connect_cb)(uv_req_t* req, int status);
-typedef void (*uv_shutdown_cb)(uv_req_t* req, int status);
+typedef void (*uv_write_cb)(uv_write_t* req, int status);
+typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
+typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
 typedef void (*uv_connection_cb)(uv_handle_t* server, int status);
 typedef void (*uv_close_cb)(uv_handle_t* handle);
 typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
@@ -168,23 +171,34 @@ struct uv_err_s {
 };
 
 
-struct uv_req_s {
-  /* read-only */
-  uv_req_type type;
-  /* public */
-  uv_handle_t* handle;
-  void *(*cb)(void *);
-  void* data;
-  /* private */
+#define UV_REQ_FIELDS \
+  /* read-only */ \
+  uv_req_type type; \
+  /* public */ \
+  void* data; \
+  /* private */ \
   UV_REQ_PRIVATE_FIELDS
+
+/* Abstract base class of all requests. */
+struct uv_req_s {
+  UV_REQ_FIELDS
 };
+
 
 /*
- * Initialize a request for use with uv_write, uv_shutdown, or uv_connect.
+ * Shut down the outgoing (write) side of a duplex stream. It waits for
+ * pending write requests to complete. The handle should refer to an
+ * initialized stream. req should be an uninitialized shutdown request
+ * struct. The cb is called after shutdown is complete.
  */
-void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *));
+struct uv_shutdown_s {
+  UV_REQ_FIELDS
+  uv_stream_t* handle;
+  uv_shutdown_cb cb;
+  UV_SHUTDOWN_PRIVATE_FIELDS
+};
 
-int uv_shutdown(uv_req_t* req);
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb);
 
 
 #define UV_HANDLE_FIELDS \
@@ -251,7 +265,8 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
 
 int uv_read_stop(uv_stream_t*);
 
-/* Write data to stream. Buffers are written in order. Example:
+/*
+ * Write data to stream. Buffers are written in order. Example:
  *
  *   uv_buf_t a[] = {
  *     { .base = "1", .len = 1 },
@@ -268,7 +283,15 @@ int uv_read_stop(uv_stream_t*);
  *   uv_write(req, b, 2);
  *
  */
-int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt);
+struct uv_write_s {
+  UV_REQ_FIELDS
+  uv_write_cb cb;
+  uv_stream_t* handle;
+  UV_WRITE_PRIVATE_FIELDS
+};
+
+int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
+    uv_write_cb cb);
 
 
 /*
@@ -287,8 +310,23 @@ int uv_tcp_init(uv_tcp_t* handle);
 int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
 int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
 
-int uv_tcp_connect(uv_req_t* req, struct sockaddr_in);
-int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6);
+/*
+ * uv_tcp_connect, uv_tcp_connect6
+ * These functions establish IPv4 and IPv6 TCP connections. Provide an
+ * initialized TCP handle and an uninitialized uv_connect_t*. The callback
+ * will be made when the connection is established.
+ */
+struct uv_connect_s {
+  UV_REQ_FIELDS
+  uv_connect_cb cb;
+  uv_stream_t* handle;
+  UV_CONNECT_PRIVATE_FIELDS
+};
+
+int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in address, uv_connect_cb cb);
+int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
+    struct sockaddr_in6 address, uv_connect_cb cb);
 
 int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
 
@@ -298,10 +336,10 @@ int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen);
 
 /*
  * A subclass of uv_stream_t representing a pipe stream or pipe server.
 */
-struct uv_pipe_s {
-  UV_HANDLE_FIELDS
-  UV_STREAM_FIELDS
-  UV_PIPE_PRIVATE_FIELDS
+struct uv_pipe_s {
+  UV_HANDLE_FIELDS
+  UV_STREAM_FIELDS
+  UV_PIPE_PRIVATE_FIELDS
 };
 
 int uv_pipe_init(uv_pipe_t* handle);
@@ -310,7 +348,8 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name);
 
 int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb);
 
-int uv_pipe_connect(uv_req_t* req, const char* name);
+int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+    const char* name, uv_connect_cb cb);
 
 
 /*
@@ -489,7 +528,7 @@ int uv_exepath(char* buffer, size_t* size);
 
 extern uint64_t uv_hrtime(void);
 
-/* the presence of this union forces similar struct layout */
+/* the presence of these unions forces similar struct layout */
 union uv_any_handle {
   uv_tcp_t tcp;
   uv_pipe_t pipe;
@@ -501,6 +540,14 @@ union uv_any_handle {
   uv_getaddrinfo_t getaddrinfo;
 };
 
+union uv_any_req {
+  uv_req_t req;
+  uv_write_t write;
+  uv_connect_t connect;
+  uv_shutdown_t shutdown;
+};
+
+
 /* Diagnostic counters */
 typedef struct {
   uint64_t req_init;
diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c
index ab8a5e6a7b..1dc20d8437 100644
--- a/deps/uv/src/uv-unix.c
+++ b/deps/uv/src/uv-unix.c
@@ -59,6 +59,7 @@ struct uv_ares_data_s {
 
 static struct uv_ares_data_s ares_data;
 
+void uv__req_init(uv_req_t*);
 void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
 void uv__next(EV_P_ ev_idle* watcher, int revents);
 static void uv__tcp_connect(uv_tcp_t*);
@@ -518,9 +519,9 @@ void uv__finish_close(uv_handle_t* handle) {
 }
 
 
-uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
+uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
   ngx_queue_t* q;
-  uv_req_t* req;
+  uv_write_t* req;
 
   if (ngx_queue_empty(&tcp->write_queue)) {
     return NULL;
@@ -531,7 +532,7 @@ uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
     return NULL;
   }
 
-  req = ngx_queue_data(q, struct uv_req_s, queue);
+  req = ngx_queue_data(q, struct uv_write_s, queue);
   assert(req);
 
   return req;
@@ -552,8 +553,7 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
 
 
 static void uv__drain(uv_tcp_t* tcp) {
-  uv_req_t* req;
-  uv_shutdown_cb cb;
+  uv_shutdown_t* req;
 
   assert(!uv_write_queue_head(tcp));
   assert(tcp->write_queue_size == 0);
@@ -567,16 +567,19 @@ static void uv__drain(uv_tcp_t* tcp) {
 
     assert(tcp->shutdown_req);
     req = tcp->shutdown_req;
-    cb = (uv_shutdown_cb)req->cb;
 
     if (shutdown(tcp->fd, SHUT_WR)) {
       /* Error. Report it. User should call uv_close(). */
       uv_err_new((uv_handle_t*)tcp, errno);
-      if (cb) cb(req, -1);
+      if (req->cb) {
+        req->cb(req, -1);
+      }
     } else {
       uv_err_new((uv_handle_t*)tcp, 0);
       uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
-      if (cb) cb(req, 0);
+      if (req->cb) {
+        req->cb(req, 0);
+      }
     }
   }
 }
@@ -585,8 +588,8 @@ static void uv__drain(uv_tcp_t* tcp) {
 /* On success returns NULL. On error returns a pointer to the write request
  * which had the error.
  */
-static uv_req_t* uv__write(uv_tcp_t* tcp) {
-  uv_req_t* req;
+static uv_write_t* uv__write(uv_tcp_t* tcp) {
+  uv_write_t* req;
   struct iovec* iov;
   int iovcnt;
   ssize_t n;
@@ -602,7 +605,7 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) {
     return NULL;
   }
 
-  assert(req->handle == (uv_handle_t*)tcp);
+  assert(req->handle == (uv_stream_t*)tcp);
 
   /* Cast to iovec. We had to have our own uv_buf_t instead of iovec
    * because Windows's WSABUF is not an iovec.
@@ -691,23 +694,20 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) { static void uv__write_callbacks(uv_tcp_t* tcp) { - uv_write_cb cb; int callbacks_made = 0; ngx_queue_t* q; - uv_req_t* req; + uv_write_t* req; while (!ngx_queue_empty(&tcp->write_completed_queue)) { /* Pop a req off write_completed_queue. */ q = ngx_queue_head(&tcp->write_completed_queue); assert(q); - req = ngx_queue_data(q, struct uv_req_s, queue); + req = ngx_queue_data(q, struct uv_write_s, queue); ngx_queue_remove(q); - cb = (uv_write_cb) req->cb; - /* NOTE: call callback AFTER freeing the request data. */ - if (cb) { - cb(req, 0); + if (req->cb) { + req->cb(req, 0); } callbacks_made++; @@ -772,10 +772,16 @@ void uv__read(uv_tcp_t* tcp) { } -int uv_shutdown(uv_req_t* req) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; +int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { + uv_tcp_t* tcp = (uv_tcp_t*)handle; + assert(handle->type == UV_TCP && + "uv_shutdown (unix) only supports uv_tcp_t right now"); assert(tcp->fd >= 0); - assert(tcp->type == UV_TCP); + + /* Initialize request */ + uv__req_init((uv_req_t*)req); + req->handle = handle; + req->cb = cb; if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) || uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) || @@ -796,11 +802,11 @@ int uv_shutdown(uv_req_t* req) { void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { uv_tcp_t* tcp = watcher->data; + + assert(tcp->type == UV_TCP); assert(watcher == &tcp->read_watcher || watcher == &tcp->write_watcher); - assert(tcp->fd >= 0); - assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); if (tcp->connect_req) { @@ -811,13 +817,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { } if (revents & EV_WRITE) { - uv_req_t* req = uv__write(tcp); + uv_write_t* req = uv__write(tcp); if (req) { /* Error. Notify the user. 
*/ - uv_write_cb cb = (uv_write_cb) req->cb; - - if (cb) { - cb(req, -1); + if (req->cb) { + req->cb(req, -1); } } else { uv__write_callbacks(tcp); @@ -834,13 +838,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { */ static void uv__tcp_connect(uv_tcp_t* tcp) { int error; - uv_req_t* req; - uv_connect_cb connect_cb; + uv_connect_t* req = tcp->connect_req; socklen_t errorsize = sizeof(int); + assert(tcp->type == UV_TCP); assert(tcp->fd >= 0); - - req = tcp->connect_req; assert(req); if (tcp->delayed_error) { @@ -860,9 +862,8 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { /* Successful connection */ tcp->connect_req = NULL; - connect_cb = (uv_connect_cb) req->cb; - if (connect_cb) { - connect_cb(req, 0); + if (req->cb) { + req->cb(req, 0); } } else if (error == EINPROGRESS) { @@ -873,18 +874,15 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { uv_err_new((uv_handle_t*)tcp, error); tcp->connect_req = NULL; - - connect_cb = (uv_connect_cb) req->cb; - if (connect_cb) { - connect_cb(req, -1); + if (req->cb) { + req->cb(req, -1); } } } -static int uv__connect(uv_req_t* req, struct sockaddr* addr, - socklen_t addrlen) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; +static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr, + socklen_t addrlen, uv_connect_cb cb) { int r; if (tcp->fd <= 0) { @@ -901,6 +899,9 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, } } + uv__req_init((uv_req_t*)req); + req->cb = cb; + req->handle = (uv_stream_t*)tcp; req->type = UV_CONNECT; ngx_queue_init(&req->queue); @@ -947,17 +948,21 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, } -int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { - assert(addr.sin_family == AF_INET); - return uv__connect(req, (struct sockaddr*) &addr, - sizeof(struct sockaddr_in)); +int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in address, uv_connect_cb cb) { + assert(handle->type == UV_TCP); + assert(address.sin_family == AF_INET); + return uv__connect(req, handle, (struct sockaddr*) &address, + sizeof(struct sockaddr_in), cb); } -int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { - assert(addr.sin6_family == AF_INET6); - return uv__connect(req, (struct sockaddr*) &addr, - sizeof(struct sockaddr_in6)); +int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb) { + assert(handle->type == UV_TCP); + assert(address.sin6_family == AF_INET6); + return uv__connect(req, handle, (struct sockaddr*) &address, + sizeof(struct sockaddr_in6), cb); } @@ -997,9 +1002,21 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { /* The buffers to be written must remain valid until the callback is called. * This is not required for the uv_buf_t array. 
*/ -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; - int empty_queue = (tcp->write_queue_size == 0); +int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + int empty_queue; + uv_tcp_t* tcp = (uv_tcp_t*)handle; + + /* Initialize the req */ + uv__req_init((uv_req_t*) req); + req->cb = cb; + req->handle = handle; + ngx_queue_init(&req->queue); + + assert(handle->type == UV_TCP && + "uv_write (unix) does not yet support other types of streams"); + + empty_queue = (tcp->write_queue_size == 0); assert(tcp->fd >= 0); ngx_queue_init(&req->queue); @@ -1118,12 +1135,10 @@ int uv_read_stop(uv_stream_t* stream) { } -void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) { +void uv__req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; - req->cb = cb; - req->handle = handle; - ngx_queue_init(&req->queue); + req->data = NULL; } @@ -1634,6 +1649,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { } -int uv_pipe_connect(uv_req_t* req, const char* name) { +int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, + const char* name, uv_connect_cb cb) { assert(0 && "implement me"); } diff --git a/deps/uv/src/uv-win.c b/deps/uv/src/uv-win.c index 7166e37fa3..a4478f5f51 100644 --- a/deps/uv/src/uv-win.c +++ b/deps/uv/src/uv-win.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "uv.h" #include "uv-common.h" @@ -165,12 +166,7 @@ static LPFN_TRANSMITFILE pTransmitFile6; #define UV_HANDLE_BIND_ERROR 0x1000 #define UV_HANDLE_IPV6 0x2000 #define UV_HANDLE_PIPESERVER 0x4000 - -/* - * Private uv_req flags. - */ -/* The request is currently queued. */ -#define UV_REQ_PENDING 0x01 +#define UV_HANDLE_READ_PENDING 0x8000 /* Binary tree used to keep the list of timers sorted. */ @@ -518,12 +514,9 @@ void uv_init() { } -void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) { +static void uv_req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; - req->flags = 0; - req->handle = handle; - req->cb = cb; } @@ -602,7 +595,10 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) { static void uv_init_connection(uv_stream_t* handle) { handle->flags |= UV_HANDLE_CONNECTION; handle->write_reqs_pending = 0; - uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL); + + uv_req_init((uv_req_t*) &(handle->read_req)); + handle->read_req.type = UV_READ; + handle->read_req.data = handle; } @@ -650,11 +646,10 @@ static void uv_tcp_endgame(uv_tcp_t* handle) { err = uv_new_sys_error(WSAGetLastError()); } if (handle->shutdown_req->cb) { - handle->shutdown_req->flags &= ~UV_REQ_PENDING; if (status == -1) { uv_last_error_ = err; } - ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); + handle->shutdown_req->cb(handle->shutdown_req, status); } handle->reqs_pending--; } @@ -683,11 +678,10 @@ static void uv_pipe_endgame(uv_pipe_t* handle) { close_pipe(handle, &status, &err); if (handle->shutdown_req->cb) { - handle->shutdown_req->flags &= ~UV_REQ_PENDING; if (status == -1) { uv_last_error_ = err; } - ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); + handle->shutdown_req->cb(handle->shutdown_req, status); } handle->reqs_pending--; } @@ -952,9 +946,6 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { /* Prepare the uv_req structure. 
*/ req = &handle->accept_req; - assert(!(req->flags & UV_REQ_PENDING)); - req->type = UV_ACCEPT; - req->flags |= UV_REQ_PENDING; /* choose family and extension function */ if ((handle->flags & UV_HANDLE_IPV6) != 0) { @@ -999,57 +990,55 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { handle->accept_socket = accept_socket; handle->reqs_pending++; - req->flags |= UV_REQ_PENDING; } -static void uv_pipe_queue_accept(uv_pipe_t* handle) { - uv_req_t* req; +static void uv_pipe_queue_accept(uv_pipe_t* handle, struct uv_pipe_accept_s* req) { HANDLE pipeHandle; - int i; assert(handle->flags & UV_HANDLE_LISTENING); + assert(req->pipeHandle == INVALID_HANDLE_VALUE); + + pipeHandle = CreateNamedPipe(handle->name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + NULL); + + if (pipeHandle == INVALID_HANDLE_VALUE) { + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; + } - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - req = &handle->accept_reqs[i]; - if (!(req->flags & UV_REQ_PENDING)) { - pipeHandle = CreateNamedPipe(handle->name, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - 65536, - 65536, - 0, - NULL); - - if (pipeHandle == INVALID_HANDLE_VALUE) { - continue; - } - - if (CreateIoCompletionPort(pipeHandle, - uv_iocp_, - (ULONG_PTR)handle, - 0) == NULL) { - continue; - } - - /* Prepare the overlapped structure. */ - memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; + } - if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && - GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { - /* Make this req pending reporting an error. */ - req->error = uv_new_sys_error(GetLastError()); - uv_insert_pending_req(req); - handle->reqs_pending++; - continue; - } + /* Prepare the overlapped structure. */ + memset(&(req->overlapped), 0, sizeof(req->overlapped)); - req->data = pipeHandle; - req->flags |= UV_REQ_PENDING; - handle->reqs_pending++; - } + if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && + GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { + /* Make this req pending reporting an error. 
*/ + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; } + + req->pipeHandle = pipeHandle; + handle->reqs_pending++; } @@ -1060,11 +1049,10 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { DWORD bytes, flags; assert(handle->flags & UV_HANDLE_READING); + assert(!(handle->flags & UV_HANDLE_READ_PENDING)); req = &handle->read_req; - assert(!(req->flags & UV_REQ_PENDING)); memset(&req->overlapped, 0, sizeof(req->overlapped)); - req->type = UV_READ; buf.base = (char*) &uv_zero_; buf.len = 0; @@ -1085,7 +1073,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { return; } - req->flags |= UV_REQ_PENDING; + handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -1095,16 +1083,15 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { int result; assert(handle->flags & UV_HANDLE_READING); - assert(handle->connection); - assert(handle->connection->handle != INVALID_HANDLE_VALUE); + assert(!(handle->flags & UV_HANDLE_READ_PENDING)); + + assert(handle->handle != INVALID_HANDLE_VALUE); req = &handle->read_req; - assert(!(req->flags & UV_REQ_PENDING)); memset(&req->overlapped, 0, sizeof(req->overlapped)); - req->type = UV_READ; /* Do 0-read */ - result = ReadFile(handle->connection->handle, + result = ReadFile(handle->handle, &uv_zero_, 0, NULL, @@ -1118,7 +1105,7 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { return; } - req->flags |= UV_REQ_PENDING; + handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -1138,6 +1125,10 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { return -1; } + if (!(handle->flags & UV_HANDLE_BOUND) && + uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) + return -1; + if (listen(handle->socket, backlog) == SOCKET_ERROR) { uv_set_sys_error(WSAGetLastError()); return -1; @@ -1146,7 +1137,9 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL); + uv_req_init(&(handle->accept_req)); + handle->accept_req.type = UV_ACCEPT; + handle->accept_req.data = handle; uv_tcp_queue_accept(handle); return 0; @@ -1179,36 +1172,27 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { - uv_pipe_instance_t* connection = server->connections; - /* Find a connection instance that has been connected, but not yet accepted. */ - while (connection) { - if (connection->state == UV_PIPEINSTANCE_CONNECTED) { - break; - } + struct uv_pipe_accept_s* req = server->pending_accepts; - connection = connection->next; - } - - if (!connection) { + if (!req) { /* No valid connections found, so we error out. */ - uv_set_sys_error(UV_ENOTCONN); + uv_set_sys_error(WSAEWOULDBLOCK); return -1; } - /* Make the connection instance active */ - connection->state = UV_PIPEINSTANCE_ACTIVE; - - /* Assign the connection to the client. 
*/ - client->connection = connection; - client->server = server; + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_init(client); + uv_init_connection((uv_stream_t*) client); + client->handle = req->pipeHandle; - uv_init_connection((uv_stream_t*)client); - client->flags |= UV_HANDLE_PIPESERVER; - uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; if (!(server->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(server); + uv_pipe_queue_accept(server, req); } return 0; @@ -1250,7 +1234,7 @@ static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb /* If reading was stopped and then started again, there could stell be a */ /* read request pending. */ - if (!(handle->read_req.flags & UV_REQ_PENDING)) + if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_tcp_queue_read(handle); return 0; @@ -1279,7 +1263,7 @@ static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_c /* If reading was stopped and then started again, there could stell be a */ /* read request pending. */ - if (!(handle->read_req.flags & UV_REQ_PENDING)) + if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(handle); return 0; @@ -1304,20 +1288,18 @@ int uv_read_stop(uv_stream_t* handle) { } -int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { +int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in address, uv_connect_cb cb) { int addrsize = sizeof(struct sockaddr_in); BOOL success; DWORD bytes; - uv_tcp_t* handle = (uv_tcp_t*)req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); if (handle->flags & UV_HANDLE_BIND_ERROR) { uv_last_error_ = handle->error; return -1; } - if (addr.sin_family != AF_INET) { + if (address.sin_family != AF_INET) { uv_set_sys_error(WSAEFAULT); return -1; } @@ -1326,11 +1308,14 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); success = pConnectEx(handle->socket, - (struct sockaddr*)&addr, + (struct sockaddr*) &address, addrsize, NULL, 0, @@ -1342,32 +1327,29 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { return -1; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; return 0; } -int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { +int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb) { int addrsize = sizeof(struct sockaddr_in6); BOOL success; DWORD bytes; - uv_tcp_t* handle = (uv_tcp_t*)req->handle; if (!uv_allow_ipv6) { uv_new_sys_error(UV_EAFNOSUPPORT); return -1; } - assert(!(req->flags & UV_REQ_PENDING)); - if (handle->flags & UV_HANDLE_BIND_ERROR) { uv_last_error_ = handle->error; return -1; } - if (addr.sin6_family != AF_INET6) { + if (address.sin6_family != AF_INET6) { uv_set_sys_error(WSAEFAULT); return -1; } @@ -1376,11 +1358,14 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + 
memset(&req->overlapped, 0, sizeof(req->overlapped)); success = pConnectEx6(handle->socket, - (struct sockaddr*)&addr, + (struct sockaddr*) &address, addrsize, NULL, 0, @@ -1392,7 +1377,6 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { return -1; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; return 0; @@ -1429,25 +1413,26 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) { } -int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { +static int uv_tcp_write(uv_write_t* req, uv_tcp_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { int result; DWORD bytes, err; - uv_tcp_t* handle = (uv_tcp_t*) req->handle; - assert(!(req->flags & UV_REQ_PENDING)); - - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(WSAEINVAL); return -1; } - if (req->handle->flags & UV_HANDLE_SHUTTING) { + if (handle->flags & UV_HANDLE_SHUTTING) { uv_set_sys_error(WSAESHUTDOWN); return -1; } - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_WRITE; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); result = WSASend(handle->socket, (WSABUF*)bufs, @@ -1474,7 +1459,6 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { handle->write_queue_size += req->queued_bytes; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; handle->write_reqs_pending++; @@ -1482,34 +1466,34 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { } -int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { +int uv_pipe_write(uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { int result; - uv_pipe_t* handle = (uv_pipe_t*) req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); if (bufcnt != 1) { uv_set_sys_error(UV_ENOTSUP); return -1; } - assert(handle->connection); - assert(handle->connection->handle != INVALID_HANDLE_VALUE); + assert(handle->handle != INVALID_HANDLE_VALUE); - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(UV_EINVAL); return -1; } - if (req->handle->flags & UV_HANDLE_SHUTTING) { + if (handle->flags & UV_HANDLE_SHUTTING) { uv_set_sys_error(UV_EOF); return -1; } - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_WRITE; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); - result = WriteFile(handle->connection->handle, + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, NULL, @@ -1529,7 +1513,6 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { handle->write_queue_size += req->queued_bytes; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; handle->write_reqs_pending++; @@ -1537,22 +1520,21 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { } -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { - if (req->handle->type == UV_TCP) { - return uv_tcp_write(req, bufs, bufcnt); - } else if (req->handle->type == UV_NAMED_PIPE) { - return uv_pipe_write(req, bufs, bufcnt); +int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + if (handle->type == UV_TCP) { + return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb); + } else if (handle->type == UV_NAMED_PIPE) { + return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb); } + uv_set_sys_error(WSAEINVAL); 
return -1; } -int uv_shutdown(uv_req_t* req) { - uv_tcp_t* handle = (uv_tcp_t*) req->handle; - int status = 0; - - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { +int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(WSAEINVAL); return -1; } @@ -1562,8 +1544,10 @@ int uv_shutdown(uv_req_t* req) { return -1; } + uv_req_init((uv_req_t*) req); req->type = UV_SHUTDOWN; - req->flags |= UV_REQ_PENDING; + req->handle = handle; + req->cb = cb; handle->flags |= UV_HANDLE_SHUTTING; handle->shutdown_req = req; @@ -1592,8 +1576,7 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; + handle->flags &= ~UV_HANDLE_READ_PENDING; if (req->error.code != UV_OK) { /* An error occurred doing the 0-read. */ @@ -1649,7 +1632,8 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { } /* Post another 0-read if still reading and not closing. */ - if (handle->flags & UV_HANDLE_READING) { + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { uv_tcp_queue_read(handle); } } @@ -1658,12 +1642,9 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { } -static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) { +static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - handle->write_queue_size -= req->queued_bytes; if (req->cb) { @@ -1684,9 +1665,6 @@ static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) { static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - /* If handle->accepted_socket is not a valid socket, then */ /* uv_queue_accept must have failed. This is a serious error. We stop */ /* accepting connections and report this error to the connection */ @@ -1723,12 +1701,9 @@ static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { } -static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) { +static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->cb) { if (req->error.code == UV_OK) { if (setsockopt(handle->socket, @@ -1758,8 +1733,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; + handle->flags &= ~UV_HANDLE_READ_PENDING; if (req->error.code != UV_OK) { /* An error occurred doing the 0-read. */ @@ -1777,7 +1751,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { * This is so that ReadFile doesn't block if the read buffer is empty. */ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; - if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { /* We can't continue processing this read. 
*/ handle->flags &= ~UV_HANDLE_READING; uv_set_sys_error(GetLastError()); @@ -1791,11 +1765,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { buf = handle->alloc_cb((uv_stream_t*)handle, 65536); assert(buf.len > 0); - if (ReadFile(handle->connection->handle, - buf.base, - buf.len, - &bytes, - NULL)) { + if (ReadFile(handle->handle, + buf.base, + buf.len, + &bytes, + NULL)) { if (bytes > 0) { /* Successful read */ handle->read_cb((uv_stream_t*)handle, bytes, buf); @@ -1816,7 +1790,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { err = GetLastError(); if (err == ERROR_NO_DATA) { /* Read buffer was completely empty, report a 0-byte read. */ - uv_set_sys_error(UV_EAGAIN); + uv_set_sys_error(WSAEWOULDBLOCK); handle->read_cb((uv_stream_t*)handle, 0, buf); } else { /* Ouch! serious error. */ @@ -1829,10 +1803,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { /* TODO: if the read callback stops reading we can't start reading again because the pipe will still be in nowait mode. */ - if (handle->flags & UV_HANDLE_READING) { + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { /* Switch back to blocking mode so that we can use IOCP for 0-reads */ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + if (SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { /* Post another 0-read */ uv_pipe_queue_read(handle); } else { @@ -1851,12 +1826,9 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { +static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_write_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - handle->write_queue_size -= req->queued_bytes; if (req->cb) { @@ -1874,39 +1846,27 @@ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { - uv_pipe_instance_t* pipeInstance; +static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* raw_req) { + struct uv_pipe_accept_s* req = (struct uv_pipe_accept_s*) raw_req; assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->error.code == UV_OK) { - assert(req->data); - - /* Create the connection instance and add it to the connections list. */ - pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t)); - if (!pipeInstance) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - pipeInstance->handle = req->data; - pipeInstance->state = UV_PIPEINSTANCE_CONNECTED; - pipeInstance->next = handle->connections; - handle->connections = pipeInstance; + assert(req->pipeHandle != INVALID_HANDLE_VALUE); - /* Clear the request. 
*/ - req->data = NULL; - req->flags = 0; + req->next_pending = handle->pending_accepts; + handle->pending_accepts = req; if (handle->connection_cb) { handle->connection_cb((uv_handle_t*)handle, 0); } } else { - /* Ignore errors and continue listening */ - if (handle->flags & UV_HANDLE_LISTENING) { - uv_pipe_queue_accept(handle); + if (req->pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(req->pipeHandle); + req->pipeHandle = INVALID_HANDLE_VALUE; + } + if (!(handle->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(handle, req); } } @@ -1914,12 +1874,9 @@ static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) { +static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->cb) { if (req->error.code == UV_OK) { uv_init_connection((uv_stream_t*)handle); @@ -2172,10 +2129,12 @@ int uv_async_init(uv_async_t* handle, uv_async_cb async_cb) { handle->flags = 0; handle->async_sent = 0; handle->error = uv_ok_; + handle->async_cb = async_cb; req = &handle->async_req; - uv_req_init(req, (uv_handle_t*)handle, async_cb); + uv_req_init(req); req->type = UV_WAKEUP; + req->data = handle; uv_refs_++; @@ -2211,8 +2170,8 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) { assert(req->type == UV_WAKEUP); handle->async_sent = 0; - if (req->cb) { - ((uv_async_cb)req->cb)((uv_async_t*) handle, 0); + if (handle->async_cb) { + handle->async_cb((uv_async_t*) handle, 0); } if (handle->flags & UV_HANDLE_CLOSING) { uv_want_endgame((uv_handle_t*)handle); @@ -2220,15 +2179,15 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) { } -#define DELEGATE_STREAM_REQ(req, method) \ +#define DELEGATE_STREAM_REQ(req, method, handle_at) \ do { \ - switch (req->handle->type) { \ + switch (((uv_handle_t*) (req)->handle_at)->type) { \ case UV_TCP: \ - uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req); \ + uv_process_tcp_##method##_req((uv_tcp_t*) ((req)->handle_at), req); \ break; \ \ case UV_NAMED_PIPE: \ - uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req); \ + uv_process_pipe_##method##_req((uv_pipe_t*) ((req)->handle_at), req); \ break; \ \ default: \ @@ -2243,35 +2202,35 @@ static void uv_process_reqs() { while (req = uv_remove_pending_req()) { switch (req->type) { case UV_READ: - DELEGATE_STREAM_REQ(req, read); + DELEGATE_STREAM_REQ(req, read, data); break; case UV_WRITE: - DELEGATE_STREAM_REQ(req, write); + DELEGATE_STREAM_REQ((uv_write_t*) req, write, handle); break; case UV_ACCEPT: - DELEGATE_STREAM_REQ(req, accept); + DELEGATE_STREAM_REQ(req, accept, data); break; case UV_CONNECT: - DELEGATE_STREAM_REQ(req, connect); + DELEGATE_STREAM_REQ((uv_connect_t*) req, connect, handle); break; case UV_WAKEUP: - uv_process_async_wakeup_req((uv_async_t*) req->handle, req); + uv_process_async_wakeup_req((uv_async_t*) req->data, req); break; case UV_ARES_EVENT_REQ: - uv_process_ares_event_req((uv_ares_action_t*) req->handle, req); + uv_process_ares_event_req((uv_ares_action_t*) req->data, req); break; case UV_ARES_CLEANUP_REQ: - uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req); + uv_process_ares_cleanup_req((uv_ares_task_t*) req->data, req); break; case UV_GETADDRINFO_REQ: - uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req); + uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->data, req); break; 
default: @@ -2505,8 +2464,9 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) { selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0; uv_ares_req = &selhandle->ares_req; - uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL); + uv_req_init(uv_ares_req); uv_ares_req->type = UV_ARES_EVENT_REQ; + uv_ares_req->data = selhandle; /* post ares needs to called */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2551,8 +2511,9 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { /* Post request to cleanup the Task */ uv_ares_req = &uv_handle_ares->ares_req; - uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL); + uv_req_init(uv_ares_req); uv_ares_req->type = UV_ARES_CLEANUP_REQ; + uv_ares_req->data = uv_handle_ares; /* post ares done with socket - finish cleanup when all threads done. */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2981,7 +2942,8 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, } /* init request for Post handling */ - uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL); + uv_req_init(&handle->getadddrinfo_req); + handle->getadddrinfo_req.data = handle; handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ; /* Ask thread to run. Treat this as a long operation */ @@ -3007,6 +2969,7 @@ int uv_pipe_init(uv_pipe_t* handle) { handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; + handle->pending_accepts = NULL; uv_counters()->pipe_init++; @@ -3018,30 +2981,27 @@ int uv_pipe_init(uv_pipe_t* handle) { /* TODO: make this work with UTF8 name */ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { int i; + struct uv_pipe_accept_s* req; if (!name) { + uv_set_sys_error(WSAEINVAL); return -1; } - handle->connections = NULL; - - /* Initialize accept requests. */ - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - handle->accept_reqs[i].flags = 0; - handle->accept_reqs[i].type = UV_ACCEPT; - handle->accept_reqs[i].handle = (uv_handle_t*)handle; - handle->accept_reqs[i].cb = NULL; - handle->accept_reqs[i].data = NULL; - uv_counters()->req_init++; - } - /* Make our own copy of the pipe name */ - handle->name = (char*)malloc(MAX_PIPENAME_LEN); + handle->name = strdup(name); if (!handle->name) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } - strcpy(handle->name, name); - handle->name[255] = '\0'; + + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + req = &handle->accept_reqs[i]; + uv_req_init((uv_req_t*) req); + req->type = UV_ACCEPT; + req->data = handle; + req->pipeHandle = INVALID_HANDLE_VALUE; + req->next_pending = NULL; + } handle->flags |= UV_HANDLE_PIPESERVER; return 0; @@ -3050,9 +3010,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { /* Starts listening for connections for the given pipe. 
*/ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { - int i, maxInstances, errno; - HANDLE pipeHandle; - uv_pipe_instance_t* pipeInstance; + int i, errno; if (handle->flags & UV_HANDLE_LISTENING || handle->flags & UV_HANDLE_READING) { @@ -3068,32 +3026,36 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - uv_pipe_queue_accept(handle); + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(handle, &handle->accept_reqs[i]); + } + return 0; } /* TODO: make this work with UTF8 name */ -int uv_pipe_connect(uv_req_t* req, const char* name) { +/* TODO: run this in the thread pool */ +int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, + const char* name, uv_connect_cb cb) { int errno; DWORD mode; - uv_pipe_t* handle = (uv_pipe_t*)req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; - handle->connection = &handle->clientConnection; - handle->server = NULL; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); - handle->clientConnection.handle = CreateFile(name, - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - NULL); + handle->handle = CreateFile(name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); - if (handle->clientConnection.handle == INVALID_HANDLE_VALUE && + if (handle->handle == INVALID_HANDLE_VALUE && GetLastError() != ERROR_IO_PENDING) { errno = GetLastError(); goto error; @@ -3101,12 +3063,12 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { + if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { errno = GetLastError(); goto error; } - if (CreateIoCompletionPort(handle->clientConnection.handle, + if (CreateIoCompletionPort(handle->handle, uv_iocp_, (ULONG_PTR)handle, 0) == NULL) { @@ -3115,124 +3077,36 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { } req->error = uv_ok_; - req->flags |= UV_REQ_PENDING; - handle->connection->state = UV_PIPEINSTANCE_ACTIVE; - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return 0; error: - close_pipe(handle, NULL, NULL); + if (handle->handle != INVALID_HANDLE_VALUE) { + CloseHandle(handle->handle); + } req->error = uv_new_sys_error(errno); - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; - return 0; + return -1; } /* Cleans up uv_pipe_t (server or connection) and all resources associated with it */ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { - uv_pipe_instance_t* connection, *next, *cur, **prev; - HANDLE pipeHandle; int i; + HANDLE pipeHandle; if (handle->flags & UV_HANDLE_PIPESERVER) { - if (handle->flags & UV_HANDLE_CONNECTION) { - /* - * The handle is for a connection instance on the pipe server. - * To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t. - */ - - connection = handle->connection; - if (connection && connection->handle != INVALID_HANDLE_VALUE) { - /* Disconnect the connection intance and return it to pending state. 
*/ - if (DisconnectNamedPipe(connection->handle)) { - if (status) *status = 0; - } else { - if (status) *status = -1; - if (err) *err = uv_new_sys_error(GetLastError()); - } - - connection->state = UV_PIPEINSTANCE_DISCONNECTED; - connection->handle = NULL; - - cur = handle->connections; - handle->connection = NULL; - prev = &handle->server->connections; - - /* Remove the connection from the list. */ - while (connection) { - if (cur == connection) { - *prev = connection->next; - free(connection); - break; - } else { - prev = &connection->next; - connection = connection->next; - } - } - - /* Queue accept now that the instance is in pending state. */ - if (!(handle->server->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(handle->server); - } - } - } else { - /* - * The handle is for the pipe server. - * To clean-up we close every active and pending connection instance. - */ - - if (handle->name) { - free(handle->name); - handle->name = NULL; + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + pipeHandle = handle->accept_reqs[i].pipeHandle; + if (pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(pipeHandle); } - - connection = handle->connections; - while (connection) { - pipeHandle = connection->handle; - - if (pipeHandle) { - DisconnectNamedPipe(pipeHandle); - CloseHandle(pipeHandle); - } - - next = connection->next; - free(connection); - connection = next; - } - - handle->connections = NULL; - - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - if (handle->accept_reqs[i].flags & UV_REQ_PENDING) { - pipeHandle = handle->accept_reqs[i].data; - assert(pipeHandle); - DisconnectNamedPipe(pipeHandle); - CloseHandle(pipeHandle); - handle->accept_reqs[i].flags = 0; - handle->reqs_pending--; - } - } - - if (status) *status = 0; } + } else { - /* - * The handle is for a connection instance on the pipe client. - * To clean-up we close the pipe handle. 
- */ - connection = handle->connection; - if (connection && connection->handle != INVALID_HANDLE_VALUE) { - if (CloseHandle(connection->handle)) { - connection->state = UV_PIPEINSTANCE_DISCONNECTED; - handle->connection = NULL; - if (status) *status = 0; - } else { - if (status) *status = -1; - if (err) *err = uv_new_sys_error(GetLastError()); - } - } + CloseHandle(handle->handle); } handle->flags |= UV_HANDLE_SHUT; diff --git a/deps/uv/test/benchmark-ping-pongs.c b/deps/uv/test/benchmark-ping-pongs.c index 7124a3614a..721e8c82e3 100644 --- a/deps/uv/test/benchmark-ping-pongs.c +++ b/deps/uv/test/benchmark-ping-pongs.c @@ -34,8 +34,8 @@ typedef struct { int pongs; int state; uv_tcp_t tcp; - uv_req_t connect_req; - uv_req_t shutdown_req; + uv_connect_t connect_req; + uv_shutdown_t shutdown_req; } pinger_t; typedef struct buf_s { @@ -90,7 +90,7 @@ static void pinger_close_cb(uv_handle_t* handle) { } -static void pinger_write_cb(uv_req_t *req, int status) { +static void pinger_write_cb(uv_write_t* req, int status) { ASSERT(status == 0); free(req); @@ -98,22 +98,20 @@ static void pinger_write_cb(uv_req_t *req, int status) { static void pinger_write_ping(pinger_t* pinger) { - uv_req_t *req; + uv_write_t* req; uv_buf_t buf; buf.base = (char*)&PING; buf.len = strlen(PING); - req = (uv_req_t*)malloc(sizeof(*req)); - uv_req_init(req, (uv_handle_t*)(&pinger->tcp), pinger_write_cb); - - if (uv_write(req, &buf, 1)) { + req = malloc(sizeof *req); + if (uv_write(req, (uv_stream_t*) &pinger->tcp, &buf, 1, pinger_write_cb)) { FATAL("uv_write failed"); } } -static void pinger_shutdown_cb(uv_handle_t* handle, int status) { +static void pinger_shutdown_cb(uv_shutdown_t* req, int status) { ASSERT(status == 0); pinger_shutdown_cb_called++; @@ -151,8 +149,7 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { if (pinger->state == 0) { pinger->pongs++; if (uv_now() - start_time > TIME) { - uv_req_init(&pinger->shutdown_req, (uv_handle_t*)tcp, pinger_shutdown_cb); - uv_shutdown(&pinger->shutdown_req); + uv_shutdown(&pinger->shutdown_req, (uv_stream_t*) tcp, pinger_shutdown_cb); break; } else { pinger_write_ping(pinger); @@ -164,14 +161,14 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { } -static void pinger_connect_cb(uv_req_t *req, int status) { +static void pinger_connect_cb(uv_connect_t* req, int status) { pinger_t *pinger = (pinger_t*)req->handle->data; ASSERT(status == 0); pinger_write_ping(pinger); - if (uv_read_start((uv_stream_t*)(req->handle), buf_alloc, pinger_read_cb)) { + if (uv_read_start(req->handle, buf_alloc, pinger_read_cb)) { FATAL("uv_read_start failed"); } } @@ -193,13 +190,9 @@ static void pinger_new() { pinger->tcp.data = pinger; - /* We are never doing multiple reads/connects at a time anyway. */ - /* so these handles can be pre-initialized. 
 */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)&pinger->tcp,
-      pinger_connect_cb);
-
   uv_tcp_bind(&pinger->tcp, client_addr);
-  r = uv_tcp_connect(&pinger->connect_req, server_addr);
+
+  r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_connect_cb);
   ASSERT(!r);
 }
diff --git a/deps/uv/test/benchmark-pump.c b/deps/uv/test/benchmark-pump.c
index ad3676d75a..d7524f7470 100644
--- a/deps/uv/test/benchmark-pump.c
+++ b/deps/uv/test/benchmark-pump.c
@@ -183,22 +183,22 @@ static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) {
 }
 
 
-static void write_cb(uv_req_t *req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   uv_buf_t* buf = (uv_buf_t*) req->data;
 
   ASSERT(status == 0);
 
-  req_free(req);
+  req_free((uv_req_t*) req);
 
   nsent += sizeof write_buffer;
   nsent_total += sizeof write_buffer;
 
-  do_write((uv_stream_t*)req->handle);
+  do_write((uv_stream_t*) req->handle);
 }
 
 
 static void do_write(uv_stream_t* stream) {
-  uv_req_t* req;
+  uv_write_t* req;
   uv_buf_t buf;
   int r;
 
@@ -206,23 +206,21 @@ static void do_write(uv_stream_t* stream) {
   buf.len = sizeof write_buffer;
 
   while (stream->write_queue_size == 0) {
-    req = req_alloc();
-    uv_req_init(req, (uv_handle_t*)stream, write_cb);
-
-    r = uv_write(req, &buf, 1);
+    req = (uv_write_t*) req_alloc();
+    r = uv_write(req, stream, &buf, 1, write_cb);
     ASSERT(r == 0);
   }
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   int i;
 
   if (status) LOG(uv_strerror(uv_last_error()));
   ASSERT(status == 0);
 
   write_sockets++;
-  req_free(req);
+  req_free((uv_req_t*) req);
 
   maybe_connect_some();
 
@@ -238,7 +236,7 @@ static void connect_cb(uv_req_t* req, int status) {
 
 
 static void maybe_connect_some() {
-  uv_req_t* req;
+  uv_connect_t* req;
   uv_tcp_t* tcp;
   uv_pipe_t* pipe;
   int r;
@@ -251,9 +249,8 @@ static void maybe_connect_some() {
       r = uv_tcp_init(tcp);
       ASSERT(r == 0);
 
-      req = req_alloc();
-      uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
-      r = uv_tcp_connect(req, connect_addr);
+      req = (uv_connect_t*) req_alloc();
+      r = uv_tcp_connect(req, tcp, connect_addr, connect_cb);
       ASSERT(r == 0);
     } else {
       pipe = &pipe_write_handles[max_connect_socket++];
@@ -261,9 +258,8 @@ static void maybe_connect_some() {
       r = uv_pipe_init(pipe);
       ASSERT(r == 0);
 
-      req = req_alloc();
-      uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
-      r = uv_pipe_connect(req, TEST_PIPENAME);
+      req = (uv_connect_t*) req_alloc();
+      r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
       ASSERT(r == 0);
 
 #ifdef _WIN32
@@ -308,7 +304,7 @@ static void connection_cb(uv_handle_t* s, int status) {
  */
 
 typedef struct req_list_s {
-  uv_req_t uv_req;
+  union uv_any_req uv_req;
   struct req_list_s* next;
 } req_list_t;
diff --git a/deps/uv/test/dns-server.c b/deps/uv/test/dns-server.c
index 53076e7bfb..1c6b78fd33 100644
--- a/deps/uv/test/dns-server.c
+++ b/deps/uv/test/dns-server.c
@@ -27,7 +27,7 @@
 
 
 typedef struct {
-  uv_req_t req;
+  uv_write_t req;
   uv_buf_t buf;
 } write_req_t;
 
@@ -51,7 +51,7 @@ static int server_closed;
 static uv_tcp_t server;
 
 
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
 static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
 static void on_close(uv_handle_t* peer);
 static void on_server_close(uv_handle_t* handle);
@@ -67,7 +67,7 @@ unsigned char qrecord[] = {5, 'e', 'c', 'h', 'o', 's', 3, 's', 'r', 'v', 0, 0, 1
 unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 };
 
 
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
   write_req_t* wr;
 
   if (status) {
@@ -84,8 +84,8 @@ static void after_write(uv_req_t* req, int status) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*) req->handle, on_close);
   free(req);
 }
 
@@ -116,7 +116,7 @@ static void addrsp(write_req_t* wr, char* hdr) {
 }
 
 static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  write_req_t *wr;
+  write_req_t* wr;
   dnshandle* dns = (dnshandle*)handle;
   char hdrbuf[DNSREC_LEN];
   int hdrbuf_remaining = DNSREC_LEN;
@@ -127,7 +127,6 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
   int usingprev = 0;
 
   wr = (write_req_t*) malloc(sizeof *wr);
-  uv_req_init(&wr->req, (uv_handle_t*)handle, after_write);
   wr->buf.base = (char*)malloc(WRITE_BUF_LEN);
   wr->buf.len = 0;
 
@@ -197,7 +196,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 
   /* send write buffer */
   if (wr->buf.len > 0) {
-    if (uv_write(&wr->req, &wr->buf, 1)) {
+    if (uv_write((uv_write_t*) &wr->req, handle, &wr->buf, 1, after_write)) {
       FATAL("uv_write failed");
     }
   }
@@ -217,7 +216,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 }
 
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  uv_req_t* req;
+  uv_shutdown_t* req;
 
   if (nread < 0) {
     /* Error or EOF */
@@ -227,9 +226,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
       free(buf.base);
     }
 
-    req = (uv_req_t*) malloc(sizeof *req);
-    uv_req_init(req, (uv_handle_t*)handle, after_shutdown);
-    uv_shutdown(req);
+    req = malloc(sizeof *req);
+    uv_shutdown(req, handle, after_shutdown);
 
     return;
   }
diff --git a/deps/uv/test/echo-server.c b/deps/uv/test/echo-server.c
index 4dc0e20c60..e107dc5b95 100644
--- a/deps/uv/test/echo-server.c
+++ b/deps/uv/test/echo-server.c
@@ -25,7 +25,7 @@
 #include <stdlib.h>
 
 typedef struct {
-  uv_req_t req;
+  uv_write_t req;
   uv_buf_t buf;
 } write_req_t;
 
@@ -35,14 +35,14 @@ static uv_tcp_t tcpServer;
 static uv_pipe_t pipeServer;
 static uv_handle_t* server;
 
-static void after_write(uv_req_t* req, int status);
+static void after_write(uv_write_t* req, int status);
 static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
 static void on_close(uv_handle_t* peer);
 static void on_server_close(uv_handle_t* handle);
 static void on_connection(uv_handle_t*, int status);
 
 
-static void after_write(uv_req_t* req, int status) {
+static void after_write(uv_write_t* req, int status) {
   write_req_t* wr;
 
   if (status) {
@@ -59,8 +59,8 @@ static void after_write(uv_req_t* req, int status) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*)req->handle, on_close);
   free(req);
 }
 
@@ -68,7 +68,7 @@ static void after_shutdown(uv_req_t* req, int status) {
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
   int i;
   write_req_t *wr;
-  uv_req_t* req;
+  uv_shutdown_t* req;
 
   if (nread < 0) {
     /* Error or EOF */
@@ -78,9 +78,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
       free(buf.base);
     }
 
-    req = (uv_req_t*) malloc(sizeof *req);
-    uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
-    uv_shutdown(req);
+    req = (uv_shutdown_t*) malloc(sizeof *req);
+    uv_shutdown(req, handle, after_shutdown);
 
     return;
   }
 
@@ -103,10 +102,9 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
 
   wr = (write_req_t*) malloc(sizeof *wr);
 
-  uv_req_init(&wr->req, (uv_handle_t*)handle, (void *(*)(void *))after_write);
   wr->buf.base = buf.base;
   wr->buf.len = nread;
 
-  if (uv_write(&wr->req, &wr->buf, 1)) {
+  if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
     FATAL("uv_write failed");
   }
 }
diff --git a/deps/uv/test/test-callback-stack.c b/deps/uv/test/test-callback-stack.c
index 5b12c8b9b4..4162b22112 100644
--- a/deps/uv/test/test-callback-stack.c
+++ b/deps/uv/test/test-callback-stack.c
@@ -32,7 +32,9 @@ static const char MESSAGE[] = "Failure is for the weak. Everyone dies alone.";
 
 static uv_tcp_t client;
 static uv_timer_t timer;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
 
 static int nested = 0;
 static int close_cb_called = 0;
@@ -59,7 +61,7 @@ static void close_cb(uv_handle_t* handle) {
 }
 
 
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
   ASSERT(status == 0);
   ASSERT(nested == 0 && "shutdown_cb must be called from a fresh stack");
 
@@ -97,11 +99,10 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
   /* from a fresh stack. */
   if (bytes_received == sizeof MESSAGE) {
     nested++;
-    uv_req_init(&shutdown_req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
 
     puts("Shutdown");
 
-    if (uv_shutdown(&shutdown_req)) {
+    if (uv_shutdown(&shutdown_req, (uv_stream_t*)tcp, shutdown_cb)) {
       FATAL("uv_shutdown failed");
     }
     nested--;
@@ -131,7 +132,7 @@ static void timer_cb(uv_timer_t* handle, int status) {
 }
 
 
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   int r;
 
   ASSERT(status == 0);
@@ -154,7 +155,7 @@ static void write_cb(uv_req_t* req, int status) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   uv_buf_t buf;
 
   puts("Connected. Write some data to echo server...");
@@ -167,9 +168,7 @@ static void connect_cb(uv_req_t* req, int status) {
   buf.base = (char*) &MESSAGE;
   buf.len = sizeof MESSAGE;
 
-  uv_req_init(&write_req, req->handle, (void *(*)(void *))write_cb);
-
-  if (uv_write(&write_req, &buf, 1)) {
+  if (uv_write(&write_req, (uv_stream_t*)req->handle, &buf, 1, write_cb)) {
     FATAL("uv_write failed");
   }
 
@@ -191,10 +190,8 @@ TEST_IMPL(callback_stack) {
 
   puts("Connecting...");
   nested++;
-  uv_req_init(&connect_req, (uv_handle_t*)&client,
-      (void *(*)(void *))connect_cb);
 
-  if (uv_tcp_connect(&connect_req, addr)) {
+  if (uv_tcp_connect(&connect_req, &client, addr, connect_cb)) {
     FATAL("uv_tcp_connect failed");
   }
   nested--;
diff --git a/deps/uv/test/test-connection-fail.c b/deps/uv/test/test-connection-fail.c
index 9fc3f0ba6e..1c2d212120 100644
--- a/deps/uv/test/test-connection-fail.c
+++ b/deps/uv/test/test-connection-fail.c
@@ -27,7 +27,7 @@
 
 
 static uv_tcp_t tcp;
-static uv_req_t req;
+static uv_connect_t req;
 static int connect_cb_calls;
 static int close_cb_calls;
 
@@ -66,18 +66,18 @@ static void timer_cb(uv_timer_t* handle, int status) {
 }
 
 
-static void on_connect_with_close(uv_req_t *req, int status) {
-  ASSERT(&tcp == (uv_tcp_t*) req->handle);
+static void on_connect_with_close(uv_connect_t *req, int status) {
+  ASSERT((uv_stream_t*) &tcp == req->handle);
   ASSERT(status == -1);
   ASSERT(uv_last_error().code == UV_ECONNREFUSED);
   connect_cb_calls++;
 
   ASSERT(close_cb_calls == 0);
-  uv_close(req->handle, on_close);
+  uv_close((uv_handle_t*)req->handle, on_close);
 }
 
 
-static void on_connect_without_close(uv_req_t *req, int status) {
+static void on_connect_without_close(uv_connect_t *req, int status) {
   ASSERT(status == -1);
   ASSERT(uv_last_error().code == UV_ECONNREFUSED);
   connect_cb_calls++;
@@ -103,10 +103,8 @@ void connection_fail(uv_connect_cb connect_cb) {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&req, (uv_handle_t*)&tcp, (void *(*)(void *))connect_cb);
-
   uv_tcp_bind(&tcp, client_addr);
-  r = uv_tcp_connect(&req, server_addr);
+  r = uv_tcp_connect(&req, &tcp, server_addr, connect_cb);
   ASSERT(!r);
 
   uv_run();
diff --git a/deps/uv/test/test-delayed-accept.c b/deps/uv/test/test-delayed-accept.c
index 6f56518434..30b63b9b6d 100644
--- a/deps/uv/test/test-delayed-accept.c
+++ b/deps/uv/test/test-delayed-accept.c
@@ -147,7 +147,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   int r;
 
   ASSERT(req != NULL);
@@ -167,7 +167,7 @@ static void connect_cb(uv_req_t* req, int status) {
 static void client_connect() {
   struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
   uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
-  uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+  uv_connect_t* connect_req = malloc(sizeof *connect_req);
   int r;
 
   ASSERT(client != NULL);
@@ -176,8 +176,7 @@ static void client_connect() {
   r = uv_tcp_init(client);
   ASSERT(r == 0);
 
-  uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(connect_req, addr);
+  r = uv_tcp_connect(connect_req, client, addr, connect_cb);
   ASSERT(r == 0);
 }
diff --git a/deps/uv/test/test-getsockname.c b/deps/uv/test/test-getsockname.c
index 9d265d37d7..f533806c3b 100644
--- a/deps/uv/test/test-getsockname.c
+++ b/deps/uv/test/test-getsockname.c
@@ -29,7 +29,7 @@ static int getsocknamecount = 0;
 
 static uv_tcp_t tcp;
-static uv_req_t connect_req;
+static uv_connect_t connect_req;
 
 static uv_tcp_t tcpServer;
 
@@ -47,22 +47,23 @@ static void on_close(uv_handle_t* peer) {
 }
 
 
-static void after_shutdown(uv_req_t* req, int status) {
-  uv_close(req->handle, on_close);
+static void after_shutdown(uv_shutdown_t* req, int status) {
+  uv_close((uv_handle_t*) req->handle, on_close);
   free(req);
 }
 
 
 static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
-  uv_req_t* req;
+  uv_shutdown_t* req;
+  int r;
 
   if (buf.base) {
     free(buf.base);
   }
 
-  req = (uv_req_t*) malloc(sizeof *req);
-  uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
-  uv_shutdown(req);
+  req = (uv_shutdown_t*) malloc(sizeof *req);
+  r = uv_shutdown(req, handle, after_shutdown);
+  ASSERT(r == 0);
 }
 
@@ -102,16 +103,18 @@ static void on_connection(uv_handle_t* server, int status) {
 }
 
 
-static void on_connect(void* req) {
+static void on_connect(uv_connect_t* req, int status) {
   struct sockaddr sockname;
   int namelen = sizeof(sockname);
-  int status;
+  int r;
 
-  status = uv_getsockname(&tcp, &sockname, &namelen);
-  if (status != 0) {
+  ASSERT(status == 0);
+
+  r = uv_getsockname(&tcp, &sockname, &namelen);
+  if (r != 0) {
     fprintf(stderr, "uv_getsockname error (connector) %d\n", uv_last_error().code);
   }
-  ASSERT(status == 0);
+  ASSERT(r == 0);
 
   getsocknamecount++;
 
@@ -162,9 +165,7 @@ static void tcp_connector() {
   tcp.data = &connect_req;
   ASSERT(!r);
 
-  uv_req_init(&connect_req, (uv_handle_t*)(&tcp), (void *(*)(void *))on_connect);
-
-  r = uv_tcp_connect(&connect_req, server_addr);
+  r = uv_tcp_connect(&connect_req, &tcp, server_addr, on_connect);
   ASSERT(!r);
 }
diff --git a/deps/uv/test/test-ping-pong.c b/deps/uv/test/test-ping-pong.c
index 81cd93e533..e4d4f1d03d 100644
--- a/deps/uv/test/test-ping-pong.c
+++ b/deps/uv/test/test-ping-pong.c
@@ -43,8 +43,7 @@ typedef struct {
     uv_tcp_t tcp;
     uv_pipe_t pipe;
   };
-  uv_req_t connect_req;
-  uv_req_t read_req;
+  uv_connect_t connect_req;
   char read_buffer[BUFSIZE];
 } pinger_t;
 
@@ -70,25 +69,22 @@ static void pinger_on_close(uv_handle_t* handle) {
 }
 
 
-static void pinger_after_write(uv_req_t *req, int status) {
+static void pinger_after_write(uv_write_t *req, int status) {
   ASSERT(status == 0);
-
   free(req);
 }
 
 
 static void pinger_write_ping(pinger_t* pinger) {
-  uv_req_t *req;
+  uv_write_t *req;
   uv_buf_t buf;
 
   buf.base = (char*)&PING;
   buf.len = strlen(PING);
 
-  req = (uv_req_t*)malloc(sizeof(*req));
-  uv_req_init(req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_after_write);
+  req = malloc(sizeof(uv_write_t));
 
-  if (uv_write(req, &buf, 1)) {
+  if (uv_write(req, (uv_stream_t*)&pinger->tcp, &buf, 1, pinger_after_write)) {
     FATAL("uv_write failed");
   }
 
@@ -134,7 +130,7 @@ static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void pinger_on_connect(uv_req_t *req, int status) {
+static void pinger_on_connect(uv_connect_t *req, int status) {
   pinger_t *pinger = (pinger_t*)req->handle->data;
 
   ASSERT(status == 0);
@@ -162,10 +158,8 @@ static void tcp_pinger_v6_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_on_connect);
-
-  r = uv_tcp_connect6(&pinger->connect_req, server_addr);
+  r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr,
+      pinger_on_connect);
   ASSERT(!r);
 }
 
@@ -186,10 +180,7 @@ static void tcp_pinger_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
-      (void *(*)(void *))pinger_on_connect);
-
-  r = uv_tcp_connect(&pinger->connect_req, server_addr);
+  r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_on_connect);
   ASSERT(!r);
 }
 
@@ -209,10 +200,8 @@ static void pipe_pinger_new() {
 
   /* We are never doing multiple reads/connects at a time anyway. */
   /* so these handles can be pre-initialized. */
-  uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
-      (void *(*)(void *))pinger_on_connect);
 
-  r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
+  r = uv_pipe_connect(&pinger->connect_req, &pinger->pipe, TEST_PIPENAME, pinger_on_connect);
   ASSERT(!r);
 }
diff --git a/deps/uv/test/test-shutdown-eof.c b/deps/uv/test/test-shutdown-eof.c
index 8a960c9ea6..67af49531f 100644
--- a/deps/uv/test/test-shutdown-eof.c
+++ b/deps/uv/test/test-shutdown-eof.c
@@ -26,7 +26,9 @@
 
 static uv_timer_t timer;
 static uv_tcp_t tcp;
-static uv_req_t connect_req, write_req, shutdown_req;
+static uv_connect_t connect_req;
+static uv_write_t write_req;
+static uv_shutdown_t shutdown_req;
 static uv_buf_t qbuf;
 static int got_q;
 static int got_eof;
@@ -74,7 +76,7 @@ static void read_cb(uv_stream_t* t, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void shutdown_cb(uv_req_t *req, int status) {
+static void shutdown_cb(uv_shutdown_t *req, int status) {
   ASSERT(req == &shutdown_req);
 
   ASSERT(called_connect_cb == 1);
@@ -87,7 +89,7 @@ static void shutdown_cb(uv_req_t *req, int status) {
 }
 
 
-static void connect_cb(uv_req_t *req, int status) {
+static void connect_cb(uv_connect_t *req, int status) {
   ASSERT(status == 0);
   ASSERT(req == &connect_req);
 
@@ -98,12 +100,10 @@ static void connect_cb(uv_req_t *req, int status) {
    * Write the letter 'Q' to gracefully kill the echo-server. This will not
    * effect our connection.
    */
-  uv_req_init(&write_req, (uv_handle_t*)&tcp, NULL);
-  uv_write(&write_req, &qbuf, 1);
+  uv_write(&write_req, (uv_stream_t*) &tcp, &qbuf, 1, NULL);
 
   /* Shutdown our end of the connection. */
-  uv_req_init(&shutdown_req, (uv_handle_t*)&tcp, (void *(*)(void *))shutdown_cb);
-  uv_shutdown(&shutdown_req);
+  uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb);
 
   called_connect_cb++;
   ASSERT(called_shutdown_cb == 0);
@@ -165,8 +165,7 @@ TEST_IMPL(shutdown_eof) {
   r = uv_tcp_init(&tcp);
   ASSERT(!r);
 
-  uv_req_init(&connect_req, (uv_handle_t*) &tcp, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(&connect_req, server_addr);
+  r = uv_tcp_connect(&connect_req, &tcp, server_addr, connect_cb);
   ASSERT(!r);
 
   uv_run();
diff --git a/deps/uv/test/test-tcp-writealot.c b/deps/uv/test/test-tcp-writealot.c
index 4e305a9f33..73cf45b1c9 100644
--- a/deps/uv/test/test-tcp-writealot.c
+++ b/deps/uv/test/test-tcp-writealot.c
@@ -62,7 +62,7 @@ static void close_cb(uv_handle_t* handle) {
 }
 
 
-static void shutdown_cb(uv_req_t* req, int status) {
+static void shutdown_cb(uv_shutdown_t* req, int status) {
   uv_tcp_t* tcp;
 
   ASSERT(req);
@@ -104,7 +104,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
 }
 
 
-static void write_cb(uv_req_t* req, int status) {
+static void write_cb(uv_write_t* req, int status) {
   ASSERT(req != NULL);
 
   if (status) {
@@ -120,9 +120,11 @@ static void write_cb(uv_req_t* req, int status) {
 }
 
 
-static void connect_cb(uv_req_t* req, int status) {
+static void connect_cb(uv_connect_t* req, int status) {
   uv_buf_t send_bufs[CHUNKS_PER_WRITE];
   uv_tcp_t* tcp;
+  uv_write_t* write_req;
+  uv_shutdown_t* shutdown_req;
   int i, j, r;
 
   ASSERT(req != NULL);
@@ -141,26 +143,21 @@ static void connect_cb(uv_req_t* req, int status) {
       bytes_sent += CHUNK_SIZE;
     }
 
-    req = (uv_req_t*)malloc(sizeof *req);
-    ASSERT(req != NULL);
+    write_req = malloc(sizeof(uv_write_t));
+    ASSERT(write_req != NULL);
 
-    uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))write_cb);
-    r = uv_write(req, (uv_buf_t*)&send_bufs, CHUNKS_PER_WRITE);
+    r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs,
+        CHUNKS_PER_WRITE, write_cb);
     ASSERT(r == 0);
   }
 
   /* Shutdown on drain. FIXME: dealloc req? */
-  req = (uv_req_t*) malloc(sizeof(uv_req_t));
-  ASSERT(req != NULL);
-  uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
-  r = uv_shutdown(req);
+  shutdown_req = malloc(sizeof(uv_shutdown_t));
+  ASSERT(shutdown_req != NULL);
+  r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb);
   ASSERT(r == 0);
 
   /* Start reading */
-  req = (uv_req_t*)malloc(sizeof *req);
-  ASSERT(req != NULL);
-
-  uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))read_cb);
   r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb);
   ASSERT(r == 0);
 }
@@ -169,7 +166,7 @@ static void connect_cb(uv_req_t* req, int status) {
 TEST_IMPL(tcp_writealot) {
   struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
   uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
-  uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
+  uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
   int r;
 
   ASSERT(client != NULL);
@@ -184,8 +181,7 @@ TEST_IMPL(tcp_writealot) {
   r = uv_tcp_init(client);
   ASSERT(r == 0);
 
-  uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
-  r = uv_tcp_connect(connect_req, addr);
+  r = uv_tcp_connect(connect_req, client, addr, connect_cb);
   ASSERT(r == 0);
 
   uv_run();
diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc
index 1e530e8bc7..b0e5dfef5e 100644
--- a/src/tcp_wrap.cc
+++ b/src/tcp_wrap.cc
@@ -61,25 +61,37 @@ static Persistent<String> write_queue_size_sym;
 
 class TCPWrap;
 
+template <typename T>
 class ReqWrap {
  public:
-  ReqWrap(uv_handle_t* handle, void* callback) {
+  ReqWrap() {
     HandleScope scope;
     object_ = Persistent<Object>::New(Object::New());
-    uv_req_init(&req_, handle, (void* (*)(void*))callback);
-    req_.data = this;
   }
 
   ~ReqWrap() {
+    // Assert that someone has called Dispatched()
+    assert(req_.data == this);
     assert(!object_.IsEmpty());
     object_.Dispose();
     object_.Clear();
   }
 
+  // Call this after the req has been dispatched.
+  void Dispatched() {
+    req_.data = this;
+  }
+
   Persistent<Object> object_;
-  uv_req_t req_;
+  T req_;
 };
 
+
+typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
+typedef class ReqWrap<uv_write_t> WriteWrap;
+typedef class ReqWrap<uv_connect_t> ConnectWrap;
+
+
 class TCPWrap {
  public:
@@ -373,8 +385,8 @@ class TCPWrap {
     return scope.Close(Integer::New(r));
   }
 
-  static void AfterWrite(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterWrite(uv_write_t* req, int status) {
+    WriteWrap* req_wrap = (WriteWrap*) req->data;
     TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     HandleScope scope;
@@ -420,11 +432,7 @@ class TCPWrap {
       length = args[2]->IntegerValue();
     }
 
-    // I hate when people program C++ like it was C, and yet I do it too.
-    // I'm too lazy to come up with the perfect class hierarchy here. Let's
-    // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-        (void*)AfterWrite);
+    WriteWrap* req_wrap = new WriteWrap();
 
     req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
 
@@ -432,7 +440,10 @@ class TCPWrap {
     buf.base = Buffer::Data(buffer_obj) + offset;
     buf.len = length;
 
-    int r = uv_write(&req_wrap->req_, &buf, 1);
+    int r = uv_write(&req_wrap->req_, (uv_stream_t*)&wrap->handle_, &buf, 1,
+        AfterWrite);
+
+    req_wrap->Dispatched();
 
     wrap->UpdateWriteQueueSize();
 
@@ -445,8 +456,8 @@ class TCPWrap {
     }
   }
 
-  static void AfterConnect(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterConnect(uv_connect_t* req, int status) {
+    ConnectWrap* req_wrap = (ConnectWrap*) req->data;
     TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     HandleScope scope;
@@ -482,10 +493,12 @@ class TCPWrap {
     // I hate when people program C++ like it was C, and yet I do it too.
     // I'm too lazy to come up with the perfect class hierarchy here. Let's
    // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-        (void*)AfterConnect);
+    ConnectWrap* req_wrap = new ConnectWrap();
+
+    int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address,
+        AfterConnect);
 
-    int r = uv_tcp_connect(&req_wrap->req_, address);
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -506,13 +519,12 @@ class TCPWrap {
 
     struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port);
 
-    // I hate when people program C++ like it was C, and yet I do it too.
-    // I'm too lazy to come up with the perfect class hierarchy here. Let's
-    // just do some type munging.
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-        (void*)AfterConnect);
+    ConnectWrap* req_wrap = new ConnectWrap();
 
-    int r = uv_tcp_connect6(&req_wrap->req_, address);
+    int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address,
+        AfterConnect);
+
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -523,8 +535,8 @@ class TCPWrap {
     }
   }
 
-  static void AfterShutdown(uv_req_t* req, int status) {
-    ReqWrap* req_wrap = (ReqWrap*) req->data;
+  static void AfterShutdown(uv_shutdown_t* req, int status) {
+    ReqWrap<uv_shutdown_t>* req_wrap = (ReqWrap<uv_shutdown_t>*) req->data;
    TCPWrap* wrap = (TCPWrap*) req->handle->data;
 
     // The request object should still be there.
@@ -552,10 +564,12 @@ class TCPWrap {
 
     UNWRAP
 
-    ReqWrap* req_wrap = new ReqWrap((uv_handle_t*) &wrap->handle_,
-        (void*)AfterShutdown);
+    ShutdownWrap* req_wrap = new ShutdownWrap();
+
+    int r = uv_shutdown(&req_wrap->req_, (uv_stream_t*) &wrap->handle_,
+        AfterShutdown);
 
-    int r = uv_shutdown(&req_wrap->req_);
+    req_wrap->Dispatched();
 
     if (r) {
       SetErrno(uv_last_error().code);
@@ -569,7 +583,6 @@ class TCPWrap {
 
   uv_tcp_t handle_;
   Persistent<Object> object_;
   size_t slab_offset_;
-  friend class ReqWrap;
 };
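The pattern repeated across every test diff above: uv_req_init() and its type-erased callback are gone, and each request type (uv_connect_t, uv_write_t, uv_shutdown_t) is instead handed to the call that dispatches it, together with its handle and a typed callback. A minimal sketch of the new calling convention as this patch uses it (the on_* names, the hard-coded port, and the message are illustrative only; error checks trimmed, and global loop setup is left to the test harness):

#include "uv.h"
#include <string.h>

static uv_tcp_t handle;
static uv_connect_t connect_req;
static uv_write_t write_req;
static uv_shutdown_t shutdown_req;
static char message[] = "hello";

static void on_close(uv_handle_t* peer) {
}

static void on_shutdown(uv_shutdown_t* req, int status) {
  /* The typed request still carries its handle; cast up to close it. */
  uv_close((uv_handle_t*) req->handle, on_close);
}

static void on_write(uv_write_t* req, int status) {
  /* The callback receives the typed request, not a uv_req_t. */
  uv_shutdown(&shutdown_req, (uv_stream_t*) req->handle, on_shutdown);
}

static void on_connect(uv_connect_t* req, int status) {
  uv_buf_t buf;
  buf.base = message;
  buf.len = strlen(message);
  /* Handle, buffers and callback are all arguments of the dispatch call. */
  uv_write(&write_req, (uv_stream_t*) req->handle, &buf, 1, on_write);
}

int main(void) {
  /* 127.0.0.1:7000 stands in for TEST_PORT; any listening peer will do. */
  struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", 7000);
  uv_tcp_init(&handle);
  /* No uv_req_init(): uv_tcp_connect() ties req, handle and cb together. */
  uv_tcp_connect(&connect_req, &handle, addr, on_connect);
  return uv_run();
}

Because the request keeps a back-pointer to the stream it was dispatched on, a write callback can queue the follow-up request without any extra bookkeeping, which is exactly how the rewritten tests chain write -> shutdown -> close.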
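dns-server.c and echo-server.c keep per-write state by embedding the uv_write_t as the first member of a wrapper struct, so the completion callback can cast the request pointer straight back to the wrapper. A sketch of that idiom under illustrative names (echo_write_t, echo, echo_after_write are not part of the patch):

#include "uv.h"
#include <stdlib.h>

typedef struct {
  uv_write_t req;  /* must stay the first member for the cast below */
  uv_buf_t buf;    /* payload that has to outlive the write */
} echo_write_t;

static void echo_after_write(uv_write_t* req, int status) {
  echo_write_t* wr = (echo_write_t*) req;  /* first-member cast */
  free(wr->buf.base);
  free(wr);
}

static void echo(uv_stream_t* handle, char* data, size_t len) {
  echo_write_t* wr = (echo_write_t*) malloc(sizeof *wr);
  wr->buf.base = data;
  wr->buf.len = len;
  if (uv_write(&wr->req, handle, &wr->buf, 1, echo_after_write)) {
    free(wr);  /* dispatch failed; buffer ownership stays with the caller */
  }
}

benchmark-pump.c takes the other route and stashes its pointer in req->data; both work because a dispatched request is owned by libuv only until its callback fires.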
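The benchmark's free list switches its slot type from uv_req_t to union uv_any_req because the request types no longer share one size: a slot sized for the union can back a uv_connect_t or a uv_write_t alike, which is why the callers now cast the result of req_alloc(). A sketch of the pool side (pool_alloc/pool_free are illustrative stand-ins for the benchmark's req_alloc/req_free):

#include "uv.h"
#include <stdlib.h>

typedef struct req_list_s {
  union uv_any_req uv_req;   /* large enough for any request type */
  struct req_list_s* next;
} req_list_t;

static req_list_t* req_freelist = NULL;

static uv_req_t* pool_alloc(void) {
  req_list_t* item = req_freelist;
  if (item == NULL) {
    /* Allocate a full slot so the caller may cast to any request type. */
    return (uv_req_t*) malloc(sizeof(req_list_t));
  }
  req_freelist = item->next;
  return (uv_req_t*) item;
}

static void pool_free(uv_req_t* req) {
  req_list_t* item = (req_list_t*) req;
  item->next = req_freelist;
  req_freelist = item;
}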
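On the Node side, tcp_wrap.cc replaces the uv_req_init()-era callback plumbing with ReqWrap<T>::Dispatched(): with no user callback threaded through initialization, req->data is the only per-request slot left, so it is filled in right after the request is handed to libuv, and the destructor asserts it happened. The same contract in plain C, under assumed names (connect_wrap_t and wrapped_connect are illustrative; the js_object field stands in for ReqWrap's Persistent<Object>):

#include "uv.h"
#include <stdlib.h>

typedef struct {
  uv_connect_t req;  /* what libuv sees */
  void* js_object;   /* stand-in for the wrap's Persistent<Object> */
} connect_wrap_t;

static void after_connect(uv_connect_t* req, int status) {
  /* The Dispatched() step guarantees req->data points back at the wrap. */
  connect_wrap_t* wrap = (connect_wrap_t*) req->data;
  /* ... fire the callback object held by wrap->js_object ... */
  free(wrap);
}

static int wrapped_connect(uv_tcp_t* handle, struct sockaddr_in addr) {
  connect_wrap_t* wrap = (connect_wrap_t*) malloc(sizeof *wrap);
  int r = uv_tcp_connect(&wrap->req, handle, addr, after_connect);
  wrap->req.data = wrap;  /* the Dispatched() step */
  if (r) {
    free(wrap);  /* mirror tcp_wrap.cc: discard the wrap if dispatch failed */
  }
  return r;
}

Setting data after the call is safe here because the loop is single-threaded: after_connect cannot run until control returns to uv_run().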