|
|
@ -7,6 +7,7 @@ function debug (x) { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var assert = process.assert; |
|
|
|
var socket = process.socket; |
|
|
|
var bind = process.bind; |
|
|
|
var connect = process.connect; |
|
|
@ -18,6 +19,9 @@ var read = process.read; |
|
|
|
var write = process.write; |
|
|
|
var toRead = process.toRead; |
|
|
|
var socketError = process.socketError; |
|
|
|
var getsockname = process.getsockname; |
|
|
|
var getaddrinfo = process.getaddrinfo; |
|
|
|
var needsLookup = process.needsLookup; |
|
|
|
var EINPROGRESS = process.EINPROGRESS; |
|
|
|
|
|
|
|
|
|
|
@ -28,11 +32,8 @@ function Stream (peerInfo) { |
|
|
|
|
|
|
|
// Allocated on demand.
|
|
|
|
self.recvBuffer = null; |
|
|
|
self.sendQueue = []; |
|
|
|
|
|
|
|
self.readWatcher = new process.IOWatcher(function () { |
|
|
|
debug('\n' + self.fd + ' readable'); |
|
|
|
|
|
|
|
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
|
|
|
// most of the recvBuffer, allocate a new one.
|
|
|
|
if (!self.recvBuffer || |
|
|
@ -59,14 +60,19 @@ function Stream (peerInfo) { |
|
|
|
self.emit('receive', slice); |
|
|
|
} |
|
|
|
}); |
|
|
|
self.readable = false; |
|
|
|
|
|
|
|
self._onWriteFlush = function () { |
|
|
|
self.flush(); |
|
|
|
self.sendQueue = []; // queue of buffers that need to be written to socket
|
|
|
|
// XXX use link list?
|
|
|
|
self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
|
|
|
|
self._doFlush = function () { |
|
|
|
assert(self.sendQueueSize > 0); |
|
|
|
if (self.flush()) { |
|
|
|
assert(self.sendQueueSize == 0); |
|
|
|
self.emit("drain"); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
self.writeWatcher = new process.IOWatcher(self._onWriteFlush); |
|
|
|
|
|
|
|
self.readable = false; |
|
|
|
self.writeWatcher = new process.IOWatcher(self._doFlush); |
|
|
|
self.writable = false; |
|
|
|
|
|
|
|
if (peerInfo) { |
|
|
@ -76,8 +82,9 @@ function Stream (peerInfo) { |
|
|
|
|
|
|
|
self.readWatcher.set(self.fd, true, false); |
|
|
|
self.readWatcher.start(); |
|
|
|
self.writeWatcher.set(self.fd, false, true); |
|
|
|
self.readable = true; |
|
|
|
|
|
|
|
self.writeWatcher.set(self.fd, false, true); |
|
|
|
self.writable = true; |
|
|
|
} |
|
|
|
}; |
|
|
@ -85,6 +92,13 @@ process.inherits(Stream, process.EventEmitter); |
|
|
|
exports.Stream = Stream; |
|
|
|
|
|
|
|
|
|
|
|
exports.createConnection = function (port, host) { |
|
|
|
var s = new Stream(); |
|
|
|
s.connect(port, host); |
|
|
|
return s; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype._allocateNewRecvBuf = function () { |
|
|
|
var self = this; |
|
|
|
|
|
|
@ -122,6 +136,7 @@ Stream.prototype._allocateSendBuffer = function () { |
|
|
|
|
|
|
|
Stream.prototype._sendString = function (data, encoding) { |
|
|
|
var self = this; |
|
|
|
if (!self.writable) throw new Error('Stream is not writable'); |
|
|
|
var buffer; |
|
|
|
if (self.sendQueue.length == 0) { |
|
|
|
buffer = self._allocateSendBuffer(); |
|
|
@ -144,22 +159,26 @@ Stream.prototype._sendString = function (data, encoding) { |
|
|
|
encoding = encoding || 'ascii'; // default to ascii since it's faster
|
|
|
|
|
|
|
|
var charsWritten; |
|
|
|
var bytesWritten; |
|
|
|
|
|
|
|
if (encoding.toLowerCase() == 'utf8') { |
|
|
|
charsWritten = buffer.utf8Write(data, |
|
|
|
buffer.used, |
|
|
|
buffer.length - buffer.used); |
|
|
|
buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); |
|
|
|
bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten)); |
|
|
|
} else { |
|
|
|
// ascii
|
|
|
|
charsWritten = buffer.asciiWrite(data, |
|
|
|
buffer.used, |
|
|
|
buffer.length - buffer.used); |
|
|
|
buffer.used += charsWritten; |
|
|
|
debug('ascii charsWritten ' + charsWritten); |
|
|
|
debug('ascii buffer.used ' + buffer.used); |
|
|
|
bytesWritten = charsWritten; |
|
|
|
} |
|
|
|
|
|
|
|
buffer.used += bytesWritten; |
|
|
|
self.sendQueueSize += bytesWritten; |
|
|
|
|
|
|
|
debug('charsWritten ' + charsWritten); |
|
|
|
debug('buffer.used ' + buffer.used); |
|
|
|
|
|
|
|
// If we didn't finish, then recurse with the rest of the string.
|
|
|
|
if (charsWritten < data.length) { |
|
|
@ -169,8 +188,12 @@ Stream.prototype._sendString = function (data, encoding) { |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// Returns true if all the data was flushed to socket. Returns false if
|
|
|
|
// something was queued. If data was queued, then the "drain" event will
|
|
|
|
// signal when it has been finally flushed to socket.
|
|
|
|
Stream.prototype.send = function (data, encoding) { |
|
|
|
var self = this; |
|
|
|
if (!self.writable) throw new Error('Stream is not writable'); |
|
|
|
if (typeof(data) == 'string') { |
|
|
|
self._sendString(data, encoding); |
|
|
|
} else { |
|
|
@ -189,15 +212,18 @@ Stream.prototype.send = function (data, encoding) { |
|
|
|
} |
|
|
|
|
|
|
|
if (!inserted) self.sendQueue.push(data); |
|
|
|
|
|
|
|
self.sendQueueSize += data.used; |
|
|
|
} |
|
|
|
this.flush(); |
|
|
|
return this.flush(); |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// returns true if flushed without getting EAGAIN
|
|
|
|
// false if it got EAGAIN
|
|
|
|
// Flushes the write buffer out. Emits "drain" if the buffer is empty.
|
|
|
|
Stream.prototype.flush = function () { |
|
|
|
var self = this; |
|
|
|
if (!self.writable) throw new Error('Stream is not writable'); |
|
|
|
|
|
|
|
var bytesWritten; |
|
|
|
while (self.sendQueue.length > 0) { |
|
|
|
var b = self.sendQueue[0]; |
|
|
@ -213,13 +239,16 @@ Stream.prototype.flush = function () { |
|
|
|
b.sent, |
|
|
|
b.used - b.sent); |
|
|
|
if (bytesWritten === null) { |
|
|
|
this.writeWatcher.start(); |
|
|
|
// could not flush everything
|
|
|
|
self.writeWatcher.start(); |
|
|
|
assert(self.sendQueueSize > 0); |
|
|
|
return false; |
|
|
|
} |
|
|
|
b.sent += bytesWritten; |
|
|
|
self.sendQueueSize -= bytesWritten; |
|
|
|
debug('bytes sent: ' + b.sent); |
|
|
|
} |
|
|
|
this.writeWatcher.stop(); |
|
|
|
self.writeWatcher.stop(); |
|
|
|
return true; |
|
|
|
}; |
|
|
|
|
|
|
@ -261,17 +290,15 @@ Stream.prototype.connect = function () { |
|
|
|
var errno = socketError(self.fd); |
|
|
|
if (errno == 0) { |
|
|
|
// connection established
|
|
|
|
self.emit('connect'); |
|
|
|
self.readWatcher.start(); |
|
|
|
self.readable = true; |
|
|
|
self.writable = true; |
|
|
|
self.writeWatcher.callback = self._onWriteFlush; |
|
|
|
self.writeWatcher.callback = self._doFlush; |
|
|
|
self.emit('connect'); |
|
|
|
} else if (errno != EINPROGRESS) { |
|
|
|
var e = new Error('connection error'); |
|
|
|
e.errno = errno; |
|
|
|
self.readWatcher.stop(); |
|
|
|
self.writeWatcher.stop(); |
|
|
|
close(self.fd); |
|
|
|
self.forceClose(e); |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
@ -292,17 +319,35 @@ Stream.prototype.forceClose = function (exception) { |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.close = function () { |
|
|
|
if (this.readable && this.writable) { |
|
|
|
Stream.prototype._shutdown = function () { |
|
|
|
if (this.writable) { |
|
|
|
this.writable = false; |
|
|
|
shutdown(this.fd, "write"); |
|
|
|
} else if (!this.readable && this.writable) { |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.close = function () { |
|
|
|
var self = this; |
|
|
|
var closeMethod; |
|
|
|
if (self.readable && self.writable) { |
|
|
|
closeMethod = self._shutdown; |
|
|
|
} else if (!self.readable && self.writable) { |
|
|
|
// already got EOF
|
|
|
|
this.forceClose(this.fd); |
|
|
|
closeMethod = self.forceClose; |
|
|
|
} |
|
|
|
// In the case we've already shutdown write side,
|
|
|
|
// but haven't got EOF: ignore. In the case we're
|
|
|
|
// fully closed already: ignore.
|
|
|
|
|
|
|
|
if (closeMethod) { |
|
|
|
if (self.sendQueueSize == 0) { |
|
|
|
// no queue. just shut down the socket.
|
|
|
|
closeMethod(); |
|
|
|
} else { |
|
|
|
self.addListener("drain", closeMethod); |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
@ -327,30 +372,43 @@ process.inherits(Server, process.EventEmitter); |
|
|
|
exports.Server = Server; |
|
|
|
|
|
|
|
|
|
|
|
exports.createServer = function (listener) { |
|
|
|
return new Server(listener); |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Server.prototype.listen = function () { |
|
|
|
var self = this; |
|
|
|
if (self.fd) throw new Error('Server already opened'); |
|
|
|
|
|
|
|
var backlogIndex; |
|
|
|
if (typeof(arguments[0]) == 'string' && arguments.length == 1) { |
|
|
|
// the first argument specifies a path
|
|
|
|
self.fd = process.socket('UNIX'); |
|
|
|
self.type = 'UNIX'; |
|
|
|
// TODO unlink sockfile if exists?
|
|
|
|
// if (lstat(SOCKFILE, &tstat) == 0) {
|
|
|
|
// assert(S_ISSOCK(tstat.st_mode));
|
|
|
|
// unlink(SOCKFILE);
|
|
|
|
// }
|
|
|
|
bind(self.fd, arguments[0]); |
|
|
|
backlogIndex = 1; |
|
|
|
} else if (arguments.length == 0) { |
|
|
|
self.fd = process.socket('TCP'); |
|
|
|
self.type = 'TCP'; |
|
|
|
// Don't bind(). OS will assign a port with INADDR_ANY. The port will be
|
|
|
|
// passed to the 'listening' event.
|
|
|
|
} else { |
|
|
|
// the first argument is the port, the second an IP
|
|
|
|
self.fd = process.socket('TCP'); |
|
|
|
self.type = 'TCP'; |
|
|
|
if (needsLookup(arguments[1])) { |
|
|
|
getaddrinfo(arguments[1], function (ip) { |
|
|
|
}); |
|
|
|
} |
|
|
|
// TODO dns resolution on arguments[1]
|
|
|
|
bind(self.fd, arguments[0], arguments[1]); |
|
|
|
backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1; |
|
|
|
} |
|
|
|
listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128); |
|
|
|
|
|
|
|
listen(self.fd, 128); |
|
|
|
self.emit("listening"); |
|
|
|
|
|
|
|
self.watcher.set(self.fd, true, false); |
|
|
@ -358,10 +416,15 @@ Server.prototype.listen = function () { |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Server.prototype.sockName = function () { |
|
|
|
return getsockname(self.fd); |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Server.prototype.close = function () { |
|
|
|
var self = this; |
|
|
|
if (!self.fd) throw new Error('Not running'); |
|
|
|
self.watcher.stop(); |
|
|
|
close(self.fd); |
|
|
|
self.fd = null; |
|
|
|
if (!this.fd) throw new Error('Not running'); |
|
|
|
this.watcher.stop(); |
|
|
|
close(this.fd); |
|
|
|
this.fd = null; |
|
|
|
this.emit("close"); |
|
|
|
}; |
|
|
|