Browse Source

better query streams

v4
Mathias Buus 8 years ago
parent
commit
85b9acaa6b
  1. 127
      example.js
  2. 191
      index.js
  3. 140
      query-stream.js

127
example.js

@ -1,4 +1,5 @@
require('crypto').randomBytes = randomBytes = require('random-bytes-seed')()
var seed = new Buffer('2fd346fe907c29d215838c1bac0719f05010dda530a825dcac36081040db6acb', 'hex')
require('crypto').randomBytes = require('random-bytes-seed')(seed)
// new Buffer('bc2cee9806f1408a4d09821a2478f593d8f08e27b4000c61e3672a1b6a42a5fa', 'hex')
var dht = require('./')
@ -6,8 +7,9 @@ var dht = require('./')
// bootstrap
dht().listen(10000)
var hash = require('crypto').createHash('sha256').update('hello world').digest()
var missing = 1000
var value = new Buffer('hello world')
var hash = require('crypto').createHash('sha256').update(value).digest()
var missing = 100
var KBucket = require('k-bucket')
var bingo = (0.6 * missing) | 0 //(Math.random() * missing) | 0
var t
@ -16,21 +18,19 @@ loop(0)
function createNode (i) {
var node = dht({bootstrap: 10000})
var store = {}
node.on('query', function (query, cb) {
if (query.command === 'put' && query.roundtripToken) {
console.log('got put command', query)
return cb()
}
if (query.command === 'get') {
console.log('got get command', query)
return cb()
}
node.on('closest', function (request, cb) {
console.log('storing')
store[request.target.toString('hex')] = request.value
cb()
})
node.on('query', function (request, cb) {
var value = store[request.target.toString('hex')]
cb(null, value)
})
return node
}
@ -53,39 +53,74 @@ function loop (i) {
// })
}
function update (node, nodes, request, cb) {
var missing = nodes.length
for (var i = 0; i < nodes.length; i++) {
var req = {
id: node.id,
command: request.command,
target: request.target,
value: request.value,
roundtripToken: nodes[i].roundtripToken
}
node._request(req, nodes[i], done)
}
function done (err, res) {
if (--missing) return
cb()
}
}
function test () {
var node = dht({bootstrap: 10000})
node.on('ready', function () {
console.log('ready')
node.closest({
command: 'store',
target: hash,
value: value
}, function (err, responses) {
console.log(err)
console.log('stored on ' + responses.length + ' peers')
var peers = 0
var node2 = dht({bootstrap: 10000})
node2.on('ready', function () {
console.log('ready to query')
var s = node2.query({
command: 'lookup',
target: hash
})
// s._debug = true
s.on('data', function (data) {
peers++
if (data.value) {
console.log(hash.toString('hex') + ' --> ' + data.value.toString(), '(' + peers + ' messages)')
console.log(data)
s.destroy()
}
})
s.on('end', function () {
console.log('(no more data)')
})
})
})
return
// var qs = node.query({
// command: '_find_node',
// target: hash
// }, {
// concurrency: 1
// })
// qs.on('end', function () {
// console.log('(end)')
// })
// loop()
// function loop () {
// var data = qs.read()
// if (data) console.log('data', data)
// setTimeout(loop, 1000)
// }
// return
var messages = 0
var qs = require('./query-stream')(node, {
var qs = node.query({
command: '_find_node',
target: hash
}, {
concurrency: 3
})
qs.on('data', function (data) {
@ -96,7 +131,21 @@ function test () {
console.log('(end)', '(used ' + messages + ')')
console.log('seed', randomBytes.seed.toString('hex'))
console.log('closest', qs.closest)
// console.log('closest', qs.closest)
update(node, qs.closest, {command: 'put', target: hash, value: value}, function () {
node.query({
command: 'get',
target: hash
}).on('data', function (data) {
if (data.value) {
console.log('->', data.value)
this.destroy()
}
}).on('end', function () {
console.log('(get query done)')
})
})
// node._closest({command: '_find_node', target: hash}, onresponse, function () {
// console.log('(end2)')

191
index.js

@ -5,8 +5,11 @@ var inherits = require('inherits')
var events = require('events')
var peers = require('ipv4-peers')
var bufferEquals = require('buffer-equals')
var duplexify = require('duplexify')
var collect = require('stream-collector')
var nodes = peers.idLength(32)
var messages = require('./messages')
var queryStream = require('./query-stream')
module.exports = DHT
@ -21,7 +24,7 @@ function DHT (opts) {
this.concurrency = opts.concurrency || 16
this.bootstrap = [].concat(opts.bootstrap || []).map(parseAddr)
this.id = opts.id || crypto.randomBytes(32)
this.nodes = new KBucket({localNodeId: this.id})
this.nodes = new KBucket({localNodeId: this.id, arbiter: arbiter})
this.nodes.on('ping', onnodeping)
this.socket = udp({
@ -35,8 +38,16 @@ function DHT (opts) {
this._bootstrapped = false
this._pendingRequests = []
this._tick = 0
this._secrets = [crypto.randomBytes(32), crypto.randomBytes(32)]
this._interval = setInterval(rotateSecrets, 5 * 60 * 1000)
this._secretsInterval = setInterval(rotateSecrets, 5 * 60 * 1000)
this._tickInterval = setInterval(tick, 5 * 1000)
if (opts.nodes) {
for (var i = 0; i < opts.nodes.length; i++) {
this._addNode(opts.nodes[i].id, opts.nodes[i])
}
}
process.nextTick(function () {
self._bootstrap()
@ -64,10 +75,51 @@ function DHT (opts) {
}
self.emit('close')
}
function tick () {
self._tick++
}
}
inherits(DHT, events.EventEmitter)
DHT.prototype.query = function (query, opts, cb) {
if (typeof opts === 'function') return this.query(query, null, opts)
return collect(queryStream(this, query, opts), cb)
}
DHT.prototype.closest = function (query, opts, cb) {
if (typeof opts === 'function') return this.closest(query, null, opts)
if (!opts) opts = {}
opts.token = true
return collect(queryStream(this, query, opts), cb)
}
DHT.prototype._closestNodes = function (target, opts, cb) {
var nodes = opts.nodes || opts.node
if (nodes) {
if (!Array.isArray(nodes)) nodes = [nodes]
process.nextTick(function () {
cb(null, nodes)
})
return null
}
var qs = this.get({
command: '_find_node',
target: target
})
qs.resume()
qs.on('error', noop)
qs.on('end', function () {
cb(null, qs.closest)
})
return qs
}
DHT.prototype.ping = function (peer, cb) {
this._ping(parseAddr(peer), function (err, res, peer) {
if (err) return cb(err)
@ -77,7 +129,13 @@ DHT.prototype.ping = function (peer, cb) {
})
}
DHT.prototype.toArray = function () {
return this.nodes.toArray()
}
DHT.prototype.destroy = function () {
clearInterval(this._secretsInterval)
clearInterval(this._tickInterval)
this.socket.destroy()
}
@ -92,68 +150,21 @@ DHT.prototype._bootstrap = function () {
// TODO: check stats, to determine wheather to rerun?
var self = this
this._closest({command: '_find_node', target: this.id, id: this.id}, null, function (err) {
if (err) return self.emit('error', err)
self._bootstrapped = true
self.emit('ready')
var qs = this.query({
command: '_find_node',
target: this.id
})
}
DHT.prototype._closest = function (request, onresponse, cb) {
if (!cb) cb = noop
var self = this
var target = request.target
var stats = {responses: 0, errors: 0}
// var table = new KBucket({localNodeId: target})
var table = require('./table')(target)
var requested = {}
var inflight = 0
var bootstrap = this.nodes.closest(target, 20)
if (bootstrap.length < this.bootstrap.length) bootstrap.push.apply(bootstrap, this.bootstrap)
bootstrap.forEach(send)
if (!inflight) cb(null, stats, table)
function send (peer) {
var addr = peer.host + ':' + peer.port
if (requested[addr]) return
requested[addr] = true
inflight++
self._request(request, peer, false, next)
}
function next (err, res, peer) {
inflight--
qs.resume()
if (err) {
stats.errors++
} else {
stats.responses++
if (res.id) {
// var prev = table.get(res.id)
// if (prev) prev.roundtripToken = res.roundtripToken
}
// TODO: do not add nodes to table.
// instead merge-sort with table so we only add nodes that actually respond
var n = decodeNodes(res.nodes)
for (var i = 0; i < n.length; i++) {
if (!bufferEquals(n[i].id, self.id)) table.add(n[i])
}
if (onresponse) onresponse(res, peer)
}
qs.on('error', function (err) {
self.emit('error', err)
})
table.closest(20).forEach(send)
if (!inflight) {
cb(null, stats, table)
}
}
qs.on('end', function () {
self._bootstrapped = true
self.emit('ready')
})
}
DHT.prototype._ping = function (peer, cb) {
@ -211,7 +222,9 @@ DHT.prototype._onquery = function (request, peer) {
roundtripToken: request.roundtripToken
}
if (!this.emit('query', query, callback)) callback()
var method = request.roundtripToken ? 'closest' : 'query'
if (!this.emit(method + ':' + request.command, query, callback) && !this.emit(method, query, callback)) callback()
function callback (err, value) {
// TODO: support errors?
@ -260,10 +273,39 @@ DHT.prototype._onfindnode = function (request, peer) {
DHT.prototype._onnodeping = function (oldContacts, newContact) {
if (!this._bootstrapped) return // bootstrapping, we've recently pinged all nodes
// TODO: record if we've recently pinged oldContacts, no need to flood them with new pings then
// console.log('onnodeping', this.bootstrap.length, this._bootstrapped, oldContacts.length)
var reping = []
for (var i = 0; i < oldContacts.length; i++) {
this.nodes.add(oldContacts[i])
var old = oldContacts[i]
if (this._tick - old.tick < 3) { // less than 10s since we talked to this peer ...
this.nodes.add(oldContacts[i])
continue
}
reping.push(old)
}
if (reping.length) this._reping(reping, newContact)
}
DHT.prototype._reping = function (oldContacts, newContact) {
var self = this
var next = null
ping()
function ping () {
next = oldContacts.shift()
if (next) self._request({command: '_ping', id: self.id}, next, true, afterPing)
}
function afterPing (err) {
if (!err) return ping()
self.nodes.remove(next)
self.nodes.add(newContact)
}
}
@ -273,7 +315,13 @@ DHT.prototype._token = function (peer, i) {
DHT.prototype._addNode = function (id, peer, token) {
if (bufferEquals(id, this.id)) return
this.nodes.add({id: id, roundtripToken: token, port: peer.port, host: peer.host})
this.nodes.add({
id: id,
port: peer.port,
host: peer.host,
roundtripToken: token,
tick: this._tick
})
}
DHT.prototype.listen = function (port, cb) {
@ -282,6 +330,15 @@ DHT.prototype.listen = function (port, cb) {
function noop () {}
function once (cb) {
var called = false
return function (err, val) {
if (called) return
called = true
cb(err, val)
}
}
function decodeNodes (buf) {
if (!buf) return []
try {
@ -312,3 +369,7 @@ function parseAddr (addr) {
function validateId (id) {
return id && id.length === 32
}
function arbiter (incumbant, candidate) {
return candidate
}

140
query-stream.js

@ -1,6 +1,5 @@
var stream = require('readable-stream')
var inherits = require('inherits')
var KBucket = require('k-bucket')
var nodes = require('ipv4-peers').idLength(32)
var bufferEquals = require('buffer-equals')
var xor = require('xor-distance')
@ -16,26 +15,31 @@ function QueryStream (dht, query, opts) {
stream.Readable.call(this, {objectMode: true, highWaterMark: opts.concurrency})
var self = this
var nodes = opts.node || opts.nodes
this.request = query
this.request.id = dht.id
this.target = this.request.target
this.destroyed = false
this.query = query
this.query.id = dht.id
this.target = query.target
this.token = !!opts.token
this.responses = 0
this.errors = 0
this.closest = []
this.destroyed = false
this.verbose = !!opts.verbose
this._post = opts.post
this._dht = dht
this._bootstrapped = false
this._committing = false
this._closest = opts.closest || []
this._concurrency = opts.concurrency
this._updating = false
this._pending = nodes ? [].concat(nodes).map(copyNode) : []
this._k = nodes ? Infinity : opts.k || 20
this._inflight = 0
this._k = 20
this._moveCloser = !nodes
this._bootstrapped = !this._moveCloser
this._onresponse = onresponse
this._pending = []
function onresponse (err, response, peer) {
self._update(err, response, peer)
function onresponse (err, res, peer) {
self._callback(err, res, peer)
}
}
@ -53,35 +57,77 @@ QueryStream.prototype._bootstrap = function () {
this._bootstrapped = true
var bootstrap = this._dht.nodes.closest(this.target, this._k)
var i = 0
for (var i = 0; i < bootstrap.length; i++) {
for (i = 0; i < bootstrap.length; i++) {
var b = bootstrap[i]
this._addPending({id: b.id, port: b.port, host: b.host})
}
if (bootstrap.length < this._dht.bootstrap.length) {
for (var i = 0; i < this._dht.bootstrap.length; i++) {
this._send(this._dht.bootstrap[i], true)
for (i = 0; i < this._dht.bootstrap.length; i++) {
this._send(this._dht.bootstrap[i], true, false)
}
}
}
QueryStream.prototype._update = function (err, res, peer) {
QueryStream.prototype._readMaybe = function () {
if (this._readableState.flowing === true) this._read()
}
QueryStream.prototype._sendTokens = function () {
if (this.destroyed) return
var sent = this._sendAll(this._closest, false, true)
if (sent || this._inflight) return
this.push(null)
}
QueryStream.prototype._sendPending = function () {
if (this.destroyed) return
if (!this._bootstrapped) this._bootstrap()
var sent = this._sendAll(this._pending, false, false)
if (sent || this._inflight) return
if (this.token) {
for (var i = 0; i < this._closest.length; i++) this._closest[i].queried = false
this._committing = true
this._sendTokens()
} else {
this.push(null)
}
}
QueryStream.prototype._read = function () {
if (this._committing) this._sendTokens()
else this._sendPending()
}
QueryStream.prototype._callback = function (err, res, peer) {
this._inflight--
if (this.destroyed) return
if (err) {
this.errors++
this.emit('warning', err)
if (this._readableState.flowing === true) this._read()
this._readMaybe()
return
}
this.responses++
this._addClosest(res, peer)
var candidates = decodeNodes(res.nodes)
for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i])
if (this._moveCloser) {
var candidates = decodeNodes(res.nodes)
for (var i = 0; i < candidates.length; i++) this._addPending(candidates[i])
}
if (this.token && !this.verbose && !this._committing) {
this._readMaybe()
return
}
this.push({
node: {
@ -93,32 +139,44 @@ QueryStream.prototype._update = function (err, res, peer) {
})
}
QueryStream.prototype._read = function () {
if (this.destroyed) return
if (!this._bootstrapped) this._bootstrap()
QueryStream.prototype._sendAll = function (nodes, force, useToken) {
var sent = 0
var free = Math.max(0, this._concurrency - this._dht.socket.inflight)
if (!free && !this._inflight) free = 1
var missing = free
if (!free) return 0
for (var i = 0; missing && i < this._pending.length; i++) {
if (this._pending[i].queried) continue
missing--
this._send(this._pending[i], false)
for (var i = 0; i < nodes.length; i++) {
if (this._send(nodes[i], force, useToken)) {
if (++sent === free) break
}
}
if (!this._inflight && free) {
this.push(null)
}
return sent
}
QueryStream.prototype._send = function (node, bootstrap) {
if (!bootstrap) {
if (node.queried) return
QueryStream.prototype._send = function (node, force, useToken) {
if (!force) {
if (node.queried) return false
node.queried = true
}
this._inflight++
this._dht._request(this.request, node, false, this._onresponse)
var query = this.query
if (useToken && node.roundtripToken) {
query = {
command: this.query.command,
id: this.query.id,
target: this.query.target,
value: this.query.value,
roundtripToken: node.roundtripToken
}
}
this._dht._request(query, node, false, this._onresponse)
return true
}
QueryStream.prototype._addPending = function (node) {
@ -142,7 +200,7 @@ QueryStream.prototype._addClosest = function (res, peer) {
}
prev.roundtripToken = res.roundtripToken
insertSorted(prev, this._k, this.closest)
insertSorted(prev, this._k, this._closest)
}
function decodeNodes (buf) {
@ -179,3 +237,13 @@ function insertSorted (node, max, list) {
pos--
}
}
function copyNode (node) {
return {
id: node.id,
port: node.port,
host: node.host,
roundtripToken: node.roundtripToken,
referer: node.referer // TODO: is this the correct spelling?
}
}

Loading…
Cancel
Save