Browse Source

Implement datagram sockets

- Adds new dgram module, for all data-gram type transports
- Supports both UDP client and servers
- Supports Unix Daemon sockets in DGRAM mode too (think syslog)
- Uses a shared Buffer and slices that as needed to be reasonably
  performant.
- One supplied test program so far, test-dgram-pingpong
- Passes test cases on osx 10.6 and ubuntu 9.10u
v0.7.4-release
Paul Querna 15 years ago
committed by Ryan Dahl
parent
commit
02da5ed4a1
  1. 184
      lib/dgram.js
  2. 1
      src/node.cc
  3. 157
      src/node_net.cc
  4. 83
      test/simple/test-dgram-pingpong.js

184
lib/dgram.js

@ -0,0 +1,184 @@
var sys = require("sys");
var fs = require("fs");
var events = require("events");
var dns = require('dns');
var Buffer = require('buffer').Buffer;
var IOWatcher = process.IOWatcher;
var binding = process.binding('net');
var socket = binding.socket;
var bind = binding.bind;
var recvfrom = binding.recvfrom;
var sendto = binding.sendto;
var close = binding.close;
var ENOENT = binding.ENOENT;
function isPort (x) { return parseInt(x) >= 0; }
var pool = null;
function getPool() {
/* TODO: this effectively limits you to 8kb maximum packet sizes */
var minPoolAvail = 1024 * 8;
var poolSize = 1024 * 64;
if (pool === null || (pool.used + minPoolAvail > pool.length)) {
pool = new Buffer(poolSize);
pool.used = 0;
}
return pool;
}
function Socket (listener) {
events.EventEmitter.call(this);
var self = this;
if (listener) {
self.addListener('message', listener);
}
self.watcher = new IOWatcher();
self.watcher.host = self;
self.watcher.callback = function () {
while (self.fd) {
var p = getPool();
var rinfo = recvfrom(self.fd, p, p.used, p.length - p.used, 0);
if (!rinfo) return;
self.emit('message', p.slice(p.used, p.used + rinfo.size), rinfo);
p.used += rinfo.size;
}
};
}
sys.inherits(Socket, events.EventEmitter);
exports.Socket = Socket;
exports.createSocket = function (listener) {
return new Socket(listener);
};
Socket.prototype.bind = function () {
var self = this;
if (self.fd) throw new Error('Server already opened');
if (!isPort(arguments[0])) {
/* TODO: unix path dgram */
self.fd = socket('unix_dgram');
self.type = 'unix_dgram';
var path = arguments[0];
self.path = path;
// unlink sockfile if it exists
fs.stat(path, function (err, r) {
if (err) {
if (err.errno == ENOENT) {
bind(self.fd, path);
process.nextTick(function() {
self._startWatcher();
});
} else {
throw r;
}
} else {
if (!r.isFile()) {
throw new Error("Non-file exists at " + path);
} else {
fs.unlink(path, function (err) {
if (err) {
throw err;
} else {
bind(self.fd, path);
process.nextTick(function() {
self._startWatcher();
});
}
});
}
}
});
} else if (!arguments[1]) {
// Don't bind(). OS will assign a port with INADDR_ANY.
// The port can be found with server.address()
self.type = 'udp4';
self.fd = socket(self.type);
bind(self.fd, arguments[0]);
process.nextTick(function() {
self._startWatcher();
});
} else {
// the first argument is the port, the second an IP
var port = arguments[0];
dns.lookup(arguments[1], function (err, ip, addressType) {
if (err) {
self.emit('error', err);
} else {
self.type = addressType == 4 ? 'udp4' : 'udp6';
self.fd = socket(self.type);
bind(self.fd, port, ip);
process.nextTick(function() {
self._startWatcher();
});
}
});
}
};
Socket.prototype._startWatcher = function () {
this.watcher.set(this.fd, true, false);
this.watcher.start();
this.emit("listening");
};
Socket.prototype.address = function () {
return getsockname(this.fd);
};
Socket.prototype.send = function(port, addr, buffer, offset, length) {
var self = this;
if (!isPort(arguments[0])) {
if (!self.fd) {
self.type = 'unix_dgram';
self.fd = socket(self.type);
}
sendto(self.fd, buffer, offset, length, 0, port, addr);
}
else {
dns.lookup(arguments[1], function (err, ip, addressType) {
if (err) {
self.emit('error', err);
} else {
if (!self.fd) {
self.type = addressType == 4 ? 'udp4' : 'udp6';
self.fd = socket(self.type);
process.nextTick(function() {
self._startWatcher();
});
}
sendto(self.fd, buffer, offset, length, 0, port, ip);
}
});
}
};
Socket.prototype.close = function () {
var self = this;
if (!self.fd) throw new Error('Not running');
self.watcher.stop();
close(self.fd);
self.fd = null;
if (self.type === "unix_dgram") {
fs.unlink(self.path, function () {
self.emit("close");
});
} else {
self.emit("close");
}
};

