Browse Source

Use streams for stdout and stdin

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
fdf46a65c9
  1. 13
      lib/net.js
  2. 10
      lib/repl.js
  3. 8
      lib/sys.js
  4. 12
      src/node.cc
  5. 27
      src/node.js
  6. 183
      src/node_stdio.cc
  7. 6
      src/node_stdio.h
  8. 3
      test/simple/test-net-pingpong.js

13
lib/net.js

@ -359,12 +359,13 @@ function initSocket (self) {
function Socket (fd) { function Socket (fd) {
process.EventEmitter.call(this); process.EventEmitter.call(this);
if (fd) { this.fd = null;
if (parseInt(fd) >= 0) {
initSocket(this); initSocket(this);
this.fd = fd; this.fd = fd;
this.resume();
this.readable = true; this.readable = true;
this._writeWatcher.set(this.fd, false, true); this._writeWatcher.set(this.fd, false, true);
@ -615,7 +616,7 @@ function doConnect (socket, port, host) {
var errno = socketError(socket.fd); var errno = socketError(socket.fd);
if (errno == 0) { if (errno == 0) {
// connection established // connection established
socket._readWatcher.start(); socket.resume();
socket.readable = true; socket.readable = true;
socket.writable = true; socket.writable = true;
socket._writeWatcher.callback = socket._doFlush; socket._writeWatcher.callback = socket._doFlush;
@ -680,7 +681,7 @@ Socket.prototype.pause = function () {
Socket.prototype.resume = function () { Socket.prototype.resume = function () {
if (!this.fd) throw new Error('Cannot resume() closed Socket.'); if (this.fd === null) throw new Error('Cannot resume() closed Socket.');
this._readWatcher.set(this.fd, true, false); this._readWatcher.set(this.fd, true, false);
this._readWatcher.start(); this._readWatcher.start();
}; };
@ -762,16 +763,18 @@ function Server (listener) {
while (self.fd) { while (self.fd) {
var peerInfo = accept(self.fd); var peerInfo = accept(self.fd);
if (!peerInfo) return; if (!peerInfo) return;
var s = new Socket(peerInfo.fd); var s = new Socket(peerInfo.fd);
s.remoteAddress = peerInfo.remoteAddress; s.remoteAddress = peerInfo.remoteAddress;
s.remotePort = peerInfo.remotePort; s.remotePort = peerInfo.remotePort;
s.type = self.type; s.type = self.type;
s.server = self; s.server = self;
s.resume();
self.emit('connection', s); self.emit('connection', s);
// The 'connect' event probably should be removed for server-side // The 'connect' event probably should be removed for server-side
// sockets. It's redundent. // sockets. It's redundent.
s.emit('connect'); s.emit('connect');
timeout.active(s);
} }
}; };
} }

10
lib/repl.js

@ -12,13 +12,17 @@ exports.scope = {};
exports.prompt = "node> "; exports.prompt = "node> ";
// Can overridden with custom print functions, such as `probe` or `eyes.js` // Can overridden with custom print functions, such as `probe` or `eyes.js`
exports.writer = sys.p; exports.writer = sys.p;
var stdin;
exports.start = function (prompt) { exports.start = function (prompt) {
if (prompt !== undefined) { if (prompt !== undefined) {
exports.prompt = prompt; exports.prompt = prompt;
} }
process.stdio.open(); stdin = process.openStdin();
process.stdio.addListener("data", readline); stdin.setEncoding('utf8');
stdin.addListener("data", readline);
displayPrompt(); displayPrompt();
} }
@ -96,7 +100,7 @@ function parseREPLKeyword (cmd) {
displayPrompt(); displayPrompt();
return true; return true;
case ".exit": case ".exit":
process.stdio.close(); stdin.close();
return true; return true;
case ".help": case ".help":
sys.puts(".break\tSometimes you get stuck in a place you can't get out... This will get you out."); sys.puts(".break\tSometimes you get stuck in a place you can't get out... This will get you out.");

8
lib/sys.js

@ -2,23 +2,23 @@ var events = require('events');
exports.print = function () { exports.print = function () {
for (var i = 0, len = arguments.length; i < len; ++i) { for (var i = 0, len = arguments.length; i < len; ++i) {
process.stdio.write(arguments[i]); process.stdout.write(arguments[i]);
} }
}; };
exports.puts = function () { exports.puts = function () {
for (var i = 0, len = arguments.length; i < len; ++i) { for (var i = 0, len = arguments.length; i < len; ++i) {
process.stdio.write(arguments[i] + '\n'); process.stdout.write(arguments[i] + '\n');
} }
}; };
exports.debug = function (x) { exports.debug = function (x) {
process.stdio.writeError("DEBUG: " + x + "\n"); process.binding('stdio').writeError("DEBUG: " + x + "\n");
}; };
exports.error = function (x) { exports.error = function (x) {
for (var i = 0, len = arguments.length; i < len; ++i) { for (var i = 0, len = arguments.length; i < len; ++i) {
process.stdio.writeError(arguments[i] + '\n'); process.binding('stdio').writeError(arguments[i] + '\n');
} }
}; };

12
src/node.cc

@ -1073,7 +1073,16 @@ static Handle<Value> Binding(const Arguments& args) {
Local<Object> exports; Local<Object> exports;
if (!strcmp(*module_v, "http")) { if (!strcmp(*module_v, "stdio")) {
if (binding_cache->Has(module)) {
exports = binding_cache->Get(module)->ToObject();
} else {
exports = Object::New();
Stdio::Initialize(exports);
binding_cache->Set(module, exports);
}
} else if (!strcmp(*module_v, "http")) {
if (binding_cache->Has(module)) { if (binding_cache->Has(module)) {
exports = binding_cache->Get(module)->ToObject(); exports = binding_cache->Get(module)->ToObject();
} else { } else {
@ -1258,7 +1267,6 @@ static void Load(int argc, char *argv[]) {
IOWatcher::Initialize(process); // io_watcher.cc IOWatcher::Initialize(process); // io_watcher.cc
IdleWatcher::Initialize(process); // idle_watcher.cc IdleWatcher::Initialize(process); // idle_watcher.cc
Timer::Initialize(process); // timer.cc Timer::Initialize(process); // timer.cc
Stdio::Initialize(process); // stdio.cc
InitNet2(process); // net2.cc InitNet2(process); // net2.cc
InitHttpParser(process); // http_parser.cc InitHttpParser(process); // http_parser.cc
ChildProcess::Initialize(process); // child_process.cc ChildProcess::Initialize(process); // child_process.cc

27
src/node.js

@ -106,10 +106,12 @@ process.createChildProcess = function (file, args, env) {
return child; return child;
}; };
process.assert = function (x, msg) { process.assert = function (x, msg) {
if (!(x)) throw new Error(msg || "assertion error"); if (!(x)) throw new Error(msg || "assertion error");
}; };
// From jQuery.extend in the jQuery JavaScript Library v1.3.2 // From jQuery.extend in the jQuery JavaScript Library v1.3.2
// Copyright (c) 2009 John Resig // Copyright (c) 2009 John Resig
// Dual licensed under the MIT and GPL licenses. // Dual licensed under the MIT and GPL licenses.
@ -119,7 +121,7 @@ var mixinMessage;
process.mixin = function() { process.mixin = function() {
if (!mixinMessage) { if (!mixinMessage) {
mixinMessage = 'deprecation warning: process.mixin will be removed from node-core future releases.\n' mixinMessage = 'deprecation warning: process.mixin will be removed from node-core future releases.\n'
process.stdio.writeError(mixinMessage); process.binding('stdio').writeError(mixinMessage);
} }
// copy reference to target object // copy reference to target object
var target = arguments[0] || {}, i = 1, length = arguments.length, deep = false, source; var target = arguments[0] || {}, i = 1, length = arguments.length, deep = false, source;
@ -338,7 +340,7 @@ if ("NODE_DEBUG" in process.env) debugLevel = 1;
function debug (x) { function debug (x) {
if (debugLevel > 0) { if (debugLevel > 0) {
process.stdio.writeError(x + "\n"); process.binding('stdio').writeError(x + "\n");
} }
} }
@ -781,6 +783,27 @@ Module.prototype._waitChildrenLoad = function (callback) {
}; };
var stdout;
process.__defineGetter__('stdout', function () {
if (stdout) return stdout;
var net = requireNative('net');
stdout = new net.Socket(process.binding('stdio').stdoutFD);
return stdout;
});
var stdin;
process.openStdin = function () {
if (stdin) return stdin;
var net = requireNative('net');
var fd = process.binding('stdio').openStdin();
stdin = new net.Socket(fd);
process.stdout.write(stdin.fd + "\n");
stdin.resume();
stdin.readable = true;
return stdin;
};
process.exit = function (code) { process.exit = function (code) {
process.emit("exit"); process.emit("exit");
process.reallyExit(code); process.reallyExit(code);

183
src/node_stdio.cc

@ -8,10 +8,8 @@
#include <errno.h> #include <errno.h>
using namespace v8; using namespace v8;
using namespace node; namespace node {
static Persistent<Object> stdio;
static Persistent<Function> emit;
static struct coupling *stdin_coupling = NULL; static struct coupling *stdin_coupling = NULL;
static struct coupling *stdout_coupling = NULL; static struct coupling *stdout_coupling = NULL;
@ -19,33 +17,8 @@ static struct coupling *stdout_coupling = NULL;
static int stdin_fd = -1; static int stdin_fd = -1;
static int stdout_fd = -1; static int stdout_fd = -1;
static evcom_reader in;
static evcom_writer out;
static enum encoding stdin_encoding; static Local<Value> errno_exception(int errorno) {
static void
EmitInput (Local<Value> input)
{
HandleScope scope;
Local<Value> argv[2] = { String::NewSymbol("data"), input };
emit->Call(stdio, 2, argv);
}
static void
EmitClose (void)
{
HandleScope scope;
Local<Value> argv[1] = { String::NewSymbol("close") };
emit->Call(stdio, 1, argv);
}
static inline Local<Value> errno_exception(int errorno) {
Local<Value> e = Exception::Error(String::NewSymbol(strerror(errorno))); Local<Value> e = Exception::Error(String::NewSymbol(strerror(errorno)));
Local<Object> obj = e->ToObject(); Local<Object> obj = e->ToObject();
obj->Set(String::NewSymbol("errno"), Integer::New(errorno)); obj->Set(String::NewSymbol("errno"), Integer::New(errorno));
@ -53,7 +26,7 @@ static inline Local<Value> errno_exception(int errorno) {
} }
/* STDERR IS ALWAY SYNC */ /* STDERR IS ALWAY SYNC ALWAYS UTF8 */
static Handle<Value> static Handle<Value>
WriteError (const Arguments& args) WriteError (const Arguments& args)
{ {
@ -81,84 +54,8 @@ WriteError (const Arguments& args)
return Undefined(); return Undefined();
} }
static Handle<Value>
Write (const Arguments& args)
{
HandleScope scope;
if (args.Length() == 0) {
return ThrowException(Exception::Error(String::New("Bad argument")));
}
enum encoding enc = UTF8;
if (args.Length() > 1) enc = ParseEncoding(args[1], UTF8);
ssize_t len = DecodeBytes(args[0], enc);
if (len < 0) {
Local<Value> exception = Exception::TypeError(String::New("Bad argument"));
return ThrowException(exception);
}
char buf[len];
ssize_t written = DecodeWrite(buf, len, args[0], enc);
assert(written == len);
evcom_writer_write(&out, buf, len);
return Undefined();
}
static void
detach_in (evcom_reader *r)
{
assert(r == &in);
HandleScope scope;
EmitClose();
evcom_reader_detach(&in);
if (stdin_coupling) {
coupling_destroy(stdin_coupling);
stdin_coupling = NULL;
}
stdin_fd = -1;
}
static void
detach_out (evcom_writer* w)
{
assert(w == &out);
evcom_writer_detach(&out);
if (stdout_coupling) {
coupling_destroy(stdout_coupling);
stdout_coupling = NULL;
}
stdout_fd = -1;
}
static void
on_read (evcom_reader *r, const void *buf, size_t len)
{
assert(r == &in);
HandleScope scope;
if (!len) {
return;
}
Local<Value> data = Encode(buf, len, stdin_encoding); static inline int SetNonblock(int fd) {
EmitInput(data);
}
static inline int
set_nonblock (int fd)
{
int flags = fcntl(fd, F_GETFL, 0); int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -1; if (flags == -1) return -1;
@ -168,20 +65,14 @@ set_nonblock (int fd)
return 0; return 0;
} }
static Handle<Value>
Open (const Arguments& args) static Handle<Value> OpenStdin(const Arguments& args) {
{
HandleScope scope; HandleScope scope;
if (stdin_fd >= 0) { if (stdin_fd >= 0) {
return ThrowException(Exception::Error(String::New("stdin already open"))); return ThrowException(Exception::Error(String::New("stdin already open")));
} }
stdin_encoding = UTF8;
if (args.Length() > 0) {
stdin_encoding = ParseEncoding(args[0]);
}
if (isatty(STDIN_FILENO)) { if (isatty(STDIN_FILENO)) {
// XXX selecting on tty fds wont work in windows. // XXX selecting on tty fds wont work in windows.
// Must ALWAYS make a coupling on shitty platforms. // Must ALWAYS make a coupling on shitty platforms.
@ -190,34 +81,11 @@ Open (const Arguments& args)
stdin_coupling = coupling_new_pull(STDIN_FILENO); stdin_coupling = coupling_new_pull(STDIN_FILENO);
stdin_fd = coupling_nonblocking_fd(stdin_coupling); stdin_fd = coupling_nonblocking_fd(stdin_coupling);
} }
set_nonblock(stdin_fd); SetNonblock(stdin_fd);
evcom_reader_init(&in);
in.on_read = on_read;
in.on_close = detach_in;
evcom_reader_set(&in, stdin_fd);
evcom_reader_attach(EV_DEFAULT_ &in);
return Undefined(); return scope.Close(Integer::New(stdin_fd));
} }
static Handle<Value>
Close (const Arguments& args)
{
HandleScope scope;
assert(stdio == args.Holder());
if (stdin_fd < 0) {
return ThrowException(Exception::Error(String::New("stdin not open")));
}
evcom_reader_close(&in);
return Undefined();
}
void Stdio::Flush() { void Stdio::Flush() {
if (stdout_fd >= 0) { if (stdout_fd >= 0) {
@ -232,27 +100,9 @@ void Stdio::Flush() {
} }
} }
void
Stdio::Initialize (v8::Handle<v8::Object> target)
{
HandleScope scope;
Local<Object> stdio_local =
EventEmitter::constructor_template->GetFunction()->NewInstance(0, NULL);
stdio = Persistent<Object>::New(stdio_local);
NODE_SET_METHOD(stdio, "open", Open);
NODE_SET_METHOD(stdio, "write", Write);
NODE_SET_METHOD(stdio, "writeError", WriteError);
NODE_SET_METHOD(stdio, "close", Close);
target->Set(String::NewSymbol("stdio"), stdio); void Stdio::Initialize(v8::Handle<v8::Object> target) {
HandleScope scope;
Local<Value> emit_v = stdio->Get(String::NewSymbol("emit"));
assert(emit_v->IsFunction());
Local<Function> emit_f = Local<Function>::Cast(emit_v);
emit = Persistent<Function>::New(emit_f);
if (isatty(STDOUT_FILENO)) { if (isatty(STDOUT_FILENO)) {
// XXX selecting on tty fds wont work in windows. // XXX selecting on tty fds wont work in windows.
@ -262,10 +112,13 @@ Stdio::Initialize (v8::Handle<v8::Object> target)
stdout_coupling = coupling_new_push(STDOUT_FILENO); stdout_coupling = coupling_new_push(STDOUT_FILENO);
stdout_fd = coupling_nonblocking_fd(stdout_coupling); stdout_fd = coupling_nonblocking_fd(stdout_coupling);
} }
set_nonblock(stdout_fd); SetNonblock(stdout_fd);
target->Set(String::NewSymbol("stdoutFD"), Integer::New(stdout_fd));
evcom_writer_init(&out); NODE_SET_METHOD(target, "writeError", WriteError);
out.on_close = detach_out; NODE_SET_METHOD(target, "openStdin", OpenStdin);
evcom_writer_set(&out, stdout_fd);
evcom_writer_attach(EV_DEFAULT_ &out);
} }
} // namespace node

6
src/node_stdio.h

@ -2,9 +2,7 @@
#define node_stdio_h #define node_stdio_h
#include <node.h> #include <node.h>
#include <v8.h> #include <v8.h>
#include <evcom.h>
namespace node { namespace node {
@ -14,5 +12,5 @@ public:
static void Flush (); static void Flush ();
}; };
} // namespace node } // namespace node
#endif #endif // node_stdio_h

3
test/simple/test-net-pingpong.js

@ -1,4 +1,5 @@
process.mixin(require("../common")); require("../common");
net = require("net"); net = require("net");
process.Buffer.prototype.toString = function () { process.Buffer.prototype.toString = function () {

Loading…
Cancel
Save