|
|
@ -143,98 +143,176 @@ ring_buffer_push (ring_buffer *ring, int fd) |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
/* PULL PUMP
|
|
|
|
* |
|
|
|
* This is used to read data from a blocking file descriptor and pump it into |
|
|
|
* a non-blocking pipe (or other non-blocking fd). The algorithm is this: |
|
|
|
* |
|
|
|
* while (true) { |
|
|
|
* read(STDIN_FILENO) // blocking
|
|
|
|
* |
|
|
|
* while (!ring.empty) { |
|
|
|
* write(pipe) // non-blocking
|
|
|
|
* select(pipe, writable) |
|
|
|
* } |
|
|
|
* } |
|
|
|
* |
|
|
|
*/ |
|
|
|
static void |
|
|
|
pump (int is_pull, int pullfd, int pushfd) |
|
|
|
pull_pump (int pullfd, int pushfd) |
|
|
|
{ |
|
|
|
int r; |
|
|
|
ring_buffer ring; |
|
|
|
fd_set readfds, writefds, exceptfds; |
|
|
|
|
|
|
|
fd_set writefds, exceptfds; |
|
|
|
FD_ZERO(&exceptfds); |
|
|
|
FD_ZERO(&writefds); |
|
|
|
FD_SET(pushfd, &exceptfds); |
|
|
|
FD_SET(pushfd, &writefds); |
|
|
|
|
|
|
|
ring_buffer_init(&ring); |
|
|
|
|
|
|
|
int maxfd; |
|
|
|
while (pullfd >= 0) { |
|
|
|
/* Blocking read from STDIN_FILENO */ |
|
|
|
r = ring_buffer_pull(&ring, pullfd); |
|
|
|
|
|
|
|
while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) { |
|
|
|
FD_ZERO(&exceptfds); |
|
|
|
FD_ZERO(&readfds); |
|
|
|
FD_ZERO(&writefds); |
|
|
|
if (r == 0) { |
|
|
|
/* eof */ |
|
|
|
close(pullfd); |
|
|
|
pullfd = -1; |
|
|
|
} else if (r < 0 && errno != EINTR && errno != EAGAIN) { |
|
|
|
/* error */ |
|
|
|
perror("pull_pump read()"); |
|
|
|
close(pullfd); |
|
|
|
pullfd = -1; |
|
|
|
} |
|
|
|
|
|
|
|
maxfd = -1; |
|
|
|
/* Push all of the data in the ring buffer out. */ |
|
|
|
while (!ring_buffer_empty_p(&ring)) { |
|
|
|
/* non-blocking write() to the pipe */ |
|
|
|
r = ring_buffer_push(&ring, pushfd); |
|
|
|
|
|
|
|
if (is_pull) { |
|
|
|
if (!ring_buffer_empty_p(&ring)) { |
|
|
|
maxfd = pushfd; |
|
|
|
FD_SET(pushfd, &exceptfds); |
|
|
|
FD_SET(pushfd, &writefds); |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (pullfd >= 0) { |
|
|
|
if (!ring_buffer_filled_p(&ring)) { |
|
|
|
maxfd = pullfd; |
|
|
|
FD_SET(pullfd, &exceptfds); |
|
|
|
FD_SET(pullfd, &readfds); |
|
|
|
if (r < 0 && errno != EAGAIN && errno != EINTR) { |
|
|
|
if (errno == EPIPE) { |
|
|
|
/* This happens if someone closes the other end of the pipe. This
|
|
|
|
* is a normal forced close of STDIN. Hopefully there wasn't data |
|
|
|
* in the ring buffer. Just close both ends and exit. |
|
|
|
*/ |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
pushfd = pullfd = -1; |
|
|
|
} else { |
|
|
|
perror("pull_pump write()"); |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
} |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (maxfd >= 0) { |
|
|
|
r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL); |
|
|
|
/* Select for writablity on the pipe end.
|
|
|
|
* Very rarely will this stick. |
|
|
|
*/ |
|
|
|
r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL); |
|
|
|
|
|
|
|
if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) { |
|
|
|
if (r < 0 || FD_ISSET(pushfd, &exceptfds)) { |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
pushfd = pullfd = -1; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
assert(pullfd < 0); |
|
|
|
assert(ring_buffer_empty_p(&ring)); |
|
|
|
close(pushfd); |
|
|
|
} |
|
|
|
|
|
|
|
/* PUSH PUMP
|
|
|
|
* |
|
|
|
* This is used to push data out to a blocking file descriptor. It pulls |
|
|
|
* data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO |
|
|
|
* (pushfd). |
|
|
|
* When the pipe is closed, then the rest of the data is pushed out and then |
|
|
|
* STDOUT_FILENO is closed. |
|
|
|
* |
|
|
|
* The algorithm looks roughly like this: |
|
|
|
* |
|
|
|
* while (true) { |
|
|
|
* r = read(pipe) // nonblocking
|
|
|
|
* |
|
|
|
* while (!ring.empty) { |
|
|
|
* write(STDOUT_FILENO) // blocking
|
|
|
|
* } |
|
|
|
* |
|
|
|
* select(pipe, readable); |
|
|
|
* } |
|
|
|
*/ |
|
|
|
static void |
|
|
|
push_pump (int pullfd, int pushfd) |
|
|
|
{ |
|
|
|
int r; |
|
|
|
ring_buffer ring; |
|
|
|
|
|
|
|
fd_set readfds, exceptfds; |
|
|
|
FD_ZERO(&exceptfds); |
|
|
|
FD_ZERO(&readfds); |
|
|
|
FD_SET(pullfd, &exceptfds); |
|
|
|
FD_SET(pullfd, &readfds); |
|
|
|
|
|
|
|
ring_buffer_init(&ring); |
|
|
|
|
|
|
|
/* The pipe is open or there is data left to be pushed out
|
|
|
|
* NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the |
|
|
|
* loop. |
|
|
|
*/ |
|
|
|
while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) { |
|
|
|
|
|
|
|
/* Pull from the non-blocking pipe */ |
|
|
|
r = ring_buffer_pull(&ring, pullfd); |
|
|
|
|
|
|
|
if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) { |
|
|
|
if (r == 0) { |
|
|
|
/* eof */ |
|
|
|
close(pullfd); |
|
|
|
pullfd = -1; |
|
|
|
} else if (r < 0 && errno != EINTR && errno != EAGAIN) { |
|
|
|
perror("push_pump read()"); |
|
|
|
close(pullfd); |
|
|
|
pullfd = -1; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) { |
|
|
|
r = ring_buffer_pull(&ring, pullfd); |
|
|
|
if (r == 0) { |
|
|
|
/* eof */ |
|
|
|
close(pullfd); |
|
|
|
pullfd = -1; |
|
|
|
/* Push everything out to STDOUT */ |
|
|
|
while (!ring_buffer_empty_p(&ring)) { |
|
|
|
/* Blocking write() to pushfd (STDOUT_FILENO) */ |
|
|
|
r = ring_buffer_push(&ring, pushfd); |
|
|
|
|
|
|
|
} else if (r < 0) { |
|
|
|
if (errno != EINTR && errno != EAGAIN) goto error; |
|
|
|
/* If there was a problem, just exit the entire function */ |
|
|
|
|
|
|
|
if (r < 0 && errno != EINTR) { |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
pushfd = pullfd = -1; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (!is_pull || FD_ISSET(pushfd, &writefds)) { |
|
|
|
r = ring_buffer_push(&ring, pushfd); |
|
|
|
if (r < 0) { |
|
|
|
switch (errno) { |
|
|
|
case EINTR: |
|
|
|
case EAGAIN: |
|
|
|
continue; |
|
|
|
|
|
|
|
case EPIPE: |
|
|
|
/* TODO catch SIGPIPE? */ |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
pushfd = pullfd = -1; |
|
|
|
return; |
|
|
|
|
|
|
|
default: |
|
|
|
goto error; |
|
|
|
} |
|
|
|
if (pullfd >= 0) { |
|
|
|
/* select for readability on the pullfd */ |
|
|
|
r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL); |
|
|
|
|
|
|
|
if (r < 0 || FD_ISSET(pullfd, &exceptfds)) { |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
pushfd = pullfd = -1; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* If we got here then we got eof on pullfd and pushed all the data out.
|
|
|
|
* so now just close pushfd */ |
|
|
|
assert(pullfd < 0); |
|
|
|
assert(ring_buffer_empty_p(&ring)); |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
return; |
|
|
|
|
|
|
|
error: |
|
|
|
close(pushfd); |
|
|
|
close(pullfd); |
|
|
|
perror("(coupling) pump"); |
|
|
|
} |
|
|
|
|
|
|
|
static inline int |
|
|
@ -262,7 +340,11 @@ pump_thread (void *data) |
|
|
|
{ |
|
|
|
struct coupling *c = (struct coupling*)data; |
|
|
|
|
|
|
|
pump(c->is_pull, c->pullfd, c->pushfd); |
|
|
|
if (c->is_pull) { |
|
|
|
pull_pump(c->pullfd, c->pushfd); |
|
|
|
} else { |
|
|
|
push_pump(c->pullfd, c->pushfd); |
|
|
|
} |
|
|
|
|
|
|
|
return NULL; |
|
|
|
} |
|
|
|