Browse Source

add holepunching support

v4
Mathias Buus 8 years ago
parent
commit
f9624a8500
  1. 2
      index.js
  2. 2
      package.json
  3. 33
      query-stream.js

2
index.js

@ -357,7 +357,7 @@ DHT.prototype._reping = function (oldContacts, newContact) {
function afterPing (err) {
if (!err) return ping()
self.nodes.remove(next)
self.nodes.remove(next.id)
self.nodes.add(newContact)
}
}

2
package.json

@ -12,7 +12,7 @@
"protocol-buffers": "^3.1.6",
"readable-stream": "^2.1.5",
"stream-collector": "^1.0.1",
"udp-request": "^1.2.0",
"udp-request": "^1.3.0",
"xor-distance": "^1.0.0"
},
"devDependencies": {

33
query-stream.js

@ -21,6 +21,7 @@ function QueryStream (dht, query, opts) {
this.query.id = dht._queryId
this.target = query.target
this.token = !!opts.token
this.holepunching = opts.holepunching !== false
this.responses = 0
this.errors = 0
this.destroyed = false
@ -40,6 +41,12 @@ function QueryStream (dht, query, opts) {
this._moveCloser = !nodes
this._bootstrapped = !this._moveCloser
this._onresponse = onresponse
this._onresponseholepunch = onresponseholepunch
function onresponseholepunch (err, res, peer, query) {
if (!err || !peer || !peer.referrer) self._callback(err, res, peer)
else self._holepunch(peer, query)
}
function onresponse (err, res, peer) {
self._callback(err, res, peer)
@ -60,6 +67,7 @@ QueryStream.prototype._finalize = function () {
if (this._finalized) return
this._finalized = true
this._dht.inflightQueries--
if (!this.responses && !this.destroyed) this.destroy(new Error('No nodes responded'))
this.push(null)
}
@ -71,7 +79,7 @@ QueryStream.prototype._bootstrap = function () {
for (i = 0; i < bootstrap.length; i++) {
var b = bootstrap[i]
this._addPending({id: b.id, port: b.port, host: b.host})
this._addPending({id: b.id, port: b.port, host: b.host}, null)
}
if (bootstrap.length < this._dht.bootstrap.length) {
@ -115,6 +123,15 @@ QueryStream.prototype._read = function () {
else this._sendPending()
}
QueryStream.prototype._holepunch = function (peer, query) {
var self = this
this._dht._holepunch(peer, peer.referrer, function (err) {
if (err) return self._callback(err, null, peer)
self._dht._request(query, peer, false, self._onresponse)
})
}
QueryStream.prototype._callback = function (err, res, peer) {
this._inflight--
if (this.destroyed) return
@ -131,10 +148,10 @@ QueryStream.prototype._callback = function (err, res, peer) {
if (this._moveCloser) {
var candidates = decodeNodes(res.nodes)
for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i])
for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i], peer)
}
if (this.token && !this.verbose && !this._committing) {
if (!validateId(res.id) || (this.token && !this.verbose && !this._committing)) {
this._readMaybe()
return
}
@ -185,13 +202,15 @@ QueryStream.prototype._send = function (node, force, useToken) {
}
}
this._dht._request(query, node, false, this._onresponse)
var s = this
this._dht._request(query, node, false, this.holepunching ? this._onresponseholepunch : this._onresponse)
return true
}
QueryStream.prototype._addPending = function (node) {
QueryStream.prototype._addPending = function (node, ref) {
if (bufferEquals(node.id, this._dht.id)) return
node.distance = xor(this.target, node.id)
node.referrer = ref
insertSorted(node, this._k, this._pending)
}
@ -233,6 +252,10 @@ function getNode (id, list) {
return null
}
function validateId (id) {
return id && id.length === 32
}
function insertSorted (node, max, list) {
if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return
if (getNode(node.id, list)) return

Loading…
Cancel
Save