Browse Source

dgram: integrate libuv UDP support

Ben Noordhuis 14 years ago
parent
commit
cbd4033619
  1. 12
      doc/api/dgram.markdown
  2. 0
      lib/dgram_legacy.js
  3. 312
      lib/dgram_uv.js
  4. 3
      src/node.js
  5. 1
      src/node_extensions.h
  6. 360
      src/udp_wrap.cc
  7. 1
      wscript

12
doc/api/dgram.markdown

@ -31,6 +31,11 @@ Creates a datagram socket of the specified types. Valid types are:
Takes an optional callback which is added as a listener for `message` events.
Call `socket.bind` if you want to receive datagrams. `socket.bind()` will bind
to the "all interfaces" address on a random port (it does the right thing for
both `udp4` and `udp6` sockets). You can then retrieve the address and port
with `socket.address().address` and `socket.address().port`.
### dgram.send(buf, offset, length, path, [callback])
For Unix domain datagram sockets, the destination address is a pathname in the filesystem.
@ -61,6 +66,10 @@ re-used. Note that DNS lookups will delay the time that a send takes place, at
least until the next tick. The only way to know for sure that a send has taken place
is to use the callback.
If the socket has not been previously bound with a call to `bind`, it's
assigned a random port number and bound to the "all interfaces" address
(0.0.0.0 for IPv4-only systems, ::0 for IPv6 and dual stack systems).
Example of sending a UDP packet to a random port on `localhost`;
var dgram = require('dgram');
@ -142,8 +151,7 @@ Example of a UDP server listening on port 41234:
### dgram.close()
Close the underlying socket and stop listening for data on it. UDP sockets
automatically listen for messages, even if they did not call `bind()`.
Close the underlying socket and stop listening for data on it.
### dgram.address()

0
lib/dgram.js → lib/dgram_legacy.js

312
lib/dgram_uv.js

