diff --git a/index.js b/index.js index 7f975da..7b63daa 100644 --- a/index.js +++ b/index.js @@ -1,301 +1,403 @@ const dns = require('dns') -const RPC = require('./lib/rpc') -const Query = require('./lib/query') -const race = require('./lib/race') -const nodeId = require('./lib/id') -const NatAnalyzer = require('./lib/nat-analyzer') +const { EventEmitter } = require('events') const Table = require('kademlia-routing-table') const TOS = require('time-ordered-set') -const FIFO = require('fast-fifo/fixed-size') const sodium = require('sodium-universal') -const dgram = require('dgram') -const { EventEmitter } = require('events') +const c = require('compact-encoding') +const NatSampler = require('nat-sampler') +const IO = require('./lib/io') +const Query = require('./lib/query') +const peer = require('./lib/peer') +const { BAD_COMMAND, BAD_TOKEN, TIMEOUT, DESTROY } = require('./lib/errors') +const TMP = Buffer.allocUnsafe(32) const TICK_INTERVAL = 5000 const SLEEPING_INTERVAL = 3 * TICK_INTERVAL -const PERSISTENT_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly) -const MORE_PERSISTENT_TICKS = 3 * PERSISTENT_TICKS +const STABLE_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly) +const MORE_STABLE_TICKS = 3 * STABLE_TICKS const REFRESH_TICKS = 60 // refresh every ~5min when idle -const RECENT_NODE = 20 // we've heard from a node less than 1min ago +const RECENT_NODE = 12 // we've heard from a node less than 1min ago const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old -// this is the known id for figuring out if we should ping bootstrap nodes -const PING_BOOTSTRAP = Buffer.allocUnsafe(32) -sodium.crypto_generichash(PING_BOOTSTRAP, Buffer.from('ping bootstrap')) - -class Request { - constructor (dht, m) { - this.dht = dht - this.tid = m.tid - this.from = m.from - this.to = m.to - this.token = m.token - this.target = m.target - this.closerNodes = m.closerNodes - this.status = m.status - this.command = m.command - this.value = m.value - } - - get commit () { - return this.token !== null - } - - error (code, opts = {}) { - return this.send(code, null, opts) - } - - reply (value, opts = {}) { - return this.send(0, value, opts) - } - - send (status, value, opts) { - const target = opts.closerNodes === false ? null : this.target - const token = opts.token !== false - return this.dht._reply(this.tid, target, status, value, this.from, token, opts.socket) - } -} - class DHT extends EventEmitter { constructor (opts = {}) { super() this.bootstrapNodes = opts.bootstrap === false ? [] : (opts.bootstrap || []).map(parseNode) - this.nodes = new TOS() - // this is just the private id, see below in the persistence handler this.table = new Table(opts.id || randomBytes(32)) - - this.rpc = new RPC({ - bind: opts.bind, - maxWindow: opts.maxWindow, - socket: opts.socket, - onwarning: opts.onwarning, + this.nodes = new TOS() + this.io = new IO(this.table, { + ...opts, onrequest: this._onrequest.bind(this), - onresponse: this._onresponse.bind(this) + onresponse: this._onresponse.bind(this), + ontimeout: this._ontimeout.bind(this) }) + this.concurrency = opts.concurrency || 10 this.bootstrapped = false - this.concurrency = opts.concurrency || this.table.k this.ephemeral = true + this.firewalled = this.io.firewalled this.adaptive = typeof opts.ephemeral !== 'boolean' && opts.adaptive !== false - this.clientOnly = !this.adaptive && opts.ephemeral !== false - this._userAddNode = opts.addNode || allowAll + this._nat = new NatSampler() + this._bind = opts.bind || 0 this._forcePersistent = opts.ephemeral === false this._repinging = 0 - this._reping = new FIFO(128) - this._bootstrapping = this.bootstrap() - this._localSocket = opts.localSocket || null // used for auto configuring nat sessions + this._checks = 0 this._tick = randomOffset(100) // make sure to random offset all the network ticks this._refreshTicks = randomOffset(REFRESH_TICKS) - this._pingBootstrapTicks = randomOffset(REFRESH_TICKS) - this._persistentTicks = this.adaptive ? PERSISTENT_TICKS : 0 + this._stableTicks = this.adaptive ? STABLE_TICKS : 0 this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL) this._lastTick = Date.now() - this._nat = new NatAnalyzer(opts.natSampleSize || 16) + this._lastHost = null this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row)) - this._rotateSecrets = false - this._secrets = [ - Buffer.alloc(32), - Buffer.alloc(32) - ] - this._resolveSampled = null - this._sampled = new Promise((resolve) => { this._resolveSampled = resolve }) - - sodium.randombytes_buf(this._secrets[0]) - sodium.randombytes_buf(this._secrets[1]) + this._nonePersistentSamples = [] + this._bootstrapping = this.bootstrap() this.table.on('row', this._onrow) - this.rpc.socket.on('listening', () => this.emit('listening')) if (opts.nodes) { for (const node of opts.nodes) this.addNode(node) } } + static bootstrapper (bind, opts) { + return new this({ bind, firewalled: false, ...opts }) + } + get id () { return this.ephemeral ? null : this.table.id } + get host () { + return this._nat.host + } + + get port () { + return this._nat.port + } + onmessage (buf, rinfo) { - if (buf.byteLength > 1) this.rpc.onmessage(false, buf, rinfo) + if (buf.byteLength > 1) this.io.onmessage(null, buf, rinfo) + } + + bind () { + return this.io.bind() + } + + address () { + const socket = this.firewalled ? this.io.clientSocket : this.io.serverSocket + return socket ? socket.address() : null } - sampledNAT () { - return this._sampled + addNode ({ host, port }) { + this._addNode({ + id: peer.id(host, port), + port, + host, + token: null, + to: null, + sampled: 0, + added: this._tick, + pinged: 0, + seen: 0, + downHints: 0, + prev: null, + next: null + }) + } + + toArray () { + return this.nodes.toArray().map(({ host, port }) => ({ host, port })) } ready () { return this._bootstrapping } - query (target, command, value, opts) { + query ({ target, command, value }, opts) { this._refreshTicks = REFRESH_TICKS return new Query(this, target, command, value || null, opts) } - ping (node) { - return this.request(null, 'ping', null, node) + ping (to) { + return this.request({ token: null, command: 'ping', target: null, value: null }, to) } - request (target, command, value, to, opts) { - const ephemeral = this.ephemeral || !!(opts && opts.socket !== this.rpc.socket) - const token = (opts && opts.token) || null + request ({ token = null, command, target = null, value = null }, { host, port }, opts) { + const req = this.io.createRequest({ id: null, host, port }, token, command, target, value) - return this.rpc.request({ - version: 2, - tid: 0, - from: null, - to, - id: ephemeral ? null : this.table.id, - token, - target, - closerNodes: null, - command, - status: 0, - value - }, opts) - } + if (opts && opts.socket) req.socket = opts.socket - requestAll (target, command, value, nodes, opts = {}) { - const min = typeof opts.min === 'number' ? opts.min : 1 - if (nodes.length < min) return Promise.reject(new Error('Too few nodes to request')) - - const p = [] - for (const node of nodes) p.push(this.request(target, command, value, node)) - return race(p, min, opts.max) - } - - destroy () { - this.rpc.destroy() - if (this._resolveSampled !== null) { - this._resolveSampled(false) - this._resolveSampled = null - } - if (this._localSocket) { - this._localSocket.close() - this._localSocket = null - } - clearInterval(this._tickInterval) + return new Promise((resolve, reject) => { + req.onresponse = resolve + req.onerror = reject + req.send() + }) } async bootstrap () { await Promise.resolve() // wait a tick, so apis can be used from the outside + await this.io.bind() + + this.emit('listening') + + // TODO: some papers describe more advanced ways of bootstrapping - we should prob look into that for (let i = 0; i < 2; i++) { await this._backgroundQuery(this.table.id, 'find_node', null).finished() - if (this.bootstrapped || !this._forcePersistent || !(await this._onpersistent())) break + if (this.bootstrapped || !this._forcePersistent || !(await this._onstable())) break } if (this.bootstrapped) return this.bootstrapped = true - if (this._resolveSampled !== null) { - this._resolveSampled(true) - this._resolveSampled = null - } + this.emit('ready') + } + + refresh () { + const node = this.table.random() + this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null) + } - await this.rpc.bind() + destroy () { + clearInterval(this._tickInterval) + return this.io.destroy() + } - this.emit('ready') + _request (to, command, target, value, onresponse, onerror) { + const req = this.io.createRequest(to, null, command, target, value) + + req.onresponse = onresponse + req.onerror = onerror + req.send() + + return req } - _backgroundQuery (target, command, value) { - const backgroundCon = Math.min(this.concurrency, Math.max(2, (this.concurrency / 8) | 0)) - const q = this.query(target, command, value, { - concurrency: backgroundCon - }) + _sampleBootstrapMaybe (from, to) { // we don't check that this is a bootstrap but good enough, some node once + if (this._nonePersistentSamples.length >= this.bootstrapNodes.length) return + const id = from.host + ':' + from.port + if (this._nonePersistentSamples.indexOf(id) > -1) return + this._nonePersistentSamples.push(id) + this._nat.add(to.host, to.port) + } - q.on('data', () => { - // yield to other traffic - q.concurrency = this.rpc.inflightRequests < 3 - ? this.concurrency - : backgroundCon + _addNodeFromNetwork (sample, from, to) { + if (from.id === null) { + this._sampleBootstrapMaybe(from, to) + return + } + + const oldNode = this.table.get(from.id) + + // refresh it, if we've seen this before + if (oldNode) { + if (sample && (oldNode.sampled === 0 || (this._tick - oldNode.sampled) >= OLD_NODE)) { + oldNode.to = to + oldNode.sampled = this._tick + this._nat.add(to.host, to.port) + } + + oldNode.pinged = oldNode.seen = this._tick + this.nodes.add(oldNode) + return + } + + this._addNode({ + id: from.id, + port: from.port, + host: from.host, + to, + sampled: 0, + added: this._tick, + pinged: this._tick, // last time we interacted with them + seen: this._tick, // last time we heard from them + downHints: 0, + prev: null, + next: null }) + } - return q + _addNode (node) { + if (this.nodes.has(node) || node.id.equals(this.table.id)) return + + node.added = node.pinged = node.seen = this._tick + + if (!this.table.add(node)) return + this.nodes.add(node) + + if (node.to && node.sampled === 0) { + node.sampled = this._tick + this._nat.add(node.to.host, node.to.port) + } + + this.emit('add-node', node) } - _token (addr, i) { - const token = Buffer.allocUnsafe(32) - this._rotateSecrets = true - // TODO: also add .port? - sodium.crypto_generichash(token, Buffer.from(addr.host), this._secrets[i]) - return token + _removeStaleNode (node, lastSeen) { + if (node.seen <= lastSeen) this._removeNode(node) } - refresh () { - const node = this.table.random() - this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null) + _removeNode (node) { + if (!this.nodes.has(node)) return + + this.table.remove(node.id) + this.nodes.remove(node) + + this.emit('remove-node', node) } - _pingSomeBootstrapNodes () { - // once in a while it can be good to ping the bootstrap nodes, since we force them to be ephemeral to lower their load - // to make this a lightweight as possible we first check if we are the closest node we know to a known id (hash(ping bootstrap)) - // and if so we issue a background query against that. if after doing this query we are still one of the closests nodes - // we ping the bootstrapper - in practice this results to very little bootstrap ping traffic. + _onwakeup () { + this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old. + this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks + this._stableTicks = MORE_STABLE_TICKS + this._refreshTicks = 1 // triggers a refresh next tick (allow network time to wake up also) + this._lastHost = null // clear network cache check - this._pingBootstrapTicks = REFRESH_TICKS + if (this.adaptive && !this.ephemeral) { + this.ephemeral = true + this.io.ephemeral = true + this.emit('ephemeral') + } - const nodes = this.table.closest(PING_BOOTSTRAP, 1) - if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].id) > 0) { + this.emit('wakeup') + } + + _onfullrow (newNode, row) { + if (!this.bootstrapped || this._repinging >= 3) return + + let oldest = null + for (const node of row.nodes) { + if (node.pinged === this._tick) continue + if (oldest === null || oldest.pinged > node.pinged || (oldest.pinged === node.pinged && oldest.added > node.added)) oldest = node + } + + if (oldest === null) return + if ((this._tick - oldest.pinged) < RECENT_NODE && (this._tick - oldest.added) > OLD_NODE) return + + this._repingAndSwap(newNode, oldest) + } + + _repingAndSwap (newNode, oldNode) { + const self = this + const lastSeen = oldNode.seen + + oldNode.pinged = this._tick + + this._repinging++ + this._request({ id: null, host: oldNode.host, port: oldNode.port }, 'ping', null, null, onsuccess, onswap) + + function onsuccess (m) { + if (oldNode.seen <= lastSeen) return onswap() + self._repinging-- + } + + function onswap (e) { + self._repinging-- + self._removeNode(oldNode) + self._addNode(newNode) + } + } + + _onrequest (req, external) { + if (req.from.id !== null) { + this._addNodeFromNetwork(!external, req.from, req.to) + } + + // standard keep alive call + if (req.command === 'ping') { + req.sendReply(0, null, false, false) return } - const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null) + // check if the other side can receive a message to their other socket + if (req.command === 'ping_nat') { + if (req.value === null || req.value.byteLength < 2) return + const port = c.uint16.decode({ start: 0, end: 2, buffer: req.value }) + if (port === 0) return + req.from.port = port + req.sendReply(0, null, false, false) + return + } - q.on('close', () => { - if (q.closestReplies.length === 0) return + // empty dht reply back + if (req.command === 'find_node') { + if (!req.target) return + req.sendReply(0, null, false, true) + return + } - if (compare(PING_BOOTSTRAP, this.table.id, q.closestReplies[q.closestReplies.length - 1].id) > 0) { - return + if (req.command === 'down_hint') { + if (req.value === null || req.value.byteLength < 6) return + if (this._checks < 10) { + sodium.crypto_generichash(TMP, req.value.subarray(0, 6)) + const node = this.table.get(TMP) + if (node && (node.pinged < this._tick || node.downHints === 0)) { + node.downHints++ + this._check(node) + } } + req.sendReply(0, null, false, false) + return + } - this._resolveBootstrapNodes((nodes) => { - const node = nodes[(Math.random() * nodes.length) | 0] - this.ping(node).then(noop, noop) - }) - }) + // ask the user to handle it or reply back with a bad command + if (this.emit('request', req) === false) { + req.sendReply(BAD_COMMAND, null, false, true) + } + } + + _onresponse (res, external) { + this._addNodeFromNetwork(!external, res.from, res.to) + } + + _ontimeout (req) { + if (!req.to.id) return + const node = this.table.get(req.to.id) + if (node) this._removeNode(node) } _pingSome () { - let cnt = this.rpc.inflightRequests > 2 ? 3 : 5 + let cnt = this.io.inflight.length > 2 ? 3 : 5 let oldest = this.nodes.oldest - // tiny dht, ping the bootstrap again + // tiny dht, pinged the bootstrap again if (!oldest) { this.refresh() return } // we've recently pinged the oldest one, so only trigger a couple of repings - if ((this._tick - oldest.seen) < RECENT_NODE) { + if ((this._tick - oldest.pinged) < RECENT_NODE) { cnt = 2 } while (cnt--) { - if (!oldest || this._tick === oldest.seen) continue - this._check(oldest, oldest.seen) + if (!oldest || this._tick === oldest.pinged) continue + this._check(oldest) oldest = oldest.next } } - _check (node, lastSeen) { - this.ping(node) - .then( - () => this._removeStaleNode(node, lastSeen), - () => this._removeNode(node) - ) - } + _check (node) { + node.pinged = this._tick - _ontick () { - if (this._rotateSecrets) { - const tmp = this._secrets[0] - this._secrets[0] = this._secrets[1] - this._secrets[1] = tmp - sodium.crypto_generichash(tmp, tmp) + const lastSeen = node.seen + const onresponse = () => { + this._checks-- + this._removeStaleNode(node, lastSeen) + } + const onerror = () => { + this._checks-- + this._removeNode(node) } + this._checks++ + this._request({ id: null, host: node.host, port: node.port }, 'ping', null, null, onresponse, onerror) + } + + _ontick () { const time = Date.now() if (time - this._lastTick > SLEEPING_INTERVAL) { @@ -308,136 +410,82 @@ class DHT extends EventEmitter { if (!this.bootstrapped) return - if (this.adaptive && this.ephemeral && --this._persistentTicks <= 0) { - this._onpersistent() // the promise returned here never fails so just ignore it + if (this.adaptive && this.ephemeral && --this._stableTicks <= 0) { + if (this._lastHost === this._nat.host) { // do not recheck the same network... + this._stableTicks = MORE_STABLE_TICKS + } else { + this._onstable() // the promise returned here never fails so just ignore it + } } if ((this._tick & 7) === 0) { this._pingSome() } - if (!this.ephemeral && --this._pingBootstrapTicks <= 0) { - this._pingSomeBootstrapNodes() - } - if (((this._tick & 63) === 0 && this.nodes.length < this.table.k) || --this._refreshTicks <= 0) { this.refresh() } } - async _onpersistent () { - if (this.ephemeral === false) return false + async _onstable () { + if (!this.ephemeral) return false - // only require one sample here as we do the nat check anyway after - // makes settting up a dht easier... - const addr = this.remoteAddress(1) + const { host, port } = this._nat - if (addr.type !== DHT.NAT_PORT_CONSISTENT && addr.type !== DHT.NAT_OPEN) { - this._persistentTicks = MORE_PERSISTENT_TICKS + // remember what host we checked and reset the counter + this._stableTicks = MORE_STABLE_TICKS + this._lastHost = host + + // check if we have a consistent host and port + if (host === null || port === 0) { return false } - if (await this._checkIfFirewalled()) return false - if (!this.ephemeral) return false // incase it's called in parallel for some reason - - const id = nodeId(addr.host, addr.port) - if (this.table.id.equals(id)) return false - - const nodes = this.table.toArray() - - this.table = new Table(id) - for (const node of nodes) this.table.add(node) - this.table.on('row', this._onrow) - - this.ephemeral = false - this.emit('persistent') - return true - } - - async _addBootstrapNodes (nodes) { - return new Promise((resolve) => { - this._resolveBootstrapNodes(function (bootstrappers) { - nodes.push(...bootstrappers) - resolve() - }) - }) - } - - async _checkIfFirewalled () { - const nodes = [] - for (let node = this.nodes.latest; node && nodes.length < 5; node = node.prev) { - nodes.push(node) + // check if the external port is mapped to the internal port + if (this._nat.port !== this.address().port) { + return false } - if (nodes.length < 5) await this._addBootstrapNodes(nodes) - if (!nodes.length) return true // no nodes available, including bootstrappers - bail + const natSampler = this.firewalled ? new NatSampler() : this._nat - try { - await this.requestAll(null, 'ping_nat', null, nodes, { min: nodes.length >= 5 ? 3 : 1, max: 3 }) - } catch { - // not enough nat pings succeded - assume firewalled - return true - } + // ask remote nodes to ping us on our server socket to see if we have the port open + const firewalled = this.firewalled && await this._checkIfFirewalled(natSampler) + if (firewalled) return false - return false - } + this.firewalled = this.io.firewalled = false - _onwakeup () { - this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old. - this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks - this._persistentTicks = MORE_PERSISTENT_TICKS - this._pingBootstrapTicks = REFRESH_TICKS // forced ephemeral, so no need for a bootstrap ping soon - this._refreshTicks = 1 // triggers a refresh next tick (allow network time to wake up also) + // incase it's called in parallel for some reason, or if our nat status somehow changed + if (!this.ephemeral || host !== this._nat.host || port !== this._nat.port) return false + // if the firewall probe returned a different host / non consistent port, bail as well + if (natSampler.host !== host || natSampler.port === 0) return false - if (this.adaptive) { - this.ephemeral = true - this.emit('ephemeral') - } + const id = peer.id(natSampler.host, natSampler.port) - this.emit('wakeup') - } + this.ephemeral = this.io.ephemeral = false - _onfullrow (newNode, row) { - if (this.bootstrapped && this._reping.push({ newNode, row })) this._repingMaybe() - } + this._nonePersistentSamples = [] + this._nat = natSampler - _repingMaybe () { - while (this._repinging < 3 && this._reping.isEmpty() === false) { - const { newNode, row } = this._reping.shift() - if (this.table.get(newNode.id)) continue + // all good! copy over the old routing table to the new one + if (!this.table.id.equals(id)) { + const nodes = this.table.toArray() - let oldest = null - for (const node of row.nodes) { - if (node.seen === this._tick) continue - if (oldest === null || oldest.seen > node.seen || (oldest.seen === node.seen && oldest.added > node.added)) oldest = node + this.table = this.io.table = new Table(id) + + for (const node of nodes) { + if (node.id.equals(id)) continue + if (!this.table.add(node)) this.nodes.remove(node) } - if (oldest === null) continue - if ((this._tick - oldest.seen) < RECENT_NODE && (this._tick - oldest.added) > OLD_NODE) continue + this.table.on('row', this._onrow) - this._repingAndSwap(newNode, oldest) + // we need to rebootstrap/refresh since we updated our id + if (this.bootstrapped) this.refresh() } - } - _repingAndSwap (newNode, oldNode) { - const self = this - const lastSeen = oldNode.seen - - this._repinging++ - this.ping(oldNode).then(onsuccess, onswap) - - function onsuccess (m) { - if (oldNode.seen <= lastSeen) return onswap() - self._repinging-- - self._repingMaybe() - } + this.emit('persistent') - function onswap () { - self._repinging-- - self._repingMaybe() - self._removeNode(oldNode) - self._addNode(newNode) - } + return true } _resolveBootstrapNodes (done) { @@ -448,196 +496,91 @@ class DHT extends EventEmitter { for (const node of this.bootstrapNodes) { dns.lookup(node.host, { family: 4 }, (_, host) => { - if (host) nodes.push({ id: nodeId(host, node.port), host, port: node.port }) + if (host) nodes.push({ id: peer.id(host, node.port), host, port: node.port }) if (--missing === 0) done(nodes) }) } } - _addNode (node) { - if (this.nodes.has(node) || node.id.equals(this.table.id)) return - if (!this._userAddNode(node)) return - - node.added = node.seen = this._tick - - if (this.table.add(node)) this.nodes.add(node) - - this.emit('add-node', node) - } - - _removeStaleNode (node, lastSeen) { - if (node.seen <= lastSeen) this._removeNode(node) - } - - _removeNode (node) { - if (!this.nodes.has(node)) return - - this.table.remove(node.id) - this.nodes.remove(node) - - this.emit('remove-node', node) + async _addBootstrapNodes (nodes) { + return new Promise((resolve) => { + this._resolveBootstrapNodes(function (bootstrappers) { + nodes.push(...bootstrappers) + resolve() + }) + }) } - _addNodeFromMessage (m, sample) { - const id = nodeId(m.from.host, m.from.port) - - // verify id, if the id is mismatched it doesn't strictly mean the node is bad, could be - // a weird NAT thing, that the node is unaware of - in any case it def means we should not - // add this node to our routing table - if (!m.id.equals(id)) { - m.id = null - return + async _checkIfFirewalled (natSampler = new NatSampler()) { + const nodes = [] + for (let node = this.nodes.latest; node && nodes.length < 5; node = node.prev) { + nodes.push(node) } - const oldNode = this.table.get(id) + if (nodes.length < 5) await this._addBootstrapNodes(nodes) + // if no nodes are available, including bootstrappers - bail + if (nodes.length === 0) return true - // refresh it, if we've seen this before - if (oldNode) { - // if the node is indicating that we got a new ip - // make sure to add it again to the sampler. make sure we don't allow the remote node - // to add multiple entries though. - if (sample && (oldNode.to === null || oldNode.to.host !== m.to.host)) { - oldNode.to = m.to - // TODO: would be technically better to add to the head of the sample queue, but - // this is prop fine - const s = this._nat.sample(m.from) - if (s) { - s.port = m.to.port - s.host = m.to.host - } else { - this._nat.add(m.to, m.from) - } - } + const hosts = [] + const value = Buffer.allocUnsafe(2) + let pongs - oldNode.seen = this._tick - this.nodes.add(oldNode) - return - } + c.uint16.encode({ start: 0, end: 2, buffer: value }, this.io.serverSocket.address().port) - // add a sample of our address from the remote nodes pov - if (sample) this._nat.add(m.to, m.from) + // double check they actually came on the server socket... + this.io.serverSocket.on('message', onmessage) - this._addNode({ - id, - port: m.from.port, - host: m.from.host, - token: null, // adding this so it has the same "shape" as the query nodes for easier debugging - to: sample ? m.to : null, - added: this._tick, - seen: this._tick, - prev: null, - next: null - }) - } + pongs = await requestAll(this, 'ping_nat', value, nodes) + if (!pongs.length) return true - _onrequest (req, sample) { - // check if the roundtrip token is one we've generated within the last 10s for this peer - if (req.token !== null && !this._token(req.from, 1).equals(req.token) && !this._token(req.from, 0).equals(req.token)) { - req.token = null + let count = 0 + for (const res of pongs) { + if (hosts.indexOf(res.from.host) > -1) count++ } - if (req.id !== null) this._addNodeFromMessage(req, sample) - else this._pingBootstrapTicks = REFRESH_TICKS - // o/ if this node is ephemeral, it prob originated from a bootstrapper somehow so no need to ping them + this.io.serverSocket.removeListener('message', onmessage) - // echo the value back - if (req.command === 'ping') { - if (req.value === null || req.value.byteLength <= 32) this._reply(req.tid, null, 0, req.value, req.from, false) - return - } + // if we got very few replies, consider it a fluke + if (count < (nodes.length >= 5 ? 3 : 1)) return true - if (req.command === 'ping_nat') { - if (!this._localSocket) this._localSocket = dgram.createSocket('udp4') - this._reply(req.tid, null, 0, null, req.from, false, this._localSocket) - return - } + // check that the server socket has the same properties nat wise + pongs = await requestAll(this, 'ping', null, nodes, { socket: this.io.serverSocket }) + for (const seen of pongs) natSampler.add(seen.to.host, seen.to.port) - // empty dht reply back - if (req.command === 'find_node') { - this._reply(req.tid, req.target, 0, null, req.from, false) - return - } + // check that the server socket has the same ip as the client socket + if (natSampler.host === null || this._nat.host !== natSampler.host) return true - if (this.emit('request', new Request(this, req)) === false) { - this._reply(req.tid, req.target, 1, null, req.from, false) - } - } + // check that the local port of the server socket is the same as the remote port + if (natSampler.port === 0 || natSampler.port !== this.io.serverSocket.address().port) return true - _onresponse (res, sample) { - if (res.id !== null) this._addNodeFromMessage(res, sample) - else if (sample && this._nat.length < 3 && this._nat.sample(res.from) === null) this._nat.add(res.to, res.from) + return false - if (this._resolveSampled !== null && this._nat.length >= 3) { - this._resolveSampled(true) - this._resolveSampled = null + function onmessage (_, rinfo) { + hosts.push(rinfo.address) } } - bind (...args) { - return this.rpc.bind(...args) - } - - address () { - return this.rpc.address() - } + _backgroundQuery (target, command, value) { + this._refreshTicks = REFRESH_TICKS - remoteAddress (minSamples) { - const result = this._nat.analyze(minSamples) - if (result.type === NatAnalyzer.PORT_CONSISTENT && !this.ephemeral) result.type = DHT.NAT_OPEN - return result - } + const backgroundCon = Math.min(this.concurrency, Math.max(2, (this.concurrency / 8) | 0)) + const q = new Query(this, target, command, value, { concurrency: backgroundCon, maxSlow: 0 }) - addNode ({ host, port }) { - this._addNode({ - id: nodeId(host, port), - port, - host, - token: null, - to: null, - added: this._tick, - seen: this._tick, - prev: null, - next: null + q.on('data', () => { + // yield to other traffic + q.concurrency = this.io.inflight.length < 3 + ? this.concurrency + : backgroundCon }) - } - - toArray () { - return this.nodes.toArray().map(({ host, port }) => ({ host, port })) - } - _reply (tid, target, status, value, to, addToken, socket = this.rpc.socket) { - const closerNodes = target ? this.table.closest(target) : null - const ephemeral = socket !== this.rpc.socket || this.ephemeral - const reply = { - version: 2, - tid, - from: null, - to, - id: ephemeral ? null : this.table.id, - token: (!ephemeral && addToken) ? this._token(to, 1) : null, - target: null, - closerNodes, - command: null, - status, - value - } - - this.rpc.send(reply, socket) - return reply - } - - static id (node, out) { - return nodeId(node.host, node.port, out) + return q } } -DHT.OK = 0 -DHT.UNKNOWN_COMMAND = 1 -DHT.NAT_UNKNOWN = NatAnalyzer.UNKNOWN -DHT.NAT_OPEN = Symbol.for('NAT_OPEN') // implies PORT_CONSISTENT -DHT.NAT_PORT_CONSISTENT = NatAnalyzer.PORT_CONSISTENT -DHT.NAT_PORT_INCREMENTING = NatAnalyzer.PORT_INCREMENTING -DHT.NAT_PORT_RANDOMIZED = NatAnalyzer.PORT_RANDOMIZED +DHT.ERROR_BAD_TOKEN = BAD_TOKEN +DHT.ERROR_BAD_COMMAND = BAD_COMMAND +DHT.ERROR_TIMEOUT = TIMEOUT +DHT.ERROR_DESTROY = DESTROY module.exports = DHT @@ -658,21 +601,27 @@ function randomBytes (n) { return b } -function noop () {} - -function compare (id, a, b) { - for (let i = 0; i < id.length; i++) { - if (a[i] === b[i]) continue - const t = id[i] - return (t ^ a[i]) - (t ^ b[i]) - } - return 0 -} - function randomOffset (n) { return n - ((Math.random() * 0.5 * n) | 0) } -function allowAll (node) { - return true +function requestAll (dht, command, value, nodes, opts) { + let missing = nodes.length + const replies = [] + + return new Promise((resolve) => { + for (const node of nodes) { + dht.request({ token: null, command, target: null, value }, node, opts) + .then(onsuccess, onerror) + } + + function onsuccess (res) { + replies.push(res) + if (--missing === 0) resolve(replies) + } + + function onerror () { + if (--missing === 0) resolve(replies) + } + }) } diff --git a/lib/bind.js b/lib/bind.js new file mode 100644 index 0000000..d0f41aa --- /dev/null +++ b/lib/bind.js @@ -0,0 +1,39 @@ +// TODO: move to module so we can have udp+tcp mode also on the same port etc etc + +const dgram = require('dgram') + +module.exports = async function bind (port) { + return new Promise((resolve, reject) => { + const socket = dgram.createSocket('udp4') + let tries = 1 + + socket.bind(port) + socket.on('listening', onlistening) + socket.on('error', onerror) + + function onlistening () { + cleanup() + resolve(socket) + } + + function onerror (err) { + if (port === 0 || tries >= 5) { + cleanup() + reject(err) + return + } + + if (++tries < 5) { + socket.bind(++port) + } else { + port = 0 + socket.bind(0) + } + } + + function cleanup () { + socket.removeListener('error', onerror) + socket.removeListener('listening', onlistening) + } + }) +} diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 0000000..695696a --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,8 @@ +exports.BAD_COMMAND = 1 +exports.BAD_TOKEN = 2 + +exports.TIMEOUT = new Error('Request timed out') +exports.TIMEOUT.code = 'ETIMEDOUT' + +exports.DESTROY = new Error('Request destroyed') +exports.DESTROY.code = 'EDESTROYED' diff --git a/lib/id.js b/lib/id.js deleted file mode 100644 index ce61df8..0000000 --- a/lib/id.js +++ /dev/null @@ -1,25 +0,0 @@ -const sodium = require('sodium-universal') - -const addr = Buffer.alloc(6) -let i = 0 - -module.exports = hash - -function num (ip) { - let n = 0 - let c = 0 - while (i < ip.length && (c = ip.charCodeAt(i++)) !== 46) n = n * 10 + (c - 48) - return n -} - -function hash (ip, port, out = Buffer.allocUnsafe(32)) { - i = 0 - addr[0] = num(ip) - addr[1] = num(ip) - addr[2] = num(ip) - addr[3] = num(ip) - addr[4] = port - addr[5] = port >>> 8 - sodium.crypto_generichash(out, addr) - return out -} diff --git a/lib/io.js b/lib/io.js new file mode 100644 index 0000000..928a97b --- /dev/null +++ b/lib/io.js @@ -0,0 +1,404 @@ +const FIFO = require('fast-fifo') +const sodium = require('sodium-universal') +const c = require('compact-encoding') +const peer = require('./peer') +const bind = require('./bind') +const { BAD_TOKEN, TIMEOUT, DESTROY } = require('./errors') + +const VERSION = 0b11 +const RESPONSE_ID = (0b0001 << 4) | VERSION +const REQUEST_ID = (0b0000 << 4) | VERSION +const TMP = Buffer.alloc(32) +const EMPTY_ARRAY = [] + +module.exports = class IO { + constructor (table, { maxWindow = 80, bind = 0, firewalled = true, onrequest, onresponse = noop, ontimeout = noop } = {}) { + this.table = table + this.inflight = [] + this.clientSocket = null + this.serverSocket = null + this.firewalled = firewalled !== false + this.ephemeral = true + this.congestion = new CongestionWindow(maxWindow) + + this.onrequest = onrequest + this.onresponse = onresponse + this.ontimeout = ontimeout + + this._pending = new FIFO() + this._rotateSecrets = 8 + this._tid = (Math.random() * 65536) | 0 + this._secrets = null + this._drainInterval = null + this._destroying = null + this._binding = null + this._bind = bind + } + + onmessage (socket, buffer, rinfo) { + if (buffer.byteLength < 2) return + + const from = { id: null, host: rinfo.address, port: rinfo.port } + const state = { start: 1, end: buffer.byteLength, buffer } + const expectedSocket = this.firewalled ? this.clientSocket : this.serverSocket + const external = socket !== expectedSocket + + if (buffer[0] === REQUEST_ID) { + const req = Request.decode(this, socket, from, state) + if (req === null) return + if (req.token !== null && !req.token.equals(this.token(req.from, 1)) && !req.token.equals(this.token(req.from, 0))) { + req.error(BAD_TOKEN, { token: true }) + return + } + this.onrequest(req, external) + return + } + + if (buffer[0] === RESPONSE_ID) { + const res = decodeReply(from, state) + if (res === null) return + + for (let i = 0; i < this.inflight.length; i++) { + const req = this.inflight[i] + if (req.tid !== res.tid) continue + + if (i === this.inflight.length - 1) this.inflight.pop() + else this.inflight[i] = this.inflight.pop() + + if (req._timeout) { + clearTimeout(req._timeout) + req._timeout = null + } + + this.congestion.recv() + this.onresponse(res, external) + req.onresponse(res, req) + break + } + } + } + + token (addr, i) { + if (this._secrets === null) { + const buf = Buffer.alloc(64) + this._secrets = [buf.subarray(0, 32), buf.subarray(32, 64)] + sodium.randombytes_buf(this._secrets[0]) + sodium.randombytes_buf(this._secrets[1]) + } + + const token = Buffer.allocUnsafe(32) + sodium.crypto_generichash(token, Buffer.from(addr.host), this._secrets[i]) + return token + } + + async destroy () { + if (this._destroying) return this._destroying + + // simplifies timing to await the bind here also, although it might be unneeded + await this.bind() + + if (this._drainInterval) { + clearInterval(this._drainInterval) + this._drainInterval = null + } + + while (this.inflight.length) { + const req = this.inflight.pop() + if (req._timeout) clearTimeout(req._timeout) + req._timeout = null + req.destroyed = true + req.onerror(DESTROY, req) + } + + this._destroying = new Promise((resolve) => { + let missing = 2 + + this.serverSocket.close(done) + this.clientSocket.close(done) + + function done () { + if (--missing === 0) resolve() + } + }) + + return this._destroying + } + + bind () { + if (this._binding) return this._binding + this._binding = this._bindSockets() + return this._binding + } + + async _bindSockets () { + this.serverSocket = typeof this._bind === 'function' ? this._bind() : await bind(this._bind) + + try { + // TODO: we should reroll the socket is it's close to our preferred range of ports + // to avoid it being accidentally opened + // We'll prop need additional APIs for that + this.clientSocket = await bind(0) + } catch (err) { + await new Promise((resolve) => this.serverSocket.close(resolve)) + this.serverSocket = null + throw err + } + + this.serverSocket.on('message', this.onmessage.bind(this, this.serverSocket)) + this.clientSocket.on('message', this.onmessage.bind(this, this.clientSocket)) + + if (this._drainInterval === null) { + this._drainInterval = setInterval(this._drain.bind(this), 750) + if (this._drainInterval.unref) this._drainInterval.unref() + } + + for (const req of this.inflight) { + if (!req.socket) req.socket = this.firewalled ? this.clientSocket : this.serverSocket + req.sent = 0 + req.send(false) + } + } + + _drain () { + if (this._secrets !== null && --this._rotateSecrets === 0) { + this._rotateSecrets = 8 + const tmp = this._secrets[0] + this._secrets[0] = this._secrets[1] + this._secrets[1] = tmp + sodium.crypto_generichash(tmp, tmp) + } + + this.congestion.drain() + + while (!this.congestion.isFull()) { + const p = this._pending.shift() + if (p === undefined) return + p._sendNow() + } + } + + createRequest (to, token, command, target, value) { + if (this._destroying !== null) return null + + if (this._tid === 65536) this._tid = 0 + + const tid = this._tid++ + const socket = this.firewalled ? this.clientSocket : this.serverSocket + + const req = new Request(this, socket, tid, null, to, token, command, target, value) + this.inflight.push(req) + return req + } +} + +class Request { + constructor (io, socket, tid, from, to, token, command, target, value) { + this.socket = socket + this.tid = tid + this.from = from + this.to = to + this.token = token + this.command = command + this.target = target + this.value = value + this.sent = 0 + this.destroyed = false + + this.oncycle = noop + this.onerror = noop + this.onresponse = noop + + this._buffer = null + this._io = io + this._timeout = null + } + + static decode (io, socket, from, state) { + try { + const flags = c.uint.decode(state) + const tid = c.uint16.decode(state) + const to = peer.ipv4.decode(state) + const id = flags & 1 ? c.fixed32.decode(state) : null + const token = flags & 2 ? c.fixed32.decode(state) : null + const command = c.string.decode(state) + const target = flags & 4 ? c.fixed32.decode(state) : null + const value = flags & 8 ? c.buffer.decode(state) : null + + if (id !== null) from.id = validateId(id, from) + + return new Request(io, socket, tid, from, to, token, command, target, value) + } catch { + return null + } + } + + reply (value, opts = {}) { + this.sendReply(0, value || null, opts.token !== false, this.target !== null && opts.closerNodes !== false) + } + + error (code, opts = {}) { + this.sendReply(code, null, false, this.target !== null && opts.closerNodes !== false) + } + + send (force = false) { + if (this.destroyed) return + + if (this.socket === null) return + if (this._buffer === null) this._buffer = this._encodeRequest() + + if (!force && this._io.congestion.isFull()) { + this._io._pending.push(this) + return + } + + this._sendNow() + } + + _sendNow () { + if (this.destroyed) return + this.sent++ + this._io.congestion.send() + this.socket.send(this._buffer, 0, this._buffer.byteLength, this.to.port, this.to.host) + if (this._timeout) clearTimeout(this._timeout) + this._timeout = setTimeout(oncycle, 1000, this) + } + + destroy (err) { + if (this.destroyed) return + this.destroyed = true + + const i = this._io.inflight.indexOf(this) + if (i === -1) return + + if (i === this._io.inflight.length - 1) this._io.inflight.pop() + else this._io.inflight[i] = this._io.inflight.pop() + + this.onerror(err || DESTROY, this) + } + + sendReply (error, value, token, hasCloserNodes) { + if (this.socket === null || this.destroyed) return + + const id = this._io.ephemeral === false && this.socket === this._io.serverSocket + const closerNodes = hasCloserNodes ? this._io.table.closest(this.target) : EMPTY_ARRAY + const state = { start: 0, end: 1 + 1 + 6 + 2, buffer: null } // (type | version) + flags + to + tid + + if (id) state.end += 32 + if (token) state.end += 32 + if (closerNodes.length > 0) peer.ipv4Array.preencode(state, closerNodes) + if (error > 0) c.uint.preencode(state, error) + if (value) c.buffer.preencode(state, value) + + state.buffer = Buffer.allocUnsafe(state.end) + state.buffer[state.start++] = RESPONSE_ID + state.buffer[state.start++] = (id ? 1 : 0) | (token ? 2 : 0) | (closerNodes.length > 0 ? 4 : 0) | (error > 0 ? 8 : 0) | (value ? 16 : 0) + + c.uint16.encode(state, this.tid) + peer.ipv4.encode(state, this.from) + + if (id) c.fixed32.encode(state, this._io.table.id) + if (token) c.fixed32.encode(state, this._io.token(this.to, 1)) + if (closerNodes.length > 0) peer.ipv4Array.encode(state, closerNodes) + if (error > 0) c.uint.encode(state, error) + if (value) c.buffer.encode(state, value) + + this.socket.send(state.buffer, 0, state.buffer.byteLength, this.from.port, this.from.host) + } + + _encodeRequest () { + const id = this._io.ephemeral === false && this.socket === this._io.serverSocket + const state = { start: 0, end: 1 + 1 + 6 + 2, buffer: null } // (type | version) + flags + to + tid + + if (id) state.end += 32 + if (this.token) state.end += 32 + + c.string.preencode(state, this.command) + + if (this.target) state.end += 32 + if (this.value) c.buffer.preencode(state, this.value) + + state.buffer = Buffer.allocUnsafe(state.end) + state.buffer[state.start++] = REQUEST_ID + state.buffer[state.start++] = (id ? 1 : 0) | (this.token ? 2 : 0) | (this.target ? 4 : 0) | (this.value ? 8 : 0) + + c.uint16.encode(state, this.tid) + peer.ipv4.encode(state, this.to) + + if (id) c.fixed32.encode(state, this._io.table.id) + if (this.token) c.fixed32.encode(state, this.token) + + c.string.encode(state, this.command) + + if (this.target) c.fixed32.encode(state, this.target) + if (this.value) c.buffer.encode(state, this.value) + + return state.buffer + } +} + +class CongestionWindow { + constructor (maxWindow) { + this._i = 0 + this._total = 0 + this._window = [0, 0, 0, 0] + this._maxWindow = maxWindow + } + + isFull () { + return this._total >= 2 * this._maxWindow || this._window[this._i] >= this._maxWindow + } + + recv () { + if (this._window[this._i] > 0) { + this._window[this._i]-- + this._total-- + } + } + + send () { + this._total++ + this._window[this._i]++ + } + + drain () { + this._i = (this._i + 1) & 3 + this._total -= this._window[this._i] + this._window[this._i] = 0 // clear oldest + } +} + +function noop () {} + +function oncycle (req) { + req._timeout = null + req.oncycle(req) + if (req.sent === 3) { + req.destroy(TIMEOUT) + req._io.ontimeout(req) + } else { + req.send() + } +} + +function decodeReply (from, state) { + const flags = c.uint.decode(state) + const tid = c.uint16.decode(state) + const to = peer.ipv4.decode(state) + const id = flags & 1 ? c.fixed32.decode(state) : null + const token = flags & 2 ? c.fixed32.decode(state) : null + const closerNodes = flags & 4 ? peer.ipv4Array.decode(state) : null + const error = flags & 8 ? c.uint.decode(state) : 0 + const value = flags & 16 ? c.buffer.decode(state) : null + + if (id !== null) from.id = validateId(id, from) + + try { + return { tid, from, to, token, closerNodes, error, value } + } catch { + return null + } +} + +function validateId (id, from) { + return peer.id(from.host, from.port, TMP).equals(id) ? id : null +} diff --git a/lib/messages.js b/lib/messages.js deleted file mode 100644 index 930fc57..0000000 --- a/lib/messages.js +++ /dev/null @@ -1,107 +0,0 @@ -const cenc = require('compact-encoding') - -const IPv4 = exports.IPv4 = { - preencode (state, ip) { - state.end += 4 - }, - encode (state, ip) { // TODO: move over fast parser from ./id.js - const nums = ip.split('.') - state.buffer[state.start++] = Number(nums[0]) || 0 - state.buffer[state.start++] = Number(nums[1]) || 0 - state.buffer[state.start++] = Number(nums[2]) || 0 - state.buffer[state.start++] = Number(nums[3]) || 0 - }, - decode (state) { - if (state.end - state.start < 4) throw new Error('Out of bounds') - return state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] - } -} - -const peerIPv4 = exports.peerIPv4 = { - preencode (state, peer) { - state.end += 6 - }, - encode (state, peer) { - IPv4.encode(state, peer.host) - cenc.uint16.encode(state, peer.port) - }, - decode (state) { - return { - host: IPv4.decode(state), - port: cenc.uint16.decode(state) - } - } -} - -const peerIPv4Array = exports.peerIPv4Array = cenc.array(peerIPv4) - -const IS_REQUEST = 0b0001 -const HAS_ID = 0b0010 -const HAS_TOKEN = 0b0100 -const ROUTE_INFO = 0b1000 | IS_REQUEST -const HAS_TARGET = ROUTE_INFO | IS_REQUEST -const HAS_CLOSER_NODES = ROUTE_INFO ^ IS_REQUEST - -exports.message = { - preencode (state, m) { - state.end += 1 // version - state.end += 1 // flags - state.end += 2 // tid - state.end += 6 // to - - if (m.id) state.end += 32 - if (m.token) state.end += 32 - if (m.target) state.end += 32 - if (m.closerNodes && m.closerNodes.length) peerIPv4Array.preencode(state, m.closerNodes) - if (m.command) cenc.string.preencode(state, m.command) - else cenc.uint.preencode(state, m.status) - - cenc.buffer.preencode(state, m.value) - }, - encode (state, m) { - const closerNodes = m.closerNodes || [] - const flags = (m.id ? HAS_ID : 0) | - (m.token ? HAS_TOKEN : 0) | - (closerNodes.length ? HAS_CLOSER_NODES : 0) | - (m.target ? HAS_TARGET : 0) | - (m.command ? IS_REQUEST : 0) - - state.buffer[state.start++] = 2 - state.buffer[state.start++] = flags - - cenc.uint16.encode(state, m.tid) - peerIPv4.encode(state, m.to) - - if ((flags & HAS_ID) === HAS_ID) cenc.fixed32.encode(state, m.id) - if ((flags & HAS_TOKEN) === HAS_TOKEN) cenc.fixed32.encode(state, m.token) - if ((flags & ROUTE_INFO) === HAS_TARGET) cenc.fixed32.encode(state, m.target) - if ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) peerIPv4Array.encode(state, closerNodes) - if ((flags & IS_REQUEST) === IS_REQUEST) cenc.string.encode(state, m.command) - if ((flags & IS_REQUEST) === 0) cenc.uint.encode(state, m.status) - - cenc.buffer.encode(state, m.value) - }, - decode (state) { - const version = state.buffer[state.start++] - - if (version !== 2) { - throw new Error('Incompatible version') - } - - const flags = cenc.uint.decode(state) - - return { - version: 2, - tid: cenc.uint16.decode(state), - from: null, // populated in caller - to: peerIPv4.decode(state), - id: (flags & HAS_ID) === HAS_ID ? cenc.fixed32.decode(state) : null, - token: (flags & HAS_TOKEN) === HAS_TOKEN ? cenc.fixed32.decode(state) : null, - target: ((flags & ROUTE_INFO) === HAS_TARGET) ? cenc.fixed32.decode(state) : null, - closerNodes: ((flags & ROUTE_INFO) === HAS_CLOSER_NODES) ? peerIPv4Array.decode(state) : null, - command: ((flags & IS_REQUEST) === IS_REQUEST) ? cenc.string.decode(state) : null, - status: ((flags & IS_REQUEST) === 0) ? cenc.uint.decode(state) : 0, - value: cenc.buffer.decode(state) - } - } -} diff --git a/lib/nat-analyzer.js b/lib/nat-analyzer.js deleted file mode 100644 index 788a36d..0000000 --- a/lib/nat-analyzer.js +++ /dev/null @@ -1,117 +0,0 @@ -// how far can the port median distance be? -const INCREMENTING_THRESHOLD = 200 - -class NatAnalyzer { - constructor (sampleSize) { - // sampleSize must be 2^n - this.samples = new Array(sampleSize) - this.length = 0 - this.top = 0 - } - - sample (referrer) { - for (let i = 0; i < this.length; i++) { - const s = this.samples[i] - const r = s.referrer - if (r.port === referrer.port && r.host === referrer.host) return s - } - return null - } - - add (addr, referrer) { - if (this.length < this.samples.length) this.length++ - this.samples[this.top] = { port: addr.port, host: addr.host, dist: 0, referrer } - this.top = (this.top + 1) & (this.samples.length - 1) - } - - analyze (minSamples = 3) { - if (this.length < minSamples) return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 } - - const samples = this.samples.slice(0, this.length) - const hosts = new Map() - - let bestHost = null - let bestHits = 0 - - for (let i = 0; i < samples.length; i++) { - const host = samples[i].host - const hits = (hosts.get(host) || 0) + 1 - - hosts.set(host, hits) - - if (hits > bestHits) { - bestHits = hits - bestHost = host - } - } - - if (bestHits < (samples.length >> 1)) { - return { type: NatAnalyzer.UNKNOWN, host: null, port: 0 } - } - - samples.sort(cmpPort) - - let start = 0 - let end = samples.length - let mid = samples[samples.length >> 1].port - - // remove the 3 biggest outliers from the median if we have more than 6 samples - if (samples.length >= 6) { - for (let i = 0; i < 3; i++) { - const s = samples[start] - const e = samples[end - 1] - - if (Math.abs(mid - s.port) < Math.abs(mid - e.port)) end-- - else start++ - } - } - - const len = end - start - mid = samples[len >> 1].port - - for (let i = 0; i < samples.length; i++) { - samples[i].dist = Math.abs(mid - samples[i].port) - } - - // note that still sorts with the outliers which is why we just start=0, end=len-1 below - samples.sort(cmpDist) - mid = samples[len >> 1].dist - - if (samples[0].dist === 0 && samples[len - 1].dist === 0) { - return { - type: NatAnalyzer.PORT_CONSISTENT, - host: bestHost, - port: samples[0].port - } - } - - if (mid < INCREMENTING_THRESHOLD) { - return { - type: NatAnalyzer.PORT_INCREMENTING, - host: bestHost, - port: 0 - } - } - - return { - type: NatAnalyzer.PORT_RANDOMIZED, - host: bestHost, - port: 0 - } - } -} - -NatAnalyzer.UNKNOWN = Symbol.for('NAT_UNKNOWN') -NatAnalyzer.PORT_CONSISTENT = Symbol.for('NAT_PORT_CONSISTENT') -NatAnalyzer.PORT_INCREMENTING = Symbol.for('NAT_PORT_INCREMENTING') -NatAnalyzer.PORT_RANDOMIZED = Symbol.for('NAT_PORT_RANDOMIZED') - -module.exports = NatAnalyzer - -function cmpDist (a, b) { - return a.dist - b.dist -} - -function cmpPort (a, b) { - return a.port - b.port -} diff --git a/lib/peer.js b/lib/peer.js new file mode 100644 index 0000000..55eae72 --- /dev/null +++ b/lib/peer.js @@ -0,0 +1,49 @@ +const sodium = require('sodium-universal') +const c = require('compact-encoding') + +const addr = Buffer.alloc(6) +let i = 0 + +const ipv4 = { + preencode (state, p) { + state.end += 6 + }, + encode (state, p) { + i = 0 + state.buffer[state.start++] = num(p.host) + state.buffer[state.start++] = num(p.host) + state.buffer[state.start++] = num(p.host) + state.buffer[state.start++] = num(p.host) + state.buffer[state.start++] = p.port + state.buffer[state.start++] = p.port >>> 8 + }, + decode (state) { + if (state.end - state.start < 6) throw new Error('Out of bounds') + return { + id: null, // populated elsewhere + host: state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++], + port: state.buffer[state.start++] + 256 * state.buffer[state.start++] + } + } +} + +module.exports = { id, ipv4, ipv4Array: c.array(ipv4) } + +function num (ip) { + let n = 0 + let c = 0 + while (i < ip.length && (c = ip.charCodeAt(i++)) !== 46) n = n * 10 + (c - 48) + return n +} + +function id (ip, port, out = Buffer.allocUnsafe(32)) { + i = 0 + addr[0] = num(ip) + addr[1] = num(ip) + addr[2] = num(ip) + addr[3] = num(ip) + addr[4] = port + addr[5] = port >>> 8 + sodium.crypto_generichash(out, addr) + return out +} diff --git a/lib/query.js b/lib/query.js index 3d6ce04..f9f2a86 100644 --- a/lib/query.js +++ b/lib/query.js @@ -1,6 +1,8 @@ const { Readable } = require('streamx') -const race = require('./race') -const nodeId = require('./id') +const peer = require('./peer') + +const DONE = [] +const DOWN = [] module.exports = class Query extends Readable { constructor (dht, target, command, value, opts = {}) { @@ -16,17 +18,20 @@ module.exports = class Query extends Readable { this.concurrency = opts.concurrency || this.dht.concurrency this.inflight = 0 this.map = opts.map || defaultMap + this.maxSlow = opts.maxSlow === 0 ? 0 : (opts.maxSlow || 5) this.closestReplies = [] + this._slow = 0 this._slowdown = false - this._seen = new Set() + this._seen = new Map() this._pending = [] - this._onresolve = this._onvisit.bind(this) - this._onreject = this._onerror.bind(this) this._fromTable = false this._commit = opts.commit === true ? autoCommit : (opts.commit || null) this._commiting = false - this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false } + + this._onvisitbound = this._onvisit.bind(this) + this._onerrorbound = this._onerror.bind(this) + this._oncyclebound = this._oncycle.bind(this) const nodes = opts.nodes || opts.closestNodes @@ -34,7 +39,7 @@ module.exports = class Query extends Readable { // add them reverse as we pop below for (let i = nodes.length - 1; i >= 0; i--) { const node = nodes[i] - this._addPending(node.id || null, node.host, node.port) + this._addPending({ id: node.id || peer.id(node.host, node.port), host: node.host, port: node.port }, null) } } } @@ -43,13 +48,7 @@ module.exports = class Query extends Readable { const nodes = new Array(this.closestReplies.length) for (let i = 0; i < nodes.length; i++) { - const c = this.closestReplies[i] - - nodes[i] = { - id: c.id, - host: c.from.host, - port: c.from.port - } + nodes[i] = this.closestReplies[i].from } return nodes @@ -84,7 +83,7 @@ module.exports = class Query extends Readable { const closest = this.dht.table.closest(this.target, this.k - this._pending.length) for (const node of closest) { - this._addPending(node.id, node.host, node.port) + this._addPending({ id: node.id, host: node.host, port: node.port }, null) } } @@ -94,22 +93,42 @@ module.exports = class Query extends Readable { this.dht._resolveBootstrapNodes((bootstrapNodes) => { for (const node of bootstrapNodes) { - this._addPending(node.id, node.host, node.port) + this._addPending(node, null) } cb(null) }) } _isCloser (id) { - return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].id) < 0 + return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].from.id) < 0 } - _addPending (id, host, port) { - if (id && !this._isCloser(id)) return false - const addr = host + ':' + port - if (this._seen.has(addr)) return true - this._seen.add(addr) - this._pending.push({ id, host, port }) + _addPending (node, ref) { + const addr = node.host + ':' + node.port + const refs = this._seen.get(addr) + const isCloser = this._isCloser(node.id) + + if (refs === DONE) { + return isCloser + } + + if (refs === DOWN) { + if (ref) this._downHint(ref, node) + return isCloser + } + + if (refs) { + if (ref !== null) refs.push(ref) + return isCloser + } + + if (!isCloser) { + return false + } + + this._seen.set(addr, ref === null ? [] : [ref]) + this._pending.push(node) + return true } @@ -119,9 +138,9 @@ module.exports = class Query extends Readable { } _readMore () { - if (this.destroying) return + if (this.destroying || this._commiting) return - const concurrency = this._slowdown ? 3 : this.concurrency + const concurrency = (this._slowdown ? 3 : this.concurrency) + this._slow while (this.inflight < concurrency && this._pending.length > 0) { const next = this._pending.pop() @@ -135,11 +154,15 @@ module.exports = class Query extends Readable { this._slowdown = true } - if (this.inflight === 0 && this._pending.length === 0) { + if (this._pending.length > 0) return + + // if no inflight OR all the queries we are waiting on are marked as slow (within our limits) and we have a full result. + if (this.inflight === 0 || (this._slow <= this.maxSlow && this._slow === this.inflight && this.closestReplies.length >= this.k)) { // if more than 3/4 failed and we only used cached nodes, try again from the routing table if (!this._fromTable && this.successes < this.k / 4) { this._addFromTable() - return this._readMore() + this._readMore() + return } this._flush() @@ -147,41 +170,73 @@ module.exports = class Query extends Readable { } _flush () { + if (this._commiting) return + this._commiting = true + if (this._commit === null) { this.push(null) return } - if (!this.closestReplies.length) { + const p = [] + for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this)) + this._endAfterCommit(p) + } + + _endAfterCommit (ps) { + if (!ps.length) { this.destroy(new Error('Too few nodes responded')) return } - if (this._commiting) return - this._commiting = true + const self = this - const p = [] - for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this)) + let pending = ps.length + let success = 0 - race(p, 1, p.length) - .then(() => this.push(null), (err) => this.destroy(err)) - } + for (const p of ps) p.then(ondone, onerror) - _onvisit (m) { - if (m.status === 0) this.successes++ - else this.errors++ + function ondone () { + success++ + if (--pending === 0) self.push(null) + } - this.inflight-- + function onerror (err) { + if (--pending > 0) return + if (success) self.push(null) + else self.destroy(err) + } + } - if (m.status === 0 && m.id !== null && this._isCloser(m.id)) { - this._pushClosest(m) + _dec (req) { + if (req.oncycle === noop) { + this._slow-- + } else { + req.oncycle = noop } + this.inflight-- + } + + _onvisit (m, req) { + this._dec(req) + + const addr = req.to.host + ':' + req.to.port + this._seen.set(addr, DONE) + + if (this._commiting) return + + if (m.error === 0) this.successes++ + else this.errors++ + + if (m.error === 0 && m.from.id !== null && this._isCloser(m.from.id)) this._pushClosest(m) if (m.closerNodes !== null) { for (const node of m.closerNodes) { - const id = nodeId(node.host, node.port) - if (id.equals(this.dht.table.id)) continue - if (!this._addPending(id, node.host, node.port)) break + node.id = peer.id(node.host, node.port) + if (node.id.equals(this.dht.table.id)) continue + // TODO: we could continue here instead of breaking to ensure that one of the nodes in the closer list + // is later marked as DOWN that we gossip that back + if (!this._addPending(node, m.from)) break } } @@ -189,7 +244,7 @@ module.exports = class Query extends Readable { this._slowdown = false } - if (m.status !== 0) { + if (m.error !== 0) { this._readMore() return } @@ -200,17 +255,35 @@ module.exports = class Query extends Readable { } } - _onerror () { + _onerror (_, req) { + const addr = req.to.host + ':' + req.to.port + const refs = this._seen.get(addr) + + this._seen.set(addr, DOWN) + for (const node of refs) this._downHint(node, req.to) + + this._dec(req) this.errors++ - this.inflight-- this._readMore() } + _oncycle (req) { + req.oncycle = noop + this._slow++ + this._readMore() + } + + _downHint (node, down) { + const state = { start: 0, end: 6, buffer: Buffer.allocUnsafe(6) } + peer.ipv4.encode(state, down) + this.dht._request(node, 'down_hint', null, state.buffer, noop, noop) + } + _pushClosest (m) { this.closestReplies.push(m) for (let i = this.closestReplies.length - 2; i >= 0; i--) { const prev = this.closestReplies[i] - const cmp = this._compare(prev.id, m.id) + const cmp = this._compare(prev.from.id, m.from.id) // if sorted, done! if (cmp < 0) break // if dup, splice it out (rare) @@ -234,18 +307,21 @@ module.exports = class Query extends Readable { return 0 } - _visit (node) { + _visit (to) { this.inflight++ - this.dht.request(this.target, this.command, this.value, node, this._ropts) - .then(this._onresolve, this._onreject) + + const req = this.dht._request(to, this.command, this.target, this.value, this._onvisitbound, this._onerrorbound) + req.oncycle = this._oncyclebound } } function autoCommit (reply, dht, query) { if (!reply.token) return Promise.reject(new Error('No token received for closest node')) - return dht.request(query.target, query.command, query.value, reply.from, { token: reply.token }) + return dht.request({ token: reply.token, target: query.target, command: query.command, value: query.value }, reply.from) } function defaultMap (m) { return m } + +function noop () {} diff --git a/lib/race.js b/lib/race.js deleted file mode 100644 index 277b54f..0000000 --- a/lib/race.js +++ /dev/null @@ -1,16 +0,0 @@ -module.exports = async function race (p, min = 1, max = p.length) { - let errors = 0 - const results = [] - // avoid unhandled rejections after early return/throw - for (const promise of p) promise.catch(() => {}) - for (let i = 0; i < p.length; i++) { - try { - const res = await p[i] - if (results.length < max) results.push(res) - if (results.length >= max) return results - if (results.length + errors === p.length) return results - } catch { - if ((p.length - ++errors) < min) throw new Error('Too many requests failed') - } - } -} diff --git a/lib/rpc.js b/lib/rpc.js deleted file mode 100644 index 8412a22..0000000 --- a/lib/rpc.js +++ /dev/null @@ -1,255 +0,0 @@ -const dgram = require('dgram') -const { message } = require('./messages') - -module.exports = class RPC { - constructor (opts = {}) { - this._pendingSends = [] - this._tid = (Math.random() * 65536) | 0 - this._drainInterval = null - this._tick = 0 - this._w = 0 - this._win = [0, 0, 0, 0] - this._bind = opts.bind || 0 - this._bound = false - this._binding = null - - this.maxWindow = opts.maxWindow || 80 // ~100 per second burst, ~80 per second avg - this.maxRetries = 3 - this.destroyed = false - this.inflight = [] - this.onrequest = opts.onrequest || noop - this.onresponse = opts.onresponse || noop - this.onwarning = opts.onwarning || noop - this.socket = opts.socket || dgram.createSocket('udp4') - this.socket.on('message', this.onmessage.bind(this, true)) - } - - get inflightRequests () { - return this.inflight.length - } - - send (m, socket = this.socket) { - const state = { start: 0, end: 0, buffer: null } - - message.preencode(state, m) - state.buffer = Buffer.allocUnsafe(state.end) - message.encode(state, m) - - this._send(socket, state.buffer, m.to) - } - - reply (req, reply, socket = this.socket) { - reply.tid = req.tid - reply.to = req.from - this.send(reply, socket) - } - - address () { - return this.socket.address() - } - - bind (port = this._bind) { - if (this._binding) return this._binding - - const self = this - - this._binding = new Promise((resolve, reject) => { - const s = this.socket - - s.bind(port) - s.on('listening', onlistening) - s.on('error', onerror) - - function onlistening () { - self._bound = true - - s.removeListener('listening', onlistening) - s.removeListener('error', onerror) - resolve(s.address().port) - } - - function onerror (err) { - // retry on any port if preferred port is unavail - if (port === self._bind && port !== 0) { - port = 0 - s.bind(0) - return - } - - s.removeListener('listening', onlistening) - s.removeListener('error', onerror) - reject(err) - } - }) - - return this._binding - } - - destroy () { - if (this.destroyed) return - this.unwrap(true) - this.socket.close() - } - - unwrap (closing = false) { - if (this.destroyed) return - this.destroyed = true - - clearInterval(this._drainInterval) - this.socket.removeAllListeners() - - for (const req of this.inflight) { - req.reject(new Error('RPC socket destroyed')) - } - - this.inflight = [] - return this.socket - } - - async request (m, opts) { - if (this.destroyed) throw new Error('RPC socket destroyed') - - const socket = (opts && opts.socket) || this.socket - - if (!this._bound && socket === this.socket) await this.bind() - - if (this._drainInterval === null) { - this._drainInterval = setInterval(this._drain.bind(this), 750) - if (this._drainInterval.unref) this._drainInterval.unref() - } - - m.tid = this._tid++ - if (this._tid === 65536) this._tid = 0 - - const state = { start: 0, end: 0, buffer: null } - - message.preencode(state, m) - state.buffer = Buffer.allocUnsafe(state.end) - message.encode(state, m) - - return new Promise((resolve, reject) => { - const total = this._win[0] + this._win[1] + this._win[2] + this._win[3] - const req = { - socket, - timeout: 2, - expectOk: !!(opts && opts.expectOk !== false), - tries: (opts && opts.retry === false) ? this.maxRetries : 0, - tid: m.tid, - buffer: state.buffer, - to: m.to, - resolve, - reject - } - - this.inflight.push(req) - - if (total < 2 * this.maxWindow && this._win[this._w] < this.maxWindow) { - this._win[this._w]++ - req.tries++ - this._send(req.socket, req.buffer, req.to) - } - }) - } - - onmessage (sample, buffer, rinfo) { - const from = { host: rinfo.address, port: rinfo.port } - if (!from.port) return - if (buffer.byteLength <= 1) return - - const state = { start: 0, end: buffer.byteLength, buffer } - let m = null - - try { - m = message.decode(state) - } catch (err) { - this.onwarning(err) - return - } - - m.from = from - - if (m.command !== null) { // request - if (this.onrequest === noop) return - this.onrequest(m, sample) - return - } - - const req = this._dequeue(m.tid) - - if (req === null) return - if (m.id && (req.to.port !== from.port || req.to.host !== from.host)) m.id = null - - // decrement the inflight window as this is an "ack" - if (this._win[this._w] > 0) this._win[this._w]-- - this.onresponse(m, sample) - - if (m.status === 0 || req.expectOk === false) { - req.resolve(m) - } else { - req.reject(createStatusError(m.status)) - } - } - - _send (socket, buf, addr) { - if (this.destroyed) return - socket.send(buf, 0, buf.byteLength, addr.port, addr.host) - } - - _dequeue (tid) { - for (let i = 0; i < this.inflight.length; i++) { - const req = this.inflight[i] - - if (req.tid === tid) { - if (i === this.inflight.length - 1) this.inflight.pop() - else this.inflight[i] = this.inflight.pop() - return req - } - } - - return null - } - - _drain () { - let total = this._win[0] + this._win[1] + this._win[2] + this._win[3] - - for (let i = 0; i < this.inflight.length; i++) { - const req = this.inflight[i] - - if (req.tries > 0 && --req.timeout >= 0) continue - req.timeout = 2 - - if (req.tries++ > this.maxRetries) { - if (i === this.inflight.length - 1) this.inflight.pop() - else this.inflight[i] = this.inflight.pop() - req.reject(createTimeoutError()) - continue - } - - if (total >= 2 * this.maxWindow || this._win[this._w] >= this.maxWindow) { - req.tries-- - continue - } - - total++ - this._win[this._w]++ - this._send(req.socket, req.buffer, req.to) - } - - this._w = (this._w + 1) & 3 - this._win[this._w] = 0 // clear oldest - } -} - -function createTimeoutError () { - const err = new Error('Request timed out') - err.status = 0 - return err -} - -function createStatusError (status) { - const err = new Error('Request failed with status ' + status) - err.status = status - return err -} - -function noop () {} diff --git a/package.json b/package.json index 30c171e..fd764be 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "compact-encoding": "^2.1.0", "fast-fifo": "^1.0.0", "kademlia-routing-table": "^1.0.0", + "nat-sampler": "^1.0.1", "sodium-universal": "^3.0.4", "streamx": "^2.10.3", "time-ordered-set": "^1.0.2" diff --git a/test.js b/test.js index 02a651b..2227a28 100644 --- a/test.js +++ b/test.js @@ -12,14 +12,15 @@ tape('make bigger swarm', async function (t) { const swarm = await makeSwarm(500) const targetNode = swarm[25] + const target = targetNode.id - let q = swarm[499].query(targetNode.id, 'find_node', null) + let q = swarm[499].query({ command: 'find_node', target }) let messages = 0 let found = false for await (const data of q) { messages++ - if (data.id && data.id.equals(targetNode.id)) { + if (data.from.id && data.from.id.equals(target)) { found = true break } @@ -27,13 +28,13 @@ tape('make bigger swarm', async function (t) { t.ok(found, 'found target in ' + messages + ' message(s)') - q = swarm[490].query(targetNode.id, 'find_node', null, { nodes: q.closestNodes }) + q = swarm[490].query({ command: 'find_node', target }, { nodes: q.closestNodes }) messages = 0 found = false for await (const data of q) { messages++ - if (data.id && data.id.equals(targetNode.id)) { + if (data.from.id && data.from.id.equals(target)) { found = true break } @@ -41,37 +42,15 @@ tape('make bigger swarm', async function (t) { t.ok(found, 'found target again in ' + messages + ' message(s)') - const { type, host, port } = swarm[490].remoteAddress() + const { firewalled, host, port } = swarm[490] - t.same(type, DHT.NAT_OPEN) + t.same(firewalled, false) t.same(port, swarm[490].address().port) t.ok(host) destroy(swarm) }) -tape('nat sample promise', async function (t) { - const swarm = await makeSwarm(5) - - const node = new DHT({ - bootstrap: [{ host: '127.0.0.1', port: swarm[0].address().port }] - }) - - let ready = false - node.ready().then(() => { - ready = true - }) - - await node.sampledNAT() - t.ok(node._nat.length >= 3, 'min 3 samples') - t.notOk(ready, 'before ready') - await node.ready() - t.ok(ready, 'after ready') - - node.destroy() - destroy(swarm) -}) - tape('commit after query', async function (t) { const swarm = await makeSwarm(100) @@ -82,16 +61,16 @@ tape('commit after query', async function (t) { if (req.command === 'before') { return req.reply(null) } - if (req.command === 'after' && req.commit) { + if (req.command === 'after' && req.token) { commits++ return req.reply(null) } }) } - const q = swarm[42].query(swarm[0].table.id, 'before', null, { + const q = swarm[42].query({ command: 'before', target: swarm[0].table.id }, { commit (m, dht, query) { - return dht.request(query.target, 'after', null, m.from, { token: m.token }) + return dht.request({ command: 'after', target: query.target, token: m.token }, m.from) } }) @@ -106,11 +85,11 @@ tape('map query stream', async function (t) { const swarm = await makeSwarm(10) const expected = [] - const q = swarm[0].query(swarm[0].table.id, 'find_node', null, { + const q = swarm[0].query({ command: 'find_node', target: swarm[0].table.id }, { map (data) { if (expected.length > 3) return null - expected.push(data.id) - return data.id + expected.push(data.from.id) + return data.from.id } }) @@ -119,7 +98,9 @@ tape('map query stream', async function (t) { await q.finished() + t.ok(expected.length > 0) t.same(buf, expected) + destroy(swarm) }) @@ -134,10 +115,10 @@ tape('timeouts', async function (t) { } }) - const q = a.query(Buffer.alloc(32), 'nope') + const q = a.query({ command: 'nope', target: Buffer.alloc(32) }) await q.finished() - t.same(tries, 4) + t.same(tries, 3) bootstrap.destroy() a.destroy() @@ -146,20 +127,24 @@ tape('timeouts', async function (t) { tape('shorthand commit', async function (t) { const swarm = await makeSwarm(40) + let tokens = 0 + let notTokens = 0 for (const node of swarm) { node.on('request', function (req) { - if (req.commit) tokens++ + if (req.token) tokens++ + else notTokens++ req.reply(null) }) } - const q = swarm[0].query(Buffer.alloc(32), 'nope', null, { commit: true }) + const q = swarm[0].query({ command: 'hello', target: Buffer.alloc(32) }, { commit: true }) await q.finished() t.same(tokens, 20) + t.ok(notTokens >= tokens) destroy(swarm) }) @@ -192,7 +177,7 @@ tape('timeouts when commiting', async function (t) { } }) - const q = a.query(Buffer.alloc(32), 'nope', null, { commit: true }) + const q = a.query({ command: 'nope', target: Buffer.alloc(32) }, { commit: true }) let error = null try { @@ -202,7 +187,7 @@ tape('timeouts when commiting', async function (t) { } t.ok(error, 'commit should fail') - t.same(tries, 4) + t.same(tries, 3) bootstrap.destroy() a.destroy() @@ -237,10 +222,10 @@ tape('addNode / nodes option', async function (t) { const bNodes = b.toArray() - t.deepEqual(bNodes, [{ host: '127.0.0.1', port: a.address().port }]) + t.same(bNodes, [{ host: '127.0.0.1', port: a.address().port }]) const responses = [] - for await (const data of b.query(a.id, 'hello')) { + for await (const data of b.query({ command: 'hello', target: a.id })) { responses.push(data) } @@ -249,7 +234,7 @@ tape('addNode / nodes option', async function (t) { const aNodes = a.toArray() - t.deepEqual(aNodes, [{ host: '127.0.0.1', port: b.address().port }]) + t.same(aNodes, [{ host: '127.0.0.1', port: b.address().port }]) a.destroy() b.destroy() @@ -259,7 +244,7 @@ tape('addNode / nodes option', async function (t) { tape('set bind', async function (t) { const port = await freePort() - const a = new DHT({ bind: port }) + const a = new DHT({ bind: port, firewalled: false }) await a.ready() t.same(a.address().port, port, 'bound to explicit port') @@ -290,8 +275,8 @@ function freePort () { } async function makeSwarm (n) { - const node = new DHT() - await node.bind(0) + const node = DHT.bootstrapper() + await node.ready() const all = [node] const bootstrap = ['localhost:' + node.address().port] while (all.length < n) {