From cbd9b8dcb09affc16b7dec5a49c30b2f925130b9 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Fri, 28 May 2021 22:43:15 +0200 Subject: [PATCH] tweak query api --- README.md | 25 ++++++++++++++++++++----- index.js | 21 +++++++-------------- lib/query.js | 52 +++++++++++++++++++++++++++++++--------------------- test.js | 6 +++--- 4 files changed, 61 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index b0f725c..137bb04 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ function createNode () { node.on('request', function (req) { if (req.command === 'values') { - if (req.commit) { // if we are the closest node store the value + if (req.commit) { // if we are the closest node store the value (ie the node sent a roundtrip token) const key = sha256(req.value).toString('hex') values.set(key, req.value) console.log('Storing', key, '-->', req.value.toString()) @@ -184,7 +184,8 @@ Emitted when an incoming DHT request is received. This is where you can add your * `req.target` - the dht target the peer is looking (routing is handled behind the scene) * `req.command` - the RPC command name * `req.value` - the RPC value buffer -* `req.commit` - boolean if you are the closest node and the remote's from address was verified +* `req.token` - If the remote peer echoed back a valid roundtrip token, proving their "from address" this is set +* `req.commit` - Boolean set as a convenience if a valid token was provided * `req.from` - who sent this request (host, port) To reply to a request use the `req.reply(value)` method and to reply with an error code use `req.error(errorCode)`. @@ -202,6 +203,18 @@ Those are: Send a request to a specific node specified by the to address (`{ host, port }`). +Options include: + +```js +{ + token: roundtripTokenFromAReply, + retry: true, // whether the request should retry on timeout + expectOk: true // expect the reply to have status 0 or error +} +``` + +Normally you'd set the token when commiting to the dht in the query's commit hook. + #### `reply = await node.ping(to)` Sugar for `dht.request(null, 'ping', null, to)` @@ -233,8 +246,10 @@ that is called for each close reply. ``` js { - async commit (closestNode, dht, query) { - await dht.request(myTarget, myCommand, myValue, closestNode) + async commit (closestReply, dht, query) { + // normally you'd send back the roundtrip token here, to prove to the remote that you own + // your ip/port + await dht.request(myTarget, myCommand, myValue, closestReply.from, { token: closestReply.token }) } } ``` @@ -256,7 +271,7 @@ Other options include: The query method returns a stream encapsulating the query, that is also an async iterator. Each `data` event contain a DHT reply. If you just want to wait for the query to finish, you can use the `await stream.finished()` helper. After completion the closest -nodes are stored in `stream.closest` array. +nodes are stored in `stream.closestNodes` array. #### `node.destroy()` diff --git a/index.js b/index.js index 5e9dfeb..1e311ec 100644 --- a/index.js +++ b/index.js @@ -137,7 +137,7 @@ class DHT extends EventEmitter { request (target, command, value, to, opts) { const ephemeral = this.ephemeral || !!(opts && opts.socket !== this.rpc.socket) - const token = to.token || (opts && opts.token) || null + const token = (opts && opts.token) || null return this.rpc.request({ version: 2, @@ -163,17 +163,6 @@ class DHT extends EventEmitter { return race(p, min, opts.max) } - // TODO: make this more smart - ie don't retry the first one etc etc - async requestAny (target, command, value, nodes, opts) { - for (const node of nodes) { - try { - return await this.request(target, command, value, node, opts) - } catch {} - } - - throw new Error('All requests failed') - } - destroy () { this.rpc.destroy() if (this._resolveSampled !== null) { @@ -251,9 +240,9 @@ class DHT extends EventEmitter { const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null) q.on('close', () => { - if (q.closest.length === 0) return + if (q.closestReplies.length === 0) return - if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) { + if (compare(PING_BOOTSTRAP, this.table.id, q.closestReplies[q.closestReplies.length - 1].id) > 0) { return } @@ -630,6 +619,10 @@ class DHT extends EventEmitter { this.rpc.send(reply, socket) return reply } + + static id (node, out) { + return nodeId(node.host, node.port, out) + } } DHT.OK = 0 diff --git a/lib/query.js b/lib/query.js index 7de7d49..4b5f942 100644 --- a/lib/query.js +++ b/lib/query.js @@ -16,7 +16,7 @@ module.exports = class Query extends Readable { this.concurrency = opts.concurrency || this.dht.concurrency this.inflight = 0 this.map = opts.map || defaultMap - this.closest = [] + this.closestReplies = [] this._slowdown = false this._seen = new Set() @@ -28,7 +28,7 @@ module.exports = class Query extends Readable { this._commiting = false this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false } - const nodes = opts.nodes || opts.closest + const nodes = opts.nodes || opts.closestNodes if (nodes) { // add them reverse as we pop below @@ -39,6 +39,22 @@ module.exports = class Query extends Readable { } } + get closestNodes () { + const nodes = new Array(this.closestReplies.length) + + for (let i = 0; i < nodes.length; i++) { + const c = this.closestReplies[i] + + nodes[i] = { + id: c.id, + host: c.from.host, + port: c.from.port + } + } + + return nodes + } + finished () { return new Promise((resolve, reject) => { const self = this @@ -85,7 +101,7 @@ module.exports = class Query extends Readable { } _isCloser (id) { - return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0 + return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].id) < 0 } _addPending (id, host, port) { @@ -136,7 +152,7 @@ module.exports = class Query extends Readable { return } - if (!this.closest.length) { + if (!this.closestReplies.length) { this.destroy(new Error('Too few nodes responded')) return } @@ -145,7 +161,7 @@ module.exports = class Query extends Readable { this._commiting = true const p = [] - for (const node of this.closest) p.push(this._commit(node, this.dht, this)) + for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this)) race(p, 1, p.length) .then(() => this.push(null), (err) => this.destroy(err)) @@ -158,13 +174,7 @@ module.exports = class Query extends Readable { this.inflight-- if (m.status === 0 && m.id !== null && this._isCloser(m.id)) { - this._pushClosest({ - id: m.id, - host: m.from.host, - port: m.from.port, - token: m.token, - to: m.to - }) + this._pushClosest(m) } if (m.closerNodes !== null) { @@ -196,23 +206,23 @@ module.exports = class Query extends Readable { this._readMore() } - _pushClosest (node) { - this.closest.push(node) - for (let i = this.closest.length - 2; i >= 0; i--) { - const prev = this.closest[i] - const cmp = this._compare(prev.id, node.id) + _pushClosest (m) { + this.closestReplies.push(m) + for (let i = this.closestReplies.length - 2; i >= 0; i--) { + const prev = this.closestReplies[i] + const cmp = this._compare(prev.id, m.id) // if sorted, done! if (cmp < 0) break // if dup, splice it out (rare) if (cmp === 0) { - this.closest.splice(i + 1, 1) + this.closestReplies.splice(i + 1, 1) break } // swap and continue down - this.closest[i + 1] = prev - this.closest[i] = node + this.closestReplies[i + 1] = prev + this.closestReplies[i] = m } - if (this.closest.length > this.k) this.closest.pop() + if (this.closestReplies.length > this.k) this.closestReplies.pop() } _compare (a, b) { diff --git a/test.js b/test.js index 063945a..2362168 100644 --- a/test.js +++ b/test.js @@ -26,7 +26,7 @@ tape('make bigger swarm', async function (t) { t.ok(found, 'found target in ' + messages + ' message(s)') - q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest }) + q = swarm[490].query(targetNode.id, 'find_node', null, { nodes: q.closestNodes }) messages = 0 found = false @@ -89,8 +89,8 @@ tape('commit after query', async function (t) { } const q = swarm[42].query(swarm[0].table.id, 'before', null, { - commit (node, dht, query) { - return dht.request(query.target, 'after', null, node) + commit (m, dht, query) { + return dht.request(query.target, 'after', null, m.from, { token: m.token }) } })