From 8658999c7d0a722101765aa412e067dced82da6e Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 25 Aug 2009 01:06:49 +0200 Subject: [PATCH] Refactor node.Process to take advantage of evcom_reader/writer. --- src/node.cc | 25 -- src/node.h | 1 - src/process.cc | 412 ++++++++++--------------- src/process.h | 27 +- test/mjsunit/test-process-buffering.js | 6 +- test/mjsunit/test-process-simple.js | 12 +- 6 files changed, 185 insertions(+), 298 deletions(-) diff --git a/src/node.cc b/src/node.cc index 4f85d51d9b..be68ba1e82 100644 --- a/src/node.cc +++ b/src/node.cc @@ -24,31 +24,6 @@ using namespace v8; using namespace node; -static void -buf_free (evcom_buf *b) -{ - size_t total = sizeof(evcom_buf) + b->len; - V8::AdjustAmountOfExternalAllocatedMemory(-total); - free(b); -} - -evcom_buf * -node::buf_new (size_t size) -{ - size_t total = sizeof(evcom_buf) + size; - void *p = malloc(total); - if (p == NULL) return NULL; - - evcom_buf *b = static_cast(p); - b->base = static_cast(p) + sizeof(evcom_buf); - - b->len = size; - b->release = buf_free; - V8::AdjustAmountOfExternalAllocatedMemory(total); - - return b; -} - // Extracts a C string from a V8 Utf8Value. const char* ToCString(const v8::String::Utf8Value& value) diff --git a/src/node.h b/src/node.h index 1ce7a34000..c2e466610d 100644 --- a/src/node.h +++ b/src/node.h @@ -32,7 +32,6 @@ do { \ enum encoding {ASCII, UTF8, RAW}; enum encoding ParseEncoding (v8::Handle encoding_v); void FatalException (v8::TryCatch &try_catch); -evcom_buf * buf_new (size_t size); } // namespace node #endif // node_h diff --git a/src/process.cc b/src/process.cc index 3f91571547..d36add77ed 100644 --- a/src/process.cc +++ b/src/process.cc @@ -11,13 +11,7 @@ using namespace v8; using namespace node; -#define PID_SYMBOL String::NewSymbol("pid") - -/* defines for the parent side */ -#define STDOUT_CLOSED (stdout_pipe_[0] < 0) -#define STDERR_CLOSED (stderr_pipe_[0] < 0) -#define STDIN_CLOSED (stdin_pipe_[1] < 0) - +#define PID_SYMBOL String::NewSymbol("pid") Persistent Process::constructor_template; @@ -36,9 +30,6 @@ Process::Initialize (Handle target) NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); NODE_SET_PROTOTYPE_METHOD(constructor_template, "kill", Process::Kill); - constructor_template->PrototypeTemplate()->SetAccessor(PID_SYMBOL, - PIDGetter); - target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); } @@ -70,73 +61,55 @@ Process::Spawn (const Arguments& args) return ThrowException(String::New("Error spawning")); } + process->handle_->Set(PID_SYMBOL, Integer::New(process->pid_)); + return Undefined(); } Handle -Process::PIDGetter (Local property, const AccessorInfo& info) +Process::Write (const Arguments& args) { HandleScope scope; - Process *process = ObjectWrap::Unwrap(info.This()); - + Process *process = ObjectWrap::Unwrap(args.Holder()); assert(process); - assert(property == PID_SYMBOL); - if (process->pid_ == 0) return Null(); + ssize_t len; - Local pid = Integer::New(process->pid_); - return scope.Close(pid); -} + Local string; + Local array; -Handle -Process::Write (const Arguments& args) -{ - HandleScope scope; - Process *process = ObjectWrap::Unwrap(args.Holder()); - assert(process); + if (args[0]->IsArray()) { + array = Local::Cast(args[0]); + len = array->Length(); + } else { + string = args[0]->ToString(); + len = string->Utf8Length(); + } + + char buf[len]; - // XXX - // A lot of improvement can be made here. First of all we're allocating - // evcom_bufs for every send which is clearly inefficent - it should use a - // memory pool or ring buffer. Of course, expressing binary data as an - // array of integers is extremely inefficent. This can improved when v8 - // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been - // addressed. - - evcom_buf *buf; - size_t len; - - if (args[0]->IsString()) { - enum encoding enc = ParseEncoding(args[1]); - Local s = args[0]->ToString(); - len = s->Utf8Length(); - buf = node::buf_new(len); - switch (enc) { + if (args[0]->IsArray()) { + for (ssize_t index = 0; index < len; index++) { + Local int_value = array->Get(Integer::New(index)); + buf[index] = int_value->IntegerValue(); + } + } else { + switch (ParseEncoding(args[1])) { case RAW: case ASCII: - s->WriteAscii(buf->base, 0, len); + string->WriteAscii(buf, 0, len); break; case UTF8: - s->WriteUtf8(buf->base, len); + string->WriteUtf8(buf, len); break; default: - assert(0 && "unhandled string encoding"); + return ThrowException(String::New("Unknown encoding.")); } + } - } else if (args[0]->IsArray()) { - Handle array = Handle::Cast(args[0]); - len = array->Length(); - buf = node::buf_new(len); - for (size_t i = 0; i < len; i++) { - Local int_value = array->Get(Integer::New(i)); - buf->base[i] = int_value->IntegerValue(); - } - - } else return ThrowException(String::New("Bad argument")); - - return process->Write(buf) == 0 ? True() : False(); + return process->Write(buf, len) == 0 ? True() : False(); } Handle @@ -153,7 +126,7 @@ Process::Kill (const Arguments& args) return ThrowException(String::New("Process already dead")); } - return Undefined(); + return Undefined(); } Handle @@ -165,35 +138,93 @@ Process::Close (const Arguments& args) return process->Close() == 0 ? True() : False(); } +void +Process::reader_closed (evcom_reader *r) +{ + Process *process = static_cast (r->data); + if (r == &process->stdout_reader_) { + process->stdout_fd_ = -1; + } else { + assert(r == &process->stderr_reader_); + process->stderr_fd_ = -1; + } + evcom_reader_detach(r); + process->MaybeShutdown(); +} + +void +Process::stdin_closed (evcom_writer *w) +{ + Process *process = static_cast (w->data); + assert(w == &process->stdin_writer_); + process->stdin_fd_ = -1; + evcom_writer_detach(w); + process->MaybeShutdown(); +} + +void +Process::on_read (evcom_reader *r, const void *buf, size_t len) +{ + Process *process = static_cast (r->data); + HandleScope scope; + + bool isSTDOUT = (r == &process->stdout_reader_); + Local argv[1]; + + enum encoding encoding = isSTDOUT ? process->stdout_encoding_ : process->stderr_encoding_; + + if (len == 0) { + argv[0] = Local::New(Null()); + + } else if (encoding == RAW) { + // raw encoding + Local array = Array::New(len); + for (size_t i = 0; i < len; i++) { + unsigned char val = static_cast(buf)[i]; + array->Set(Integer::New(i), Integer::New(val)); + } + argv[0] = array; + + } else { + // utf8 or ascii encoding + argv[0] = String::New((const char*)buf, len); + } + + process->Emit(isSTDOUT ? "output" : "error", 1, argv); + process->MaybeShutdown(); +} + Process::Process () : EventEmitter() { - ev_init(&stdout_watcher_, Process::OnOutput); - stdout_watcher_.data = this; + evcom_reader_init(&stdout_reader_); + stdout_reader_.data = this; + stdout_reader_.on_read = on_read; + stdout_reader_.on_close = reader_closed; - ev_init(&stderr_watcher_, Process::OnOutput); - stderr_watcher_.data = this; + evcom_reader_init(&stderr_reader_); + stderr_reader_.data = this; + stderr_reader_.on_read = on_read; + stderr_reader_.on_close = reader_closed; - ev_init(&stdin_watcher_, Process::OnWritable); - stdin_watcher_.data = this; + evcom_writer_init(&stdin_writer_); + stdin_writer_.data = this; + stdin_writer_.on_close = stdin_closed; ev_init(&child_watcher_, Process::OnCHLD); child_watcher_.data = this; - stdout_pipe_[0] = -1; - stdout_pipe_[1] = -1; - stderr_pipe_[0] = -1; - stderr_pipe_[1] = -1; - stdin_pipe_[0] = -1; - stdin_pipe_[1] = -1; + stdout_fd_ = -1; + stderr_fd_ = -1; + stdin_fd_ = -1; + + stdout_encoding_ = UTF8; + stderr_encoding_ = UTF8; - got_close_ = false; got_chld_ = false; exit_code_ = 0; - pid_ = 0; - - evcom_queue_init(&out_stream_); + pid_ = 0; } Process::~Process () @@ -202,37 +233,26 @@ Process::~Process () } void -Process::Shutdown () +Process::Shutdown () { - // Clear the out_stream - while (!evcom_queue_empty(&out_stream_)) { - evcom_queue *q = evcom_queue_last(&out_stream_); - evcom_buf *buf = (evcom_buf*) evcom_queue_data(q, evcom_buf, queue); - evcom_queue_remove(q); - if (buf->release) buf->release(buf); + if (stdin_fd_ >= 0) { + evcom_writer_close(&stdin_writer_); } - if (stdout_pipe_[0] >= 0) close(stdout_pipe_[0]); - if (stdout_pipe_[1] >= 0) close(stdout_pipe_[1]); + if (stdin_fd_ >= 0) close(stdin_fd_); + if (stdout_fd_ >= 0) close(stdout_fd_); + if (stderr_fd_ >= 0) close(stderr_fd_); - if (stderr_pipe_[0] >= 0) close(stderr_pipe_[0]); - if (stderr_pipe_[1] >= 0) close(stderr_pipe_[1]); + stdin_fd_ = -1; + stdout_fd_ = -1; + stderr_fd_ = -1; - if (stdin_pipe_[0] >= 0) close(stdin_pipe_[0]); - if (stdin_pipe_[1] >= 0) close(stdin_pipe_[1]); - - stdout_pipe_[0] = -1; - stdout_pipe_[1] = -1; - stderr_pipe_[0] = -1; - stderr_pipe_[1] = -1; - stdin_pipe_[0] = -1; - stdin_pipe_[1] = -1; - - ev_io_stop(EV_DEFAULT_UC_ &stdout_watcher_); - ev_io_stop(EV_DEFAULT_UC_ &stderr_watcher_); - ev_io_stop(EV_DEFAULT_UC_ &stdin_watcher_); + evcom_writer_detach(&stdin_writer_); + evcom_reader_detach(&stdout_reader_); + evcom_reader_detach(&stderr_reader_); ev_child_stop(EV_DEFAULT_UC_ &child_watcher_); + /* XXX Kill the PID? */ pid_ = 0; } @@ -252,25 +272,24 @@ int Process::Spawn (const char *command) { assert(pid_ == 0); - assert(stdout_pipe_[0] == -1); - assert(stdout_pipe_[1] == -1); - assert(stderr_pipe_[0] == -1); - assert(stderr_pipe_[1] == -1); - assert(stdin_pipe_[0] == -1); - assert(stdin_pipe_[1] == -1); + assert(stdout_fd_ == -1); + assert(stderr_fd_ == -1); + assert(stdin_fd_ == -1); + + int stdout_pipe[2], stdin_pipe[2], stderr_pipe[2]; /* An implementation of popen(), basically */ - if (pipe(stdout_pipe_) < 0) { + if (pipe(stdout_pipe) < 0) { perror("pipe()"); return -1; } - if (pipe(stderr_pipe_) < 0) { + if (pipe(stderr_pipe) < 0) { perror("pipe()"); return -2; } - if (pipe(stdin_pipe_) < 0) { + if (pipe(stdin_pipe) < 0) { perror("pipe()"); return -3; } @@ -281,14 +300,14 @@ Process::Spawn (const char *command) return -4; case 0: // Child. - close(stdout_pipe_[0]); // close read end - dup2(stdout_pipe_[1], STDOUT_FILENO); + close(stdout_pipe[0]); // close read end + dup2(stdout_pipe[1], STDOUT_FILENO); - close(stderr_pipe_[0]); // close read end - dup2(stderr_pipe_[1], STDERR_FILENO); + close(stderr_pipe[0]); // close read end + dup2(stderr_pipe[1], STDERR_FILENO); - close(stdin_pipe_[1]); // close write end - dup2(stdin_pipe_[0], STDIN_FILENO); + close(stdin_pipe[1]); // close write end + dup2(stdin_pipe[0], STDIN_FILENO); execl("/bin/sh", "sh", "-c", command, (char *)NULL); _exit(127); @@ -299,136 +318,33 @@ Process::Spawn (const char *command) ev_child_set(&child_watcher_, pid_, 0); ev_child_start(EV_DEFAULT_UC_ &child_watcher_); - SetNonBlocking(stdout_pipe_[0]); - ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); - ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); - close(stdout_pipe_[1]); // close write end - stdout_pipe_[1] = -1; - - SetNonBlocking(stderr_pipe_[0]); - ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); - ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); - close(stderr_pipe_[1]); // close write end - stderr_pipe_[1] = -1; - - SetNonBlocking(stdin_pipe_[1]); - ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); - ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); - close(stdin_pipe_[0]); // close read end - stdin_pipe_[0] = -1; + close(stdout_pipe[1]); + stdout_fd_ = stdout_pipe[0]; + SetNonBlocking(stdout_fd_); - Attach(); - - return 0; -} - -void -Process::OnOutput (EV_P_ ev_io *watcher, int revents) -{ - int r; - char buf[16*1024]; - size_t buf_size = 16*1024; + close(stderr_pipe[1]); + stderr_fd_ = stderr_pipe[0]; + SetNonBlocking(stderr_fd_); - Process *process = static_cast(watcher->data); - - bool is_stdout = (&process->stdout_watcher_ == watcher); - int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0]; + close(stdin_pipe[0]); + stdin_fd_ = stdin_pipe[1]; + SetNonBlocking(stdin_fd_); - assert(revents == EV_READ); - assert(fd >= 0); + evcom_reader_set(&stdout_reader_, stdout_fd_); + evcom_reader_attach(EV_DEFAULT_UC_ &stdout_reader_); - HandleScope scope; - Handle argv[1]; - - for (;;) { - r = read(fd, buf, buf_size); - - if (r < 0) { - if (errno != EAGAIN) { - perror("IPC pipe read error"); - } else { - if (process->got_chld_) { - close(fd); - if (is_stdout) { - process->stdout_pipe_[0] = -1; - } else { - process->stderr_pipe_[0] = -1; - } - } - } - break; - } + evcom_reader_set(&stderr_reader_, stderr_fd_); + evcom_reader_attach(EV_DEFAULT_UC_ &stderr_reader_); - if (r == 0) { - argv[0] = Null(); - } else { - // TODO multiple encodings - argv[0] = String::New((const char*)buf, r); - } + evcom_writer_set(&stdin_writer_, stdin_fd_); + evcom_writer_attach(EV_DEFAULT_UC_ &stdin_writer_); - process->Emit(is_stdout ? "output" : "error", 1, argv); - - if (r == 0) { - ev_io_stop(EV_DEFAULT_UC_ watcher); - close(fd); - if (is_stdout) { - process->stdout_pipe_[0] = -1; - } else { - process->stderr_pipe_[0] = -1; - } - break; - } - } - process->MaybeShutdown(); -} - -void -Process::OnWritable (EV_P_ ev_io *watcher, int revents) -{ - Process *process = static_cast(watcher->data); - int sent; - - assert(revents == EV_WRITE); - assert(process->stdin_pipe_[1] >= 0); - - while (!evcom_queue_empty(&process->out_stream_)) { - evcom_queue *q = evcom_queue_last(&process->out_stream_); - evcom_buf *to_write = (evcom_buf*) evcom_queue_data(q, evcom_buf, queue); - - sent = write( process->stdin_pipe_[1] - , to_write->base + to_write->written - , to_write->len - to_write->written - ); - if (sent < 0) { - if (errno == EAGAIN) { - if (process->got_chld_) { - close(process->stdin_pipe_[1]); - process->stdin_pipe_[1] = -1; - } - break; - } - perror("IPC pipe write error"); - break; - } - - to_write->written += sent; - - if (to_write->written == to_write->len) { - evcom_queue_remove(q); - if (to_write->release) to_write->release(to_write); - } - } + Attach(); - if (evcom_queue_empty(&process->out_stream_)) { - ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_); - if (process->got_close_) { - close(process->stdin_pipe_[1]); - process->stdin_pipe_[1] = -1; - } - } + return 0; } -void +void Process::OnCHLD (EV_P_ ev_child *watcher, int revents) { ev_child_stop(EV_A_ watcher); @@ -441,38 +357,24 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents) process->got_chld_ = true; process->exit_code_ = watcher->rstatus; - if (process->stdout_pipe_[0] >= 0) { - ev_feed_event(&process->stdout_watcher_, EV_READ); - } - - if (process->stderr_pipe_[0] >= 0) { - ev_feed_event(&process->stderr_watcher_, EV_READ); - } - - if (process->stdin_pipe_[1] >= 0) { - ev_io_start(EV_DEFAULT_UC_ &process->stdin_watcher_); - ev_feed_event(&process->stdin_watcher_, EV_WRITE); - } + if (process->stdin_fd_ >= 0) evcom_writer_close(&process->stdin_writer_); process->MaybeShutdown(); } int -Process::Write (evcom_buf *buf) +Process::Write (const char *str, size_t len) { - if (STDIN_CLOSED || got_close_ || got_chld_) return -1; - evcom_queue_insert_head(&out_stream_, &buf->queue); - buf->written = 0; - ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + if (stdin_fd_ < 0 || got_chld_) return -1; + evcom_writer_write(&stdin_writer_, str, len); return 0; } int Process::Close (void) { - if (STDIN_CLOSED || got_close_ || got_chld_) return -1; - got_close_ = true; - ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + if (stdin_fd_ < 0 || got_chld_) return -1; + evcom_writer_close(EV_DEFAULT_UC_ &stdin_writer_); return 0; } @@ -486,11 +388,11 @@ Process::Kill (int sig) void Process::MaybeShutdown (void) { - if (STDOUT_CLOSED && STDERR_CLOSED && got_chld_) { + if (stdout_fd_ < 0 && stderr_fd_ < 0 && got_chld_) { HandleScope scope; Handle argv[1] = { Integer::New(exit_code_) }; Emit("exit", 1, argv); Shutdown(); Detach(); - } + } } diff --git a/src/process.h b/src/process.h index a940789524..fea3cda4ab 100644 --- a/src/process.h +++ b/src/process.h @@ -26,35 +26,36 @@ class Process : EventEmitter { ~Process(); int Spawn (const char *command); - int Write (evcom_buf *buf); + int Write (const char *str, size_t len); int Close (void); int Kill (int sig); private: - static void OnOutput (EV_P_ ev_io *watcher, int revents); - static void OnError (EV_P_ ev_io *watcher, int revents); - static void OnWritable (EV_P_ ev_io *watcher, int revents); + static void on_read (evcom_reader *r, const void *buf, size_t len); + static void reader_closed (evcom_reader *r); + static void stdin_closed (evcom_writer *w); static void OnCHLD (EV_P_ ev_child *watcher, int revents); void MaybeShutdown (void); void Shutdown (void); - ev_io stdout_watcher_; - ev_io stderr_watcher_; - ev_io stdin_watcher_; + evcom_reader stdout_reader_; + evcom_reader stderr_reader_; + evcom_writer stdin_writer_; + ev_child child_watcher_; - int stdout_pipe_[2]; - int stderr_pipe_[2]; - int stdin_pipe_[2]; + int stdout_fd_; + int stderr_fd_; + int stdin_fd_; + + enum encoding stdout_encoding_; + enum encoding stderr_encoding_; pid_t pid_; - bool got_close_; bool got_chld_; int exit_code_; - - evcom_queue out_stream_; }; } // namespace node diff --git a/test/mjsunit/test-process-buffering.js b/test/mjsunit/test-process-buffering.js index 2e48ad7b06..75438cdc24 100644 --- a/test/mjsunit/test-process-buffering.js +++ b/test/mjsunit/test-process-buffering.js @@ -6,9 +6,11 @@ function pwd (callback) { var output = ""; var process = node.createProcess("pwd"); process.addListener("output", function (s) { + puts("stdout: " + JSON.stringify(s)); if (s) output += s; }); - process.addListener("exit", function(c) { + process.addListener("exit", function (c) { + puts("exit: " + c); assertEquals(0, c); callback(output); pwd_called = true; @@ -18,7 +20,7 @@ function pwd (callback) { function onLoad () { pwd(function (result) { - print(result); + p(result); assertTrue(result.length > 1); assertEquals("\n", result[result.length-1]); }); diff --git a/test/mjsunit/test-process-simple.js b/test/mjsunit/test-process-simple.js index 6f1b8f41c1..9d68b7db51 100644 --- a/test/mjsunit/test-process-simple.js +++ b/test/mjsunit/test-process-simple.js @@ -6,15 +6,23 @@ var response = ""; var exit_status = -1; cat.addListener("output", function (chunk) { + puts("stdout: " + JSON.stringify(chunk)); if (chunk) { response += chunk; - if (response === "hello world") cat.close(); + if (response === "hello world") { + puts("closing cat"); + cat.close(); + } } }); cat.addListener("error", function (chunk) { + puts("stderr: " + JSON.stringify(chunk)); assertEquals(null, chunk); }); -cat.addListener("exit", function (status) { exit_status = status; }); +cat.addListener("exit", function (status) { + puts("exit event"); + exit_status = status; +}); function onLoad () { cat.write("hello");