From 1f4239f32caf902ea335e427f6655a7eceb813f9 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Thu, 22 Apr 2021 15:56:32 +0200 Subject: [PATCH] add basic v5 rewrite --- LICENSE | 21 ++ README.md | 271 +----------------- example.js | 50 ---- example/bootstrap.js | 6 - example/find.js | 21 -- example/insert.js | 15 - example/network.js | 37 --- index.js | 649 +++++++++++++++++++------------------------ lib/io.js | 359 ------------------------ lib/key-pair.js | 11 + lib/messages.js | 397 ++++++++------------------ lib/query-stream.js | 269 ------------------ lib/query-table.js | 69 ----- lib/query.js | 172 ++++++++++++ lib/rpc.js | 272 ++++++++++++++++++ package.json | 36 +-- schema.proto | 30 -- test.js | 314 --------------------- 18 files changed, 888 insertions(+), 2111 deletions(-) create mode 100644 LICENSE delete mode 100644 example.js delete mode 100644 example/bootstrap.js delete mode 100644 example/find.js delete mode 100644 example/insert.js delete mode 100644 example/network.js delete mode 100644 lib/io.js create mode 100644 lib/key-pair.js delete mode 100644 lib/query-stream.js delete mode 100644 lib/query-table.js create mode 100644 lib/query.js create mode 100644 lib/rpc.js delete mode 100644 schema.proto delete mode 100644 test.js diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..702b8e7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2021 Mathias Buus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md index da39a1e..bc4943d 100644 --- a/README.md +++ b/README.md @@ -1,276 +1,17 @@ -# dht-rpc +# dht-rpc2 -Make RPC calls over a [Kademlia](https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf) based DHT. +WIP - nothing to see here ``` -npm install dht-rpc +npm install dht-rpc2 ``` -[![build status](http://img.shields.io/travis/mafintosh/dht-rpc.svg?style=flat)](http://travis-ci.org/mafintosh/dht-rpc) - -## Key Features - -* UDP hole punching support -* Easily add any command to your DHT -* Streaming queries and updates - ## Usage -Here is an example implementing a simple key value store - -First spin up a bootstrap node. You can make multiple if you want for redundancy. - -``` js -const dht = require('dht-rpc') - -// Set ephemeral: true so other peers do not add us to the peer list, simply bootstrap -const bootstrap = dht({ ephemeral: true }) - -bootstrap.listen(10001) -``` - -Now lets make some dht nodes that can store values in our key value store. - -``` js -const dht = require('dht-rpc') -const crypto = require('crypto') - -// Let's create 100 dht nodes for our example. -for (var i = 0; i < 100; i++) createNode() - -function createNode () { - const node = dht({ - bootstrap: [ - 'localhost:10001' - ] - }) - - const values = new Map() - - node.command('values', { - // When we are the closest node and someone is sending us a "store" command - update (query, cb) { - if (!query.value) return cb() - - // Use the hash of the value as the key - const key = sha256(query.value).toString('hex') - values.set(key, query.value) - console.log('Storing', key, '-->', query.value.toString()) - cb() - }, - // When someone is querying for a "lookup" command - query (query, cb) { - const value = values.get(query.target.toString('hex')) - cb(null, value) - } - }) -} - -function sha256 (val) { - return crypto.createHash('sha256').update(val).digest() -} -``` - -To insert a value into this dht make another script that does this following - -``` js -// Set ephemeral: true as we are not part of the network. -const node = dht({ ephemeral: true }) - -node.update('values', sha256(val), value, function (err, res) { - if (err) throw err - console.log('Inserted', sha256(val).toString('hex')) -}) -``` - -Then after inserting run this script to query for a value - ``` js -node.query('values', Buffer.from(hexFromAbove, 'hex')) - .on('data', function (data) { - if (data.value && sha256(data.value).toString('hex') === hexFromAbove) { - // We found the value! Destroy the query stream as there is no need to continue. - console.log(val, '-->', data.value.toString()) - this.destroy() - } - }) - .on('end', function () { - console.log('(query finished)') - }) -``` - -## API - -#### `const node = dht([options])` - -Create a new DHT node. - -Options include: - -```js -{ - // Whether or not this node is ephemeral or should join the routing table - ephemeral: false, - // A list of bootstrap nodes - bootstrap: [ 'bootstrap-node.com:24242', ... ], - // Optionally pass in your own UDP socket to use. - socket: udpSocket -} -``` - -#### `node.command(name, cmd)` - -Define a new RPC command. `cmd` should look like this - -```js -{ - // Query handler - query (query, cb), - // Update handler. only triggered when we are one of the closest nodes to the target - update (query, cb), - // Optional value encoding for the query/update incoming value. Defaults to binary. - inputEncoding: 'json', 'utf-8', object, - // Optional value encoding for the query/update outgoing value. Defaults to binary. - outputEncoding: (same as above), - valueEncoding: (sets both input/output encoding to this) -} -``` - -The `query` object in the query/update function looks like this: - -```js -{ - // always the same as your command def - command: 'command-name', - // the node who sent the query/update - node: { port, host, id }, - // the query/update target (32 byte target) - target: Buffer, - // the query/update payload decoded with the inputEncoding - value -} +const dht-rpc2 = require('dht-rpc2') ``` -You should call the query/update callback with `(err, value)` where -value will be encoded using the outputEncoding and returned to the node. - -#### `const stream = node.query(name, target, [value], [callback])` - -Send a query command. - -If you set a valueEncoding when defining the command the value will be encoded. - -Returns a result stream that emits data that looks like this: - -```js -{ - // was this a query/update response - type: dht.QUERY, - // who sent this response - node: { peer, host, id }, - // the response payload decoded using the outputEncoding - value -} -``` - -If you pass a callback the stream will be error handled and buffered -and the content passed as an array. - -#### `const stream = node.update(name, target, [value], [callback])` - -Send a update command - -Same options/results as above but the response data will have `type` -set to `dht.UPDATE`. - -#### `const stream = node.queryAndUpdate(name, target, [value], [callback])` - -Send a combined query and update command. - -Will keep querying until it finds the closest nodes to the target and then -issue an update. More efficient than doing a query/update yourself. - -Same options/results as above but the response data will include both -query and update results. - -#### `node.destroy(onclose)` - -Fully destroys the dht node. - -#### `node.bootstrap(cb)` - -Re-bootstrap the DHT node. Normally you shouldn't have to call this. - -#### `node.holepunch(peer, cb)` - -UDP holepunch to another peer. The DHT does this automatically -when it cannot reach another peer but you can use this yourself also. - -Peer should look like this: - -```js -{ - port, - host, - // referrer should be the node/peer that - // told you about this node. - referrer: { port, host } -} -``` - -#### `node.holepunchable()` - -Returns `true` if your current network is holepunchable. -Relies on a heuristic interally based on remote node information. - -It's usually best to wait for the `initial-nodes` or `ready` event before checking this -as it is more reliable the more routing information the node has. - -#### `{ host, port } = node.remoteAddress()` - -Returns your remote IP and port. -Relies on a heuristic interally based on remote node information. - -If your IP could not be inferred `null` is returned. -If your IP could be inferred but your port not, `{ host, port: 0 }` is returned. - -It's usually best to wait for the `initial-nodes` or `ready` event before checking this -as it is more reliable the more routing information the node has. - - -#### `node.listen([port], [address], [onlistening])` - -Explicitly bind the dht node to a certain port/address. - -#### `node.persistent()` - -Dynamically convert the node from ephemeral to non-ephemeral (join the DHT). - -#### `const nodes = node.getNodes()` - -Get the list of peer nodes as an array of objects with fields `{ id, host, port }`. - -#### `node.addNodes(nodes)` - -Given an array of `{ id, host, port }` objects, adds those in the list of -peer nodes. - -#### `node.on('ready')` - -Emitted when the node is fully bootstrapped. You can make queries/updates before. - -#### `node.on('initial-nodes')` - -Emitted when the routing table has been initially populated. - -#### `node.on('listening')` - -Emitted when the node starts listening on a udp port. - -#### `node.on('close')` - -Emitted when the node is fully closed. - -#### `node.on('holepunch', fromPeer, toPeer)` +## License -Emitted when the node is helping `fromPeer` udp holepunch to `toPeer`. +MIT diff --git a/example.js b/example.js deleted file mode 100644 index 614440f..0000000 --- a/example.js +++ /dev/null @@ -1,50 +0,0 @@ -const dht = require('./') - -const bootstrap = dht() -bootstrap.listen(10001) - -const nodes = [] -var swarm = 1000 -loop(null) - -function loop (err) { - if (err) throw err - if (swarm--) addNode(loop) - else done() -} - -function done () { - console.log('executing hi update') - - const i = Math.floor(Math.random() * nodes.length) - const rs = nodes[i].update('hi', Buffer.alloc(32)) - - rs.resume() - rs.on('end', function () { - setTimeout(done, 2000) - }) -} - -function addNode (cb) { - const node = dht({ - bootstrap: [ - 10001 - ] - }) - - var hits = 0 - node.command('hi', { - update (query, cb) { - console.log('hi', ++hits) - cb(null) - }, - query (query, cb) { - cb(null) - } - }) - - node.once('ready', function () { - nodes.push(node) - cb() - }) -} diff --git a/example/bootstrap.js b/example/bootstrap.js deleted file mode 100644 index 19e7ed7..0000000 --- a/example/bootstrap.js +++ /dev/null @@ -1,6 +0,0 @@ -const dht = require('../') - -// Set ephemeral: true so other peers do not add us to the peer list, simply bootstrap -const bootstrap = dht({ ephemeral: true }) - -bootstrap.listen(10001) diff --git a/example/find.js b/example/find.js deleted file mode 100644 index 9633bc2..0000000 --- a/example/find.js +++ /dev/null @@ -1,21 +0,0 @@ -const dht = require('../') -const crypto = require('crypto') - -const hex = process.argv[2] -const node = dht({ ephemeral: true, bootstrap: ['localhost:10001'] }) - -node.query('values', Buffer.from(hex, 'hex')) - .on('data', function (data) { - if (data.value && sha256(data.value).toString('hex') === hex) { - // We found the value! Destroy the query stream as there is no need to continue. - console.log(hex, '-->', data.value.toString()) - this.destroy() - } - }) - .on('end', function () { - console.log('(query finished)') - }) - -function sha256 (val) { - return crypto.createHash('sha256').update(val).digest() -} diff --git a/example/insert.js b/example/insert.js deleted file mode 100644 index cd23068..0000000 --- a/example/insert.js +++ /dev/null @@ -1,15 +0,0 @@ -const dht = require('../') -const crypto = require('crypto') - -// Set ephemeral: true as we are not part of the network. -const node = dht({ ephemeral: true, bootstrap: ['localhost:10001'] }) -const val = Buffer.from(process.argv[2]) - -node.update('values', sha256(val), val, function (err, res) { - if (err) throw err - console.log('Inserted', sha256(val).toString('hex')) -}) - -function sha256 (val) { - return crypto.createHash('sha256').update(val).digest() -} diff --git a/example/network.js b/example/network.js deleted file mode 100644 index 459f46f..0000000 --- a/example/network.js +++ /dev/null @@ -1,37 +0,0 @@ -const dht = require('../') -const crypto = require('crypto') - -// Let's create 100 dht nodes for our example. -for (var i = 0; i < 100; i++) createNode() - -function createNode () { - const node = dht({ - bootstrap: [ - 'localhost:10001' - ] - }) - - const values = new Map() - - node.command('values', { - // When we are the closest node and someone is sending us a "store" command - update (query, cb) { - if (!query.value) return cb() - - // Use the hash of the value as the key - const key = sha256(query.value).toString('hex') - values.set(key, query.value) - console.log('Storing', key, '-->', query.value.toString()) - cb() - }, - // When someone is querying for a "lookup" command - query (query, cb) { - const value = values.get(query.target.toString('hex')) - cb(null, value) - } - }) -} - -function sha256 (val) { - return crypto.createHash('sha256').update(val).digest() -} diff --git a/index.js b/index.js index f4610ca..a72f680 100644 --- a/index.js +++ b/index.js @@ -1,483 +1,394 @@ +const dns = require('dns') +const RPC = require('./lib/rpc') +const createKeyPair = require('./lib/key-pair') +const Query = require('./lib/query') +const Table = require('kademlia-routing-table') +const TOS = require('time-ordered-set') +const FIFO = require('fast-fifo/fixed-size') +const sodium = require('sodium-universal') const { EventEmitter } = require('events') -const peers = require('ipv4-peers') -const dgram = require('dgram') -const sodium = require('sodium-native') -const KBucket = require('k-bucket') -const tos = require('time-ordered-set') -const collect = require('stream-collector') -const codecs = require('codecs') -const { Message, Holepunch } = require('./lib/messages') -const IO = require('./lib/io') -const QueryStream = require('./lib/query-stream') -const blake2b = require('blake2b-universal') - -const UNSUPPORTED_COMMAND = new Error('Unsupported command') -const nodes = peers.idLength(32) - -exports = module.exports = opts => new DHT(opts) - -class DHT extends EventEmitter { - constructor (opts) { - if (!opts) opts = {} - super() +const TICK_INTERVAL = 5000 +const REFRESH_TICKS = 60 // refresh every ~5min when idle +const RECENT_NODE = 20 // we've heard from a node less than 1min ago +const OLD_NODE = 360 // if an node has been around more than 30 min we consider it old... - this.bootstrapped = false - this.destroyed = false - this.concurrency = 16 - this.concurrencyRPS = 50 - this.socket = opts.socket || dgram.createSocket('udp4') - this.id = opts.id || randomBytes(32) - this.inflightQueries = 0 - this.ephemeral = !!opts.ephemeral - - this.nodes = tos() - this.bucket = new KBucket({ localNodeId: this.id }) - this.bucket.on('ping', this._onnodeping.bind(this)) - this.bootstrapNodes = [].concat(opts.bootstrap || []).map(parsePeer) - - this.socket.on('listening', this.emit.bind(this, 'listening')) - this.socket.on('close', this.emit.bind(this, 'close')) - this.socket.on('error', this._onsocketerror.bind(this)) - - const queryId = this.ephemeral ? null : this.id - const io = new IO(this.socket, queryId, this) - - this._io = io - this._commands = new Map() - this._tick = 0 - this._tickInterval = setInterval(this._ontick.bind(this), 5000) - this._initialNodes = false - - process.nextTick(this.bootstrap.bind(this)) +class Request { + constructor (dht, m) { + this.rpc = dht.rpc + this.dht = dht + this.tid = m.tid + this.from = m.from + this.to = m.to + this.nodeId = m.nodeId + this.target = m.target + this.closerNodes = m.closerNodes + this.status = m.status + this.token = m.token + this.command = m.command + this.value = m.value } - _onsocketerror (err) { - if (err.code === 'EADDRINUSE' || err.code === 'EPERM' || err.code === 'EACCES') this.emit('error', err) - else this.emit('warning', err) + error (code) { + this.dht._reply(this.rpc, this.tid, this.target, code, null, false, this.from) } - _ontick () { - this._tick++ - if ((this._tick & 7) === 0) this._pingSome() - if ((this._tick & 63) === 0 && this.nodes.length < 20) this.bootstrap() + reply (value, token = false) { + this.dht._reply(this.rpc, this.tid, this.target, 0, value, token, this.from) } +} - address () { - return this.socket.address() - } +module.exports = class DHT extends EventEmitter { + constructor (opts = {}) { + super() - command (name, opts) { - this._commands.set(name, { - inputEncoding: codecs(opts.inputEncoding || opts.valueEncoding), - outputEncoding: codecs(opts.outputEncoding || opts.valueEncoding), - query: opts.query || queryNotSupported, - update: opts.update || updateNotSupported + this.bootstrapNodes = (opts.bootstrapNodes || []).map(parseNode) + this.keyPair = opts.keyPair || createKeyPair(opts.seed) + this.nodes = new TOS() + this.table = new Table(this.keyPair.publicKey) + this.rpc = new RPC({ + socket: opts.socket, + onwarning: opts.onwarning, + onrequest: this._onrequest.bind(this), + onresponse: this._onresponse.bind(this) }) - } - - ready (onready) { - if (!this.bootstrapped) this.once('ready', onready) - else onready() - } - onrequest (type, message, peer) { - if (validateId(message.id)) { - this._addNode(message.id, peer, null, message.to) - } + this.bootstrapped = false + this.concurrency = opts.concurrency || 16 + this.persistent = opts.ephemeral ? false : true - switch (message.command) { - case '_ping': - return this._onping(message, peer) + this._repinging = 0 + this._reping = new FIFO(128) + this._bootstrapping = this.bootstrap() + this._secrets = [randomBytes(32), randomBytes(32)] + this._tick = (Math.random() * 1024) | 0 // random offset it + this._refreshTick = REFRESH_TICKS + this._tickInterval = setInterval(this._ontick.bind(this), TICK_INTERVAL) - case '_find_node': - return this._onfindnode(message, peer) + this.table.on('row', (row) => row.on('full', (node) => this._onfullrow(node, row))) + } - case '_holepunch': - return this._onholepunch(message, peer) + static OK = 0 + static UNKNOWN_COMMAND = 1 + static BAD_TOKEN = 2 - default: - return this._oncommand(type, message, peer) - } + static createRPCSocket (opts) { + return new RPC(opts) } - _onping (message, peer) { - if (message.value && !this.id.equals(message.value)) return - this._io.response(message, peers.encode([peer]), null, peer) + static keyPair (seed) { + return createKeyPair(seed) } - _onholepunch (message, peer) { - const value = decodeHolepunch(message.value) - if (!value) return - - if (value.to) { - const to = decodePeer(value.to) - if (!to || samePeer(to, peer)) return - message.version = IO.VERSION - message.id = this._io.id - message.to = peers.encode([to]) - message.value = Holepunch.encode({ from: peers.encode([peer]) }) - this.emit('holepunch', peer, to) - this._io.send(Message.encode(message), to) - return - } - - if (value.from) { - const from = decodePeer(value.from) - if (from) peer = from - } - - this._io.response(message, null, null, peer) + ready () { + return this._bootstrapping } - _onfindnode (message, peer) { - if (!validateId(message.target)) return - - const closerNodes = nodes.encode(this.bucket.closest(message.target, 20)) - this._io.response(message, null, closerNodes, peer) + query (target, command, value, opts) { + this._refreshTick = this._tick + REFRESH_TICKS + return new Query(this, target, command, value, opts) } - _oncommand (type, message, peer) { - if (!message.target) return + ping (node) { + return this.request(null, 'ping', null, node) + } - const self = this - const cmd = this._commands.get(message.command) + request (target, command, value, to) { + return this.rpc.request({ + version: 1, + tid: 0, + from: null, + to, + token: to.token || null, + nodeId: this.persistent ? this.table.id : null, + target, + closerNodes: null, + command, + status: 0, + value + }) + } - if (!cmd) return reply(UNSUPPORTED_COMMAND) + requestAll (target, command, value, nodes, opts = {}) { + if (nodes instanceof Table) nodes = nodes.closest(nodes.id) + if (nodes instanceof Query) nodes = nodes.table.closest(nodes.table.id) + if (nodes.length === 0) return Promise.resolve([]) - let value = null - try { - value = message.value && cmd.inputEncoding.decode(message.value) - } catch (_) { - return - } + const p = [] + for (const node of nodes) p.push(this.request(target, command, value, node)) - const query = { - type, - command: message.command, - node: peer, - target: message.target, - value - } + let errors = 0 + const results = [] + const min = typeof opts.min === 'number' ? opts.min : 1 + const max = typeof opts.max === 'number' ? opts.max : p.length - if (type === IO.UPDATE) cmd.update(query, reply) - else cmd.query(query, reply) + return new Promise((resolve, reject) => { + for (let i = 0; i < p.length; i++) p[i].then(ondone, onerror) - function reply (err, value) { - const closerNodes = nodes.encode(self.bucket.closest(message.target, 20)) - if (err) { - return self._io.error(message, err, closerNodes, peer, value && cmd.outputEncoding.encode(value)) + function ondone (res) { + if (results.length < max) results.push(res) + if (results.length >= max) return resolve(results) + if (results.length + errors === p.length) return resolve(results) } - self._io.response(message, value && cmd.outputEncoding.encode(value), closerNodes, peer) - } - } - - onresponse (message, peer) { - if (validateId(message.id)) { - this._addNode(message.id, peer, message.roundtripToken, message.to) - } - } - onbadid (peer) { - this._removeNode(peer) - } - - holepunch (peer, cb) { - if (!peer.referrer) throw new Error('peer.referrer is required') - this._io.query('_holepunch', null, null, peer, cb) + function onerror (err) { + if ((p.length - ++errors) < min) reject(new Error('Too many requests failed')) + } + }) } destroy () { - if (this.destroyed) return - this.destroyed = true - this._io.destroy() + this.rpc.destroy() clearInterval(this._tickInterval) } - ping (peer, cb) { - this._io.query('_ping', null, peer.id, peer, function (err, res) { - if (err) return cb(err) - if (res.error) return cb(new Error(res.error)) - const pong = decodePeer(res.to || res.value) // res.value will be deprecated - if (!pong) return cb(new Error('Invalid pong')) - cb(null, pong) + async bootstrap () { + return new Promise((resolve) => { + this._backgroundQuery(this.table.id, 'find_node', null) + .on('close', () => { + if (!this.bootstrapped) { + this.bootstrapped = true + this.emit('ready') + } + resolve() + }) }) } - _tally (onlyIp) { - const sum = new Map() - var result = null - var node = this.nodes.latest - var cnt = 0 - var good = 0 - - for (; node && cnt < 10; node = node.prev) { - if (!node.to || node.to.length !== 6) continue - const to = onlyIp ? node.to.toString('hex').slice(0, 8) + '0000' : node.to.toString('hex') - const hits = 1 + (sum.get(to) || 0) - if (hits > good) { - good = hits - result = node.to - } - sum.set(to, hits) - cnt++ - } - - // We want at least 3 samples all with the same ip:port from - // different remotes (the to field) to be consider it consistent - // If we get >=3 samples with conflicting info we are not (or under attack) (Subject for tweaking) + _backgroundQuery (target, command, value) { + const backgroundCon = Math.min(this.concurrency, Math.max(2, (this.concurrency / 8) | 0)) + const q = this.query(target, command, value, { + concurrency: backgroundCon + }) - const bad = cnt - good - return bad < 3 && good >= 3 ? result : null - } + q.on('data', () => { + // yield to other traffic + q.concurrency = this.rpc.inflightRequests < 3 + ? this.concurrency + : backgroundCon + }) - remoteAddress () { - const both = this._tally(false) - if (both) return peers.decode(both)[0] - const onlyIp = this._tally(true) - if (onlyIp) return peers.decode(onlyIp)[0] - return null + return q } - holepunchable () { - return this._tally(false) !== null + refresh () { + const node = this.table.random() + this._backgroundQuery(node ? node.id : this.table.id, 'find_node', null) } - _addNode (id, peer, token, to) { - if (id.equals(this.id)) return - - var node = this.bucket.get(id) - const fresh = !node + _pingSome () { + let cnt = this.rpc.inflightRequests > 2 ? 3 : 5 + let oldest = this.nodes.oldest - if (!node) node = {} + // tiny dht, ping the bootstrap again + if (!oldest) { + this.refresh() + return + } - node.id = id - node.port = peer.port - node.host = peer.host - if (token) node.roundtripToken = token - node.tick = this._tick - if (to) node.to = to + // we've recently pinged the oldest one, so only trigger a couple of repings + if ((this._tick - oldest.seen) < RECENT_NODE) { + cnt = 2 + } - if (!fresh) this.nodes.remove(node) - this.bucket.add(node) - if (this.bucket.get(node.id) !== node) return // in a ping - this.nodes.add(node) - if (fresh) { - this.emit('add-node', node) - if (!this._initialNodes && this.nodes.length >= 5) { - this._initialNodes = true - this.emit('initial-nodes') - } + while (cnt--) { + if (!oldest || this._tick === oldest.seen) continue + this._check(oldest) + oldest = oldest.next } } - _removeNode (node) { - if (!this.nodes.has(node)) return - this.nodes.remove(node) - this.bucket.remove(node.id) - this.emit('remove-node', node) + _check (node) { + this.ping(node).catch(() => this._removeNode(node)) } _token (peer, i) { const out = Buffer.allocUnsafe(32) - blake2b.batch(out, [ - this._secrets[i], - Buffer.from(peer.host) - ]) + sodium.crypto_generichash(out, Buffer.from(peer.host), this._secrets[i]) return out } - _onnodeping (oldContacts, newContact) { - // if bootstrapping, we've recently pinged all nodes + _ontick () { + // rotate secrets + const tmp = this._secrets[0] + this._secrets[0] = this._secrets[1] + this._secrets[1] = tmp + sodium.randombytes_buf(tmp) + if (!this.bootstrapped) return - const reping = [] + this._tick++ + if ((this._tick & 7) === 0) this._pingSome() + if (((this._tick & 63) === 0 && this.nodes.length < 20) || this._tick === this._refreshTick) this.refresh() + } - for (var i = 0; i < oldContacts.length; i++) { - const old = oldContacts[i] + _onfullrow (newNode, row) { + if (this.bootstrapped && this._reping.push({ newNode, row })) this._repingMaybe() + } - // check if we recently talked to this peer ... - if (this._tick === old.tick && this.nodes.has(oldContacts[i])) { - this.bucket.add(oldContacts[i]) - continue - } + _repingMaybe () { + while (this._repinging < 3 && this._reping.isEmpty() === false) { + const { newNode, row } = this._reping.shift() + if (this.table.get(newNode.id)) continue - reping.push(old) - } + let oldest = null + for (const node of row.nodes) { + if (node.seen === this._tick) continue + if (oldest === null || oldest.seen > node.seen || (oldest.seen === node.seen && oldest.added > node.added)) oldest = node + } - if (reping.length) this._reping(reping, newContact) - } + if (oldest === null) continue + if ((this._tick - oldest.seen) < RECENT_NODE && (this._tick - oldest.added) > OLD_NODE) continue - _check (node) { - const self = this - this.ping(node, function (err) { - if (err) { - self._removeNode(node) - } - }) + this._repingAndSwap(newNode, oldest) + } } - _reping (oldContacts, newContact) { + _repingAndSwap (newNode, oldNode) { const self = this - ping() + this._repinging++ + this.ping(oldNode).then(onsuccess, onswap) - function ping () { - const next = oldContacts.shift() - if (!next) return - self._io.queryImmediately('_ping', null, next.id, next, afterPing) + function onsuccess () { + self._repinging-- + self._repingMaybe() } - function afterPing (err, res, node) { - if (!err) return ping() - self._removeNode(node) - self._addNode(newContact.id, newContact, newContact.roundtripToken || null, newContact.to || null) + function onswap () { + self._repinging-- + self._repingMaybe() + self._removeNode(oldNode) + self._addNode(newNode) } } - _pingSome () { - var cnt = this.inflightQueries > 2 ? 3 : 5 - var oldest = this.nodes.oldest - // tiny dht, ping the bootstrap again - if (!oldest) return this.bootstrap() + _resolveBootstrapNodes (cb) { + if (!this.bootstrapNodes.length) return cb([]) - while (cnt--) { - if (!oldest || this._tick === oldest.tick) continue - this._check(oldest) - oldest = oldest.next + let missing = this.bootstrapNodes.length + const nodes = [] + + for (const node of this.bootstrapNodes) { + dns.lookup(node.host, (_, host) => { + if (host) nodes.push({ id: node.id || null, host, port: node.port }) + if (--missing === 0) cb(nodes) + }) } } - query (command, target, value, cb) { - if (typeof value === 'function') return this.query(command, target, null, value) - return collect(this.runCommand(command, target, value, { query: true, update: false }), cb) - } + _addNode (node) { + if (this.nodes.has(node)) return - update (command, target, value, cb) { - if (typeof value === 'function') return this.update(command, target, null, value) - return collect(this.runCommand(command, target, value, { query: false, update: true }), cb) - } + node.added = node.seen = this._tick - queryAndUpdate (command, target, value, cb) { - if (typeof value === 'function') return this.queryAndUpdate(command, target, null, value) - return collect(this.runCommand(command, target, value, { query: true, update: true }), cb) - } + this.nodes.add(node) + this.table.add(node) - runCommand (command, target, value, opts) { - return new QueryStream(this, command, target, value, opts) + this.emit('add-node', node) } - listen (port, addr, cb) { - if (typeof port === 'function') return this.listen(0, null, port) - if (typeof addr === 'function') return this.listen(port, null, addr) - if (cb) this.once('listening', cb) - this.socket.bind(port, addr) - } + _removeNode (node) { + if (!this.nodes.has(node)) return - bootstrap (cb) { - const self = this - const backgroundCon = Math.min(this.concurrency, Math.max(2, Math.floor(this.concurrency / 8))) + this.nodes.remove(node) + this.table.remove(node.id) - if (!this.bootstrapNodes.length) return process.nextTick(done) + this.emit('remove-node', node) + } - const qs = this.query('_find_node', this.id) + _addNodeFromMessage (m) { + const oldNode = this.table.get(m.nodeId) - qs.on('data', update) - qs.on('error', onerror) - qs.on('end', done) + if (oldNode) { + if (oldNode.port === m.from.port && oldNode.host === m.from.host) { + // refresh it + oldNode.seen = this._tick + this.nodes.add(oldNode) + } + return + } - update() + this._addNode({ + id: m.nodeId, + port: m.from.port, + host: m.from.host, + token: null, + added: this._tick, + seen: this._tick, + prev: null, + next: null, + }) + } - function onerror (err) { - if (cb) cb(err) - } + _onrequest (req) { + if (req.nodeId !== null) this._addNodeFromMessage(req) - function done () { - if (!self.bootstrapped) { - self.bootstrapped = true - self.emit('ready') + if (req.token !== null) { + if (!req.token.equals(this._token(req.from, 1)) && !req.token.equals(this._token(req.from, 0))) { + req.token = null } - if (cb) cb() } - function update () { - qs._concurrency = self.inflightQueries === 1 ? self.concurrency : backgroundCon + // empty reply back + if (req.command === 'ping') { + this._reply(this.rpc, req.tid, null, 0, null, false, req.from) + return } - } - persistent (cb) { - this._io.id = this.id - this.bootstrap((err) => { - if (err) { - if (cb) cb(err) - return - } - this.ephemeral = false - if (cb) cb() - }) - } + if (req.command === 'find_node') { + this._reply(this.rpc, req.tid, req.target, 0, null, false, req.from) + return + } - getNodes () { - return this.nodes.toArray().map(({ id, host, port }) => ({ id, host, port })) + if (this.emit('request', new Request(this, req)) === false) { + this._reply(this.rpc, req.tid, req.target, 1, null, false, req.from) + } } - addNodes (nodes) { - for (const { id, host, port } of nodes) this._addNode(id, { host, port }) + _onresponse (res) { + if (res.nodeId !== null) this._addNodeFromMessage(res) } -} - -exports.id = () => randomBytes(32) -exports.QUERY = DHT.QUERY = IO.QUERY -exports.UPDATE = DHT.UPDATE = IO.UPDATE -exports.DHT = DHT - -function validateId (id) { - return id && id.length === 32 -} -function randomBytes (n) { - const buf = Buffer.allocUnsafe(n) - sodium.randombytes_buf(buf) - return buf -} - -function decodeHolepunch (buf) { - try { - return Holepunch.decode(buf) - } catch (err) { - return null + bind (...args) { + return this.rpc.bind(...args) } -} -function decodePeer (buf) { - try { - const p = peers.decode(buf)[0] - if (!p) throw new Error('No peer in buffer') - return p - } catch (err) { - return null + _reply (rpc, tid, target, status, value, token, to) { + const closerNodes = target ? this.table.closest(target) : null + const persistent = this.persistent && rpc === this.rpc + + rpc.send({ + version: 1, + tid, + from: null, + to, + token: token ? this._token(to, 1) : null, + nodeId: persistent ? this.table.id : null, + target: null, + closerNodes, + command: null, + status, + value + }) } } -function parsePeer (peer) { - if (typeof peer === 'object' && peer) return peer - if (typeof peer === 'number') return parsePeer(':' + peer) - if (peer[0] === ':') return parsePeer('127.0.0.1' + peer) +function parseNode (s) { + if (typeof s === 'object') return s + const [_, id, host, port] = s.match(/([a-f0-9]{64}@)?([^:@]+)(:\d+)?$/i) + if (!port) throw new Error('Node format is id@?host:port') - const parts = peer.split(':') return { - host: parts[0], - port: parseInt(parts[1], 10) + id: id ? Buffer.from(id.slice(0, -1), 'hex') : null, + host, + port } } -function samePeer (a, b) { - return a.port === b.port && a.host === b.host -} - -function updateNotSupported (query, cb) { - cb(new Error('Update not supported')) +function randomBytes (n) { + const b = Buffer.alloc(n) + sodium.randombytes_buf(b) + return b } -function queryNotSupported (query, cb) { - cb(null, null) -} +function noop () {} diff --git a/lib/io.js b/lib/io.js deleted file mode 100644 index 2395522..0000000 --- a/lib/io.js +++ /dev/null @@ -1,359 +0,0 @@ -const { Message, Holepunch, TYPE } = require('./messages') -const blake2b = require('blake2b-universal') -const sodium = require('sodium-native') -const peers = require('ipv4-peers') -const speedometer = require('speedometer') - -const VERSION = 1 -const QUERY = Symbol('QUERY') -const UPDATE = Symbol('UPDATE') - -const ECANCELLED = new Error('Request cancelled') -const ETIMEDOUT = new Error('Request timed out') - -ETIMEDOUT.code = 'ETIMEDOUT' -ECANCELLED.code = 'ECANCELLED' - -const TRIES = 3 - -class IO { - constructor (socket, id, ctx) { - this.id = id - this.socket = socket - this.inflight = [] - - this._ctx = ctx - this._rid = (Math.random() * 65536) | 0 - this._requests = new Array(65536) - this._pending = [] - this._secrets = [randomBytes(32), randomBytes(32)] - this._ticking = false - this._tickInterval = setInterval(this._ontick.bind(this), 750) - this._rotateInterval = setInterval(this._onrotate.bind(this), 300000) - this._speed = speedometer() - - socket.on('message', this._onmessage.bind(this)) - } - - _token (peer, i) { - const out = Buffer.allocUnsafe(32) - blake2b.batch(out, [ - this._secrets[i], - Buffer.from(peer.host) - ]) - return out - } - - _free () { - const rid = this._rid++ - if (this._rid === 65536) this._rid = 0 - return rid - } - - /* - - R - / \ - A B - - A sent a message to B that failed - - It could be that the message got dropped - or that it needs holepunching - - To retry - - resend(req, A -> B) - fire_and_forget({ _holepunch, to: B }, A -> R) - - R.onholepunch { to: B } => fire_and_forget({ _holepunch, from: A }, R -> B) - B.onholepunch { from: A } => fire_and_forget({ _holepunch }, B -> A) - - A and B is now holepunched and the session has been retried as well - - */ - - _holepunch (req) { - const rid = req.message.command === '_holepunch' - ? req.rid - : this._free() - - const punch = { - version: VERSION, - type: TYPE.QUERY, - to: encodeIP(req.peer.referrer), - id: req.message.id, - rid, - command: '_holepunch', - value: Holepunch.encode({ - to: peers.encode([req.peer]) - }) - } - - this.send(Message.encode(punch), req.peer.referrer) - } - - _retry (req) { - req.timeout = 4 - this.send(req.buffer, req.peer) - // if referrer is avail, try holepunching automatically - if (req.peer.referrer) this._holepunch(req) - } - - _onmessage (buf, rinfo) { - if (!rinfo.port) return - const message = decodeMessage(buf) - if (!message) return - if (message.id && message.id.length !== 32) return - // Force eph if older version - if (message.id && !(message.version >= VERSION)) message.id = null - - const peer = { port: rinfo.port, host: rinfo.address } - - switch (message.type) { - case TYPE.RESPONSE: { - this._ctx.onresponse(message, peer) - this._finish(message.rid, null, message, peer) - break - } - - case TYPE.QUERY: { - this._ctx.onrequest(QUERY, message, peer) - break - } - - case TYPE.UPDATE: { - const rt = message.roundtripToken - if (!rt || (!rt.equals(this._token(peer, 0)) && !rt.equals(this._token(peer, 1)))) return - this._ctx.onrequest(UPDATE, message, peer) - break - } - } - } - - _saturated () { - return this._speed(0) >= this._ctx.concurrencyRPS && this.inflight.length >= this._ctx.concurrency - } - - _finish (rid, err, val, peer) { - const req = this._requests[rid] - if (!req) return - if (req.holepunch) clearTimeout(req.holepunch) - - this._requests[rid] = undefined - const top = this.inflight[this.inflight.length - 1] - this.inflight[top.index = req.index] = top - this.inflight.pop() - - if (val && req.peer.id) { - if (!val.id || val.id.length !== 32 || !val.id.equals(req.peer.id)) { - this._ctx.onbadid(req.peer) - } - } - - const type = req.message.type === TYPE.QUERY - ? QUERY - : UPDATE - - req.callback(err, val, peer, req.message, req.peer, type) - - while (this._pending.length && !this._saturated()) { - const { message, peer, callback } = this._pending.shift() - this._requestImmediately(message, peer, callback) - } - } - - _request (message, peer, callback) { - // Should we wait to send? - if (this._pending.length || this._saturated()) { - this._pending.push({ message, peer, callback }) - } else { - this._requestImmediately(message, peer, callback) - } - } - - _requestImmediately (message, peer, callback) { - const rid = message.rid = this._free() - const buffer = Message.encode(message) - - this._speed(1) - - const req = { - rid, - index: this.inflight.length, - callback, - message, - buffer, - peer, - timeout: this._ticking ? 5 : 4, // if ticking this will be decremented after this fun call - tries: 0, - holepunch: null - } - - if (req.peer.referrer && !req.peer.fastHolepunch) { - req.peer.fastHolepunch = true - req.holepunch = setTimeout(holepunchNT, 500, this, req) - } - - this._requests[rid] = req - this.inflight.push(req) - this.send(buffer, peer) - - // if sending a holepunch cmd, forward it right away - if (message.command === '_holepunch') this._holepunch(req) - } - - _cancel (rid, err, peer) { - this._finish(rid, err || ECANCELLED, null, peer) - } - - _onrotate () { - this._secrets[1] = this._secrets[0] - this._secrets[0] = randomBytes(32) - } - - _ontick () { - this._ticking = true - - for (var i = 0; i < this.inflight.length; i++) { - const req = this.inflight[i] - - if (req.timeout === 2 && ++req.tries < TRIES) { - if (this._saturated()) req.tries-- - else this._retry(req) - continue - } - - if (--req.timeout) { - continue - } - - this._cancel(req.rid, ETIMEDOUT, req.peer) - i-- // the cancel removes the entry so we need to dec i - } - - this._ticking = false - } - - send (buffer, peer) { - if (this._ctx.destroyed) return - this.socket.send(buffer, 0, buffer.length, peer.port, peer.host) - } - - destroy () { - clearInterval(this._rotateInterval) - clearInterval(this._tickInterval) - - this.socket.close() - - const pending = this._pending - this._pending = [] - - for (const req of pending) req.callback(ECANCELLED, null, req.peer) - for (const req of this.inflight) this._cancel(req.rid, null, req.peer) - } - - response (request, value, closerNodes, peer) { - const message = { - version: VERSION, - type: TYPE.RESPONSE, - rid: request.rid, - to: peers.encode([peer]), - id: this.id, - closerNodes, - roundtripToken: this._token(peer, 0), - value - } - this.send(Message.encode(message), peer) - } - - error (request, error, closerNodes, peer, value) { - const message = { - version: VERSION, - type: TYPE.RESPONSE, - rid: request.rid, - to: peers.encode([peer]), - id: this.id, - closerNodes, - error: error.message, - value - } - this.send(Message.encode(message), peer) - } - - query (command, target, value, peer, callback) { - if (!callback) callback = noop - - this._request({ - version: VERSION, - type: TYPE.QUERY, - rid: 0, - to: encodeIP(peer), - id: this.id, - target, - command, - value - }, peer, callback) - } - - queryImmediately (command, target, value, peer, callback) { - if (!callback) callback = noop - - this._requestImmediately({ - version: VERSION, - type: TYPE.QUERY, - rid: 0, - to: encodeIP(peer), - id: this.id, - target, - command, - value - }, peer, callback) - } - - update (command, target, value, peer, callback) { - if (!callback) callback = noop - - this._request({ - version: VERSION, - type: TYPE.UPDATE, - rid: 0, - to: encodeIP(peer), - id: this.id, - roundtripToken: peer.roundtripToken, - target, - command, - value - }, peer, callback) - } -} - -IO.QUERY = QUERY -IO.UPDATE = UPDATE -IO.VERSION = VERSION - -module.exports = IO - -function noop () {} - -function holepunchNT (io, req) { - io._holepunch(req) -} - -function decodeMessage (buf) { - try { - return Message.decode(buf) - } catch (err) { - return null - } -} - -function randomBytes (n) { - const buf = Buffer.allocUnsafe(32) - sodium.randombytes_buf(buf) - return buf -} - -function encodeIP (peer) { - return /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/.test(peer.host) ? peers.encode([peer]) : null -} diff --git a/lib/key-pair.js b/lib/key-pair.js new file mode 100644 index 0000000..54891e3 --- /dev/null +++ b/lib/key-pair.js @@ -0,0 +1,11 @@ +const sodium = require('sodium-universal') + +module.exports = function createKeyPair (seed) { + const publicKey = Buffer.alloc(32) + const secretKey = Buffer.alloc(32) + + if (seed) sodium.crypto_kx_seed_keypair(publicKey, secretKey, seed) + else sodium.crypto_kx_keypair(publicKey, secretKey) + + return { publicKey, secretKey } +} diff --git a/lib/messages.js b/lib/messages.js index 15778d1..dabc407 100644 --- a/lib/messages.js +++ b/lib/messages.js @@ -1,303 +1,132 @@ -// This file is auto generated by the protocol-buffers compiler +const cenc = require('compact-encoding') -/* eslint-disable quotes */ -/* eslint-disable indent */ -/* eslint-disable no-redeclare */ -/* eslint-disable camelcase */ - -// Remember to `npm install --save protocol-buffers-encodings` -var encodings = require('protocol-buffers-encodings') -var varint = encodings.varint -var skip = encodings.skip - -exports.TYPE = { - QUERY: 1, - UPDATE: 2, - RESPONSE: 3 +const IPv4 = exports.IPv4 = { + preencode (state, ip) { + state.end += 4 + }, + encode (state, ip) { + const nums = ip.split('.') + state.buffer[state.start++] = Number(nums[0]) || 0 + state.buffer[state.start++] = Number(nums[1]) || 0 + state.buffer[state.start++] = Number(nums[2]) || 0 + state.buffer[state.start++] = Number(nums[3]) || 0 + }, + decode (state) { + if (state.end - state.start < 4) throw new Error('Out of bounds') + return state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + '.' + state.buffer[state.start++] + } } -var Holepunch = exports.Holepunch = { - buffer: true, - encodingLength: null, - encode: null, - decode: null +const peerIPv4 = { + preencode (state, peer) { + state.end += 6 + }, + encode (state, peer) { + IPv4.encode(state, peer.host) + cenc.uint16.encode(state, peer.port) + }, + decode (state) { + return { + host: IPv4.decode(state), + port: cenc.uint16.decode(state) + } + } } -var Message = exports.Message = { - buffer: true, - encodingLength: null, - encode: null, - decode: null +const dhtPeerIPv4 = exports.dhtPeerIPv4 = { + preencode (state, peer) { + state.end += 6 + 32 + }, + encode (state, peer) { + cenc.fixed32.encode(state, peer.id) + IPv4.encode(state, peer.host) + cenc.uint16.encode(state, peer.port) + }, + decode (state) { + return { + id: cenc.fixed32.decode(state), + host: IPv4.decode(state), + port: cenc.uint16.decode(state) + } + } } -defineHolepunch() -defineMessage() +const dhtPeerIPv4Array = exports.dhtPeerIPv4Array = cenc.array(dhtPeerIPv4) -function defineHolepunch () { - Holepunch.encodingLength = encodingLength - Holepunch.encode = encode - Holepunch.decode = decode +/* eslint-disable no-multi-spaces */ - function encodingLength (obj) { - var length = 0 - if (defined(obj.from)) { - var len = encodings.bytes.encodingLength(obj.from) - length += 1 + len - } - if (defined(obj.to)) { - var len = encodings.bytes.encodingLength(obj.to) - length += 1 + len - } - return length - } +const TYPE = 0b0001 +const HAS_TOKEN = 0b0010 +const HAS_NODE_ID = 0b0100 +const HAS_TARGET = 0b1001 +const HAS_CLOSER_NODES = 0b1001 - function encode (obj, buf, offset) { - if (!offset) offset = 0 - if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) - var oldOffset = offset - if (defined(obj.from)) { - buf[offset++] = 18 - encodings.bytes.encode(obj.from, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.to)) { - buf[offset++] = 26 - encodings.bytes.encode(obj.to, buf, offset) - offset += encodings.bytes.encode.bytes - } - encode.bytes = offset - oldOffset - return buf - } +const RESPONSE = 0b0000 +const REQUEST = 0b0001 +const TOKEN = 0b0010 +const NODE_ID = 0b0100 +const TARGET = 0b1000 | REQUEST +const CLOSER_NODES = 0b1000 | RESPONSE - function decode (buf, offset, end) { - if (!offset) offset = 0 - if (!end) end = buf.length - if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") - var oldOffset = offset - var obj = { - from: null, - to: null - } - while (true) { - if (end <= offset) { - decode.bytes = offset - oldOffset - return obj - } - var prefix = varint.decode(buf, offset) - offset += varint.decode.bytes - var tag = prefix >> 3 - switch (tag) { - case 2: - obj.from = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 3: - obj.to = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - default: - offset = skip(prefix & 7, buf, offset) - } - } - } -} +exports.message = { + preencode (state, m) { + state.end += 1 // version + state.end += 1 // flags + state.end += 2 // tid + state.end += 6 // to -function defineMessage () { - Message.encodingLength = encodingLength - Message.encode = encode - Message.decode = decode + if (m.token) state.end += 32 + if (m.nodeId) state.end += 32 + if (m.target) state.end += 32 + if (m.closerNodes && m.closerNodes.length) dhtPeerIPv4Array.preencode(state, m.closerNodes) + if (m.command) cenc.string.preencode(state, m.command) + else cenc.uint.preencode(state, m.status) - function encodingLength (obj) { - var length = 0 - if (defined(obj.version)) { - var len = encodings.varint.encodingLength(obj.version) - length += 1 + len - } - if (!defined(obj.type)) throw new Error("type is required") - var len = encodings.enum.encodingLength(obj.type) - length += 1 + len - if (!defined(obj.rid)) throw new Error("rid is required") - var len = encodings.varint.encodingLength(obj.rid) - length += 1 + len - if (defined(obj.to)) { - var len = encodings.bytes.encodingLength(obj.to) - length += 1 + len - } - if (defined(obj.id)) { - var len = encodings.bytes.encodingLength(obj.id) - length += 1 + len - } - if (defined(obj.target)) { - var len = encodings.bytes.encodingLength(obj.target) - length += 1 + len - } - if (defined(obj.closerNodes)) { - var len = encodings.bytes.encodingLength(obj.closerNodes) - length += 1 + len - } - if (defined(obj.roundtripToken)) { - var len = encodings.bytes.encodingLength(obj.roundtripToken) - length += 1 + len - } - if (defined(obj.command)) { - var len = encodings.string.encodingLength(obj.command) - length += 1 + len - } - if (defined(obj.error)) { - var len = encodings.string.encodingLength(obj.error) - length += 1 + len - } - if (defined(obj.value)) { - var len = encodings.bytes.encodingLength(obj.value) - length += 1 + len - } - return length - } + cenc.buffer.preencode(state, m.value) + }, + encode (state, m) { + const closerNodes = m.closerNodes || [] + const flags = (m.token ? HAS_TOKEN : 0) | + (m.nodeId ? NODE_ID : 0) | + (m.target ? TARGET : 0) | + (closerNodes.length ? CLOSER_NODES : 0) | + (m.command ? REQUEST : 0) - function encode (obj, buf, offset) { - if (!offset) offset = 0 - if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) - var oldOffset = offset - if (defined(obj.version)) { - buf[offset++] = 88 - encodings.varint.encode(obj.version, buf, offset) - offset += encodings.varint.encode.bytes - } - if (!defined(obj.type)) throw new Error("type is required") - buf[offset++] = 8 - encodings.enum.encode(obj.type, buf, offset) - offset += encodings.enum.encode.bytes - if (!defined(obj.rid)) throw new Error("rid is required") - buf[offset++] = 16 - encodings.varint.encode(obj.rid, buf, offset) - offset += encodings.varint.encode.bytes - if (defined(obj.to)) { - buf[offset++] = 82 - encodings.bytes.encode(obj.to, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.id)) { - buf[offset++] = 26 - encodings.bytes.encode(obj.id, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.target)) { - buf[offset++] = 34 - encodings.bytes.encode(obj.target, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.closerNodes)) { - buf[offset++] = 42 - encodings.bytes.encode(obj.closerNodes, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.roundtripToken)) { - buf[offset++] = 50 - encodings.bytes.encode(obj.roundtripToken, buf, offset) - offset += encodings.bytes.encode.bytes - } - if (defined(obj.command)) { - buf[offset++] = 58 - encodings.string.encode(obj.command, buf, offset) - offset += encodings.string.encode.bytes - } - if (defined(obj.error)) { - buf[offset++] = 66 - encodings.string.encode(obj.error, buf, offset) - offset += encodings.string.encode.bytes - } - if (defined(obj.value)) { - buf[offset++] = 74 - encodings.bytes.encode(obj.value, buf, offset) - offset += encodings.bytes.encode.bytes - } - encode.bytes = offset - oldOffset - return buf - } + state.buffer[state.start++] = 1 + state.buffer[state.start++] = flags + cenc.uint16.encode(state, m.tid) + peerIPv4.encode(state, m.to) + + if ((flags & HAS_TOKEN) === TOKEN) cenc.fixed32.encode(state, m.token) + if ((flags & HAS_NODE_ID) === NODE_ID) cenc.fixed32.encode(state, m.nodeId) + if ((flags & HAS_TARGET) === TARGET) cenc.fixed32.encode(state, m.target) + if ((flags & HAS_CLOSER_NODES) === CLOSER_NODES) dhtPeerIPv4Array.encode(state, closerNodes) + if ((flags & TYPE) === REQUEST) cenc.string.encode(state, m.command) + if ((flags & TYPE) === RESPONSE) cenc.uint.encode(state, m.status) - function decode (buf, offset, end) { - if (!offset) offset = 0 - if (!end) end = buf.length - if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") - var oldOffset = offset - var obj = { - version: 0, - type: 1, - rid: 0, - to: null, - id: null, - target: null, - closerNodes: null, - roundtripToken: null, - command: "", - error: "", - value: null + cenc.buffer.encode(state, m.value) + }, + decode (state) { + const version = state.buffer[state.start++] + + if (version !== 1) { + throw new Error('Incompatible version') } - var found1 = false - var found2 = false - while (true) { - if (end <= offset) { - if (!found1 || !found2) throw new Error("Decoded message is not valid") - decode.bytes = offset - oldOffset - return obj - } - var prefix = varint.decode(buf, offset) - offset += varint.decode.bytes - var tag = prefix >> 3 - switch (tag) { - case 11: - obj.version = encodings.varint.decode(buf, offset) - offset += encodings.varint.decode.bytes - break - case 1: - obj.type = encodings.enum.decode(buf, offset) - offset += encodings.enum.decode.bytes - found1 = true - break - case 2: - obj.rid = encodings.varint.decode(buf, offset) - offset += encodings.varint.decode.bytes - found2 = true - break - case 10: - obj.to = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 3: - obj.id = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 4: - obj.target = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 5: - obj.closerNodes = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 6: - obj.roundtripToken = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - case 7: - obj.command = encodings.string.decode(buf, offset) - offset += encodings.string.decode.bytes - break - case 8: - obj.error = encodings.string.decode(buf, offset) - offset += encodings.string.decode.bytes - break - case 9: - obj.value = encodings.bytes.decode(buf, offset) - offset += encodings.bytes.decode.bytes - break - default: - offset = skip(prefix & 7, buf, offset) - } + + const flags = cenc.uint.decode(state) + + return { + version: 1, + tid: cenc.uint16.decode(state), + from: null, // populated in caller + to: peerIPv4.decode(state), + token: ((flags & HAS_TOKEN) === TOKEN) ? cenc.fixed32.decode(state) : null, + nodeId: ((flags & HAS_NODE_ID) === NODE_ID) ? cenc.fixed32.decode(state) : null, + target: ((flags & HAS_TARGET) === TARGET) ? cenc.fixed32.decode(state) : null, + closerNodes: ((flags & HAS_CLOSER_NODES) === CLOSER_NODES) ? dhtPeerIPv4Array.decode(state) : null, + command: ((flags & TYPE) === REQUEST) ? cenc.string.decode(state) : null, + status: ((flags & TYPE) === RESPONSE) ? cenc.uint.decode(state) : 0, + value: cenc.buffer.decode(state) } } } - -function defined (val) { - return val !== null && val !== undefined && (typeof val !== 'number' || !isNaN(val)) -} diff --git a/lib/query-stream.js b/lib/query-stream.js deleted file mode 100644 index 8c7919e..0000000 --- a/lib/query-stream.js +++ /dev/null @@ -1,269 +0,0 @@ -const { Readable } = require('stream') -const peers = require('ipv4-peers') -const nodes = peers.idLength(32) -const QueryTable = require('./query-table') - -const BOOTSTRAPPING = Symbol('BOOTSTRAPPING') -const MOVING_CLOSER = Symbol('MOVING_CLOSER') -const UPDATING = Symbol('UPDATING') -const FINALIZED = Symbol('FINALIZED') - -class QueryStream extends Readable { - constructor (node, command, target, value, opts) { - if (!opts) opts = {} - if (!opts.concurrency) opts.concurrency = opts.highWaterMark || node.concurrency - - super({ - objectMode: true - }) - - const cmd = node._commands.get(command) - - this.command = command - this.target = target - this.value = (cmd && value) ? cmd.inputEncoding.encode(value) : (value || null) - this.update = !!opts.update - this.query = !!opts.query || !opts.update - this.destroyed = false - this.inflight = 0 - this.responses = 0 - this.errors = 0 - this.updates = 0 - this.table = opts.table || new QueryTable(node.id, target) - - node.inflightQueries++ - - this._status = opts.table ? MOVING_CLOSER : BOOTSTRAPPING - this._node = node - this._concurrency = opts.concurrency - this._callback = this._onresponse.bind(this) - this._map = identity - this._outputEncoding = cmd ? cmd.outputEncoding : null - } - - map (fn) { - this._map = fn - return this - } - - _onresponse (err, message, peer, request, to, type) { - this.inflight-- - if (err && to && to.id) { - // Request, including retries, failed completely - // Remove the "to" node. - const node = this._node.bucket.get(to.id) - if (node) this._node._removeNode(node) - } - - if (this._status === FINALIZED) { - if (!this.inflight) { - if (this.destroyed) this.emit('close') - else this.destroy() - } - return - } - - if (err) { - this.errors++ - this.emit('warning', err) - this._readMaybe() - return - } - - this.responses++ - this.emit('response') - this.table.addVerified(message, peer) - - if (this._status === MOVING_CLOSER) { - const candidates = decodeNodes(message.closerNodes) - for (var i = 0; i < candidates.length; i++) { - this.table.addUnverified(candidates[i], peer) - } - } else if (this._status === UPDATING) { - this.updates++ - } - - if (message.error) { - const { value } = message - const proof = value && this._decodeOutput(value) - this.emit('warning', new Error(message.error), proof) - this._readMaybe() - return - } - - if (!this.query && this._status === MOVING_CLOSER) { - this._readMaybe() - return - } - - const value = this._outputEncoding - ? this._decodeOutput(message.value) - : message.value - - const data = this._map({ - type, - to: message.to && message.to.length === 6 ? peers.decode(message.to)[0] : null, - node: { - id: message.id, - port: peer.port, - host: peer.host - }, - value - }) - - if (!data) { - this._readMaybe() - return - } - - this.push(data) - } - - _decodeOutput (val) { - try { - return val && this._outputEncoding.decode(val) - } catch (err) { - return null - } - } - - _bootstrap () { - const table = this.table - const bootstrap = this._node.bucket.closest(table.target, table.k) - var i = 0 - - for (; i < bootstrap.length; i++) { - const b = bootstrap[i] - const node = { id: b.id, port: b.port, host: b.host } - table.addUnverified(node, null) - } - - const bootstrapNodes = this._node.bootstrapNodes - if (bootstrap.length < bootstrapNodes.length) { - for (i = 0; i < bootstrapNodes.length; i++) { - this._send(bootstrapNodes[i], true, false) - } - } - - this._status = MOVING_CLOSER - this._moveCloser() - } - - _sendAll (nodes, force, sendToken) { - var free = Math.max(0, this._concurrency - this._node._io.inflight.length) - var sent = 0 - - if (!free && !this.inflight) free = 1 - if (!free) return 0 - - for (var i = 0; i < nodes.length; i++) { - if (this._send(nodes[i], force, sendToken)) { - if (++sent === free) break - } - } - - return sent - } - - _send (node, force, isUpdate) { - if (!force) { - if (node.queried) return false - node.queried = true - } - - this.inflight++ - const io = this._node._io - - if (isUpdate) { - if (!node.roundtripToken) return this._callback(new Error('Roundtrip token is required')) - io.update(this.command, this.target, this.value, node, this._callback) - } else if (this.query) { - io.query(this.command, this.target, this.value, node, this._callback) - } else { - io.query('_find_node', this.target, null, node, this._callback) - } - - return true - } - - _sendUpdate () { - const sent = this._sendAll(this.table.closest, false, true) - if (sent || this.inflight) return - - this._finalize() - } - - _moveCloser () { - const table = this.table - const sent = this._sendAll(table.unverified, false, false) - if (sent || this.inflight) return - - if (this.update) { - for (var i = 0; i < table.closest.length; i++) { - table.closest[i].queried = false - } - this._status = UPDATING - this._sendUpdate() - } else { - this._finalize() - } - } - - _finalize () { - const status = this._status - if (status === FINALIZED) return - - this._status = FINALIZED - this._node.inflightQueries-- - - if (!this.responses && !this.destroyed) { - this.destroy(new Error('No nodes responded')) - } - if (status === UPDATING && !this.updates && !this.destroyed) { - this.destroy(new Error('No close nodes responded')) - } - - this.push(null) - } - - _readMaybe () { - if (!this.inflight || this._readableState.flowing === true) this._read() - } - - _read () { - if (this._node.destroyed) return - - switch (this._status) { - case BOOTSTRAPPING: return this._bootstrap() - case MOVING_CLOSER: return this._moveCloser() - case UPDATING: return this._sendUpdate() - case FINALIZED: return - } - - throw new Error('Unknown status: ' + this._status) - } - - destroy (err) { - if (this.destroyed) return - this.destroyed = true - - if (err) this.emit('error', err) - this._finalize() - if (!this.inflight) this.emit('close') - } -} - -module.exports = QueryStream - -function decodeNodes (buf) { - if (!buf) return [] - try { - return nodes.decode(buf) - } catch (err) { - return [] - } -} - -function identity (a) { - return a -} diff --git a/lib/query-table.js b/lib/query-table.js deleted file mode 100644 index 5748d88..0000000 --- a/lib/query-table.js +++ /dev/null @@ -1,69 +0,0 @@ -const xor = require('xor-distance') - -class QueryTable { - constructor (id, target) { - this.k = 20 - this.id = id - this.target = target - this.closest = [] - this.unverified = [] - } - - addUnverified (node, referrer) { - if (node.id.equals(this.id)) return - - node.distance = xor(this.target, node.id) - node.referrer = referrer - - insertSorted(node, this.k, this.unverified) - } - - addVerified (message, peer) { - if (!message.id || !message.roundtripToken || message.id.equals(this.id)) { - return - } - - var prev = getNode(message.id, this.unverified) - - if (!prev) { - prev = { - id: message.id, - host: peer.host, - port: peer.port, - distance: xor(message.id, this.target) - } - } - - prev.roundtripToken = message.roundtripToken - insertSorted(prev, this.k, this.closest) - } -} - -module.exports = QueryTable - -function getNode (id, list) { - // find id in the list. - // technically this would be faster with binary search (against distance) - // but this list is always small, so meh - - for (var i = 0; i < list.length; i++) { - if (list[i].id.equals(id)) return list[i] - } - - return null -} - -function insertSorted (node, max, list) { - if (list.length === max && !xor.lt(node.distance, list[max - 1].distance)) return - if (getNode(node.id, list)) return - - if (list.length < max) list.push(node) - else list[max - 1] = node - - var pos = list.length - 1 - while (pos && xor.gt(list[pos - 1].distance, node.distance)) { - list[pos] = list[pos - 1] - list[pos - 1] = node - pos-- - } -} diff --git a/lib/query.js b/lib/query.js new file mode 100644 index 0000000..7fc87ed --- /dev/null +++ b/lib/query.js @@ -0,0 +1,172 @@ +const Table = require('kademlia-routing-table') +const { Readable } = require('streamx') + +module.exports = class Query extends Readable { + constructor (dht, target, command, value, opts = {}) { + super() + + this.dht = dht + this.table = opts.table || new Table(target, { k: 20 }) + this.command = command + this.value = value + this.errors = 0 + this.successes = 0 + this.concurrency = opts.concurrency || 16 + this.inflight = 0 + this.map = opts.map || defaultMap + + this._onresolve = this._onvisit.bind(this) + this._onreject = this._onerror.bind(this) + } + + get target () { + return this.table.id + } + + closest () { + return this.table.closest(this.table.id) + } + + finished () { + return new Promise((resolve, reject) => { + const self = this + let error = null + + this.resume() + this.on('error', onerror) + this.on('close', onclose) + + function onclose () { + self.removeListener('error', onerror) + self.removeListener('close', onclose) + if (error) reject(error) + else resolve() + } + + function onerror (err) { + error = err + } + }) + } + + async commit (command = this.command, value = this.value, opts) { + if (typeof command === 'object' && command) return this.commit(undefined, undefined, command) + return this.dht.requestAll(this.table.id, command, value, this.closest(), opts) + } + + async toArray () { + const all = [] + this.on('data', data => all.push(data)) + await this.finished() + return all + } + + _open (cb) { + let cnt = 0 + + // we need to do this in case of table reuse + for (const node of this.table.closest(this.table.id)) { + node.visited = false + cnt++ + } + + const closest = this.dht.table.closest(this.table.id) + + for (const node of closest) { + cnt++ + this.table.add({ + visited: false, + id: node.id, + token: null, + port: node.port, + host: node.host + }) + } + + if (cnt >= this.concurrency) return cb(null) + + this.dht._resolveBootstrapNodes((bootstrapNodes) => { + for (const node of bootstrapNodes) { + this._visit({ + visited: false, + id: node.id, + token: null, + port: node.port, + host: node.host + }) + } + + cb(null) + }) + } + + _read (cb) { + this._readMore() + cb(null) + } + + _readMore () { + if (this.destroying) return + + const closest = this.table.closest(this.table.id) + + for (const node of closest) { + if (node.visited) continue + if (this.inflight >= this.concurrency) return + this._visit(node) + } + + if (this.inflight === 0) { + this.push(null) + } + } + + _onvisit (m) { + this.successes++ + this.inflight-- + + if (m.nodeId !== null) { + this.table.add({ + visited: true, + id: m.nodeId, + token: m.token, + port: m.from.port, + host: m.from.host + }) + } + + if (m.closerNodes !== null) { + for (const node of m.closerNodes) { + if (node.id.equals(this.dht.table.id)) continue + if (this.table.get(node.id)) continue + this.table.add({ + visited: false, + id: node.id, + token: null, + port: node.port, + host: node.host + }) + } + } + + if (this.push(this.map(m)) !== false) this._readMore() + } + + _onerror () { + this.errors++ + this.inflight-- + this._readMore() + } + + _visit (node) { + node.visited = true + + this.inflight++ + this.dht.request(this.table.id, this.command, this.value, node) + .then(this._onresolve, this._onreject) + } +} + +function defaultMap (m) { + return m +} diff --git a/lib/rpc.js b/lib/rpc.js new file mode 100644 index 0000000..2ffac82 --- /dev/null +++ b/lib/rpc.js @@ -0,0 +1,272 @@ +const dgram = require('dgram') +const { message } = require('./messages') + +const DEFAULT_TTL = 64 +const HOLEPUNCH = Buffer.from([0]) + +module.exports = class RPC { + constructor (opts = {}) { + this._ttl = DEFAULT_TTL + this._pendingSends = [] + this._sending = 0 + this._onflush = onflush.bind(this) + this._tid = (Math.random() * 65536) | 0 + this._drainInterval = null + + this.maxRetries = 3 + this.destroyed = false + this.inflight = [] + this.onholepunch = opts.onholepunch || noop + this.onrequest = opts.onrequest || noop + this.onresponse = opts.onresponse || noop + this.onwarning = opts.onwarning || noop + this.onconnection = opts.onconnection || noop + this.socket = opts.socket || dgram.createSocket('udp4') + this.socket.on('message', this._onmessage.bind(this)) + if (this.onconnection) this.socket.on('connection', this._onutpconnection.bind(this)) + } + + get inflightRequests () { + return this.inflight.length + } + + connect (addr) { + if (!this.socket.connect) throw new Error('UTP needed for connections') + return this.socket.connect(addr.port, addr.host) + } + + send (m) { + const state = { start: 0, end: 0, buffer: null } + + message.preencode(state, m) + state.buffer = Buffer.allocUnsafe(state.end) + message.encode(state, m) + + this._send(state.buffer, DEFAULT_TTL, m.to) + } + + reply (req, reply) { + reply.tid = req.tid + reply.to = req.from + this.send(reply) + } + + holepunch (addr, ttl = DEFAULT_TTL) { + return new Promise((resolve) => { + this._send(HOLEPUNCH, ttl, addr, (err) => { + this._onflush() + resolve(!err) + }) + }) + } + + address () { + return this.socket.address() + } + + bind (port) { + return new Promise((resolve, reject) => { + const s = this.socket + + if (s.listen) { + s.listen(port) + } else { + s.bind(port) + } + + s.on('listening', onlistening) + s.on('error', onerror) + + function onlistening () { + s.removeListener('listening', onlistening) + s.removeListener('error', onerror) + resolve() + } + + function onerror (err) { + s.removeListener('listening', onlistening) + s.removeListener('error', onerror) + reject(err) + } + }) + } + + destroy () { + if (this.destroyed) return + this.unwrap(true) + this.socket.close() + } + + unwrap (closing = false) { + if (this.destroyed) return + this.destroyed = true + + clearInterval(this._drainInterval) + this.socket.removeAllListeners() + + for (const req of this.inflight) { + req.reject(new Error('RPC socket destroyed')) + } + + this.inflight = [] + if (!closing) this.socket.setTTL(DEFAULT_TTL) + + return this.socket + } + + request (m, opts) { + if (this.destroyed) return Promise.reject(new Error('RPC socket destroyed')) + + if (this._drainInterval === null) { + this._drainInterval = setInterval(this._drain.bind(this), 1500) + if (this._drainInterval.unref) this._drainInterval.unref() + } + + m.tid = this._tid++ + if (this._tid === 65536) this._tid = 0 + + const state = { start: 0, end: 0, buffer: null } + + message.preencode(state, m) + state.buffer = Buffer.allocUnsafe(state.end) + message.encode(state, m) + + return new Promise((resolve, reject) => { + this.inflight.push({ + tries: (opts && opts.retry === false) ? this.maxRetries : 0, + lastTry: 0, + tid: m.tid, + buffer: state.buffer, + to: m.to, + resolve, + reject + }) + this._drain() + }) + } + + static async race (rpc, command, value, hosts) { + const p = new Array(hosts.length) + + for (let i = 0; i < hosts.length; i++) { + p[i] = rpc.request(command, value, hosts[i]) + } + + return Promise.race(p) + } + + _onutpconnection (socket) { + this.onconnection(socket, this) + } + + _onmessage (buffer, rinfo) { + const from = { host: rinfo.address, port: rinfo.port } + if (!from.port) return + + if (buffer.byteLength <= 1) return this.onholepunch(from, this) + + const state = { start: 0, end: buffer.byteLength, buffer } + let m = null + + try { + m = message.decode(state) + } catch (err) { + console.log(err) + this.onwarning(err) + return + } + + m.from = from + + if (m.command !== null) { // request + if (this.onrequest === noop) return + this.onrequest(m, this) + return + } + + const req = this._dequeue(m.tid) + + if (req === null) return + this.onresponse(m, this) + + if (m.status === 0) { + req.resolve(m) + } else { + req.reject(createStatusError(m.status)) + } + } + + _send (buffer, ttl, addr, done) { + if ((this._ttl !== ttl && this._sending > 0) || this._pendingSends.length > 0) { + this._pendingSends.push({ buffer, ttl, addr, done }) + } else { + this._sendNow(buffer, ttl, addr, done) + } + } + + _sendNow (buf, ttl, addr, done) { + if (this.destroyed) return + this._sending++ + + if (ttl !== this._ttl) { + this._ttl = ttl + this.socket.setTTL(ttl) + } + + this.socket.send(buf, 0, buf.byteLength, addr.port, addr.host, done || this._onflush) + } + + _dequeue (tid) { + for (let i = 0; i < this.inflight.length; i++) { + const req = this.inflight[i] + + if (req.tid === tid) { + if (i === this.inflight.length - 1) this.inflight.pop() + else this.inflight[i] = this.inflight.pop() + return req + } + } + + return null + } + + _drain () { + const now = Date.now() + + for (let i = 0; i < this.inflight.length; i++) { + const req = this.inflight[i] + + if (now - req.lastTry < 3000) { + continue + } + + req.lastTry = now + + if (req.tries++ > this.maxRetries) { + if (i === this.inflight.length - 1) this.inflight.pop() + else this.inflight[i] = this.inflight.pop() + req.reject(new Error('Request timed out')) + continue + } + + this._send(req.buffer, DEFAULT_TTL, req.to) + } + } +} + +function createStatusError (status) { + const err = new Error('Request failed with status ' + status) + err.status = status + return err +} + +function onflush () { + if (--this._sending === 0) { + while (this._pendingSends.length > 0 && (this._sending === 0 || this._pendingSends[0].ttl === this._ttl)) { + const { buffer, ttl, addr, done } = this._pendingSends.shift() + this._sendNow(buffer, ttl, addr, done) + } + } +} + +function noop () {} diff --git a/package.json b/package.json index e300cc3..652c26d 100644 --- a/package.json +++ b/package.json @@ -1,37 +1,27 @@ { "name": "dht-rpc", - "version": "4.9.6", + "version": "5.0.0-beta1", "description": "Make RPC calls over a Kademlia based DHT", "main": "index.js", - "scripts": { - "test": "standard && tape test.js", - "protobuf": "protocol-buffers schema.proto -o lib/messages.js" + "dependencies": { + "compact-encoding": "^2.1.0", + "fast-fifo": "^1.0.0", + "kademlia-routing-table": "^1.0.0", + "sodium-universal": "^3.0.4", + "streamx": "^2.10.3", + "time-ordered-set": "^1.0.2" + }, + "devDependencies": { + "standard": "^16.0.3" }, "repository": { "type": "git", - "url": "git+https://github.com/mafintosh/dht-rpc.git" + "url": "https://github.com/mafintosh/dht-rpc.git" }, "author": "Mathias Buus (@mafintosh)", "license": "MIT", "bugs": { "url": "https://github.com/mafintosh/dht-rpc/issues" }, - "homepage": "https://github.com/mafintosh/dht-rpc#readme", - "devDependencies": { - "protocol-buffers": "^4.1.1", - "standard": "^14.3.1", - "tape": "^4.13.0" - }, - "dependencies": { - "blake2b-universal": "^1.0.0", - "codecs": "^2.0.0", - "ipv4-peers": "^2.0.0", - "k-bucket": "^5.0.0", - "protocol-buffers-encodings": "^1.1.0", - "sodium-native": "^3.1.1", - "speedometer": "^1.1.0", - "stream-collector": "^1.0.1", - "time-ordered-set": "^1.0.1", - "xor-distance": "^2.0.0" - } + "homepage": "https://github.com/mafintosh/dht-rpc" } diff --git a/schema.proto b/schema.proto deleted file mode 100644 index dadfa84..0000000 --- a/schema.proto +++ /dev/null @@ -1,30 +0,0 @@ -message Holepunch { - optional bytes from = 2; - optional bytes to = 3; -} - -enum TYPE { - QUERY = 1; - UPDATE = 2; - RESPONSE = 3; -} - -message Message { - optional uint64 version = 11; - - // request/response type + id - required TYPE type = 1; - required uint64 rid = 2; - optional bytes to = 10; - - // kademlia stuff - optional bytes id = 3; - optional bytes target = 4; - optional bytes closerNodes = 5; - optional bytes roundtripToken = 6; - - // rpc stuff - optional string command = 7; - optional string error = 8; - optional bytes value = 9; -} diff --git a/test.js b/test.js deleted file mode 100644 index bcba33d..0000000 --- a/test.js +++ /dev/null @@ -1,314 +0,0 @@ -const tape = require('tape') -const dht = require('./') -const blake2b = require('blake2b-universal') - -tape('simple update', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: port }) - - a.command('echo', { - query (data, callback) { - t.fail('should not query') - callback(new Error('nope')) - }, - update (data, callback) { - t.same(data.value, Buffer.from('Hello, World!'), 'expected data') - callback(null, data.value) - } - }) - - a.ready(function () { - b.update('echo', a.id, Buffer.from('Hello, World!'), function (err, responses) { - a.destroy() - b.destroy() - node.destroy() - - t.error(err, 'no errors') - t.same(responses.length, 1, 'one response') - t.same(responses[0].value, Buffer.from('Hello, World!'), 'echoed data') - t.end() - }) - }) - }) -}) - -tape('simple query', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: port }) - - a.command('hello', { - query (data, callback) { - t.same(data.value, null, 'expected data') - callback(null, Buffer.from('world')) - } - }) - - a.ready(function () { - b.query('hello', a.id, function (err, responses) { - a.destroy() - b.destroy() - node.destroy() - - t.error(err, 'no errors') - t.same(responses.length, 1, 'one response') - t.same(responses[0].value, Buffer.from('world'), 'responded') - t.end() - }) - }) - }) -}) - -tape('query and update', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: port }) - - a.command('hello', { - query (data, callback) { - t.same(data.value, null, 'expected query data') - callback(null, Buffer.from('world')) - }, - update (data, callback) { - t.same(data.value, null, 'expected update data') - callback(null, Buffer.from('world')) - } - }) - - a.ready(function () { - b.queryAndUpdate('hello', a.id, function (err, responses) { - a.destroy() - b.destroy() - node.destroy() - - t.error(err, 'no errors') - t.same(responses.length, 2, 'two responses') - t.same(responses[0].value, Buffer.from('world'), 'responded') - t.same(responses[1].value, Buffer.from('world'), 'responded') - t.ok(responses[0].type !== responses[1].type, 'not the same type') - t.end() - }) - }) - }) -}) - -tape('swarm query', function (t) { - bootstrap(function (port, node) { - const swarm = [] - var closest = 0 - - loop() - - function done () { - t.pass('created swarm') - - const key = Buffer.allocUnsafe(32) - blake2b(key, Buffer.from('hello')) - const me = dht({ bootstrap: port }) - - me.update('kv', key, Buffer.from('hello'), function (err, responses) { - t.error(err, 'no error') - t.same(closest, 20, '20 closest nodes') - t.same(responses.length, 20, '20 responses') - - const stream = me.query('kv', key) - - stream.on('data', function (data) { - if (data.value) { - t.same(data.value, Buffer.from('hello'), 'echoed value') - t.end() - swarm.forEach(function (node) { - node.destroy() - }) - me.destroy() - node.destroy() - stream.destroy() - } - }) - }) - } - - function loop () { - if (swarm.length === 256) return done() - const node = dht({ bootstrap: port }) - swarm.push(node) - - var value = null - - node.command('kv', { - update (data, cb) { - closest++ - value = data.value - cb() - }, - query (data, cb) { - cb(null, value) - } - }) - - node.ready(loop) - } - }) -}) - -tape('holepunch api', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: port }) - var holepunched = false - - a.ready(function () { - b.ready(function () { - node.on('holepunch', function (from, to) { - t.same(from.port, a.address().port) - t.same(to.port, b.address().port) - holepunched = true - }) - a.holepunch({ - host: '127.0.0.1', - port: b.address().port, - referrer: { - host: '127.0.0.1', - port: node.address().port - } - }, function (err) { - t.error(err, 'no error') - t.ok(holepunched) - t.end() - - node.destroy() - a.destroy() - b.destroy() - }) - }) - }) - }) -}) - -tape('timeouts', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port, ephemeral: true }) - const b = dht({ bootstrap: port }) - - var tries = 0 - - b.command('nope', { - update (query, cb) { - tries++ - t.pass('ignoring update') - } - }) - - b.ready(function () { - a.update('nope', Buffer.alloc(32), function (err) { - t.ok(err, 'errored') - t.same(tries, 3) - t.end() - node.destroy() - a.destroy() - b.destroy() - }) - }) - }) -}) - -tape('persistent', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port, ephemeral: true }) - const b = dht({ bootstrap: port }) - a.command('hello', { - query (data, callback) { - callback(null, Buffer.from('world')) - } - }) - - a.ready(function () { - b.ready(function () { - const key = Buffer.allocUnsafe(32) - blake2b(key, Buffer.from('hello')) - b.query('hello', key, (err, result) => { - t.error(err) - t.is(result.length, 0) - a.persistent((err) => { - t.error(err) - b.query('hello', key, (err, result) => { - t.error(err) - t.is(result.length, 1) - t.is(Buffer.compare(result[0].node.id, a.id), 0) - a.destroy() - b.destroy() - node.destroy() - t.end() - }) - }) - }) - }) - }) - }) -}) - -tape('getNodes', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: port }) - a.ready(function () { - b.ready(function () { - const aNodes = a.getNodes() - const bNodes = b.getNodes() - t.deepEqual([{ id: b.id, host: '127.0.0.1', port: b.address().port }], aNodes) - t.deepEqual([{ id: a.id, host: '127.0.0.1', port: a.address().port }], bNodes) - b.destroy() - a.destroy() - node.destroy() - t.end() - }) - }) - }) -}) - -tape('addNodes', function (t) { - bootstrap(function (port, node) { - const a = dht({ bootstrap: port }) - const b = dht({ bootstrap: [] }) - b.listen() // https://github.com/hyperswarm/dht/issues/22 - - a.command('hello', { - query (data, callback) { - t.same(data.value, null, 'expected data') - callback(null, Buffer.from('world')) - } - }) - - a.ready(function () { - b.ready(function () { - process.nextTick(function () { - const bNodes = b.getNodes() - t.deepEqual(bNodes, [{ id: a.id, host: '127.0.0.1', port: a.address().port }]) - b.query('hello', a.id, function (err, responses) { - t.error(err, 'no errors') - t.same(responses.length, 1, 'one response') - t.same(responses[0].value, Buffer.from('world'), 'responded') - const aNodes = a.getNodes() - t.deepEqual(aNodes, [{ id: b.id, host: '127.0.0.1', port: b.address().port }]) - b.destroy() - a.destroy() - node.destroy() - t.end() - }) - }) - }) - b.addNodes([{ id: a.id, host: '127.0.0.1', port: a.address().port }]) - }) - }) -}) - -function bootstrap (done) { - const node = dht({ - ephemeral: true - }) - - node.listen(0, function () { - done(node.address().port, node) - }) -}