mirror of https://github.com/lukechilds/node.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
603 lines
16 KiB
603 lines
16 KiB
// Debug tracing is switched on purely by the presence of NODE_DEBUG in
// process.ENV; the value of the variable is never inspected.
var debugLevel = ('NODE_DEBUG' in process.ENV) ? 1 : 0;

// Writes `x` followed by a newline through process.stdio.writeError when
// debugLevel is positive; otherwise does nothing.
function debug (x) {
  if (debugLevel <= 0) return;
  process.stdio.writeError(x + '\n');
}
|
|
|
|
|
|
var IOWatcher = process.IOWatcher;
|
|
var assert = process.assert;
|
|
var socket = process.socket;
|
|
var bind = process.bind;
|
|
var connect = process.connect;
|
|
var listen = process.listen;
|
|
var accept = process.accept;
|
|
var close = process.close;
|
|
var shutdown = process.shutdown;
|
|
var read = process.read;
|
|
var recvMsg = process.recvMsg;
|
|
var sendFD = process.sendFD;
|
|
var write = process.write;
|
|
var toRead = process.toRead;
|
|
var setNoDelay = process.setNoDelay;
|
|
var socketError = process.socketError;
|
|
var getsockname = process.getsockname;
|
|
var getaddrinfo = process.getaddrinfo;
|
|
var needsLookup = process.needsLookup;
|
|
var EINPROGRESS = process.EINPROGRESS;
|
|
var ENOENT = process.ENOENT;
|
|
var END_OF_FILE = 0;
|
|
|
|
// Socket: an EventEmitter wrapping one stream file descriptor together with
// a read IOWatcher and a write IOWatcher.
//
// peerInfo - optional. When supplied, its fd, remoteAddress and remotePort
//            are adopted, the read watcher is started, and the socket begins
//            life readable and writable. Without it the socket stays
//            unreadable/unwritable until connect() succeeds.
//
// Events emitted from code in this constructor:
//   'fd'    (fd)    - recvMsg left a descriptor in recvMsg.fd (unix sockets)
//   'data'  (slice) - bytes were read into the receive buffer
//   'eof'           - a read returned 0 bytes and no descriptor is pending
//   'drain'         - a watcher-triggered flush emptied the send queue
function Socket (peerInfo) {
  process.EventEmitter.call(this);

  var self = this;

  // Created lazily by the first read.
  self.recvBuffer = null;

  // Read-watcher callback: pull whatever is available off the descriptor and
  // hand it to listeners.
  function onReadable () {
    // A new receive buffer is needed on the very first read, or once fewer
    // than 128 bytes of the current one remain unused.
    var needsBuffer = !self.recvBuffer ||
                      self.recvBuffer.length - self.recvBuffer.used < 128;
    if (needsBuffer) self._allocateNewRecvBuf();

    debug('recvBuffer.used ' + self.recvBuffer.used);

    var bytesRead;
    if (self.type == "unix") {
      // UNIX sockets go through recvMsg so that a passed descriptor can be
      // picked up from recvMsg.fd.
      bytesRead = recvMsg(self.fd,
                          self.recvBuffer,
                          self.recvBuffer.used,
                          self.recvBuffer.length - self.recvBuffer.used);
      debug('recvMsg.fd ' + recvMsg.fd);
      if (recvMsg.fd) self.emit('fd', recvMsg.fd);
    } else {
      bytesRead = read(self.fd,
                       self.recvBuffer,
                       self.recvBuffer.used,
                       self.recvBuffer.length - self.recvBuffer.used);
    }

    debug('bytesRead ' + bytesRead + '\n');

    if (!recvMsg.fd && bytesRead == 0) {
      // End of stream: stop reading, and tear the socket down completely if
      // the write side is already gone.
      self.readable = false;
      self.readWatcher.stop();
      self.emit('eof');
      if (!self.writable) self.forceClose();
      return;
    }

    if (bytesRead > 0) {
      var start = self.recvBuffer.used;
      var chunk = self.recvBuffer.slice(start, start + bytesRead);
      self.recvBuffer.used += bytesRead;
      self.emit('data', chunk);
    }
  }

  self.readWatcher = new IOWatcher();
  self.readWatcher.host = this;
  self.readWatcher.callback = onReadable;
  self.readable = false;

  self.sendQueue = [];           // buffers waiting to be written to the socket
                                 // XXX use link list?
  self.sendQueueSize = 0;        // in bytes, not to be confused with sendQueue.length!
  self.sendMessageQueueSize = 0; // number of messages remaining to be sent

  // Write-watcher callback once connected: flush, and announce "drain" when
  // the flush emptied everything.
  self._doFlush = function () {
    // The socket becomes writable on connect(), but there is no point in
    // flushing when nothing at all is queued.
    var nothingQueued = (self.sendQueueSize == 0) &&
                        (self.sendMessageQueueSize == 0);
    if (nothingQueued) return;

    if (!self.flush()) return;

    assert(self.sendQueueSize == 0);
    assert(self.sendMessageQueueSize == 0);
    self.emit("drain");
  };

  self.writeWatcher = new IOWatcher();
  self.writeWatcher.host = this;
  self.writeWatcher.callback = self._doFlush;
  self.writable = false;

  if (peerInfo) {
    self.fd = peerInfo.fd;
    self.remoteAddress = peerInfo.remoteAddress;
    self.remotePort = peerInfo.remotePort;

    self.readWatcher.set(self.fd, true, false);
    self.readWatcher.start();
    self.readable = true;

    // The write watcher is configured but not started here; flush() starts
    // it when a write cannot complete.
    self.writeWatcher.set(self.fd, false, true);
    self.writable = true;
  }
}
process.inherits(Socket, process.EventEmitter);
exports.Socket = Socket;
|
|
|
|
|
|
exports.createConnection = function (port, host) {
|
|
var s = new Socket();
|
|
s.connect(port, host);
|
|
return s;
|
|
};
|
|
|
|
|
|
// Replaces this.recvBuffer with a newly allocated buffer whose `used` counter
// is 0. The size is 128 bytes unless toRead() reports more than 1024 bytes
// pending, in which case it is 4096 bytes. When toRead() reports 0 pending
// bytes and the current buffer still has free space, the current buffer is
// kept and nothing is allocated.
Socket.prototype._allocateNewRecvBuf = function () {
  var size = 128; // TODO make this adjustable from user API

  if (toRead) {
    // Is the extra system call even worth it?
    var pending = toRead(this.fd);
    if (pending > 1024) {
      size = 4 * 1024;
    } else if (pending == 0) {
      // Probably getting an EOF - so let's not allocate so much.
      var current = this.recvBuffer;
      if (current && current.length - current.used > 0) {
        return; // just recv the eof on the old buf.
      }
    }
  }

  var fresh = new process.Buffer(size);
  fresh.used = 0;
  this.recvBuffer = fresh;
};
|
|
|
|
|
|
// Creates an empty 1024-byte send buffer, appends it to the end of
// this.sendQueue and returns it. `used` counts bytes filled in so far and
// `sent` counts bytes already written to the socket.
// NOTE(review): `isMsg` is initialised here, but the other code visible in
// this file reads and writes `isFd` instead - confirm whether anything still
// relies on `isMsg`.
Socket.prototype._allocateSendBuffer = function () {
  var fresh = new process.Buffer(1024);
  fresh.sent = 0;
  fresh.used = 0;
  fresh.isMsg = false;

  this.sendQueue.push(fresh);
  return fresh;
};
|
|
|
|
|
|
// Copies a string into the send queue, spilling into additional buffers when
// one fills up. Nothing is written to the socket here; send() calls flush().
//
// data     - the string to queue. With encoding "fd" it is the decimal text
//            of a file descriptor (see sendFD()).
// encoding - 'ascii' (the default), 'utf8' (matched case-insensitively) or
//            the special value "fd", which marks the buffer so that flush()
//            passes the descriptor with sendFD() instead of writing bytes.
//            Any other value is handled as ascii.
//
// Throws when the socket is not writable, or when even a freshly allocated
// buffer accepts none of the remaining characters.
//
// Buffer selection rules:
//   * An "fd" payload always gets a buffer of its own, so the descriptor is
//     guaranteed to be the only thing in it.
//   * Text is never appended to a buffer that is marked isFd, because writing
//     text resets isFd for the whole buffer.
//   * When a write makes no progress on a partly filled buffer, the next
//     attempt uses a fresh buffer instead of retrying the same one forever.
//     (Presumably this happens when a multi-byte utf8 character does not fit
//     in the remaining space - TODO confirm against utf8Write.)
Socket.prototype._sendString = function (data, encoding) {
  var self = this;
  if (!self.writable) throw new Error('Socket is not writable');

  encoding = encoding || 'ascii'; // default to ascii since it's faster

  var isFd = (encoding == "fd");
  var isUtf8 = !isFd && encoding.toLowerCase() == 'utf8';

  // Returns an already queued buffer that text may be written into, or null.
  function reusableBuffer () {
    // walk through the sendQueue, find a completely empty buffer
    for (var i = 0; i < self.sendQueue.length; i++) {
      if (self.sendQueue[i].used == 0) return self.sendQueue[i];
    }
    // otherwise the last buffer, unless it is full or carries a descriptor
    var last = self._sendQueueLast();
    if (last && !last.isFd && last.length != last.used) return last;
    return null;
  }

  var remaining = data;
  var forceFresh = isFd;

  for (;;) {
    var buffer = forceFresh ? null : reusableBuffer();
    var allocated = !buffer;
    if (allocated) buffer = self._allocateSendBuffer();

    var charsWritten;
    var bytesWritten;

    if (isFd) {
      // The special encoding "fd" means that data is an integer FD and we
      // want to pass the FD on the socket with sendmsg()
      buffer.isFd = true;
      charsWritten = buffer.asciiWrite(remaining,
                                       buffer.used,
                                       buffer.length - buffer.used);
      bytesWritten = charsWritten;
    } else if (isUtf8) {
      buffer.isFd = false;
      charsWritten = buffer.utf8Write(remaining,
                                      buffer.used,
                                      buffer.length - buffer.used);
      bytesWritten = process.Buffer.utf8Length(remaining.slice(0, charsWritten));
    } else {
      // ascii
      buffer.isFd = false;
      charsWritten = buffer.asciiWrite(remaining,
                                       buffer.used,
                                       buffer.length - buffer.used);
      bytesWritten = charsWritten;
    }

    buffer.used += bytesWritten;
    if (buffer.isFd) {
      self.sendMessageQueueSize += 1;
    } else {
      self.sendQueueSize += bytesWritten;
    }

    debug('charsWritten ' + charsWritten);
    debug('buffer.used ' + buffer.used);

    // Everything fit: done.
    if (charsWritten >= remaining.length) return;

    if (charsWritten == 0) {
      // No progress at all. On a fresh buffer that can never succeed, so
      // fail loudly rather than loop forever.
      if (allocated) throw new Error('Unable to buffer string for sending');
      forceFresh = true;
    } else {
      debug('recursive send');
      remaining = remaining.slice(charsWritten);
      forceFresh = isFd;
    }
  }
};
|
|
|
|
|
|
// Returns the entry at the tail of this.sendQueue, or null when the queue is
// empty.
Socket.prototype._sendQueueLast = function () {
  var queue = this.sendQueue;
  if (queue.length > 0) {
    return queue[queue.length - 1];
  }
  return null;
};
|
|
|
|
|
|
// Queues `data` and immediately tries to flush the queue.
//
// data     - a string, or otherwise a process.Buffer. A buffer is queued
//            whole and gets `sent`/`used` bookkeeping properties set on it.
// encoding - only used for strings; passed through to _sendString().
//
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
//
// Throws when the socket is not writable or close() has already been called.
Socket.prototype.send = function (data, encoding) {
  var self = this;

  if (!self.writable) throw new Error('Socket is not writable');

  if (self._sendQueueLast() == END_OF_FILE) {
    throw new Error('socket.close() called already; cannot write.');
  }

  if (typeof(data) == 'string') {
    self._sendString(data, encoding);
    return this.flush();
  }

  // data is a process.Buffer: all of it is pending.
  data.sent = 0;
  data.used = data.length;

  // Find the first empty buffer in the queue and put the data in front of
  // it. When there is none, the index runs off the end and the splice below
  // appends.
  var queue = self.sendQueue;
  var slot = 0;
  while (slot < queue.length && queue[slot].used != 0) {
    slot++;
  }
  queue.splice(slot, 0, data);

  self.sendQueueSize += data.used;

  return this.flush();
};
|
|
|
|
// Sends a file descriptor over a unix socket.
//
// socketToPass - the Socket whose `fd` should be handed to the peer.
//
// Returns what send() returns: true when everything was flushed, false when
// something stayed queued.
//
// Throws when this socket is not writable, when close() has already been
// called, when this socket is not of type "unix", or when socketToPass is
// not a Socket instance.
Socket.prototype.sendFD = function(socketToPass) {
  var self = this;

  if (!self.writable) throw new Error('Socket is not writable');

  // Strict comparison: only the END_OF_FILE sentinel itself should match,
  // never a queued buffer object.
  if (self._sendQueueLast() === END_OF_FILE) {
    throw new Error('socket.close() called already; cannot write.');
  }

  if (self.type != "unix") {
    throw new Error('FD passing only available on unix sockets');
  }

  // The parentheses matter: `! x instanceof Socket` negates x first and then
  // tests a boolean, which is never an instance of Socket.
  if (!(socketToPass instanceof Socket)) {
    throw new Error('Provided arg is not a socket');
  }

  // The descriptor travels through the send queue as decimal text under the
  // special "fd" encoding.
  return self.send(socketToPass.fd.toString(), "fd");
};
|
|
|
|
|
|
// Flushes the write buffer out, working through this.sendQueue from the
// front.
//
// Returns true when the loop ran to completion (queue emptied, or the
// END_OF_FILE sentinel was reached and the write side shut down); the write
// watcher is stopped in that case. Returns false when a write could not
// complete; the write watcher is started so the flush is retried later.
// The "drain" event itself is emitted by the caller (_doFlush), not here.
//
// Throws when entries remain queued but the socket is not writable.
Socket.prototype.flush = function () {
  var self = this;

  var bytesWritten;
  while (self.sendQueue.length > 0) {
    if (!self.writable) throw new Error('Socket is not writable');

    var b = self.sendQueue[0];

    // Strict comparison so that only the sentinel pushed by close() matches,
    // never a buffer object that happens to coerce to 0.
    if (b === END_OF_FILE) {
      self._shutdown();
      break;
    }

    if (b.sent == b.used) {
      // this can be improved - save the buffer for later?
      self.sendQueue.shift();
      continue;
    }

    var fdToSend = null;
    if (b.isFd) {
      // The buffer holds the descriptor as decimal text; parse it as base 10
      // explicitly.
      // NOTE(review): the second argument is computed like a length; confirm
      // asciiSlice's parameter meaning. fd buffers appear to have sent == 0
      // whenever this line runs, in which case both readings agree.
      fdToSend = parseInt(b.asciiSlice(b.sent, b.used - b.sent), 10);
      bytesWritten = sendFD(self.fd, fdToSend);
    } else {
      bytesWritten = write(self.fd,
                           b,
                           b.sent,
                           b.used - b.sent);
    }

    if (bytesWritten === null) {
      // could not flush everything
      self.writeWatcher.start();
      // Something must still be pending - either bytes or fd messages. An
      // fd message adds to sendMessageQueueSize only, so both counters have
      // to be considered.
      assert(self.sendQueueSize > 0 || self.sendMessageQueueSize > 0);
      return false;
    }

    if (b.isFd) {
      // A descriptor is sent in one piece: mark the whole buffer consumed.
      b.sent = b.used;
      self.sendMessageQueueSize -= 1;
      debug('sent fd: ' + fdToSend);
    } else {
      b.sent += bytesWritten;
      self.sendQueueSize -= bytesWritten;
      debug('bytes sent: ' + b.sent);
    }
  }

  self.writeWatcher.stop();
  return true;
};
|
|
|
|
|
|
// var stream = new Socket();
// stream.connect(80)               - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket')    - UNIX connect to socket specified by path
//
// A string first argument selects a UNIX socket; anything else is taken as a
// TCP port, with the optional second argument run through lookupDomainName().
// Emits 'connect' once the connection is established. A connection error
// detected later by the write watcher is reported through forceClose(), i.e.
// as a 'close' event carrying an Error with `errno` set.
//
// Throws 'Socket already opened' when the socket already has a descriptor,
// and rethrows whatever connect() throws. In the latter case the descriptor
// is closed and this.fd is reset to null, so the socket does not keep
// pointing at a closed descriptor and can be connected again.
Socket.prototype.connect = function () {
  var self = this;
  if (self.fd) throw new Error('Socket already opened');

  function doConnect (portOrPath, host) {
    try {
      connect(self.fd, portOrPath, host);
    } catch (e) {
      close(self.fd);
      // Forget the closed descriptor; otherwise a later forceClose() would
      // close it a second time and connect() would refuse to run again.
      self.fd = null;
      throw e;
    }

    // Don't start the read watcher until connection is established
    self.readWatcher.set(self.fd, true, false);

    // How to connect on POSIX: Wait for fd to become writable, then call
    // socketError() if there isn't an error, we're connected. AFAIK this a
    // platform independent way determining when a non-blocking connection
    // is established, but I have only seen it documented in the Linux
    // Manual Page connect(2) under the error code EINPROGRESS.
    self.writeWatcher.set(self.fd, false, true);
    self.writeWatcher.start();
    self.writeWatcher.callback = function () {
      var errno = socketError(self.fd);
      if (errno == 0) {
        // connection established
        self.readWatcher.start();
        self.readable = true;
        self.writable = true;
        // From now on writability means "try to flush the send queue".
        self.writeWatcher.callback = self._doFlush;
        self.emit('connect');
      } else if (errno != EINPROGRESS) {
        var e = new Error('connection error');
        e.errno = errno;
        self.forceClose(e);
      }
    };
  }

  if (typeof(arguments[0]) == 'string') {
    self.fd = socket('unix');
    self.type = 'unix';
    // TODO check if sockfile exists?
    doConnect(arguments[0]);
  } else {
    self.fd = socket('tcp');
    self.type = 'tcp';
    // TODO dns resolution on arguments[1]
    var port = arguments[0];
    lookupDomainName(arguments[1], function (ip) {
      doConnect(port, ip);
    });
  }
};
|
|
|
|
|
|
// Returns whatever getsockname() reports for this socket's descriptor.
Socket.prototype.address = function () {
  var fd = this.fd;
  return getsockname(fd);
};
|
|
|
|
// Forwards `v` to the native setNoDelay() for this socket's descriptor.
// Only applies to sockets of type 'tcp'; for any other type this is a no-op.
Socket.prototype.setNoDelay = function (v) {
  if (this.type != 'tcp') return;
  setNoDelay(this.fd, v);
};
|
|
|
|
|
|
// Tears the socket down immediately: marks it unreadable and unwritable,
// stops both watchers, closes the descriptor and emits 'close' with the
// optional `exception` as its argument. Anything still sitting in the send
// queue is not flushed. Does nothing when the socket has no descriptor.
Socket.prototype.forceClose = function (exception) {
  if (!this.fd) return;

  this.readable = false;
  this.writable = false;

  this.writeWatcher.stop();
  this.readWatcher.stop();

  var fd = this.fd;
  close(fd);
  debug('close socket ' + fd);
  this.fd = null;

  this.emit('close', exception);
};
|
|
|
|
|
|
// Shuts down the write side of the socket: marks it unwritable and calls the
// native shutdown() with 'write'. Does nothing when the socket is already
// unwritable.
Socket.prototype._shutdown = function () {
  if (!this.writable) return;

  this.writable = false;
  shutdown(this.fd, 'write');
};
|
|
|
|
|
|
// Requests a graceful close of the write side: appends the END_OF_FILE
// sentinel to the send queue and flushes, so flush() shuts the socket down
// once it reaches the sentinel. Does nothing when the socket is not writable
// or when the sentinel is already the last queue entry.
Socket.prototype.close = function () {
  if (!this.writable) return;
  if (this._sendQueueLast() == END_OF_FILE) return;

  this.sendQueue.push(END_OF_FILE);
  this.flush();
};
|
|
|
|
|
|
// Server: an EventEmitter that accepts incoming connections on a listening
// descriptor and emits 'connection' with a new Socket for each one.
//
// listener - optional function registered as a 'connection' listener.
function Server (listener) {
  process.EventEmitter.call(this);
  var self = this;

  if (listener) self.addListener('connection', listener);

  // Watcher callback: accept connections until accept() returns nothing or
  // the server loses its descriptor (e.g. a listener called close()).
  function acceptPending (readable, writeable) {
    while (self.fd) {
      var peerInfo = accept(self.fd);
      debug('accept: ' + JSON.stringify(peerInfo));
      if (!peerInfo) return;

      var peer = new Socket(peerInfo);
      peer.type = self.type;
      peer.server = self;
      self.emit('connection', peer);
    }
  }

  self.watcher = new IOWatcher();
  self.watcher.host = self;
  self.watcher.callback = acceptPending;
}
process.inherits(Server, process.EventEmitter);
exports.Server = Server;
|
|
|
|
|
|
exports.createServer = function (listener) {
|
|
return new Server(listener);
|
|
};
|
|
|
|
/* This function does both an ipv4 and ipv6 look up.
 * It first tries the ipv4 look up, if that fails, then it does the ipv6.
 *
 * dn       - the name (or address) to resolve. When needsLookup(dn) is
 *            false it is handed to the callback untouched.
 * callback - called with a single argument: dn itself, or the first address
 *            of the first lookup that returned any.
 *
 * Throws (from inside the getaddrinfo callbacks) the Error a lookup returned,
 * or "No address associated with hostname ..." when neither the ipv4 nor the
 * ipv6 lookup produced an address.
 */
