|
|
@ -16,7 +16,7 @@ module.exports = class Query extends Readable { |
|
|
|
this.concurrency = opts.concurrency || this.dht.concurrency |
|
|
|
this.inflight = 0 |
|
|
|
this.map = opts.map || defaultMap |
|
|
|
this.closests = [] |
|
|
|
this.closest = [] |
|
|
|
|
|
|
|
this._slowdown = false |
|
|
|
this._seen = new Set() |
|
|
@ -62,7 +62,7 @@ module.exports = class Query extends Readable { |
|
|
|
|
|
|
|
async commit (command = this.command, value = this.value, opts) { |
|
|
|
if (typeof command === 'object' && command) return this.commit(undefined, undefined, command) |
|
|
|
return this.dht.requestAll(this.target, command, value, this.closests, opts) |
|
|
|
return this.dht.requestAll(this.target, command, value, this.closest, opts) |
|
|
|
} |
|
|
|
|
|
|
|
async toArray () { |
|
|
@ -96,7 +96,7 @@ module.exports = class Query extends Readable { |
|
|
|
} |
|
|
|
|
|
|
|
_isCloser (id) { |
|
|
|
return this.closests.length < this.k || this._compare(id, this.closests[this.closests.length - 1].id) < 0 |
|
|
|
return this.closest.length < this.k || this._compare(id, this.closest[this.closest.length - 1].id) < 0 |
|
|
|
} |
|
|
|
|
|
|
|
_addPending (id, host, port) { |
|
|
@ -125,7 +125,7 @@ module.exports = class Query extends Readable { |
|
|
|
} |
|
|
|
|
|
|
|
// if reusing closest nodes, slow down after the first readMore tick to allow
|
|
|
|
// the closests node a chance to reply before going broad to question more
|
|
|
|
// the closest node a chance to reply before going broad to question more
|
|
|
|
if (!this._fromTable && this.successes === 0 && this.errors === 0) { |
|
|
|
this._slowdown = true |
|
|
|
} |
|
|
@ -147,7 +147,7 @@ module.exports = class Query extends Readable { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
if (!this.closests.length) { |
|
|
|
if (!this.closest.length) { |
|
|
|
this.destroy(new Error('Too few nodes responded')) |
|
|
|
return |
|
|
|
} |
|
|
@ -156,7 +156,7 @@ module.exports = class Query extends Readable { |
|
|
|
this._commiting = true |
|
|
|
|
|
|
|
const p = [] |
|
|
|
for (const node of this.closests) p.push(this._commit(node, this.dht, this)) |
|
|
|
for (const node of this.closest) p.push(this._commit(node, this.dht, this)) |
|
|
|
|
|
|
|
race(p, 1, p.length) |
|
|
|
.then(() => this.push(null), (err) => this.destroy(err)) |
|
|
@ -209,22 +209,22 @@ module.exports = class Query extends Readable { |
|
|
|
} |
|
|
|
|
|
|
|
_pushClosest (node) { |
|
|
|
this.closests.push(node) |
|
|
|
for (let i = this.closests.length - 2; i >= 0; i--) { |
|
|
|
const prev = this.closests[i] |
|
|
|
this.closest.push(node) |
|
|
|
for (let i = this.closest.length - 2; i >= 0; i--) { |
|
|
|
const prev = this.closest[i] |
|
|
|
const cmp = this._compare(prev.id, node.id) |
|
|
|
// if sorted, done!
|
|
|
|
if (cmp < 0) break |
|
|
|
// if dup, splice it out (rare)
|
|
|
|
if (cmp === 0) { |
|
|
|
this.closests.splice(i + 1, 1) |
|
|
|
this.closest.splice(i + 1, 1) |
|
|
|
break |
|
|
|
} |
|
|
|
// swap and continue down
|
|
|
|
this.closests[i + 1] = prev |
|
|
|
this.closests[i] = node |
|
|
|
this.closest[i + 1] = prev |
|
|
|
this.closest[i] = node |
|
|
|
} |
|
|
|
if (this.closests.length > this.k) this.closests.pop() |
|
|
|
if (this.closest.length > this.k) this.closest.pop() |
|
|
|
} |
|
|
|
|
|
|
|
_compare (a, b) { |
|
|
|