|
|
@ -34,6 +34,27 @@ |
|
|
|
#include <sys/un.h> |
|
|
|
#include <unistd.h> |
|
|
|
|
|
|
|
#if defined(__APPLE__) |
|
|
|
# include <sys/event.h> |
|
|
|
# include <sys/time.h> |
|
|
|
# include <sys/select.h> |
|
|
|
|
|
|
|
/* ev.h is overwriting EV_ERROR from sys/event.h */ |
|
|
|
#define EV_ERROR_ORIG 0x4000 |
|
|
|
|
|
|
|
/* Forward declaration */ |
|
|
|
typedef struct uv__stream_select_s uv__stream_select_t; |
|
|
|
|
|
|
|
struct uv__stream_select_s { |
|
|
|
uv_stream_t* stream; |
|
|
|
uv_thread_t thread; |
|
|
|
uv_sem_t sem; |
|
|
|
uv_mutex_t mutex; |
|
|
|
uv_async_t async; |
|
|
|
int events; |
|
|
|
int fake_fd; |
|
|
|
}; |
|
|
|
#endif /* defined(__APPLE__) */ |
|
|
|
|
|
|
|
static void uv__stream_connect(uv_stream_t*); |
|
|
|
static void uv__write(uv_stream_t* stream); |
|
|
@ -69,11 +90,182 @@ void uv__stream_init(uv_loop_t* loop, |
|
|
|
ngx_queue_init(&stream->write_completed_queue); |
|
|
|
stream->write_queue_size = 0; |
|
|
|
|
|
|
|
#if defined(__APPLE__) |
|
|
|
stream->select = NULL; |
|
|
|
#endif /* defined(__APPLE_) */ |
|
|
|
|
|
|
|
uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0); |
|
|
|
uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#if defined(__APPLE__) |
|
|
|
void uv__stream_osx_select(void* arg) { |
|
|
|
uv_stream_t* stream; |
|
|
|
uv__stream_select_t* s; |
|
|
|
fd_set read; |
|
|
|
fd_set write; |
|
|
|
fd_set error; |
|
|
|
struct timeval timeout; |
|
|
|
int events; |
|
|
|
int fd; |
|
|
|
int r; |
|
|
|
|
|
|
|
stream = arg; |
|
|
|
s = stream->select; |
|
|
|
fd = stream->fd; |
|
|
|
|
|
|
|
while (1) { |
|
|
|
/* Terminate on semaphore */ |
|
|
|
if (uv_sem_trywait(&s->sem) == 0) break; |
|
|
|
|
|
|
|
/* Watch fd using select(2) */ |
|
|
|
FD_ZERO(&read); |
|
|
|
FD_ZERO(&write); |
|
|
|
FD_ZERO(&error); |
|
|
|
FD_SET(fd, &read); |
|
|
|
FD_SET(fd, &write); |
|
|
|
FD_SET(fd, &error); |
|
|
|
|
|
|
|
timeout.tv_sec = 0; |
|
|
|
timeout.tv_usec = 250000; /* 250 ms timeout */ |
|
|
|
r = select(fd + 1, &read, &write, &error, &timeout); |
|
|
|
if (r == -1) { |
|
|
|
if (errno == EINTR) continue; |
|
|
|
/* XXX: Possible?! */ |
|
|
|
abort(); |
|
|
|
} |
|
|
|
|
|
|
|
/* Ignore timeouts */ |
|
|
|
if (r == 0) continue; |
|
|
|
|
|
|
|
/* Handle events */ |
|
|
|
events = 0; |
|
|
|
if (FD_ISSET(fd, &read)) events |= UV__IO_READ; |
|
|
|
if (FD_ISSET(fd, &write)) events |= UV__IO_WRITE; |
|
|
|
if (FD_ISSET(fd, &error)) events |= UV__IO_ERROR; |
|
|
|
|
|
|
|
uv_mutex_lock(&s->mutex); |
|
|
|
s->events |= events; |
|
|
|
uv_mutex_unlock(&s->mutex); |
|
|
|
|
|
|
|
if (events != 0) uv_async_send(&s->async); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void uv__stream_osx_select_cb(uv_async_t* handle, int status) { |
|
|
|
uv_stream_t* stream; |
|
|
|
uv__stream_select_t* s; |
|
|
|
int events; |
|
|
|
|
|
|
|
s = container_of(handle, uv__stream_select_t, async); |
|
|
|
stream = s->stream; |
|
|
|
|
|
|
|
/* Get and reset stream's events */ |
|
|
|
uv_mutex_lock(&s->mutex); |
|
|
|
events = s->events; |
|
|
|
s->events = 0; |
|
|
|
uv_mutex_unlock(&s->mutex); |
|
|
|
|
|
|
|
/* Invoke callback on event-loop */ |
|
|
|
if ((events & UV__IO_READ) && uv__io_active(&stream->read_watcher)) { |
|
|
|
uv__stream_io(stream->loop, &stream->read_watcher, UV__IO_READ); |
|
|
|
} |
|
|
|
if ((events & UV__IO_WRITE) && uv__io_active(&stream->write_watcher)) { |
|
|
|
uv__stream_io(stream->loop, &stream->write_watcher, UV__IO_WRITE); |
|
|
|
} |
|
|
|
if (events & UV__IO_ERROR) { |
|
|
|
/* XXX: Handle it! */ |
|
|
|
uv__stream_io(stream->loop, NULL, UV__IO_ERROR); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void uv__stream_osx_cb_close(uv_handle_t* async) { |
|
|
|
/* Free container */ |
|
|
|
free(container_of(async, uv__stream_select_t, async)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
int uv__stream_try_select(uv_stream_t* stream, int fd) { |
|
|
|
/*
|
|
|
|
* kqueue doesn't work with some files from /dev mount on osx. |
|
|
|
* select(2) in separate thread for those fds |
|
|
|
*/ |
|
|
|
|
|
|
|
int kq; |
|
|
|
int ret; |
|
|
|
struct kevent filter[1]; |
|
|
|
struct kevent events[1]; |
|
|
|
struct timespec timeout; |
|
|
|
uv__stream_select_t* s; |
|
|
|
|
|
|
|
kq = kqueue(); |
|
|
|
if (kq < 0) { |
|
|
|
fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno); |
|
|
|
abort(); |
|
|
|
} |
|
|
|
|
|
|
|
EV_SET(&filter[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); |
|
|
|
|
|
|
|
/* Use small timeout, because we only want to capture EINVALs */ |
|
|
|
timeout.tv_sec = 0; |
|
|
|
timeout.tv_nsec = 1; |
|
|
|
|
|
|
|
ret = kevent(kq, filter, 1, events, 1, &timeout); |
|
|
|
close(kq); |
|
|
|
if (ret < 1) return -1; |
|
|
|
if ((events[0].flags & EV_ERROR_ORIG) == 0 || events[0].data != EINVAL) { |
|
|
|
return -1; |
|
|
|
} |
|
|
|
|
|
|
|
/* At this point we definitely know that this fd won't work with kqueue */ |
|
|
|
s = malloc(sizeof(*s)); |
|
|
|
if (s == NULL) { |
|
|
|
/* TODO: Return error */ |
|
|
|
abort(); |
|
|
|
} |
|
|
|
|
|
|
|
if (uv_async_init(stream->loop, |
|
|
|
&s->async, |
|
|
|
uv__stream_osx_select_cb)) { |
|
|
|
return -1; |
|
|
|
} |
|
|
|
s->async.flags |= UV__HANDLE_INTERNAL; |
|
|
|
uv__handle_unref((uv_handle_t*) &s->async); |
|
|
|
|
|
|
|
if (uv_sem_init(&s->sem, 0)) goto fatal1; |
|
|
|
if (uv_mutex_init(&s->mutex)) goto fatal2; |
|
|
|
|
|
|
|
/* Create fake fd for io watcher */ |
|
|
|
s->fake_fd = socket(AF_UNIX, SOCK_STREAM, 0); |
|
|
|
if (s->fake_fd == -1) goto fatal3; |
|
|
|
|
|
|
|
if (uv_thread_create(&s->thread, uv__stream_osx_select, stream)) { |
|
|
|
goto fatal4; |
|
|
|
} |
|
|
|
|
|
|
|
s->stream = stream; |
|
|
|
stream->select = s; |
|
|
|
|
|
|
|
return 0; |
|
|
|
|
|
|
|
fatal4: |
|
|
|
close(s->fake_fd); |
|
|
|
fatal3: |
|
|
|
uv_mutex_destroy(&s->mutex); |
|
|
|
fatal2: |
|
|
|
uv_sem_destroy(&s->sem); |
|
|
|
fatal1: |
|
|
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); |
|
|
|
|
|
|
|
free(s); |
|
|
|
return -1; |
|
|
|
} |
|
|
|
#endif /* defined(__APPLE__) */ |
|
|
|
|
|
|
|
|
|
|
|
int uv__stream_open(uv_stream_t* stream, int fd, int flags) { |
|
|
|
socklen_t yes; |
|
|
|
|
|
|
@ -102,6 +294,13 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
#if defined(__APPLE__) |
|
|
|
if (uv__stream_try_select(stream, fd) == 0) { |
|
|
|
/* Use fake fd */ |
|
|
|
fd = ((uv__stream_select_t*) stream->select)->fake_fd; |
|
|
|
} |
|
|
|
#endif /* defined(__APPLE__) */ |
|
|
|
|
|
|
|
/* Associate the fd with each watcher. */ |
|
|
|
uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ); |
|
|
|
uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE); |
|
|
@ -980,6 +1179,24 @@ int uv_is_writable(const uv_stream_t* stream) { |
|
|
|
|
|
|
|
|
|
|
|
void uv__stream_close(uv_stream_t* handle) { |
|
|
|
#if defined(__APPLE__) |
|
|
|
/* Terminate select loop first */ |
|
|
|
if (handle->select != NULL) { |
|
|
|
uv__stream_select_t* s; |
|
|
|
|
|
|
|
s = handle->select; |
|
|
|
|
|
|
|
uv_sem_post(&s->sem); |
|
|
|
uv_thread_join(&s->thread); |
|
|
|
uv_sem_destroy(&s->sem); |
|
|
|
uv_mutex_destroy(&s->mutex); |
|
|
|
close(s->fake_fd); |
|
|
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); |
|
|
|
|
|
|
|
handle->select = NULL; |
|
|
|
} |
|
|
|
#endif /* defined(__APPLE__) */ |
|
|
|
|
|
|
|
uv_read_stop(handle); |
|
|
|
uv__io_stop(handle->loop, &handle->write_watcher); |
|
|
|
|
|
|
|