1
src/node.cc

@ -1845,6 +1845,7 @@ static Handle<Value> Binding(const Arguments& args) {
exports->Set(String::New("assert"), String::New(native_assert)); exports->Set(String::New("assert"), String::New(native_assert));
exports->Set(String::New("buffer"), String::New(native_buffer)); exports->Set(String::New("buffer"), String::New(native_buffer));
exports->Set(String::New("child_process"),String::New(native_child_process)); exports->Set(String::New("child_process"),String::New(native_child_process));
exports->Set(String::New("dgram"), String::New(native_dgram));
exports->Set(String::New("dns"), String::New(native_dns)); exports->Set(String::New("dns"), String::New(native_dns));
exports->Set(String::New("events"), String::New(native_events)); exports->Set(String::New("events"), String::New(native_events));
exports->Set(String::New("file"), String::New(native_file)); exports->Set(String::New("file"), String::New(native_file));

157
src/node_net.cc

@ -42,6 +42,7 @@ static Persistent<String> errno_symbol;
static Persistent<String> syscall_symbol; static Persistent<String> syscall_symbol;
static Persistent<String> fd_symbol; static Persistent<String> fd_symbol;
static Persistent<String> size_symbol;
static Persistent<String> address_symbol; static Persistent<String> address_symbol;
static Persistent<String> port_symbol; static Persistent<String> port_symbol;
static Persistent<String> type_symbol; static Persistent<String> type_symbol;
@ -147,7 +148,16 @@ static Handle<Value> Socket(const Arguments& args) {
} else if (0 == strcasecmp(*t, "UNIX")) { } else if (0 == strcasecmp(*t, "UNIX")) {
domain = PF_UNIX; domain = PF_UNIX;
type = SOCK_STREAM; type = SOCK_STREAM;
} else if (0 == strcasecmp(*t, "UNIX_DGRAM")) {
domain = PF_UNIX;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP")) { } else if (0 == strcasecmp(*t, "UDP")) {
domain = PF_INET;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP4")) {
domain = PF_INET;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP6")) {
domain = PF_INET6; domain = PF_INET6;
type = SOCK_DGRAM; type = SOCK_DGRAM;
} else { } else {
@ -520,6 +530,63 @@ static Handle<Value> Read(const Arguments& args) {
return scope.Close(Integer::New(bytes_read)); return scope.Close(Integer::New(bytes_read));
} }
// var info = t.recvfrom(fd, buffer, offset, length, flags);
// info.size // bytes read
// info.port // from port
// info.address // from address
// returns null on EAGAIN or EINTR, raises an exception on all other errors
// returns object otherwise
static Handle<Value> RecvFrom(const Arguments& args) {
HandleScope scope;
if (args.Length() < 5) {
return ThrowException(Exception::TypeError(
String::New("Takes 5 parameters")));
}
FD_ARG(args[0])
if (!Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Second argument should be a buffer")));
}
Buffer * buffer = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
size_t off = args[2]->Int32Value();
if (off >= buffer->length()) {
return ThrowException(Exception::Error(
String::New("Offset is out of bounds")));
}
size_t len = args[3]->Int32Value();
if (off + len > buffer->length()) {
return ThrowException(Exception::Error(
String::New("Length is extends beyond buffer")));
}
int flags = args[4]->Int32Value();
struct sockaddr_storage address_storage;
socklen_t addrlen = sizeof(struct sockaddr_storage);
ssize_t bytes_read = recvfrom(fd, (char*)buffer->data() + off, len, flags,
(struct sockaddr*) &address_storage, &addrlen);
if (bytes_read < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "read"));
}
Local<Object> info = Object::New();
info->Set(size_symbol, Integer::New(bytes_read));
ADDRESS_TO_JS(info, address_storage);
return scope.Close(info);
}
// bytesRead = t.recvMsg(fd, buffer, offset, length) // bytesRead = t.recvMsg(fd, buffer, offset, length)
// if (recvMsg.fd) { // if (recvMsg.fd) {
@ -780,6 +847,93 @@ static Handle<Value> SendMsg(const Arguments& args) {
return scope.Close(Integer::New(written)); return scope.Close(Integer::New(written));
} }
// var bytes = sendto(fd, buf, off, len, flags, destination port, desitnation address);
//
// Write a buffer with optional offset and length to the given file
// descriptor. Note that we refuse to send 0 bytes.
//
// The 'fd' parameter is a numerical file descriptor, or the undefined value
// to send none.
//
// The 'flags' parameter is a number representing a bitmask of MSG_* values.
// This is passed directly to sendmsg().
//
// The destination port can either be an int port, or a path.
//
// Returns null on EAGAIN or EINTR, raises an exception on all other errors
static Handle<Value> SendTo(const Arguments& args) {
HandleScope scope;
if (args.Length() < 5) {
return ThrowException(Exception::TypeError(
String::New("Takes 5 or 6 parameters")));
}
// The first argument should be a file descriptor
FD_ARG(args[0])
// Grab the actul data to be written
if (!Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Expected either a string or a buffer")));
}
Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
size_t offset = 0;
if (args.Length() >= 3 && !args[2]->IsUndefined()) {
if (!args[2]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for offset")));
}
offset = args[2]->Uint32Value();
if (offset >= buf->length()) {
return ThrowException(Exception::Error(
String::New("Offset into buffer too large")));
}
}
size_t length = buf->length() - offset;
if (args.Length() >= 4 && !args[3]->IsUndefined()) {
if (!args[3]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for length")));
}
length = args[3]->Uint32Value();
if (offset + length > buf->length()) {
return ThrowException(Exception::Error(
String::New("offset + length beyond buffer length")));
}
}
int flags = 0;
if (args.Length() >= 5 && !args[4]->IsUndefined()) {
if (!args[4]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for a flags argument")));
}
flags = args[4]->Uint32Value();
}
Handle<Value> error = ParseAddressArgs(args[5], args[6], false);
if (!error.IsEmpty()) return ThrowException(error);
ssize_t written = sendto(fd, buf->data() + offset, length, flags, addr, addrlen);
if (written < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "sendmsg"));
}
/* Note that the FD isn't explicitly closed here, this
* happens in the JS */
return scope.Close(Integer::New(written));
}
// Probably only works for Linux TCP sockets? // Probably only works for Linux TCP sockets?
// Returns the amount of data on the read queue. // Returns the amount of data on the read queue.
@ -891,6 +1045,8 @@ void InitNet(Handle<Object> target) {
NODE_SET_METHOD(target, "read", Read); NODE_SET_METHOD(target, "read", Read);
NODE_SET_METHOD(target, "sendMsg", SendMsg); NODE_SET_METHOD(target, "sendMsg", SendMsg);
NODE_SET_METHOD(target, "recvfrom", RecvFrom);
NODE_SET_METHOD(target, "sendto", SendTo);
recv_msg_template = recv_msg_template =
Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg)); Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
@ -927,6 +1083,7 @@ void InitNet(Handle<Object> target) {
errno_symbol = NODE_PSYMBOL("errno"); errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall"); syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd"); fd_symbol = NODE_PSYMBOL("fd");
size_symbol = NODE_PSYMBOL("size");
address_symbol = NODE_PSYMBOL("address"); address_symbol = NODE_PSYMBOL("address");
port_symbol = NODE_PSYMBOL("port"); port_symbol = NODE_PSYMBOL("port");
} }

