From 2e16ae703ee5d14cf7199218b3112407a188af42 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 18 Jul 2011 16:26:37 -0700 Subject: [PATCH] Upgrade libuv to 4eff34da4 --- deps/uv/include/uv-unix.h | 22 +- deps/uv/include/uv-win.h | 2 +- deps/uv/src/uv-unix.c | 854 ++++++++++++++++++++++++---------- deps/uv/src/uv-win.c | 153 ++++-- deps/uv/test/benchmark-pump.c | 7 - deps/uv/test/echo-server.c | 43 +- deps/uv/test/task.h | 3 +- 7 files changed, 775 insertions(+), 309 deletions(-) diff --git a/deps/uv/include/uv-unix.h b/deps/uv/include/uv-unix.h index 5a92f1257d..cad2ed4abe 100644 --- a/deps/uv/include/uv-unix.h +++ b/deps/uv/include/uv-unix.h @@ -67,25 +67,27 @@ typedef struct { #define UV_STREAM_PRIVATE_FIELDS \ uv_read_cb read_cb; \ - uv_alloc_cb alloc_cb; - - -/* UV_TCP */ -#define UV_TCP_PRIVATE_FIELDS \ - int delayed_error; \ - uv_connection_cb connection_cb; \ - int accepted_fd; \ + uv_alloc_cb alloc_cb; \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ ev_io write_watcher; \ ngx_queue_t write_queue; \ - ngx_queue_t write_completed_queue; + ngx_queue_t write_completed_queue; \ + int delayed_error; \ + uv_connection_cb connection_cb; \ + int accepted_fd; + + +/* UV_TCP */ +#define UV_TCP_PRIVATE_FIELDS /* UV_NAMED_PIPE */ #define UV_PIPE_PRIVATE_TYPEDEF -#define UV_PIPE_PRIVATE_FIELDS +#define UV_PIPE_PRIVATE_FIELDS \ + UV_TCP_PRIVATE_FIELDS \ + const char* pipe_fname; /* strdup'ed */ \ /* UV_PREPARE */ \ diff --git a/deps/uv/include/uv-win.h b/deps/uv/include/uv-win.h index d9f65ba5d1..12588e9697 100644 --- a/deps/uv/include/uv-win.h +++ b/deps/uv/include/uv-win.h @@ -96,7 +96,6 @@ typedef struct uv_buf_t { struct uv_req_s accept_req; \ #define uv_pipe_server_fields \ - char* name; \ uv_pipe_accept_t accept_reqs[4]; \ uv_pipe_accept_t* pending_accepts; @@ -104,6 +103,7 @@ typedef struct uv_buf_t { HANDLE handle; #define UV_PIPE_PRIVATE_FIELDS \ + char* name; \ union { \ struct { uv_pipe_server_fields }; \ struct { uv_pipe_connection_fields }; \ diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c index 1dc20d8437..f35cc6f043 100644 --- a/deps/uv/src/uv-unix.c +++ b/deps/uv/src/uv-unix.c @@ -22,6 +22,8 @@ #include "uv-common.h" #include "uv-eio.h" +#define _GNU_SOURCE /* O_CLOEXEC */ + #include /* NULL */ #include /* printf */ #include @@ -29,8 +31,11 @@ #include #include #include +#include +#include #include #include +#include #include #include #include /* PATH_MAX */ @@ -60,11 +65,30 @@ static struct uv_ares_data_s ares_data; void uv__req_init(uv_req_t*); -void uv__tcp_io(EV_P_ ev_io* watcher, int revents); void uv__next(EV_P_ ev_idle* watcher, int revents); -static void uv__tcp_connect(uv_tcp_t*); -int uv_tcp_open(uv_tcp_t*, int fd); +static int uv__stream_open(uv_stream_t*, int fd); static void uv__finish_close(uv_handle_t* handle); +static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error); + +static uv_write_t* uv__write(uv_stream_t* stream); +static void uv__read(uv_stream_t* stream); +static void uv__stream_connect(uv_stream_t*); +static void uv__stream_io(EV_P_ ev_io* watcher, int revents); +static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents); + +#ifndef __GNUC__ +#define __attribute__(a) +#endif + +/* Unused on systems that support O_CLOEXEC, SOCK_CLOEXEC, etc. */ +static int uv__cloexec(int fd, int set) __attribute__((unused)); +static int uv__nonblock(int fd, int set) __attribute__((unused)); + +static int uv__socket(int domain, int type, int protocol); +static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len); + +size_t uv__strlcpy(char* dst, const char* src, size_t size); + /* flags */ enum { @@ -161,6 +185,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) { int uv_close(uv_handle_t* handle, uv_close_cb close_cb) { uv_tcp_t* tcp; + uv_pipe_t* pipe; uv_async_t* async; uv_timer_t* timer; @@ -199,6 +224,22 @@ int uv_close(uv_handle_t* handle, uv_close_cb close_cb) { ev_timer_stop(EV_DEFAULT_ &timer->timer_watcher); break; + case UV_NAMED_PIPE: + pipe = (uv_pipe_t*)handle; + if (pipe->pipe_fname) { + /* + * Unlink the file system entity before closing the file descriptor. + * Doing it the other way around introduces a race where our process + * unlinks a socket with the same name that's just been created by + * another thread or process. + */ + unlink(pipe->pipe_fname); + free((void*)pipe->pipe_fname); + } + uv_read_stop((uv_stream_t*)pipe); + ev_io_stop(EV_DEFAULT_ &pipe->write_watcher); + break; + default: assert(0); return -1; @@ -258,10 +299,10 @@ int uv_tcp_init(uv_tcp_t* tcp) { ngx_queue_init(&tcp->write_completed_queue); tcp->write_queue_size = 0; - ev_init(&tcp->read_watcher, uv__tcp_io); + ev_init(&tcp->read_watcher, uv__stream_io); tcp->read_watcher.data = tcp; - ev_init(&tcp->write_watcher, uv__tcp_io); + ev_init(&tcp->write_watcher, uv__stream_io); tcp->write_watcher.data = tcp; assert(ngx_queue_empty(&tcp->write_queue)); @@ -273,40 +314,42 @@ int uv_tcp_init(uv_tcp_t* tcp) { int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) { - int r; + int saved_errno; + int status; + int fd; - if (tcp->fd <= 0) { - int fd = socket(domain, SOCK_STREAM, 0); + saved_errno = errno; + status = -1; - if (fd < 0) { + if (tcp->fd <= 0) { + if ((fd = uv__socket(domain, SOCK_STREAM, 0)) == -1) { uv_err_new((uv_handle_t*)tcp, errno); - return -1; + goto out; } - if (uv_tcp_open(tcp, fd)) { + if (uv__stream_open((uv_stream_t*)tcp, fd)) { + status = -2; close(fd); - return -2; + goto out; } } assert(tcp->fd >= 0); - r = bind(tcp->fd, addr, addrsize); tcp->delayed_error = 0; - - if (r) { - switch (errno) { - case EADDRINUSE: - tcp->delayed_error = errno; - return 0; - - default: - uv_err_new((uv_handle_t*)tcp, errno); - return -1; + if (bind(tcp->fd, addr, addrsize) == -1) { + if (errno == EADDRINUSE) { + tcp->delayed_error = errno; + } else { + uv_err_new((uv_handle_t*)tcp, errno); + goto out; } } + status = 0; - return 0; +out: + errno = saved_errno; + return status; } @@ -330,32 +373,27 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) { } -int uv_tcp_open(uv_tcp_t* tcp, int fd) { - int yes; - int r; +static int uv__stream_open(uv_stream_t* stream, int fd) { + socklen_t yes; assert(fd >= 0); - tcp->fd = fd; + stream->fd = fd; - /* Set non-blocking. */ + /* Reuse the port address if applicable. */ yes = 1; - r = fcntl(fd, F_SETFL, O_NONBLOCK); - assert(r == 0); - - /* Reuse the port address. */ - r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - assert(r == 0); + if (stream->type == UV_TCP + && setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { + uv_err_new((uv_handle_t*)stream, errno); + return -1; + } /* Associate the fd with each ev_io watcher. */ - ev_io_set(&tcp->read_watcher, fd, EV_READ); - ev_io_set(&tcp->write_watcher, fd, EV_WRITE); + ev_io_set(&stream->read_watcher, fd, EV_READ); + ev_io_set(&stream->write_watcher, fd, EV_WRITE); - /* These should have been set up by uv_tcp_init. */ - assert(tcp->next_watcher.data == tcp); - assert(tcp->write_watcher.data == tcp); - assert(tcp->read_watcher.data == tcp); - assert(tcp->read_watcher.cb == uv__tcp_io); - assert(tcp->write_watcher.cb == uv__tcp_io); + /* These should have been set up by uv_tcp_init or uv_pipe_init. */ + assert(stream->read_watcher.cb == uv__stream_io); + assert(stream->write_watcher.cb == uv__stream_io); return 0; } @@ -365,22 +403,22 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { int fd; struct sockaddr_storage addr; socklen_t addrlen = sizeof(struct sockaddr_storage); - uv_tcp_t* tcp = watcher->data; + uv_stream_t* stream = watcher->data; - assert(watcher == &tcp->read_watcher || - watcher == &tcp->write_watcher); + assert(watcher == &stream->read_watcher || + watcher == &stream->write_watcher); assert(revents == EV_READ); - assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); + assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING)); - if (tcp->accepted_fd >= 0) { - ev_io_stop(EV_DEFAULT_ &tcp->read_watcher); + if (stream->accepted_fd >= 0) { + ev_io_stop(EV_DEFAULT_ &stream->read_watcher); return; } while (1) { - assert(tcp->accepted_fd < 0); - fd = accept(tcp->fd, (struct sockaddr*)&addr, &addrlen); + assert(stream->accepted_fd < 0); + fd = accept(stream->fd, (struct sockaddr*)&addr, &addrlen); if (fd < 0) { if (errno == EAGAIN) { @@ -390,16 +428,16 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { /* TODO special trick. unlock reserved socket, accept, close. */ return; } else { - uv_err_new((uv_handle_t*)tcp, errno); - tcp->connection_cb((uv_handle_t*)tcp, -1); + uv_err_new((uv_handle_t*)stream, errno); + stream->connection_cb((uv_handle_t*)stream, -1); } } else { - tcp->accepted_fd = fd; - tcp->connection_cb((uv_handle_t*)tcp, 0); - if (tcp->accepted_fd >= 0) { + stream->accepted_fd = fd; + stream->connection_cb((uv_handle_t*)stream, 0); + if (stream->accepted_fd >= 0) { /* The user hasn't yet accepted called uv_accept() */ - ev_io_stop(EV_DEFAULT_ &tcp->read_watcher); + ev_io_stop(EV_DEFAULT_ &stream->read_watcher); return; } } @@ -408,24 +446,36 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { int uv_accept(uv_handle_t* server, uv_stream_t* client) { - uv_tcp_t* tcpServer = (uv_tcp_t*)server; - uv_tcp_t* tcpClient = (uv_tcp_t*)client; + uv_stream_t* streamServer; + uv_stream_t* streamClient; + int saved_errno; + int status; + + saved_errno = errno; + status = -1; + + streamServer = (uv_stream_t*)server; + streamClient = (uv_stream_t*)client; - if (tcpServer->accepted_fd < 0) { + if (streamServer->accepted_fd < 0) { uv_err_new(server, EAGAIN); - return -1; + goto out; } - if (uv_tcp_open(tcpClient, tcpServer->accepted_fd)) { - /* Ignore error for now */ - tcpServer->accepted_fd = -1; - close(tcpServer->accepted_fd); - return -1; - } else { - tcpServer->accepted_fd = -1; - ev_io_start(EV_DEFAULT_ &tcpServer->read_watcher); - return 0; + if (uv__stream_open(streamClient, streamServer->accepted_fd)) { + /* TODO handle error */ + streamServer->accepted_fd = -1; + close(streamServer->accepted_fd); + goto out; } + + ev_io_start(EV_DEFAULT_ &streamServer->read_watcher); + streamServer->accepted_fd = -1; + status = 0; + +out: + errno = saved_errno; + return status; } @@ -457,33 +507,11 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { void uv__finish_close(uv_handle_t* handle) { - uv_tcp_t* tcp; - assert(uv_flag_is_set(handle, UV_CLOSING)); assert(!uv_flag_is_set(handle, UV_CLOSED)); uv_flag_set(handle, UV_CLOSED); switch (handle->type) { - case UV_TCP: - /* XXX Is it necessary to stop these watchers here? weren't they - * supposed to be stopped in uv_close()? - */ - tcp = (uv_tcp_t*)handle; - ev_io_stop(EV_DEFAULT_ &tcp->write_watcher); - ev_io_stop(EV_DEFAULT_ &tcp->read_watcher); - - assert(!ev_is_active(&tcp->read_watcher)); - assert(!ev_is_active(&tcp->write_watcher)); - - close(tcp->fd); - tcp->fd = -1; - - if (tcp->accepted_fd >= 0) { - close(tcp->accepted_fd); - tcp->accepted_fd = -1; - } - break; - case UV_PREPARE: assert(!ev_is_active(&((uv_prepare_t*)handle)->prepare_watcher)); break; @@ -504,6 +532,26 @@ void uv__finish_close(uv_handle_t* handle) { assert(!ev_is_active(&((uv_timer_t*)handle)->timer_watcher)); break; + case UV_NAMED_PIPE: + case UV_TCP: + { + uv_stream_t* stream; + + stream = (uv_stream_t*)handle; + + assert(!ev_is_active(&stream->read_watcher)); + assert(!ev_is_active(&stream->write_watcher)); + + close(stream->fd); + stream->fd = -1; + + if (stream->accepted_fd >= 0) { + close(stream->accepted_fd); + stream->accepted_fd = -1; + } + break; + } + default: assert(0); break; @@ -519,15 +567,15 @@ void uv__finish_close(uv_handle_t* handle) { } -uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) { +uv_write_t* uv_write_queue_head(uv_stream_t* stream) { ngx_queue_t* q; uv_write_t* req; - if (ngx_queue_empty(&tcp->write_queue)) { + if (ngx_queue_empty(&stream->write_queue)) { return NULL; } - q = ngx_queue_head(&tcp->write_queue); + q = ngx_queue_head(&stream->write_queue); if (!q) { return NULL; } @@ -552,31 +600,31 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) { } -static void uv__drain(uv_tcp_t* tcp) { +static void uv__drain(uv_stream_t* stream) { uv_shutdown_t* req; - assert(!uv_write_queue_head(tcp)); - assert(tcp->write_queue_size == 0); + assert(!uv_write_queue_head(stream)); + assert(stream->write_queue_size == 0); - ev_io_stop(EV_DEFAULT_ &tcp->write_watcher); + ev_io_stop(EV_DEFAULT_ &stream->write_watcher); /* Shutdown? */ - if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUTTING) && - !uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING) && - !uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT)) { - assert(tcp->shutdown_req); + if (uv_flag_is_set((uv_handle_t*)stream, UV_SHUTTING) && + !uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING) && + !uv_flag_is_set((uv_handle_t*)stream, UV_SHUT)) { + assert(stream->shutdown_req); - req = tcp->shutdown_req; + req = stream->shutdown_req; - if (shutdown(tcp->fd, SHUT_WR)) { + if (shutdown(stream->fd, SHUT_WR)) { /* Error. Report it. User should call uv_close(). */ - uv_err_new((uv_handle_t*)tcp, errno); + uv_err_new((uv_handle_t*)stream, errno); if (req->cb) { req->cb(req, -1); } } else { - uv_err_new((uv_handle_t*)tcp, 0); - uv_flag_set((uv_handle_t*)tcp, UV_SHUT); + uv_err_new((uv_handle_t*)stream, 0); + uv_flag_set((uv_handle_t*)stream, UV_SHUT); if (req->cb) { req->cb(req, 0); } @@ -588,24 +636,24 @@ static void uv__drain(uv_tcp_t* tcp) { /* On success returns NULL. On error returns a pointer to the write request * which had the error. */ -static uv_write_t* uv__write(uv_tcp_t* tcp) { +static uv_write_t* uv__write(uv_stream_t* stream) { uv_write_t* req; struct iovec* iov; int iovcnt; ssize_t n; - assert(tcp->fd >= 0); + assert(stream->fd >= 0); /* TODO: should probably while(1) here until EAGAIN */ /* Get the request at the head of the queue. */ - req = uv_write_queue_head(tcp); + req = uv_write_queue_head(stream); if (!req) { - assert(tcp->write_queue_size == 0); + assert(stream->write_queue_size == 0); return NULL; } - assert(req->handle == (uv_stream_t*)tcp); + assert(req->handle == stream); /* Cast to iovec. We had to have our own uv_buf_t instead of iovec * because Windows's WSABUF is not an iovec. @@ -619,16 +667,16 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) { */ if (iovcnt == 1) { - n = write(tcp->fd, iov[0].iov_base, iov[0].iov_len); + n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); } else { - n = writev(tcp->fd, iov, iovcnt); + n = writev(stream->fd, iov, iovcnt); } if (n < 0) { if (errno != EAGAIN) { /* Error */ - uv_err_new((uv_handle_t*)tcp, errno); + uv_err_new((uv_handle_t*)stream, errno); return req; } } else { @@ -644,7 +692,7 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) { if (n < len) { buf->base += n; buf->len -= n; - tcp->write_queue_size -= n; + stream->write_queue_size -= n; n = 0; /* There is more to write. Break and ensure the watcher is pending. */ @@ -657,8 +705,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) { assert(n >= len); n -= len; - assert(tcp->write_queue_size >= len); - tcp->write_queue_size -= len; + assert(stream->write_queue_size >= len); + stream->write_queue_size -= len; if (req->write_index == req->bufcnt) { /* Then we're done! */ @@ -675,8 +723,8 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) { * callback called in the near future. * TODO: start trying to write the next request. */ - ngx_queue_insert_tail(&tcp->write_completed_queue, &req->queue); - ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE); + ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue); + ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE); return NULL; } } @@ -687,20 +735,20 @@ static uv_write_t* uv__write(uv_tcp_t* tcp) { assert(n == 0 || n == -1); /* We're not done. */ - ev_io_start(EV_DEFAULT_ &tcp->write_watcher); + ev_io_start(EV_DEFAULT_ &stream->write_watcher); return NULL; } -static void uv__write_callbacks(uv_tcp_t* tcp) { +static void uv__write_callbacks(uv_stream_t* stream) { int callbacks_made = 0; ngx_queue_t* q; uv_write_t* req; - while (!ngx_queue_empty(&tcp->write_completed_queue)) { + while (!ngx_queue_empty(&stream->write_completed_queue)) { /* Pop a req off write_completed_queue. */ - q = ngx_queue_head(&tcp->write_completed_queue); + q = ngx_queue_head(&stream->write_completed_queue); assert(q); req = ngx_queue_data(q, struct uv_write_s, queue); ngx_queue_remove(q); @@ -713,16 +761,16 @@ static void uv__write_callbacks(uv_tcp_t* tcp) { callbacks_made++; } - assert(ngx_queue_empty(&tcp->write_completed_queue)); + assert(ngx_queue_empty(&stream->write_completed_queue)); /* Write queue drained. */ - if (!uv_write_queue_head(tcp)) { - uv__drain(tcp); + if (!uv_write_queue_head(stream)) { + uv__drain(stream); } } -void uv__read(uv_tcp_t* tcp) { +static void uv__read(uv_stream_t* stream) { uv_buf_t buf; struct iovec* iov; ssize_t nread; @@ -730,43 +778,43 @@ void uv__read(uv_tcp_t* tcp) { /* XXX: Maybe instead of having UV_READING we just test if * tcp->read_cb is NULL or not? */ - while (tcp->read_cb && uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) { - assert(tcp->alloc_cb); - buf = tcp->alloc_cb((uv_stream_t*)tcp, 64 * 1024); + while (stream->read_cb && uv_flag_is_set((uv_handle_t*)stream, UV_READING)) { + assert(stream->alloc_cb); + buf = stream->alloc_cb(stream, 64 * 1024); assert(buf.len > 0); assert(buf.base); iov = (struct iovec*) &buf; - nread = read(tcp->fd, buf.base, buf.len); + nread = read(stream->fd, buf.base, buf.len); if (nread < 0) { /* Error */ if (errno == EAGAIN) { /* Wait for the next one. */ - if (uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) { - ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher); + if (uv_flag_is_set((uv_handle_t*)stream, UV_READING)) { + ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher); } - uv_err_new((uv_handle_t*)tcp, EAGAIN); - tcp->read_cb((uv_stream_t*)tcp, 0, buf); + uv_err_new((uv_handle_t*)stream, EAGAIN); + stream->read_cb(stream, 0, buf); return; } else { /* Error. User should call uv_close(). */ - uv_err_new((uv_handle_t*)tcp, errno); - tcp->read_cb((uv_stream_t*)tcp, -1, buf); - assert(!ev_is_active(&tcp->read_watcher)); + uv_err_new((uv_handle_t*)stream, errno); + stream->read_cb(stream, -1, buf); + assert(!ev_is_active(&stream->read_watcher)); return; } } else if (nread == 0) { /* EOF */ - uv_err_new_artificial((uv_handle_t*)tcp, UV_EOF); - ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher); - tcp->read_cb((uv_stream_t*)tcp, -1, buf); + uv_err_new_artificial((uv_handle_t*)stream, UV_EOF); + ev_io_stop(EV_DEFAULT_UC_ &stream->read_watcher); + stream->read_cb(stream, -1, buf); return; } else { /* Successful read */ - tcp->read_cb((uv_stream_t*)tcp, nread, buf); + stream->read_cb(stream, nread, buf); } } } @@ -774,8 +822,8 @@ void uv__read(uv_tcp_t* tcp) { int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_tcp_t* tcp = (uv_tcp_t*)handle; - assert(handle->type == UV_TCP && - "uv_shutdown (unix) only supports uv_tcp_t right now"); + assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE) + && "uv_shutdown (unix) only supports uv_tcp_t right now"); assert(tcp->fd >= 0); /* Initialize request */ @@ -800,31 +848,32 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { } -void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { - uv_tcp_t* tcp = watcher->data; +static void uv__stream_io(EV_P_ ev_io* watcher, int revents) { + uv_stream_t* stream = watcher->data; - assert(tcp->type == UV_TCP); - assert(watcher == &tcp->read_watcher || - watcher == &tcp->write_watcher); - assert(tcp->fd >= 0); - assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); + assert(stream->type == UV_TCP || + stream->type == UV_NAMED_PIPE); + assert(watcher == &stream->read_watcher || + watcher == &stream->write_watcher); + assert(stream->fd >= 0); + assert(!uv_flag_is_set((uv_handle_t*)stream, UV_CLOSING)); - if (tcp->connect_req) { - uv__tcp_connect(tcp); + if (stream->connect_req) { + uv__stream_connect(stream); } else { if (revents & EV_READ) { - uv__read(tcp); + uv__read((uv_stream_t*)stream); } if (revents & EV_WRITE) { - uv_write_t* req = uv__write(tcp); + uv_write_t* req = uv__write(stream); if (req) { /* Error. Notify the user. */ if (req->cb) { req->cb(req, -1); } } else { - uv__write_callbacks(tcp); + uv__write_callbacks(stream); } } } @@ -836,32 +885,32 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { * In order to determine if we've errored out or succeeded must call * getsockopt. */ -static void uv__tcp_connect(uv_tcp_t* tcp) { +static void uv__stream_connect(uv_stream_t* stream) { int error; - uv_connect_t* req = tcp->connect_req; + uv_connect_t* req = stream->connect_req; socklen_t errorsize = sizeof(int); - assert(tcp->type == UV_TCP); - assert(tcp->fd >= 0); + assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); + assert(stream->fd >= 0); assert(req); - if (tcp->delayed_error) { + if (stream->delayed_error) { /* To smooth over the differences between unixes errors that * were reported synchronously on the first connect can be delayed * until the next tick--which is now. */ - error = tcp->delayed_error; - tcp->delayed_error = 0; + error = stream->delayed_error; + stream->delayed_error = 0; } else { /* Normal situation: we need to get the socket error from the kernel. */ - getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); + getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); } if (!error) { - ev_io_start(EV_DEFAULT_ &tcp->read_watcher); + ev_io_start(EV_DEFAULT_ &stream->read_watcher); /* Successful connection */ - tcp->connect_req = NULL; + stream->connect_req = NULL; if (req->cb) { req->cb(req, 0); } @@ -871,9 +920,9 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { return; } else { /* Error */ - uv_err_new((uv_handle_t*)tcp, error); + uv_err_new((uv_handle_t*)stream, error); - tcp->connect_req = NULL; + stream->connect_req = NULL; if (req->cb) { req->cb(req, -1); } @@ -881,45 +930,52 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { } -static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr, - socklen_t addrlen, uv_connect_cb cb) { +static int uv__connect(uv_connect_t* req, + uv_stream_t* stream, + struct sockaddr* addr, + socklen_t addrlen, + uv_connect_cb cb) { + + int sockfd; int r; - if (tcp->fd <= 0) { - int fd = socket(addr->sa_family, SOCK_STREAM, 0); + if (stream->fd <= 0) { + if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) { - if (fd < 0) { - uv_err_new((uv_handle_t*)tcp, errno); + } + + if (sockfd < 0) { + uv_err_new((uv_handle_t*)stream, errno); return -1; } - if (uv_tcp_open(tcp, fd)) { - close(fd); + if (uv__stream_open(stream, sockfd)) { + close(sockfd); return -2; } } uv__req_init((uv_req_t*)req); req->cb = cb; - req->handle = (uv_stream_t*)tcp; + req->handle = stream; req->type = UV_CONNECT; ngx_queue_init(&req->queue); - if (tcp->connect_req) { - uv_err_new((uv_handle_t*)tcp, EALREADY); + if (stream->connect_req) { + uv_err_new((uv_handle_t*)stream, EALREADY); return -1; } - if (tcp->type != UV_TCP) { - uv_err_new((uv_handle_t*)tcp, ENOTSOCK); + if (stream->type != UV_TCP) { + uv_err_new((uv_handle_t*)stream, ENOTSOCK); return -1; } - tcp->connect_req = req; + stream->connect_req = req; - r = connect(tcp->fd, addr, addrlen); + r = connect(stream->fd, addr, addrlen); - tcp->delayed_error = 0; + stream->delayed_error = 0; if (r != 0 && errno != EINPROGRESS) { switch (errno) { @@ -928,41 +984,87 @@ static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr, * wait. */ case ECONNREFUSED: - tcp->delayed_error = errno; + stream->delayed_error = errno; break; default: - uv_err_new((uv_handle_t*)tcp, errno); + uv_err_new((uv_handle_t*)stream, errno); return -1; } } - assert(tcp->write_watcher.data == tcp); - ev_io_start(EV_DEFAULT_ &tcp->write_watcher); + assert(stream->write_watcher.data == stream); + ev_io_start(EV_DEFAULT_ &stream->write_watcher); - if (tcp->delayed_error) { - ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE); + if (stream->delayed_error) { + ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE); } return 0; } -int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, - struct sockaddr_in address, uv_connect_cb cb) { - assert(handle->type == UV_TCP); - assert(address.sin_family == AF_INET); - return uv__connect(req, handle, (struct sockaddr*) &address, - sizeof(struct sockaddr_in), cb); +int uv_tcp_connect(uv_connect_t* req, + uv_tcp_t* handle, + struct sockaddr_in address, + uv_connect_cb cb) { + int saved_errno; + int status; + + saved_errno = errno; + status = -1; + + if (handle->type != UV_TCP) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + if (address.sin_family != AF_INET) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + status = uv__connect(req, + (uv_stream_t*)handle, + (struct sockaddr*)&address, + sizeof address, + cb); + +out: + errno = saved_errno; + return status; } -int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, - struct sockaddr_in6 address, uv_connect_cb cb) { - assert(handle->type == UV_TCP); - assert(address.sin6_family == AF_INET6); - return uv__connect(req, handle, (struct sockaddr*) &address, - sizeof(struct sockaddr_in6), cb); +int uv_tcp_connect6(uv_connect_t* req, + uv_tcp_t* handle, + struct sockaddr_in6 address, + uv_connect_cb cb) { + int saved_errno; + int status; + + saved_errno = errno; + status = -1; + + if (handle->type != UV_TCP) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + if (address.sin6_family != AF_INET6) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + status = uv__connect(req, + (uv_stream_t*)handle, + (struct sockaddr*)&address, + sizeof address, + cb); + +out: + errno = saved_errno; + return status; } @@ -1004,8 +1106,10 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { */ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { + uv_stream_t* stream; int empty_queue; - uv_tcp_t* tcp = (uv_tcp_t*)handle; + + stream = (uv_stream_t*)handle; /* Initialize the req */ uv__req_init((uv_req_t*) req); @@ -1013,11 +1117,11 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, req->handle = handle; ngx_queue_init(&req->queue); - assert(handle->type == UV_TCP && - "uv_write (unix) does not yet support other types of streams"); + assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE) + && "uv_write (unix) does not yet support other types of streams"); - empty_queue = (tcp->write_queue_size == 0); - assert(tcp->fd >= 0); + empty_queue = (stream->write_queue_size == 0); + assert(stream->fd >= 0); ngx_queue_init(&req->queue); req->type = UV_WRITE; @@ -1038,22 +1142,22 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, */ req->write_index = 0; - tcp->write_queue_size += uv__buf_count(bufs, bufcnt); + stream->write_queue_size += uv__buf_count(bufs, bufcnt); /* Append the request to write_queue. */ - ngx_queue_insert_tail(&tcp->write_queue, &req->queue); + ngx_queue_insert_tail(&stream->write_queue, &req->queue); - assert(!ngx_queue_empty(&tcp->write_queue)); - assert(tcp->write_watcher.cb == uv__tcp_io); - assert(tcp->write_watcher.data == tcp); - assert(tcp->write_watcher.fd == tcp->fd); + assert(!ngx_queue_empty(&stream->write_queue)); + assert(stream->write_watcher.cb == uv__stream_io); + assert(stream->write_watcher.data == stream); + assert(stream->write_watcher.fd == stream->fd); /* If the queue was empty when this function began, we should attempt to * do the write immediately. Otherwise start the write_watcher and wait * for the fd to become writable. */ if (empty_queue) { - if (uv__write(tcp)) { + if (uv__write(stream)) { /* Error. uv_last_error has been set. */ return -1; } @@ -1063,13 +1167,13 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, * means we need to make the callback. The callback can only be done on a * fresh stack so we feed the event loop in order to service it. */ - if (ngx_queue_empty(&tcp->write_queue)) { - ev_feed_event(EV_DEFAULT_ &tcp->write_watcher, EV_WRITE); + if (ngx_queue_empty(&stream->write_queue)) { + ev_feed_event(EV_DEFAULT_ &stream->write_watcher, EV_WRITE); } else { /* Otherwise there is data to write - so we should wait for the file * descriptor to become writable. */ - ev_io_start(EV_DEFAULT_ &tcp->write_watcher); + ev_io_start(EV_DEFAULT_ &stream->write_watcher); } return 0; @@ -1097,28 +1201,27 @@ int64_t uv_now() { int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { - uv_tcp_t* tcp = (uv_tcp_t*)stream; + assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); /* The UV_READING flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ - uv_flag_set((uv_handle_t*)tcp, UV_READING); + uv_flag_set((uv_handle_t*)stream, UV_READING); /* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. */ - assert(tcp->fd >= 0); + assert(stream->fd >= 0); assert(alloc_cb); - tcp->read_cb = read_cb; - tcp->alloc_cb = alloc_cb; + stream->read_cb = read_cb; + stream->alloc_cb = alloc_cb; /* These should have been set by uv_tcp_init. */ - assert(tcp->read_watcher.data == tcp); - assert(tcp->read_watcher.cb == uv__tcp_io); + assert(stream->read_watcher.cb == uv__stream_io); - ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher); + ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher); return 0; } @@ -1635,21 +1738,306 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, int uv_pipe_init(uv_pipe_t* handle) { - assert(0 && "implement me"); + memset(handle, 0, sizeof handle); + + uv__handle_init((uv_handle_t*)handle, UV_NAMED_PIPE); + uv_counters()->pipe_init++; + + handle->type = UV_NAMED_PIPE; + handle->pipe_fname = NULL; /* Only set by listener. */ + + ev_init(&handle->write_watcher, uv__stream_io); + ev_init(&handle->read_watcher, uv__stream_io); + handle->write_watcher.data = handle; + handle->read_watcher.data = handle; + handle->fd = -1; + + ngx_queue_init(&handle->write_completed_queue); + ngx_queue_init(&handle->write_queue); + + return 0; } int uv_pipe_bind(uv_pipe_t* handle, const char* name) { - assert(0 && "implement me"); + struct sockaddr_un sun; + int saved_errno; + int sockfd; + int status; + int bound; + + saved_errno = errno; + sockfd = -1; + status = -1; + bound = 0; + + /* Make a copy of the file name, it outlives this function's scope. */ + if ((name = (const char*)strdup(name)) == NULL) { + uv_err_new((uv_handle_t*)handle, ENOMEM); + goto out; + } + + if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + memset(&sun, 0, sizeof sun); + uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path)); + sun.sun_family = AF_UNIX; + + if (bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) { +#ifdef DONT_RACE_ME_BRO + /* + * Try to bind the socket. Note that we explicitly don't act + * on EADDRINUSE. Unlinking and trying to bind again opens + * a window for races with other threads and processes. + */ + uv_err_new((uv_handle_t*)handle, errno); + goto out; +#else + /* + * Try to re-purpose the socket. This is a potential race window. + */ + if (errno != EADDRINUSE + || unlink(name) == -1 + || bind(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } +#endif + } + bound = 1; + + /* Success. */ + handle->pipe_fname = name; /* Is a strdup'ed copy. */ + handle->fd = sockfd; + status = 0; + +out: + /* Clean up on error. */ + if (status) { + if (bound) { + /* unlink() before close() to avoid races. */ + unlink(name); + } + close(sockfd); + free((void*)name); + } + + errno = saved_errno; + return status; } int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { - assert(0 && "implement me"); + int saved_errno; + int status; + + saved_errno = errno; + + if ((status = listen(handle->fd, SOMAXCONN)) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + } else { + handle->connection_cb = cb; + ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); + } + + errno = saved_errno; + return status; +} + + +int uv_pipe_connect(uv_connect_t* req, + uv_pipe_t* handle, + const char* name, + uv_connect_cb cb) { + struct sockaddr_un sun; + int saved_errno; + int sockfd; + int status; + + saved_errno = errno; + sockfd = -1; + status = -1; + + if ((sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + memset(&sun, 0, sizeof sun); + uv__strlcpy(sun.sun_path, name, sizeof(sun.sun_path)); + sun.sun_family = AF_UNIX; + + /* We don't check for EINPROGRESS. Think about it: the socket + * is either there or not. + */ + if (connect(sockfd, (struct sockaddr*)&sun, sizeof sun) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + close(sockfd); + goto out; + } + + handle->fd = sockfd; + ev_io_init(&handle->read_watcher, uv__stream_io, sockfd, EV_READ); + ev_io_init(&handle->write_watcher, uv__stream_io, sockfd, EV_WRITE); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); + ev_io_start(EV_DEFAULT_ &handle->write_watcher); + + status = 0; + +out: + uv__req_init((uv_req_t*)req); + req->handle = (uv_stream_t*)handle; + req->type = UV_CONNECT; + req->cb = cb; + ngx_queue_init(&req->queue); + + if (cb) { + cb(req, status); + } + + /* Mimic the Windows pipe implementation, always + * return 0 and let the callback handle errors. + */ + errno = saved_errno; + return 0; } -int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, - const char* name, uv_connect_cb cb) { - assert(0 && "implement me"); +/* TODO merge with uv__server_io()? */ +static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) { + struct sockaddr_un sun; + uv_pipe_t* pipe; + int saved_errno; + int sockfd; + + saved_errno = errno; + pipe = watcher->data; + + assert(pipe->type == UV_NAMED_PIPE); + assert(pipe->pipe_fname != NULL); + + sockfd = uv__accept(pipe->fd, (struct sockaddr *)&sun, sizeof sun); + if (sockfd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + assert(0 && "EAGAIN on uv__accept(pipefd)"); + } else { + uv_err_new((uv_handle_t*)pipe, errno); + } + } else { + pipe->accepted_fd = sockfd; + pipe->connection_cb((uv_handle_t*)pipe, 0); + if (pipe->accepted_fd == sockfd) { + /* The user hasn't yet accepted called uv_accept() */ + ev_io_stop(EV_DEFAULT_ &pipe->read_watcher); + } + } + + errno = saved_errno; +} + + +/* Open a socket in non-blocking close-on-exec mode, atomically if possible. */ +static int uv__socket(int domain, int type, int protocol) { +#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) + return socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol); +#else + int sockfd; + + if ((sockfd = socket(domain, type, protocol)) == -1) { + return -1; + } + + if (uv__nonblock(sockfd, 1) == -1 || uv__cloexec(sockfd, 1) == -1) { + close(sockfd); + return -1; + } + + return sockfd; +#endif +} + + +static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t slen) { +#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) + return accept4(sockfd, saddr, &slen, SOCK_NONBLOCK | SOCK_CLOEXEC); +#else + int peerfd; + + if ((peerfd = accept(sockfd, saddr, &slen)) == -1) { + return -1; + } + + if (uv__cloexec(peerfd, 1) == -1 || uv__nonblock(peerfd, 1) == -1) { + close(peerfd); + return -1; + } + + return peerfd; +#endif +} + + +static int uv__nonblock(int fd, int set) { + int flags; + + if ((flags = fcntl(fd, F_GETFL)) == -1) { + return -1; + } + + if (set) { + flags |= O_NONBLOCK; + } else { + flags &= ~O_NONBLOCK; + } + + if (fcntl(fd, F_SETFL, flags) == -1) { + return -1; + } + + return 0; +} + + +static int uv__cloexec(int fd, int set) { + int flags; + + if ((flags = fcntl(fd, F_GETFD)) == -1) { + return -1; + } + + if (set) { + flags |= FD_CLOEXEC; + } else { + flags &= ~FD_CLOEXEC; + } + + if (fcntl(fd, F_SETFD, flags) == -1) { + return -1; + } + + return 0; +} + + +/* TODO move to uv-common.c? */ +size_t uv__strlcpy(char* dst, const char* src, size_t size) { + const char *org; + + if (size == 0) { + return 0; + } + + org = src; + while (size > 1) { + if ((*dst++ = *src++) == '\0') { + return org - src; + } + } + *dst = '\0'; + + return src - org; } diff --git a/deps/uv/src/uv-win.c b/deps/uv/src/uv-win.c index 111d99cb00..8241340ae8 100644 --- a/deps/uv/src/uv-win.c +++ b/deps/uv/src/uv-win.c @@ -222,6 +222,7 @@ static char uv_zero_[] = ""; /* mark if IPv6 sockets are supported */ static BOOL uv_allow_ipv6 = FALSE; + /* * Subclass of uv_handle_t. Used for integration of c-ares. */ @@ -374,6 +375,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET; case ERROR_BROKEN_PIPE: return UV_EOF; case ERROR_PIPE_BUSY: return UV_EBUSY; + case ERROR_SEM_TIMEOUT: return UV_ETIMEDOUT; default: return UV_UNKNOWN; } } @@ -517,6 +519,7 @@ void uv_init() { static void uv_req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; + req->error = uv_ok_; } @@ -1028,10 +1031,14 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); - if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && - GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { - /* Make this req pending reporting an error. */ - req->error = uv_new_sys_error(GetLastError()); + if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) { + if (GetLastError() == ERROR_PIPE_CONNECTED) { + req->pipeHandle = pipeHandle; + req->error = uv_ok_; + } else { + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(GetLastError()); + } uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return; @@ -2314,9 +2321,7 @@ static void uv_poll() { /* Package was dequeued */ req = uv_overlapped_to_req(overlapped); - if (success) { - req->error = uv_ok_; - } else { + if (!success) { req->error = uv_new_sys_error(GetLastError()); } @@ -2970,6 +2975,7 @@ int uv_pipe_init(uv_pipe_t* handle) { handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; handle->pending_accepts = NULL; + handle->name = NULL; uv_counters()->pipe_init++; @@ -3033,61 +3039,131 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { return 0; } + +static int uv_set_pipe_handle(uv_pipe_t* handle, HANDLE pipeHandle) { + DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + + if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { + return -1; + } + + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + return -1; + } + + return 0; +} + + +static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { + HANDLE pipeHandle = INVALID_HANDLE_VALUE; + int errno; + uv_pipe_t* handle; + uv_connect_t* req; + + req = (uv_connect_t*)parameter; + assert(req); + handle = (uv_pipe_t*)req->handle; + assert(handle); + + /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait for the pipe to become available with WaitNamedPipe. */ + while (WaitNamedPipe(handle->name, 30000)) { + /* The pipe is now available, try to connect. */ + pipeHandle = CreateFile(handle->name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); + + if (pipeHandle != INVALID_HANDLE_VALUE) { + break; + } + } + + if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) { + handle->handle = pipeHandle; + req->error = uv_ok_; + } else { + req->error = uv_new_sys_error(GetLastError()); + } + + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + /* Post completed */ + if (!PostQueuedCompletionStatus(uv_iocp_, + 0, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + + return 0; +} + + /* TODO: make this work with UTF8 name */ -/* TODO: run this in the thread pool */ int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb) { int errno; - DWORD mode; + HANDLE pipeHandle; + + handle->handle = INVALID_HANDLE_VALUE; uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; - memset(&req->overlapped, 0, sizeof(req->overlapped)); - - handle->handle = CreateFile(name, - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - NULL); + pipeHandle = CreateFile(name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); - if (handle->handle == INVALID_HANDLE_VALUE && - GetLastError() != ERROR_IO_PENDING) { - errno = GetLastError(); - goto error; - } + if (pipeHandle == INVALID_HANDLE_VALUE) { + if (GetLastError() == ERROR_PIPE_BUSY) { + /* Wait for the server to make a pipe instance available. */ + handle->name = strdup(name); + if (!handle->name) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } - mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) { + errno = GetLastError(); + goto error; + } - if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { + return 0; + } + errno = GetLastError(); goto error; } - - if (CreateIoCompletionPort(handle->handle, - uv_iocp_, - (ULONG_PTR)handle, - 0) == NULL) { + + if (uv_set_pipe_handle((uv_pipe_t*)req->handle, pipeHandle)) { errno = GetLastError(); goto error; } + handle->handle = pipeHandle; + req->error = uv_ok_; uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return 0; error: - if (handle->handle != INVALID_HANDLE_VALUE) { - CloseHandle(handle->handle); + if (pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(pipeHandle); } - req->error = uv_new_sys_error(errno); - uv_insert_pending_req((uv_req_t*) req); - handle->reqs_pending++; + uv_set_sys_error(errno); return -1; } @@ -3097,6 +3173,11 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { int i; HANDLE pipeHandle; + if (handle->name) { + free(handle->name); + handle->name; + } + if (handle->flags & UV_HANDLE_PIPESERVER) { for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { pipeHandle = handle->accept_reqs[i].pipeHandle; @@ -3105,7 +3186,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { } } - } else { + } else if (handle->handle != INVALID_HANDLE_VALUE) { CloseHandle(handle->handle); } diff --git a/deps/uv/test/benchmark-pump.c b/deps/uv/test/benchmark-pump.c index d7524f7470..1732e84ff5 100644 --- a/deps/uv/test/benchmark-pump.c +++ b/deps/uv/test/benchmark-pump.c @@ -261,13 +261,6 @@ static void maybe_connect_some() { req = (uv_connect_t*) req_alloc(); r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb); ASSERT(r == 0); - -#ifdef _WIN32 - /* HACK: This is temporary to give the pipes server enough time to create new handles. - * This will go away once uv_pipe_connect can deal with UV_EBUSY. - */ - Sleep(1); -#endif } } } diff --git a/deps/uv/test/echo-server.c b/deps/uv/test/echo-server.c index e107dc5b95..992c88bb2d 100644 --- a/deps/uv/test/echo-server.c +++ b/deps/uv/test/echo-server.c @@ -124,7 +124,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) { static void on_connection(uv_handle_t* server, int status) { - uv_handle_t* handle; + uv_stream_t* stream; int r; if (status != 0) { @@ -132,25 +132,31 @@ static void on_connection(uv_handle_t* server, int status) { } ASSERT(status == 0); - if (serverType == TCP) { - handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t)); - ASSERT(handle != NULL); - - uv_tcp_init((uv_tcp_t*)handle); - } else { - handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t)); - ASSERT(handle != NULL); - - uv_pipe_init((uv_pipe_t*)handle); + switch (serverType) { + case TCP: + stream = malloc(sizeof(uv_tcp_t)); + ASSERT(stream != NULL); + uv_tcp_init((uv_tcp_t*)stream); + break; + + case PIPE: + stream = malloc(sizeof(uv_pipe_t)); + ASSERT(stream != NULL); + uv_pipe_init((uv_pipe_t*)stream); + break; + + default: + ASSERT(0 && "Bad serverType"); + abort(); } /* associate server with stream */ - handle->data = server; + stream->data = server; - r = uv_accept(server, (uv_stream_t*)handle); + r = uv_accept(server, stream); ASSERT(r == 0); - r = uv_read_start((uv_stream_t*)handle, echo_alloc, after_read); + r = uv_read_start(stream, echo_alloc, after_read); ASSERT(r == 0); } @@ -233,22 +239,19 @@ static int pipe_echo_start(char* pipeName) { r = uv_pipe_init(&pipeServer); if (r) { - /* TODO: Error codes */ - fprintf(stderr, "Pipe creation error\n"); + fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error())); return 1; } r = uv_pipe_bind(&pipeServer, pipeName); if (r) { - /* TODO: Error codes */ - fprintf(stderr, "create error\n"); + fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(uv_last_error())); return 1; } r = uv_pipe_listen(&pipeServer, on_connection); if (r) { - /* TODO: Error codes */ - fprintf(stderr, "Listen error on IPv6\n"); + fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(uv_last_error())); return 1; } diff --git a/deps/uv/test/task.h b/deps/uv/test/task.h index d47c209480..2c0febdad9 100644 --- a/deps/uv/test/task.h +++ b/deps/uv/test/task.h @@ -33,8 +33,7 @@ #ifdef _WIN32 # define TEST_PIPENAME "\\\\.\\pipe\\uv-test" #else -# /* TODO: define unix pipe name */ -# define TEST_PIPENAME "" +# define TEST_PIPENAME "/tmp/uv-test-sock" #endif typedef enum {