diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index f50142c06e..42e39fcccf 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -108,6 +108,23 @@ typedef pthread_rwlock_t uv_rwlock_t; typedef UV_PLATFORM_SEM_T uv_sem_t; typedef pthread_cond_t uv_cond_t; + +#if defined(__APPLE__) && defined(__MACH__) + +typedef struct { + unsigned int n; + unsigned int count; + uv_mutex_t mutex; + uv_sem_t turnstile1; + uv_sem_t turnstile2; +} uv_barrier_t; + +#else /* defined(__APPLE__) && defined(__MACH__) */ + +typedef pthread_barrier_t uv_barrier_t; + +#endif /* defined(__APPLE__) && defined(__MACH__) */ + /* Platform-specific definitions for uv_spawn support. */ typedef gid_t uv_gid_t; typedef uid_t uv_uid_t; diff --git a/deps/uv/include/uv-private/uv-win.h b/deps/uv/include/uv-private/uv-win.h index 78dbb96b7d..10bd17ea4c 100644 --- a/deps/uv/include/uv-private/uv-win.h +++ b/deps/uv/include/uv-private/uv-win.h @@ -240,6 +240,14 @@ typedef union { } fallback_; } uv_rwlock_t; +typedef struct { + unsigned int n; + unsigned int count; + uv_mutex_t mutex; + uv_sem_t turnstile1; + uv_sem_t turnstile2; +} uv_barrier_t; + #define UV_ONCE_INIT { 0, NULL } typedef struct uv_once_s { diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h index cebee681f0..99b69884ea 100644 --- a/deps/uv/include/uv.h +++ b/deps/uv/include/uv.h @@ -352,11 +352,11 @@ UV_EXTERN const char* uv_err_name(uv_err_t err); #define UV_REQ_FIELDS \ /* public */ \ void* data; \ + /* read-only */ \ + uv_req_type type; \ /* private */ \ ngx_queue_t active_queue; \ UV_REQ_PRIVATE_FIELDS \ - /* read-only */ \ - uv_req_type type; \ /* Abstract base class of all requests. */ struct uv_req_s { @@ -1262,14 +1262,6 @@ typedef struct uv_process_options_s { * `enum uv_process_flags` below. */ unsigned int flags; - /* - * Libuv can change the child process' user/group id. This happens only when - * the appropriate bits are set in the flags fields. This is not supported on - * windows; uv_spawn() will fail and set the error to UV_ENOTSUP. - */ - uv_uid_t uid; - uv_gid_t gid; - /* * The `stdio` field points to an array of uv_stdio_container_t structs that * describe the file descriptors that will be made available to the child @@ -1281,6 +1273,13 @@ typedef struct uv_process_options_s { */ int stdio_count; uv_stdio_container_t* stdio; + /* + * Libuv can change the child process' user/group id. This happens only when + * the appropriate bits are set in the flags fields. This is not supported on + * windows; uv_spawn() will fail and set the error to UV_ENOTSUP. + */ + uv_uid_t uid; + uv_gid_t gid; } uv_process_options_t; /* @@ -1822,6 +1821,10 @@ UV_EXTERN void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex); UV_EXTERN int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, uint64_t timeout); +UV_EXTERN int uv_barrier_init(uv_barrier_t* barrier, unsigned int count); +UV_EXTERN void uv_barrier_destroy(uv_barrier_t* barrier); +UV_EXTERN void uv_barrier_wait(uv_barrier_t* barrier); + /* Runs a function once and only once. Concurrent calls to uv_once() with the * same guard will block all callers except one (it's unspecified which one). * The guard should be initialized statically with the UV_ONCE_INIT macro. diff --git a/deps/uv/src/unix/fs.c b/deps/uv/src/unix/fs.c index 1e1c653bd5..f0bf81afab 100644 --- a/deps/uv/src/unix/fs.c +++ b/deps/uv/src/unix/fs.c @@ -133,24 +133,6 @@ static ssize_t uv__fs_futime(uv_fs_t* req) { } -static ssize_t uv__fs_pwrite(uv_fs_t* req) { -#if defined(__APPLE__) - /* Serialize writes on OS X, concurrent pwrite() calls result in data loss. - * We can't use a per-file descriptor lock, the descriptor may be a dup(). - */ - static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; - ssize_t r; - - pthread_mutex_lock(&lock); - r = pwrite(req->file, req->buf, req->len, req->off); - pthread_mutex_unlock(&lock); - - return r; -#else - return pwrite(req->file, req->buf, req->len, req->off); -#endif -} - static ssize_t uv__fs_read(uv_fs_t* req) { if (req->off < 0) return read(req->file, req->buf, req->len); @@ -447,10 +429,27 @@ static ssize_t uv__fs_utime(uv_fs_t* req) { static ssize_t uv__fs_write(uv_fs_t* req) { + ssize_t r; + + /* Serialize writes on OS X, concurrent write() and pwrite() calls result in + * data loss. We can't use a per-file descriptor lock, the descriptor may be + * a dup(). + */ +#if defined(__APPLE__) + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&lock); +#endif + if (req->off < 0) - return write(req->file, req->buf, req->len); + r = write(req->file, req->buf, req->len); else - return uv__fs_pwrite(req); + r = pwrite(req->file, req->buf, req->len, req->off); + +#if defined(__APPLE__) + pthread_mutex_unlock(&lock); +#endif + + return r; } diff --git a/deps/uv/src/unix/thread.c b/deps/uv/src/unix/thread.c index f50961c921..0583cb474c 100644 --- a/deps/uv/src/unix/thread.c +++ b/deps/uv/src/unix/thread.c @@ -370,3 +370,83 @@ int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, uint64_t timeout) { } #endif /* defined(__APPLE__) && defined(__MACH__) */ + + +#if defined(__APPLE__) && defined(__MACH__) + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + barrier->n = count; + barrier->count = 0; + + if (uv_mutex_init(&barrier->mutex)) + return -1; + + if (uv_sem_init(&barrier->turnstile1, 0)) + goto error2; + + if (uv_sem_init(&barrier->turnstile2, 1)) + goto error; + + return 0; + +error: + uv_sem_destroy(&barrier->turnstile1); +error2: + uv_mutex_destroy(&barrier->mutex); + return -1; + +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + uv_sem_destroy(&barrier->turnstile2); + uv_sem_destroy(&barrier->turnstile1); + uv_mutex_destroy(&barrier->mutex); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + uv_mutex_lock(&barrier->mutex); + if (++barrier->count == barrier->n) { + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile1); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile1); + + uv_mutex_lock(&barrier->mutex); + if (--barrier->count == 0) { + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile2); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile2); +} + +#else /* !(defined(__APPLE__) && defined(__MACH__)) */ + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + if (pthread_barrier_init(barrier, NULL, count)) + return -1; + else + return 0; +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + if (pthread_barrier_destroy(barrier)) + abort(); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + int r = pthread_barrier_wait(barrier); + if (r && r != PTHREAD_BARRIER_SERIAL_THREAD) + abort(); +} + +#endif /* defined(__APPLE__) && defined(__MACH__) */ diff --git a/deps/uv/src/unix/threadpool.c b/deps/uv/src/unix/threadpool.c index a1aaa2bd6e..c7044a5d5d 100644 --- a/deps/uv/src/unix/threadpool.c +++ b/deps/uv/src/unix/threadpool.c @@ -98,18 +98,17 @@ static void init_once(void) { __attribute__((destructor)) static void cleanup(void) { unsigned int i; - int err; if (initialized == 0) return; post(&exit_message); - for (i = 0; i < ARRAY_SIZE(threads); i++) { - err = pthread_join(threads[i], NULL); - assert(err == 0 || err == ESRCH); - (void) err; /* Silence compiler warning in release builds. */ - } + for (i = 0; i < ARRAY_SIZE(threads); i++) + if (pthread_join(threads[i], NULL)) + abort(); + + initialized = 0; } diff --git a/deps/uv/src/win/thread.c b/deps/uv/src/win/thread.c index da6138ad7d..e774fd9cf9 100644 --- a/deps/uv/src/win/thread.c +++ b/deps/uv/src/win/thread.c @@ -610,3 +610,57 @@ int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, else return uv_cond_fallback_timedwait(cond, mutex, timeout); } + + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + barrier->n = count; + barrier->count = 0; + + if (uv_mutex_init(&barrier->mutex)) + return -1; + + if (uv_sem_init(&barrier->turnstile1, 0)) + goto error2; + + if (uv_sem_init(&barrier->turnstile2, 1)) + goto error; + + return 0; + +error: + uv_sem_destroy(&barrier->turnstile1); +error2: + uv_mutex_destroy(&barrier->mutex); + return -1; + +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + uv_sem_destroy(&barrier->turnstile2); + uv_sem_destroy(&barrier->turnstile1); + uv_mutex_destroy(&barrier->mutex); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + uv_mutex_lock(&barrier->mutex); + if (++barrier->count == barrier->n) { + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile1); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile1); + + uv_mutex_lock(&barrier->mutex); + if (--barrier->count == 0) { + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile2); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile2); +} diff --git a/deps/uv/test/runner-win.c b/deps/uv/test/runner-win.c index 111a6869d1..8f534bcdb7 100644 --- a/deps/uv/test/runner-win.c +++ b/deps/uv/test/runner-win.c @@ -24,7 +24,10 @@ #include #include #include -#include +#if !defined(__MINGW32__) +# include +#endif + #include "task.h" #include "runner.h" @@ -44,8 +47,10 @@ void platform_init(int argc, char **argv) { /* Disable the "application crashed" popup. */ SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX); +#if !defined(__MINGW32__) _CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_DEBUG); _CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_DEBUG); +#endif _setmode(0, _O_BINARY); _setmode(1, _O_BINARY); diff --git a/deps/uv/test/test-barrier.c b/deps/uv/test/test-barrier.c new file mode 100644 index 0000000000..97df704c0e --- /dev/null +++ b/deps/uv/test/test-barrier.c @@ -0,0 +1,98 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +typedef struct { + uv_barrier_t barrier; + int delay; + volatile int posted; +} worker_config; + + +static void worker(void* arg) { + worker_config* c = arg; + + if (c->delay) + uv_sleep(c->delay); + + uv_barrier_wait(&c->barrier); +} + + +TEST_IMPL(barrier_1) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_sleep(100); + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} + + +TEST_IMPL(barrier_2) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + wc.delay = 100; + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} + + +TEST_IMPL(barrier_3) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} diff --git a/deps/uv/test/test-condvar-consumer-producer.c b/deps/uv/test/test-condvar-consumer-producer.c new file mode 100644 index 0000000000..b2e8d3d9d9 --- /dev/null +++ b/deps/uv/test/test-condvar-consumer-producer.c @@ -0,0 +1,137 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +#define MAX_CONSUMERS 32 +#define MAX_LOOPS 1000 + +struct buffer_s { + ngx_queue_t queue; + int data; +}; +typedef struct buffer_s buffer_t; + +static ngx_queue_t queue; +static uv_mutex_t mutex; +static uv_cond_t empty; +static uv_cond_t full; + +static volatile int finished_consumers = 0; + + +static void produce(int value) { + buffer_t* buf; + + buf = malloc(sizeof(*buf)); + ngx_queue_init(&buf->queue); + buf->data = value; + ngx_queue_insert_tail(&queue, &buf->queue); +} + + +static int consume(void) { + ngx_queue_t* q; + buffer_t* buf; + int data; + + ASSERT(!ngx_queue_empty(&queue)); + q = ngx_queue_last(&queue); + ngx_queue_remove(q); + + buf = ngx_queue_data(q, buffer_t, queue); + data = buf->data; + free(buf); + + return data; +} + + +static void producer(void* arg) { + int i; + + (void) arg; + + for (i = 0; i < MAX_LOOPS * MAX_CONSUMERS; i++) { + uv_mutex_lock(&mutex); + while(!ngx_queue_empty(&queue)) + uv_cond_wait(&empty, &mutex); + produce(i); + uv_cond_signal(&full); + uv_mutex_unlock(&mutex); + } + + LOGF("finished_consumers: %d\n", finished_consumers); + ASSERT(finished_consumers == MAX_CONSUMERS); +} + + +static void consumer(void* arg) { + int i; + int value; + + (void) arg; + + for (i = 0; i < MAX_LOOPS; i++) { + uv_mutex_lock(&mutex); + while (ngx_queue_empty(&queue)) + uv_cond_wait(&full, &mutex); + value = consume(); + ASSERT(value < MAX_LOOPS * MAX_CONSUMERS); + uv_cond_signal(&empty); + uv_mutex_unlock(&mutex); + } + + finished_consumers++; +} + + +TEST_IMPL(consumer_producer) { + int i; + uv_thread_t cthreads[MAX_CONSUMERS]; + uv_thread_t pthread; + + ngx_queue_init(&queue); + ASSERT(0 == uv_mutex_init(&mutex)); + ASSERT(0 == uv_cond_init(&empty)); + ASSERT(0 == uv_cond_init(&full)); + + for (i = 0; i < MAX_CONSUMERS; i++) { + ASSERT(0 == uv_thread_create(&cthreads[i], consumer, NULL)); + } + + ASSERT(0 == uv_thread_create(&pthread, producer, NULL)); + + for (i = 0; i < MAX_CONSUMERS; i++) { + ASSERT(0 == uv_thread_join(&cthreads[i])); + } + + ASSERT(0 == uv_thread_join(&pthread)); + uv_cond_destroy(&empty); + uv_cond_destroy(&full); + uv_mutex_destroy(&mutex); + + return 0; +} diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index 20c12e3e0a..28a13b2e23 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -22,6 +22,9 @@ TEST_DECLARE (platform_output) TEST_DECLARE (callback_order) TEST_DECLARE (run_once) +TEST_DECLARE (barrier_1) +TEST_DECLARE (barrier_2) +TEST_DECLARE (barrier_3) TEST_DECLARE (condvar_1) TEST_DECLARE (condvar_2) TEST_DECLARE (condvar_3) @@ -214,6 +217,9 @@ TASK_LIST_START TEST_ENTRY (callback_order) #endif TEST_ENTRY (run_once) + TEST_ENTRY (barrier_1) + TEST_ENTRY (barrier_2) + TEST_ENTRY (barrier_3) TEST_ENTRY (condvar_1) TEST_ENTRY (condvar_2) TEST_ENTRY (condvar_3) diff --git a/deps/uv/uv.gyp b/deps/uv/uv.gyp index fadbd29e3a..9962194db7 100644 --- a/deps/uv/uv.gyp +++ b/deps/uv/uv.gyp @@ -305,6 +305,7 @@ 'test/test-mutexes.c', 'test/test-signal.c', 'test/test-thread.c', + 'test/test-barrier.c', 'test/test-condvar.c', 'test/test-condvar-consumer-producer.c', 'test/test-timer-again.c',