diff --git a/lib/net.js b/lib/net.js index cf2a8bf8a1..3108feebde 100644 --- a/lib/net.js +++ b/lib/net.js @@ -7,6 +7,7 @@ function debug (x) { } +var IOWatcher = process.IOWatcher; var assert = process.assert; var socket = process.socket; var bind = process.bind; @@ -25,7 +26,7 @@ var needsLookup = process.needsLookup; var EINPROGRESS = process.EINPROGRESS; -function Stream (peerInfo) { +function Socket (peerInfo) { process.EventEmitter.call(); var self = this; @@ -33,10 +34,11 @@ function Stream (peerInfo) { // Allocated on demand. self.recvBuffer = null; - self.readWatcher = new process.IOWatcher(function () { + self.readWatcher = new IOWatcher() + self.readWatcher.callback = function () { // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. - if (!self.recvBuffer || + if (!self.recvBuffer || self.recvBuffer.length - self.recvBuffer.used < 128) { self._allocateNewRecvBuf(); } @@ -52,18 +54,18 @@ function Stream (peerInfo) { self.readable = false; self.readWatcher.stop(); self.emit('eof'); - if (!self.writable) self.forceClose(); + if (!self.writable) self.forceClose(); } else { var slice = self.recvBuffer.slice(self.recvBuffer.used, self.recvBuffer.used + bytesRead); self.recvBuffer.used += bytesRead; self.emit('receive', slice); } - }); + }; self.readable = false; self.sendQueue = []; // queue of buffers that need to be written to socket - // XXX use link list? + // XXX use link list? self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length! self._doFlush = function () { assert(self.sendQueueSize > 0); @@ -72,7 +74,8 @@ function Stream (peerInfo) { self.emit("drain"); } }; - self.writeWatcher = new process.IOWatcher(self._doFlush); + self.writeWatcher = new IOWatcher(); + self.writeWatcher.callback = self._doFlush; self.writable = false; if (peerInfo) { @@ -88,18 +91,18 @@ function Stream (peerInfo) { self.writable = true; } }; -process.inherits(Stream, process.EventEmitter); -exports.Stream = Stream; +process.inherits(Socket, process.EventEmitter); +exports.Socket = Socket; exports.createConnection = function (port, host) { - var s = new Stream(); + var s = new Socket(); s.connect(port, host); return s; }; -Stream.prototype._allocateNewRecvBuf = function () { +Socket.prototype._allocateNewRecvBuf = function () { var self = this; var newBufferSize = 1024; // TODO make this adjustable from user API @@ -125,7 +128,7 @@ Stream.prototype._allocateNewRecvBuf = function () { }; -Stream.prototype._allocateSendBuffer = function () { +Socket.prototype._allocateSendBuffer = function () { var b = new process.Buffer(1024); b.used = 0; b.sent = 0; @@ -134,9 +137,9 @@ Stream.prototype._allocateSendBuffer = function () { }; -Stream.prototype._sendString = function (data, encoding) { +Socket.prototype._sendString = function (data, encoding) { var self = this; - if (!self.writable) throw new Error('Stream is not writable'); + if (!self.writable) throw new Error('Socket is not writable'); var buffer; if (self.sendQueue.length == 0) { buffer = self._allocateSendBuffer(); @@ -191,9 +194,9 @@ Stream.prototype._sendString = function (data, encoding) { // Returns true if all the data was flushed to socket. Returns false if // something was queued. If data was queued, then the "drain" event will // signal when it has been finally flushed to socket. -Stream.prototype.send = function (data, encoding) { +Socket.prototype.send = function (data, encoding) { var self = this; - if (!self.writable) throw new Error('Stream is not writable'); + if (!self.writable) throw new Error('Socket is not writable'); if (typeof(data) == 'string') { self._sendString(data, encoding); } else { @@ -220,9 +223,9 @@ Stream.prototype.send = function (data, encoding) { // Flushes the write buffer out. Emits "drain" if the buffer is empty. -Stream.prototype.flush = function () { +Socket.prototype.flush = function () { var self = this; - if (!self.writable) throw new Error('Stream is not writable'); + if (!self.writable) throw new Error('Socket is not writable'); var bytesWritten; while (self.sendQueue.length > 0) { @@ -253,13 +256,13 @@ Stream.prototype.flush = function () { }; -// var stream = new Stream(); +// var stream = new Socket(); // stream.connect(80) - TCP connect to port 80 on the localhost // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org // stream.connect('/tmp/socket') - UNIX connect to socket specified by path -Stream.prototype.connect = function () { +Socket.prototype.connect = function () { var self = this; - if (self.fd) throw new Error('Stream already opened'); + if (self.fd) throw new Error('Socket already opened'); if (typeof(arguments[0]) == 'string' && arguments.length == 1) { self.fd = process.socket('UNIX'); @@ -304,7 +307,7 @@ Stream.prototype.connect = function () { }; -Stream.prototype.forceClose = function (exception) { +Socket.prototype.forceClose = function (exception) { if (this.fd) { this.readable = false; this.writable = false; @@ -314,12 +317,12 @@ Stream.prototype.forceClose = function (exception) { close(this.fd); debug('close peer ' + this.fd); this.fd = null; - this.emit('close', exception); + this.emit('close', exception); } }; -Stream.prototype._shutdown = function () { +Socket.prototype._shutdown = function () { if (this.writable) { this.writable = false; shutdown(this.fd, "write"); @@ -327,7 +330,7 @@ Stream.prototype._shutdown = function () { }; -Stream.prototype.close = function () { +Socket.prototype.close = function () { var self = this; var closeMethod; if (self.readable && self.writable) { @@ -336,7 +339,7 @@ Stream.prototype.close = function () { // already got EOF closeMethod = self.forceClose; } - // In the case we've already shutdown write side, + // In the case we've already shutdown write side, // but haven't got EOF: ignore. In the case we're // fully closed already: ignore. @@ -358,16 +361,17 @@ function Server (listener) { self.addListener('connection', listener); } - self.watcher = new process.IOWatcher(function (readable, writeable) { + self.watcher = new IOWatcher(); + self.watcher.callback = function (readable, writeable) { while (self.fd) { var peerInfo = accept(self.fd); debug('accept: ' + JSON.stringify(peerInfo)); if (!peerInfo) return; - var peer = new Stream(peerInfo); + var peer = new Socket(peerInfo); self.emit('connection', peer); } - }); -}; + }; +} process.inherits(Server, process.EventEmitter); exports.Server = Server; @@ -411,7 +415,7 @@ Server.prototype.listen = function () { listen(self.fd, 128); self.emit("listening"); - self.watcher.set(self.fd, true, false); + self.watcher.set(self.fd, true, false); self.watcher.start(); }; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 3c60a7bacf..4a2d9378d9 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -37,7 +37,11 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) { HandleScope scope; Local callback_v = io->handle_->Get(callback_symbol); - assert(callback_v->IsFunction()); + if (!callback_v->IsFunction()) { + io->Stop(); + return; + } + Local callback = Local::Cast(callback_v); TryCatch try_catch; @@ -64,19 +68,9 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) { Handle IOWatcher::New(const Arguments& args) { HandleScope scope; - if (!args[0]->IsFunction()) { - return ThrowException(Exception::TypeError( - String::New("First arg should a callback."))); - } - - Local callback = Local::Cast(args[0]); - IOWatcher *s = new IOWatcher(); - s->Wrap(args.This()); - s->handle_->Set(callback_symbol, callback); - return args.This(); } @@ -136,7 +130,6 @@ Handle IOWatcher::Stop(const Arguments& args) { void IOWatcher::Stop () { if (watcher_.active) { - HandleScope scope; ev_io_stop(EV_DEFAULT_UC_ &watcher_); Unref(); } diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 420c6de5cc..5e73177076 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -20,7 +20,7 @@ class IOWatcher : ObjectWrap { } ~IOWatcher() { - Stop(); + ev_io_stop(EV_DEFAULT_UC_ &watcher_); } static v8::Handle New(const v8::Arguments& args);