Browse Source

deps: upgrade libuv to e079a99

v0.9.4-release
Ben Noordhuis 12 years ago
parent
commit
6cf68aead6
  1. 2
      deps/uv/.gitignore
  2. 4
      deps/uv/Makefile
  3. 4
      deps/uv/config-mingw.mk
  4. 32
      deps/uv/config-unix.mk
  5. 3
      deps/uv/include/uv-private/uv-darwin.h
  6. 7
      deps/uv/include/uv-private/uv-unix.h
  7. 56
      deps/uv/include/uv.h
  8. 2
      deps/uv/src/unix/async.c
  9. 25
      deps/uv/src/unix/core.c
  10. 10
      deps/uv/src/unix/fs.c
  11. 19
      deps/uv/src/unix/getaddrinfo.c
  12. 12
      deps/uv/src/unix/internal.h
  13. 17
      deps/uv/src/unix/pipe.c
  14. 9
      deps/uv/src/unix/process.c
  15. 349
      deps/uv/src/unix/stream.c
  16. 50
      deps/uv/src/unix/tcp.c
  17. 143
      deps/uv/src/unix/threadpool.c
  18. 9
      deps/uv/src/unix/timer.c
  19. 4
      deps/uv/src/unix/tty.c
  20. 2
      deps/uv/src/unix/udp.c
  21. 10
      deps/uv/src/win/core.c
  22. 2
      deps/uv/src/win/error.c
  23. 18
      deps/uv/src/win/process.c
  24. 10
      deps/uv/src/win/threadpool.c
  25. 250
      deps/uv/src/win/util.c
  26. 7
      deps/uv/test/test-condvar-consumer-producer.c
  27. 132
      deps/uv/test/test-embed.c
  28. 15
      deps/uv/test/test-list.h
  29. 73
      deps/uv/test/test-tcp-read-stop.c
  30. 266
      deps/uv/test/test-threadpool-cancel.c
  31. 21
      deps/uv/test/test-threadpool.c
  32. 3
      deps/uv/uv.gyp
  33. 9
      src/node_crypto.cc
  34. 4
      src/node_zlib.cc

2
deps/uv/.gitignore

@ -31,5 +31,3 @@ UpgradeLog*.XML
Debug
Release
ipch
*.mk
*.Makefile

4
deps/uv/Makefile

@ -44,10 +44,10 @@ BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/ben
all: libuv.a
test/run-tests$(E): test/run-tests.c test/runner.c $(RUNNER_SRC) $(TESTS) libuv.$(SOEXT)
$(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LINKFLAGS)
$(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LDFLAGS)
test/run-benchmarks$(E): test/run-benchmarks.c test/runner.c $(RUNNER_SRC) $(BENCHMARKS) libuv.$(SOEXT)
$(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LINKFLAGS)
$(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LDFLAGS)
test/echo.o: test/echo.c test/echo.h

4
deps/uv/config-mingw.mk

