diff --git a/README.md b/README.md index e7b43bc..f6ca666 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,9 @@ Options include: // Optionally pass in your own UDP socket to use. socket: udpSocket, // Optionally pass in array of { host, port } to add to the routing table if you know any peers - nodes: [{ host, port }, ...] + nodes: [{ host, port }, ...], + // Optionally pass a port you prefer to bind to instead of a random one + bind: 0 } ``` @@ -128,9 +130,9 @@ Your DHT routing id is `hash(publicIp + publicPort)` and will be autoconfigured Wait for the node to be fully bootstrapped etc. You don't have to wait for this method, but can be useful during testing. -#### `await node.bind(port)` +#### `await node.bind([preferredPort])` -Bind to a specific UDP port instead of a random one. +Wait for the underlying socket to bind. If you prefer a specific port you can specify it here. #### `node.id` diff --git a/index.js b/index.js index fa6d882..e94b702 100644 --- a/index.js +++ b/index.js @@ -66,6 +66,7 @@ class DHT extends EventEmitter { this.table = new Table(opts.id || randomBytes(32)) this.rpc = new RPC({ + bind: opts.bind, maxWindow: opts.maxWindow, socket: opts.socket, onwarning: opts.onwarning, @@ -93,7 +94,6 @@ class DHT extends EventEmitter { this._nat = new NatAnalyzer(opts.natSampleSize || 16) this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row)) this._rotateSecrets = false - this._bound = false this._secrets = [ Buffer.alloc(32), Buffer.alloc(32) @@ -105,11 +105,7 @@ class DHT extends EventEmitter { sodium.randombytes_buf(this._secrets[1]) this.table.on('row', this._onrow) - - this.rpc.socket.on('listening', () => { - this._bound = true - this.emit('listening') - }) + this.rpc.socket.on('listening', () => this.emit('listening')) if (opts.nodes) { for (const node of opts.nodes) this.addNode(node) @@ -198,9 +194,7 @@ class DHT extends EventEmitter { this._resolveSampled = null } - if (!this._bound) { - await bind(this.rpc.socket, 0) - } + await this.rpc.bind() this.emit('ready') } @@ -676,24 +670,3 @@ function compare (id, a, b) { function randomOffset (n) { return n - ((Math.random() * 0.5 * n) | 0) } - -function bind (socket, port) { - return new Promise((resolve, reject) => { - socket.bind(port) - - socket.on('error', onerror) - socket.on('listening', ondone) - - function onerror (err) { - socket.removeListener('error', onerror) - socket.removeListener('listening', ondone) - reject(err) - } - - function ondone () { - socket.removeListener('error', onerror) - socket.removeListener('listening', ondone) - resolve() - } - }) -} diff --git a/lib/rpc.js b/lib/rpc.js index 4ddcb56..5f5b2f9 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -9,6 +9,9 @@ module.exports = class RPC { this._tick = 0 this._w = 0 this._win = [0, 0, 0, 0] + this._bind = opts.bind || 0 + this._bound = false + this._binding = null this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg this.maxRetries = 3 @@ -45,31 +48,41 @@ module.exports = class RPC { return this.socket.address() } - bind (port) { - return new Promise((resolve, reject) => { - const s = this.socket + bind (port = this._bind) { + if (this._binding) return this._binding - if (s.listen) { - s.listen(port) - } else { - s.bind(port) - } + const defaultPort = this._bind + + this._binding = new Promise((resolve, reject) => { + const s = this.socket + s.bind(port) s.on('listening', onlistening) s.on('error', onerror) function onlistening () { + this._bound = true + s.removeListener('listening', onlistening) s.removeListener('error', onerror) resolve() } function onerror (err) { + // retry on any port if preferred port is unavail + if (port === defaultPort && port !== 0) { + port = 0 + s.bind(0) + return + } + s.removeListener('listening', onlistening) s.removeListener('error', onerror) reject(err) } }) + + return this._binding } destroy () { @@ -93,8 +106,12 @@ module.exports = class RPC { return this.socket } - request (m, opts) { - if (this.destroyed) return Promise.reject(new Error('RPC socket destroyed')) + async request (m, opts) { + if (this.destroyed) throw new Error('RPC socket destroyed') + + const socket = (opts && opts.socket) || this.socket + + if (!this._bound && socket === this.socket) await this.bind() if (this._drainInterval === null) { this._drainInterval = setInterval(this._drain.bind(this), 750) @@ -113,7 +130,7 @@ module.exports = class RPC { return new Promise((resolve, reject) => { const total = this._win[0] + this._win[1] + this._win[2] + this._win[3] const req = { - socket: (opts && opts.socket) || this.socket, + socket, timeout: 2, expectOk: !!(opts && opts.expectOk !== false), tries: (opts && opts.retry === false) ? this.maxRetries : 0, diff --git a/test.js b/test.js index 8232645..02a651b 100644 --- a/test.js +++ b/test.js @@ -1,4 +1,5 @@ const tape = require('tape') +const dgram = require('dgram') const DHT = require('./') tape('make tiny swarm', async function (t) { @@ -255,10 +256,39 @@ tape('addNode / nodes option', async function (t) { bootstrap.destroy() }) +tape('set bind', async function (t) { + const port = await freePort() + + const a = new DHT({ bind: port }) + await a.ready() + + t.same(a.address().port, port, 'bound to explicit port') + + const b = new DHT({ bind: port }) + await b.ready() + + t.notSame(b.address().port, port, 'bound to different port as explicit one is taken') + + a.destroy() + b.destroy() +}) + function destroy (list) { for (const node of list) node.destroy() } +function freePort () { + return new Promise(resolve => { + const socket = dgram.createSocket('udp4') + + socket.bind(0) + socket.on('listening', function () { + const { port } = socket.address() + socket.close(() => resolve(port)) + }) + }) +} + async function makeSwarm (n) { const node = new DHT() await node.bind(0)