function lookupDomainName (dn, callback) {
  if (!needsLookup(dn)) {
    callback(dn);
    return;
  }

  debug("getaddrinfo 4 " + dn);
  getaddrinfo(dn, 4, function (r4) {
    if (r4 instanceof Error) throw r4;

    if (r4.length > 0) {
      debug("getaddrinfo 4 found " + r4);
      callback(r4[0]);
      return;
    }

    debug("getaddrinfo 6 " + dn);
    getaddrinfo(dn, 6, function (r6) {
      if (r6 instanceof Error) throw r6;

      // An empty result means the name has no address at all. A length can
      // never be negative, so the test has to be "fewer than one".
      if (r6.length < 1) {
        throw new Error("No address associated with hostname " + dn);
      }

      debug("getaddrinfo 6 found " + r6);
      callback(r6[0]);
    });
  });
}
|
|
|
|
|
|
// Listen on a UNIX socket
// server.listen("/tmp/socket");
//
// Listen on port 8000, accept connections from INADDR_ANY.
// server.listen(8000);
//
// Listen on port 8000, accept connections to "192.168.1.2"
// server.listen(8000, "192.168.1.2");
//
// With a falsy first argument no bind() is done at all and the OS picks the
// port. Emits "listening" once the descriptor is listening.
//
// For a UNIX path, an existing entry at that path is removed first when it
// is a socket left over from an earlier run (or a regular file, as before).
// Any other kind of entry makes the stat callback throw.
//
// Throws 'Server already opened' when the server already has a descriptor.
Server.prototype.listen = function () {
  var self = this;
  if (self.fd) throw new Error('Server already opened');

  function doListen () {
    listen(self.fd, 128);
    self.watcher.set(self.fd, true, false);
    self.watcher.start();
    self.emit("listening");
  }

  if (typeof(arguments[0]) == 'string') {
    // the first argument specifies a path
    self.fd = socket('unix');
    self.type = 'unix';
    var path = arguments[0];
    self.path = path;
    // unlink sockfile if it exists
    process.fs.stat(path, function (r) {
      if (r instanceof Error) {
        if (r.errno == ENOENT) {
          // Nothing in the way: bind straight away.
          bind(self.fd, path);
          doListen();
        } else {
          throw r;
        }
        return;
      }

      // A bound UNIX socket shows up as a socket entry, not as a regular
      // file, so isFile() alone never matches a stale sockfile. isSocket is
      // probed defensively in case the stat object does not provide it.
      var removable = r.isFile() ||
                      (typeof r.isSocket == 'function' && r.isSocket());
      if (!removable) {
        throw new Error("Non-file exists at " + path);
      }

      process.fs.unlink(path, function (err) {
        if (err) {
          throw err;
        } else {
          bind(self.fd, path);
          doListen();
        }
      });
    });
  } else if (!arguments[0]) {
    // Don't bind(). OS will assign a port with INADDR_ANY.
    // The port can be found with server.address()
    self.fd = socket('tcp');
    self.type = 'tcp';
    doListen();
  } else {
    // the first argument is the port, the second an IP
    self.fd = socket('tcp');
    self.type = 'tcp';
    var port = arguments[0];
    debug("starting tcp server on port " + port);
    lookupDomainName(arguments[1], function (ip) {
      debug("starting tcp server on ip " + ip);
      bind(self.fd, port, ip);
      doListen();
    });
  }
};
|
|
|
|
|
|
// Returns whatever getsockname() reports for the listening descriptor.
Server.prototype.address = function () {
  var fd = this.fd;
  return getsockname(fd);
};
|
|
|
|
|
|
// Stops accepting connections: stops the watcher, closes the listening
// descriptor and emits "close". For a UNIX server the socket path is
// unlinked first and "close" is emitted from the unlink callback, whatever
// the outcome of the unlink.
//
// Throws 'Not running' when the server has no descriptor.
Server.prototype.close = function () {
  var self = this;
  if (!self.fd) throw new Error('Not running');

  self.watcher.stop();

  close(self.fd);
  self.fd = null;

  function announceClose () {
    self.emit("close");
  }

  if (self.type === "unix") {
    process.fs.unlink(self.path, announceClose);
  } else {
    announceClose();
  }
};
|
|
|