Browse Source

Bugfix: blocked pumping in stdio coupling

This should fix the test in c05b5d8 by Mikeal Rogers.
v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
fe85062046
  1. 206
      deps/coupling/coupling.c
  2. 11
      test/mjsunit/fixtures/echo.js
  3. 36
      test/mjsunit/test-stdio.js

206
deps/coupling/coupling.c

@ -143,98 +143,176 @@ ring_buffer_push (ring_buffer *ring, int fd)
return r; return r;
} }
/* PULL PUMP
*
* This is used to read data from a blocking file descriptor and pump it into
* a non-blocking pipe (or other non-blocking fd). The algorithm is this:
*
* while (true) {
* read(STDIN_FILENO) // blocking
*
* while (!ring.empty) {
* write(pipe) // non-blocking
* select(pipe, writable)
* }
* }
*
*/
static void static void
pump (int is_pull, int pullfd, int pushfd) pull_pump (int pullfd, int pushfd)
{ {
int r; int r;
ring_buffer ring; ring_buffer ring;
fd_set readfds, writefds, exceptfds;
fd_set writefds, exceptfds;
FD_ZERO(&exceptfds);
FD_ZERO(&writefds);
FD_SET(pushfd, &exceptfds);
FD_SET(pushfd, &writefds);
ring_buffer_init(&ring); ring_buffer_init(&ring);
int maxfd; while (pullfd >= 0) {
/* Blocking read from STDIN_FILENO */
r = ring_buffer_pull(&ring, pullfd);
while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) { if (r == 0) {
FD_ZERO(&exceptfds); /* eof */
FD_ZERO(&readfds); close(pullfd);
FD_ZERO(&writefds); pullfd = -1;
} else if (r < 0 && errno != EINTR && errno != EAGAIN) {
maxfd = -1; /* error */
perror("pull_pump read()");
if (is_pull) { close(pullfd);
if (!ring_buffer_empty_p(&ring)) { pullfd = -1;
maxfd = pushfd; }
FD_SET(pushfd, &exceptfds);
FD_SET(pushfd, &writefds); /* Push all of the data in the ring buffer out. */
} while (!ring_buffer_empty_p(&ring)) {
} else { /* non-blocking write() to the pipe */
if (pullfd >= 0) { r = ring_buffer_push(&ring, pushfd);
if (!ring_buffer_filled_p(&ring)) {
maxfd = pullfd; if (r < 0 && errno != EAGAIN && errno != EINTR) {
FD_SET(pullfd, &exceptfds); if (errno == EPIPE) {
FD_SET(pullfd, &readfds); /* This happens if someone closes the other end of the pipe. This
* is a normal forced close of STDIN. Hopefully there wasn't data
* in the ring buffer. Just close both ends and exit.
*/
close(pushfd);
close(pullfd);
pushfd = pullfd = -1;
} else {
perror("pull_pump write()");
close(pushfd);
close(pullfd);
} }
return;
} }
}
if (maxfd >= 0) { /* Select for writablity on the pipe end.
r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL); * Very rarely will this stick.
*/
r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL);
if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) { if (r < 0 || FD_ISSET(pushfd, &exceptfds)) {
close(pushfd); close(pushfd);
close(pullfd); close(pullfd);
pushfd = pullfd = -1; pushfd = pullfd = -1;
return; return;
} }
} }
}
assert(pullfd < 0);
assert(ring_buffer_empty_p(&ring));
close(pushfd);
}
/* PUSH PUMP
*
* This is used to push data out to a blocking file descriptor. It pulls
* data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO
* (pushfd).
* When the pipe is closed, then the rest of the data is pushed out and then
* STDOUT_FILENO is closed.
*
* The algorithm looks roughly like this:
*
* while (true) {
* r = read(pipe) // nonblocking
*
* while (!ring.empty) {
* write(STDOUT_FILENO) // blocking
* }
*
* select(pipe, readable);
* }
*/
static void
push_pump (int pullfd, int pushfd)
{
int r;
ring_buffer ring;
fd_set readfds, exceptfds;
FD_ZERO(&exceptfds);
FD_ZERO(&readfds);
FD_SET(pullfd, &exceptfds);
FD_SET(pullfd, &readfds);
ring_buffer_init(&ring);
/* The pipe is open or there is data left to be pushed out
* NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the
* loop.
*/
while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) {
if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) { /* Pull from the non-blocking pipe */
r = ring_buffer_pull(&ring, pullfd);
if (r == 0) {
/* eof */
close(pullfd);
pullfd = -1;
} else if (r < 0 && errno != EINTR && errno != EAGAIN) {
perror("push_pump read()");
close(pullfd); close(pullfd);
pullfd = -1; pullfd = -1;
return;
} }
if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) { /* Push everything out to STDOUT */
r = ring_buffer_pull(&ring, pullfd); while (!ring_buffer_empty_p(&ring)) {
if (r == 0) { /* Blocking write() to pushfd (STDOUT_FILENO) */
/* eof */ r = ring_buffer_push(&ring, pushfd);
close(pullfd);
pullfd = -1; /* If there was a problem, just exit the entire function */
} else if (r < 0) { if (r < 0 && errno != EINTR) {
if (errno != EINTR && errno != EAGAIN) goto error; close(pushfd);
close(pullfd);
pushfd = pullfd = -1;
return;
} }
} }
if (pullfd >= 0) {
/* select for readability on the pullfd */
r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL);
if (!is_pull || FD_ISSET(pushfd, &writefds)) { if (r < 0 || FD_ISSET(pullfd, &exceptfds)) {
r = ring_buffer_push(&ring, pushfd); close(pushfd);
if (r < 0) { close(pullfd);
switch (errno) { pushfd = pullfd = -1;
case EINTR: return;
case EAGAIN:
continue;
case EPIPE:
/* TODO catch SIGPIPE? */
close(pushfd);
close(pullfd);
pushfd = pullfd = -1;
return;
default:
goto error;
}
} }
} }
} }
/* If we got here then we got eof on pullfd and pushed all the data out.
* so now just close pushfd */
assert(pullfd < 0);
assert(ring_buffer_empty_p(&ring));
close(pushfd); close(pushfd);
close(pullfd);
return;
error:
close(pushfd);
close(pullfd);
perror("(coupling) pump");
} }
static inline int static inline int
@ -262,7 +340,11 @@ pump_thread (void *data)
{ {
struct coupling *c = (struct coupling*)data; struct coupling *c = (struct coupling*)data;
pump(c->is_pull, c->pullfd, c->pushfd); if (c->is_pull) {
pull_pump(c->pullfd, c->pushfd);
} else {
push_pump(c->pullfd, c->pushfd);
}
return NULL; return NULL;
} }