@ -0,0 +1,312 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var util = require('util');
var events = require('events');
var UDP = process.binding('udp_wrap').UDP;
// lazily loaded
var dns = null;
var net = null;
// no-op callback
function noop() {
}
function isIP(address) {
if (!net)
net = require('net');
return net.isIP(address);
}
function lookup(address, family, callback) {
// implicit 'bind before send' needs to run on the same tick
var matchedFamily = isIP(address);
if (matchedFamily)
return callback(null, address, matchedFamily);
if (!dns)
dns = require('dns');
return dns.lookup(address, family, callback);
}
function lookup4(address, callback) {
return lookup(address || '0.0.0.0', 4, callback);
}
function lookup6(address, callback) {
return lookup(address || '::0', 6, callback);
}
function newHandle(type) {
if (type == 'udp4') {
var handle = new UDP;
handle.lookup = lookup4;
return handle;
}
if (type == 'udp6') {
var handle = new UDP;
handle.lookup = lookup6;
handle.bind = handle.bind6;
handle.send = handle.send6;
return handle;
}
if (type == 'unix_dgram')
throw new Error('unix_dgram sockets are not supported any more.');
throw new Error('Bad socket type specified. Valid types are: udp4, udp6');
}
function Socket(type, listener) {
events.EventEmitter.call(this);
var handle = newHandle(type);
handle.socket = this;
this._handle = handle;
this._receiving = false;
this._bound = false;
this.type = type;
if (typeof listener === 'function')
this.on('message', listener);
}
util.inherits(Socket, events.EventEmitter);
exports.Socket = Socket;
exports.createSocket = function(type, listener) {
return new Socket(type, listener);
};
Socket.prototype.bind = function(port, address) {
var self = this;
self._healthCheck();
// resolve address first
self._handle.lookup(address, function(err, ip) {
if (!err) {
if (self._handle.bind(ip, port || 0, /*flags=*/0)) {
err = errnoException(errno, 'bind');
}
else {
self._bound = true;
self.emit('listening');
self._startReceiving();
}
}
if (err) {
// caller may not have had a chance yet to register its
// error event listener so defer the error to the next tick
process.nextTick(function() {
self.emit('error', err);
});
}
});
};
// thin wrapper around `send`, here for compatibility with dgram_legacy.js
Socket.prototype.sendto = function(buffer,
offset,
length,
port,
address,
callback) {
if (typeof offset !== 'number' || typeof length !== 'number')
throw new Error('send takes offset and length as args 2 and 3');
if (typeof address !== 'string')
throw new Error(this.type + ' sockets must send to port, address');
this.send(buffer, offset, length, port, address, callback);
};
Socket.prototype.send = function(buffer,
offset,
length,
port,
address,
callback) {
var self = this;
callback = callback || noop;
self._healthCheck();
self._startReceiving();
self._handle.lookup(address, function(err, ip) {
if (err) {
if (callback) callback(err);
self.emit('error', err);
}
else {
var req = self._handle.send(buffer, offset, length, port, ip);
if (req) {
req.oncomplete = afterSend;
req.cb = callback;
}
else {
// don't emit as error, dgram_legacy.js compatibility
callback(errnoException(errno, 'send'));
}
}
});
};
function afterSend(status, handle, req, buffer) {
var self = handle.socket;
// CHECKME socket's been closed by user, don't call callback?
if (handle !== self._handle)
void(0);
if (req.cb)
req.cb(null, buffer.length); // compatibility with dgram_legacy.js
}
Socket.prototype.close = function() {
this._healthCheck();
this._stopReceiving();
this._handle.close();
this._handle = null;
this.emit('close');
};
Socket.prototype.address = function() {
this._healthCheck();
var address = this._handle.getsockname();
if (!address)
throw errnoException(errno, 'getsockname');
return address;
};
Socket.prototype.setBroadcast = function(arg) {
throw new Error('not yet implemented');
};
Socket.prototype.setTTL = function(arg) {
throw new Error('not yet implemented');
};
Socket.prototype.setMulticastTTL = function(arg) {
throw new Error('not yet implemented');
};
Socket.prototype.setMulticastLoopback = function(arg) {
throw new Error('not yet implemented');
};
Socket.prototype.addMembership = function(multicastAddress,
multicastInterface) {
// are we ever going to support this in libuv?
throw new Error('not yet implemented');
};
Socket.prototype.dropMembership = function(multicastAddress,
multicastInterface) {
// are we ever going to support this in libuv?
throw new Error('not yet implemented');
};
Socket.prototype._healthCheck = function() {
if (!this._handle)
throw new Error('Not running'); // error message from dgram_legacy.js
};
Socket.prototype._startReceiving = function() {
if (this._receiving)
return;
if (!this._bound) {
this.bind(); // bind to random port
// sanity check
if (!this._bound)
throw new Error('implicit bind failed');
}
this._handle.onmessage = onMessage;
this._handle.recvStart();
this._receiving = true;
this.fd = -42; // compatibility hack
};
Socket.prototype._stopReceiving = function() {
if (!this._receiving)
return;
this._handle.onmessage = null;
this._handle.recvStop();
this._receiving = false;
};
function onMessage(handle, nread, buf, rinfo) {
var self = handle.socket;
if (nread == -1) {
self.emit('error', errnoException('recvmsg'));
}
else {
rinfo.size = buf.length; // compatibility
self.emit('message', buf, rinfo);
}
}
// TODO share with net_uv and others
function errnoException(errorno, syscall) {
var e = new Error(syscall + ' ' + errorno);
e.errno = e.code = errorno;
e.syscall = syscall;
return e;
}

3
src/node.js

@ -425,6 +425,9 @@
case 'timers':
return process.features.uv ? 'timers_uv' : 'timers_legacy';
case 'dgram':
return process.features.uv ? 'dgram_uv' : 'dgram_legacy';
case 'dns':
return process.features.uv ? 'dns_uv' : 'dns_legacy';

1
src/node_extensions.h

@ -44,6 +44,7 @@ NODE_EXT_LIST_ITEM(node_os)
// libuv rewrite
NODE_EXT_LIST_ITEM(node_timer_wrap)
NODE_EXT_LIST_ITEM(node_tcp_wrap)
NODE_EXT_LIST_ITEM(node_udp_wrap)
NODE_EXT_LIST_ITEM(node_pipe_wrap)
NODE_EXT_LIST_ITEM(node_cares_wrap)
NODE_EXT_LIST_ITEM(node_stdio_wrap)

360
src/udp_wrap.cc

