From 145aa636b9349a375c36b3a3f1c20e03416c750d Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Thu, 20 Oct 2011 15:25:57 -0700 Subject: [PATCH] uv: upgrade to 28234d7 --- deps/uv/AUTHORS | 2 + deps/uv/include/uv-private/uv-win.h | 43 +++- deps/uv/include/uv.h | 4 +- deps/uv/src/ares/ares_init.c | 21 +- deps/uv/src/unix/linux.c | 4 + deps/uv/src/unix/stream.c | 2 +- deps/uv/src/win/internal.h | 31 ++- deps/uv/src/win/pipe.c | 354 +++++++++++++++++++++++---- deps/uv/src/win/stream.c | 2 + deps/uv/src/win/tcp.c | 7 +- deps/uv/src/win/tty.c | 10 +- deps/uv/src/win/udp.c | 114 +++++---- deps/uv/src/win/winapi.c | 8 + deps/uv/src/win/winapi.h | 25 ++ deps/uv/src/win/winsock.c | 207 ++++++++++++++++ deps/uv/src/win/winsock.h | 66 ++++- deps/uv/test/run-tests.c | 101 ++++++++ deps/uv/test/task.h | 2 + deps/uv/test/test-list.h | 2 + deps/uv/test/test-stdio-over-pipes.c | 154 ++++++++++++ deps/uv/uv.gyp | 1 + 21 files changed, 1030 insertions(+), 130 deletions(-) create mode 100644 deps/uv/test/test-stdio-over-pipes.c diff --git a/deps/uv/AUTHORS b/deps/uv/AUTHORS index 0b852b12c1..2d43c36614 100644 --- a/deps/uv/AUTHORS +++ b/deps/uv/AUTHORS @@ -29,3 +29,5 @@ Fedor Indutny Saúl Ibarra Corretgé Felix Geisendörfer Yuki OKUMURA +Roman Shtylman +Frank DENIS diff --git a/deps/uv/include/uv-private/uv-win.h b/deps/uv/include/uv-private/uv-win.h index 81693ea882..b7fb0a4b3d 100644 --- a/deps/uv/include/uv-private/uv-win.h +++ b/deps/uv/include/uv-private/uv-win.h @@ -104,6 +104,27 @@ DWORD dwFlags); #endif +typedef int (WSAAPI* LPFN_WSARECV) + (SOCKET socket, + LPWSABUF buffers, + DWORD buffer_count, + LPDWORD bytes, + LPDWORD flags, + LPWSAOVERLAPPED overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE + completion_routine); + +typedef int (WSAAPI* LPFN_WSARECVFROM) + (SOCKET socket, + LPWSABUF buffers, + DWORD buffer_count, + LPDWORD bytes, + LPDWORD flags, + struct sockaddr* addr, + LPINT addr_len, + LPWSAOVERLAPPED overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine); + /** * It should be possible to cast uv_buf_t[] to WSABUF[] @@ -169,7 +190,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ - int ipc_header; + int ipc_header; \ + uv_buf_t write_buffer; \ + HANDLE event_handle; \ + HANDLE wait_handle; #define UV_CONNECT_PRIVATE_FIELDS \ /* empty */ @@ -194,7 +218,13 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); HANDLE event_handle; \ HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ - } uv_tcp_accept_t; + } uv_tcp_accept_t; \ + \ + typedef struct uv_read_s { \ + UV_REQ_FIELDS \ + HANDLE event_handle; \ + HANDLE wait_handle; \ + } uv_read_t; #define uv_stream_connection_fields \ unsigned int write_reqs_pending; \ @@ -205,7 +235,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_req_t read_req; \ + uv_read_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ struct { uv_stream_server_fields }; \ @@ -236,7 +266,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct sockaddr_storage recv_from; \ int recv_from_len; \ uv_udp_recv_cb recv_cb; \ - uv_alloc_cb alloc_cb; + uv_alloc_cb alloc_cb; \ + LPFN_WSARECV func_wsarecv; \ + LPFN_WSARECVFROM func_wsarecvfrom; #define uv_pipe_server_fields \ uv_pipe_accept_t accept_reqs[4]; \ @@ -247,7 +279,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_write_t ipc_header_write_req; \ int ipc_pid; \ uint64_t remaining_ipc_rawdata_bytes; \ - WSAPROTOCOL_INFOW* pending_socket_info; + WSAPROTOCOL_INFOW* pending_socket_info; \ + uv_write_t* non_overlapped_writes_tail; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h index 3173b71406..3428f56ed8 100644 --- a/deps/uv/include/uv.h +++ b/deps/uv/include/uv.h @@ -610,7 +610,7 @@ int uv_udp_send6(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], int bufcnt, struct sockaddr_in6 addr, uv_udp_send_cb send_cb); /* - * Send data. If the socket has not previously been bound with `uv_udp_bind` + * Receive data. If the socket has not previously been bound with `uv_udp_bind` * or `uv_udp_bind6`, it is bound to 0.0.0.0 (the "all interfaces" address) * and a random port number. * @@ -1061,7 +1061,7 @@ int uv_fs_lstat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb); int uv_fs_link(uv_loop_t* loop, uv_fs_t* req, const char* path, const char* new_path, uv_fs_cb cb); -/* +/* * This flag can be used with uv_fs_symlink on Windows * to specify whether path argument points to a directory. */ diff --git a/deps/uv/src/ares/ares_init.c b/deps/uv/src/ares/ares_init.c index 44d4cf934f..52bb4d6c89 100644 --- a/deps/uv/src/ares/ares_init.c +++ b/deps/uv/src/ares/ares_init.c @@ -696,17 +696,20 @@ static int get_iphlpapi_dns_info (char *ret_buf, size_t ret_size) struct sockaddr_in6 *pIPv6Addr = ( struct sockaddr_in6 * ) pGenericAddr; ares_inet_ntop( AF_INET6, &pIPv6Addr->sin6_addr, ret, ipv6_size - 1 ); /* -1 for comma */ - /* Append a comma to the end, THEN NULL. Should be OK because we - already tested the size at the top of the if statement. */ stringlen = strlen( ret ); - ret[ stringlen ] = ','; - ret[ stringlen + 1 ] = '\0'; - ret += stringlen + 1; - left -= ret - ret_buf; - ++count; - /* NB on Windows this also returns stuff in the fec0::/10 range, - seems to be hard-coded somehow. Do we need to ignore them? */ + /* Windows apparently always reports some IPv6 DNS servers that + prefixed with fec0:0:0:ffff. These ususally do not point to + working DNS servers, so we ignore them. */ + if (strncmp(ret, "fec0:0:0:ffff:", 14) != 0) { + /* Append a comma to the end, THEN NULL. Should be OK because we + already tested the size at the top of the if statement. */ + ret[ stringlen ] = ','; + ret[ stringlen + 1 ] = '\0'; + ret += stringlen + 1; + left -= ret - ret_buf; + ++count; + } } } } diff --git a/deps/uv/src/unix/linux.c b/deps/uv/src/unix/linux.c index e37983d115..fb499a97aa 100644 --- a/deps/uv/src/unix/linux.c +++ b/deps/uv/src/unix/linux.c @@ -144,6 +144,9 @@ static void uv__inotify_read(EV_P_ ev_io* w, int revents) { filename = e->len ? e->name : basename_r(handle->filename); handle->cb(handle, filename, events, 0); + + if (handle->fd == -1) + break; } } while (handle->fd != -1); /* handle might've been closed by callback */ @@ -198,4 +201,5 @@ void uv__fs_event_destroy(uv_fs_event_t* handle) { uv__close(handle->fd); handle->fd = -1; free(handle->filename); + handle->filename = NULL; } diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index 855f45bc9e..3dbdf8997d 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -209,8 +209,8 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (uv__stream_open(streamClient, streamServer->accepted_fd, UV_READABLE | UV_WRITABLE)) { /* TODO handle error */ - streamServer->accepted_fd = -1; uv__close(streamServer->accepted_fd); + streamServer->accepted_fd = -1; goto out; } diff --git a/deps/uv/src/win/internal.h b/deps/uv/src/win/internal.h index 4bb36ad8fe..ea0867c1eb 100644 --- a/deps/uv/src/win/internal.h +++ b/deps/uv/src/win/internal.h @@ -65,6 +65,7 @@ void uv_process_timers(uv_loop_t* loop); #define UV_HANDLE_ZERO_READ 0x40000 #define UV_HANDLE_TTY_RAW 0x80000 #define UV_HANDLE_EMULATE_IOCP 0x100000 +#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x200000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -307,14 +308,40 @@ uv_err_code uv_translate_sys_error(int sys_errno); /* - * Initialization for the windows and winsock api + * Winapi and ntapi utility functions */ void uv_winapi_init(); + + +/* + * Winsock utility functions + */ void uv_winsock_init(); + int uv_ntstatus_to_winsock_error(NTSTATUS status); +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); + +int WSAAPI uv_wsarecv_workaround(SOCKET socket, WSABUF* buffers, + DWORD buffer_count, DWORD* bytes, DWORD* flags, WSAOVERLAPPED *overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine); +int WSAAPI uv_wsarecvfrom_workaround(SOCKET socket, WSABUF* buffers, + DWORD buffer_count, DWORD* bytes, DWORD* flags, struct sockaddr* addr, + int* addr_len, WSAOVERLAPPED *overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine); -/* Threads and synchronization */ +/* Whether ipv6 is supported */ +extern int uv_allow_ipv6; + +/* Ip address used to bind to any port at any interface */ +extern struct sockaddr_in uv_addr_ip4_any_; +extern struct sockaddr_in6 uv_addr_ip6_any_; + + +/* + * Threads and synchronization + */ typedef struct uv_once_s { unsigned char ran; /* The actual event handle must be aligned to sizeof(HANDLE), so in */ diff --git a/deps/uv/src/win/pipe.c b/deps/uv/src/win/pipe.c index 9c90fda05f..81ce3e4a0d 100644 --- a/deps/uv/src/win/pipe.c +++ b/deps/uv/src/win/pipe.c @@ -79,6 +79,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->remaining_ipc_rawdata_bytes = 0; handle->pending_socket_info = NULL; handle->ipc = ipc; + handle->non_overlapped_writes_tail = NULL; uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); @@ -90,6 +91,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); + handle->read_req.data = handle; handle->eof_timer = NULL; } @@ -149,19 +151,39 @@ done: static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle, HANDLE pipeHandle) { + NTSTATUS nt_status; + IO_STATUS_BLOCK io_status; + FILE_MODE_INFORMATION mode_info; DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { return -1; } - if (CreateIoCompletionPort(pipeHandle, - loop->iocp, - (ULONG_PTR)handle, - 0) == NULL) { + /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */ + nt_status = pNtQueryInformationFile(pipeHandle, + &io_status, + &mode_info, + sizeof(mode_info), + FileModeInformation); + if (nt_status != STATUS_SUCCESS) { return -1; } + if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || + mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { + /* Non-overlapped pipe. */ + handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; + } else { + /* Overlapped pipe. Try to associate with IOCP. */ + if (CreateIoCompletionPort(pipeHandle, + loop->iocp, + (ULONG_PTR)handle, + 0) == NULL) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } + } + return 0; } @@ -258,6 +280,17 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { free(handle->pending_socket_info); handle->pending_socket_info = NULL; } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(handle->read_req.wait_handle); + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; + } + if (handle->read_req.event_handle) { + CloseHandle(handle->read_req.event_handle); + handle->read_req.event_handle = NULL; + } + } } /* Remember the state of this flag because the close callback is */ @@ -657,8 +690,99 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { } +static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) { + int result; + DWORD bytes; + uv_read_t* req = (uv_read_t*) parameter; + uv_pipe_t* handle = (uv_pipe_t*) req->data; + uv_loop_t* loop = handle->loop; + + assert(req != NULL); + assert(req->type == UV_READ); + assert(handle->type == UV_NAMED_PIPE); + + result = ReadFile(handle->handle, + &uv_zero_, + 0, + &bytes, + NULL); + + if (!result) { + SET_REQ_ERROR(req, GetLastError()); + } + + POST_COMPLETION_FOR_REQ(loop, req); + return 0; +} + + +static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { + int result; + DWORD bytes; + uv_write_t* req = (uv_write_t*) parameter; + uv_pipe_t* handle = (uv_pipe_t*) req->handle; + uv_loop_t* loop = handle->loop; + + assert(req != NULL); + assert(req->type == UV_WRITE); + assert(handle->type == UV_NAMED_PIPE); + assert(req->write_buffer.base); + + result = WriteFile(handle->handle, + req->write_buffer.base, + req->write_buffer.len, + &bytes, + NULL); + + if (!result) { + SET_REQ_ERROR(req, GetLastError()); + } + + POST_COMPLETION_FOR_REQ(loop, req); + return 0; +} + + +static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) { + uv_read_t* req; + uv_tcp_t* handle; + + req = (uv_read_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + +static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) { + uv_write_t* req; + uv_tcp_t* handle; + + req = (uv_write_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->handle; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { - uv_req_t* req; + uv_read_t* req; int result; assert(handle->flags & UV_HANDLE_READING); @@ -667,28 +791,60 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { assert(handle->handle != INVALID_HANDLE_VALUE); req = &handle->read_req; - memset(&req->overlapped, 0, sizeof(req->overlapped)); - /* Do 0-read */ - result = ReadFile(handle->handle, - &uv_zero_, - 0, - NULL, - &req->overlapped); + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, + req, + WT_EXECUTELONGFUNCTION)) { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + } else { + memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - if (!result && GetLastError() != ERROR_IO_PENDING) { - /* Make this req pending reporting an error. */ - SET_REQ_ERROR(req, WSAGetLastError()); - uv_insert_pending_req(loop, req); + /* Do 0-read */ + result = ReadFile(handle->handle, + &uv_zero_, + 0, + NULL, + &req->overlapped); - handle->flags |= UV_HANDLE_READ_PENDING; - handle->reqs_pending++; - return; + if (!result && GetLastError() != ERROR_IO_PENDING) { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (!req->event_handle) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } + if (req->wait_handle == INVALID_HANDLE_VALUE) { + if (!RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion_read_wait, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + } + } } /* Start the eof timer if there is one */ eof_timer_start(handle); + handle->flags |= UV_HANDLE_READ_PENDING; + handle->reqs_pending++; + return; +error: + uv_insert_pending_req(loop, (uv_req_t*)req); handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -739,6 +895,54 @@ int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } +static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, + uv_write_t* req) { + req->next_req = NULL; + if (handle->non_overlapped_writes_tail) { + req->next_req = + handle->non_overlapped_writes_tail->next_req; + handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req; + handle->non_overlapped_writes_tail = req; + } else { + req->next_req = (uv_req_t*)req; + handle->non_overlapped_writes_tail = req; + } +} + + +static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { + uv_write_t* req; + + if (handle->non_overlapped_writes_tail) { + req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req; + + if (req == handle->non_overlapped_writes_tail) { + handle->non_overlapped_writes_tail = NULL; + } else { + handle->non_overlapped_writes_tail->next_req = + req->next_req; + } + + return req; + } else { + /* queue empty */ + return NULL; + } +} + + +static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { + uv_write_t* req = uv_remove_non_overlapped_write_req(handle); + if (req) { + if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, + req, + WT_EXECUTELONGFUNCTION)) { + uv_fatal_error(GetLastError(), "QueueUserWorkItem"); + } + } +} + + static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { @@ -775,9 +979,12 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, req->handle = (uv_stream_t*) handle; req->cb = cb; req->ipc_header = 0; + req->event_handle = NULL; + req->wait_handle = INVALID_HANDLE_VALUE; memset(&req->overlapped, 0, sizeof(req->overlapped)); if (handle->ipc) { + assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); ipc_frame.header.flags = 0; /* Use the IPC framing protocol. */ @@ -847,6 +1054,10 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, handle->write_queue_size += req->queued_bytes; } + if (handle->write_reqs_pending == 0) { + uv_ref(loop); + } + handle->reqs_pending++; handle->write_reqs_pending++; @@ -856,24 +1067,53 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, } } - result = WriteFile(handle->handle, - bufs[0].base, - bufs[0].len, - NULL, - &req->overlapped); - - if (!result && GetLastError() != ERROR_IO_PENDING) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + req->write_buffer = bufs[0]; + uv_insert_non_overlapped_write_req(handle, req); + if (handle->write_reqs_pending == 0) { + uv_queue_non_overlapped_write(handle); + } - if (result) { - /* Request completed immediately. */ - req->queued_bytes = 0; - } else { /* Request queued by the kernel. */ req->queued_bytes = uv_count_bufs(bufs, bufcnt); handle->write_queue_size += req->queued_bytes; + } else { + result = WriteFile(handle->handle, + bufs[0].base, + bufs[0].len, + NULL, + &req->overlapped); + + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = uv_count_bufs(bufs, bufcnt); + handle->write_queue_size += req->queued_bytes; + } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + if (!RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion_write_wait, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + } + } + + if (handle->write_reqs_pending == 0) { + uv_ref(loop); } handle->reqs_pending++; @@ -999,7 +1239,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, } assert(bytes == sizeof(ipc_frame.header)); - assert(ipc_frame.header.flags <= UV_IPC_UV_STREAM | UV_IPC_RAW_DATA); + assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA)); if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { assert(avail - sizeof(ipc_frame.header) >= @@ -1094,6 +1334,17 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, handle->write_queue_size -= req->queued_bytes; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + if (req->ipc_header) { if (req == &handle->ipc_header_write_req) { req->type = UV_UNKNOWN_REQ; @@ -1112,6 +1363,17 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, } handle->write_reqs_pending--; + + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && + handle->non_overlapped_writes_tail) { + assert(handle->write_reqs_pending > 0); + uv_queue_non_overlapped_write(handle); + } + + if (handle->write_reqs_pending == 0) { + uv_unref(loop); + } + if (handle->write_reqs_pending == 0 && handle->flags & UV_HANDLE_SHUTTING) { uv_want_endgame(loop, (uv_handle_t*)handle); @@ -1277,21 +1539,19 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - HANDLE os_handle; - - /* Special-case stdin with ipc. */ - if (file == 0 && pipe->ipc) { - os_handle = (HANDLE)_get_osfhandle(file); - - if (os_handle == INVALID_HANDLE_VALUE || - uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { - return; - } + HANDLE os_handle = (HANDLE)_get_osfhandle(file); - uv_pipe_connection_init(pipe); + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } + + uv_pipe_connection_init(pipe); + pipe->handle = os_handle; + + if (pipe->ipc) { + assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); pipe->ipc_pid = uv_parent_pid(); assert(pipe->ipc_pid != -1); - - pipe->handle = os_handle; } } diff --git a/deps/uv/src/win/stream.c b/deps/uv/src/win/stream.c index f12117841a..c2354eecba 100644 --- a/deps/uv/src/win/stream.c +++ b/deps/uv/src/win/stream.c @@ -43,6 +43,8 @@ void uv_connection_init(uv_stream_t* handle) { handle->write_reqs_pending = 0; uv_req_init(handle->loop, (uv_req_t*) &(handle->read_req)); + handle->read_req.event_handle = NULL; + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; handle->read_req.type = UV_READ; handle->read_req.data = handle; } diff --git a/deps/uv/src/win/tcp.c b/deps/uv/src/win/tcp.c index 897ea5e9c4..ee0591e753 100644 --- a/deps/uv/src/win/tcp.c +++ b/deps/uv/src/win/tcp.c @@ -316,6 +316,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { INFINITE, WT_EXECUTEINWAITTHREAD)) { SET_REQ_ERROR(req, GetLastError()); uv_insert_pending_req(loop, (uv_req_t*)req); + handle->reqs_pending++; return; } } else { @@ -335,7 +336,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { - uv_req_t* req; + uv_read_t* req; uv_buf_t buf; int result; DWORD bytes, flags; @@ -375,7 +376,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_READ_PENDING; req->overlapped.InternalHigh = bytes; handle->reqs_pending++; - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { /* The req will be processed with IOCP. */ handle->flags |= UV_HANDLE_READ_PENDING; @@ -383,7 +384,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); handle->reqs_pending++; } } diff --git a/deps/uv/src/win/tty.c b/deps/uv/src/win/tty.c index 16064eed10..c02c102f12 100644 --- a/deps/uv/src/win/tty.c +++ b/deps/uv/src/win/tty.c @@ -239,7 +239,7 @@ static void CALLBACK uv_tty_post_raw_read(void* data, BOOLEAN didTimeout) { static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) { - uv_req_t* req; + uv_read_t* req; BOOL r; assert(handle->flags & UV_HANDLE_READING); @@ -261,7 +261,7 @@ static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) { if (!r) { handle->read_raw_wait = NULL; SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } handle->flags |= UV_HANDLE_READ_PENDING; @@ -309,7 +309,7 @@ static DWORD CALLBACK uv_tty_line_read_thread(void* data) { static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { - uv_req_t* req; + uv_read_t* req; BOOL r; assert(handle->flags & UV_HANDLE_READING); @@ -337,7 +337,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { if (!r) { handle->read_line_handle = NULL; SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); goto out; } } @@ -347,7 +347,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { WT_EXECUTELONGFUNCTION); if (!r) { SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } out: diff --git a/deps/uv/src/win/udp.c b/deps/uv/src/win/udp.c index cba80e5b45..07082ddb5d 100644 --- a/deps/uv/src/win/udp.c +++ b/deps/uv/src/win/udp.c @@ -24,9 +24,8 @@ #include "uv.h" #include "../uv-common.h" #include "internal.h" -#include -#if 0 + /* * Threshold of active udp streams for which to preallocate udp read buffers. */ @@ -34,7 +33,6 @@ const unsigned int uv_active_udp_streams_threshold = 0; /* A zero-size buffer for use by uv_udp_read */ static char uv_zero_[] = ""; -#endif /* Counter to keep track of active udp streams */ static unsigned int active_udp_streams = 0; @@ -63,6 +61,8 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, static int uv_udp_set_socket(uv_loop_t* loop, uv_udp_t* handle, SOCKET socket) { DWORD yes = 1; + WSAPROTOCOL_INFOW info; + int opt_len; assert(handle->socket == INVALID_SOCKET); @@ -89,14 +89,33 @@ static int uv_udp_set_socket(uv_loop_t* loop, uv_udp_t* handle, } if (pSetFileCompletionNotificationModes) { - if (pSetFileCompletionNotificationModes((HANDLE)socket, - FILE_SKIP_SET_EVENT_ON_HANDLE | - FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) { - handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP; - } else if (GetLastError() != ERROR_INVALID_FUNCTION) { + /* All know windowses that support SetFileCompletionNotificationModes */ + /* have a bug that makes it impossible to use this function in */ + /* conjunction with datagram sockets. We can work around that but only */ + /* if the user is using the default UDP driver (AFD) and has no other */ + /* LSPs stacked on top. Here we check whether that is the case. */ + opt_len = (int) sizeof info; + if (!getsockopt(socket, + SOL_SOCKET, + SO_PROTOCOL_INFOW, + (char*) &info, + &opt_len) == SOCKET_ERROR) { uv__set_sys_error(loop, GetLastError()); return -1; } + + if (info.ProtocolChain.ChainLen == 1) { + if (pSetFileCompletionNotificationModes((HANDLE)socket, + FILE_SKIP_SET_EVENT_ON_HANDLE | + FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) { + handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP; + handle->func_wsarecv = uv_wsarecv_workaround; + handle->func_wsarecvfrom = uv_wsarecvfrom_workaround; + } else if (GetLastError() != ERROR_INVALID_FUNCTION) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + } } handle->socket = socket; @@ -111,6 +130,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) { handle->reqs_pending = 0; handle->loop = loop; handle->flags = 0; + handle->func_wsarecv = WSARecv; + handle->func_wsarecvfrom = WSARecvFrom; uv_req_init(loop, (uv_req_t*) &(handle->recv_req)); handle->recv_req.type = UV_UDP_RECV; @@ -248,10 +269,9 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) { * Preallocate a read buffer if the number of active streams is below * the threshold. */ -#if 0 if (active_udp_streams < uv_active_udp_streams_threshold) { handle->flags &= ~UV_HANDLE_ZERO_READ; -#endif + handle->recv_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536); assert(handle->recv_buffer.len > 0); @@ -260,15 +280,15 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) { handle->recv_from_len = sizeof handle->recv_from; flags = 0; - result = WSARecvFrom(handle->socket, - (WSABUF*) &buf, - 1, - &bytes, - &flags, - (struct sockaddr*) &handle->recv_from, - &handle->recv_from_len, - &req->overlapped, - NULL); + result = handle->func_wsarecvfrom(handle->socket, + (WSABUF*) &buf, + 1, + &bytes, + &flags, + (struct sockaddr*) &handle->recv_from, + &handle->recv_from_len, + &req->overlapped, + NULL); if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { /* Process the req without IOCP. */ @@ -286,21 +306,21 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) { uv_insert_pending_req(loop, req); handle->reqs_pending++; } -#if 0 + } else { handle->flags |= UV_HANDLE_ZERO_READ; buf.base = (char*) uv_zero_; buf.len = 0; - flags = MSG_PARTIAL; + flags = MSG_PEEK; - result = WSARecv(handle->socket, - (WSABUF*) &buf, - 1, - &bytes, - &flags, - &req->overlapped, - NULL); + result = handle->func_wsarecv(handle->socket, + (WSABUF*) &buf, + 1, + &bytes, + &flags, + &req->overlapped, + NULL); if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { /* Process the req without IOCP. */ @@ -319,7 +339,6 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) { handle->reqs_pending++; } } -#endif } @@ -448,34 +467,27 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle, handle->flags &= ~UV_HANDLE_READ_PENDING; if (!REQ_SUCCESS(req) && - GET_REQ_STATUS(req) != STATUS_RECEIVE_EXPEDITED) { + GET_REQ_SOCK_ERROR(req) != WSAEMSGSIZE) { /* An error occurred doing the read. */ - if ((handle->flags & UV_HANDLE_READING)) { - uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); + if (handle->flags & UV_HANDLE_READING) { + uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); uv_udp_recv_stop(handle); -#if 0 buf = (handle->flags & UV_HANDLE_ZERO_READ) ? uv_buf_init(NULL, 0) : handle->recv_buffer; -#else - buf = handle->recv_buffer; -#endif handle->recv_cb(handle, -1, buf, NULL, 0); } goto done; } -#if 0 if (!(handle->flags & UV_HANDLE_ZERO_READ)) { -#endif /* Successful read */ - partial = (GET_REQ_STATUS(req) == STATUS_RECEIVE_EXPEDITED); + partial = !REQ_SUCCESS(req); handle->recv_cb(handle, req->overlapped.InternalHigh, handle->recv_buffer, (struct sockaddr*) &handle->recv_from, partial ? UV_UDP_PARTIAL : 0); -#if 0 - } else { + } else if (handle->flags & UV_HANDLE_READING) { DWORD bytes, err, flags; struct sockaddr_storage from; int from_len; @@ -487,7 +499,8 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle, memset(&from, 0, sizeof from); from_len = sizeof from; - flags = MSG_PARTIAL; + + flags = 0; if (WSARecvFrom(handle->socket, (WSABUF*)&buf, @@ -500,14 +513,18 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle, NULL) != SOCKET_ERROR) { /* Message received */ - handle->recv_cb(handle, - bytes, - buf, - (struct sockaddr*) &from, - (flags & MSG_PARTIAL) ? UV_UDP_PARTIAL : 0); + handle->recv_cb(handle, bytes, buf, (struct sockaddr*) &from, 0); } else { err = WSAGetLastError(); - if (err == WSAEWOULDBLOCK) { + if (err == WSAEMSGSIZE) { + /* Message truncated */ + handle->recv_cb(handle, + bytes, + buf, + (struct sockaddr*) &from, + UV_UDP_PARTIAL); + } if (err == WSAEWOULDBLOCK) { + /* Kernel buffer empty */ uv__set_sys_error(loop, WSAEWOULDBLOCK); handle->recv_cb(handle, 0, buf, NULL, 0); } else { @@ -517,7 +534,6 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle, } } } -#endif done: /* Post another read if still reading and not closing. */ diff --git a/deps/uv/src/win/winapi.c b/deps/uv/src/win/winapi.c index 4f8597cc64..cc21361bc7 100644 --- a/deps/uv/src/win/winapi.c +++ b/deps/uv/src/win/winapi.c @@ -27,6 +27,7 @@ sRtlNtStatusToDosError pRtlNtStatusToDosError; +sNtDeviceIoControlFile pNtDeviceIoControlFile; sNtQueryInformationFile pNtQueryInformationFile; sNtSetInformationFile pNtSetInformationFile; sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx; @@ -57,6 +58,13 @@ void uv_winapi_init() { uv_fatal_error(GetLastError(), "GetProcAddress"); } + pNtDeviceIoControlFile = (sNtDeviceIoControlFile) GetProcAddress( + ntdll_module, + "NtDeviceIoControlFile"); + if (pNtDeviceIoControlFile == NULL) { + uv_fatal_error(GetLastError(), "GetProcAddress"); + } + pNtSetInformationFile = (sNtSetInformationFile) GetProcAddress( ntdll_module, "NtSetInformationFile"); diff --git a/deps/uv/src/win/winapi.h b/deps/uv/src/win/winapi.h index 78ffe165b5..9ed808ea6b 100644 --- a/deps/uv/src/win/winapi.h +++ b/deps/uv/src/win/winapi.h @@ -4137,6 +4137,13 @@ typedef struct _FILE_BASIC_INFORMATION { DWORD FileAttributes; } FILE_BASIC_INFORMATION, *PFILE_BASIC_INFORMATION; +typedef struct _FILE_MODE_INFORMATION { + ULONG Mode; +} FILE_MODE_INFORMATION, *PFILE_MODE_INFORMATION; + +#define FILE_SYNCHRONOUS_IO_ALERT 0x00000010 +#define FILE_SYNCHRONOUS_IO_NONALERT 0x00000020 + typedef enum _FILE_INFORMATION_CLASS { FileDirectoryInformation = 1, FileFullDirectoryInformation, @@ -4270,9 +4277,26 @@ typedef enum _FILE_INFORMATION_CLASS { FILE_SPECIAL_ACCESS) #endif +typedef VOID (NTAPI *PIO_APC_ROUTINE) + (PVOID ApcContext, + PIO_STATUS_BLOCK IoStatusBlock, + ULONG Reserved); + typedef ULONG (NTAPI *sRtlNtStatusToDosError) (NTSTATUS Status); +typedef NTSTATUS (NTAPI *sNtDeviceIoControlFile) + (HANDLE FileHandle, + HANDLE Event, + PIO_APC_ROUTINE ApcRoutine, + PVOID ApcContext, + PIO_STATUS_BLOCK IoStatusBlock, + ULONG IoControlCode, + PVOID InputBuffer, + ULONG InputBufferLength, + PVOID OutputBuffer, + ULONG OutputBufferLength); + typedef NTSTATUS (NTAPI *sNtQueryInformationFile) (HANDLE FileHandle, PIO_STATUS_BLOCK IoStatusBlock, @@ -4325,6 +4349,7 @@ typedef BOOLEAN (WINAPI* sCreateSymbolicLinkW) /* Ntapi function pointers */ extern sRtlNtStatusToDosError pRtlNtStatusToDosError; +extern sNtDeviceIoControlFile pNtDeviceIoControlFile; extern sNtQueryInformationFile pNtQueryInformationFile; extern sNtSetInformationFile pNtSetInformationFile; diff --git a/deps/uv/src/win/winsock.c b/deps/uv/src/win/winsock.c index e37a60a9d3..5309f1eedb 100644 --- a/deps/uv/src/win/winsock.c +++ b/deps/uv/src/win/winsock.c @@ -216,3 +216,210 @@ int uv_ntstatus_to_winsock_error(NTSTATUS status) { } } } + + +/* + * This function provides a workaround for a bug in the winsock implementation + * of WSARecv. The problem is that when SetFileCompletionNotificationModes is + * used to avoid IOCP notifications of completed reads, WSARecv does not + * reliably indicate whether we can expect a completion package to be posted + * when the receive buffer is smaller than the received datagram. + * + * However it is desirable to use SetFileCompletionNotificationModes because + * it yields a massive performance increase. + * + * This function provides a workaround for that bug, but it only works for the + * specific case that we need it for. E.g. it assumes that the "avoid iocp" + * bit has been set, and supports only overlapped operation. It also requires + * the user to use the default msafd driver, doesn't work when other LSPs are + * stacked on top of it. + */ +int WSAAPI uv_wsarecv_workaround(SOCKET socket, WSABUF* buffers, + DWORD buffer_count, DWORD* bytes, DWORD* flags, WSAOVERLAPPED *overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine) { + NTSTATUS status; + void* apc_context; + IO_STATUS_BLOCK* iosb = (IO_STATUS_BLOCK*) &overlapped->Internal; + AFD_RECV_INFO info; + DWORD error; + + if (overlapped == NULL || completion_routine != NULL) { + WSASetLastError(WSAEINVAL); + return SOCKET_ERROR; + } + + info.BufferArray = buffers; + info.BufferCount = buffer_count; + info.AfdFlags = AFD_OVERLAPPED; + info.TdiFlags = TDI_RECEIVE_NORMAL; + + if (*flags & MSG_PEEK) { + info.TdiFlags |= TDI_RECEIVE_PEEK; + } + + if (*flags & MSG_PARTIAL) { + info.TdiFlags |= TDI_RECEIVE_PARTIAL; + } + + if (!((intptr_t) overlapped->hEvent & 1)) { + apc_context = (void*) overlapped; + } else { + apc_context = NULL; + } + + iosb->Status = STATUS_PENDING; + iosb->Pointer = 0; + + status = pNtDeviceIoControlFile((HANDLE) socket, + overlapped->hEvent, + NULL, + apc_context, + iosb, + IOCTL_AFD_RECEIVE, + &info, + sizeof(info), + NULL, + 0); + + *flags = 0; + *bytes = (DWORD) iosb->Information; + + switch (status) { + case STATUS_SUCCESS: + error = ERROR_SUCCESS; + break; + + case STATUS_PENDING: + error = WSA_IO_PENDING; + break; + + case STATUS_BUFFER_OVERFLOW: + error = WSAEMSGSIZE; + break; + + case STATUS_RECEIVE_EXPEDITED: + error = ERROR_SUCCESS; + *flags = MSG_OOB; + break; + + case STATUS_RECEIVE_PARTIAL_EXPEDITED: + error = ERROR_SUCCESS; + *flags = MSG_PARTIAL | MSG_OOB; + break; + + case STATUS_RECEIVE_PARTIAL: + error = ERROR_SUCCESS; + *flags = MSG_PARTIAL; + break; + + default: + error = uv_ntstatus_to_winsock_error(status); + break; + } + + WSASetLastError(error); + + if (error == ERROR_SUCCESS) { + return 0; + } else { + return SOCKET_ERROR; + } +} + + +/* See description of uv_wsarecv_workaround. */ +int WSAAPI uv_wsarecvfrom_workaround(SOCKET socket, WSABUF* buffers, + DWORD buffer_count, DWORD* bytes, DWORD* flags, struct sockaddr* addr, + int* addr_len, WSAOVERLAPPED *overlapped, + LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine) { + NTSTATUS status; + void* apc_context; + IO_STATUS_BLOCK* iosb = (IO_STATUS_BLOCK*) &overlapped->Internal; + AFD_RECV_DATAGRAM_INFO info; + DWORD error; + + if (overlapped == NULL || addr == NULL || addr_len == NULL || + completion_routine != NULL) { + WSASetLastError(WSAEINVAL); + return SOCKET_ERROR; + } + + info.BufferArray = buffers; + info.BufferCount = buffer_count; + info.AfdFlags = AFD_OVERLAPPED; + info.TdiFlags = TDI_RECEIVE_NORMAL; + info.Address = addr; + info.AddressLength = addr_len; + + if (*flags & MSG_PEEK) { + info.TdiFlags |= TDI_RECEIVE_PEEK; + } + + if (*flags & MSG_PARTIAL) { + info.TdiFlags |= TDI_RECEIVE_PARTIAL; + } + + if (!((intptr_t) overlapped->hEvent & 1)) { + apc_context = (void*) overlapped; + } else { + apc_context = NULL; + } + + iosb->Status = STATUS_PENDING; + iosb->Pointer = 0; + + status = pNtDeviceIoControlFile((HANDLE) socket, + overlapped->hEvent, + NULL, + apc_context, + iosb, + IOCTL_AFD_RECEIVE_DATAGRAM, + &info, + sizeof(info), + NULL, + 0); + + *flags = 0; + *bytes = (DWORD) iosb->Information; + + switch (status) { + case STATUS_SUCCESS: + error = ERROR_SUCCESS; + break; + + case STATUS_PENDING: + error = WSA_IO_PENDING; + break; + + case STATUS_BUFFER_OVERFLOW: + error = WSAEMSGSIZE; + break; + + case STATUS_RECEIVE_EXPEDITED: + error = ERROR_SUCCESS; + *flags = MSG_OOB; + break; + + case STATUS_RECEIVE_PARTIAL_EXPEDITED: + error = ERROR_SUCCESS; + *flags = MSG_PARTIAL | MSG_OOB; + break; + + case STATUS_RECEIVE_PARTIAL: + error = ERROR_SUCCESS; + *flags = MSG_PARTIAL; + break; + + default: + error = uv_ntstatus_to_winsock_error(status); + break; + } + + WSASetLastError(error); + + if (error == ERROR_SUCCESS) { + return 0; + } else { + return SOCKET_ERROR; + } +} diff --git a/deps/uv/src/win/winsock.h b/deps/uv/src/win/winsock.h index 1927d656ff..18978cf345 100644 --- a/deps/uv/src/win/winsock.h +++ b/deps/uv/src/win/winsock.h @@ -39,14 +39,66 @@ #define IPV6_V6ONLY 27 #endif -/* Whether ipv6 is supported */ -extern int uv_allow_ipv6; +/* + * TDI defines that are only in the DDK. + * We only need receive flags so far. + */ +#ifndef TDI_RECEIVE_NORMAL + #define TDI_RECEIVE_BROADCAST 0x00000004 + #define TDI_RECEIVE_MULTICAST 0x00000008 + #define TDI_RECEIVE_PARTIAL 0x00000010 + #define TDI_RECEIVE_NORMAL 0x00000020 + #define TDI_RECEIVE_EXPEDITED 0x00000040 + #define TDI_RECEIVE_PEEK 0x00000080 + #define TDI_RECEIVE_NO_RESPONSE_EXP 0x00000100 + #define TDI_RECEIVE_COPY_LOOKAHEAD 0x00000200 + #define TDI_RECEIVE_ENTIRE_MESSAGE 0x00000400 + #define TDI_RECEIVE_AT_DISPATCH_LEVEL 0x00000800 + #define TDI_RECEIVE_CONTROL_INFO 0x00001000 + #define TDI_RECEIVE_FORCE_INDICATION 0x00002000 + #define TDI_RECEIVE_NO_PUSH 0x00004000 +#endif + +/* + * The "Auxiliary Function Driver" is the windows kernel-mode driver that does + * TCP, UDP etc. Winsock is just a layer that dispatches requests to it. + * Having these definitions allows us to bypass winsock and make an AFD kernel + * call directly, avoiding a bug in winsock's recvfrom implementation. + */ + +#define AFD_NO_FAST_IO 0x00000001 +#define AFD_OVERLAPPED 0x00000002 +#define AFD_IMMEDIATE 0x00000004 + +typedef struct _AFD_RECV_DATAGRAM_INFO { + LPWSABUF BufferArray; + ULONG BufferCount; + ULONG AfdFlags; + ULONG TdiFlags; + struct sockaddr* Address; + int* AddressLength; +} AFD_RECV_DATAGRAM_INFO, *PAFD_RECV_DATAGRAM_INFO; + +typedef struct _AFD_RECV_INFO { + LPWSABUF BufferArray; + ULONG BufferCount; + ULONG AfdFlags; + ULONG TdiFlags; +} AFD_RECV_INFO, *PAFD_RECV_INFO; + + +#define _AFD_CONTROL_CODE(operation, method) \ + ((FSCTL_AFD_BASE) << 12 | (operation << 2) | method) + +#define FSCTL_AFD_BASE FILE_DEVICE_NETWORK + +#define AFD_RECEIVE 5 +#define AFD_RECEIVE_DATAGRAM 6 -BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); -BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); +#define IOCTL_AFD_RECEIVE \ + _AFD_CONTROL_CODE(AFD_RECEIVE, METHOD_NEITHER) -/* Ip address used to bind to any port at any interface */ -extern struct sockaddr_in uv_addr_ip4_any_; -extern struct sockaddr_in6 uv_addr_ip6_any_; +#define IOCTL_AFD_RECEIVE_DATAGRAM \ + _AFD_CONTROL_CODE(AFD_RECEIVE_DATAGRAM, METHOD_NEITHER) #endif /* UV_WIN_WINSOCK_H_ */ diff --git a/deps/uv/test/run-tests.c b/deps/uv/test/run-tests.c index fa7b8b8f30..7fb48d14b6 100644 --- a/deps/uv/test/run-tests.c +++ b/deps/uv/test/run-tests.c @@ -55,6 +55,11 @@ static uv_write_t conn_notify_req; static int close_cb_called; static int connection_accepted; +static uv_pipe_t stdin_pipe; +static uv_pipe_t stdout_pipe; +static int on_pipe_read_called; +static int after_write_called; + static void close_cb(uv_handle_t* handle) { close_cb_called++; @@ -148,6 +153,98 @@ static int ipc_helper() { } +void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + ASSERT(nread > 0); + ASSERT(memcmp("hello world\n", buf.base, nread) == 0); + on_pipe_read_called++; + + free(buf.base); + + uv_close((uv_handle_t*)&stdin_pipe, close_cb); + uv_close((uv_handle_t*)&stdout_pipe, close_cb); +} + + +static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle, + size_t suggested_size) { + uv_buf_t buf; + buf.base = (char*)malloc(suggested_size); + buf.len = suggested_size; + return buf; +} + + +static void after_pipe_write(uv_write_t* req, int status) { + ASSERT(status == 0); + after_write_called++; +} + + +static int stdio_over_pipes_helper() { + /* Write several buffers to test that the write order is preserved. */ + char* buffers[] = { + "he", + "ll", + "o ", + "wo", + "rl", + "d", + "\n" + }; + + uv_write_t write_req[COUNTOF(buffers)]; + uv_buf_t buf[COUNTOF(buffers)]; + int r, i; + uv_loop_t* loop = uv_default_loop(); + + ASSERT(UV_NAMED_PIPE == uv_guess_handle(0)); + ASSERT(UV_NAMED_PIPE == uv_guess_handle(1)); + + r = uv_pipe_init(loop, &stdin_pipe, 0); + ASSERT(r == 0); + r = uv_pipe_init(loop, &stdout_pipe, 0); + ASSERT(r == 0); + + uv_pipe_open(&stdin_pipe, 0); + uv_pipe_open(&stdout_pipe, 1); + + /* Unref both stdio handles to make sure that all writes complete. */ + uv_unref(loop); + uv_unref(loop); + + for (i = 0; i < COUNTOF(buffers); i++) { + buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i])); + } + + for (i = 0; i < COUNTOF(buffers); i++) { + r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1, + after_pipe_write); + ASSERT(r == 0); + } + + uv_run(loop); + + ASSERT(after_write_called == 7); + ASSERT(on_pipe_read_called == 0); + ASSERT(close_cb_called == 0); + + uv_ref(loop); + uv_ref(loop); + + r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc, + on_pipe_read); + ASSERT(r == 0); + + uv_run(loop); + + ASSERT(after_write_called == 7); + ASSERT(on_pipe_read_called == 1); + ASSERT(close_cb_called == 2); + + return 0; +} + + static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); @@ -158,6 +255,10 @@ static int maybe_run_test(int argc, char **argv) { return ipc_helper(); } + if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) { + return stdio_over_pipes_helper(); + } + if (strcmp(argv[1], "spawn_helper1") == 0) { return 1; } diff --git a/deps/uv/test/task.h b/deps/uv/test/task.h index 76c6903311..e28b393bb7 100644 --- a/deps/uv/test/task.h +++ b/deps/uv/test/task.h @@ -38,6 +38,8 @@ # define TEST_PIPENAME_2 "/tmp/uv-test-sock2" #endif +#define COUNTOF(a) (sizeof(a) / sizeof(a[0])) + typedef enum { TCP = 0, PIPE diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index 925358478a..17b98c2185 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -20,6 +20,7 @@ */ TEST_DECLARE (tty) +TEST_DECLARE (stdio_over_pipes) TEST_DECLARE (ipc) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) @@ -117,6 +118,7 @@ HELPER_DECLARE (pipe_echo_server) TASK_LIST_START TEST_ENTRY (tty) + TEST_ENTRY (stdio_over_pipes) TEST_ENTRY (ipc) diff --git a/deps/uv/test/test-stdio-over-pipes.c b/deps/uv/test/test-stdio-over-pipes.c new file mode 100644 index 0000000000..fd96fc2d28 --- /dev/null +++ b/deps/uv/test/test-stdio-over-pipes.c @@ -0,0 +1,154 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + + +static char exepath[1024]; +static size_t exepath_size = 1024; +static char* args[3]; +static uv_process_options_t options; +static int close_cb_called; +static int exit_cb_called; +static int on_read_cb_called; +static int after_write_cb_called; +uv_pipe_t out, in; +static uv_loop_t* loop; +#define OUTPUT_SIZE 1024 +static char output[OUTPUT_SIZE]; +static int output_used; + +typedef struct { + uv_write_t req; + uv_buf_t buf; +} write_req_t; + + +static void close_cb(uv_handle_t* handle) { + printf("close_cb\n"); + close_cb_called++; +} + + +static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { + printf("exit_cb\n"); + exit_cb_called++; + ASSERT(exit_status == 0); + ASSERT(term_signal == 0); + uv_close((uv_handle_t*)process, close_cb); + uv_close((uv_handle_t*)&in, close_cb); + uv_close((uv_handle_t*)&out, close_cb); +} + + +static void init_process_options(char* test, uv_exit_cb exit_cb) { + int r = uv_exepath(exepath, &exepath_size); + ASSERT(r == 0); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = test; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; +} + + +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { + uv_buf_t buf; + buf.base = output + output_used; + buf.len = OUTPUT_SIZE - output_used; + return buf; +} + + +static void after_write(uv_write_t* req, int status) { + write_req_t* wr; + + if (status) { + uv_err_t err = uv_last_error(loop); + fprintf(stderr, "uv_write error: %s\n", uv_strerror(err)); + ASSERT(0); + } + + wr = (write_req_t*) req; + + /* Free the read/write buffer and the request */ + free(wr); + + after_write_cb_called++; +} + + +static void on_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + write_req_t* write_req; + int r; + uv_err_t err = uv_last_error(uv_default_loop()); + + ASSERT(nread > 0 || err.code == UV_EOF); + + if (nread > 0) { + output_used += nread; + if (output_used == 12) { + ASSERT(memcmp("hello world\n", output, 12) == 0); + write_req = (write_req_t*)malloc(sizeof(*write_req)); + write_req->buf = uv_buf_init(output, output_used); + r = uv_write(&write_req->req, (uv_stream_t*)&in, &write_req->buf, 1, after_write); + ASSERT(r == 0); + } + } + + on_read_cb_called++; +} + + +TEST_IMPL(stdio_over_pipes) { + int r; + uv_process_t process; + loop = uv_default_loop(); + + init_process_options("stdio_over_pipes_helper", exit_cb); + + uv_pipe_init(loop, &out, 0); + options.stdout_stream = &out; + uv_pipe_init(loop, &in, 0); + options.stdin_stream = ∈ + + r = uv_spawn(loop, &process, options); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*) &out, on_alloc, on_read); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(on_read_cb_called > 1); + ASSERT(after_write_cb_called == 1); + ASSERT(exit_cb_called == 1); + ASSERT(close_cb_called == 3); + ASSERT(memcmp("hello world\n", output, 12) == 0); + ASSERT(output_used == 12); + + return 0; +} + diff --git a/deps/uv/uv.gyp b/deps/uv/uv.gyp index 3503f27f7e..9fe0867b14 100644 --- a/deps/uv/uv.gyp +++ b/deps/uv/uv.gyp @@ -286,6 +286,7 @@ 'test/test-ref.c', 'test/test-shutdown-eof.c', 'test/test-spawn.c', + 'test/test-stdio-over-pipes.c', 'test/test-tcp-bind-error.c', 'test/test-tcp-bind6-error.c', 'test/test-tcp-close.c',