From 7363ccd27324c31e9fdab6c196fad9040136821f Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 24 Jun 2009 13:44:12 +0200 Subject: [PATCH] bugfix: Properly exit a process. This requires that onExit() is not called immediately upon receiving a SIGCHLD. There could still be data in the pipez. So, instead just set a flag and invoke the pipe watchers. Sometimes one will not receive an EOF from pipes because the process was killed by a SIGTERM, or something. If SIGCHLD has been recved but we are getting EAGAIN, the pipez need to be closed too. --- src/process.cc | 106 ++++++++++++++++++------- src/process.h | 10 ++- test/mjsunit/test-process-buffering.js | 29 +++++++ 3 files changed, 113 insertions(+), 32 deletions(-) create mode 100644 test/mjsunit/test-process-buffering.js diff --git a/src/process.cc b/src/process.cc index d43c550ad6..65e2c2f438 100644 --- a/src/process.cc +++ b/src/process.cc @@ -16,6 +16,12 @@ using namespace node; #define ON_EXIT_SYMBOL String::NewSymbol("onExit") #define PID_SYMBOL String::NewSymbol("pid") +/* defines for the parent side */ +#define STDOUT_CLOSED (stdout_pipe_[0] < 0) +#define STDERR_CLOSED (stderr_pipe_[0] < 0) +#define STDIN_CLOSED (stdin_pipe_[1] < 0) + + Persistent Process::constructor_template; void @@ -119,11 +125,7 @@ Process::Write (const Arguments& args) } else return ThrowException(String::New("Bad argument")); - if (process->Write(buf) != 0) { - return ThrowException(String::New("Pipe already closed")); - } - - return Undefined(); + return process->Write(buf) == 0 ? True() : False(); } Handle @@ -149,12 +151,7 @@ Process::Close (const Arguments& args) HandleScope scope; Process *process = NODE_UNWRAP(Process, args.Holder()); assert(process); - - if (process->Close() != 0) { - return ThrowException(String::New("Pipe already closed.")); - } - - return Undefined(); + return process->Close() == 0 ? True() : False(); } Process::Process (Handle handle) @@ -169,7 +166,7 @@ Process::Process (Handle handle) ev_init(&stdin_watcher_, Process::OnWritable); stdin_watcher_.data = this; - ev_init(&child_watcher_, Process::OnExit); + ev_init(&child_watcher_, Process::OnCHLD); child_watcher_.data = this; stdout_pipe_[0] = -1; @@ -180,6 +177,8 @@ Process::Process (Handle handle) stdin_pipe_[1] = -1; got_close_ = false; + got_chld_ = false; + exit_code_ = 0; pid_ = 0; @@ -342,7 +341,18 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents) r = read(fd, buf, buf_size); if (r < 0) { - if (errno != EAGAIN) perror("IPC pipe read error"); + if (errno != EAGAIN) { + perror("IPC pipe read error"); + } else { + if (process->got_chld_) { + close(fd); + if (is_stdout) { + process->stdout_pipe_[0] = -1; + } else { + process->stderr_pipe_[0] = -1; + } + } + } break; } @@ -364,9 +374,16 @@ Process::OnOutput (EV_P_ ev_io *watcher, int revents) if (r == 0) { ev_io_stop(EV_DEFAULT_UC_ watcher); + close(fd); + if (is_stdout) { + process->stdout_pipe_[0] = -1; + } else { + process->stderr_pipe_[0] = -1; + } break; } } + process->MaybeShutdown(); } void @@ -387,7 +404,13 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents) , to_write->len - to_write->written ); if (sent < 0) { - if (errno == EAGAIN) break; + if (errno == EAGAIN) { + if (process->got_chld_) { + close(process->stdin_pipe_[1]); + process->stdin_pipe_[1] = -1; + } + break; + } perror("IPC pipe write error"); break; } @@ -410,7 +433,7 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents) } void -Process::OnExit (EV_P_ ev_child *watcher, int revents) +Process::OnCHLD (EV_P_ ev_child *watcher, int revents) { ev_child_stop(EV_A_ watcher); Process *process = static_cast(watcher->data); @@ -419,24 +442,29 @@ Process::OnExit (EV_P_ ev_child *watcher, int revents) assert(process->pid_ == watcher->rpid); assert(&process->child_watcher_ == watcher); - // Call onExit ( watcher->rstatus ) - HandleScope scope; - Handle callback_v = process->handle_->Get(ON_EXIT_SYMBOL); + process->got_chld_ = true; + process->exit_code_ = watcher->rstatus; - if (callback_v->IsFunction()) { - Handle callback = Handle::Cast(callback_v); - TryCatch try_catch; - Handle argv[1] = { Integer::New(watcher->rstatus) }; - callback->Call(process->handle_, 1, argv); - if (try_catch.HasCaught()) FatalException(try_catch); + if (process->stdout_pipe_[0] >= 0) { + ev_feed_event(&process->stdout_watcher_, EV_READ); + } + + if (process->stderr_pipe_[0] >= 0) { + ev_feed_event(&process->stderr_watcher_, EV_READ); } - process->Shutdown(); + + if (process->stdin_pipe_[1] >= 0) { + ev_io_start(EV_DEFAULT_UC_ &process->stdin_watcher_); + ev_feed_event(&process->stdin_watcher_, EV_WRITE); + } + + process->MaybeShutdown(); } int Process::Write (oi_buf *buf) { - if (stdin_pipe_[1] < 0 || got_close_) return -1; + if (STDIN_CLOSED || got_close_ || got_chld_) return -1; oi_queue_insert_head(&out_stream_, &buf->queue); buf->written = 0; ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); @@ -444,9 +472,9 @@ Process::Write (oi_buf *buf) } int -Process::Close () +Process::Close (void) { - if (stdin_pipe_[1] < 0 || got_close_) return -1; + if (STDIN_CLOSED || got_close_ || got_chld_) return -1; got_close_ = true; ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); return 0; @@ -455,6 +483,26 @@ Process::Close () int Process::Kill (int sig) { - if (pid_ == 0) return -1; + if (got_chld_ || pid_ == 0) return -1; return kill(pid_, sig); } + +int +Process::MaybeShutdown (void) +{ + if (STDOUT_CLOSED && STDERR_CLOSED && got_chld_) { + // Call onExit + HandleScope scope; + Handle callback_v = handle_->Get(ON_EXIT_SYMBOL); + + if (callback_v->IsFunction()) { + Handle callback = Handle::Cast(callback_v); + TryCatch try_catch; + Handle argv[1] = { Integer::New(exit_code_) }; + callback->Call(handle_, 1, argv); + if (try_catch.HasCaught()) FatalException(try_catch); + } + + Shutdown(); + } +} diff --git a/src/process.h b/src/process.h index 38138b87c1..55891be070 100644 --- a/src/process.h +++ b/src/process.h @@ -24,17 +24,19 @@ class Process : ObjectWrap { Process(v8::Handle handle); ~Process(); - void Shutdown (); int Spawn (const char *command); int Write (oi_buf *buf); - int Close (); + int Close (void); int Kill (int sig); private: static void OnOutput (EV_P_ ev_io *watcher, int revents); static void OnError (EV_P_ ev_io *watcher, int revents); static void OnWritable (EV_P_ ev_io *watcher, int revents); - static void OnExit (EV_P_ ev_child *watcher, int revents); + static void OnCHLD (EV_P_ ev_child *watcher, int revents); + + int MaybeShutdown (void); + void Shutdown (void); ev_io stdout_watcher_; ev_io stderr_watcher_; @@ -48,6 +50,8 @@ class Process : ObjectWrap { pid_t pid_; bool got_close_; + bool got_chld_; + int exit_code_; oi_queue out_stream_; }; diff --git a/test/mjsunit/test-process-buffering.js b/test/mjsunit/test-process-buffering.js new file mode 100644 index 0000000000..74dfca5a35 --- /dev/null +++ b/test/mjsunit/test-process-buffering.js @@ -0,0 +1,29 @@ +include("mjsunit.js"); + +var pwd_called = false; + +function pwd (callback) { + var output = ""; + var process = new node.Process("pwd"); + process.onOutput = function (s) { + if (s) output += s; + }; + process.onExit = function(c) { + assertEquals(0, c); + callback(output); + pwd_called = true; + }; +} + + +function onLoad () { + pwd(function (result) { + print(result); + assertTrue(result.length > 1); + assertEquals("\n", result[result.length-1]); + }); +} + +function onExit () { + assertTrue(pwd_called); +}