|
|
@ -60,6 +60,7 @@ static void uv__stream_connect(uv_stream_t*); |
|
|
|
static void uv__write(uv_stream_t* stream); |
|
|
|
static void uv__read(uv_stream_t* stream); |
|
|
|
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); |
|
|
|
static size_t uv__write_req_size(uv_write_t* req); |
|
|
|
|
|
|
|
|
|
|
|
/* Used by the accept() EMFILE party trick. */ |
|
|
@ -399,6 +400,7 @@ void uv__stream_destroy(uv_stream_t* stream) { |
|
|
|
|
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_artificial_error(req->handle->loop, UV_ECANCELED); |
|
|
@ -413,6 +415,13 @@ void uv__stream_destroy(uv_stream_t* stream) { |
|
|
|
req = ngx_queue_data(q, uv_write_t, queue); |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
|
|
|
|
if (req->bufs != NULL) { |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, req->error); |
|
|
|
req->cb(req, req->error ? -1 : 0); |
|
|
@ -652,6 +661,7 @@ static void uv__drain(uv_stream_t* stream) { |
|
|
|
static size_t uv__write_req_size(uv_write_t* req) { |
|
|
|
size_t size; |
|
|
|
|
|
|
|
assert(req->bufs != NULL); |
|
|
|
size = uv__buf_count(req->bufs + req->write_index, |
|
|
|
req->bufcnt - req->write_index); |
|
|
|
assert(req->handle->write_queue_size >= size); |
|
|
@ -665,10 +675,18 @@ static void uv__write_req_finish(uv_write_t* req) { |
|
|
|
|
|
|
|
/* Pop the req off tcp->write_queue. */ |
|
|
|
ngx_queue_remove(&req->queue); |
|
|
|
if (req->bufs != req->bufsml) { |
|
|
|
|
|
|
|
/* Only free when there was no error. On error, we touch up write_queue_size
|
|
|
|
* right before making the callback. The reason we don't do that right away |
|
|
|
* is that a write_queue_size > 0 is our only way to signal to the user that |
|
|
|
* he should stop writing - which he should if we got an error. Something to |
|
|
|
* revisit in future revisions of the libuv API. |
|
|
|
*/ |
|
|
|
if (req->error == 0) { |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
} |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
/* Add it to the write_completed_queue where it will have its
|
|
|
|
* callback called in the near future. |
|
|
@ -778,7 +796,6 @@ start: |
|
|
|
if (errno != EAGAIN && errno != EWOULDBLOCK) { |
|
|
|
/* Error */ |
|
|
|
req->error = errno; |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
uv__write_req_finish(req); |
|
|
|
return; |
|
|
|
} else if (stream->flags & UV_STREAM_BLOCKING) { |
|
|
@ -855,6 +872,13 @@ static void uv__write_callbacks(uv_stream_t* stream) { |
|
|
|
ngx_queue_remove(q); |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
|
|
|
|
if (req->bufs != NULL) { |
|
|
|
stream->write_queue_size -= uv__write_req_size(req); |
|
|
|
if (req->bufs != req->bufsml) |
|
|
|
free(req->bufs); |
|
|
|
req->bufs = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
/* NOTE: call callback AFTER freeing the request data. */ |
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, req->error); |
|
|
@ -1136,6 +1160,7 @@ static void uv__stream_connect(uv_stream_t* stream) { |
|
|
|
|
|
|
|
stream->connect_req = NULL; |
|
|
|
uv__req_unregister(stream->loop, req); |
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); |
|
|
|
|
|
|
|
if (req->cb) { |
|
|
|
uv__set_sys_error(stream->loop, error); |
|
|
@ -1283,6 +1308,16 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, |
|
|
|
|
|
|
|
|
|
|
|
int uv_read_stop(uv_stream_t* stream) { |
|
|
|
/* Sanity check. We're going to stop the handle unless it's primed for
|
|
|
|
* writing but that means there should be some kind of write action in |
|
|
|
* progress. |
|
|
|
*/ |
|
|
|
assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) || |
|
|
|
!ngx_queue_empty(&stream->write_completed_queue) || |
|
|
|
!ngx_queue_empty(&stream->write_queue) || |
|
|
|
stream->shutdown_req != NULL || |
|
|
|
stream->connect_req != NULL); |
|
|
|
|
|
|
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); |
|
|
|
uv__handle_stop(stream); |
|
|
|
stream->flags &= ~UV_STREAM_READING; |
|
|
|