11
test/mjsunit/fixtures/echo.js

@ -1,5 +1,12 @@
process.mixin(require("../common")); process.mixin(require("../common"));
process.stdio.open(); process.stdio.open();
print("hello world\r\n");
process.stdio.addListener("data", function (data) { process.stdio.addListener("data", function (data) {
puts(data); print(data);
}); });
process.stdio.addListener("close", function () {
process.stdio.close();
});

36
test/mjsunit/test-stdio.js

@ -2,20 +2,34 @@ process.mixin(require("./common"));
var sub = path.join(fixturesDir, 'echo.js'); var sub = path.join(fixturesDir, 'echo.js');
var result = false; var gotHelloWorld = false;
var gotEcho = false;
var child = process.createChildProcess(path.join(libDir, "../bin/node"), [sub]);
var child = process.createChildProcess(process.argv[0], [sub]);
child.addListener("error", function (data){ child.addListener("error", function (data){
puts("parent stderr: " + data); puts("parent stderr: " + data);
}); });
child.addListener("output", function (data){ child.addListener("output", function (data){
if (data && data[0] == 't') { if (data) {
result = true; puts('child said: ' + JSON.stringify(data));
if (!gotHelloWorld) {
assert.equal("hello world\r\n", data);
gotHelloWorld = true;
child.write('echo me\r\n');
} else {
assert.equal("echo me\r\n", data);
gotEcho = true;
child.close();
}
} else {
puts('child eof');
} }
}); });
setTimeout(function () {
child.write('t\r\n');
}, 100); process.addListener('exit', function () {
setTimeout(function (){ assert.ok(gotHelloWorld);
assert.ok(result); assert.ok(gotEcho);
}, 500) });

Loading…
Cancel
Save