diff --git a/index.js b/index.js index 8cb0f01..de461cb 100644 --- a/index.js +++ b/index.js @@ -42,11 +42,11 @@ class Request { } error (code) { - this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from) + return this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from) } reply (value, token = true) { - this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from) + return this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from) } } @@ -88,7 +88,7 @@ class DHT extends EventEmitter { this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row))) } - get id () { + get nodeId () { return this.table.id } @@ -137,6 +137,17 @@ class DHT extends EventEmitter { return race(p, min, opts.max) } + // TODO: make this more smart - ie don't retry the first one etc etc + async requestAny (target, command, value, nodes, opts) { + for (const node of nodes) { + try { + return await this.request(target, command, value, node, opts) + } catch {} + } + + throw new Error('All requests failed') + } + destroy () { this.rpc.destroy() clearInterval(this._tickInterval) @@ -173,7 +184,7 @@ class DHT extends EventEmitter { refresh () { const node = this.table.random() - this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null) + this._backgroundQuery(node ? node.nodeId : this.table.id, 'find_node', null) } _pingSomeBootstrapNodes () { @@ -185,7 +196,7 @@ class DHT extends EventEmitter { this._pingBootstrapTicks = REFRESH_TICKS const nodes = this.table.closest(PING_BOOTSTRAP, 1) - if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].id) > 0) { + if (nodes.length === 0 || compare(PING_BOOTSTRAP, this.table.id, nodes[0].nodeId) > 0) { return } @@ -194,7 +205,7 @@ class DHT extends EventEmitter { q.on('close', () => { if (q.closest.length === 0) return - if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) { + if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].nodeId) > 0) { return } @@ -309,7 +320,7 @@ class DHT extends EventEmitter { _repingMaybe () { while (this._repinging < 3 && this._reping.isEmpty() === false) { const { newNode, row } = this._reping.shift() - if (this.table.get(newNode.id)) continue + if (this.table.get(newNode.nodeId)) continue let oldest = null for (const node of row.nodes) { @@ -331,7 +342,7 @@ class DHT extends EventEmitter { this.ping(oldNode).then(onsuccess, onswap) function onsuccess (m) { - if (m.nodeId === null || !m.nodeId.equals(oldNode.id)) return onswap() + if (m.nodeId === null || !m.nodeId.equals(oldNode.nodeId)) return onswap() self._repinging-- self._repingMaybe() } @@ -352,14 +363,14 @@ class DHT extends EventEmitter { for (const node of this.bootstrapNodes) { dns.lookup(node.host, (_, host) => { - if (host) nodes.push({ id: node.id || null, host, port: node.port }) + if (host) nodes.push({ nodeId: node.nodeId || null, host, port: node.port }) if (--missing === 0) done(nodes) }) } } _addNode (node) { - if (this.nodes.has(node) || node.id.equals(this.table.id)) return + if (this.nodes.has(node) || node.nodeId.equals(this.table.id)) return node.added = node.seen = this._tick @@ -369,14 +380,14 @@ class DHT extends EventEmitter { } _maybeRemoveNode (node, expectedId) { - if (expectedId !== null && expectedId.equals(node.id)) return + if (expectedId !== null && expectedId.equals(node.nodeId)) return this._removeNode(node) } _removeNode (node) { if (!this.nodes.has(node)) return - this.table.remove(node.id) + this.table.remove(node.nodeId) this.nodes.remove(node) this.emit('remove-node', node) @@ -398,10 +409,11 @@ class DHT extends EventEmitter { this._nat.add(m.to) this._addNode({ - id: m.nodeId, + nodeId: m.nodeId, token: null, port: m.from.port, host: m.from.host, + id: m.nodeId, // alias for nodeId as the routing table expects that added: this._tick, seen: this._tick, prev: null, @@ -459,8 +471,7 @@ class DHT extends EventEmitter { _reply (rpc, tid, target, status, value, token, to) { const closerNodes = target ? this.table.closest(target) : null const persistent = !this.ephemeral && rpc === this.rpc - - rpc.send({ + const reply = { version: 1, tid, from: null, @@ -472,7 +483,10 @@ class DHT extends EventEmitter { command: null, status, value - }) + } + + rpc.send(reply) + return reply } } @@ -492,7 +506,7 @@ function parseNode (s) { if (!port) throw new Error('Node format is id@?host:port') return { - id: id ? Buffer.from(id.slice(0, -1), 'hex') : null, + nodeId: id ? Buffer.from(id.slice(0, -1), 'hex') : null, host, port: Number(port.slice(1)) } diff --git a/lib/messages.js b/lib/messages.js index dabc407..209bea0 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -38,13 +38,13 @@ const dhtPeerIPv4 = exports.dhtPeerIPv4 = { state.end += 6 + 32 }, encode (state, peer) { - cenc.fixed32.encode(state, peer.id) + cenc.fixed32.encode(state, peer.nodeId) IPv4.encode(state, peer.host) cenc.uint16.encode(state, peer.port) }, decode (state) { return { - id: cenc.fixed32.decode(state), + nodeId: cenc.fixed32.decode(state), host: IPv4.decode(state), port: cenc.uint16.decode(state) } diff --git a/lib/query.js b/lib/query.js index 4a51d43..865b72a 100644 --- a/lib/query.js +++ b/lib/query.js @@ -15,8 +15,8 @@ module.exports = class Query extends Readable { this.concurrency = opts.concurrency || this.k this.inflight = 0 this.map = opts.map || defaultMap - this.closest = [] + this._closestReplies = [] this._slowdown = false this._seen = new Set() this._pending = [] @@ -32,11 +32,15 @@ module.exports = class Query extends Readable { // add them reverse as we pop below for (let i = nodes.length - 1; i >= 0; i--) { const node = nodes[i] - this._addPending(node.id, node.host, node.port) + this._addPending(node.nodeId, node.host, node.port) } } } + get closest () { + return this._closestReplies.map(r => ({ nodeId: r.nodeId, host: r.from.host, port: r.from.port })) + } + finished () { return new Promise((resolve, reject) => { const self = this @@ -61,7 +65,7 @@ module.exports = class Query extends Readable { async commit (command = this.command, value = this.value, opts) { if (typeof command === 'object' && command) return this.commit(undefined, undefined, command) - return this.dht.requestAll(this.target, command, value, this.closest, opts) + return this.dht.requestAll(this.target, command, value, this._closestReplies, opts) } async toArray () { @@ -78,7 +82,7 @@ module.exports = class Query extends Readable { const closest = this.dht.table.closest(this.target, this.k - this._pending.length) for (const node of closest) { - this._addPending(node.id, node.host, node.port) + this._addPending(node.nodeId, node.host, node.port) } } @@ -88,22 +92,22 @@ module.exports = class Query extends Readable { this.dht._resolveBootstrapNodes((bootstrapNodes) => { for (const node of bootstrapNodes) { - this._addPending(node.id, node.host, node.port) + this._addPending(node.nodeId, node.host, node.port) } cb(null) }) } _isCloser (id) { - return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0 + return this._closestReplies.length < this.k || this._compare(id, this._closestReplies[this._closestReplies.length - 1].nodeId) < 0 } - _addPending (id, host, port) { - if (id && !this._isCloser(id)) return + _addPending (nodeId, host, port) { + if (nodeId && !this._isCloser(nodeId)) return const addr = host + ':' + port if (this._seen.has(addr)) return this._seen.add(addr) - this._pending.push({ id, host, port }) + this._pending.push({ nodeId, host, port }) } _read (cb) { @@ -118,7 +122,7 @@ module.exports = class Query extends Readable { while (this.inflight < concurrency && this._pending.length > 0) { const next = this._pending.pop() - if (next && next.id && !this._isCloser(next.id)) continue + if (next && next.nodeId && !this._isCloser(next.nodeId)) continue this._visit(next) } @@ -145,7 +149,7 @@ module.exports = class Query extends Readable { return } - if (!this.closest.length) { + if (!this._closestReplies.length) { this.destroy(new Error('Too few nodes responded')) return } @@ -154,7 +158,7 @@ module.exports = class Query extends Readable { this._commiting = true const p = [] - for (const node of this.closest) p.push(this._commit(this.dht, this.target, node)) + for (const node of this._closestReplies) p.push(this._commit(this.dht, this.target, node)) race(p, 1, p.length) .then(() => this.push(null), (err) => this.destroy(err)) @@ -167,20 +171,13 @@ module.exports = class Query extends Readable { this.inflight-- if (m.nodeId !== null && this._isCloser(m.nodeId)) { - const node = { - id: m.nodeId, - token: m.token, - port: m.from.port, - host: m.from.host - } - - this._pushClosest(node) + this._pushClosest(m) } if (m.closerNodes !== null) { for (const node of m.closerNodes) { - if (node.id.equals(this.dht.table.id)) continue - this._addPending(node.id, node.host, node.port) + if (node.nodeId.equals(this.dht.table.id)) continue + this._addPending(node.nodeId, node.host, node.port) } } @@ -206,22 +203,22 @@ module.exports = class Query extends Readable { } _pushClosest (node) { - this.closest.push(node) - for (let i = this.closest.length - 2; i >= 0; i--) { - const prev = this.closest[i] - const cmp = this._compare(prev.id, node.id) + this._closestReplies.push(node) + for (let i = this._closestReplies.length - 2; i >= 0; i--) { + const prev = this._closestReplies[i] + const cmp = this._compare(prev.nodeId, node.nodeId) // if sorted, done! if (cmp < 0) break // if dup, splice it out (rare) if (cmp === 0) { - this.closest.splice(i + 1, 1) + this._closestReplies.splice(i + 1, 1) break } // swap and continue down - this.closest[i + 1] = prev - this.closest[i] = node + this._closestReplies[i + 1] = prev + this._closestReplies[i] = node } - if (this.closest.length > this.k) this.closest.pop() + if (this._closestReplies.length > this.k) this._closestReplies.pop() } _compare (a, b) { diff --git a/lib/rpc.js b/lib/rpc.js index 5a481a2..95c0fcf 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -238,13 +238,13 @@ module.exports = class RPC { for (let i = 0; i < this.inflight.length; i++) { const req = this.inflight[i] - if (--req.timeout >= 0) continue + if (req.tries > 0 && --req.timeout >= 0) continue req.timeout = 2 if (req.tries++ > this.maxRetries) { if (i === this.inflight.length - 1) this.inflight.pop() else this.inflight[i] = this.inflight.pop() - req.reject(new Error('Request timed out')) + req.reject(createTimeoutError()) continue } @@ -263,6 +263,12 @@ module.exports = class RPC { } } +function createTimeoutError () { + const err = new Error('Request timed out') + err.status = 0 + return err +} + function createStatusError (status) { const err = new Error('Request failed with status ' + status) err.status = status diff --git a/test.js b/test.js index d56a37e..2ec5363 100644 --- a/test.js +++ b/test.js @@ -12,13 +12,13 @@ tape('make bigger swarm', async function (t) { const targetNode = swarm[25] - let q = swarm[499].query(targetNode.id, 'find_node', null) + let q = swarm[499].query(targetNode.nodeId, 'find_node', null) let messages = 0 let found = false for await (const data of q) { messages++ - if (data.nodeId.equals(targetNode.id)) { + if (data.nodeId.equals(targetNode.nodeId)) { found = true break } @@ -26,13 +26,13 @@ tape('make bigger swarm', async function (t) { t.ok(found, 'found target in ' + messages + ' message(s)') - q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest }) + q = swarm[490].query(targetNode.nodeId, 'find_node', null, { closest: q.closest }) messages = 0 found = false for await (const data of q) { messages++ - if (data.nodeId.equals(targetNode.id)) { + if (data.nodeId.equals(targetNode.nodeId)) { found = true break } @@ -67,8 +67,8 @@ tape('commit after query', async function (t) { } const q = swarm[42].query(swarm[0].table.id, 'before', null, { - commit (node, target, to) { - return node.request(target, 'after', null, to) + commit (node, target, m) { + return node.request(target, 'after', null, { token: m.token, ...m.from }) } })