|
@ -17,6 +17,8 @@ var accept = process.accept; |
|
|
var close = process.close; |
|
|
var close = process.close; |
|
|
var shutdown = process.shutdown; |
|
|
var shutdown = process.shutdown; |
|
|
var read = process.read; |
|
|
var read = process.read; |
|
|
|
|
|
var recvMsg = process.recvMsg; |
|
|
|
|
|
var sendFD = process.sendFD; |
|
|
var write = process.write; |
|
|
var write = process.write; |
|
|
var toRead = process.toRead; |
|
|
var toRead = process.toRead; |
|
|
var setNoDelay = process.setNoDelay; |
|
|
var setNoDelay = process.setNoDelay; |
|
@ -28,7 +30,6 @@ var EINPROGRESS = process.EINPROGRESS; |
|
|
var ENOENT = process.ENOENT; |
|
|
var ENOENT = process.ENOENT; |
|
|
var END_OF_FILE = 42; |
|
|
var END_OF_FILE = 42; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
function Socket (peerInfo) { |
|
|
function Socket (peerInfo) { |
|
|
process.EventEmitter.call(); |
|
|
process.EventEmitter.call(); |
|
|
|
|
|
|
|
@ -37,7 +38,7 @@ function Socket (peerInfo) { |
|
|
// Allocated on demand.
|
|
|
// Allocated on demand.
|
|
|
self.recvBuffer = null; |
|
|
self.recvBuffer = null; |
|
|
|
|
|
|
|
|
self.readWatcher = new IOWatcher() |
|
|
self.readWatcher = new IOWatcher(); |
|
|
self.readWatcher.callback = function () { |
|
|
self.readWatcher.callback = function () { |
|
|
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
|
|
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
|
|
// most of the recvBuffer, allocate a new one.
|
|
|
// most of the recvBuffer, allocate a new one.
|
|
@ -47,10 +48,23 @@ function Socket (peerInfo) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
debug('recvBuffer.used ' + self.recvBuffer.used); |
|
|
debug('recvBuffer.used ' + self.recvBuffer.used); |
|
|
var bytesRead = read(self.fd, |
|
|
var bytesRead; |
|
|
|
|
|
var receivedFd = -1; |
|
|
|
|
|
|
|
|
|
|
|
if (self.type == "unix") { |
|
|
|
|
|
var msgInfo = recvMsg(self.fd, |
|
|
|
|
|
self.recvBuffer, |
|
|
|
|
|
self.recvBuffer.used, |
|
|
|
|
|
self.recvBuffer.length - self.recvBuffer.used); |
|
|
|
|
|
bytesRead = msgInfo[0]; |
|
|
|
|
|
receivedFd = msgInfo[1]; |
|
|
|
|
|
debug('receivedFd ' + receivedFd); |
|
|
|
|
|
} else { |
|
|
|
|
|
bytesRead = read(self.fd, |
|
|
self.recvBuffer, |
|
|
self.recvBuffer, |
|
|
self.recvBuffer.used, |
|
|
self.recvBuffer.used, |
|
|
self.recvBuffer.length - self.recvBuffer.used); |
|
|
self.recvBuffer.length - self.recvBuffer.used); |
|
|
|
|
|
} |
|
|
debug('bytesRead ' + bytesRead + '\n'); |
|
|
debug('bytesRead ' + bytesRead + '\n'); |
|
|
|
|
|
|
|
|
if (bytesRead == 0) { |
|
|
if (bytesRead == 0) { |
|
@ -59,10 +73,15 @@ function Socket (peerInfo) { |
|
|
self.emit('eof'); |
|
|
self.emit('eof'); |
|
|
if (!self.writable) self.forceClose(); |
|
|
if (!self.writable) self.forceClose(); |
|
|
} else { |
|
|
} else { |
|
|
var slice = self.recvBuffer.slice(self.recvBuffer.used, |
|
|
if (receivedFd == -1) { |
|
|
self.recvBuffer.used + bytesRead); |
|
|
var slice = self.recvBuffer.slice(self.recvBuffer.used, |
|
|
self.recvBuffer.used += bytesRead; |
|
|
self.recvBuffer.used + bytesRead); |
|
|
self.emit('data', slice); |
|
|
self.recvBuffer.used += bytesRead; |
|
|
|
|
|
self.emit('data', slice); |
|
|
|
|
|
} else { |
|
|
|
|
|
self.recvBuffer.used += bytesRead; |
|
|
|
|
|
self.emit('fd', receivedFd); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
self.readable = false; |
|
|
self.readable = false; |
|
@ -70,10 +89,16 @@ function Socket (peerInfo) { |
|
|
self.sendQueue = []; // queue of buffers that need to be written to socket
|
|
|
self.sendQueue = []; // queue of buffers that need to be written to socket
|
|
|
// XXX use link list?
|
|
|
// XXX use link list?
|
|
|
self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
|
|
|
self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length!
|
|
|
|
|
|
self.sendMessageQueueSize = 0; // number of messages remaining to be sent
|
|
|
self._doFlush = function () { |
|
|
self._doFlush = function () { |
|
|
assert(self.sendQueueSize > 0); |
|
|
/* Socket becomes writeable on connect() but don't flush if there's |
|
|
|
|
|
* nothing actually to write */ |
|
|
|
|
|
if ((self.sendQueueSize == 0) && (self.sendMessageQueueSize == 0)) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
if (self.flush()) { |
|
|
if (self.flush()) { |
|
|
assert(self.sendQueueSize == 0); |
|
|
assert(self.sendQueueSize == 0); |
|
|
|
|
|
assert(self.sendMessageQueueSize == 0); |
|
|
self.emit("drain"); |
|
|
self.emit("drain"); |
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
@ -134,6 +159,7 @@ Socket.prototype._allocateSendBuffer = function () { |
|
|
var b = new process.Buffer(1024); |
|
|
var b = new process.Buffer(1024); |
|
|
b.used = 0; |
|
|
b.used = 0; |
|
|
b.sent = 0; |
|
|
b.sent = 0; |
|
|
|
|
|
b.isMsg = false; |
|
|
this.sendQueue.push(b); |
|
|
this.sendQueue.push(b); |
|
|
return b; |
|
|
return b; |
|
|
}; |
|
|
}; |
|
@ -154,6 +180,7 @@ Socket.prototype._sendString = function (data, encoding) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
// if we didn't find one, take the last
|
|
|
// if we didn't find one, take the last
|
|
|
|
|
|
// TODO what if this isn't empty but encoding == fd ?
|
|
|
if (!buffer) { |
|
|
if (!buffer) { |
|
|
buffer = self._sendQueueLast(); |
|
|
buffer = self._sendQueueLast(); |
|
|
// if last buffer is used up
|
|
|
// if last buffer is used up
|
|
@ -166,13 +193,22 @@ Socket.prototype._sendString = function (data, encoding) { |
|
|
var charsWritten; |
|
|
var charsWritten; |
|
|
var bytesWritten; |
|
|
var bytesWritten; |
|
|
|
|
|
|
|
|
if (encoding.toLowerCase() == 'utf8') { |
|
|
// The special encoding "fd" means that data is an integer FD and we want
|
|
|
|
|
|
// to pass the FD on the socket with sendmsg()
|
|
|
|
|
|
if (encoding == "fd") { |
|
|
|
|
|
buffer.isFd = true; |
|
|
|
|
|
// TODO is this OK -- does it guarantee that the fd is the only thing in the buffer?
|
|
|
|
|
|
charsWritten = buffer.asciiWrite(data, buffer.used, buffer.length - buffer.used); |
|
|
|
|
|
bytesWritten = charsWritten; |
|
|
|
|
|
} else if (encoding.toLowerCase() == 'utf8') { |
|
|
|
|
|
buffer.isFd = false; |
|
|
charsWritten = buffer.utf8Write(data, |
|
|
charsWritten = buffer.utf8Write(data, |
|
|
buffer.used, |
|
|
buffer.used, |
|
|
buffer.length - buffer.used); |
|
|
buffer.length - buffer.used); |
|
|
bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten)); |
|
|
bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten)); |
|
|
} else { |
|
|
} else { |
|
|
// ascii
|
|
|
// ascii
|
|
|
|
|
|
buffer.isFd = false; |
|
|
charsWritten = buffer.asciiWrite(data, |
|
|
charsWritten = buffer.asciiWrite(data, |
|
|
buffer.used, |
|
|
buffer.used, |
|
|
buffer.length - buffer.used); |
|
|
buffer.length - buffer.used); |
|
@ -180,7 +216,11 @@ Socket.prototype._sendString = function (data, encoding) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
buffer.used += bytesWritten; |
|
|
buffer.used += bytesWritten; |
|
|
self.sendQueueSize += bytesWritten; |
|
|
if (buffer.isFd) { |
|
|
|
|
|
self.sendMessageQueueSize += 1; |
|
|
|
|
|
} else { |
|
|
|
|
|
self.sendQueueSize += bytesWritten; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug('charsWritten ' + charsWritten); |
|
|
debug('charsWritten ' + charsWritten); |
|
|
debug('buffer.used ' + buffer.used); |
|
|
debug('buffer.used ' + buffer.used); |
|
@ -235,6 +275,27 @@ Socket.prototype.send = function (data, encoding) { |
|
|
return this.flush(); |
|
|
return this.flush(); |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// Sends a file descriptor over a unix socket
|
|
|
|
|
|
Socket.prototype.sendFD = function(socketToPass) { |
|
|
|
|
|
var self = this; |
|
|
|
|
|
|
|
|
|
|
|
if (!self.writable) throw new Error('Socket is not writable'); |
|
|
|
|
|
|
|
|
|
|
|
if (self._sendQueueLast == END_OF_FILE) { |
|
|
|
|
|
throw new Error('socket.close() called already; cannot write.'); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (self.type != "unix") { |
|
|
|
|
|
throw new Error('FD passing only available on unix sockets'); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (! socketToPass instanceof Socket) { |
|
|
|
|
|
throw new Error('Provided arg is not a socket'); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return self.send(socketToPass.fd.toString(), "fd"); |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Flushes the write buffer out. Emits "drain" if the buffer is empty.
|
|
|
// Flushes the write buffer out. Emits "drain" if the buffer is empty.
|
|
|
Socket.prototype.flush = function () { |
|
|
Socket.prototype.flush = function () { |
|
@ -253,23 +314,35 @@ Socket.prototype.flush = function () { |
|
|
|
|
|
|
|
|
if (b.sent == b.used) { |
|
|
if (b.sent == b.used) { |
|
|
// this can be improved - save the buffer for later?
|
|
|
// this can be improved - save the buffer for later?
|
|
|
self.sendQueue.shift() |
|
|
self.sendQueue.shift(); |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
bytesWritten = write(self.fd, |
|
|
var fdToSend = null; |
|
|
b, |
|
|
if (b.isFd) { |
|
|
b.sent, |
|
|
fdToSend = parseInt(b.asciiSlice(b.sent, b.used - b.sent)); |
|
|
b.used - b.sent); |
|
|
bytesWritten = sendFD(self.fd, fdToSend); |
|
|
|
|
|
} else { |
|
|
|
|
|
bytesWritten = write(self.fd, |
|
|
|
|
|
b, |
|
|
|
|
|
b.sent, |
|
|
|
|
|
b.used - b.sent); |
|
|
|
|
|
} |
|
|
if (bytesWritten === null) { |
|
|
if (bytesWritten === null) { |
|
|
// could not flush everything
|
|
|
// could not flush everything
|
|
|
self.writeWatcher.start(); |
|
|
self.writeWatcher.start(); |
|
|
assert(self.sendQueueSize > 0); |
|
|
assert(self.sendQueueSize > 0); |
|
|
return false; |
|
|
return false; |
|
|
} |
|
|
} |
|
|
b.sent += bytesWritten; |
|
|
if (b.isFd) { |
|
|
self.sendQueueSize -= bytesWritten; |
|
|
b.sent = b.used; |
|
|
debug('bytes sent: ' + b.sent); |
|
|
self.sendMessageQueueSize -= 1; |
|
|
|
|
|
debug('sent fd: ' + fdToSend); |
|
|
|
|
|
} else { |
|
|
|
|
|
b.sent += bytesWritten; |
|
|
|
|
|
self.sendQueueSize -= bytesWritten; |
|
|
|
|
|
debug('bytes sent: ' + b.sent); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
self.writeWatcher.stop(); |
|
|
self.writeWatcher.stop(); |
|
|
return true; |
|
|
return true; |
|
@ -299,11 +372,11 @@ Socket.prototype.connect = function () { |
|
|
// socketError() if there isn't an error, we're connected. AFAIK this a
|
|
|
// socketError() if there isn't an error, we're connected. AFAIK this a
|
|
|
// platform independent way determining when a non-blocking connection
|
|
|
// platform independent way determining when a non-blocking connection
|
|
|
// is established, but I have only seen it documented in the Linux
|
|
|
// is established, but I have only seen it documented in the Linux
|
|
|
// Manual Page connect(2) under the error code EINPROGRESS.
|
|
|
// Manual Page connect(2) under the error code EINPROGRESS.
|
|
|
self.writeWatcher.set(self.fd, false, true); |
|
|
self.writeWatcher.set(self.fd, false, true); |
|
|
self.writeWatcher.start(); |
|
|
self.writeWatcher.start(); |
|
|
self.writeWatcher.callback = function () { |
|
|
self.writeWatcher.callback = function () { |
|
|
var errno = socketError(self.fd); |
|
|
var errno = socketError(self.fd); |
|
|
if (errno == 0) { |
|
|
if (errno == 0) { |
|
|
// connection established
|
|
|
// connection established
|
|
|
self.readWatcher.start(); |
|
|
self.readWatcher.start(); |
|
@ -340,7 +413,6 @@ Socket.prototype.address = function () { |
|
|
return getsockname(this.fd); |
|
|
return getsockname(this.fd); |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Socket.prototype.setNoDelay = function (v) { |
|
|
Socket.prototype.setNoDelay = function (v) { |
|
|
if (this.type == 'tcp') setNoDelay(this.fd, v); |
|
|
if (this.type == 'tcp') setNoDelay(this.fd, v); |
|
|
}; |
|
|
}; |
|
@ -393,6 +465,7 @@ function Server (listener) { |
|
|
debug('accept: ' + JSON.stringify(peerInfo)); |
|
|
debug('accept: ' + JSON.stringify(peerInfo)); |
|
|
if (!peerInfo) return; |
|
|
if (!peerInfo) return; |
|
|
var peer = new Socket(peerInfo); |
|
|
var peer = new Socket(peerInfo); |
|
|
|
|
|
peer.type = self.type; |
|
|
peer.server = self; |
|
|
peer.server = self; |
|
|
self.emit('connection', peer); |
|
|
self.emit('connection', peer); |
|
|
} |
|
|
} |
|
|