Browse Source

streamline auto bind

session-estimator
Mathias Buus 4 years ago
parent
commit
2f424af545
  1. 8
      README.md
  2. 33
      index.js
  3. 37
      lib/rpc.js
  4. 30
      test.js

8
README.md

@ -114,7 +114,9 @@ Options include:
// Optionally pass in your own UDP socket to use. // Optionally pass in your own UDP socket to use.
socket: udpSocket, socket: udpSocket,
// Optionally pass in array of { host, port } to add to the routing table if you know any peers // Optionally pass in array of { host, port } to add to the routing table if you know any peers
nodes: [{ host, port }, ...] nodes: [{ host, port }, ...],
// Optionally pass a port you prefer to bind to instead of a random one
bind: 0
} }
``` ```
@ -128,9 +130,9 @@ Your DHT routing id is `hash(publicIp + publicPort)` and will be autoconfigured
Wait for the node to be fully bootstrapped etc. Wait for the node to be fully bootstrapped etc.
You don't have to wait for this method, but can be useful during testing. You don't have to wait for this method, but can be useful during testing.
#### `await node.bind(port)` #### `await node.bind([preferredPort])`
Bind to a specific UDP port instead of a random one. Wait for the underlying socket to bind. If you prefer a specific port you can specify it here.
#### `node.id` #### `node.id`

33
index.js

