Browse Source

update docs and corresponding code tweaks

session-estimator
Mathias Buus 3 years ago
parent
commit
5e4aa99672
  1. 136
      README.md
  2. 5
      index.js
  3. 1
      lib/errors.js
  4. 7
      lib/query.js
  5. 15
      test.js

136
README.md

@ -13,7 +13,7 @@ To see the v4 documentation/code go to https://github.com/mafintosh/dht-rpc/tree
## Key Features
* NAT type detection
* Remote IP / firewall detection
* Easily add any command to your DHT
* Streaming queries and updates
@ -27,19 +27,18 @@ Here is an example implementing a simple key value store
First spin up a bootstrap node. You can make multiple if you want for redundancy.
``` js
const DHT = require('dht-rpc')
import DHT from 'dht-rpc'
// Set ephemeral: true so other peers never add us to their routing table, simply bootstrap
const bootstrap = new DHT({ ephemeral: true })
bootstrap.bind(10001)
// If the bootstrap node doesn't implement the same commands as your other nodes
// remember to set ephemeral: true so it isn't added to the routing table.
const bootstrap = DHT.bootstrapper(10001, { ephemeral: true })
```
Now lets make some dht nodes that can store values in our key value store.
``` js
const DHT = require('dht-rpc')
const crypto = require('crypto')
import DHT from 'dht-rpc'
import crypto from 'crypto'
// Let's create 100 dht nodes for our example.
for (var i = 0; i < 100; i++) createNode()
@ -55,8 +54,8 @@ function createNode () {
node.on('request', function (req) {
if (req.command === 'values') {
if (req.commit) { // if we are the closest node store the value (ie the node sent a roundtrip token)
const key = sha256(req.value).toString('hex')
if (req.token) { // if we are the closest node store the value (ie the node sent a valid roundtrip token)
const key = hash(req.value).toString('hex')
values.set(key, req.value)
console.log('Storing', key, '-->', req.value.toString())
return req.reply(null)
@ -68,8 +67,8 @@ function createNode () {
})
}
function sha256 (val) {
return crypto.createHash('sha256').update(val).digest()
function hash (value) {
return crypto.createHash('sha256').update(value).digest()
}
```
@ -78,14 +77,25 @@ To insert a value into this dht make another script that does this following
``` js
const node = new DHT()
await node.query(sha256(val), 'values', value, { commit: true }).finished()
const q = node.query({
target: hash(val),
command: 'values',
value
}, {
// commit true will make the query re-reuqest the 20 closest
// nodes with a valid round trip token to update the values
commit: true
})
await q.finished()
```
Then after inserting run this script to query for a value
``` js
for await (const data of node.query(Buffer.from(hexFromAbove, 'hex'))) {
if (data.value && sha256(data.value).toString('hex') === hexFromAbove) {
const target = Buffer.from(hexFromAbove, 'hex')
for await (const data of node.query({ target, command: 'values' })) {
if (data.value && hash(data.value).toString('hex') === hexFromAbove) {
// We found the value! Destroy the query stream as there is no need to continue.
console.log(val, '-->', data.value.toString())
break
@ -116,7 +126,9 @@ Options include:
// Optionally pass in array of { host, port } to add to the routing table if you know any peers
nodes: [{ host, port }, ...],
// Optionally pass a port you prefer to bind to instead of a random one
bind: 0
bind: 0,
// dht-rpc will automatically detect if you are firewalled. If you know that you are not set this to false
firewalled: true
}
```
@ -125,15 +137,24 @@ For the majority of use-cases you should always use adaptive mode to ensure good
Your DHT routing id is `hash(publicIp + publicPort)` and will be autoconfigured internally.
#### `const node = DHT.boostrapper(bind, [options])`
Sugar for the options needed to run a bootstrap node, ie
```js
{
firewalled: false, // a bootstrapper can never be firewalled
bootstrap: [] // force set no other bootstrappers.
}
```
Additionally since you'll want a known port for a bootstrap node it adds the bind option as a primary argument.
#### `await node.ready()`
Wait for the node to be fully bootstrapped etc.
You don't have to wait for this method, but can be useful during testing.
#### `await node.bind([preferredPort])`
Wait for the underlying socket to bind. If you prefer a specific port you can specify it here.
#### `node.id`
Get your own routing ID. Only available when the node is not ephemeral.
@ -167,26 +188,30 @@ it will switch from persistent mode to ephemeral again.
Refresh the routing table by looking up a random node in the background.
This is called internally periodically, but exposed in-case you want to force a refresh.
#### `{ type, host, port } = node.remoteAddress()`
#### `node.host`
Get your node's public ip, public port and the NAT type based on a series of internal
statistics (see the nat-analyzer code for more info).
Get your node's public ip, inferred from other nodes in the DHT.
If the ip cannot be determined, this is set to `null`.
This is extremely useful to figure out a relevant NAT holepunching technique as well if you want to connect
peers behind the DHT later on.
#### `node.port`
`type` is an enum symbol
Get your node's public port, inferred from other nodes in the DHT.
If your node does not have a consistent port, this is set to 0.
* `DHT.NAT_UNKNOWN` - not enough data to figure out the NAT
* `DHT.NAT_OPEN` - fully open nat (ie a server) - a requirement for adaptive nodes to go persistent.
* `DHT.NAT_PORT_CONSISTENT` - NAT sessions appear consistent across multiple peers.
* `DHT.NAT_PORT_INCREMENTING` - NAT sessions appear to have an incremeting port across sessions.
* `DHT.NAT_PORT_RANDOMIZED` - NAT sessions appear randomized across sessions.
#### `node.firewalled`
#### `await node.sampledNAT()`
Boolean indicated if your node is behind a firewall.
Helper to indicate when the NAT analyzer has enough data to determine your NAT type as that happens much
faster than the bootstrapping promise returned by `ready()`.
This is auto detected by having other node's trying to do a PING to you
without you contacting them first.
#### `const udpAddr = node.address()`
Get the local address of the UDP socket bound.
Note that if you are in ephemeral mode, this will return a different
port than the one you provided in the constructor (under bind), as ephemeral
mode always uses a random port.
#### `node.on('request', req)`
@ -196,12 +221,20 @@ Emitted when an incoming DHT request is received. This is where you can add your
* `req.command` - the RPC command name
* `req.value` - the RPC value buffer
* `req.token` - If the remote peer echoed back a valid roundtrip token, proving their "from address" this is set
* `req.commit` - Boolean set as a convenience if a valid token was provided
* `req.from` - who sent this request (host, port)
To reply to a request use the `req.reply(value)` method and to reply with an error code use `req.error(errorCode)`.
Error codes are up to the user to define. `dht-rpc` defines `0` as OK (ie no error), `1` as `UNKNOWN_COMMAND`,
both available as `DHT.OK` and `DHT.UNKNOWN_COMMAND`.
In general error codes are up to the user to define, with the general suggestion to start application specific errors
from error code `16` and up, to avoid future clashes with `dht-rpc` internals.
Currently dht-rpc defines the following errors
``` js
DHT.OK = 0 // ie no error
DHT.ERROR_UNKNOWN_COMMAND = 1 // the command requested does not exist
DHT.ERROR_INVALID_TOKEN = 2 // the round trip token sent is invalid
```
The DHT has a couple of built in commands for bootstrapping and general DHT health management.
Those are:
@ -209,8 +242,9 @@ Those are:
* `find_node` - Find the closest DHT nodes to a specific target with no side-effects.
* `ping` - Ping another node to see if it is alive.
* `ping_nat` - Ping another node, but have it reply on a different UDP session to see if you are firewalled.
* `down_hint` - Gossiped internally to hint that a specific node might be down.
#### `reply = await node.request(target, command, value, to, [options])`
#### `reply = await node.request({ token, target, command, value }, to, [options])`
Send a request to a specific node specified by the to address (`{ host, port }`).
@ -218,9 +252,8 @@ Options include:
```js
{
token: roundtripTokenFromAReply,
retry: true, // whether the request should retry on timeout
expectOk: true // expect the reply to have status 0 or error
socket: udpSocket // request on this specific socket
}
```
@ -228,16 +261,13 @@ Normally you'd set the token when commiting to the dht in the query's commit hoo
#### `reply = await node.ping(to)`
Sugar for `dht.request(null, 'ping', null, to)`
#### `replies = await node.requestAll(target, command, value, toArray, [options])`
Sugar for `dht.request({ command: 'ping' }, to)`
Conveinience method for requesting many nodes at once.
#### `stream = node.query(target, command, [value], [options])`
#### `stream = node.query({ target, command, value }, [options])`
Query the DHT. Will move as close as possible to the `target` provided, which should be a 32-byte uniformly distributed buffer (ie a hash).
* `target` - find nodes close to this
* `command` - the method you want to invoke
* `value` - optional binary payload to send with it
@ -257,10 +287,10 @@ that is called for each close reply.
``` js
{
async commit (closestReply, dht, query) {
async commit (reply, dht, query) {
// normally you'd send back the roundtrip token here, to prove to the remote that you own
// your ip/port
await dht.request(myTarget, myCommand, myValue, closestReply.from, { token: closestReply.token })
await dht.request({ token: reply.token, target, command, value }, reply.from)
}
}
```
@ -273,6 +303,10 @@ Other options include:
// start the query by querying these nodes
// useful if you are re-doing a query from a set of closest nodes.
],
replies: [
// similar to nodes, but if you useful if you have an array of closest replies instead
// from a previous query.
],
map (reply) {
// map the reply into what you want returned on the stram
return { onlyValue: reply.value }
@ -284,10 +318,16 @@ The query method returns a stream encapsulating the query, that is also an async
If you just want to wait for the query to finish, you can use the `await stream.finished()` helper. After completion the closest
nodes are stored in `stream.closestNodes` array.
If you want to access the closest replies to your provided target you can see those at `stream.closestReplies`.
#### `node.destroy()`
Shutdown the DHT node.
#### `node.destroyed`
Boolean indicating if this has been destroyed.
#### `node.toArray()`
Get the routing table peers out as an array of `{ host, port}`

5
index.js

@ -8,7 +8,7 @@ const NatSampler = require('nat-sampler')
const IO = require('./lib/io')
const Query = require('./lib/query')
const peer = require('./lib/peer')
const { UNKNOWN_COMMAND, BAD_COMMAND, INVALID_TOKEN } = require('./lib/errors')
const { UNKNOWN_COMMAND, INVALID_TOKEN } = require('./lib/errors')
const TMP = Buffer.allocUnsafe(32)
const TICK_INTERVAL = 5000
@ -623,14 +623,15 @@ class DHT extends EventEmitter {
}
}
DHT.OK = 0
DHT.ERROR_UNKNOWN_COMMAND = UNKNOWN_COMMAND
DHT.ERROR_INVALID_TOKEN = INVALID_TOKEN
DHT.ERROR_BAD_COMMAND = BAD_COMMAND
module.exports = DHT
function parseNode (s) {
if (typeof s === 'object') return s
if (typeof s === 'number') return { host: '127.0.0.1', port: s }
const [host, port] = s.split(':')
if (!port) throw new Error('Bootstrap node format is host:port')

1
lib/errors.js

@ -1,6 +1,5 @@
exports.UNKNOWN_COMMAND = 1
exports.INVALID_TOKEN = 2
exports.BAD_COMMAND = 3
exports.TIMEOUT = new Error('Request timed out')
exports.TIMEOUT.code = 'ETIMEDOUT'

7
lib/query.js

@ -34,13 +34,18 @@ module.exports = class Query extends Readable {
this._oncyclebound = this._oncycle.bind(this)
const nodes = opts.nodes || opts.closestNodes
const replies = opts.replies || opts.closestReplies
if (nodes) {
// add them reverse as we pop below
if (nodes) {
for (let i = nodes.length - 1; i >= 0; i--) {
const node = nodes[i]
this._addPending({ id: node.id || peer.id(node.host, node.port), host: node.host, port: node.port }, null)
}
} else if (replies) {
for (let i = replies.length - 1; i >= 0; i--) {
this._addPending(replies[i].from, null)
}
}
}

15
test.js

@ -26,6 +26,7 @@ tape('make bigger swarm', async function (t) {
}
}
const replies = q.closestReplies
t.ok(found, 'found target in ' + messages + ' message(s)')
q = swarm[490].query({ command: 'find_node', target }, { nodes: q.closestNodes })
@ -42,6 +43,20 @@ tape('make bigger swarm', async function (t) {
t.ok(found, 'found target again in ' + messages + ' message(s)')
q = swarm[470].query({ command: 'find_node', target }, { replies })
messages = 0
found = false
for await (const data of q) {
messages++
if (data.from.id && data.from.id.equals(target)) {
found = true
break
}
}
t.ok(found, 'found target again in ' + messages + ' message(s) with original replies')
const { firewalled, host, port } = swarm[490]
t.same(firewalled, false)

Loading…
Cancel
Save