|
|
@ -60,6 +60,7 @@ static void uv__stream_connect(uv_stream_t*); |
|
|
|
static void uv__write(uv_stream_t* stream); |
|
|
|
static void uv__read(uv_stream_t* stream); |
|
|
|
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); |
|
|
|
static size_t uv__write_req_size(uv_write_t* req); |
|
|
|
|
|
|
|
|
|
|
|
/* Used by the accept() EMFILE party trick. */ |
|
|
@ -399,6 +400,7 @@ void uv__stream_destroy(uv_stream_t* stream) { |
|
|
|
|
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_artificial_error(req->handle->loop, UV_ECANCELED); |
|
|
@ -413,6 +415,13 @@ void uv__stream_destroy(uv_stream_t* stream) { |
|
|
|
req = QUEUE_DATA(q, uv_write_t, queue); |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
|
|
|
|
if (req->bufs != NULL) { |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, req->error); |
|
|
|
req->cb(req, req->error ? -1 : 0); |
|
|
@ -420,6 +429,11 @@ void uv__stream_destroy(uv_stream_t* stream) { |
|
|
|
} |
|
|
|
|
|
|
|
if (stream->shutdown_req) { |
|
|
|
/* The UV_ECANCELED error code is a lie, the shutdown(2) syscall is a
|
|
|
|
* fait accompli at this point. Maybe we should revisit this in v0.11. |
|
|
|
* A possible reason for leaving it unchanged is that it informs the |
|
|
|
* callee that the handle has been destroyed. |
|
|
|
*/ |
|
|
|
uv__req_unregister(stream->loop, stream->shutdown_req); |
|
|
|
uv__set_artificial_error(stream->loop, UV_ECANCELED); |
|
|
|
stream->shutdown_req->cb(stream->shutdown_req, -1); |
|
|
@ -601,8 +615,6 @@ static void uv__drain(uv_stream_t* stream) { |
|
|
|
uv_shutdown_t* req; |
|
|
|
|
|
|
|
assert(QUEUE_EMPTY(&stream->write_queue)); |
|
|
|
assert(stream->write_queue_size == 0); |
|
|
|
|
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); |
|
|
|
|
|
|
|
/* Shutdown? */ |
|
|
@ -635,6 +647,7 @@ static void uv__drain(uv_stream_t* stream) { |
|
|
|
static size_t uv__write_req_size(uv_write_t* req) { |
|
|
|
size_t size; |
|
|
|
|
|
|
|
assert(req->bufs != NULL); |
|
|
|
size = uv__buf_count(req->bufs + req->write_index, |
|
|
|
req->bufcnt - req->write_index); |
|
|
|
assert(req->handle->write_queue_size >= size); |
|
|
@ -648,10 +661,18 @@ static void uv__write_req_finish(uv_write_t* req) { |
|
|
|
|
|
|
|
/* Pop the req off tcp->write_queue. */ |
|
|
|
QUEUE_REMOVE(&req->queue); |
|
|
|
if (req->bufs != req->bufsml) { |
|
|
|
free(req->bufs); |
|
|
|
|
|
|
|
/* Only free when there was no error. On error, we touch up write_queue_size
|
|
|
|
* right before making the callback. The reason we don't do that right away |
|
|
|
* is that a write_queue_size > 0 is our only way to signal to the user that |
|
|
|
* he should stop writing - which he should if we got an error. Something to |
|
|
|
* revisit in future revisions of the libuv API. |
|
|
|
*/ |
|
|
|
if (req->error == 0) { |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
req->bufs = NULL; |
|
|
|
|
|
|
|
/* Add it to the write_completed_queue where it will have its
|
|
|
|
* callback called in the near future. |
|
|
@ -687,10 +708,8 @@ start: |
|
|
|
|
|
|
|
assert(uv__stream_fd(stream) >= 0); |
|
|
|
|
|
|
|
if (QUEUE_EMPTY(&stream->write_queue)) { |
|
|
|
assert(stream->write_queue_size == 0); |
|
|
|
if (QUEUE_EMPTY(&stream->write_queue)) |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
q = QUEUE_HEAD(&stream->write_queue); |
|
|
|
req = QUEUE_DATA(q, uv_write_t, queue); |
|
|
@ -761,8 +780,10 @@ start: |
|
|
|
if (errno != EAGAIN && errno != EWOULDBLOCK) { |
|
|
|
/* Error */ |
|
|
|
req->error = errno; |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
uv__write_req_finish(req); |
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); |
|
|
|
if (!uv__io_active(&stream->io_watcher, UV__POLLIN)) |
|
|
|
uv__handle_stop(stream); |
|
|
|
return; |
|
|
|
} else if (stream->flags & UV_STREAM_BLOCKING) { |
|
|
|
/* If this is a blocking stream, try again. */ |
|
|
@ -838,6 +859,13 @@ static void uv__write_callbacks(uv_stream_t* stream) { |
|
|
|
QUEUE_REMOVE(q); |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
|
|
|
|
if (req->bufs != NULL) { |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
/* NOTE: call callback AFTER freeing the request data. */ |
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, req->error); |
|
|
@ -1119,6 +1147,7 @@ static void uv__stream_connect(uv_stream_t* stream) { |
|
|
|
|
|
|
|
stream->connect_req = NULL; |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, error); |
|
|
@ -1158,6 +1187,12 @@ int uv_write2(uv_write_t* req, |
|
|
|
return uv__set_artificial_error(stream->loop, UV_EBADF); |
|
|
|
} |
|
|
|
|
|
|
|
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
|
|
|
|
* it means there are error-state requests in the write_completed_queue that |
|
|
|
* will touch up write_queue_size later, see also uv__write_req_finish(). |
|
|
|
* We chould check that write_queue is empty instead but that implies making |
|
|
|
* a write() syscall when we know that the handle is in error mode. |
|
|
|
*/ |
|
|
|
empty_queue = (stream->write_queue_size == 0); |
|
|
|
|
|
|
|
/* Initialize the req */ |
|
|
@ -1266,9 +1301,20 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, |
|
|
|
|
|
|
|
|
|
|
|
int uv_read_stop(uv_stream_t* stream) { |
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); |
|
|
|
uv__handle_stop(stream); |
|
|
|
/* Sanity check. We're going to stop the handle unless it's primed for
|
|
|
|
* writing but that means there should be some kind of write action in |
|
|
|
* progress. |
|
|
|
*/ |
|
|
|
assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) || |
|
|
|
!QUEUE_EMPTY(&stream->write_completed_queue) || |
|
|
|
!QUEUE_EMPTY(&stream->write_queue) || |
|
|
|
stream->shutdown_req != NULL || |
|
|
|
stream->connect_req != NULL); |
|
|
|
|
|
|
|
stream->flags &= ~UV_STREAM_READING; |
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); |
|
|
|
if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) |
|
|
|
uv__handle_stop(stream); |
|
|
|
|
|
|
|
#if defined(__APPLE__) |
|
|
|
/* Notify select() thread about state change */ |
|
|
|