@ -66,6 +66,7 @@ class DHT extends EventEmitter {
this.table = new Table(opts.id || randomBytes(32)) this.table = new Table(opts.id || randomBytes(32))
this.rpc = new RPC({ this.rpc = new RPC({
bind: opts.bind,
maxWindow: opts.maxWindow, maxWindow: opts.maxWindow,
socket: opts.socket, socket: opts.socket,
onwarning: opts.onwarning, onwarning: opts.onwarning,
@ -93,7 +94,6 @@ class DHT extends EventEmitter {
this._nat = new NatAnalyzer(opts.natSampleSize || 16) this._nat = new NatAnalyzer(opts.natSampleSize || 16)
this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row)) this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row))
this._rotateSecrets = false this._rotateSecrets = false
this._bound = false
this._secrets = [ this._secrets = [
Buffer.alloc(32), Buffer.alloc(32),
Buffer.alloc(32) Buffer.alloc(32)
@ -105,11 +105,7 @@ class DHT extends EventEmitter {
sodium.randombytes_buf(this._secrets[1]) sodium.randombytes_buf(this._secrets[1])
this.table.on('row', this._onrow) this.table.on('row', this._onrow)
this.rpc.socket.on('listening', () => this.emit('listening'))
this.rpc.socket.on('listening', () => {
this._bound = true
this.emit('listening')
})
if (opts.nodes) { if (opts.nodes) {
for (const node of opts.nodes) this.addNode(node) for (const node of opts.nodes) this.addNode(node)
@ -198,9 +194,7 @@ class DHT extends EventEmitter {
this._resolveSampled = null this._resolveSampled = null
} }
if (!this._bound) { await this.rpc.bind()
await bind(this.rpc.socket, 0)
}
this.emit('ready') this.emit('ready')
} }
@ -676,24 +670,3 @@ function compare (id, a, b) {
function randomOffset (n) { function randomOffset (n) {
return n - ((Math.random() * 0.5 * n) | 0) return n - ((Math.random() * 0.5 * n) | 0)
} }
function bind (socket, port) {
return new Promise((resolve, reject) => {
socket.bind(port)
socket.on('error', onerror)
socket.on('listening', ondone)
function onerror (err) {
socket.removeListener('error', onerror)
socket.removeListener('listening', ondone)
reject(err)
}
function ondone () {
socket.removeListener('error', onerror)
socket.removeListener('listening', ondone)
resolve()
}
})
}

37
lib/rpc.js

@ -9,6 +9,9 @@ module.exports = class RPC {
this._tick = 0 this._tick = 0
this._w = 0 this._w = 0
this._win = [0, 0, 0, 0] this._win = [0, 0, 0, 0]
this._bind = opts.bind || 0
this._bound = false
this._binding = null
this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg
this.maxRetries = 3 this.maxRetries = 3
@ -45,31 +48,41 @@ module.exports = class RPC {
return this.socket.address() return this.socket.address()
} }
bind (port) { bind (port = this._bind) {
return new Promise((resolve, reject) => { if (this._binding) return this._binding
const defaultPort = this._bind
this._binding = new Promise((resolve, reject) => {
const s = this.socket const s = this.socket
if (s.listen) {
s.listen(port)
} else {
s.bind(port) s.bind(port)
}
s.on('listening', onlistening) s.on('listening', onlistening)
s.on('error', onerror) s.on('error', onerror)
function onlistening () { function onlistening () {
this._bound = true
s.removeListener('listening', onlistening) s.removeListener('listening', onlistening)
s.removeListener('error', onerror) s.removeListener('error', onerror)
resolve() resolve()
} }
function onerror (err) { function onerror (err) {
// retry on any port if preferred port is unavail
if (port === defaultPort && port !== 0) {
port = 0
s.bind(0)
return
}
s.removeListener('listening', onlistening) s.removeListener('listening', onlistening)
s.removeListener('error', onerror) s.removeListener('error', onerror)
reject(err) reject(err)
} }
}) })
return this._binding
} }
destroy () { destroy () {
@ -93,8 +106,12 @@ module.exports = class RPC {
return this.socket return this.socket
} }
request (m, opts) { async request (m, opts) {
if (this.destroyed) return Promise.reject(new Error('RPC socket destroyed')) if (this.destroyed) throw new Error('RPC socket destroyed')
const socket = (opts && opts.socket) || this.socket
if (!this._bound && socket === this.socket) await this.bind()
if (this._drainInterval === null) { if (this._drainInterval === null) {
this._drainInterval = setInterval(this._drain.bind(this), 750) this._drainInterval = setInterval(this._drain.bind(this), 750)
@ -113,7 +130,7 @@ module.exports = class RPC {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const total = this._win[0] + this._win[1] + this._win[2] + this._win[3] const total = this._win[0] + this._win[1] + this._win[2] + this._win[3]
const req = { const req = {
socket: (opts && opts.socket) || this.socket, socket,
timeout: 2, timeout: 2,
expectOk: !!(opts && opts.expectOk !== false), expectOk: !!(opts && opts.expectOk !== false),
tries: (opts && opts.retry === false) ? this.maxRetries : 0, tries: (opts && opts.retry === false) ? this.maxRetries : 0,

30
test.js

@ -1,4 +1,5 @@
const tape = require('tape') const tape = require('tape')
const dgram = require('dgram')
const DHT = require('./') const DHT = require('./')
tape('make tiny swarm', async function (t) { tape('make tiny swarm', async function (t) {
@ -255,10 +256,39 @@ tape('addNode / nodes option', async function (t) {
bootstrap.destroy() bootstrap.destroy()
}) })
tape('set bind', async function (t) {
const port = await freePort()
const a = new DHT({ bind: port })
await a.ready()
t.same(a.address().port, port, 'bound to explicit port')
const b = new DHT({ bind: port })
await b.ready()
t.notSame(b.address().port, port, 'bound to different port as explicit one is taken')
a.destroy()
b.destroy()
})
function destroy (list) { function destroy (list) {
for (const node of list) node.destroy() for (const node of list) node.destroy()
} }
function freePort () {
return new Promise(resolve => {
const socket = dgram.createSocket('udp4')
socket.bind(0)
socket.on('listening', function () {
const { port } = socket.address()
socket.close(() => resolve(port))
})
})
}
async function makeSwarm (n) { async function makeSwarm (n) {
const node = new DHT() const node = new DHT()
await node.bind(0) await node.bind(0)

Loading…
Cancel
Save