Browse Source

Merge branch 'writev'

v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
cd9515efd9
  1. 4
      lib/http.js
  2. 320
      lib/net.js
  3. 2
      src/node.cc
  4. 4
      src/node.js
  5. 8
      src/node_buffer.cc
  6. 4
      src/node_buffer.h
  7. 438
      src/node_io_watcher.cc
  8. 8
      src/node_io_watcher.h
  9. 2
      src/node_net.cc
  10. 16
      test/fixtures/recvfd.js
  11. 135
      test/simple/test-dumper-unix.js
  12. 128
      test/simple/test-dumper.js
  13. 13
      test/simple/test-pipe.js
  14. 15
      test/simple/test-sendfd.js

4
lib/http.js

@ -827,8 +827,8 @@ function connectionListener (socket) {
// No more messages to be pushed out.
// HACK: need way to do this with socket interface
if (socket._writeQueue.length) {
socket.__destroyOnDrain = true; //socket.end();
if (socket._writeWatcher.firstBucket) {
socket.__destroyOnDrain = true;
} else {
socket.destroy();
}

320
lib/net.js

@ -54,6 +54,33 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
return new IOWatcher();
});
IOWatcher.prototype.ondrain = function () {
if (this.socket) {
var socket = this.socket;
socket._haveTriedFlush = false;
if (socket.writable || socket.readable) {
require('timers').active(socket);
}
socket.emit('drain');
if (socket.ondrain) socket.ondrain();
if (socket._eof) socket._shutdown();
}
};
IOWatcher.prototype.onerror = function (errno) {
assert(this.socket);
var e = errnoException(errno, 'write');
e.message += " fd=" + this.socket.fd;
this.socket.destroy(e);
};
exports.isIP = binding.isIP;
exports.isIPv4 = function (input) {
@ -92,16 +119,6 @@ function setImplmentationMethods (self) {
};
if (self.type == 'unix') {
self._writeImpl = function (buf, off, len, fd, flags) {
// Detect and disallow zero-byte writes wth an attached file
// descriptor. This is an implementation limitation of sendmsg(2).
if (fd && noData(buf, off, len)) {
throw new Error('File descriptors can only be written with data');
}
return sendMsg(self.fd, buf, off, len, fd, flags);
};
self._readImpl = function (buf, off, len) {
var bytesRead = recvMsg(self.fd, buf, off, len);
@ -123,36 +140,21 @@ function setImplmentationMethods (self) {
return bytesRead;
};
} else {
self._writeImpl = function (buf, off, len, fd, flags) {
// XXX: TLS support requires that 0-byte writes get processed
// by the kernel for some reason. Otherwise, we'd just
// fast-path return here.
// Drop 'fd' and 'flags' as these are not supported by the write(2)
// system call
return write(self.fd, buf, off, len);
};
self._readImpl = function (buf, off, len) {
return read(self.fd, buf, off, len);
};
}
self._shutdownImpl = function () {
shutdown(self.fd, 'write');
};
};
function onReadable (readable, writeable) {
function onReadable (readable, writable) {
assert(this.socket);
var socket = this.socket;
socket._onReadable();
}
function onWritable (readable, writeable) {
function onWritable (readable, writable) {
assert(this.socket);
var socket = this.socket;
if (socket._connecting) {
@ -167,11 +169,7 @@ function initStream (self) {
self._readWatcher.socket = self;
self._readWatcher.callback = onReadable;
self.readable = false;
// Queue of buffers and string that need to be written to socket.
self._writeQueue = [];
self._writeQueueEncoding = [];
self._writeQueueFD = [];
self._eof = false;
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
@ -213,6 +211,11 @@ Stream.prototype._onTimeout = function () {
};
Stream.prototype.writeQueueSize = function () {
return this._writeWatcher.queueSize || 0;
};
Stream.prototype.open = function (fd, type) {
initStream(this);
@ -222,6 +225,10 @@ Stream.prototype.open = function (fd, type) {
setImplmentationMethods(this);
if (this.type === "unix") {
this._writeWatcher.isUnixSocket = true;
}
this._writeWatcher.set(this.fd, false, true);
this.writable = true;
};
@ -255,182 +262,79 @@ Object.defineProperty(Stream.prototype, 'readyState', {
});
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
Stream.prototype.write = function (data, encoding, fd) {
if (this._connecting || (this._writeQueue && this._writeQueue.length)) {
if (!this._writeQueue) {
this._writeQueue = [];
this._writeQueueEncoding = [];
this._writeQueueFD = [];
}
Stream.prototype._appendBucket = function (data, encoding, fd, callback) {
if (data.length != 0) {
// TODO reject empty data.
var newBucket = { data: data };
if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd;
if (callback) newBucket.callback = callback;
// Slow. There is already a write queue, so let's append to it.
if (this._writeQueueLast() === END_OF_FILE) {
throw new Error('Stream.end() called already; cannot write.');
}
// TODO properly calculate queueSize
if (typeof data == 'string' &&
this._writeQueue.length &&
typeof this._writeQueue[this._writeQueue.length-1] === 'string' &&
this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) {
// optimization - concat onto last
this._writeQueue[this._writeQueue.length-1] += data;
if (this._writeWatcher.lastBucket) {
this._writeWatcher.lastBucket.next = newBucket;
} else {
this._writeQueue.push(data);
this._writeQueueEncoding.push(encoding);
this._writeWatcher.firstBucket = newBucket;
}
if (fd != undefined) {
this._writeQueueFD.push(fd);
this._writeWatcher.lastBucket = newBucket;
}
return false;
} else {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
return this._writeOut(data, encoding, fd);
if (this._writeWatcher.queueSize === undefined) {
this._writeWatcher.queueSize = 0;
}
assert(this._writeWatcher.queueSize >= 0);
this._writeWatcher.queueSize += data.length;
return this._writeWatcher.queueSize;
};
// Directly writes the data to socket.
//
// Steps:
// 1. If it's a string, write it to the `pool`. (If not space remains
// on the pool make a new one.)
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
Stream.prototype._writeOut = function (data, encoding, fd) {
Stream.prototype.write = function (data /* encoding, fd, callback */) {
if (this._eof) {
throw new Error('Stream.end() called already; cannot write.');
}
if (!this.writable) {
throw new Error('Stream is not writable');
}
var buffer, off, len;
var bytesWritten, charsWritten;
var queuedData = false;
// parse the arguments. ugly.
if (typeof data != 'string') {
// 'data' is a buffer, ignore 'encoding'
buffer = data;
off = 0;
len = data.length;
var encoding, fd, callback;
if (arguments[1] === undefined || typeof arguments[1] == 'string') {
encoding = arguments[1];
if (typeof arguments[2] == 'number') {
fd = arguments[2];
callback = arguments[3];
} else {
assert(typeof data == 'string');
if (!pool || pool.length - pool.used < kMinPoolSpace) {
pool = null;
allocNewPool();
callback = arguments[2];
}
if (!encoding || encoding == 'utf8' || encoding == 'utf-8') {
// default to utf8
bytesWritten = pool.write(data, 'utf8', pool.used);
charsWritten = Buffer._charsWritten;
} else if (typeof arguments[1] == 'number') {
fd = arguments[1];
callback = arguments[2];
} else if (typeof arguments[1] == 'function') {
callback = arguments[1];
} else {
bytesWritten = pool.write(data, encoding, pool.used);
charsWritten = bytesWritten;
}
if (encoding && data.length > 0) {
assert(bytesWritten > 0);
throw new Error("Bad type for second argument");
}
buffer = pool;
len = bytesWritten;
off = pool.used;
pool.used += bytesWritten;
var queueSize = this._appendBucket(data, encoding, fd, callback);
debug('wrote ' + bytesWritten + ' bytes to pool');
if (charsWritten != data.length) {
//debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n");
// Unshift whatever didn't fit onto the buffer
this._writeQueue.unshift(data.slice(charsWritten));
this._writeQueueEncoding.unshift(encoding);
this._writeWatcher.start();
queuedData = true;
}
}
try {
bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
} catch (e) {
this.destroy(e);
return false;
}
debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");
if (this._connecting) return false;
this._onWritable(); // Insert writeWatcher into the dumpQueue
require('timers').active(this);
if (bytesWritten == len) {
// awesome. sent to buffer.
if (buffer === pool) {
// If we're just writing from the pool then we can make a little
// optimization and save the space.
buffer.used -= len;
}
if (queuedData) {
return false;
} else {
return true;
}
}
// Didn't write the entire thing to buffer.
// Need to wait for the socket to become available before trying again.
this._writeWatcher.start();
// Slice out the data left.
var leftOver = buffer.slice(off + bytesWritten, off + len);
leftOver.used = leftOver.length; // used the whole thing...
// util.error('data.used = ' + data.used);
//if (!this._writeQueue) initWriteStream(this);
// data should be the next thing to write.
this._writeQueue.unshift(leftOver);
this._writeQueueEncoding.unshift(null);
// If didn't successfully write any bytes, enqueue our fd and try again
if (!bytesWritten) {
this._writeQueueFD.unshift(fd);
if (queueSize > (64*1024) && !this._haveTriedFlush) {
IOWatcher.flush();
this._haveTriedFlush = true;
}
return false;
};
// Flushes the write buffer out.
// Returns true if the entire buffer was flushed.
Stream.prototype.flush = function () {
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
var fd = this._writeQueueFD.shift();
if (data === END_OF_FILE) {
this._shutdown();
return true;
}
var flushed = this._writeOut(data,encoding,fd);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();
return true;
};
Stream.prototype._writeQueueLast = function () {
return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null;
return queueSize < (64*1024);
};
@ -473,6 +377,12 @@ Stream.prototype._onConnect = function () {
this._connecting = false;
this.resume();
this.readable = this.writable = true;
if (this._writeWatcher.firstBucket) {
// Flush this in case any writes are queued up while connecting.
this._onWritable();
}
try {
this.emit('connect');
} catch (e) {
@ -480,12 +390,6 @@ Stream.prototype._onConnect = function () {
return;
}
if (this._writeQueue && this._writeQueue.length) {
// Flush this in case any writes are queued up while connecting.
this._onWritable();
}
} else if (errno != EINPROGRESS) {
this.destroy(errnoException(errno, 'connect'));
}
@ -493,11 +397,10 @@ Stream.prototype._onConnect = function () {
Stream.prototype._onWritable = function () {
// Stream becomes writable on connect() but don't flush if there's
// nothing actually to write
if (this.flush()) {
if (this._events && this._events['drain']) this.emit("drain");
if (this.ondrain) this.ondrain(); // Optimization
// Stick it into the dumpQueue
if (!this._writeWatcher.next) {
this._writeWatcher.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = this._writeWatcher;
}
};
@ -638,7 +541,12 @@ Stream.prototype.pause = function () {
Stream.prototype.resume = function () {
if (this.fd === null) throw new Error('Cannot resume() closed Stream.');
if (this.fd === null) {
// TODO, FIXME: throwing here breaks test/simple/test-pipe.js
// throw new Error('Cannot resume() closed Stream.');
return;
}
this._readWatcher.stop();
this._readWatcher.set(this.fd, true, false);
this._readWatcher.start();
};
@ -648,15 +556,14 @@ Stream.prototype.destroy = function (exception) {
// pool is shared between sockets, so don't need to free it here.
var self = this;
// TODO would like to set _writeQueue to null to avoid extra object alloc,
// but lots of code assumes this._writeQueue is always an array.
this._writeQueue = [];
this.readable = this.writable = false;
this._eof = this.readable = this.writable = false;
if (this._writeWatcher) {
this._writeWatcher.stop();
this._writeWatcher.socket = null;
this._writeWatcher.firstBucket = null;
this._writeWatcher.lastBucket = null;
this._writeWatcher.isUnixSocket = false;
ioWatchers.free(this._writeWatcher);
this._writeWatcher = null;
}
@ -695,7 +602,7 @@ Stream.prototype._shutdown = function () {
this.writable = false;
try {
this._shutdownImpl();
shutdown(this.fd, 'write');
} catch (e) {
this.destroy(e);
}
@ -708,15 +615,14 @@ Stream.prototype._shutdown = function () {
Stream.prototype.end = function (data, encoding) {
if (this.writable) {
if (this._writeQueueLast() !== END_OF_FILE) {
if (data) this.write(data, encoding);
this._writeQueue.push(END_OF_FILE);
if (!this._connecting) {
this.flush();
}
}
}
if (!this.writable) return; // TODO this should throw error
if (this._eof) return; // TODO this should also throw error
if (data) this._appendBucket(data, encoding);
this._eof = true;
// If this isn't in the dumpQueue then we shutdown now.
if (!this._writeWatcher.firstBucket) this._shutdown();
};

2
src/node.cc

@ -1967,6 +1967,8 @@ int Start(int argc, char *argv[]) {
Tick();
IOWatcher::Dump();
} while (need_tick_cb || ev_activecnt(EV_DEFAULT_UC) > 0);

4
src/node.js

@ -29,6 +29,10 @@ process.assert = function (x, msg) {
var writeError = process.binding('stdio').writeError;
// Need to force-load this binding so that we can IOWatcher::Dump in
// src/node.cc
var IOWatcher = process.binding('io_watcher');
// nextTick()
var nextTickQueue = [];

8
src/node_buffer.cc

@ -82,7 +82,8 @@ static size_t ByteLength (Handle<String> string, enum encoding enc) {
}
Handle<Object> Buffer::New(Handle<String> string) {
Local<Object> Buffer::New(Handle<String> string,
Handle<Value> encoding) {
HandleScope scope;
// get Buffer from global scope.
@ -91,8 +92,9 @@ Handle<Object> Buffer::New(Handle<String> string) {
assert(bv->IsFunction());
Local<Function> b = Local<Function>::Cast(bv);
Local<Value> argv[1] = { Local<Value>::New(string) };
Local<Object> instance = b->NewInstance(1, argv);
Local<Value> argv[2] = { Local<Value>::New(string),
Local<Value>::New(encoding) };
Local<Object> instance = b->NewInstance(2, argv);
return scope.Close(instance);
}

4
src/node_buffer.h

@ -25,7 +25,9 @@ class Buffer : public ObjectWrap {
typedef void (*free_callback)(char *data, void *hint);
// C++ API for constructing fast buffer
static v8::Handle<v8::Object> New(v8::Handle<v8::String> string);
static v8::Local<v8::Object> New(
v8::Handle<v8::String> string,
v8::Handle<v8::Value> encoding = v8::Handle<v8::Value>());
static void Initialize(v8::Handle<v8::Object> target);
static Buffer* New(size_t length); // public constructor

438
src/node_io_watcher.cc

@ -2,17 +2,44 @@
#include <node_io_watcher.h>
#include <node.h>
#include <node_buffer.h>
#include <v8.h>
#include <sys/uio.h> /* writev */
#include <errno.h>
#include <limits.h> /* IOV_MAX */
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
namespace node {
using namespace v8;
static ev_prepare dumper;
static Persistent<Object> dump_queue;
Persistent<FunctionTemplate> IOWatcher::constructor_template;
Persistent<String> callback_symbol;
static Persistent<String> next_sym;
static Persistent<String> prev_sym;
static Persistent<String> ondrain_sym;
static Persistent<String> onerror_sym;
static Persistent<String> data_sym;
static Persistent<String> encoding_sym;
static Persistent<String> offset_sym;
static Persistent<String> fd_sym;
static Persistent<String> is_unix_socket_sym;
static Persistent<String> first_bucket_sym;
static Persistent<String> last_bucket_sym;
static Persistent<String> queue_size_sym;
static Persistent<String> callback_sym;
void IOWatcher::Initialize(Handle<Object> target) {
HandleScope scope;
@ -26,9 +53,39 @@ void IOWatcher::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction());
Local<Function> io_watcher = constructor_template->GetFunction();
target->Set(String::NewSymbol("IOWatcher"), io_watcher);
NODE_SET_METHOD(constructor_template->GetFunction(),
"flush",
IOWatcher::Flush);
callback_symbol = NODE_PSYMBOL("callback");
next_sym = NODE_PSYMBOL("next");
prev_sym = NODE_PSYMBOL("prev");
ondrain_sym = NODE_PSYMBOL("ondrain");
onerror_sym = NODE_PSYMBOL("onerror");
first_bucket_sym = NODE_PSYMBOL("firstBucket");
last_bucket_sym = NODE_PSYMBOL("lastBucket");
queue_size_sym = NODE_PSYMBOL("queueSize");
offset_sym = NODE_PSYMBOL("offset");
fd_sym = NODE_PSYMBOL("fd");
is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
data_sym = NODE_PSYMBOL("data");
encoding_sym = NODE_PSYMBOL("encoding");
callback_sym = NODE_PSYMBOL("callback");
ev_prepare_init(&dumper, IOWatcher::Dump);
ev_prepare_start(EV_DEFAULT_UC_ &dumper);
// Need to make sure that Dump runs *after* all other prepare watchers -
// in particular the next tick one.
ev_set_priority(&dumper, EV_MINPRI);
ev_unref(EV_DEFAULT_UC);
dump_queue = Persistent<Object>::New(Object::New());
io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue);
}
@ -144,5 +201,384 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
}
Handle<Value> IOWatcher::Flush(const Arguments& args) {
HandleScope scope; // unneccessary?
IOWatcher::Dump();
return Undefined();
}
#define KB 1024
/*
* A large javascript object structure is built up in net.js. The function
* Dump is called at the end of each iteration, before select() is called,
* to push all the data out to sockets.
*
* The structure looks like this:
*
* IOWatcher . dumpQueue
* |
* watcher . buckets - b - b - b - b
* |
* watcher . buckets - b - b
* |
* watcher . buckets - b
* |
* watcher . buckets - b - b - b
*
* The 'b' nodes are little javascript objects buckets. Each has a 'data'
* member. 'data' is either a string or buffer. E.G.
*
* b = { data: "hello world" }
*
*/
// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS
// in 'build/c4che/default.cache.py' and 'make clean all'
#ifdef DUMP_DEBUG
#define DEBUG_PRINT(fmt,...) \
fprintf(stderr, "(dump:%d) " fmt "\n", __LINE__, ##__VA_ARGS__)
#else
#define DEBUG_PRINT(fmt,...)
#endif
void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
assert(revents == EV_PREPARE);
assert(w == &dumper);
Dump();
}
void IOWatcher::Dump() {
HandleScope scope;
static struct iovec iov[IOV_MAX];
// Loop over all watchers in the dump queue. Each one stands for a socket
// that has stuff to be written out.
//
// There are several possible outcomes for each watcher.
// 1. All the buckets associated with the watcher are written out. In this
// case the watcher is disabled; it is removed from the dump_queue.
// 2. Some of the data was written, but there still remains buckets. In
// this case the watcher is enabled (i.e. we wait for the file
// descriptor to become readable) and we remove it from the dump_queue.
// When it becomes readable, we'll get a callback in net.js and add it
// again to the dump_queue
// 3. writev returns EAGAIN. This is the same as case 2.
//
// In any case, the dump queue should be empty when we exit this function.
// (See the assert at the end of the outermost for loop.
Local<Value> watcher_v;
Local<Object> watcher;
for (watcher_v = dump_queue->Get(next_sym);
watcher_v->IsObject();
dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))),
watcher->Set(next_sym, Null())) {
watcher = watcher_v->ToObject();
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(watcher);
// stats (just for fun)
io->dumps_++;
io->last_dump_ = ev_now(EV_DEFAULT_UC);
DEBUG_PRINT("<%d> dumping", io->watcher_.fd);
// Number of items we've stored in iov
int iovcnt = 0;
// Number of bytes we've stored in iov
size_t to_write = 0;
bool unix_socket = false;
if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) {
unix_socket = true;
}
// Unix sockets don't like huge messages. TCP sockets do.
// TODO: handle EMSGSIZE after sendmsg().
size_t max_to_write = unix_socket ? 8*KB : 256*KB;
int fd_to_send = -1;
// Offset is only as large as the first buffer of data. (See assert
// below) Offset > 0 occurs when a previous writev could not entirely
// drain a bucket.
size_t offset = 0;
if (watcher->Has(offset_sym)) {
offset = watcher->Get(offset_sym)->Uint32Value();
}
size_t first_offset = offset;
DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset);
// Loop over all the buckets for this particular watcher/socket in order
// to fill iov.
Local<Value> bucket_v;
Local<Object> bucket;
unsigned int bucket_index = 0;
for (bucket_v = watcher->Get(first_bucket_sym);
// Break if we have an FD to send.
// sendmsg can only handle one FD at a time.
fd_to_send < 0 &&
// break if we've hit the end
bucket_v->IsObject() &&
// break if iov contains a lot of data
to_write < max_to_write &&
// break if iov is running out of space
iovcnt < IOV_MAX;
bucket_v = bucket->Get(next_sym), bucket_index++) {
assert(bucket_v->IsObject());
bucket = bucket_v->ToObject();
Local<Value> data_v = bucket->Get(data_sym);
// net.js will be setting this 'data' value. We can ensure that it is
// never empty.
assert(!data_v.IsEmpty());
Local<Object> buf_object;
if (data_v->IsString()) {
// TODO: insert v8::String::Pointers() hack here.
Local<String> s = data_v->ToString();
Local<Value> e = bucket->Get(encoding_sym);
buf_object = Buffer::New(s, e);
bucket->Set(data_sym, buf_object);
} else {
assert(Buffer::HasInstance(data_v));
buf_object = data_v->ToObject();
}
size_t l = Buffer::Length(buf_object);
if (l == 0) continue;
assert(first_offset < l);
iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset;
iov[iovcnt].iov_len = l - first_offset;
to_write += iov[iovcnt].iov_len;
iovcnt++;
first_offset = 0; // only the first buffer will be offset.
if (unix_socket && bucket->Has(fd_sym)) {
Local<Value> fd_v = bucket->Get(fd_sym);
if (fd_v->IsInt32()) {
fd_to_send = fd_v->Int32Value();
DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send);
assert(fd_to_send >= 0);
}
}
}
if (to_write > 0) {
ssize_t written;
if (unix_socket) {
struct msghdr msg;
char scratch[64];
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_control = NULL; // void*
msg.msg_controllen = 0; // socklen_t
msg.msg_flags = 0; // int
if (fd_to_send >= 0) {
struct cmsghdr *cmsg;
msg.msg_control = (void *) scratch;
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = msg.msg_controllen;
*(int*) CMSG_DATA(cmsg) = fd_to_send;
}
written = sendmsg(io->watcher_.fd, &msg, 0);
} else {
written = writev(io->watcher_.fd, iov, iovcnt);
}
DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld",
io->watcher_.fd,
iovcnt,
to_write,
written);
if (written < 0) {
// Allow EAGAIN.
// TODO: handle EMSGSIZE after sendmsg().
if (errno == EAGAIN) {
DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd);
io->Start();
} else {
// Emit error event
if (watcher->Has(onerror_sym)) {
Local<Value> callback_v = io->handle_->Get(onerror_sym);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
Local<Value> argv[1] = { Integer::New(errno) };
TryCatch try_catch;
callback->Call(io->handle_, 1, argv);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
}
// Continue with the next watcher.
continue;
}
// what about written == 0 ?
size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size);
assert(queue_size >= offset);
// Now drop the buckets that have been written.
bucket_index = 0;
while (written > 0) {
bucket_v = watcher->Get(first_bucket_sym);
if (!bucket_v->IsObject()) {
// No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
break;
}
bucket = bucket_v->ToObject();
Local<Value> data_v = bucket->Get(data_sym);
assert(!data_v.IsEmpty());
// At the moment we're turning all string into buffers
// so we assert that this is not a string. However, when the
// "Pointer patch" lands, this assert will need to be removed.
assert(!data_v->IsString());
// When the "Pointer patch" lands, we will need to be careful
// to somehow store the length of strings that we're optimizing on
// so that it need not be recalculated here. Note the "Pointer patch"
// will only apply to ASCII strings - UTF8 ones will need to be
// serialized onto a buffer.
size_t bucket_len = Buffer::Length(data_v->ToObject());
if (unix_socket && bucket->Has(fd_sym)) {
bucket->Set(fd_sym, Null());
}
DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld",
io->watcher_.fd,
bucket_index,
bucket_len,
offset);
assert(bucket_len > offset);
// Only on the first bucket does is the offset > 0.
if (offset + written < bucket_len) {
// we have not written the entire bucket
DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. "
"setting watcher.offset = %ld",
io->watcher_.fd,
bucket_index,
offset + written);
watcher->Set(offset_sym,
Integer::NewFromUnsigned(offset + written));
break;
} else {
DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.",
io->watcher_.fd,
bucket_index);
assert(bucket_len <= queue_size);
queue_size -= bucket_len;
assert(bucket_len - offset <= written);
written -= bucket_len - offset;
Local<Value> bucket_callback_v = bucket->Get(callback_sym);
if (bucket_callback_v->IsFunction()) {
Local<Function> bucket_callback =
Local<Function>::Cast(bucket_callback_v);
TryCatch try_catch;
bucket_callback->Call(io->handle_, 0, NULL);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
// Offset is now zero
watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
}
offset = 0; // the next bucket will have zero offset;
bucket_index++;
// unshift
watcher->Set(first_bucket_sym, bucket->Get(next_sym));
}
watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
}
// Finished dumping the buckets.
//
// If our list of buckets is empty, we can emit 'drain' and forget about
// this socket. Nothing needs to be done.
//
// Otherwise we need to prepare the io_watcher to wait for the interface
// to become writable again.
if (watcher->Get(first_bucket_sym)->IsObject()) {
// Still have buckets to be written. Wait for fd to become writable.
io->Start();
DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd);
} else {
// No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
// Emptied the buckets queue for this socket. Don't wait for it to
// become writable.
io->Stop();
DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd);
// Emit drain event
if (watcher->Has(ondrain_sym)) {
Local<Value> callback_v = io->handle_->Get(ondrain_sym);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
TryCatch try_catch;
callback->Call(io->handle_, 0, NULL);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
}
}
// Assert that the dump_queue is empty.
assert(!dump_queue->Get(next_sym)->IsObject());
}
} // namespace node

8
src/node_io_watcher.h

@ -10,6 +10,7 @@ namespace node {
class IOWatcher : ObjectWrap {
public:
static void Initialize(v8::Handle<v8::Object> target);
static void Dump();
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
@ -26,6 +27,7 @@ class IOWatcher : ObjectWrap {
}
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Flush(const v8::Arguments& args);
static v8::Handle<v8::Value> Start(const v8::Arguments& args);
static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
static v8::Handle<v8::Value> Set(const v8::Arguments& args);
@ -33,9 +35,15 @@ class IOWatcher : ObjectWrap {
private:
static void Callback(EV_P_ ev_io *watcher, int revents);
static void Dump(EV_P_ ev_prepare *watcher, int revents);
void Start();
void Stop();
// stats. TODO: expose to js, add reset() method
uint64_t dumps_;
ev_tstamp last_dump_;
ev_io watcher_;
};

2
src/node_net.cc

@ -3,6 +3,7 @@
#include <node.h>
#include <node_buffer.h>
#include <node_io_watcher.h>
#include <string.h>
#include <stdlib.h>
@ -37,7 +38,6 @@
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
namespace node {
using namespace v8;

16
test/fixtures/recvfd.js

@ -22,35 +22,33 @@ function processData(s) {
// version of our modified object back. Clean up when we're done.
var pipeStream = new net.Stream(fd);
var drainFunc = function() {
pipeStream.resume();
pipeStream.write(JSON.stringify(d) + '\n', function () {
pipeStream.destroy();
if (++numSentMessages == 2) {
s.destroy();
}
};
pipeStream.addListener('drain', drainFunc);
pipeStream.resume();
if (pipeStream.write(JSON.stringify(d) + '\n')) {
drainFunc();
}
});
};
// Create a UNIX socket to the path defined by argv[2] and read a file
// descriptor and misc data from it.
var s = new net.Stream();
s.addListener('fd', function(fd) {
receivedFDs.unshift(fd);
processData(s);
});
s.addListener('data', function(data) {
data.toString('utf8').trim().split('\n').forEach(function(d) {
receivedData.unshift(JSON.parse(d));
});
processData(s);
});
s.connect(process.argv[2]);
// vim:ts=2 sw=2 et

135
test/simple/test-dumper-unix.js

@ -0,0 +1,135 @@
var assert =require('assert');
var IOWatcher = process.binding('io_watcher').IOWatcher;
var errnoException = process.binding('net').errnoException;
var close = process.binding('net').close;
var net = require('net');
var ncomplete = 0;
function test (N, b, cb) {
var fdsSent = 0;
var fdsRecv = 0;
//console.trace();
var expected = N * b.length;
var nread = 0;
// Create a socketpair
var fds = process.binding('net').socketpair();
// Use writev/dumper to send data down the one of the sockets, fds[1].
// This requires a IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, true);
w.isUnixSocket = true;
w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}
var ndrain = 0;
w.ondrain = function () {
ndrain++;
}
var nerror = 0;
w.onerror = function (errno) {
throw errnoException(errno);
nerror++;
}
// The read end, fds[0], will be used to count how much comes through.
// This sets up a readable stream on fds[0].
var stream = new net.Stream({ fd: fds[0], type: 'unix' });
//stream.readable = true;
stream.resume();
stream.on('fd', function (fd) {
console.log('got fd %d', fd);
fdsRecv++;
});
// Count the data as it arrives on the other end
stream.on('data', function (d) {
nread += d.length;
if (nread >= expected) {
assert.ok(nread === expected);
assert.equal(1, ndrain);
assert.equal(0, nerror);
console.error("done. wrote %d bytes\n", nread);
close(fds[1]);
}
});
stream.on('close', function () {
assert.equal(fdsSent, fdsRecv);
// check to make sure the watcher isn't in the dump queue.
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
}
assert.equal(null, w.next);
// completely flushed
assert.ok(!w.firstBucket);
assert.ok(!w.lastBucket);
ncomplete++;
if (cb) cb();
});
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;
w.queueSize = b.length;
for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
w.lastBucket.next = bucket;
w.lastBucket = bucket;
w.queueSize += b.length;
// Kind of randomly fill these buckets with fds.
if (fdsSent < 5 && i % 2 == 0) {
bucket.fd = 1; // send stdout
fdsSent++;
}
}
}
function runTests (values) {
expectedToComplete = values.length;
function go () {
if (ncomplete < values.length) {
var v = values[ncomplete];
console.log("test N=%d, size=%d", v[0], v[1].length);
test(v[0], v[1], go);
}
}
go();
}
runTests([ [30, Buffer(1000)]
, [4, Buffer(10000)]
, [1, "hello world\n"]
, [50, Buffer(1024*1024)]
, [500, Buffer(40960+1)]
, [500, Buffer(40960-1)]
, [500, Buffer(40960)]
, [500, Buffer(1024*1024+1)]
, [50000, "hello world\n"]
]);
process.on('exit', function () {
assert.equal(expectedToComplete, ncomplete);
});

128
test/simple/test-dumper.js

@ -0,0 +1,128 @@
var assert =require('assert');
var IOWatcher = process.binding('io_watcher').IOWatcher;
var errnoException = process.binding('net').errnoException;
var close = process.binding('net').close;
var net = require('net');
var ncomplete = 0;
function test (N, b, cb) {
//console.trace();
var expected = N * b.length;
var nread = 0;
// Create a pipe
var fds = process.binding('net').pipe();
console.log("fds == %j", fds);
// Use writev/dumper to send data down the write end of the pipe, fds[1].
// This requires a IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, true);
w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}
var ndrain = 0;
w.ondrain = function () {
ndrain++;
}
var nerror = 0;
w.onerror = function (errno) {
throw errnoException(errno);
nerror++;
}
// The read end, fds[0], will be used to count how much comes through.
// This sets up a readable stream on fds[0].
var stream = new net.Stream();
stream.open(fds[0]);
stream.readable = true;
stream.resume();
// Count the data as it arrives on the read end of the pipe.
stream.on('data', function (d) {
nread += d.length;
if (nread >= expected) {
assert.ok(nread === expected);
assert.equal(1, ndrain);
assert.equal(0, nerror);
console.error("done. wrote %d bytes\n", nread);
close(fds[1]);
}
});
stream.on('close', function () {
// check to make sure the watcher isn't in the dump queue.
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
}
assert.equal(null, w.next);
// completely flushed
assert.ok(!w.firstBucket);
assert.ok(!w.lastBucket);
ncomplete++;
if (cb) cb();
});
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;
w.queueSize = b.length;
for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
assert.ok(!w.lastBucket.next);
w.lastBucket.next = bucket;
w.lastBucket = bucket;
w.queueSize += b.length;
}
}
function runTests (values) {
expectedToComplete = values.length;
function go () {
if (ncomplete < values.length) {
var v = values[ncomplete];
console.log("test N=%d, size=%d", v[0], v[1].length);
test(v[0], v[1], go);
}
}
go();
}
runTests([ [3, Buffer(1000)],
[30, Buffer(1000)],
[4, Buffer(10000)],
[1, "hello world\n"],
[50, Buffer(1024*1024)],
[500, Buffer(40960+1)],
[500, Buffer(40960-1)],
[500, Buffer(40960)],
[500, Buffer(1024*1024+1)],
[50000, "hello world\n"]
]);
process.on('exit', function () {
assert.equal(expectedToComplete, ncomplete);
});

13
test/simple/test-pipe.js

@ -17,20 +17,22 @@ var bufferSize = 5 * 1024 * 1024;
*/
var buffer = Buffer(bufferSize);
for (var i = 0; i < buffer.length; i++) {
buffer[i] = parseInt(Math.random()*10000) % 256;
buffer[i] = 100; //parseInt(Math.random()*10000) % 256;
}
var web = http.Server(function (req, res) {
web.close();
console.log("web server connection fd=%d", req.connection.fd);
console.log(req.headers);
var socket = net.Stream();
socket.connect(tcpPort);
socket.on('connect', function () {
console.log('socket connected');
console.log('http->tcp connected fd=%d', socket.fd);
});
req.pipe(socket);
@ -54,7 +56,7 @@ web.listen(webPort, startClient);
var tcp = net.Server(function (s) {
tcp.close();
console.log("tcp server connection");
console.log("tcp server connection fd=%d", s.fd);
var i = 0;
@ -91,6 +93,11 @@ function startClient () {
req.write(buffer);
req.end();
console.log("request fd=%d", req.connection.fd);
// note the queue includes http headers.
assert.ok(req.connection.writeQueueSize() > buffer.length);
req.on('response', function (res) {
console.log('Got response');
res.setEncoding('utf8');

15
test/simple/test-sendfd.js

@ -53,7 +53,7 @@ var logChild = function(d) {
d.split('\n').forEach(function(l) {
if (l.length > 0) {
common.debug('CHILD: ' + l);
console.error('CHILD: ' + l);
}
});
};
@ -96,19 +96,18 @@ var srv = net.createServer(function(s) {
buf.write(JSON.stringify(DATA) + '\n', 'utf8');
s.write(str, 'utf8', pipeFDs[1]);
if (s.write(buf, undefined, pipeFDs[1])) {
netBinding.close(pipeFDs[1]);
} else {
s.addListener('drain', function() {
s.write(buf, pipeFDs[1], function () {
console.error("close pipeFDs[1]");
netBinding.close(pipeFDs[1]);
});
}
});
srv.listen(SOCK_PATH);
// Spawn a child running test/fixtures/recvfd.js
var cp = child_process.spawn(process.argv[0],
[path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]);
var cp = child_process.spawn(process.execPath,
[path.join(common.fixturesDir, 'recvfd.js'),
SOCK_PATH]);
cp.stdout.addListener('data', logChild);
cp.stderr.addListener('data', logChild);

Loading…
Cancel
Save