const dns = require('dns') const RPC = require('./lib/rpc') const Query = require('./lib/query') const race = require('./lib/race') const nodeId = require('./lib/id') const NatAnalyzer = require('./lib/nat-analyzer') const Table = require('kademlia-routing-table') const TOS = require('time-ordered-set') const FIFO = require('fast-fifo/fixed-size') const sodium = require('sodium-universal') const dgram = require('dgram') const { EventEmitter } = require('events') const TICK_INTERVAL = 5000 const SLEEPING_INTERVAL = 3 * TICK_INTERVAL const PERSISTENT_TICKS = 240 // if nothing major bad happens in ~20mins we can consider this node stable (if nat is friendly) const MORE_PERSISTENT_TICKS = 3 * PERSISTENT_TICKS const REFRESH_TICKS = 60 // refresh every ~5min when idle const RECENT_NODE = 20 // we've heard from a node less than 1min ago const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old // this is the known id for figuring out if we should ping bootstrap nodes const PING_BOOTSTRAP = Buffer.allocUnsafe(32) sodium.crypto_generichash(PING_BOOTSTRAP, Buffer.from('ping bootstrap')) class Request { constructor (dht, m) { this.dht = dht this.tid = m.tid this.from = m.from = this.token = m.token = this.closerNodes = m.closerNodes this.status = m.status this.command = m.command this.value = m.value } get commit () { return this.token !== null } error (code, opts = {}) { return this.send(code, null, opts) } reply (value, opts = {}) { return this.send(0, value, opts) } send (status, value, opts) { const target = opts.closerNodes === false ? null : const token = opts.token !== false return this.dht._reply(this.tid, target, status, value, this.from, token, opts.socket) } } class DHT extends EventEmitter { constructor (opts = {}) { super() this.bootstrapNodes = opts.bootstrap === false ? [] : (opts.bootstrap || []).map(parseNode) this.nodes = new TOS() // this is just the private id, see below in the persistence handler this.table = new Table( || randomBytes(32)) this.rpc = new RPC({ bind: opts.bind, maxWindow: opts.maxWindow, socket: opts.socket, onwarning: opts.onwarning, onrequest: this._onrequest.bind(this), onresponse: this._onresponse.bind(this) }) this.bootstrapped = false this.concurrency = opts.concurrency || this.table.k this.ephemeral = true this.adaptive = typeof opts.ephemeral !== 'boolean' && opts.adaptive !== false this.clientOnly = !this.adaptive && opts.ephemeral !== false this._forcePersistent = opts.ephemeral === false this._repinging = 0 this._reping = new FIFO(128) this._bootstrapping = this.bootstrap() this._localSocket = opts.localSocket || null // used for auto configuring nat sessions this._tick = randomOffset(100) // make sure to random offset all the network ticks this._refreshTicks = randomOffset(REFRESH_TICKS) this._pingBootstrapTicks = randomOffset(REFRESH_TICKS) this._persistentTicks = this.adaptive ? PERSISTENT_TICKS : 0 this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL) this._lastTick = this._nat = new NatAnalyzer(opts.natSampleSize || 16) this._onrow = (row) => row.on('full', (node) => this._onfullrow(node, row)) this._rotateSecrets = false this._secrets = [ Buffer.alloc(32), Buffer.alloc(32) ] this._resolveSampled = null this._sampled = new Promise((resolve) => { this._resolveSampled = resolve }) sodium.randombytes_buf(this._secrets[0]) sodium.randombytes_buf(this._secrets[1]) this.table.on('row', this._onrow) this.rpc.socket.on('listening', () => this.emit('listening')) if (opts.nodes) { for (const node of opts.nodes) this.addNode(node) } } get id () { return this.ephemeral ? null : } onmessage (buf, rinfo) { if (buf.byteLength > 1) this.rpc.onmessage(false, buf, rinfo) } sampledNAT () { return this._sampled } ready () { return this._bootstrapping } query (target, command, value, opts) { this._refreshTicks = REFRESH_TICKS return new Query(this, target, command, value || null, opts) } ping (node) { return this.request(null, 'ping', null, node) } request (target, command, value, to, opts) { const ephemeral = this.ephemeral || !!(opts && opts.socket !== this.rpc.socket) const token = (opts && opts.token) || null return this.rpc.request({ version: 2, tid: 0, from: null, to, id: ephemeral ? null :, token, target, closerNodes: null, command, status: 0, value }, opts) } requestAll (target, command, value, nodes, opts = {}) { const min = typeof opts.min === 'number' ? opts.min : 1 if (nodes.length < min) return Promise.reject(new Error('Too few nodes to request')) const p = [] for (const node of nodes) p.push(this.request(target, command, value, node)) return race(p, min, opts.max) } destroy () { this.rpc.destroy() if (this._resolveSampled !== null) { this._resolveSampled(false) this._resolveSampled = null } if (this._localSocket) { this._localSocket.close() this._localSocket = null } clearInterval(this._tickInterval) } async bootstrap () { await Promise.resolve() // wait a tick, so apis can be used from the outside for (let i = 0; i < 2; i++) { await this._backgroundQuery(, 'find_node', null).finished() if (this.bootstrapped || !this._forcePersistent || !(await this._onpersistent())) break } if (this.bootstrapped) return this.bootstrapped = true if (this._resolveSampled !== null) { this._resolveSampled(true) this._resolveSampled = null } await this.rpc.bind() this.emit('ready') } _backgroundQuery (target, command, value) { const backgroundCon = Math.min(this.concurrency, Math.max(2, (this.concurrency / 8) | 0)) const q = this.query(target, command, value, { concurrency: backgroundCon }) q.on('data', () => { // yield to other traffic q.concurrency = this.rpc.inflightRequests < 3 ? this.concurrency : backgroundCon }) return q } _token (addr, i) { const token = Buffer.allocUnsafe(32) this._rotateSecrets = true // TODO: also add .port? sodium.crypto_generichash(token, Buffer.from(, this._secrets[i]) return token } refresh () { const node = this.table.random() this._backgroundQuery(node ? :, 'find_node', null) } _pingSomeBootstrapNodes () { // once in a while it can be good to ping the bootstrap nodes, since we force them to be ephemeral to lower their load // to make this a lightweight as possible we first check if we are the closest node we know to a known id (hash(ping bootstrap)) // and if so we issue a background query against that. if after doing this query we are still one of the closests nodes // we ping the bootstrapper - in practice this results to very little bootstrap ping traffic. this._pingBootstrapTicks = REFRESH_TICKS const nodes = this.table.closest(PING_BOOTSTRAP, 1) if (nodes.length === 0 || compare(PING_BOOTSTRAP,, nodes[0].id) > 0) { return } const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null) q.on('close', () => { if (q.closestReplies.length === 0) return if (compare(PING_BOOTSTRAP,, q.closestReplies[q.closestReplies.length - 1].id) > 0) { return } this._resolveBootstrapNodes((nodes) => { const node = nodes[(Math.random() * nodes.length) | 0], noop) }) }) } _pingSome () { let cnt = this.rpc.inflightRequests > 2 ? 3 : 5 let oldest = this.nodes.oldest // tiny dht, ping the bootstrap again if (!oldest) { this.refresh() return } // we've recently pinged the oldest one, so only trigger a couple of repings if ((this._tick - oldest.seen) < RECENT_NODE) { cnt = 2 } while (cnt--) { if (!oldest || this._tick === oldest.seen) continue this._check(oldest, oldest.seen) oldest = } } _check (node, lastSeen) { .then( () => this._removeStaleNode(node, lastSeen), () => this._removeNode(node) ) } _ontick () { if (this._rotateSecrets) { const tmp = this._secrets[0] this._secrets[0] = this._secrets[1] this._secrets[1] = tmp sodium.crypto_generichash(tmp, tmp) } const time = if (time - this._lastTick > SLEEPING_INTERVAL) { this._onwakeup() } else { this._tick++ } this._lastTick = time if (!this.bootstrapped) return if (this.adaptive && this.ephemeral && --this._persistentTicks <= 0) { this._onpersistent() // the promise returned here never fails so just ignore it } if ((this._tick & 7) === 0) { this._pingSome() } if (!this.ephemeral && --this._pingBootstrapTicks <= 0) { this._pingSomeBootstrapNodes() } if (((this._tick & 63) === 0 && this.nodes.length < this.table.k) || --this._refreshTicks <= 0) { this.refresh() } } async _onpersistent () { if (this.ephemeral === false) return false // only require one sample here as we do the nat check anyway after // makes settting up a dht easier... const addr = this.remoteAddress(1) if (addr.type !== DHT.NAT_PORT_CONSISTENT && addr.type !== DHT.NAT_OPEN) { this._persistentTicks = MORE_PERSISTENT_TICKS return false } if (await this._checkIfFirewalled()) return false if (!this.ephemeral) return false // incase it's called in parallel for some reason const id = nodeId(, addr.port) if ( return false const nodes = this.table.toArray() this.table = new Table(id) for (const node of nodes) this.table.add(node) this.table.on('row', this._onrow) this.ephemeral = false this.emit('persistent') return true } async _addBootstrapNodes (nodes) { return new Promise((resolve) => { this._resolveBootstrapNodes(function (bootstrappers) { nodes.push(...bootstrappers) resolve() }) }) } async _checkIfFirewalled () { const nodes = [] for (let node = this.nodes.latest; node && nodes.length < 5; node = node.prev) { nodes.push(node) } if (nodes.length < 5) await this._addBootstrapNodes(nodes) if (!nodes.length) return true // no nodes available, including bootstrappers - bail try { await this.requestAll(null, 'ping_nat', null, nodes, { min: nodes.length >= 5 ? 3 : 1, max: 3 }) } catch { // not enough nat pings succeded - assume firewalled return true } return false } _onwakeup () { this._tick += 2 * OLD_NODE // bump the tick enough that everything appears old. this._tick += 8 - (this._tick & 7) - 2 // triggers a series of pings in two ticks this._persistentTicks = MORE_PERSISTENT_TICKS this._pingBootstrapTicks = REFRESH_TICKS // forced ephemeral, so no need for a bootstrap ping soon this._refreshTicks = 1 // triggers a refresh next tick (allow network time to wake up also) if (this.adaptive) { this.ephemeral = true this.emit('ephemeral') } this.emit('wakeup') } _onfullrow (newNode, row) { if (this.bootstrapped && this._reping.push({ newNode, row })) this._repingMaybe() } _repingMaybe () { while (this._repinging < 3 && this._reping.isEmpty() === false) { const { newNode, row } = this._reping.shift() if (this.table.get( continue let oldest = null for (const node of row.nodes) { if (node.seen === this._tick) continue if (oldest === null || oldest.seen > node.seen || (oldest.seen === node.seen && oldest.added > node.added)) oldest = node } if (oldest === null) continue if ((this._tick - oldest.seen) < RECENT_NODE && (this._tick - oldest.added) > OLD_NODE) continue this._repingAndSwap(newNode, oldest) } } _repingAndSwap (newNode, oldNode) { const self = this const lastSeen = oldNode.seen this._repinging++, onswap) function onsuccess (m) { if (oldNode.seen <= lastSeen) return onswap() self._repinging-- self._repingMaybe() } function onswap () { self._repinging-- self._repingMaybe() self._removeNode(oldNode) self._addNode(newNode) } } _resolveBootstrapNodes (done) { if (!this.bootstrapNodes.length) return done([]) let missing = this.bootstrapNodes.length const nodes = [] for (const node of this.bootstrapNodes) { dns.lookup(, { family: 4 }, (_, host) => { if (host) nodes.push({ id: nodeId(host, node.port), host, port: node.port }) if (--missing === 0) done(nodes) }) } } _addNode (node) { if (this.nodes.has(node) || return node.added = node.seen = this._tick if (this.table.add(node)) this.nodes.add(node) this.emit('add-node', node) } _removeStaleNode (node, lastSeen) { if (node.seen <= lastSeen) this._removeNode(node) } _removeNode (node) { if (!this.nodes.has(node)) return this.table.remove( this.nodes.remove(node) this.emit('remove-node', node) } _addNodeFromMessage (m, sample) { const id = nodeId(, m.from.port) // verify id, if the id is mismatched it doesn't strictly mean the node is bad, could be // a weird NAT thing, that the node is unaware of - in any case it def means we should not // add this node to our routing table if (! { = null return } const oldNode = this.table.get(id) // refresh it, if we've seen this before if (oldNode) { // if the node is indicating that we got a new ip // make sure to add it again to the sampler. make sure we don't allow the remote node // to add multiple entries though. if (sample && ( === null || !== { = // TODO: would be technically better to add to the head of the sample queue, but // this is prop fine const s = this._nat.sample(m.from) if (s) { s.port = = } else { this._nat.add(, m.from) } } oldNode.seen = this._tick this.nodes.add(oldNode) return } // add a sample of our address from the remote nodes pov if (sample) this._nat.add(, m.from) this._addNode({ id, port: m.from.port, host:, token: null, // adding this so it has the same "shape" as the query nodes for easier debugging to: sample ? : null, added: this._tick, seen: this._tick, prev: null, next: null }) } _onrequest (req, sample) { // check if the roundtrip token is one we've generated within the last 10s for this peer if (req.token !== null && !this._token(req.from, 1).equals(req.token) && !this._token(req.from, 0).equals(req.token)) { req.token = null } if ( !== null) this._addNodeFromMessage(req, sample) else this._pingBootstrapTicks = REFRESH_TICKS // o/ if this node is ephemeral, it prob originated from a bootstrapper somehow so no need to ping them // echo the value back if (req.command === 'ping') { if (req.value === null || req.value.byteLength <= 32) this._reply(req.tid, null, 0, req.value, req.from, false) return } if (req.command === 'ping_nat') { if (!this._localSocket) this._localSocket = dgram.createSocket('udp4') this._reply(req.tid, null, 0, null, req.from, false, this._localSocket) return } // empty dht reply back if (req.command === 'find_node') { this._reply(req.tid,, 0, null, req.from, false) return } if (this.emit('request', new Request(this, req)) === false) { this._reply(req.tid,, 1, null, req.from, false) } } _onresponse (res, sample) { if ( !== null) this._addNodeFromMessage(res, sample) else if (sample && this._nat.length < 3 && this._nat.sample(res.from) === null) this._nat.add(, res.from) if (this._resolveSampled !== null && this._nat.length >= 3) { this._resolveSampled(true) this._resolveSampled = null } } bind (...args) { return this.rpc.bind(...args) } address () { return this.rpc.address() } remoteAddress (minSamples) { const result = this._nat.analyze(minSamples) if (result.type === NatAnalyzer.PORT_CONSISTENT && !this.ephemeral) result.type = DHT.NAT_OPEN return result } addNode ({ host, port }) { this._addNode({ id: nodeId(host, port), port, host, token: null, to: null, added: this._tick, seen: this._tick, prev: null, next: null }) } toArray () { return this.nodes.toArray().map(({ host, port }) => ({ host, port })) } _reply (tid, target, status, value, to, addToken, socket = this.rpc.socket) { const closerNodes = target ? this.table.closest(target) : null const ephemeral = socket !== this.rpc.socket || this.ephemeral const reply = { version: 2, tid, from: null, to, id: ephemeral ? null :, token: (!ephemeral && addToken) ? this._token(to, 1) : null, target: null, closerNodes, command: null, status, value } this.rpc.send(reply, socket) return reply } static id (node, out) { return nodeId(, node.port, out) } } DHT.OK = 0 DHT.UNKNOWN_COMMAND = 1 DHT.NAT_UNKNOWN = NatAnalyzer.UNKNOWN DHT.NAT_OPEN = Symbol.for('NAT_OPEN') // implies PORT_CONSISTENT DHT.NAT_PORT_CONSISTENT = NatAnalyzer.PORT_CONSISTENT DHT.NAT_PORT_INCREMENTING = NatAnalyzer.PORT_INCREMENTING DHT.NAT_PORT_RANDOMIZED = NatAnalyzer.PORT_RANDOMIZED module.exports = DHT function parseNode (s) { if (typeof s === 'object') return s const [host, port] = s.split(':') if (!port) throw new Error('Bootstrap node format is host:port') return { host, port: Number(port) } } function randomBytes (n) { const b = Buffer.alloc(n) sodium.randombytes_buf(b) return b } function noop () {} function compare (id, a, b) { for (let i = 0; i < id.length; i++) { if (a[i] === b[i]) continue const t = id[i] return (t ^ a[i]) - (t ^ b[i]) } return 0 } function randomOffset (n) { return n - ((Math.random() * 0.5 * n) | 0) }