Browse Source

Refactor node.Process to take advantage of evcom_reader/writer.

v0.7.4-release
Ryan 16 years ago
parent
commit
8658999c7d
  1. 25
      src/node.cc
  2. 1
      src/node.h
  3. 396
      src/process.cc
  4. 27
      src/process.h
  5. 4
      test/mjsunit/test-process-buffering.js
  6. 12
      test/mjsunit/test-process-simple.js

25
src/node.cc

@ -24,31 +24,6 @@
using namespace v8; using namespace v8;
using namespace node; using namespace node;
static void
buf_free (evcom_buf *b)
{
size_t total = sizeof(evcom_buf) + b->len;
V8::AdjustAmountOfExternalAllocatedMemory(-total);
free(b);
}
evcom_buf *
node::buf_new (size_t size)
{
size_t total = sizeof(evcom_buf) + size;
void *p = malloc(total);
if (p == NULL) return NULL;
evcom_buf *b = static_cast<evcom_buf*>(p);
b->base = static_cast<char*>(p) + sizeof(evcom_buf);
b->len = size;
b->release = buf_free;
V8::AdjustAmountOfExternalAllocatedMemory(total);
return b;
}
// Extracts a C string from a V8 Utf8Value. // Extracts a C string from a V8 Utf8Value.
const char* const char*
ToCString(const v8::String::Utf8Value& value) ToCString(const v8::String::Utf8Value& value)

1
src/node.h

@ -32,7 +32,6 @@ do { \
enum encoding {ASCII, UTF8, RAW}; enum encoding {ASCII, UTF8, RAW};
enum encoding ParseEncoding (v8::Handle<v8::Value> encoding_v); enum encoding ParseEncoding (v8::Handle<v8::Value> encoding_v);
void FatalException (v8::TryCatch &try_catch); void FatalException (v8::TryCatch &try_catch);
evcom_buf * buf_new (size_t size);
} // namespace node } // namespace node
#endif // node_h #endif // node_h

396
src/process.cc