@ -0,0 +1,360 @@
#include <node.h>
#include <node_buffer.h>
#include <req_wrap.h>
#include <handle_wrap.h>
// Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop.
// Clean this up in tcp_wrap.cc too.
#if defined(__MINGW32__) || defined(_MSC_VER)
extern "C" {
# include <inet_net_pton.h>
# include <inet_ntop.h>
}
# define uv_inet_pton ares_inet_pton
# define uv_inet_ntop ares_inet_ntop
#else // __POSIX__
# include <arpa/inet.h>
# define uv_inet_pton inet_pton
# define uv_inet_ntop inet_ntop
#endif
using namespace v8;
namespace node {
#define UNWRAP \
assert(!args.Holder().IsEmpty()); \
assert(args.Holder()->InternalFieldCount() > 0); \
UDPWrap* wrap = \
static_cast<UDPWrap*>(args.Holder()->GetPointerFromInternalField(0)); \
if (!wrap) { \
SetErrno(UV_EBADF); \
return scope.Close(Integer::New(-1)); \
}
// TODO share with tcp_wrap.cc
Persistent<String> address_symbol;
Persistent<String> port_symbol;
Persistent<String> buffer_sym;
void AddressToJS(Handle<Object> info,
const sockaddr* addr,
int addrlen);
typedef ReqWrap<uv_udp_send_t> SendWrap;
class UDPWrap: public HandleWrap {
public:
static void Initialize(Handle<Object> target);
static Handle<Value> New(const Arguments& args);
static Handle<Value> Bind(const Arguments& args);
static Handle<Value> Send(const Arguments& args);
static Handle<Value> Bind6(const Arguments& args);
static Handle<Value> Send6(const Arguments& args);
static Handle<Value> RecvStart(const Arguments& args);
static Handle<Value> RecvStop(const Arguments& args);
static Handle<Value> GetSockName(const Arguments& args);
private:
UDPWrap(Handle<Object> object);
virtual ~UDPWrap();
static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size);
static void OnSend(uv_udp_send_t* req, int status);
static void OnRecv(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags);
uv_udp_t handle_;
};
UDPWrap::UDPWrap(Handle<Object> object): HandleWrap(object,
(uv_handle_t*)&handle_) {
int r = uv_udp_init(&handle_);
assert(r == 0); // can't fail anyway
handle_.data = reinterpret_cast<void*>(this);
}
UDPWrap::~UDPWrap() {
}
void UDPWrap::Initialize(Handle<Object> target) {
HandleWrap::Initialize(target);
HandleScope scope;
buffer_sym = NODE_PSYMBOL("buffer");
port_symbol = NODE_PSYMBOL("port");
address_symbol = NODE_PSYMBOL("address");
Local<FunctionTemplate> t = FunctionTemplate::New(New);
t->InstanceTemplate()->SetInternalFieldCount(1);
t->SetClassName(String::NewSymbol("UDP"));
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "send", Send);
NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6);
NODE_SET_PROTOTYPE_METHOD(t, "send6", Send6);
NODE_SET_PROTOTYPE_METHOD(t, "close", Close);
NODE_SET_PROTOTYPE_METHOD(t, "recvStart", RecvStart);
NODE_SET_PROTOTYPE_METHOD(t, "recvStop", RecvStop);
NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName);
target->Set(String::NewSymbol("UDP"),
Persistent<FunctionTemplate>::New(t)->GetFunction());
}
Handle<Value> UDPWrap::New(const Arguments& args) {
HandleScope scope;
assert(args.IsConstructCall());
new UDPWrap(args.This());
return scope.Close(args.This());
}
Handle<Value> UDPWrap::Bind(const Arguments& args) {
HandleScope scope;
UNWRAP
// bind(ip, port, flags)
assert(args.Length() == 3);
String::Utf8Value address(args[0]->ToString());
const int port = args[1]->Uint32Value();
const int flags = args[2]->Uint32Value();
const sockaddr_in addr = uv_ip4_addr(*address, port);
int r = uv_udp_bind(&wrap->handle_, addr, flags);
if (r)
SetErrno(uv_last_error().code);
return scope.Close(Integer::New(r));
}
Handle<Value> UDPWrap::Bind6(const Arguments& args) {
assert(0 && "implement me");
return Null();
}
Handle<Value> UDPWrap::Send(const Arguments& args) {
HandleScope scope;
// send(buffer, offset, length, port, address)
assert(args.Length() == 5);
UNWRAP
assert(Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();
size_t offset = args[1]->Uint32Value();
size_t length = args[2]->Uint32Value();
SendWrap* req_wrap = new SendWrap();
req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
uv_buf_t buf = uv_buf_init(Buffer::Data(buffer_obj) + offset,
length);
const unsigned short port = args[3]->Uint32Value();
String::Utf8Value address(args[4]->ToString());
const sockaddr_in addr = uv_ip4_addr(*address, port);
int r = uv_udp_send(&req_wrap->req_, &wrap->handle_, &buf, 1, addr, OnSend);
req_wrap->Dispatched();
if (r) {
SetErrno(uv_last_error().code);
delete req_wrap;
return Null();
}
else {
return scope.Close(req_wrap->object_);
}
}
Handle<Value> UDPWrap::Send6(const Arguments& args) {
assert(0 && "implement me");
return Null();
}
Handle<Value> UDPWrap::RecvStart(const Arguments& args) {
HandleScope scope;
UNWRAP
// UV_EALREADY means that the socket is already bound but that's okay
int r = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
if (r && uv_last_error().code != UV_EALREADY) {
SetErrno(uv_last_error().code);
return False();
}
return True();
}
Handle<Value> UDPWrap::RecvStop(const Arguments& args) {
HandleScope scope;
UNWRAP
int r = uv_udp_recv_stop(&wrap->handle_);
return scope.Close(Integer::New(r));
}
Handle<Value> UDPWrap::GetSockName(const Arguments& args) {
HandleScope scope;
struct sockaddr_storage address;
UNWRAP
int addrlen = sizeof(address);
int r = uv_getsockname(reinterpret_cast<uv_handle_t*>(&wrap->handle_),
reinterpret_cast<sockaddr*>(&address),
&addrlen);
if (r == 0) {
Local<Object> sockname = Object::New();
AddressToJS(sockname, reinterpret_cast<sockaddr*>(&address), addrlen);
return scope.Close(sockname);
}
else {
SetErrno(uv_last_error().code);
return Null();
}
}
// TODO share with StreamWrap::AfterWrite() in stream_wrap.cc
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
HandleScope scope;
assert(req != NULL);
SendWrap* req_wrap = reinterpret_cast<SendWrap*>(req->data);
UDPWrap* wrap = reinterpret_cast<UDPWrap*>(req->handle->data);
assert(req_wrap->object_.IsEmpty() == false);
assert(wrap->object_.IsEmpty() == false);
if (status) {
SetErrno(uv_last_error().code);
}
Local<Value> argv[4] = {
Integer::New(status),
Local<Value>::New(wrap->object_),
Local<Value>::New(req_wrap->object_),
req_wrap->object_->GetHiddenValue(buffer_sym),
};
MakeCallback(req_wrap->object_, "oncomplete", 4, argv);
delete req_wrap;
}
uv_buf_t UDPWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
// FIXME switch to slab allocation, share with stream_wrap.cc
return uv_buf_init(new char[suggested_size], suggested_size);
}
static void ReleaseMemory(char* data, void* arg) {
delete[] data; // data == buf.base
}
void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
if (nread == 0) {
ReleaseMemory(buf.base, NULL);
return;
}
HandleScope scope;
UDPWrap* wrap = reinterpret_cast<UDPWrap*>(handle->data);
Handle<Value> argv[4] = {
wrap->object_,
Integer::New(nread),
Null(),
Null()
};
if (nread == -1) {
SetErrno(uv_last_error().code);
}
else {
Local<Object> rinfo = Object::New();
AddressToJS(rinfo, addr, sizeof *addr);
argv[2] = Buffer::New(buf.base, nread, ReleaseMemory, NULL)->handle_;
argv[3] = rinfo;
}
MakeCallback(wrap->object_, "onmessage", ARRAY_SIZE(argv), argv);
}
void AddressToJS(Handle<Object> info,
const sockaddr* addr,
int addrlen) {
char ip[INET6_ADDRSTRLEN];
const sockaddr_in *a4;
const sockaddr_in6 *a6;
int port;
assert(addr != NULL);
if (addrlen == 0) {
info->Set(address_symbol, String::Empty());
return;
}
switch (addr->sa_family) {
case AF_INET6:
a6 = reinterpret_cast<const sockaddr_in6*>(addr);
inet_ntop(AF_INET6, &a6->sin6_addr, ip, sizeof ip);
port = ntohs(a6->sin6_port);
info->Set(address_symbol, String::New(ip));
info->Set(port_symbol, Integer::New(port));
break;
case AF_INET:
a4 = reinterpret_cast<const sockaddr_in*>(addr);
inet_ntop(AF_INET, &a4->sin_addr, ip, sizeof ip);
port = ntohs(a4->sin_port);
info->Set(address_symbol, String::New(ip));
info->Set(port_symbol, Integer::New(port));
break;
default:
info->Set(address_symbol, String::Empty());
}
}
} // namespace node
NODE_MODULE(node_udp_wrap, node::UDPWrap::Initialize);

1
wscript

@ -865,6 +865,7 @@ def build(bld):
src/handle_wrap.cc
src/stream_wrap.cc
src/tcp_wrap.cc
src/udp_wrap.cc
src/pipe_wrap.cc
src/cares_wrap.cc
src/stdio_wrap.cc

Loading…
Cancel
Save