@ -25,13 +25,13 @@ AR = $(PREFIX)ar
E=.exe
CFLAGS=$(CPPFLAGS) -g --std=gnu89 -D_WIN32_WINNT=0x0600
LINKFLAGS=-lm
LDFLAGS=-lm
WIN_SRCS=$(wildcard src/win/*.c)
WIN_OBJS=$(WIN_SRCS:.c=.o)
RUNNER_CFLAGS=$(CFLAGS) -D_GNU_SOURCE # Need _GNU_SOURCE for strdup?
RUNNER_LINKFLAGS=$(LINKFLAGS)
RUNNER_LDFLAGS=$(LDFLAGS)
RUNNER_LIBS=-lws2_32 -lpsapi -liphlpapi
RUNNER_SRC=test/runner-win.c

32
deps/uv/config-unix.mk

@ -22,14 +22,14 @@ E=
CSTDFLAG=--std=c89 -pedantic -Wall -Wextra -Wno-unused-parameter
CFLAGS += -g
CPPFLAGS += -Isrc
LINKFLAGS=-lm
LDFLAGS=-lm
CPPFLAGS += -D_LARGEFILE_SOURCE
CPPFLAGS += -D_FILE_OFFSET_BITS=64
RUNNER_SRC=test/runner-unix.c
RUNNER_CFLAGS=$(CFLAGS) -Itest
RUNNER_LINKFLAGS=-L"$(PWD)" -luv -Xlinker -rpath -Xlinker "$(PWD)"
RUNNER_LDFLAGS=-L"$(PWD)" -luv -Xlinker -rpath -Xlinker "$(PWD)"
OBJS += src/unix/async.o
OBJS += src/unix/core.o
@ -56,21 +56,21 @@ OBJS += src/inet.o
ifeq (SunOS,$(uname_S))
CPPFLAGS += -D__EXTENSIONS__ -D_XOPEN_SOURCE=500
LINKFLAGS+=-lkstat -lnsl -lsendfile -lsocket
LDFLAGS+=-lkstat -lnsl -lsendfile -lsocket
# Library dependencies are not transitive.
RUNNER_LINKFLAGS += $(LINKFLAGS)
RUNNER_LDFLAGS += $(LDFLAGS)
OBJS += src/unix/sunos.o
endif
ifeq (AIX,$(uname_S))
CPPFLAGS += -Isrc/ares/config_aix -D_ALL_SOURCE -D_XOPEN_SOURCE=500
LINKFLAGS+= -lperfstat
LDFLAGS+= -lperfstat
OBJS += src/unix/aix.o
endif
ifeq (Darwin,$(uname_S))
CPPFLAGS += -D_DARWIN_USE_64_BIT_INODE=1
LINKFLAGS+=-framework CoreServices -dynamiclib -install_name "@rpath/libuv.dylib"
LDFLAGS+=-framework CoreServices -dynamiclib -install_name "@rpath/libuv.dylib"
SOEXT = dylib
OBJS += src/unix/darwin.o
OBJS += src/unix/kqueue.o
@ -79,7 +79,7 @@ endif
ifeq (Linux,$(uname_S))
CSTDFLAG += -D_GNU_SOURCE
LINKFLAGS+=-ldl -lrt
LDFLAGS+=-ldl -lrt
RUNNER_CFLAGS += -D_GNU_SOURCE
OBJS += src/unix/linux/linux-core.o \
src/unix/linux/inotify.o \
@ -87,25 +87,25 @@ OBJS += src/unix/linux/linux-core.o \
endif
ifeq (FreeBSD,$(uname_S))
LINKFLAGS+=-lkvm
LDFLAGS+=-lkvm
OBJS += src/unix/freebsd.o
OBJS += src/unix/kqueue.o
endif
ifeq (DragonFly,$(uname_S))
LINKFLAGS+=-lkvm
LDFLAGS+=-lkvm
OBJS += src/unix/freebsd.o
OBJS += src/unix/kqueue.o
endif
ifeq (NetBSD,$(uname_S))
LINKFLAGS+=-lkvm
LDFLAGS+=-lkvm
OBJS += src/unix/netbsd.o
OBJS += src/unix/kqueue.o
endif
ifeq (OpenBSD,$(uname_S))
LINKFLAGS+=-lkvm
LDFLAGS+=-lkvm
OBJS += src/unix/openbsd.o
OBJS += src/unix/kqueue.o
endif
@ -113,22 +113,22 @@ endif
ifneq (,$(findstring CYGWIN,$(uname_S)))
# We drop the --std=c89, it hides CLOCK_MONOTONIC on cygwin
CSTDFLAG = -D_GNU_SOURCE
LINKFLAGS+=
LDFLAGS+=
OBJS += src/unix/cygwin.o
endif
ifeq (SunOS,$(uname_S))
RUNNER_LINKFLAGS += -pthreads
RUNNER_LDFLAGS += -pthreads
else
RUNNER_LINKFLAGS += -pthread
RUNNER_LDFLAGS += -pthread
endif
libuv.a: $(OBJS)
$(AR) rcs $@ $^
libuv.$(SOEXT): CFLAGS += -fPIC
libuv.$(SOEXT): override CFLAGS += -fPIC
libuv.$(SOEXT): $(OBJS)
$(CC) -shared -o $@ $^ $(LINKFLAGS)
$(CC) -shared -o $@ $^ $(LDFLAGS)
src/%.o: src/%.c include/uv.h include/uv-private/uv-unix.h
$(CC) $(CSTDFLAG) $(CPPFLAGS) $(CFLAGS) -c $< -o $@

3
deps/uv/include/uv-private/uv-darwin.h

@ -49,4 +49,7 @@
uv_sem_t cf_sem; \
uv_mutex_t cf_mutex; \
#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \
void* select; \
#endif /* UV_DARWIN_H */

7
deps/uv/include/uv-private/uv-unix.h

@ -60,7 +60,7 @@ struct uv__io_s {
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
ngx_queue_t wq;
};
@ -90,6 +90,10 @@ struct uv__work {
# define UV_PLATFORM_FS_EVENT_FIELDS /* empty */
#endif
#ifndef UV_STREAM_PRIVATE_PLATFORM_FIELDS
# define UV_STREAM_PRIVATE_PLATFORM_FIELDS /* empty */
#endif
/* Note: May be cast to struct iovec. See writev(2). */
typedef struct {
char* base;
@ -209,6 +213,7 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
#define UV_TCP_PRIVATE_FIELDS /* empty */

56
deps/uv/include/uv.h

@ -261,6 +261,28 @@ UV_EXTERN void uv_unref(uv_handle_t*);
UV_EXTERN void uv_update_time(uv_loop_t*);
UV_EXTERN int64_t uv_now(uv_loop_t*);
/*
* Get backend file descriptor. Only kqueue, epoll and event ports are
* supported.
*
* This can be used in conjuction with uv_run_once() to poll in one thread and
* run the event loop's event callbacks in another.
*
* Useful for embedding libuv's event loop in another event loop.
* See test/test-embed.c for an example.
*
* Note that embedding a kqueue fd in another kqueue pollset doesn't work on
* all platforms. It's not an error to add the fd but it never generates
* events.
*/
UV_EXTERN int uv_backend_fd(const uv_loop_t*);
/*
* Get the poll timeout. The return value is in milliseconds, or -1 for no
* timeout.
*/
UV_EXTERN int uv_backend_timeout(const uv_loop_t*);
/*
* Should return a buffer that libuv can use to read data into.
@ -308,7 +330,7 @@ typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal);
typedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg);
typedef void (*uv_fs_cb)(uv_fs_t* req);
typedef void (*uv_work_cb)(uv_work_t* req);
typedef void (*uv_after_work_cb)(uv_work_t* req);
typedef void (*uv_after_work_cb)(uv_work_t* req, int status);
typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* req,
int status,
struct addrinfo* res);
@ -1314,7 +1336,13 @@ enum uv_process_flags {
* parent's event loop alive unless the parent process calls uv_unref() on
* the child's process handle.
*/
UV_PROCESS_DETACHED = (1 << 3)
UV_PROCESS_DETACHED = (1 << 3),
/*
* Hide the subprocess console window that would normally be created. This
* option is only meaningful on Windows systems. On unix it is silently
* ignored.
*/
UV_PROCESS_WINDOWS_HIDE = (1 << 4)
};
/*
@ -1358,6 +1386,30 @@ struct uv_work_s {
UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req,
uv_work_cb work_cb, uv_after_work_cb after_work_cb);
/* Cancel a pending request. Fails if the request is executing or has finished
* executing.
*
* Returns 0 on success, -1 on error. The loop error code is not touched.
*
* Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is
* currently supported.
*
* Cancelled requests have their callbacks invoked some time in the future.
* It's _not_ safe to free the memory associated with the request until your
* callback is called.
*
* Here is how cancellation is reported to your callback:
*
* - A uv_fs_t request has its req->errorno field set to UV_ECANCELED.
*
* - A uv_work_t or uv_getaddrinfo_t request has its callback invoked with
* status == -1 and uv_last_error(loop).code == UV_ECANCELED.
*
* This function is currently only implemented on UNIX platforms. On Windows,
* it always returns -1.
*/
UV_EXTERN int uv_cancel(uv_req_t* req);
struct uv_cpu_info_s {
char* model;

2
deps/uv/src/unix/async.c

@ -84,6 +84,8 @@ int uv_async_send(uv_async_t* handle) {
r = write(handle->loop->async_pipefd[1], "x", 1);
while (r == -1 && errno == EINTR);
assert(r == -1 || r == 1);
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
return uv__set_sys_error(handle->loop, errno);

25
deps/uv/src/unix/core.c

@ -248,7 +248,12 @@ void uv_loop_delete(uv_loop_t* loop) {
}
static unsigned int uv__poll_timeout(uv_loop_t* loop) {
int uv_backend_fd(const uv_loop_t* loop) {
return loop->backend_fd;
}
int uv_backend_timeout(const uv_loop_t* loop) {
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
@ -268,7 +273,7 @@ static int uv__run(uv_loop_t* loop) {
uv__run_idle(loop);
uv__run_prepare(loop);
uv__run_pending(loop);
uv__io_poll(loop, uv__poll_timeout(loop));
uv__io_poll(loop, uv_backend_timeout(loop));
uv__run_check(loop);
uv__run_closing_handles(loop);
return uv__has_active_handles(loop) || uv__has_active_reqs(loop);
@ -325,6 +330,13 @@ int uv__socket(int domain, int type, int protocol) {
sockfd = -1;
}
#if defined(SO_NOSIGPIPE)
{
int on = 1;
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
}
#endif
out:
return sockfd;
}
@ -629,9 +641,6 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
w->pevents &= ~events;
if (w->pevents == 0) {
ngx_queue_remove(&w->pending_queue);
ngx_queue_init(&w->pending_queue);
ngx_queue_remove(&w->watcher_queue);
ngx_queue_init(&w->watcher_queue);
@ -648,6 +657,12 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
}
void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT);
ngx_queue_remove(&w->pending_queue);
}
void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (ngx_queue_empty(&w->pending_queue))
ngx_queue_insert_tail(&loop->pending_queue, &w->pending_queue);

10
deps/uv/src/unix/fs.c

@ -90,7 +90,7 @@
} \
else { \
uv__fs_work(&(req)->work_req); \
uv__fs_done(&(req)->work_req); \
uv__fs_done(&(req)->work_req, 0); \
return (req)->result; \
} \
} \
@ -516,7 +516,7 @@ static void uv__fs_work(struct uv__work* w) {
}
static void uv__fs_done(struct uv__work* w) {
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
@ -527,6 +527,12 @@ static void uv__fs_done(struct uv__work* w) {
uv__set_artificial_error(req->loop, req->errorno);
}
if (status == -UV_ECANCELED) {
assert(req->errorno == 0);
req->errorno = UV_ECANCELED;
uv__set_artificial_error(req->loop, UV_ECANCELED);
}
if (req->cb != NULL)
req->cb(req);
}

19
deps/uv/src/unix/getaddrinfo.c

@ -37,11 +37,16 @@ static void uv__getaddrinfo_work(struct uv__work* w) {
}
static void uv__getaddrinfo_done(struct uv__work* w) {
static void uv__getaddrinfo_done(struct uv__work* w, int status) {
uv_getaddrinfo_t* req = container_of(w, uv_getaddrinfo_t, work_req);
struct addrinfo *res = req->res;
#if __sun
size_t hostlen = strlen(req->hostname);
size_t hostlen;
if (req->hostname)
hostlen = strlen(req->hostname);
else
hostlen = 0;
#endif
req->res = NULL;
@ -58,6 +63,10 @@ static void uv__getaddrinfo_done(struct uv__work* w) {
else
assert(0);
req->hints = NULL;
req->service = NULL;
req->hostname = NULL;
if (req->retcode == 0) {
/* OK */
#if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */
@ -75,6 +84,12 @@ static void uv__getaddrinfo_done(struct uv__work* w) {
req->loop->last_err.sys_errno_ = req->retcode;
}
if (status == -UV_ECANCELED) {
assert(req->retcode == 0);
req->retcode = UV_ECANCELED;
uv__set_artificial_error(req->loop, UV_ECANCELED);
}
req->cb(req, req->retcode, res);
}

12
deps/uv/src/unix/internal.h

@ -130,6 +130,7 @@ void uv__make_close_pending(uv_handle_t* handle);
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_close(uv_loop_t* loop, uv__io_t* w);
void uv__io_feed(uv_loop_t* loop, uv__io_t* w);
int uv__io_active(const uv__io_t* w, unsigned int events);
void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */
@ -163,7 +164,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
/* timer */
void uv__run_timers(uv_loop_t* loop);
unsigned int uv__next_timeout(uv_loop_t* loop);
int uv__next_timeout(const uv_loop_t* loop);
/* signal */
void uv__signal_close(uv_signal_t* handle);
@ -174,7 +175,7 @@ void uv__signal_loop_cleanup();
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w));
void (*done)(struct uv__work *w, int status));
void uv__work_done(uv_async_t* handle, int status);
/* platform specific */
@ -197,6 +198,13 @@ void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);
#if defined(__APPLE__)
int uv___stream_fd(uv_stream_t* handle);
#define uv__stream_fd(handle) (uv___stream_fd((uv_stream_t*) (handle)))
#else
#define uv__stream_fd(handle) ((handle)->io_watcher.fd)
#endif /* defined(__APPLE__) */
#ifdef UV__O_NONBLOCK
# define UV__F_NONBLOCK UV__O_NONBLOCK
#else

17
deps/uv/src/unix/pipe.c

@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
bound = 0;
/* Already bound? */
if (handle->io_watcher.fd >= 0) {
if (uv__stream_fd(handle) >= 0) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
@ -117,13 +117,13 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
saved_errno = errno;
status = -1;
if (handle->io_watcher.fd == -1) {
if (uv__stream_fd(handle) == -1) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
assert(handle->io_watcher.fd >= 0);
assert(uv__stream_fd(handle) >= 0);
if ((status = listen(handle->io_watcher.fd, backlog)) == -1) {
if ((status = listen(uv__stream_fd(handle), backlog)) == -1) {
uv__set_sys_error(handle->loop, errno);
} else {
handle->connection_cb = cb;
@ -172,7 +172,7 @@ void uv_pipe_connect(uv_connect_t* req,
int r;
saved_errno = errno;
new_sock = (handle->io_watcher.fd == -1);
new_sock = (uv__stream_fd(handle) == -1);
err = -1;
if (new_sock)
@ -187,7 +187,8 @@ void uv_pipe_connect(uv_connect_t* req,
* is either there or not.
*/
do {
r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr);
r = connect(uv__stream_fd(handle),
(struct sockaddr*)&saddr, sizeof saddr);
}
while (r == -1 && errno == EINTR);
@ -196,7 +197,7 @@ void uv_pipe_connect(uv_connect_t* req,
if (new_sock)
if (uv__stream_open((uv_stream_t*)handle,
handle->io_watcher.fd,
uv__stream_fd(handle),
UV_STREAM_READABLE | UV_STREAM_WRITABLE))
goto out;
@ -233,7 +234,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(pipe->type == UV_NAMED_PIPE);
sockfd = uv__accept(pipe->io_watcher.fd);
sockfd = uv__accept(uv__stream_fd(pipe));
if (sockfd == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
uv__set_sys_error(pipe->loop, errno);

9
deps/uv/src/unix/process.c

@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
if (container->flags & UV_INHERIT_FD) {
fd = container->data.fd;
} else {
fd = container->data.stream->io_watcher.fd;
fd = uv__stream_fd(container->data.stream);
}
if (fd == -1) {
@ -363,10 +363,11 @@ int uv_spawn(uv_loop_t* loop,
int i;
assert(options.file != NULL);
assert(!(options.flags & ~(UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS |
UV_PROCESS_DETACHED |
assert(!(options.flags & ~(UV_PROCESS_DETACHED |
UV_PROCESS_SETGID |
UV_PROCESS_SETUID)));
UV_PROCESS_SETUID |
UV_PROCESS_WINDOWS_HIDE |
UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS)));
uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS);
ngx_queue_init(&process->queue);

349
deps/uv/src/unix/stream.c

@ -34,6 +34,26 @@
#include <sys/un.h>
#include <unistd.h>
#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>
/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;
struct uv__stream_select_s {
uv_stream_t* stream;
uv_thread_t thread;
uv_sem_t sem;
uv_mutex_t mutex;
uv_async_t async;
int events;
int fake_fd;
int int_fd;
int fd;
};
#endif /* defined(__APPLE__) */
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
@ -96,23 +116,231 @@ void uv__stream_init(uv_loop_t* loop,
if (loop->emfile_fd == -1)
loop->emfile_fd = uv__open_cloexec("/", O_RDONLY);
#if defined(__APPLE__)
stream->select = NULL;
#endif /* defined(__APPLE_) */
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
socklen_t yes;
#if defined(__APPLE__)
void uv__stream_osx_select(void* arg) {
uv_stream_t* stream;
uv__stream_select_t* s;
fd_set read;
fd_set write;
fd_set error;
struct timeval timeout;
int events;
int fd;
int r;
int max_fd;
assert(fd >= 0);
stream->flags |= flags;
stream = arg;
s = stream->select;
fd = stream->io_watcher.fd;
if (stream->type == UV_TCP) {
/* Reuse the port address if applicable. */
yes = 1;
if (fd > s->int_fd)
max_fd = fd;
else
max_fd = s->int_fd;
while (1) {
/* Terminate on semaphore */
if (uv_sem_trywait(&s->sem) == 0)
break;
/* Watch fd using select(2) */
FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&error);
if (uv_is_readable(stream))
FD_SET(fd, &read);
if (uv_is_writable(stream))
FD_SET(fd, &write);
FD_SET(fd, &error);
FD_SET(s->int_fd, &read);
timeout.tv_sec = 0;
timeout.tv_usec = 250000; /* 250 ms timeout */
r = select(max_fd + 1, &read, &write, &error, &timeout);
if (r == -1) {
if (errno == EINTR)
continue;
/* XXX: Possible?! */
abort();
}
/* Ignore timeouts */
if (r == 0)
continue;
/* Handle events */
events = 0;
if (FD_ISSET(fd, &read))
events |= UV__POLLIN;
if (FD_ISSET(fd, &write))
events |= UV__POLLOUT;
if (FD_ISSET(fd, &error))
events |= UV__POLLERR;
uv_mutex_lock(&s->mutex);
s->events |= events;
uv_mutex_unlock(&s->mutex);
if (events != 0)
uv_async_send(&s->async);
}
}
void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
/* Notify select() thread about state change */
uv__stream_select_t* s;
int r;
s = stream->select;
/* Interrupt select() loop
* NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
* emit read event on other side
*/
do
r = write(s->fake_fd, "x", 1);
while (r == -1 && errno == EINTR);
assert(r == 1);
}
void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
uv__stream_select_t* s;
uv_stream_t* stream;
int events;
s = container_of(handle, uv__stream_select_t, async);
stream = s->stream;
/* Get and reset stream's events */
uv_mutex_lock(&s->mutex);
events = s->events;
s->events = 0;
uv_mutex_unlock(&s->mutex);
assert(0 == (events & UV__POLLERR));
/* Invoke callback on event-loop */
if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);
if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
}
void uv__stream_osx_cb_close(uv_handle_t* async) {
uv__stream_select_t* s;
s = container_of(async, uv__stream_select_t, async);
free(s);
}
int uv__stream_try_select(uv_stream_t* stream, int fd) {
/*
* kqueue doesn't work with some files from /dev mount on osx.
* select(2) in separate thread for those fds
*/
struct kevent filter[1];
struct kevent events[1];
struct timespec timeout;
uv__stream_select_t* s;
int fds[2];
int ret;
int kq;
kq = kqueue();
if (kq == -1) {
fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno);
return uv__set_sys_error(stream->loop, errno);
}
EV_SET(&filter[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
/* Use small timeout, because we only want to capture EINVALs */
timeout.tv_sec = 0;
timeout.tv_nsec = 1;
ret = kevent(kq, filter, 1, events, 1, &timeout);
SAVE_ERRNO(close(kq));
if (ret == -1)
return uv__set_sys_error(stream->loop, errno);
if ((events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
return 0;
/* At this point we definitely know that this fd won't work with kqueue */
s = malloc(sizeof(*s));
if (s == NULL)
return uv__set_artificial_error(stream->loop, UV_ENOMEM);
s->fd = fd;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1)
if (uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb)) {
SAVE_ERRNO(free(s));
return uv__set_sys_error(stream->loop, errno);
}
s->async.flags |= UV__HANDLE_INTERNAL;
uv__handle_unref(&s->async);
if (uv_sem_init(&s->sem, 0))
goto fatal1;
if (uv_mutex_init(&s->mutex))
goto fatal2;
/* Create fds for io watcher and to interrupt the select() loop. */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
goto fatal3;
s->fake_fd = fds[0];
s->int_fd = fds[1];
if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))
goto fatal4;
s->stream = stream;
stream->select = s;
return 0;
fatal4:
close(s->fake_fd);
close(s->int_fd);
s->fake_fd = -1;
s->int_fd = -1;
fatal3:
uv_mutex_destroy(&s->mutex);
fatal2:
uv_sem_destroy(&s->sem);
fatal1:
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
return uv__set_sys_error(stream->loop, errno);
}
#endif /* defined(__APPLE__) */
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
assert(fd >= 0);
stream->flags |= flags;
if (stream->type == UV_TCP) {
if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
return uv__set_sys_error(stream->loop, errno);
@ -121,6 +349,21 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
return uv__set_sys_error(stream->loop, errno);
}
#if defined(__APPLE__)
{
uv__stream_select_t* s;
int r;
r = uv__stream_try_select(stream, fd);
if (r == -1)
return r;
s = stream->select;
if (s != NULL)
fd = s->fake_fd;
}
#endif /* defined(__APPLE__) */
stream->io_watcher.fd = fd;
return 0;
@ -239,9 +482,9 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
/* connection_cb can close the server socket while we're
* in the loop so check it on each iteration.
*/
while (stream->io_watcher.fd != -1) {
while (uv__stream_fd(stream) != -1) {
assert(stream->accepted_fd == -1);
fd = uv__accept(stream->io_watcher.fd);
fd = uv__accept(uv__stream_fd(stream));
if (fd == -1) {
switch (errno) {
@ -262,7 +505,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
}
if (use_emfile_trick) {
SAVE_ERRNO(r = uv__emfile_trick(loop, stream->io_watcher.fd));
SAVE_ERRNO(r = uv__emfile_trick(loop, uv__stream_fd(stream)));
if (r == 0)
continue;
}
@ -394,7 +637,7 @@ static void uv__drain(uv_stream_t* stream) {
stream->shutdown_req = NULL;
uv__req_unregister(stream->loop, req);
if (shutdown(stream->io_watcher.fd, SHUT_WR)) {
if (shutdown(uv__stream_fd(stream), SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
uv__set_sys_error(stream->loop, errno);
if (req->cb) {
@ -458,7 +701,7 @@ static void uv__write(uv_stream_t* stream) {
start:
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
/* Get the request at the head of the queue. */
req = uv_write_queue_head(stream);
@ -512,15 +755,15 @@ start:
}
do {
n = sendmsg(stream->io_watcher.fd, &msg, 0);
n = sendmsg(uv__stream_fd(stream), &msg, 0);
}
while (n == -1 && errno == EINTR);
} else {
do {
if (iovcnt == 1) {
n = write(stream->io_watcher.fd, iov[0].iov_base, iov[0].iov_len);
n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
} else {
n = writev(stream->io_watcher.fd, iov, iovcnt);
n = writev(uv__stream_fd(stream), iov, iovcnt);
}
}
while (n == -1 && errno == EINTR);
@ -669,11 +912,11 @@ static void uv__read(uv_stream_t* stream) {
assert(buf.len > 0);
assert(buf.base);
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
if (stream->read_cb) {
do {
nread = read(stream->io_watcher.fd, buf.base, buf.len);
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
@ -689,7 +932,7 @@ static void uv__read(uv_stream_t* stream) {
msg.msg_control = (void *) cmsg_space;
do {
nread = recvmsg(stream->io_watcher.fd, &msg, 0);
nread = recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
@ -798,7 +1041,7 @@ static void uv__read(uv_stream_t* stream) {
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
"uv_shutdown (unix) only supports uv_handle_t right now");
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
if (!(stream->flags & UV_STREAM_WRITABLE) ||
stream->flags & UV_STREAM_SHUT ||
@ -837,16 +1080,16 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
}
if (events & UV__POLLIN) {
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
uv__read(stream);
if (stream->io_watcher.fd == -1)
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
}
if (events & UV__POLLOUT) {
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
uv__write(stream);
uv__write_callbacks(stream);
}
@ -875,8 +1118,12 @@ static void uv__stream_connect(uv_stream_t* stream) {
stream->delayed_error = 0;
} else {
/* Normal situation: we need to get the socket error from the kernel. */
assert(stream->io_watcher.fd >= 0);
getsockopt(stream->io_watcher.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
assert(uv__stream_fd(stream) >= 0);
getsockopt(uv__stream_fd(stream),
SOL_SOCKET,
SO_ERROR,
&error,
&errorsize);
}
if (error == EINPROGRESS)
@ -906,7 +1153,7 @@ int uv_write2(uv_write_t* req,
stream->type == UV_TTY) &&
"uv_write (unix) does not yet support other types of streams");
if (stream->io_watcher.fd < 0) {
if (uv__stream_fd(stream) < 0) {
uv__set_sys_error(stream->loop, EBADF);
return -1;
}
@ -989,11 +1236,17 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
*/
stream->flags |= UV_STREAM_READING;
#if defined(__APPLE__)
/* Notify select() thread about state change */
if (stream->select != NULL)
uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
* not start the IO watcher.
*/
assert(stream->io_watcher.fd >= 0);
assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);
stream->read_cb = read_cb;
@ -1023,6 +1276,13 @@ int uv_read_stop(uv_stream_t* stream) {
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
uv__handle_stop(stream);
stream->flags &= ~UV_STREAM_READING;
#if defined(__APPLE__)
/* Notify select() thread about state change */
if (stream->select != NULL)
uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */
stream->read_cb = NULL;
stream->read2_cb = NULL;
stream->alloc_cb = NULL;
@ -1040,9 +1300,42 @@ int uv_is_writable(const uv_stream_t* stream) {
}
#if defined(__APPLE__)
int uv___stream_fd(uv_stream_t* handle) {
uv__stream_select_t* s;
s = handle->select;
if (s != NULL)
return s->fd;
return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */
void uv__stream_close(uv_stream_t* handle) {
#if defined(__APPLE__)
/* Terminate select loop first */
if (handle->select != NULL) {
uv__stream_select_t* s;
s = handle->select;
uv_sem_post(&s->sem);
uv__stream_osx_interrupt_select(handle);
uv_thread_join(&s->thread);
uv_sem_destroy(&s->sem);
uv_mutex_destroy(&s->mutex);
close(s->fake_fd);
close(s->int_fd);
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
handle->select = NULL;
}
#endif /* defined(__APPLE__) */
uv_read_stop(handle);
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
uv__io_close(handle->loop, &handle->io_watcher);
close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;

50
deps/uv/src/unix/tcp.c

@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) {
int sockfd;
if (handle->io_watcher.fd != -1)
if (uv__stream_fd(handle) != -1)
return 0;
sockfd = uv__socket(domain, SOCK_STREAM, 0);
@ -58,29 +58,21 @@ static int uv__bind(uv_tcp_t* tcp,
int domain,
struct sockaddr* addr,
int addrsize) {
int saved_errno;
int status;
saved_errno = errno;
status = -1;
int on;
if (maybe_new_socket(tcp, domain, UV_STREAM_READABLE|UV_STREAM_WRITABLE))
return -1;
tcp->delayed_error = 0;
if (bind(tcp->io_watcher.fd, addr, addrsize) == -1) {
if (errno == EADDRINUSE) {
tcp->delayed_error = errno;
} else {
uv__set_sys_error(tcp->loop, errno);
goto out;
}
}
status = 0;
on = 1;
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
return uv__set_sys_error(tcp->loop, errno);
out:
errno = saved_errno;
return status;
errno = 0;
if (bind(tcp->io_watcher.fd, addr, addrsize) && errno != EADDRINUSE)
return uv__set_sys_error(tcp->loop, errno);
tcp->delayed_error = errno;
return 0;
}
@ -105,7 +97,7 @@ static int uv__connect(uv_connect_t* req,
handle->delayed_error = 0;
do
r = connect(handle->io_watcher.fd, addr, addrlen);
r = connect(uv__stream_fd(handle), addr, addrlen);
while (r == -1 && errno == EINTR);
if (r == -1) {
@ -174,7 +166,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
goto out;
}
if (handle->io_watcher.fd < 0) {
if (uv__stream_fd(handle) < 0) {
uv__set_sys_error(handle->loop, EINVAL);
rv = -1;
goto out;
@ -183,7 +175,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
/* sizeof(socklen_t) != sizeof(int) on some systems. */
socklen = (socklen_t)*namelen;
if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
if (getsockname(uv__stream_fd(handle), name, &socklen) == -1) {
uv__set_sys_error(handle->loop, errno);
rv = -1;
} else {
@ -211,7 +203,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
goto out;
}
if (handle->io_watcher.fd < 0) {
if (uv__stream_fd(handle) < 0) {
uv__set_sys_error(handle->loop, EINVAL);
rv = -1;
goto out;
@ -220,7 +212,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
/* sizeof(socklen_t) != sizeof(int) on some systems. */
socklen = (socklen_t)*namelen;
if (getpeername(handle->io_watcher.fd, name, &socklen) == -1) {
if (getpeername(uv__stream_fd(handle), name, &socklen) == -1) {
uv__set_sys_error(handle->loop, errno);
rv = -1;
} else {
@ -320,8 +312,8 @@ int uv__tcp_keepalive(int fd, int on, unsigned int delay) {
int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
if (handle->io_watcher.fd != -1)
if (uv__tcp_nodelay(handle->io_watcher.fd, on))
if (uv__stream_fd(handle) != -1)
if (uv__tcp_nodelay(uv__stream_fd(handle), on))
return -1;
if (on)
@ -334,8 +326,8 @@ int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
if (handle->io_watcher.fd != -1)
if (uv__tcp_keepalive(handle->io_watcher.fd, on, delay))
if (uv__stream_fd(handle) != -1)
if (uv__tcp_keepalive(uv__stream_fd(handle), on, delay))
return -1;
if (on)
@ -343,7 +335,7 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
else
handle->flags &= ~UV_TCP_KEEPALIVE;
/* TODO Store delay if handle->io_watcher.fd == -1 but don't want to enlarge
/* TODO Store delay if uv__stream_fd(handle) == -1 but don't want to enlarge
* uv_tcp_t with an int that's almost never used...
*/

143
deps/uv/src/unix/threadpool.c

@ -20,43 +20,48 @@
*/
#include "internal.h"
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
/* TODO add condvar support to libuv */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_once_t once = PTHREAD_ONCE_INIT;
static pthread_t threads[4];
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static uv_thread_t threads[4];
static ngx_queue_t exit_message;
static ngx_queue_t wq = { &wq, &wq };
static ngx_queue_t wq;
static volatile int initialized;
static void* worker(void* arg) {
static void uv__cancelled(struct uv__work* w) {
abort();
}
/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
static void worker(void* arg) {
struct uv__work* w;
ngx_queue_t* q;
(void) arg;
for (;;) {
if (pthread_mutex_lock(&mutex))
abort();
uv_mutex_lock(&mutex);
while (ngx_queue_empty(&wq))
if (pthread_cond_wait(&cond, &mutex))
abort();
uv_cond_wait(&cond, &mutex);
q = ngx_queue_head(&wq);
if (q == &exit_message)
pthread_cond_signal(&cond);
else
uv_cond_signal(&cond);
else {
ngx_queue_remove(q);
ngx_queue_init(q); /* Signal uv_cancel() that the work req is
executing. */
}
if (pthread_mutex_unlock(&mutex))
abort();
uv_mutex_unlock(&mutex);
if (q == &exit_message)
break;
@ -65,36 +70,43 @@ static void* worker(void* arg) {
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
ngx_queue_insert_tail(&w->loop->wq, &w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
return NULL;
}
static void post(ngx_queue_t* q) {
pthread_mutex_lock(&mutex);
uv_mutex_lock(&mutex);
ngx_queue_insert_tail(&wq, q);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}
static void init_once(void) {
unsigned int i;
if (uv_cond_init(&cond))
abort();
if (uv_mutex_init(&mutex))
abort();
ngx_queue_init(&wq);
for (i = 0; i < ARRAY_SIZE(threads); i++)
if (pthread_create(threads + i, NULL, worker, NULL))
if (uv_thread_create(threads + i, worker, NULL))
abort();
initialized = 1;
}
#if defined(__GNUC__)
__attribute__((destructor))
static void cleanup(void) {
unsigned int i;
@ -105,18 +117,21 @@ static void cleanup(void) {
post(&exit_message);
for (i = 0; i < ARRAY_SIZE(threads); i++)
if (pthread_join(threads[i], NULL))
if (uv_thread_join(threads + i))
abort();
uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);
initialized = 0;
}
#endif
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w)) {
pthread_once(&once, init_once);
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
@ -124,11 +139,37 @@ void uv__work_submit(uv_loop_t* loop,
}
int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
int cancelled;
uv_mutex_lock(&mutex);
uv_mutex_lock(&w->loop->wq_mutex);
cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL;
if (cancelled)
ngx_queue_remove(&w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);
if (!cancelled)
return -1;
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
ngx_queue_insert_tail(&loop->wq, &w->wq);
uv_mutex_unlock(&loop->wq_mutex);
return 0;
}
void uv__work_done(uv_async_t* handle, int status) {
struct uv__work* w;
uv_loop_t* loop;
ngx_queue_t* q;
ngx_queue_t wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
ngx_queue_init(&wq);
@ -145,7 +186,8 @@ void uv__work_done(uv_async_t* handle, int status) {
ngx_queue_remove(q);
w = container_of(q, struct uv__work, wq);
w->done(w);
err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0;
w->done(w, err);
}
}
@ -153,18 +195,23 @@ void uv__work_done(uv_async_t* handle, int status) {
static void uv__queue_work(struct uv__work* w) {
uv_work_t* req = container_of(w, uv_work_t, work_req);
if (req->work_cb)
req->work_cb(req);
}
static void uv__queue_done(struct uv__work* w) {
uv_work_t* req = container_of(w, uv_work_t, work_req);
static void uv__queue_done(struct uv__work* w, int status) {
uv_work_t* req;
req = container_of(w, uv_work_t, work_req);
uv__req_unregister(req->loop, req);
if (req->after_work_cb)
req->after_work_cb(req);
if (req->after_work_cb == NULL)
return;
if (status == -UV_ECANCELED)
uv__set_artificial_error(req->loop, UV_ECANCELED);
req->after_work_cb(req, status ? -1 : 0);
}
@ -172,6 +219,9 @@ int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
return uv__set_artificial_error(loop, UV_EINVAL);
uv__req_init(loop, req, UV_WORK);
req->loop = loop;
req->work_cb = work_cb;
@ -179,3 +229,28 @@ int uv_queue_work(uv_loop_t* loop,
uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
return 0;
}
int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;
switch (req->type) {
case UV_FS:
loop = ((uv_fs_t*) req)->loop;
wreq = &((uv_fs_t*) req)->work_req;
break;
case UV_GETADDRINFO:
loop = ((uv_getaddrinfo_t*) req)->loop;
wreq = &((uv_getaddrinfo_t*) req)->work_req;
break;
case UV_WORK:
loop = ((uv_work_t*) req)->loop;
wreq = &((uv_work_t*) req)->work_req;
break;
default:
return -1;
}
return uv__work_cancel(loop, req, wreq);
}

9
deps/uv/src/unix/timer.c

@ -102,13 +102,14 @@ int64_t uv_timer_get_repeat(uv_timer_t* handle) {
}
unsigned int uv__next_timeout(uv_loop_t* loop) {
uv_timer_t* handle;
int uv__next_timeout(const uv_loop_t* loop) {
const uv_timer_t* handle;
handle = RB_MIN(uv__timers, &loop->timer_handles);
/* RB_MIN expects a non-const tree root. That's okay, it doesn't modify it. */
handle = RB_MIN(uv__timers, (struct uv__timers*) &loop->timer_handles);
if (handle == NULL)
return (unsigned int) -1; /* block indefinitely */
return -1; /* block indefinitely */
if (handle->timeout <= loop->time)
return 0;

4
deps/uv/src/unix/tty.c

@ -54,7 +54,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) {
struct termios raw;
int fd;
fd = tty->io_watcher.fd;
fd = uv__stream_fd(tty);
if (mode && tty->mode == 0) {
/* on */
@ -105,7 +105,7 @@ fatal:
int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) {
struct winsize ws;
if (ioctl(tty->io_watcher.fd, TIOCGWINSZ, &ws) < 0) {
if (ioctl(uv__stream_fd(tty), TIOCGWINSZ, &ws) < 0) {
uv__set_sys_error(tty->loop, errno);
return -1;
}

2
deps/uv/src/unix/udp.c

@ -40,7 +40,7 @@ static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
void uv__udp_close(uv_udp_t* handle) {
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN | UV__POLLOUT);
uv__io_close(handle->loop, &handle->io_watcher);
uv__handle_stop(handle);
close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;

10
deps/uv/src/win/core.c

@ -171,6 +171,16 @@ void uv_loop_delete(uv_loop_t* loop) {
}
int uv_backend_fd(const uv_loop_t* loop) {
return -1;
}
int uv_backend_timeout(const uv_loop_t* loop) {
return 0;
}
static void uv_poll(uv_loop_t* loop, int block) {
BOOL success;
DWORD bytes, timeout;

2
deps/uv/src/win/error.c

@ -109,6 +109,7 @@ uv_err_code uv_translate_sys_error(int sys_errno) {
case WSAECONNRESET: return UV_ECONNRESET;
case ERROR_ALREADY_EXISTS: return UV_EEXIST;
case ERROR_FILE_EXISTS: return UV_EEXIST;
case ERROR_BUFFER_OVERFLOW: return UV_EFAULT;
case WSAEFAULT: return UV_EFAULT;
case ERROR_HOST_UNREACHABLE: return UV_EHOSTUNREACH;
case WSAEHOSTUNREACH: return UV_EHOSTUNREACH;
@ -125,6 +126,7 @@ uv_err_code uv_translate_sys_error(int sys_errno) {
case ERROR_NETWORK_UNREACHABLE: return UV_ENETUNREACH;
case WSAENETUNREACH: return UV_ENETUNREACH;
case WSAENOBUFS: return UV_ENOBUFS;
case ERROR_NOT_ENOUGH_MEMORY: return UV_ENOMEM;
case ERROR_OUTOFMEMORY: return UV_ENOMEM;
case ERROR_CANNOT_MAKE: return UV_ENOSPC;
case ERROR_DISK_FULL: return UV_ENOSPC;

18
deps/uv/src/win/process.c

@ -777,10 +777,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
}
assert(options.file != NULL);
assert(!(options.flags & ~(UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS |
UV_PROCESS_DETACHED |
assert(!(options.flags & ~(UV_PROCESS_DETACHED |
UV_PROCESS_SETGID |
UV_PROCESS_SETUID)));
UV_PROCESS_SETUID |
UV_PROCESS_WINDOWS_HIDE |
UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS)));
uv_process_init(loop, process);
process->exit_cb = options.exit_cb;
@ -872,13 +873,22 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
startup.lpReserved = NULL;
startup.lpDesktop = NULL;
startup.lpTitle = NULL;
startup.dwFlags = STARTF_USESTDHANDLES;
startup.dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW;
startup.cbReserved2 = uv__stdio_size(process->child_stdio_buffer);
startup.lpReserved2 = (BYTE*) process->child_stdio_buffer;
startup.hStdInput = uv__stdio_handle(process->child_stdio_buffer, 0);
startup.hStdOutput = uv__stdio_handle(process->child_stdio_buffer, 1);
startup.hStdError = uv__stdio_handle(process->child_stdio_buffer, 2);
if (options.flags & UV_PROCESS_WINDOWS_HIDE) {
/* Use SW_HIDE to avoid any potential process window. */
startup.wShowWindow = SW_HIDE;
} else {
startup.wShowWindow = SW_SHOWDEFAULT;
}
process_flags = CREATE_UNICODE_ENVIRONMENT;
if (options.flags & UV_PROCESS_DETACHED) {
process_flags |= DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP;

10
deps/uv/src/win/threadpool.c

@ -55,6 +55,9 @@ static DWORD WINAPI uv_work_thread_proc(void* parameter) {
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
return uv__set_artificial_error(loop, UV_EINVAL);
uv_work_req_init(loop, req, work_cb, after_work_cb);
if (!QueueUserWorkItem(&uv_work_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
@ -67,8 +70,13 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb,
}
int uv_cancel(uv_req_t* req) {
return -1;
}
void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
uv__req_unregister(loop, req);
if(req->after_work_cb)
req->after_work_cb(req);
req->after_work_cb(req, 0);
}

250
deps/uv/src/win/util.c

@ -740,109 +740,201 @@ void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count) {
}
uv_err_t uv_interface_addresses(uv_interface_address_t** addresses,
int* count) {
unsigned long size = 0;
IP_ADAPTER_ADDRESSES* adapter_addresses;
IP_ADAPTER_ADDRESSES* adapter_address;
uv_interface_address_t* address;
struct sockaddr* sock_addr;
int length;
char* name;
/* Use IP_ADAPTER_UNICAST_ADDRESS_XP to retain backwards compatibility */
/* with Windows XP */
IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address;
uv_err_t uv_interface_addresses(uv_interface_address_t** addresses_ptr,
int* count_ptr) {
IP_ADAPTER_ADDRESSES* win_address_buf;
ULONG win_address_buf_size;
IP_ADAPTER_ADDRESSES* win_address;
if (GetAdaptersAddresses(AF_UNSPEC, 0, NULL, NULL, &size)
!= ERROR_BUFFER_OVERFLOW) {
return uv__new_sys_error(GetLastError());
}
uv_interface_address_t* uv_address_buf;
char* name_buf;
size_t uv_address_buf_size;
uv_interface_address_t* uv_address;
adapter_addresses = (IP_ADAPTER_ADDRESSES*)malloc(size);
if (!adapter_addresses) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
int count;
/* Fetch the size of the adapters reported by windows, and then get the */
/* list itself. */
win_address_buf_size = 0;
win_address_buf = NULL;
for (;;) {
ULONG r;
/* If win_address_buf is 0, then GetAdaptersAddresses will fail with */
/* ERROR_BUFFER_OVERFLOW, and the required buffer size will be stored in */
/* win_address_buf_size. */
r = GetAdaptersAddresses(AF_UNSPEC,
0,
NULL,
win_address_buf,
&win_address_buf_size);
if (r == ERROR_SUCCESS)
break;
free(win_address_buf);
switch (r) {
case ERROR_BUFFER_OVERFLOW:
/* This happens when win_address_buf is NULL or too small to hold */
/* all adapters. */
win_address_buf = malloc(win_address_buf_size);
if (win_address_buf == NULL)
return uv__new_artificial_error(UV_ENOMEM);
continue;
case ERROR_NO_DATA: {
/* No adapters were found. */
uv_address_buf = malloc(1);
if (uv_address_buf == NULL)
return uv__new_artificial_error(UV_ENOMEM);
*count_ptr = 0;
*addresses_ptr = uv_address_buf;
return uv_ok_;
}
if (GetAdaptersAddresses(AF_UNSPEC, 0, NULL, adapter_addresses, &size)
!= ERROR_SUCCESS) {
return uv__new_sys_error(GetLastError());
case ERROR_ADDRESS_NOT_ASSOCIATED:
return uv__new_artificial_error(UV_EAGAIN);
case ERROR_INVALID_PARAMETER:
/* MSDN says:
* "This error is returned for any of the following conditions: the
* SizePointer parameter is NULL, the Address parameter is not
* AF_INET, AF_INET6, or AF_UNSPEC, or the address information for
* the parameters requested is greater than ULONG_MAX."
* Since the first two conditions are not met, it must be that the
* adapter data is too big.
*/
return uv__new_artificial_error(UV_ENOBUFS);
default:
/* Other (unspecified) errors can happen, but we don't have any */
/* special meaning for them. */
assert(r != ERROR_SUCCESS);
return uv__new_sys_error(r);
}
}
/* Count the number of interfaces */
*count = 0;
/* Count the number of enabled interfaces and compute how much space is */
/* needed to store their info. */
count = 0;
uv_address_buf_size = 0;
for (adapter_address = adapter_addresses;
adapter_address != NULL;
adapter_address = adapter_address->Next) {
for (win_address = win_address_buf;
win_address != NULL;
win_address = win_address->Next) {
/* Use IP_ADAPTER_UNICAST_ADDRESS_XP to retain backwards compatibility */
/* with Windows XP */
IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address;
int name_size;
if (adapter_address->OperStatus != IfOperStatusUp)
/* Interfaces that are not 'up' should not be reported. Also skip */
/* interfaces that have no associated unicast address, as to avoid */
/* allocating space for the name for this interface. */
if (win_address->OperStatus != IfOperStatusUp ||
win_address->FirstUnicastAddress == NULL)
continue;
unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*)
adapter_address->FirstUnicastAddress;
/* Compute the size of the interface name. */
name_size = WideCharToMultiByte(CP_UTF8,
0,
win_address->FriendlyName,
-1,
NULL,
0,
NULL,
FALSE);
if (name_size <= 0) {
free(win_address_buf);
return uv__new_sys_error(GetLastError());
}
uv_address_buf_size += name_size;
while (unicast_address) {
(*count)++;
unicast_address = unicast_address->Next;
/* Count the number of addresses associated with this interface, and */
/* compute the size. */
for (unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*)
win_address->FirstUnicastAddress;
unicast_address != NULL;
unicast_address = unicast_address->Next) {
count++;
uv_address_buf_size += sizeof(uv_interface_address_t);
}
}
*addresses = (uv_interface_address_t*)
malloc(*count * sizeof(uv_interface_address_t));
if (!(*addresses)) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
/* Allocate space to store interface data plus adapter names. */
uv_address_buf = malloc(uv_address_buf_size);
if (uv_address_buf == NULL) {
free(win_address_buf);
return uv__new_artificial_error(UV_ENOMEM);
}
address = *addresses;
/* Compute the start of the uv_interface_address_t array, and the place in */
/* the buffer where the interface names will be stored. */
uv_address = uv_address_buf;
name_buf = (char*) (uv_address_buf + count);
for (adapter_address = adapter_addresses;
adapter_address != NULL;
adapter_address = adapter_address->Next) {
/* Fill out the output buffer. */
for (win_address = win_address_buf;
win_address != NULL;
win_address = win_address->Next) {
IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address;
int name_size;
size_t max_name_size;
if (adapter_address->OperStatus != IfOperStatusUp)
if (win_address->OperStatus != IfOperStatusUp ||
win_address->FirstUnicastAddress == NULL)
continue;
name = NULL;
unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*)
adapter_address->FirstUnicastAddress;
while (unicast_address) {
sock_addr = unicast_address->Address.lpSockaddr;
if (sock_addr->sa_family == AF_INET6) {
address->address.address6 = *((struct sockaddr_in6 *)sock_addr);
} else {
address->address.address4 = *((struct sockaddr_in *)sock_addr);
/* Convert the interface name to UTF8. */
max_name_size = (char*) uv_address_buf + uv_address_buf_size - name_buf;
if (max_name_size > (size_t) INT_MAX)
max_name_size = INT_MAX;
name_size = WideCharToMultiByte(CP_UTF8,
0,
win_address->FriendlyName,
-1,
name_buf,
(int) max_name_size,
NULL,
FALSE);
if (name_size <= 0) {
free(win_address_buf);
free(uv_address_buf);
return uv__new_sys_error(GetLastError());
}
address->is_internal =
adapter_address->IfType == IF_TYPE_SOFTWARE_LOOPBACK ? 1 : 0;
/* Add an uv_interface_address_t element for every unicast address. */
for (unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*)
win_address->FirstUnicastAddress;
unicast_address != NULL;
unicast_address = unicast_address->Next) {
struct sockaddr* sa;
if (!name) {
/* Convert FriendlyName to utf8 */
length = uv_utf16_to_utf8(adapter_address->FriendlyName, -1, NULL, 0);
if (length) {
name = (char*)malloc(length);
if (!name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
uv_address->name = name_buf;
if (!uv_utf16_to_utf8(adapter_address->FriendlyName, -1, name,
length)) {
free(name);
name = NULL;
}
}
}
sa = unicast_address->Address.lpSockaddr;
if (sa->sa_family == AF_INET6)
uv_address->address.address6 = *((struct sockaddr_in6 *) sa);
else
uv_address->address.address4 = *((struct sockaddr_in *) sa);
assert(name);
address->name = name;
uv_address->is_internal =
(win_address->IfType == IF_TYPE_SOFTWARE_LOOPBACK);
unicast_address = unicast_address->Next;
address++;
uv_address++;
}
name_buf += name_size;
}
free(adapter_addresses);
free(win_address_buf);
*addresses_ptr = uv_address_buf;
*count_ptr = count;
return uv_ok_;
}
@ -850,15 +942,5 @@ uv_err_t uv_interface_addresses(uv_interface_address_t** addresses,
void uv_free_interface_addresses(uv_interface_address_t* addresses,
int count) {
int i;
char* freed_name = NULL;
for (i = 0; i < count; i++) {
if (freed_name != addresses[i].name) {
freed_name = addresses[i].name;
free(freed_name);
}
}
free(addresses);
}

7
deps/uv/test/test-condvar-consumer-producer.c

@ -82,9 +82,6 @@ static void producer(void* arg) {
uv_cond_signal(&full);
uv_mutex_unlock(&mutex);
}
LOGF("finished_consumers: %d\n", finished_consumers);
ASSERT(finished_consumers == MAX_CONSUMERS);
}
@ -129,6 +126,10 @@ TEST_IMPL(consumer_producer) {
}
ASSERT(0 == uv_thread_join(&pthread));
LOGF("finished_consumers: %d\n", finished_consumers);
ASSERT(finished_consumers == MAX_CONSUMERS);
uv_cond_destroy(&empty);
uv_cond_destroy(&full);
uv_mutex_destroy(&mutex);

132
deps/uv/test/test-embed.c

@ -0,0 +1,132 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#ifndef HAVE_KQUEUE
# if __APPLE__ || __DragonFly__ || __FreeBSD__ || __OpenBSD__ || __NetBSD__
# define HAVE_KQUEUE 1
# endif
#endif
#ifndef HAVE_EPOLL
# if defined(__linux__)
# define HAVE_EPOLL 1
# endif
#endif
#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL)
#if defined(HAVE_KQUEUE)
# include <sys/types.h>
# include <sys/event.h>
# include <sys/time.h>
#endif
#if defined(HAVE_EPOLL)
# include <sys/epoll.h>
#endif
static uv_thread_t embed_thread;
static uv_sem_t embed_sem;
static uv_timer_t embed_timer;
static uv_async_t embed_async;
static volatile int embed_closed;
static int embed_timer_called;
static void embed_thread_runner(void* arg) {
int r;
int fd;
int timeout;
while (!embed_closed) {
fd = uv_backend_fd(uv_default_loop());
timeout = uv_backend_timeout(uv_default_loop());
do {
#if defined(HAVE_KQUEUE)
struct timespec ts;
ts.tv_sec = timeout / 1000;
ts.tv_nsec = (timeout % 1000) * 1000000;
r = kevent(fd, NULL, 0, NULL, 0, &ts);
#elif defined(HAVE_EPOLL)
r = epoll_wait(fd, NULL, 0, timeout);
#endif
} while (r == -1 && errno == EINTR);
uv_async_send(&embed_async);
uv_sem_wait(&embed_sem);
}
}
static void embed_cb(uv_async_t* async, int status) {
uv_run_once(uv_default_loop());
uv_sem_post(&embed_sem);
}
static void embed_timer_cb(uv_timer_t* timer, int status) {
embed_timer_called++;
embed_closed = 1;
uv_close((uv_handle_t*) &embed_async, NULL);
}
#endif
TEST_IMPL(embed) {
#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL)
uv_loop_t* external;
external = uv_loop_new();
ASSERT(external != NULL);
embed_timer_called = 0;
embed_closed = 0;
uv_async_init(external, &embed_async, embed_cb);
/* Start timer in default loop */
uv_timer_init(uv_default_loop(), &embed_timer);
uv_timer_start(&embed_timer, embed_timer_cb, 250, 0);
/* Start worker that will interrupt external loop */
uv_sem_init(&embed_sem, 0);
uv_thread_create(&embed_thread, embed_thread_runner, NULL);
/* But run external loop */
uv_run(external);
uv_thread_join(&embed_thread);
uv_loop_delete(external);
ASSERT(embed_timer_called == 1);
#endif
return 0;
}

15
deps/uv/test/test-list.h

@ -67,6 +67,7 @@ TEST_DECLARE (tcp_flags)
TEST_DECLARE (tcp_write_error)
TEST_DECLARE (tcp_write_to_half_open_connection)
TEST_DECLARE (tcp_unexpected_read)
TEST_DECLARE (tcp_read_stop)
TEST_DECLARE (tcp_bind6_error_addrinuse)
TEST_DECLARE (tcp_bind6_error_addrnotavail)
TEST_DECLARE (tcp_bind6_error_fault)
@ -124,6 +125,7 @@ TEST_DECLARE (pipe_ref3)
TEST_DECLARE (pipe_ref4)
TEST_DECLARE (process_ref)
TEST_DECLARE (active)
TEST_DECLARE (embed)
TEST_DECLARE (async)
TEST_DECLARE (get_currentexe)
TEST_DECLARE (process_title)
@ -184,7 +186,11 @@ TEST_DECLARE (fs_readdir_file)
TEST_DECLARE (fs_open_dir)
TEST_DECLARE (fs_rename_to_existing_file)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
TEST_DECLARE (threadpool_cancel_work)
TEST_DECLARE (threadpool_cancel_fs)
TEST_DECLARE (thread_mutex)
TEST_DECLARE (thread_rwlock)
TEST_DECLARE (thread_create)
@ -284,6 +290,9 @@ TASK_LIST_START
TEST_ENTRY (tcp_write_to_half_open_connection)
TEST_ENTRY (tcp_unexpected_read)
TEST_ENTRY (tcp_read_stop)
TEST_HELPER (tcp_read_stop, tcp4_echo_server)
TEST_ENTRY (tcp_bind6_error_addrinuse)
TEST_ENTRY (tcp_bind6_error_addrnotavail)
TEST_ENTRY (tcp_bind6_error_fault)
@ -362,6 +371,8 @@ TASK_LIST_START
TEST_ENTRY (active)
TEST_ENTRY (embed)
TEST_ENTRY (async)
TEST_ENTRY (get_currentexe)
@ -448,7 +459,11 @@ TASK_LIST_START
TEST_ENTRY (fs_open_dir)
TEST_ENTRY (fs_rename_to_existing_file)
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_multiple_event_loops)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_work)
TEST_ENTRY (threadpool_cancel_fs)
TEST_ENTRY (thread_mutex)
TEST_ENTRY (thread_rwlock)
TEST_ENTRY (thread_create)

73
deps/uv/test/test-tcp-read-stop.c

@ -0,0 +1,73 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
static uv_timer_t timer_handle;
static uv_tcp_t tcp_handle;
static uv_write_t write_req;
static void fail_cb(void) {
ASSERT(0 && "fail_cb called");
}
static void write_cb(uv_write_t* req, int status) {
uv_close((uv_handle_t*) &timer_handle, NULL);
uv_close((uv_handle_t*) &tcp_handle, NULL);
}
static void timer_cb(uv_timer_t* handle, int status) {
uv_buf_t buf = uv_buf_init("PING", 4);
ASSERT(0 == uv_write(&write_req,
(uv_stream_t*) &tcp_handle,
&buf,
1,
write_cb));
ASSERT(0 == uv_read_stop((uv_stream_t*) &tcp_handle));
}
static void connect_cb(uv_connect_t* req, int status) {
ASSERT(0 == status);
ASSERT(0 == uv_timer_start(&timer_handle, timer_cb, 50, 0));
ASSERT(0 == uv_read_start((uv_stream_t*) &tcp_handle,
(uv_alloc_cb) fail_cb,
(uv_read_cb) fail_cb));
}
TEST_IMPL(tcp_read_stop) {
uv_connect_t connect_req;
struct sockaddr_in addr;
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
ASSERT(0 == uv_timer_init(uv_default_loop(), &timer_handle));
ASSERT(0 == uv_tcp_init(uv_default_loop(), &tcp_handle));
ASSERT(0 == uv_tcp_connect(&connect_req, &tcp_handle, addr, connect_cb));
ASSERT(0 == uv_run(uv_default_loop()));
MAKE_VALGRIND_HAPPY();
return 0;
}

266
deps/uv/test/test-threadpool-cancel.c

@ -0,0 +1,266 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#define INIT_CANCEL_INFO(ci, what) \
do { \
(ci)->reqs = (what); \
(ci)->nreqs = ARRAY_SIZE(what); \
(ci)->stride = sizeof((what)[0]); \
} \
while (0)
struct cancel_info {
void* reqs;
unsigned nreqs;
unsigned stride;
uv_timer_t timer_handle;
};
static uv_cond_t signal_cond;
static uv_mutex_t signal_mutex;
static uv_mutex_t wait_mutex;
static unsigned num_threads;
static unsigned fs_cb_called;
static unsigned work_cb_called;
static unsigned done_cb_called;
static unsigned done2_cb_called;
static unsigned timer_cb_called;
static unsigned getaddrinfo_cb_called;
static void work_cb(uv_work_t* req) {
uv_mutex_lock(&signal_mutex);
uv_cond_signal(&signal_cond);
uv_mutex_unlock(&signal_mutex);
uv_mutex_lock(&wait_mutex);
uv_mutex_unlock(&wait_mutex);
work_cb_called++;
}
static void done_cb(uv_work_t* req, int status) {
done_cb_called++;
free(req);
}
static void saturate_threadpool(void) {
uv_work_t* req;
ASSERT(0 == uv_cond_init(&signal_cond));
ASSERT(0 == uv_mutex_init(&signal_mutex));
ASSERT(0 == uv_mutex_init(&wait_mutex));
uv_mutex_lock(&signal_mutex);
uv_mutex_lock(&wait_mutex);
for (num_threads = 0; /* empty */; num_threads++) {
req = malloc(sizeof(*req));
ASSERT(req != NULL);
ASSERT(0 == uv_queue_work(uv_default_loop(), req, work_cb, done_cb));
/* Expect to get signalled within 350 ms, otherwise assume that
* the thread pool is saturated. As with any timing dependent test,
* this is obviously not ideal.
*/
if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) {
ASSERT(0 == uv_cancel((uv_req_t*) req));
break;
}
}
}
static void unblock_threadpool(void) {
uv_mutex_unlock(&signal_mutex);
uv_mutex_unlock(&wait_mutex);
}
static void cleanup_threadpool(void) {
ASSERT(done_cb_called == num_threads + 1); /* +1 == cancelled work req. */
ASSERT(work_cb_called == num_threads);
uv_cond_destroy(&signal_cond);
uv_mutex_destroy(&signal_mutex);
uv_mutex_destroy(&wait_mutex);
}
static void fs_cb(uv_fs_t* req) {
ASSERT(req->errorno == UV_ECANCELED);
uv_fs_req_cleanup(req);
fs_cb_called++;
}
static void getaddrinfo_cb(uv_getaddrinfo_t* req,
int status,
struct addrinfo* res) {
ASSERT(UV_ECANCELED == uv_last_error(req->loop).code);
ASSERT(UV_ECANCELED == status);
getaddrinfo_cb_called++;
}
static void work2_cb(uv_work_t* req) {
ASSERT(0 && "work2_cb called");
}
static void done2_cb(uv_work_t* req, int status) {
ASSERT(uv_last_error(req->loop).code == UV_ECANCELED);
ASSERT(status == -1);
done2_cb_called++;
}
static void timer_cb(uv_timer_t* handle, int status) {
struct cancel_info* ci;
uv_req_t* req;
unsigned i;
ci = container_of(handle, struct cancel_info, timer_handle);
for (i = 0; i < ci->nreqs; i++) {
req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride);
ASSERT(0 == uv_cancel(req));
}
uv_close((uv_handle_t*) &ci->timer_handle, NULL);
unblock_threadpool();
timer_cb_called++;
}
TEST_IMPL(threadpool_cancel_getaddrinfo) {
uv_getaddrinfo_t reqs[4];
struct cancel_info ci;
struct addrinfo hints;
uv_loop_t* loop;
int r;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
ASSERT(r == 0);
r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
ASSERT(r == 0);
r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
ASSERT(r == 0);
r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
ASSERT(r == 0);
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
cleanup_threadpool();
return 0;
}
TEST_IMPL(threadpool_cancel_work) {
struct cancel_info ci;
uv_work_t reqs[16];
uv_loop_t* loop;
unsigned i;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
for (i = 0; i < ARRAY_SIZE(reqs); i++)
ASSERT(0 == uv_queue_work(loop, reqs + i, work2_cb, done2_cb));
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
ASSERT(ARRAY_SIZE(reqs) == done2_cb_called);
cleanup_threadpool();
return 0;
}
TEST_IMPL(threadpool_cancel_fs) {
struct cancel_info ci;
uv_fs_t reqs[25];
uv_loop_t* loop;
unsigned n;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
/* Needs to match ARRAY_SIZE(fs_reqs). */
n = 0;
ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
ASSERT(n == ARRAY_SIZE(reqs));
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(n == fs_cb_called);
ASSERT(1 == timer_cb_called);
cleanup_threadpool();
return 0;
}

21
deps/uv/test/test-threadpool.c

@ -35,7 +35,8 @@ static void work_cb(uv_work_t* req) {
}
static void after_work_cb(uv_work_t* req) {
static void after_work_cb(uv_work_t* req, int status) {
ASSERT(status == 0);
ASSERT(req == &work_req);
ASSERT(req->data == &data);
after_work_cb_count++;
@ -56,3 +57,21 @@ TEST_IMPL(threadpool_queue_work_simple) {
MAKE_VALGRIND_HAPPY();
return 0;
}
TEST_IMPL(threadpool_queue_work_einval) {
int r;
work_req.data = &data;
r = uv_queue_work(uv_default_loop(), &work_req, NULL, after_work_cb);
ASSERT(r == -1);
uv_run(uv_default_loop());
ASSERT(uv_last_error(uv_default_loop()).code == UV_EINVAL);
ASSERT(work_cb_count == 0);
ASSERT(after_work_cb_count == 0);
MAKE_VALGRIND_HAPPY();
return 0;
}

3
deps/uv/uv.gyp

@ -250,6 +250,7 @@
'test/test-cwd-and-chdir.c',
'test/test-delayed-accept.c',
'test/test-error.c',
'test/test-embed.c',
'test/test-fail-always.c',
'test/test-fs.c',
'test/test-fs-event.c',
@ -298,7 +299,9 @@
'test/test-tcp-write-to-half-open-connection.c',
'test/test-tcp-writealot.c',
'test/test-tcp-unexpected-read.c',
'test/test-tcp-read-stop.c',
'test/test-threadpool.c',
'test/test-threadpool-cancel.c',
'test/test-mutexes.c',
'test/test-thread.c',
'test/test-barrier.c',

9
src/node_crypto.cc

@ -3728,9 +3728,9 @@ void EIO_PBKDF2After(pbkdf2_req* req, Local<Value> argv[2]) {
}
void EIO_PBKDF2After(uv_work_t* work_req) {
void EIO_PBKDF2After(uv_work_t* work_req, int status) {
assert(status == 0);
pbkdf2_req* req = container_of(work_req, pbkdf2_req, work_req);
HandleScope scope;
Local<Value> argv[2];
Persistent<Object> obj = req->obj;
@ -3902,16 +3902,15 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> argv[2]) {
}
void RandomBytesAfter(uv_work_t* work_req) {
void RandomBytesAfter(uv_work_t* work_req, int status) {
assert(status == 0);
RandomBytesRequest* req = container_of(work_req,
RandomBytesRequest,
work_req_);
HandleScope scope;
Local<Value> argv[2];
RandomBytesCheck(req, argv);
MakeCallback(req->obj_, "ondone", ARRAY_SIZE(argv), argv);
delete req;
}

4
src/node_zlib.cc

@ -213,7 +213,9 @@ class ZCtx : public ObjectWrap {
}
// v8 land!
static void After(uv_work_t* work_req) {
static void After(uv_work_t* work_req, int status) {
assert(status == 0);
HandleScope scope;
ZCtx *ctx = container_of(work_req, ZCtx, work_req_);

Loading…
Cancel
Save