@ -13,12 +13,6 @@ using namespace node;
#define PID_SYMBOL String::NewSymbol("pid") #define PID_SYMBOL String::NewSymbol("pid")
/* defines for the parent side */
#define STDOUT_CLOSED (stdout_pipe_[0] < 0)
#define STDERR_CLOSED (stderr_pipe_[0] < 0)
#define STDIN_CLOSED (stdin_pipe_[1] < 0)
Persistent<FunctionTemplate> Process::constructor_template; Persistent<FunctionTemplate> Process::constructor_template;
void void
@ -36,9 +30,6 @@ Process::Initialize (Handle<Object> target)
NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "kill", Process::Kill); NODE_SET_PROTOTYPE_METHOD(constructor_template, "kill", Process::Kill);
constructor_template->PrototypeTemplate()->SetAccessor(PID_SYMBOL,
PIDGetter);
target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); target->Set(String::NewSymbol("Process"), constructor_template->GetFunction());
} }
@ -70,73 +61,55 @@ Process::Spawn (const Arguments& args)
return ThrowException(String::New("Error spawning")); return ThrowException(String::New("Error spawning"));
} }
process->handle_->Set(PID_SYMBOL, Integer::New(process->pid_));
return Undefined(); return Undefined();
} }
Handle<Value> Handle<Value>
Process::PIDGetter (Local<String> property, const AccessorInfo& info) Process::Write (const Arguments& args)
{ {
HandleScope scope; HandleScope scope;
Process *process = ObjectWrap::Unwrap<Process>(info.This()); Process *process = ObjectWrap::Unwrap<Process>(args.Holder());
assert(process); assert(process);
assert(property == PID_SYMBOL);
if (process->pid_ == 0) return Null(); ssize_t len;
Local<Integer> pid = Integer::New(process->pid_); Local<String> string;
return scope.Close(pid); Local<Array> array;
if (args[0]->IsArray()) {
array = Local<Array>::Cast(args[0]);
len = array->Length();
} else {
string = args[0]->ToString();
len = string->Utf8Length();
} }
Handle<Value> char buf[len];
Process::Write (const Arguments& args)
{
HandleScope scope;
Process *process = ObjectWrap::Unwrap<Process>(args.Holder());
assert(process);
// XXX if (args[0]->IsArray()) {
// A lot of improvement can be made here. First of all we're allocating for (ssize_t index = 0; index < len; index++) {
// evcom_bufs for every send which is clearly inefficent - it should use a Local<Value> int_value = array->Get(Integer::New(index));
// memory pool or ring buffer. Of course, expressing binary data as an buf[index] = int_value->IntegerValue();
// array of integers is extremely inefficent. This can improved when v8 }
// bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been } else {
// addressed. switch (ParseEncoding(args[1])) {
evcom_buf *buf;
size_t len;
if (args[0]->IsString()) {
enum encoding enc = ParseEncoding(args[1]);
Local<String> s = args[0]->ToString();
len = s->Utf8Length();
buf = node::buf_new(len);
switch (enc) {
case RAW: case RAW:
case ASCII: case ASCII:
s->WriteAscii(buf->base, 0, len); string->WriteAscii(buf, 0, len);
break; break;
case UTF8: case UTF8:
s->WriteUtf8(buf->base, len); string->WriteUtf8(buf, len);
break; break;
default: default:
assert(0 && "unhandled string encoding"); return ThrowException(String::New("Unknown encoding."));
} }
} else if (args[0]->IsArray()) {
Handle<Array> array = Handle<Array>::Cast(args[0]);
len = array->Length();
buf = node::buf_new(len);
for (size_t i = 0; i < len; i++) {
Local<Value> int_value = array->Get(Integer::New(i));
buf->base[i] = int_value->IntegerValue();
} }
} else return ThrowException(String::New("Bad argument")); return process->Write(buf, len) == 0 ? True() : False();
return process->Write(buf) == 0 ? True() : False();
} }
Handle<Value> Handle<Value>
@ -165,35 +138,93 @@ Process::Close (const Arguments& args)
return process->Close() == 0 ? True() : False(); return process->Close() == 0 ? True() : False();
} }
void
Process::reader_closed (evcom_reader *r)
{
Process *process = static_cast<Process*> (r->data);
if (r == &process->stdout_reader_) {
process->stdout_fd_ = -1;
} else {
assert(r == &process->stderr_reader_);
process->stderr_fd_ = -1;
}
evcom_reader_detach(r);
process->MaybeShutdown();
}
void
Process::stdin_closed (evcom_writer *w)
{
Process *process = static_cast<Process*> (w->data);
assert(w == &process->stdin_writer_);
process->stdin_fd_ = -1;
evcom_writer_detach(w);
process->MaybeShutdown();
}
void
Process::on_read (evcom_reader *r, const void *buf, size_t len)
{
Process *process = static_cast<Process*> (r->data);
HandleScope scope;
bool isSTDOUT = (r == &process->stdout_reader_);
Local<Value> argv[1];
enum encoding encoding = isSTDOUT ? process->stdout_encoding_ : process->stderr_encoding_;
if (len == 0) {
argv[0] = Local<Value>::New(Null());
} else if (encoding == RAW) {
// raw encoding
Local<Array> array = Array::New(len);
for (size_t i = 0; i < len; i++) {
unsigned char val = static_cast<const unsigned char*>(buf)[i];
array->Set(Integer::New(i), Integer::New(val));
}
argv[0] = array;
} else {
// utf8 or ascii encoding
argv[0] = String::New((const char*)buf, len);
}
process->Emit(isSTDOUT ? "output" : "error", 1, argv);
process->MaybeShutdown();
}
Process::Process () Process::Process ()
: EventEmitter() : EventEmitter()
{ {
ev_init(&stdout_watcher_, Process::OnOutput); evcom_reader_init(&stdout_reader_);
stdout_watcher_.data = this; stdout_reader_.data = this;
stdout_reader_.on_read = on_read;
stdout_reader_.on_close = reader_closed;
ev_init(&stderr_watcher_, Process::OnOutput); evcom_reader_init(&stderr_reader_);
stderr_watcher_.data = this; stderr_reader_.data = this;
stderr_reader_.on_read = on_read;
stderr_reader_.on_close = reader_closed;
ev_init(&stdin_watcher_, Process::OnWritable); evcom_writer_init(&stdin_writer_);
stdin_watcher_.data = this; stdin_writer_.data = this;
stdin_writer_.on_close = stdin_closed;
ev_init(&child_watcher_, Process::OnCHLD); ev_init(&child_watcher_, Process::OnCHLD);
child_watcher_.data = this; child_watcher_.data = this;
stdout_pipe_[0] = -1; stdout_fd_ = -1;
stdout_pipe_[1] = -1; stderr_fd_ = -1;
stderr_pipe_[0] = -1; stdin_fd_ = -1;
stderr_pipe_[1] = -1;
stdin_pipe_[0] = -1; stdout_encoding_ = UTF8;
stdin_pipe_[1] = -1; stderr_encoding_ = UTF8;
got_close_ = false;
got_chld_ = false; got_chld_ = false;
exit_code_ = 0; exit_code_ = 0;
pid_ = 0; pid_ = 0;
evcom_queue_init(&out_stream_);
} }
Process::~Process () Process::~Process ()
@ -204,35 +235,24 @@ Process::~Process ()
void void
Process::Shutdown () Process::Shutdown ()
{ {
// Clear the out_stream if (stdin_fd_ >= 0) {
while (!evcom_queue_empty(&out_stream_)) { evcom_writer_close(&stdin_writer_);
evcom_queue *q = evcom_queue_last(&out_stream_);
evcom_buf *buf = (evcom_buf*) evcom_queue_data(q, evcom_buf, queue);
evcom_queue_remove(q);
if (buf->release) buf->release(buf);
} }
if (stdout_pipe_[0] >= 0) close(stdout_pipe_[0]); if (stdin_fd_ >= 0) close(stdin_fd_);
if (stdout_pipe_[1] >= 0) close(stdout_pipe_[1]); if (stdout_fd_ >= 0) close(stdout_fd_);
if (stderr_fd_ >= 0) close(stderr_fd_);
if (stderr_pipe_[0] >= 0) close(stderr_pipe_[0]);
if (stderr_pipe_[1] >= 0) close(stderr_pipe_[1]);
if (stdin_pipe_[0] >= 0) close(stdin_pipe_[0]); stdin_fd_ = -1;
if (stdin_pipe_[1] >= 0) close(stdin_pipe_[1]); stdout_fd_ = -1;
stderr_fd_ = -1;
stdout_pipe_[0] = -1; evcom_writer_detach(&stdin_writer_);
stdout_pipe_[1] = -1; evcom_reader_detach(&stdout_reader_);
stderr_pipe_[0] = -1; evcom_reader_detach(&stderr_reader_);
stderr_pipe_[1] = -1;
stdin_pipe_[0] = -1;
stdin_pipe_[1] = -1;
ev_io_stop(EV_DEFAULT_UC_ &stdout_watcher_);
ev_io_stop(EV_DEFAULT_UC_ &stderr_watcher_);
ev_io_stop(EV_DEFAULT_UC_ &stdin_watcher_);
ev_child_stop(EV_DEFAULT_UC_ &child_watcher_); ev_child_stop(EV_DEFAULT_UC_ &child_watcher_);
/* XXX Kill the PID? */ /* XXX Kill the PID? */
pid_ = 0; pid_ = 0;
} }
@ -252,25 +272,24 @@ int
Process::Spawn (const char *command) Process::Spawn (const char *command)
{ {
assert(pid_ == 0); assert(pid_ == 0);
assert(stdout_pipe_[0] == -1); assert(stdout_fd_ == -1);
assert(stdout_pipe_[1] == -1); assert(stderr_fd_ == -1);
assert(stderr_pipe_[0] == -1); assert(stdin_fd_ == -1);
assert(stderr_pipe_[1] == -1);
assert(stdin_pipe_[0] == -1); int stdout_pipe[2], stdin_pipe[2], stderr_pipe[2];
assert(stdin_pipe_[1] == -1);
/* An implementation of popen(), basically */ /* An implementation of popen(), basically */
if (pipe(stdout_pipe_) < 0) { if (pipe(stdout_pipe) < 0) {
perror("pipe()"); perror("pipe()");
return -1; return -1;
} }
if (pipe(stderr_pipe_) < 0) { if (pipe(stderr_pipe) < 0) {
perror("pipe()"); perror("pipe()");
return -2; return -2;
} }
if (pipe(stdin_pipe_) < 0) { if (pipe(stdin_pipe) < 0) {
perror("pipe()"); perror("pipe()");
return -3; return -3;
} }
@ -281,14 +300,14 @@ Process::Spawn (const char *command)
return -4; return -4;
case 0: // Child. case 0: // Child.
close(stdout_pipe_[0]); // close read end close(stdout_pipe[0]); // close read end
dup2(stdout_pipe_[1], STDOUT_FILENO); dup2(stdout_pipe[1], STDOUT_FILENO);
close(stderr_pipe_[0]); // close read end close(stderr_pipe[0]); // close read end
dup2(stderr_pipe_[1], STDERR_FILENO); dup2(stderr_pipe[1], STDERR_FILENO);
close(stdin_pipe_[1]); // close write end close(stdin_pipe[1]); // close write end
dup2(stdin_pipe_[0], STDIN_FILENO); dup2(stdin_pipe[0], STDIN_FILENO);
execl("/bin/sh", "sh", "-c", command, (char *)NULL); execl("/bin/sh", "sh", "-c", command, (char *)NULL);
_exit(127); _exit(127);
@ -299,133 +318,30 @@ Process::Spawn (const char *command)
ev_child_set(&child_watcher_, pid_, 0); ev_child_set(&child_watcher_, pid_, 0);
ev_child_start(EV_DEFAULT_UC_ &child_watcher_); ev_child_start(EV_DEFAULT_UC_ &child_watcher_);
SetNonBlocking(stdout_pipe_[0]); close(stdout_pipe[1]);
ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); stdout_fd_ = stdout_pipe[0];
ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); SetNonBlocking(stdout_fd_);
close(stdout_pipe_[1]); // close write end
stdout_pipe_[1] = -1;
SetNonBlocking(stderr_pipe_[0]); close(stderr_pipe[1]);
ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); stderr_fd_ = stderr_pipe[0];
ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); SetNonBlocking(stderr_fd_);
close(stderr_pipe_[1]); // close write end
stderr_pipe_[1] = -1;
SetNonBlocking(stdin_pipe_[1]); close(stdin_pipe[0]);
ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); stdin_fd_ = stdin_pipe[1];
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); SetNonBlocking(stdin_fd_);
close(stdin_pipe_[0]); // close read end
stdin_pipe_[0] = -1;
Attach(); evcom_reader_set(&stdout_reader_, stdout_fd_);
evcom_reader_attach(EV_DEFAULT_UC_ &stdout_reader_);
return 0;
}
void
Process::OnOutput (EV_P_ ev_io *watcher, int revents)
{
int r;
char buf[16*1024];
size_t buf_size = 16*1024;
Process *process = static_cast<Process*>(watcher->data);
bool is_stdout = (&process->stdout_watcher_ == watcher);
int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0];
assert(revents == EV_READ);
assert(fd >= 0);
HandleScope scope;
Handle<Value> argv[1];
for (;;) {
r = read(fd, buf, buf_size);
if (r < 0) {
if (errno != EAGAIN) {
perror("IPC pipe read error");
} else {
if (process->got_chld_) {
close(fd);
if (is_stdout) {
process->stdout_pipe_[0] = -1;
} else {
process->stderr_pipe_[0] = -1;
}
}
}
break;
}
if (r == 0) {
argv[0] = Null();
} else {
// TODO multiple encodings
argv[0] = String::New((const char*)buf, r);
}
process->Emit(is_stdout ? "output" : "error", 1, argv);
if (r == 0) {
ev_io_stop(EV_DEFAULT_UC_ watcher);
close(fd);
if (is_stdout) {
process->stdout_pipe_[0] = -1;
} else {
process->stderr_pipe_[0] = -1;
}
break;
}
}
process->MaybeShutdown();
}
void
Process::OnWritable (EV_P_ ev_io *watcher, int revents)
{
Process *process = static_cast<Process*>(watcher->data);
int sent;
assert(revents == EV_WRITE);
assert(process->stdin_pipe_[1] >= 0);
while (!evcom_queue_empty(&process->out_stream_)) { evcom_reader_set(&stderr_reader_, stderr_fd_);
evcom_queue *q = evcom_queue_last(&process->out_stream_); evcom_reader_attach(EV_DEFAULT_UC_ &stderr_reader_);
evcom_buf *to_write = (evcom_buf*) evcom_queue_data(q, evcom_buf, queue);
sent = write( process->stdin_pipe_[1]
, to_write->base + to_write->written
, to_write->len - to_write->written
);
if (sent < 0) {
if (errno == EAGAIN) {
if (process->got_chld_) {
close(process->stdin_pipe_[1]);
process->stdin_pipe_[1] = -1;
}
break;
}
perror("IPC pipe write error");
break;
}
to_write->written += sent; evcom_writer_set(&stdin_writer_, stdin_fd_);
evcom_writer_attach(EV_DEFAULT_UC_ &stdin_writer_);
if (to_write->written == to_write->len) { Attach();
evcom_queue_remove(q);
if (to_write->release) to_write->release(to_write);
}
}
if (evcom_queue_empty(&process->out_stream_)) { return 0;
ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_);
if (process->got_close_) {
close(process->stdin_pipe_[1]);
process->stdin_pipe_[1] = -1;
}
}
} }
void void
@ -441,38 +357,24 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents)
process->got_chld_ = true; process->got_chld_ = true;
process->exit_code_ = watcher->rstatus; process->exit_code_ = watcher->rstatus;
if (process->stdout_pipe_[0] >= 0) { if (process->stdin_fd_ >= 0) evcom_writer_close(&process->stdin_writer_);
ev_feed_event(&process->stdout_watcher_, EV_READ);
}
if (process->stderr_pipe_[0] >= 0) {
ev_feed_event(&process->stderr_watcher_, EV_READ);
}
if (process->stdin_pipe_[1] >= 0) {
ev_io_start(EV_DEFAULT_UC_ &process->stdin_watcher_);
ev_feed_event(&process->stdin_watcher_, EV_WRITE);
}
process->MaybeShutdown(); process->MaybeShutdown();
} }
int int
Process::Write (evcom_buf *buf) Process::Write (const char *str, size_t len)
{ {
if (STDIN_CLOSED || got_close_ || got_chld_) return -1; if (stdin_fd_ < 0 || got_chld_) return -1;
evcom_queue_insert_head(&out_stream_, &buf->queue); evcom_writer_write(&stdin_writer_, str, len);
buf->written = 0;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0; return 0;
} }
int int
Process::Close (void) Process::Close (void)
{ {
if (STDIN_CLOSED || got_close_ || got_chld_) return -1; if (stdin_fd_ < 0 || got_chld_) return -1;
got_close_ = true; evcom_writer_close(EV_DEFAULT_UC_ &stdin_writer_);
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0; return 0;
} }
@ -486,7 +388,7 @@ Process::Kill (int sig)
void void
Process::MaybeShutdown (void) Process::MaybeShutdown (void)
{ {
if (STDOUT_CLOSED && STDERR_CLOSED && got_chld_) { if (stdout_fd_ < 0 && stderr_fd_ < 0 && got_chld_) {
HandleScope scope; HandleScope scope;
Handle<Value> argv[1] = { Integer::New(exit_code_) }; Handle<Value> argv[1] = { Integer::New(exit_code_) };
Emit("exit", 1, argv); Emit("exit", 1, argv);

27
src/process.h

@ -26,35 +26,36 @@ class Process : EventEmitter {
~Process(); ~Process();
int Spawn (const char *command); int Spawn (const char *command);
int Write (evcom_buf *buf); int Write (const char *str, size_t len);
int Close (void); int Close (void);
int Kill (int sig); int Kill (int sig);
private: private:
static void OnOutput (EV_P_ ev_io *watcher, int revents); static void on_read (evcom_reader *r, const void *buf, size_t len);
static void OnError (EV_P_ ev_io *watcher, int revents); static void reader_closed (evcom_reader *r);
static void OnWritable (EV_P_ ev_io *watcher, int revents); static void stdin_closed (evcom_writer *w);
static void OnCHLD (EV_P_ ev_child *watcher, int revents); static void OnCHLD (EV_P_ ev_child *watcher, int revents);
void MaybeShutdown (void); void MaybeShutdown (void);
void Shutdown (void); void Shutdown (void);
ev_io stdout_watcher_; evcom_reader stdout_reader_;
ev_io stderr_watcher_; evcom_reader stderr_reader_;
ev_io stdin_watcher_; evcom_writer stdin_writer_;
ev_child child_watcher_; ev_child child_watcher_;
int stdout_pipe_[2]; int stdout_fd_;
int stderr_pipe_[2]; int stderr_fd_;
int stdin_pipe_[2]; int stdin_fd_;
enum encoding stdout_encoding_;
enum encoding stderr_encoding_;
pid_t pid_; pid_t pid_;
bool got_close_;
bool got_chld_; bool got_chld_;
int exit_code_; int exit_code_;
evcom_queue out_stream_;
}; };
} // namespace node } // namespace node

4
test/mjsunit/test-process-buffering.js

@ -6,9 +6,11 @@ function pwd (callback) {
var output = ""; var output = "";
var process = node.createProcess("pwd"); var process = node.createProcess("pwd");
process.addListener("output", function (s) { process.addListener("output", function (s) {
puts("stdout: " + JSON.stringify(s));
if (s) output += s; if (s) output += s;
}); });
process.addListener("exit", function (c) { process.addListener("exit", function (c) {
puts("exit: " + c);
assertEquals(0, c); assertEquals(0, c);
callback(output); callback(output);
pwd_called = true; pwd_called = true;
@ -18,7 +20,7 @@ function pwd (callback) {
function onLoad () { function onLoad () {
pwd(function (result) { pwd(function (result) {
print(result); p(result);
assertTrue(result.length > 1); assertTrue(result.length > 1);
assertEquals("\n", result[result.length-1]); assertEquals("\n", result[result.length-1]);
}); });

12
test/mjsunit/test-process-simple.js

@ -6,15 +6,23 @@ var response = "";
var exit_status = -1; var exit_status = -1;
cat.addListener("output", function (chunk) { cat.addListener("output", function (chunk) {
puts("stdout: " + JSON.stringify(chunk));
if (chunk) { if (chunk) {
response += chunk; response += chunk;
if (response === "hello world") cat.close(); if (response === "hello world") {
puts("closing cat");
cat.close();
}
} }
}); });
cat.addListener("error", function (chunk) { cat.addListener("error", function (chunk) {
puts("stderr: " + JSON.stringify(chunk));
assertEquals(null, chunk); assertEquals(null, chunk);
}); });
cat.addListener("exit", function (status) { exit_status = status; }); cat.addListener("exit", function (status) {
puts("exit event");
exit_status = status;
});
function onLoad () { function onLoad () {
cat.write("hello"); cat.write("hello");

Loading…
Cancel
Save