You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

336 lines
8.3 KiB

const { Readable } = require('streamx')
const peer = require('./peer')
const DONE = []
const DOWN = []
module.exports = class Query extends Readable {
constructor (dht, target, command, value, opts = {}) {
super()
this.dht = dht
this.k = this.dht.table.k
this.target = target
this.command = command
this.value = value
this.errors = 0
this.successes = 0
this.concurrency = opts.concurrency || this.dht.concurrency
this.inflight = 0
this.map = opts.map || defaultMap
this.maxSlow = opts.maxSlow === 0 ? 0 : (opts.maxSlow || 5)
this.closestReplies = []
this._slow = 0
this._slowdown = false
this._seen = new Map()
this._pending = []
this._fromTable = false
this._commit = opts.commit === true ? autoCommit : (opts.commit || null)
this._commiting = false
this._onvisitbound = this._onvisit.bind(this)
this._onerrorbound = this._onerror.bind(this)
this._oncyclebound = this._oncycle.bind(this)
const nodes = opts.nodes || opts.closestNodes
const replies = opts.replies || opts.closestReplies
// add them reverse as we pop below
if (nodes) {
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending({ id: node.id || peer.id(node.host, node.port), host: node.host, port: node.port }, null)
}
} else if (replies) {
for (let i = replies.length - 1; i >= 0; i--) {
this._addPending(replies[i].from, null)
}
}
}
get closestNodes () {
const nodes = new Array(this.closestReplies.length)
for (let i = 0; i < nodes.length; i++) {
nodes[i] = this.closestReplies[i].from
}
return nodes
}
finished () {
return new Promise((resolve, reject) => {
const self = this
let error = null
this.resume()
this.on('error', onerror)
this.on('close', onclose)
function onclose () {
self.removeListener('error', onerror)
self.removeListener('close', onclose)
if (error) reject(error)
else resolve()
}
function onerror (err) {
error = err
}
})
}
_addFromTable () {
if (this._pending.length >= this.k) return
this._fromTable = true
const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) {
this._addPending({ id: node.id, host: node.host, port: node.port }, null)
}
}
_open (cb) {
this._addFromTable()
if (this._pending.length >= this.k) return cb(null)
this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) {
this._addPending(node, null)
}
cb(null)
})
}
_isCloser (id) {
return this.closestReplies.length < this.k || this._compare(id, this.closestReplies[this.closestReplies.length - 1].from.id) < 0
}
_addPending (node, ref) {
const addr = node.host + ':' + node.port
const refs = this._seen.get(addr)
const isCloser = this._isCloser(node.id)
if (refs === DONE) {
return isCloser
}
if (refs === DOWN) {
if (ref) this._downHint(ref, node)
return isCloser
}
if (refs) {
if (ref !== null) refs.push(ref)
return isCloser
}
if (!isCloser) {
return false
}
this._seen.set(addr, ref === null ? [] : [ref])
this._pending.push(node)
return true
}
_read (cb) {
this._readMore()
cb(null)
}
_readMore () {
if (this.destroying || this._commiting) return
const concurrency = (this._slowdown ? 3 : this.concurrency) + this._slow
while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop()
if (next && next.id && !this._isCloser(next.id)) continue
this._visit(next)
}
// if reusing closest nodes, slow down after the first readMore tick to allow
// the closest node a chance to reply before going broad to question more
if (!this._fromTable && this.successes === 0 && this.errors === 0) {
this._slowdown = true
}
if (this._pending.length > 0) return
// if no inflight OR all the queries we are waiting on are marked as slow (within our limits) and we have a full result.
if (this.inflight === 0 || (this._slow <= this.maxSlow && this._slow === this.inflight && this.closestReplies.length >= this.k)) {
// if more than 3/4 failed and we only used cached nodes, try again from the routing table
if (!this._fromTable && this.successes < this.k / 4) {
this._addFromTable()
this._readMore()
return
}
this._flush()
}
}
_flush () {
if (this._commiting) return
this._commiting = true
if (this._commit === null) {
this.push(null)
return
}
const p = []
for (const m of this.closestReplies) p.push(this._commit(m, this.dht, this))
this._endAfterCommit(p)
}
_endAfterCommit (ps) {
if (!ps.length) {
this.destroy(new Error('Too few nodes responded'))
return
}
const self = this
let pending = ps.length
let success = 0
for (const p of ps) p.then(ondone, onerror)
function ondone () {
success++
if (--pending === 0) self.push(null)
}
function onerror (err) {
if (--pending > 0) return
if (success) self.push(null)
else self.destroy(err)
}
}
_dec (req) {
if (req.oncycle === noop) {
this._slow--
} else {
req.oncycle = noop
}
this.inflight--
}
_onvisit (m, req) {
this._dec(req)
const addr = req.to.host + ':' + req.to.port
this._seen.set(addr, DONE)
if (this._commiting) return
if (m.error === 0) this.successes++
else this.errors++
if (m.error === 0 && m.from.id !== null && this._isCloser(m.from.id)) this._pushClosest(m)
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
node.id = peer.id(node.host, node.port)
if (node.id.equals(this.dht.table.id)) continue
// TODO: we could continue here instead of breaking to ensure that one of the nodes in the closer list
// is later marked as DOWN that we gossip that back
if (!this._addPending(node, m.from)) break
}
}
if (!this._fromTable && this.successes + this.errors >= this.concurrency) {
this._slowdown = false
}
if (m.error !== 0) {
this._readMore()
return
}
const data = this.map(m)
if (!data || this.push(data) !== false) {
this._readMore()
}
}
_onerror (_, req) {
const addr = req.to.host + ':' + req.to.port
const refs = this._seen.get(addr)
this._seen.set(addr, DOWN)
for (const node of refs) this._downHint(node, req.to)
this._dec(req)
this.errors++
this._readMore()
}
_oncycle (req) {
req.oncycle = noop
this._slow++
this._readMore()
}
_downHint (node, down) {
const state = { start: 0, end: 6, buffer: Buffer.allocUnsafe(6) }
peer.ipv4.encode(state, down)
this.dht._request(node, 'down_hint', null, state.buffer, noop, noop)
}
_pushClosest (m) {
this.closestReplies.push(m)
for (let i = this.closestReplies.length - 2; i >= 0; i--) {
const prev = this.closestReplies[i]
const cmp = this._compare(prev.from.id, m.from.id)
// if sorted, done!
if (cmp < 0) break
// if dup, splice it out (rare)
if (cmp === 0) {
this.closestReplies.splice(i + 1, 1)
break
}
// swap and continue down
this.closestReplies[i + 1] = prev
this.closestReplies[i] = m
}
if (this.closestReplies.length > this.k) this.closestReplies.pop()
}
_compare (a, b) {
for (let i = 0; i < a.length; i++) {
if (a[i] === b[i]) continue
const t = this.target[i]
return (t ^ a[i]) - (t ^ b[i])
}
return 0
}
_visit (to) {
this.inflight++
const req = this.dht._request(to, this.command, this.target, this.value, this._onvisitbound, this._onerrorbound)
if (req === null) {
this.destroy(new Error('Node was destroyed'))
return
}
req.oncycle = this._oncyclebound
}
}
function autoCommit (reply, dht, query) {
if (!reply.token) return Promise.reject(new Error('No token received for closest node'))
return dht.request({ token: reply.token, target: query.target, command: query.command, value: query.value }, reply.from)
}
function defaultMap (m) {
return m
}
function noop () {}