You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
Mathias Buus 022e95e197 4.1.7 5 years ago
example add example from readme 6 years ago
lib graceful destroy 5 years ago
.gitignore Next version (#2) 6 years ago
.travis.yml we dropped 4 6 years ago
README.md fix readme example code 6 years ago
example.js Next version (#2) 6 years ago
index.js graceful destroy 5 years ago
package.json 4.1.7 5 years ago
schema.proto Next version (#2) 6 years ago
test.js Next version (#2) 6 years ago

README.md

dht-rpc

Make RPC calls over a Kademlia based DHT.

npm install dht-rpc

build status

Key Features

  • UDP hole punching support
  • Easily add any command to your DHT
  • Streaming queries and updates

Usage

Here is an example implementing a simple key value store

First spin up a bootstrap node. You can make multiple if you want for redundancy.

const dht = require('dht-rpc')

// Set ephemeral: true so other peers do not add us to the peer list, simply bootstrap
const bootstrap = dht({ ephemeral: true })

bootstrap.listen(10001)

Now lets make some dht nodes that can store values in our key value store.

const dht = require('dht-rpc')
const crypto = require('crypto')

// Let's create 100 dht nodes for our example.
for (var i = 0; i < 100; i++) createNode()

function createNode () {
  const node = dht({
    bootstrap: [
      'localhost:10001'
    ]
  })

  const values = new Map()

  node.command('values', {
    // When we are the closest node and someone is sending us a "store" command
    update (query, cb) {
      if (!query.value) return cb()

      // Use the hash of the value as the key
      const key = sha256(query.value).toString('hex')
      values.set(key, query.value)
      console.log('Storing', key, '-->', query.value.toString())
      cb()
    },
    // When someone is querying for a "lookup" command
    query (query, cb) {
      const value = values.get(query.target.toString('hex'))
      cb(null, value)
    }
  })
}

function sha256 (val) {
  return crypto.createHash('sha256').update(val).digest()
}

To insert a value into this dht make another script that does this following

// Set ephemeral: true as we are not part of the network.
const node = dht({ ephemeral: true })

node.update('values', sha256(val), value, function (err, res) {
  if (err) throw err
  console.log('Inserted', sha256(val).toString('hex'))
})

Then after inserting run this script to query for a value

node.query('values', Buffer.from(hexFromAbove, 'hex'))
  .on('data', function (data) {
    if (data.value && sha256(data.value).toString('hex') === hexFromAbove) {
      // We found the value! Destroy the query stream as there is no need to continue.
      console.log(val, '-->', data.value.toString())
      this.destroy()
    }
  })
  .on('end', function () {
    console.log('(query finished)')
  })

API

const node = dht([options])

Create a new DHT node.

Options include:

{
  // Whether or not this node is ephemeral or should join the routing table
  ephemeral: false,
  // A list of bootstrap nodes
  bootstrap: [ 'bootstrap-node.com:24242', ... ],
  // Optionally pass in your own UDP socket to use.
  socket: udpSocket
}

node.command(name, cmd)

Define a new RPC command. cmd should look like this

{
  // Query handler
  query (query, cb),
  // Update handler. only triggered when we are one of the closest nodes to the target
  update (query, cb),
  // Optional value encoding for the query/update incoming value. Defaults to binary.
  inputEncoding: 'json', 'utf-8', object,
  // Optional value encoding for the query/update outgoing value. Defaults to binary.
  outputEncoding: (same as above),
  valueEncoding: (sets both input/output encoding to this)
}

The query object in the query/update function looks like this:

{
  // always the same as your command def
  command: 'command-name',
  // the node who sent the query/update
  node: { port, host, id },
  // the query/update target (32 byte target)
  target: Buffer,
  // the query/update payload decoded with the inputEncoding
  value
}

You should call the query/update callback with (err, value) where value will be encoded using the outputEncoding and returned to the node.

const stream = node.query(name, target, [value], [callback])

Send a query command.

If you set a valueEncoding when defining the command the value will be encoded.

Returns a result stream that emits data that looks like this:

{
  // was this a query/update response
  type: dht.QUERY,
  // who sent this response
  node: { peer, host, id },
  // the response payload decoded using the outputEncoding
  value
}

If you pass a callback the stream will be error handled and buffered and the content passed as an array.

const stream = node.update(name, target, [value], [callback])

Send a update command

Same options/results as above but the response data will have type set to dht.UPDATE.

const stream = node.queryAndUpdate(name, target, [value], [callback])

Send a combined query and update command.

Will keep querying until it finds the closest nodes to the target and then issue an update. More efficient than doing a query/update yourself.

Same options/results as above but the response data will include both query and update results.

node.destroy(onclose)

Fully destroys the dht node.

node.bootstrap(cb)

Re-bootstrap the DHT node. Normally you shouldn't have to call this.

node.holepunch(peer, cb)

UDP holepunch to another peer. The DHT does this automatically when it cannot reach another peer but you can use this yourself also.

Peer should look like this:

{
  port,
  host,
  // referrer should be the node/peer that
  // told you about this node.
  referrer: { port, host }
}

node.listen([port], [address], [onlistening])

Explicitly bind the dht node to a certain port/address.

node.on('listening')

Emitted when the node starts listening on a udp port.

node.on('close')

Emitted when the node is fully closed.

node.on('holepunch', fromPeer, toPeer)

Emitted when the node is helping fromPeer udp holepunch to toPeer.