You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

241 lines
5.9 KiB

const { Readable } = require('streamx')
const race = require('./race')
const nodeId = require('./id')
module.exports = class Query extends Readable {
constructor (dht, target, command, value, opts = {}) {
super()
this.dht = dht
this.k = this.dht.table.k
this.target = target
this.command = command
this.value = value
this.errors = 0
this.successes = 0
this.concurrency = opts.concurrency || this.dht.concurrency
this.inflight = 0
this.map = opts.map || defaultMap
this.closest = []
this._slowdown = false
this._seen = new Set()
this._pending = []
this._onresolve = this._onvisit.bind(this)
this._onreject = this._onerror.bind(this)
this._fromTable = false
this._commit = opts.commit === true ? autoCommit : (opts.commit || null)
this._commiting = false
this._ropts = { socket: (opts && opts.socket) || this.dht.rpc.socket, expectOk: false }
const nodes = opts.nodes || opts.closest
if (nodes) {
// add them reverse as we pop below
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending(node.id || null, node.host, node.port)
}
}
}
finished () {
return new Promise((resolve, reject) => {
const self = this
let error = null
this.resume()
this.on('error', onerror)
this.on('close', onclose)
function onclose () {
self.removeListener('error', onerror)
self.removeListener('close', onclose)
if (error) reject(error)
else resolve()
}
function onerror (err) {
error = err
}
})
}
_addFromTable () {
if (this._pending.length >= this.k) return
this._fromTable = true
const closest = this.dht.table.closest(this.target, this.k - this._pending.length)
for (const node of closest) {
this._addPending(node.id, node.host, node.port)
}
}
_open (cb) {
this._addFromTable()
if (this._pending.length >= this.k) return cb(null)
this.dht._resolveBootstrapNodes((bootstrapNodes) => {
for (const node of bootstrapNodes) {
this._addPending(node.id, node.host, node.port)
}
cb(null)
})
}
_isCloser (id) {
return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0
}
_addPending (id, host, port) {
if (id && !this._isCloser(id)) return false
const addr = host + ':' + port
if (this._seen.has(addr)) return true
this._seen.add(addr)
this._pending.push({ id, host, port })
return true
}
_read (cb) {
this._readMore()
cb(null)
}
_readMore () {
if (this.destroying) return
const concurrency = this._slowdown ? 3 : this.concurrency
while (this.inflight < concurrency && this._pending.length > 0) {
const next = this._pending.pop()
if (next && next.id && !this._isCloser(next.id)) continue
this._visit(next)
}
// if reusing closest nodes, slow down after the first readMore tick to allow
// the closest node a chance to reply before going broad to question more
if (!this._fromTable && this.successes === 0 && this.errors === 0) {
this._slowdown = true
}
if (this.inflight === 0 && this._pending.length === 0) {
// if more than 3/4 failed and we only used cached nodes, try again from the routing table
if (!this._fromTable && this.successes < this.k / 4) {
this._addFromTable()
return this._readMore()
}
this._flush()
}
}
_flush () {
if (this._commit === null) {
this.push(null)
return
}
if (!this.closest.length) {
this.destroy(new Error('Too few nodes responded'))
return
}
if (this._commiting) return
this._commiting = true
const p = []
for (const node of this.closest) p.push(this._commit(node, this.dht, this))
race(p, 1, p.length)
.then(() => this.push(null), (err) => this.destroy(err))
}
_onvisit (m) {
if (m.status === 0) this.successes++
else this.errors++
this.inflight--
if (m.status === 0 && m.id !== null && this._isCloser(m.id)) {
this._pushClosest({
id: m.id,
host: m.from.host,
port: m.from.port,
token: m.token,
to: m.to
})
}
if (m.closerNodes !== null) {
for (const node of m.closerNodes) {
const id = nodeId(node.host, node.port)
if (id.equals(this.dht.table.id)) continue
if (!this._addPending(id, node.host, node.port)) break
}
}
if (!this._fromTable && this.successes + this.errors >= this.concurrency) {
this._slowdown = false
}
if (m.status !== 0) {
this._readMore()
return
}
const data = this.map(m)
if (!data || this.push(data) !== false) {
this._readMore()
}
}
_onerror () {
this.errors++
this.inflight--
this._readMore()
}
_pushClosest (node) {
this.closest.push(node)
for (let i = this.closest.length - 2; i >= 0; i--) {
const prev = this.closest[i]
const cmp = this._compare(prev.id, node.id)
// if sorted, done!
if (cmp < 0) break
// if dup, splice it out (rare)
if (cmp === 0) {
this.closest.splice(i + 1, 1)
break
}
// swap and continue down
this.closest[i + 1] = prev
this.closest[i] = node
}
if (this.closest.length > this.k) this.closest.pop()
}
_compare (a, b) {
for (let i = 0; i < a.length; i++) {
if (a[i] === b[i]) continue
const t = this.target[i]
return (t ^ a[i]) - (t ^ b[i])
}
return 0
}
_visit (node) {
this.inflight++
this.dht.request(this.target, this.command, this.value, node, this._ropts)
.then(this._onresolve, this._onreject)
}
}
/**
 * Default commit step: re-sends the query's command to `node`.
 *
 * @param {object} node - entry from the query's closest list; must carry a truthy `token`.
 * @param {object} dht - object whose `request()` performs the call.
 * @param {object} query - supplies `target`, `command` and `value`.
 * @returns {*} whatever `dht.request` returns, or a rejected promise when the
 *   node has no token.
 */
function autoCommit (node, dht, query) {
  if (node.token) {
    const { target, command, value } = query
    return dht.request(target, command, value, node)
  }

  return Promise.reject(new Error('No token received for closest node'))
}
/**
 * Identity mapper used when no `map` option is supplied.
 *
 * @param {*} m - value to pass through.
 * @returns {*} the same value, untouched.
 */
function defaultMap (m) { return m }