83
test/simple/test-dgram-pingpong.js

@ -0,0 +1,83 @@
require("../common");
var Buffer = require('buffer').Buffer;
var dgram = require("dgram");
var tests_run = 0;
function pingPongTest (port, host) {
var N = 500;
var count = 0;
var sent_final_ping = false;
var server = dgram.createSocket(function (msg, rinfo) {
puts("connection: " + rinfo.address + ":"+ rinfo.port);
puts("server got: " + msg);
if (/PING/.exec(msg)) {
var buf = new Buffer(4);
buf.write('PONG');
server.send(rinfo.port, rinfo.address, buf, 0, buf.length);
}
});
server.addListener("error", function (e) {
throw e;
});
server.bind(port, host);
server.addListener("listening", function () {
puts("server listening on " + port + " " + host);
var buf = new Buffer(4);
buf.write('PING');
var client = dgram.createSocket();
client.addListener("message", function (msg, rinfo) {
puts("client got: " + msg);
assert.equal("PONG", msg.toString('ascii'));
count += 1;
if (count < N) {
client.send(port, host, buf, 0, buf.length);
} else {
sent_final_ping = true;
client.send(port, host, buf, 0, buf.length);
process.nextTick(function() {
client.close();
});
}
});
client.addListener("close", function () {
puts('client.close');
assert.equal(N, count);
tests_run += 1;
server.close();
});
client.addListener("error", function (e) {
throw e;
});
client.send(port, host, buf, 0, buf.length);
count += 1;
});
}
/* All are run at once, so run on different ports */
pingPongTest(20989, "localhost");
pingPongTest(20990, "localhost");
pingPongTest(20988);
pingPongTest(20997, "::1");
//pingPongTest("/tmp/pingpong.sock");
process.addListener("exit", function () {
assert.equal(4, tests_run);
puts('done');
});
Loading…
Cancel
Save