diff --git a/lib/dgram.js b/lib/dgram.js index 42473b40e7..d548ea2fb1 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -223,7 +223,11 @@ Socket.prototype.address = function() { Socket.prototype.setBroadcast = function(arg) { - throw new Error('not yet implemented'); + if (this._handle.setBroadcast((arg) ? 1 : 0) == -1) { + throw errnoException(errno, 'setBroadcast'); + } + + return true; }; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 5b2fb52923..e73f634cde 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -93,6 +93,7 @@ public: static Handle GetSockName(const Arguments& args); static Handle AddMembership(const Arguments& args); static Handle DropMembership(const Arguments& args); + static Handle SetBroadcast(const Arguments& args); private: static inline char* NewSlab(v8::Handle global, v8::Handle wrap_obj); @@ -153,6 +154,7 @@ void UDPWrap::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); NODE_SET_PROTOTYPE_METHOD(t, "addMembership", AddMembership); NODE_SET_PROTOTYPE_METHOD(t, "dropMembership", DropMembership); + NODE_SET_PROTOTYPE_METHOD(t, "setBroadcast", SetBroadcast); target->Set(String::NewSymbol("UDP"), Persistent::New(t)->GetFunction()); @@ -209,6 +211,20 @@ Handle UDPWrap::Bind6(const Arguments& args) { return DoBind(args, AF_INET6); } +Handle UDPWrap::SetBroadcast(const Arguments& args) { + HandleScope scope; + UNWRAP + + assert(args.Length() == 1); + + int on = args[0]->Uint32Value(); + int r = uv_udp_set_broadcast(&wrap->handle_, on); + + if (r) + SetErrno(uv_last_error(uv_default_loop())); + + return scope.Close(Integer::New(r)); +} Handle UDPWrap::SetMembership(const Arguments& args, uv_membership membership) { diff --git a/test/simple/test-dgram-broadcast-multi-process.js b/test/simple/test-dgram-broadcast-multi-process.js new file mode 100644 index 0000000000..d0a98aa1cf --- /dev/null +++ b/test/simple/test-dgram-broadcast-multi-process.js @@ -0,0 +1,160 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'), + assert = require('assert'), + cluster = require('cluster'), + dgram = require('dgram'), + util = require('util'), + assert = require('assert'), + Buffer = require('buffer').Buffer, + LOCAL_BROADCAST_HOST = '255.255.255.255', + messages = [ + new Buffer('First message to send'), + new Buffer('Second message to send'), + new Buffer('Third message to send'), + new Buffer('Fourth message to send') + ]; + +if (cluster.isMaster) { + var workers = {}, + listeners = 3, + listening = 0, + i = 0, + done = 0; + + //launch child processes + for (var x = 0; x < listeners; x++) { + (function () { + var worker = cluster.fork(); + workers[worker.pid] = worker; + + worker.messagesReceived = []; + + worker.on('message', function (msg) { + if (msg.listening) { + listening += 1; + + if (listening === listeners) { + //all child process are listening, so start sending + sendSocket.sendNext(); + } + } + else if (msg.message) { + worker.messagesReceived.push(msg.message); + + if (worker.messagesReceived.length === messages.length) { + done += 1; + console.error('%d received %d messages total.', worker.pid, + worker.messagesReceived.length); + } + + if (done === listeners) { + console.error('All workers have received the required number of ' + + 'messages. Will now compare.'); + + Object.keys(workers).forEach(function (pid) { + var worker = workers[pid]; + + var count = 0; + + worker.messagesReceived.forEach(function(buf) { + for (var i = 0; i < messages.length; ++i) { + if (buf.toString() === messages[i].toString()) { + count++; + break; + } + } + }); + + console.error('%d received %d matching messges.', worker.pid + , count); + + assert.equal(count, messages.length + ,'A worker received an invalid multicast message'); + }); + } + } + }); + })(x); + } + + var sendSocket = dgram.createSocket('udp4'); + + sendSocket.bind(common.PORT); + sendSocket.setBroadcast(true); + + sendSocket.on('close', function() { + console.error('sendSocket closed'); + }); + + sendSocket.sendNext = function() { + var buf = messages[i++]; + + if (!buf) { + try { sendSocket.close(); } catch (e) {} + return; + } + + sendSocket.send(buf, 0, buf.length, + common.PORT, LOCAL_BROADCAST_HOST, function(err) { + + if (err) throw err; + + console.error('sent %s to %s:%s', util.inspect(buf.toString()) + , LOCAL_BROADCAST_HOST, common.PORT); + + process.nextTick(sendSocket.sendNext); + }); + }; +} + +if (!cluster.isMaster) { + var receivedMessages = []; + var listenSocket = dgram.createSocket('udp4'); + + listenSocket.on('message', function(buf, rinfo) { + console.error('%s received %s from %j', process.pid + , util.inspect(buf.toString()), rinfo); + + receivedMessages.push(buf); + + process.send({ message : buf.toString() }); + + if (receivedMessages.length == messages.length) { + listenSocket.dropMembership(LOCAL_BROADCAST_HOST); + process.nextTick(function() { // TODO should be changed to below. + // listenSocket.dropMembership(LOCAL_BROADCAST_HOST, function() { + listenSocket.close(); + }); + } + }); + + listenSocket.on('close', function() { + process.exit(); + }); + + listenSocket.on('listening', function() { + process.send({ listening : true }); + }); + + listenSocket.bind(common.PORT); +}