From 29be8085e3609148885fd2a1b5434a3bf784090b Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 3 Oct 2016 16:42:16 +0200 Subject: [PATCH] refactor query-stream --- query-stream.js | 119 +++++++++++++++++++++++++++--------------------- 1 file changed, 68 insertions(+), 51 deletions(-) diff --git a/query-stream.js b/query-stream.js index 611ac08..3885744 100644 --- a/query-stream.js +++ b/query-stream.js @@ -23,14 +23,16 @@ function QueryStream (dht, query, opts) { this.destroyed = false this.responses = 0 this.errors = 0 + this.closest = [] + this._post = opts.post this._dht = dht this._bootstrapped = false this._concurrency = opts.concurrency this._inflight = 0 this._k = 20 this._onresponse = onresponse - this.closest = [] + this._pending = [] function onresponse (err, response, peer) { self._update(err, response, peer) @@ -53,7 +55,8 @@ QueryStream.prototype._bootstrap = function () { var bootstrap = this._dht.nodes.closest(this.target, this._k) for (var i = 0; i < bootstrap.length; i++) { - this._add(bootstrap[i]) + var b = bootstrap[i] + this._addPending({id: b.id, port: b.port, host: b.host}) } if (bootstrap.length < this._dht.bootstrap.length) { @@ -71,31 +74,23 @@ QueryStream.prototype._update = function (err, res, peer) { this.errors++ this.emit('warning', err) if (this._readableState.flowing === true) this._read() - } else { - this.responses++ - - if (res.id) { - var prev = this._get(res.id) - if (prev) prev.roundtripToken = res.roundtripToken - } + return + } - // TODO: do not add nodes to table. - // instead merge-sort with table so we only add nodes that actually respond - var n = decodeNodes(res.nodes) + this.responses++ + this._addClosest(res, peer) - for (var i = 0; i < n.length; i++) { - if (!bufferEquals(n[i].id, this._dht.id)) this._add(n[i]) - } + var candidates = decodeNodes(res.nodes) + for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i]) - this.push({ - node: { - id: res.id, - port: peer.port, - host: peer.host - }, - value: res.value - }) - } + this.push({ + node: { + id: res.id, + port: peer.port, + host: peer.host + }, + value: res.value + }) } QueryStream.prototype._read = function () { @@ -106,10 +101,10 @@ QueryStream.prototype._read = function () { if (!free && !this._inflight) free = 1 var missing = free - for (var i = 0; missing && i < this.closest.length; i++) { - if (this.closest[i].queried) continue + for (var i = 0; missing && i < this._pending.length; i++) { + if (this._pending[i].queried) continue missing-- - this._send(this.closest[i], false) + this._send(this._pending[i], false) } if (!this._inflight && free) { @@ -118,40 +113,36 @@ QueryStream.prototype._read = function () { } QueryStream.prototype._send = function (node, bootstrap) { - if (!bootstrap && node.queried) return - if (!bootstrap) node.queried = true + if (!bootstrap) { + if (node.queried) return + node.queried = true + } this._inflight++ this._dht._request(this.request, node, false, this._onresponse) } -QueryStream.prototype._get = function (id) { - for (var i = 0; i < this.closest.length; i++) { - if (bufferEquals(this.closest[i].id, id)) return this.closest[i] - } - return null +QueryStream.prototype._addPending = function (node) { + if (bufferEquals(node.id, this._dht.id)) return + node.distance = xor(this.target, node.id) + insertSorted(node, this._k, this._pending) } -QueryStream.prototype._add = function (node) { - if (this._get(node.id)) return +QueryStream.prototype._addClosest = function (res, peer) { + if (!res.id || !res.roundtripToken || bufferEquals(res.id, this._dht.id)) return - node.distance = xor(this.target, node.id) - node.queried = false + var prev = getNode(res.id, this._pending) - if (this.closest.length < this._k) { - this.closest.push(node) - return + if (!prev) { + prev = { + id: res.id, + port: peer.port, + host: peer.host, + distance: xor(res.id, this.target) + } } - if (!xor.gt(this.closest[this._k - 1].distance, node.distance)) return - - this.closest[this._k - 1] = node - - var pos = this._k - 1 - while (pos && xor.gt(this.closest[pos - 1].distance, node.distance)) { - this.closest[pos] = this.closest[pos - 1] - this.closest[pos - 1] = node - pos-- - } + prev.roundtripToken = res.roundtripToken + insertSorted(prev, this._k, this.closest) } function decodeNodes (buf) { @@ -162,3 +153,29 @@ function decodeNodes (buf) { return [] } } + +function getNode (id, list) { + // find id in the list. + // technically this would be faster with binary search (against distance) + // but this list is always small, so meh + for (var i = 0; i < list.length; i++) { + if (bufferEquals(list[i].id, id)) return list[i] + } + + return null +} + +function insertSorted (node, max, list) { + if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return + if (getNode(node.id, list)) return + + if (list.length < max) list.push(node) + else list[max - 1] = node + + var pos = list.length - 1 + while (pos && xor.gt(list[pos - 1].distance, node.distance)) { + list[pos] = list[pos - 1] + list[pos - 1] = node + pos-- + } +}