Browse Source

tweak query api

session-estimator
Mathias Buus 4 years ago
parent
commit
cbd9b8dcb0
  1. 25
      README.md
  2. 21
      index.js
  3. 52
      lib/query.js
  4. 6
      test.js

25
README.md

@ -50,7 +50,7 @@ function createNode () {
node.on('request', function (req) { node.on('request', function (req) {
if (req.command === 'values') { if (req.command === 'values') {
if (req.commit) { // if we are the closest node store the value if (req.commit) { // if we are the closest node store the value (ie the node sent a roundtrip token)
const key = sha256(req.value).toString('hex') const key = sha256(req.value).toString('hex')
values.set(key, req.value) values.set(key, req.value)
console.log('Storing', key, '-->', req.value.toString()) console.log('Storing', key, '-->', req.value.toString())
@ -184,7 +184,8 @@ Emitted when an incoming DHT request is received. This is where you can add your
* `req.target` - the dht target the peer is looking (routing is handled behind the scene) * `req.target` - the dht target the peer is looking (routing is handled behind the scene)
* `req.command` - the RPC command name * `req.command` - the RPC command name
* `req.value` - the RPC value buffer * `req.value` - the RPC value buffer
* `req.commit` - boolean if you are the closest node and the remote's from address was verified * `req.token` - If the remote peer echoed back a valid roundtrip token, proving their "from address" this is set
* `req.commit` - Boolean set as a convenience if a valid token was provided
* `req.from` - who sent this request (host, port) * `req.from` - who sent this request (host, port)
To reply to a request use the `req.reply(value)` method and to reply with an error code use `req.error(errorCode)`. To reply to a request use the `req.reply(value)` method and to reply with an error code use `req.error(errorCode)`.
@ -202,6 +203,18 @@ Those are:
Send a request to a specific node specified by the to address (`{ host, port }`). Send a request to a specific node specified by the to address (`{ host, port }`).
Options include:
```js
{
token: roundtripTokenFromAReply,
retry: true, // whether the request should retry on timeout
expectOk: true // expect the reply to have status 0 or error
}
```
Normally you'd set the token when commiting to the dht in the query's commit hook.
#### `reply = await node.ping(to)` #### `reply = await node.ping(to)`
Sugar for `dht.request(null, 'ping', null, to)` Sugar for `dht.request(null, 'ping', null, to)`
@ -233,8 +246,10 @@ that is called for each close reply.
``` js ``` js
{ {
async commit (closestNode, dht, query) { async commit (closestReply, dht, query) {
await dht.request(myTarget, myCommand, myValue, closestNode) // normally you'd send back the roundtrip token here, to prove to the remote that you own
// your ip/port
await dht.request(myTarget, myCommand, myValue, closestReply.from, { token: closestReply.token })
} }
} }
``` ```
@ -256,7 +271,7 @@ Other options include:
The query method returns a stream encapsulating the query, that is also an async iterator. Each `data` event contain a DHT reply. The query method returns a stream encapsulating the query, that is also an async iterator. Each `data` event contain a DHT reply.
If you just want to wait for the query to finish, you can use the `await stream.finished()` helper. After completion the closest If you just want to wait for the query to finish, you can use the `await stream.finished()` helper. After completion the closest
nodes are stored in `stream.closest` array. nodes are stored in `stream.closestNodes` array.
#### `node.destroy()` #### `node.destroy()`

21
index.js

@ -137,7 +137,7 @@ class DHT extends EventEmitter {
request (target, command, value, to, opts) { request (target, command, value, to, opts) {
const ephemeral = this.ephemeral || !!(opts && opts.socket !== this.rpc.socket) const ephemeral = this.ephemeral || !!(opts && opts.socket !== this.rpc.socket)
const token = to.token || (opts && opts.token) || null const token = (opts && opts.token) || null
return this.rpc.request({ return this.rpc.request({
version: 2, version: 2,
@ -163,17 +163,6 @@ class DHT extends EventEmitter {
return race(p, min, opts.max) return race(p, min, opts.max)
} }
// TODO: make this more smart - ie don't retry the first one etc etc
async requestAny (target, command, value, nodes, opts) {
for (const node of nodes) {
try {
return await this.request(target, command, value, node, opts)
} catch {}
}
throw new Error('All requests failed')
}
destroy () { destroy () {
this.rpc.destroy() this.rpc.destroy()
if (this._resolveSampled !== null) { if (this._resolveSampled !== null) {
@ -251,9 +240,9 @@ class DHT extends EventEmitter {
const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null) const q = this._backgroundQuery(PING_BOOTSTRAP, 'find_node', null)
q.on('close', () => { q.on('close', () => {
if (q.closest.length === 0) return if (q.closestReplies.length === 0) return
if (compare(PING_BOOTSTRAP, this.table.id, q.closest[q.closest.length - 1].id) > 0) { if (compare(PING_BOOTSTRAP, this.table.id, q.closestReplies[q.closestReplies.length - 1].id) > 0) {
return return
} }
@ -630,6 +619,10 @@ class DHT extends EventEmitter {
this.rpc.send(reply, socket) this.rpc.send(reply, socket)
return reply return reply
} }
static id (node, out) {
return nodeId(node.host, node.port, out)
}
} }
DHT.OK = 0 DHT.OK = 0

52
lib/query.js

@ -16,7 +16,7 @@ module.exports = class Query extends Readable {
this.concurrency = opts.concurrency || this.dht.concurrency this.concurrency = opts.concurrency || this.dht.concurrency
this.inflight = 0 this.inflight = 0
this.map = opts.map || defaultMap this.map = opts.map || defaultMap
this.closest = [] this.closestReplies = []
this._slowdown = false this._slowdown = false
this._seen = new Set() this._seen = new Set()
@ -28,7 +28,7 @@ module.exports = class Query extends Readable {
this._commiting = false this._commiting = false
this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false } this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false }
const nodes = opts.nodes || opts.closest const nodes = opts.nodes || opts.closestNodes
if (nodes) { if (nodes) {
// add them reverse as we pop below // add them reverse as we pop below
@ -39,6 +39,22 @@ module.exports = class Query extends Readable {
} }
} }
get closestNodes () {
const nodes = new Array(this.closestReplies.length)
for (let i = 0; i < nodes.length; i++) {
const c = this.closestReplies[i]
nodes[i] = {
id: c.id,
host: c.from.host,
port: c.from.port
}
}
return nodes
}
finished () { finished () {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const self = this const self = this
@ -85,7 +101,7 @@ module.exports = class Query extends Readable {
} }
_isCloser (id) { _isCloser (id) {
return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0 return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].id) < 0
} }
_addPending (id, host, port) { _addPending (id, host, port) {
@ -136,7 +152,7 @@ module.exports = class Query extends Readable {
return return
} }
if (!this.closest.length) { if (!this.closestReplies.length) {
this.destroy(new Error('Too few nodes responded')) this.destroy(new Error('Too few nodes responded'))
return return
} }
@ -145,7 +161,7 @@ module.exports = class Query extends Readable {
this._commiting = true this._commiting = true
const p = [] const p = []
for (const node of this.closest) p.push(this._commit(node, this.dht, this)) for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this))
race(p, 1, p.length) race(p, 1, p.length)
.then(() => this.push(null), (err) => this.destroy(err)) .then(() => this.push(null), (err) => this.destroy(err))
@ -158,13 +174,7 @@ module.exports = class Query extends Readable {
this.inflight-- this.inflight--
if (m.status === 0 && m.id !== null && this._isCloser(m.id)) { if (m.status === 0 && m.id !== null && this._isCloser(m.id)) {
this._pushClosest({ this._pushClosest(m)
id: m.id,
host: m.from.host,
port: m.from.port,
token: m.token,
to: m.to
})
} }
if (m.closerNodes !== null) { if (m.closerNodes !== null) {
@ -196,23 +206,23 @@ module.exports = class Query extends Readable {
this._readMore() this._readMore()
} }
_pushClosest (node) { _pushClosest (m) {
this.closest.push(node) this.closestReplies.push(m)
for (let i = this.closest.length - 2; i >= 0; i--) { for (let i = this.closestReplies.length - 2; i >= 0; i--) {
const prev = this.closest[i] const prev = this.closestReplies[i]
const cmp = this._compare(prev.id, node.id) const cmp = this._compare(prev.id, m.id)
// if sorted, done! // if sorted, done!
if (cmp < 0) break if (cmp < 0) break
// if dup, splice it out (rare) // if dup, splice it out (rare)
if (cmp === 0) { if (cmp === 0) {
this.closest.splice(i + 1, 1) this.closestReplies.splice(i + 1, 1)
break break
} }
// swap and continue down // swap and continue down
this.closest[i + 1] = prev this.closestReplies[i + 1] = prev
this.closest[i] = node this.closestReplies[i] = m
} }
if (this.closest.length > this.k) this.closest.pop() if (this.closestReplies.length > this.k) this.closestReplies.pop()
} }
_compare (a, b) { _compare (a, b) {

6
test.js

@ -26,7 +26,7 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target in ' + messages + ' message(s)') t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query(targetNode.id, 'find_node', null, { closest: q.closest }) q = swarm[490].query(targetNode.id, 'find_node', null, { nodes: q.closestNodes })
messages = 0 messages = 0
found = false found = false
@ -89,8 +89,8 @@ tape('commit after query', async function (t) {
} }
const q = swarm[42].query(swarm[0].table.id, 'before', null, { const q = swarm[42].query(swarm[0].table.id, 'before', null, {
commit (node, dht, query) { commit (m, dht, query) {
return dht.request(query.target, 'after', null, node) return dht.request(query.target, 'after', null, m.from, { token: m.token })
} }
}) })

Loading…
Cancel
Save