|
@ -59,6 +59,7 @@ struct uv_ares_data_s { |
|
|
static struct uv_ares_data_s ares_data; |
|
|
static struct uv_ares_data_s ares_data; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void uv__req_init(uv_req_t*); |
|
|
void uv__tcp_io(EV_P_ ev_io* watcher, int revents); |
|
|
void uv__tcp_io(EV_P_ ev_io* watcher, int revents); |
|
|
void uv__next(EV_P_ ev_idle* watcher, int revents); |
|
|
void uv__next(EV_P_ ev_idle* watcher, int revents); |
|
|
static void uv__tcp_connect(uv_tcp_t*); |
|
|
static void uv__tcp_connect(uv_tcp_t*); |
|
@ -518,9 +519,9 @@ void uv__finish_close(uv_handle_t* handle) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) { |
|
|
uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) { |
|
|
ngx_queue_t* q; |
|
|
ngx_queue_t* q; |
|
|
uv_req_t* req; |
|
|
uv_write_t* req; |
|
|
|
|
|
|
|
|
if (ngx_queue_empty(&tcp->write_queue)) { |
|
|
if (ngx_queue_empty(&tcp->write_queue)) { |
|
|
return NULL; |
|
|
return NULL; |
|
@ -531,7 +532,7 @@ uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) { |
|
|
return NULL; |
|
|
return NULL; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
req = ngx_queue_data(q, struct uv_req_s, queue); |
|
|
req = ngx_queue_data(q, struct uv_write_s, queue); |
|
|
assert(req); |
|
|
assert(req); |
|
|
|
|
|
|
|
|
return req; |
|
|
return req; |
|
@ -552,8 +553,7 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void uv__drain(uv_tcp_t* tcp) { |
|
|
static void uv__drain(uv_tcp_t* tcp) { |
|
|
uv_req_t* req; |
|
|
uv_shutdown_t* req; |
|
|
uv_shutdown_cb cb; |
|
|
|
|
|
|
|
|
|
|
|
assert(!uv_write_queue_head(tcp)); |
|
|
assert(!uv_write_queue_head(tcp)); |
|
|
assert(tcp->write_queue_size == 0); |
|
|
assert(tcp->write_queue_size == 0); |
|
@ -567,16 +567,19 @@ static void uv__drain(uv_tcp_t* tcp) { |
|
|
assert(tcp->shutdown_req); |
|
|
assert(tcp->shutdown_req); |
|
|
|
|
|
|
|
|
req = tcp->shutdown_req; |
|
|
req = tcp->shutdown_req; |
|
|
cb = (uv_shutdown_cb)req->cb; |
|
|
|
|
|
|
|
|
|
|
|
if (shutdown(tcp->fd, SHUT_WR)) { |
|
|
if (shutdown(tcp->fd, SHUT_WR)) { |
|
|
/* Error. Report it. User should call uv_close(). */ |
|
|
/* Error. Report it. User should call uv_close(). */ |
|
|
uv_err_new((uv_handle_t*)tcp, errno); |
|
|
uv_err_new((uv_handle_t*)tcp, errno); |
|
|
if (cb) cb(req, -1); |
|
|
if (req->cb) { |
|
|
|
|
|
req->cb(req, -1); |
|
|
|
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
uv_err_new((uv_handle_t*)tcp, 0); |
|
|
uv_err_new((uv_handle_t*)tcp, 0); |
|
|
uv_flag_set((uv_handle_t*)tcp, UV_SHUT); |
|
|
uv_flag_set((uv_handle_t*)tcp, UV_SHUT); |
|
|
if (cb) cb(req, 0); |
|
|
if (req->cb) { |
|
|
|
|
|
req->cb(req, 0); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -585,8 +588,8 @@ static void uv__drain(uv_tcp_t* tcp) { |
|
|
/* On success returns NULL. On error returns a pointer to the write request
|
|
|
/* On success returns NULL. On error returns a pointer to the write request
|
|
|
* which had the error. |
|
|
* which had the error. |
|
|
*/ |
|
|
*/ |
|
|
static uv_req_t* uv__write(uv_tcp_t* tcp) { |
|
|
static uv_write_t* uv__write(uv_tcp_t* tcp) { |
|
|
uv_req_t* req; |
|
|
uv_write_t* req; |
|
|
struct iovec* iov; |
|
|
struct iovec* iov; |
|
|
int iovcnt; |
|
|
int iovcnt; |
|
|
ssize_t n; |
|
|
ssize_t n; |
|
@ -602,7 +605,7 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) { |
|
|
return NULL; |
|
|
return NULL; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
assert(req->handle == (uv_handle_t*)tcp); |
|
|
assert(req->handle == (uv_stream_t*)tcp); |
|
|
|
|
|
|
|
|
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
|
|
|
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
|
|
|
* because Windows's WSABUF is not an iovec. |
|
|
* because Windows's WSABUF is not an iovec. |
|
@ -691,23 +694,20 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void uv__write_callbacks(uv_tcp_t* tcp) { |
|
|
static void uv__write_callbacks(uv_tcp_t* tcp) { |
|
|
uv_write_cb cb; |
|
|
|
|
|
int callbacks_made = 0; |
|
|
int callbacks_made = 0; |
|
|
ngx_queue_t* q; |
|
|
ngx_queue_t* q; |
|
|
uv_req_t* req; |
|
|
uv_write_t* req; |
|
|
|
|
|
|
|
|
while (!ngx_queue_empty(&tcp->write_completed_queue)) { |
|
|
while (!ngx_queue_empty(&tcp->write_completed_queue)) { |
|
|
/* Pop a req off write_completed_queue. */ |
|
|
/* Pop a req off write_completed_queue. */ |
|
|
q = ngx_queue_head(&tcp->write_completed_queue); |
|
|
q = ngx_queue_head(&tcp->write_completed_queue); |
|
|
assert(q); |
|
|
assert(q); |
|
|
req = ngx_queue_data(q, struct uv_req_s, queue); |
|
|
req = ngx_queue_data(q, struct uv_write_s, queue); |
|
|
ngx_queue_remove(q); |
|
|
ngx_queue_remove(q); |
|
|
|
|
|
|
|
|
cb = (uv_write_cb) req->cb; |
|
|
|
|
|
|
|
|
|
|
|
/* NOTE: call callback AFTER freeing the request data. */ |
|
|
/* NOTE: call callback AFTER freeing the request data. */ |
|
|
if (cb) { |
|
|
if (req->cb) { |
|
|
cb(req, 0); |
|
|
req->cb(req, 0); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
callbacks_made++; |
|
|
callbacks_made++; |
|
@ -772,10 +772,16 @@ void uv__read(uv_tcp_t* tcp) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int uv_shutdown(uv_req_t* req) { |
|
|
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { |
|
|
uv_tcp_t* tcp = (uv_tcp_t*)req->handle; |
|
|
uv_tcp_t* tcp = (uv_tcp_t*)handle; |
|
|
|
|
|
assert(handle->type == UV_TCP && |
|
|
|
|
|
"uv_shutdown (unix) only supports uv_tcp_t right now"); |
|
|
assert(tcp->fd >= 0); |
|
|
assert(tcp->fd >= 0); |
|
|
assert(tcp->type == UV_TCP); |
|
|
|
|
|
|
|
|
/* Initialize request */ |
|
|
|
|
|
uv__req_init((uv_req_t*)req); |
|
|
|
|
|
req->handle = handle; |
|
|
|
|
|
req->cb = cb; |
|
|
|
|
|
|
|
|
if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) || |
|
|
if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) || |
|
|
uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) || |
|
|
uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) || |
|
@ -796,11 +802,11 @@ int uv_shutdown(uv_req_t* req) { |
|
|
|
|
|
|
|
|
void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { |
|
|
void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { |
|
|
uv_tcp_t* tcp = watcher->data; |
|
|
uv_tcp_t* tcp = watcher->data; |
|
|
|
|
|
|
|
|
|
|
|
assert(tcp->type == UV_TCP); |
|
|
assert(watcher == &tcp->read_watcher || |
|
|
assert(watcher == &tcp->read_watcher || |
|
|
watcher == &tcp->write_watcher); |
|
|
watcher == &tcp->write_watcher); |
|
|
|
|
|
|
|
|
assert(tcp->fd >= 0); |
|
|
assert(tcp->fd >= 0); |
|
|
|
|
|
|
|
|
assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); |
|
|
assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); |
|
|
|
|
|
|
|
|
if (tcp->connect_req) { |
|
|
if (tcp->connect_req) { |
|
@ -811,13 +817,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (revents & EV_WRITE) { |
|
|
if (revents & EV_WRITE) { |
|
|
uv_req_t* req = uv__write(tcp); |
|
|
uv_write_t* req = uv__write(tcp); |
|
|
if (req) { |
|
|
if (req) { |
|
|
/* Error. Notify the user. */ |
|
|
/* Error. Notify the user. */ |
|
|
uv_write_cb cb = (uv_write_cb) req->cb; |
|
|
if (req->cb) { |
|
|
|
|
|
req->cb(req, -1); |
|
|
if (cb) { |
|
|
|
|
|
cb(req, -1); |
|
|
|
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
uv__write_callbacks(tcp); |
|
|
uv__write_callbacks(tcp); |
|
@ -834,13 +838,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { |
|
|
*/ |
|
|
*/ |
|
|
static void uv__tcp_connect(uv_tcp_t* tcp) { |
|
|
static void uv__tcp_connect(uv_tcp_t* tcp) { |
|
|
int error; |
|
|
int error; |
|
|
uv_req_t* req; |
|
|
uv_connect_t* req = tcp->connect_req; |
|
|
uv_connect_cb connect_cb; |
|
|
|
|
|
socklen_t errorsize = sizeof(int); |
|
|
socklen_t errorsize = sizeof(int); |
|
|
|
|
|
|
|
|
|
|
|
assert(tcp->type == UV_TCP); |
|
|
assert(tcp->fd >= 0); |
|
|
assert(tcp->fd >= 0); |
|
|
|
|
|
|
|
|
req = tcp->connect_req; |
|
|
|
|
|
assert(req); |
|
|
assert(req); |
|
|
|
|
|
|
|
|
if (tcp->delayed_error) { |
|
|
if (tcp->delayed_error) { |
|
@ -860,9 +862,8 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { |
|
|
|
|
|
|
|
|
/* Successful connection */ |
|
|
/* Successful connection */ |
|
|
tcp->connect_req = NULL; |
|
|
tcp->connect_req = NULL; |
|
|
connect_cb = (uv_connect_cb) req->cb; |
|
|
if (req->cb) { |
|
|
if (connect_cb) { |
|
|
req->cb(req, 0); |
|
|
connect_cb(req, 0); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} else if (error == EINPROGRESS) { |
|
|
} else if (error == EINPROGRESS) { |
|
@ -873,18 +874,15 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { |
|
|
uv_err_new((uv_handle_t*)tcp, error); |
|
|
uv_err_new((uv_handle_t*)tcp, error); |
|
|
|
|
|
|
|
|
tcp->connect_req = NULL; |
|
|
tcp->connect_req = NULL; |
|
|
|
|
|
if (req->cb) { |
|
|
connect_cb = (uv_connect_cb) req->cb; |
|
|
req->cb(req, -1); |
|
|
if (connect_cb) { |
|
|
|
|
|
connect_cb(req, -1); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int uv__connect(uv_req_t* req, struct sockaddr* addr, |
|
|
static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr, |
|
|
socklen_t addrlen) { |
|
|
socklen_t addrlen, uv_connect_cb cb) { |
|
|
uv_tcp_t* tcp = (uv_tcp_t*)req->handle; |
|
|
|
|
|
int r; |
|
|
int r; |
|
|
|
|
|
|
|
|
if (tcp->fd <= 0) { |
|
|
if (tcp->fd <= 0) { |
|
@ -901,6 +899,9 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
uv__req_init((uv_req_t*)req); |
|
|
|
|
|
req->cb = cb; |
|
|
|
|
|
req->handle = (uv_stream_t*)tcp; |
|
|
req->type = UV_CONNECT; |
|
|
req->type = UV_CONNECT; |
|
|
ngx_queue_init(&req->queue); |
|
|
ngx_queue_init(&req->queue); |
|
|
|
|
|
|
|
@ -947,17 +948,21 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { |
|
|
int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, |
|
|
assert(addr.sin_family == AF_INET); |
|
|
struct sockaddr_in address, uv_connect_cb cb) { |
|
|
return uv__connect(req, (struct sockaddr*) &addr, |
|
|
assert(handle->type == UV_TCP); |
|
|
sizeof(struct sockaddr_in)); |
|
|
assert(address.sin_family == AF_INET); |
|
|
|
|
|
return uv__connect(req, handle, (struct sockaddr*) &address, |
|
|
|
|
|
sizeof(struct sockaddr_in), cb); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { |
|
|
int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, |
|
|
assert(addr.sin6_family == AF_INET6); |
|
|
struct sockaddr_in6 address, uv_connect_cb cb) { |
|
|
return uv__connect(req, (struct sockaddr*) &addr, |
|
|
assert(handle->type == UV_TCP); |
|
|
sizeof(struct sockaddr_in6)); |
|
|
assert(address.sin6_family == AF_INET6); |
|
|
|
|
|
return uv__connect(req, handle, (struct sockaddr*) &address, |
|
|
|
|
|
sizeof(struct sockaddr_in6), cb); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -997,9 +1002,21 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { |
|
|
/* The buffers to be written must remain valid until the callback is called.
|
|
|
/* The buffers to be written must remain valid until the callback is called.
|
|
|
* This is not required for the uv_buf_t array. |
|
|
* This is not required for the uv_buf_t array. |
|
|
*/ |
|
|
*/ |
|
|
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { |
|
|
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, |
|
|
uv_tcp_t* tcp = (uv_tcp_t*)req->handle; |
|
|
uv_write_cb cb) { |
|
|
int empty_queue = (tcp->write_queue_size == 0); |
|
|
int empty_queue; |
|
|
|
|
|
uv_tcp_t* tcp = (uv_tcp_t*)handle; |
|
|
|
|
|
|
|
|
|
|
|
/* Initialize the req */ |
|
|
|
|
|
uv__req_init((uv_req_t*) req); |
|
|
|
|
|
req->cb = cb; |
|
|
|
|
|
req->handle = handle; |
|
|
|
|
|
ngx_queue_init(&req->queue); |
|
|
|
|
|
|
|
|
|
|
|
assert(handle->type == UV_TCP && |
|
|
|
|
|
"uv_write (unix) does not yet support other types of streams"); |
|
|
|
|
|
|
|
|
|
|
|
empty_queue = (tcp->write_queue_size == 0); |
|
|
assert(tcp->fd >= 0); |
|
|
assert(tcp->fd >= 0); |
|
|
|
|
|
|
|
|
ngx_queue_init(&req->queue); |
|
|
ngx_queue_init(&req->queue); |
|
@ -1118,12 +1135,10 @@ int uv_read_stop(uv_stream_t* stream) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) { |
|
|
void uv__req_init(uv_req_t* req) { |
|
|
uv_counters()->req_init++; |
|
|
uv_counters()->req_init++; |
|
|
req->type = UV_UNKNOWN_REQ; |
|
|
req->type = UV_UNKNOWN_REQ; |
|
|
req->cb = cb; |
|
|
req->data = NULL; |
|
|
req->handle = handle; |
|
|
|
|
|
ngx_queue_init(&req->queue); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -1634,6 +1649,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int uv_pipe_connect(uv_req_t* req, const char* name) { |
|
|
int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, |
|
|
|
|
|
const char* name, uv_connect_cb cb) { |
|
|
assert(0 && "implement me"); |
|
|
assert(0 && "implement me"); |
|
|
} |
|
|
} |
|
|