Browse Source

Change IOWatcher constructor to have no arguments

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
153b755936
  1. 54
      lib/net.js
  2. 17
      src/node_io_watcher.cc
  3. 2
      src/node_io_watcher.h

54
lib/net.js

@ -7,6 +7,7 @@ function debug (x) {
} }
var IOWatcher = process.IOWatcher;
var assert = process.assert; var assert = process.assert;
var socket = process.socket; var socket = process.socket;
var bind = process.bind; var bind = process.bind;
@ -25,7 +26,7 @@ var needsLookup = process.needsLookup;
var EINPROGRESS = process.EINPROGRESS; var EINPROGRESS = process.EINPROGRESS;
function Stream (peerInfo) { function Socket (peerInfo) {
process.EventEmitter.call(); process.EventEmitter.call();
var self = this; var self = this;
@ -33,7 +34,8 @@ function Stream (peerInfo) {
// Allocated on demand. // Allocated on demand.
self.recvBuffer = null; self.recvBuffer = null;
self.readWatcher = new process.IOWatcher(function () { self.readWatcher = new IOWatcher()
self.readWatcher.callback = function () {
// If this is the first recv (recvBuffer doesn't exist) or we've used up // If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one. // most of the recvBuffer, allocate a new one.
if (!self.recvBuffer || if (!self.recvBuffer ||
@ -59,7 +61,7 @@ function Stream (peerInfo) {
self.recvBuffer.used += bytesRead; self.recvBuffer.used += bytesRead;
self.emit('receive', slice); self.emit('receive', slice);
} }
}); };
self.readable = false; self.readable = false;
self.sendQueue = []; // queue of buffers that need to be written to socket self.sendQueue = []; // queue of buffers that need to be written to socket
@ -72,7 +74,8 @@ function Stream (peerInfo) {
self.emit("drain"); self.emit("drain");
} }
}; };
self.writeWatcher = new process.IOWatcher(self._doFlush); self.writeWatcher = new IOWatcher();
self.writeWatcher.callback = self._doFlush;
self.writable = false; self.writable = false;
if (peerInfo) { if (peerInfo) {
@ -88,18 +91,18 @@ function Stream (peerInfo) {
self.writable = true; self.writable = true;
} }
}; };
process.inherits(Stream, process.EventEmitter); process.inherits(Socket, process.EventEmitter);
exports.Stream = Stream; exports.Socket = Socket;
exports.createConnection = function (port, host) { exports.createConnection = function (port, host) {
var s = new Stream(); var s = new Socket();
s.connect(port, host); s.connect(port, host);
return s; return s;
}; };
Stream.prototype._allocateNewRecvBuf = function () { Socket.prototype._allocateNewRecvBuf = function () {
var self = this; var self = this;
var newBufferSize = 1024; // TODO make this adjustable from user API var newBufferSize = 1024; // TODO make this adjustable from user API
@ -125,7 +128,7 @@ Stream.prototype._allocateNewRecvBuf = function () {
}; };
Stream.prototype._allocateSendBuffer = function () { Socket.prototype._allocateSendBuffer = function () {
var b = new process.Buffer(1024); var b = new process.Buffer(1024);
b.used = 0; b.used = 0;
b.sent = 0; b.sent = 0;
@ -134,9 +137,9 @@ Stream.prototype._allocateSendBuffer = function () {
}; };
Stream.prototype._sendString = function (data, encoding) { Socket.prototype._sendString = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Stream is not writable'); if (!self.writable) throw new Error('Socket is not writable');
var buffer; var buffer;
if (self.sendQueue.length == 0) { if (self.sendQueue.length == 0) {
buffer = self._allocateSendBuffer(); buffer = self._allocateSendBuffer();
@ -191,9 +194,9 @@ Stream.prototype._sendString = function (data, encoding) {
// Returns true if all the data was flushed to socket. Returns false if // Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will // something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket. // signal when it has been finally flushed to socket.
Stream.prototype.send = function (data, encoding) { Socket.prototype.send = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Stream is not writable'); if (!self.writable) throw new Error('Socket is not writable');
if (typeof(data) == 'string') { if (typeof(data) == 'string') {
self._sendString(data, encoding); self._sendString(data, encoding);
} else { } else {
@ -220,9 +223,9 @@ Stream.prototype.send = function (data, encoding) {
// Flushes the write buffer out. Emits "drain" if the buffer is empty. // Flushes the write buffer out. Emits "drain" if the buffer is empty.
Stream.prototype.flush = function () { Socket.prototype.flush = function () {
var self = this; var self = this;
if (!self.writable) throw new Error('Stream is not writable'); if (!self.writable) throw new Error('Socket is not writable');
var bytesWritten; var bytesWritten;
while (self.sendQueue.length > 0) { while (self.sendQueue.length > 0) {
@ -253,13 +256,13 @@ Stream.prototype.flush = function () {
}; };
// var stream = new Stream(); // var stream = new Socket();
// stream.connect(80) - TCP connect to port 80 on the localhost // stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket') - UNIX connect to socket specified by path // stream.connect('/tmp/socket') - UNIX connect to socket specified by path
Stream.prototype.connect = function () { Socket.prototype.connect = function () {
var self = this; var self = this;
if (self.fd) throw new Error('Stream already opened'); if (self.fd) throw new Error('Socket already opened');
if (typeof(arguments[0]) == 'string' && arguments.length == 1) { if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
self.fd = process.socket('UNIX'); self.fd = process.socket('UNIX');
@ -304,7 +307,7 @@ Stream.prototype.connect = function () {
}; };
Stream.prototype.forceClose = function (exception) { Socket.prototype.forceClose = function (exception) {
if (this.fd) { if (this.fd) {
this.readable = false; this.readable = false;
this.writable = false; this.writable = false;
@ -319,7 +322,7 @@ Stream.prototype.forceClose = function (exception) {
}; };
Stream.prototype._shutdown = function () { Socket.prototype._shutdown = function () {
if (this.writable) { if (this.writable) {
this.writable = false; this.writable = false;
shutdown(this.fd, "write"); shutdown(this.fd, "write");
@ -327,7 +330,7 @@ Stream.prototype._shutdown = function () {
}; };
Stream.prototype.close = function () { Socket.prototype.close = function () {
var self = this; var self = this;
var closeMethod; var closeMethod;
if (self.readable && self.writable) { if (self.readable && self.writable) {
@ -358,16 +361,17 @@ function Server (listener) {
self.addListener('connection', listener); self.addListener('connection', listener);
} }
self.watcher = new process.IOWatcher(function (readable, writeable) { self.watcher = new IOWatcher();
self.watcher.callback = function (readable, writeable) {
while (self.fd) { while (self.fd) {
var peerInfo = accept(self.fd); var peerInfo = accept(self.fd);
debug('accept: ' + JSON.stringify(peerInfo)); debug('accept: ' + JSON.stringify(peerInfo));
if (!peerInfo) return; if (!peerInfo) return;
var peer = new Stream(peerInfo); var peer = new Socket(peerInfo);
self.emit('connection', peer); self.emit('connection', peer);
} }
}); };
}; }
process.inherits(Server, process.EventEmitter); process.inherits(Server, process.EventEmitter);
exports.Server = Server; exports.Server = Server;

17
src/node_io_watcher.cc

@ -37,7 +37,11 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
HandleScope scope; HandleScope scope;
Local<Value> callback_v = io->handle_->Get(callback_symbol); Local<Value> callback_v = io->handle_->Get(callback_symbol);
assert(callback_v->IsFunction()); if (!callback_v->IsFunction()) {
io->Stop();
return;
}
Local<Function> callback = Local<Function>::Cast(callback_v); Local<Function> callback = Local<Function>::Cast(callback_v);
TryCatch try_catch; TryCatch try_catch;
@ -64,19 +68,9 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
Handle<Value> IOWatcher::New(const Arguments& args) { Handle<Value> IOWatcher::New(const Arguments& args) {
HandleScope scope; HandleScope scope;
if (!args[0]->IsFunction()) {
return ThrowException(Exception::TypeError(
String::New("First arg should a callback.")));
}
Local<Function> callback = Local<Function>::Cast(args[0]);
IOWatcher *s = new IOWatcher(); IOWatcher *s = new IOWatcher();
s->Wrap(args.This()); s->Wrap(args.This());
s->handle_->Set(callback_symbol, callback);
return args.This(); return args.This();
} }
@ -136,7 +130,6 @@ Handle<Value> IOWatcher::Stop(const Arguments& args) {
void IOWatcher::Stop () { void IOWatcher::Stop () {
if (watcher_.active) { if (watcher_.active) {
HandleScope scope;
ev_io_stop(EV_DEFAULT_UC_ &watcher_); ev_io_stop(EV_DEFAULT_UC_ &watcher_);
Unref(); Unref();
} }

2
src/node_io_watcher.h

@ -20,7 +20,7 @@ class IOWatcher : ObjectWrap {
} }
~IOWatcher() { ~IOWatcher() {
Stop(); ev_io_stop(EV_DEFAULT_UC_ &watcher_);
} }
static v8::Handle<v8::Value> New(const v8::Arguments& args); static v8::Handle<v8::Value> New(const v8::Arguments& args);

Loading…
Cancel
Save