diff --git a/README.md b/README.md index f6ca666..0a04815 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ To see the v4 documentation/code go to https://github.com/mafintosh/dht-rpc/tree ## Key Features -* NAT type detection +* Remote IP / firewall detection * Easily add any command to your DHT * Streaming queries and updates @@ -27,19 +27,18 @@ Here is an example implementing a simple key value store First spin up a bootstrap node. You can make multiple if you want for redundancy. ``` js -const DHT = require('dht-rpc') +import DHT from 'dht-rpc' -// Set ephemeral: true so other peers never add us to their routing table, simply bootstrap -const bootstrap = new DHT({ ephemeral: true }) - -bootstrap.bind(10001) +// If the bootstrap node doesn't implement the same commands as your other nodes +// remember to set ephemeral: true so it isn't added to the routing table. +const bootstrap = DHT.bootstrapper(10001, { ephemeral: true }) ``` Now lets make some dht nodes that can store values in our key value store. ``` js -const DHT = require('dht-rpc') -const crypto = require('crypto') +import DHT from 'dht-rpc' +import crypto from 'crypto' // Let's create 100 dht nodes for our example. for (var i = 0; i < 100; i++) createNode() @@ -55,8 +54,8 @@ function createNode () { node.on('request', function (req) { if (req.command === 'values') { - if (req.commit) { // if we are the closest node store the value (ie the node sent a roundtrip token) - const key = sha256(req.value).toString('hex') + if (req.token) { // if we are the closest node store the value (ie the node sent a valid roundtrip token) + const key = hash(req.value).toString('hex') values.set(key, req.value) console.log('Storing', key, '-->', req.value.toString()) return req.reply(null) @@ -68,8 +67,8 @@ function createNode () { }) } -function sha256 (val) { - return crypto.createHash('sha256').update(val).digest() +function hash (value) { + return crypto.createHash('sha256').update(value).digest() } ``` @@ -78,14 +77,25 @@ To insert a value into this dht make another script that does this following ``` js const node = new DHT() -await node.query(sha256(val), 'values', value, { commit: true }).finished() +const q = node.query({ + target: hash(val), + command: 'values', + value +}, { + // commit true will make the query re-reuqest the 20 closest + // nodes with a valid round trip token to update the values + commit: true +}) + +await q.finished() ``` Then after inserting run this script to query for a value ``` js -for await (const data of node.query(Buffer.from(hexFromAbove, 'hex'))) { - if (data.value && sha256(data.value).toString('hex') === hexFromAbove) { +const target = Buffer.from(hexFromAbove, 'hex') +for await (const data of node.query({ target, command: 'values' })) { + if (data.value && hash(data.value).toString('hex') === hexFromAbove) { // We found the value! Destroy the query stream as there is no need to continue. console.log(val, '-->', data.value.toString()) break @@ -116,7 +126,9 @@ Options include: // Optionally pass in array of { host, port } to add to the routing table if you know any peers nodes: [{ host, port }, ...], // Optionally pass a port you prefer to bind to instead of a random one - bind: 0 + bind: 0, + // dht-rpc will automatically detect if you are firewalled. If you know that you are not set this to false + firewalled: true } ``` @@ -125,15 +137,24 @@ For the majority of use-cases you should always use adaptive mode to ensure good Your DHT routing id is `hash(publicIp + publicPort)` and will be autoconfigured internally. +#### `const node = DHT.boostrapper(bind, [options])` + +Sugar for the options needed to run a bootstrap node, ie + +```js +{ + firewalled: false, // a bootstrapper can never be firewalled + bootstrap: [] // force set no other bootstrappers. +} +``` + +Additionally since you'll want a known port for a bootstrap node it adds the bind option as a primary argument. + #### `await node.ready()` Wait for the node to be fully bootstrapped etc. You don't have to wait for this method, but can be useful during testing. -#### `await node.bind([preferredPort])` - -Wait for the underlying socket to bind. If you prefer a specific port you can specify it here. - #### `node.id` Get your own routing ID. Only available when the node is not ephemeral. @@ -167,26 +188,30 @@ it will switch from persistent mode to ephemeral again. Refresh the routing table by looking up a random node in the background. This is called internally periodically, but exposed in-case you want to force a refresh. -#### `{ type, host, port } = node.remoteAddress()` +#### `node.host` + +Get your node's public ip, inferred from other nodes in the DHT. +If the ip cannot be determined, this is set to `null`. -Get your node's public ip, public port and the NAT type based on a series of internal -statistics (see the nat-analyzer code for more info). +#### `node.port` -This is extremely useful to figure out a relevant NAT holepunching technique as well if you want to connect -peers behind the DHT later on. +Get your node's public port, inferred from other nodes in the DHT. +If your node does not have a consistent port, this is set to 0. -`type` is an enum symbol +#### `node.firewalled` -* `DHT.NAT_UNKNOWN` - not enough data to figure out the NAT -* `DHT.NAT_OPEN` - fully open nat (ie a server) - a requirement for adaptive nodes to go persistent. -* `DHT.NAT_PORT_CONSISTENT` - NAT sessions appear consistent across multiple peers. -* `DHT.NAT_PORT_INCREMENTING` - NAT sessions appear to have an incremeting port across sessions. -* `DHT.NAT_PORT_RANDOMIZED` - NAT sessions appear randomized across sessions. +Boolean indicated if your node is behind a firewall. -#### `await node.sampledNAT()` +This is auto detected by having other node's trying to do a PING to you +without you contacting them first. -Helper to indicate when the NAT analyzer has enough data to determine your NAT type as that happens much -faster than the bootstrapping promise returned by `ready()`. +#### `const udpAddr = node.address()` + +Get the local address of the UDP socket bound. + +Note that if you are in ephemeral mode, this will return a different +port than the one you provided in the constructor (under bind), as ephemeral +mode always uses a random port. #### `node.on('request', req)` @@ -196,12 +221,20 @@ Emitted when an incoming DHT request is received. This is where you can add your * `req.command` - the RPC command name * `req.value` - the RPC value buffer * `req.token` - If the remote peer echoed back a valid roundtrip token, proving their "from address" this is set -* `req.commit` - Boolean set as a convenience if a valid token was provided * `req.from` - who sent this request (host, port) To reply to a request use the `req.reply(value)` method and to reply with an error code use `req.error(errorCode)`. -Error codes are up to the user to define. `dht-rpc` defines `0` as OK (ie no error), `1` as `UNKNOWN_COMMAND`, -both available as `DHT.OK` and `DHT.UNKNOWN_COMMAND`. + +In general error codes are up to the user to define, with the general suggestion to start application specific errors +from error code `16` and up, to avoid future clashes with `dht-rpc` internals. + +Currently dht-rpc defines the following errors + +``` js +DHT.OK = 0 // ie no error +DHT.ERROR_UNKNOWN_COMMAND = 1 // the command requested does not exist +DHT.ERROR_INVALID_TOKEN = 2 // the round trip token sent is invalid +``` The DHT has a couple of built in commands for bootstrapping and general DHT health management. Those are: @@ -209,8 +242,9 @@ Those are: * `find_node` - Find the closest DHT nodes to a specific target with no side-effects. * `ping` - Ping another node to see if it is alive. * `ping_nat` - Ping another node, but have it reply on a different UDP session to see if you are firewalled. +* `down_hint` - Gossiped internally to hint that a specific node might be down. -#### `reply = await node.request(target, command, value, to, [options])` +#### `reply = await node.request({ token, target, command, value }, to, [options])` Send a request to a specific node specified by the to address (`{ host, port }`). @@ -218,9 +252,8 @@ Options include: ```js { - token: roundtripTokenFromAReply, retry: true, // whether the request should retry on timeout - expectOk: true // expect the reply to have status 0 or error + socket: udpSocket // request on this specific socket } ``` @@ -228,16 +261,13 @@ Normally you'd set the token when commiting to the dht in the query's commit hoo #### `reply = await node.ping(to)` -Sugar for `dht.request(null, 'ping', null, to)` +Sugar for `dht.request({ command: 'ping' }, to)` -#### `replies = await node.requestAll(target, command, value, toArray, [options])` - -Conveinience method for requesting many nodes at once. - -#### `stream = node.query(target, command, [value], [options])` +#### `stream = node.query({ target, command, value }, [options])` Query the DHT. Will move as close as possible to the `target` provided, which should be a 32-byte uniformly distributed buffer (ie a hash). +* `target` - find nodes close to this * `command` - the method you want to invoke * `value` - optional binary payload to send with it @@ -257,10 +287,10 @@ that is called for each close reply. ``` js { - async commit (closestReply, dht, query) { + async commit (reply, dht, query) { // normally you'd send back the roundtrip token here, to prove to the remote that you own // your ip/port - await dht.request(myTarget, myCommand, myValue, closestReply.from, { token: closestReply.token }) + await dht.request({ token: reply.token, target, command, value }, reply.from) } } ``` @@ -273,6 +303,10 @@ Other options include: // start the query by querying these nodes // useful if you are re-doing a query from a set of closest nodes. ], + replies: [ + // similar to nodes, but if you useful if you have an array of closest replies instead + // from a previous query. + ], map (reply) { // map the reply into what you want returned on the stram return { onlyValue: reply.value } @@ -284,10 +318,16 @@ The query method returns a stream encapsulating the query, that is also an async If you just want to wait for the query to finish, you can use the `await stream.finished()` helper. After completion the closest nodes are stored in `stream.closestNodes` array. +If you want to access the closest replies to your provided target you can see those at `stream.closestReplies`. + #### `node.destroy()` Shutdown the DHT node. +#### `node.destroyed` + +Boolean indicating if this has been destroyed. + #### `node.toArray()` Get the routing table peers out as an array of `{ host, port}` diff --git a/index.js b/index.js index d570717..fd9f0d3 100644 --- a/index.js +++ b/index.js @@ -8,7 +8,7 @@ const NatSampler = require('nat-sampler') const IO = require('./lib/io') const Query = require('./lib/query') const peer = require('./lib/peer') -const { UNKNOWN_COMMAND, BAD_COMMAND, INVALID_TOKEN } = require('./lib/errors') +const { UNKNOWN_COMMAND, INVALID_TOKEN } = require('./lib/errors') const TMP = Buffer.allocUnsafe(32) const TICK_INTERVAL = 5000 @@ -623,14 +623,15 @@ class DHT extends EventEmitter { } } +DHT.OK = 0 DHT.ERROR_UNKNOWN_COMMAND = UNKNOWN_COMMAND DHT.ERROR_INVALID_TOKEN = INVALID_TOKEN -DHT.ERROR_BAD_COMMAND = BAD_COMMAND module.exports = DHT function parseNode (s) { if (typeof s === 'object') return s + if (typeof s === 'number') return { host: '127.0.0.1', port: s } const [host, port] = s.split(':') if (!port) throw new Error('Bootstrap node format is host:port') diff --git a/lib/errors.js b/lib/errors.js index 79141e0..b73eac6 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -1,6 +1,5 @@ exports.UNKNOWN_COMMAND = 1 exports.INVALID_TOKEN = 2 -exports.BAD_COMMAND = 3 exports.TIMEOUT = new Error('Request timed out') exports.TIMEOUT.code = 'ETIMEDOUT' diff --git a/lib/query.js b/lib/query.js index 4d94b9a..6819a1e 100644 --- a/lib/query.js +++ b/lib/query.js @@ -34,13 +34,18 @@ module.exports = class Query extends Readable { this._oncyclebound = this._oncycle.bind(this) const nodes = opts.nodes || opts.closestNodes + const replies = opts.replies || opts.closestReplies + // add them reverse as we pop below if (nodes) { - // add them reverse as we pop below for (let i = nodes.length - 1; i >= 0; i--) { const node = nodes[i] this._addPending({ id: node.id || peer.id(node.host, node.port), host: node.host, port: node.port }, null) } + } else if (replies) { + for (let i = replies.length - 1; i >= 0; i--) { + this._addPending(replies[i].from, null) + } } } diff --git a/test.js b/test.js index 37bdb9f..736d8fb 100644 --- a/test.js +++ b/test.js @@ -26,6 +26,7 @@ tape('make bigger swarm', async function (t) { } } + const replies = q.closestReplies t.ok(found, 'found target in ' + messages + ' message(s)') q = swarm[490].query({ command: 'find_node', target }, { nodes: q.closestNodes }) @@ -42,6 +43,20 @@ tape('make bigger swarm', async function (t) { t.ok(found, 'found target again in ' + messages + ' message(s)') + q = swarm[470].query({ command: 'find_node', target }, { replies }) + messages = 0 + found = false + + for await (const data of q) { + messages++ + if (data.from.id && data.from.id.equals(target)) { + found = true + break + } + } + + t.ok(found, 'found target again in ' + messages + ' message(s) with original replies') + const { firewalled, host, port } = swarm[490] t.same(firewalled, false)