You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

330 lines
11 KiB

// @flow
import split2 from 'split2'
import { spawn } from 'child_process'
import EventEmitter from 'events'
import getPort from 'get-port'
import { mainLog, lndLog, lndLogGetLevel } from '../utils/log'
import { fetchBlockHeight } from './util'
import LndConfig from './config'
// Sync statuses
const CHAIN_SYNC_PENDING = 'chain-sync-pending'
const CHAIN_SYNC_WAITING = 'chain-sync-waiting'
const CHAIN_SYNC_IN_PROGRESS = 'chain-sync-started'
const CHAIN_SYNC_COMPLETE = 'chain-sync-finished'
// Events
const ERROR = 'error'
const EXIT = 'exit'
const WALLET_UNLOCKER_GRPC_ACTIVE = 'wallet-unlocker-grpc-active'
const LIGHTNING_GRPC_ACTIVE = 'lightning-grpc-active'
const GOT_CURRENT_BLOCK_HEIGHT = 'got-current-block-height'
const GOT_LND_BLOCK_HEIGHT = 'got-lnd-block-height'
const GOT_LND_CFILTER_HEIGHT = 'got-lnd-cfilter-height'
/**
* Wrapper class for Lnd to run and monitor it in Neutrino mode.
* @extends EventEmitter
*/
class Neutrino extends EventEmitter {
lndConfig: LndConfig
process: any
walletUnlockerGrpcActive: boolean
lightningGrpcActive: boolean
chainSyncStatus: string
currentBlockHeight: number
lndBlockHeight: number
lndCfilterHeight: number
lastError: ?string
constructor(lndConfig: LndConfig) {
super()
this.lndConfig = lndConfig
this.process = null
this.walletUnlockerGrpcActive = false
this.lightningGrpcActive = false
this.chainSyncStatus = CHAIN_SYNC_PENDING
this.currentBlockHeight = 0
this.lndBlockHeight = 0
this.lndCfilterHeight = 0
this.lastError = null
}
static incrementIfHigher = (context: any, property: string, newVal: any): boolean => {
const { [property]: oldVal } = context
if (newVal > oldVal) {
context[property] = newVal
return true
}
return false
}
/**
* Start the Lnd process in Neutrino mode.
* @return {Number} PID of the Lnd process that was started.
*/
async start() {
if (this.process) {
return Promise.reject(
new Error('Neutrino process with PID ${this.process.pid} already exists.')
)
}
mainLog.info('Starting lnd in neutrino mode')
mainLog.info(' > binaryPath', this.lndConfig.binaryPath)
mainLog.info(' > rpcProtoPath:', this.lndConfig.rpcProtoPath)
mainLog.info(' > host:', this.lndConfig.host)
mainLog.info(' > cert:', this.lndConfig.cert)
mainLog.info(' > macaroon:', this.lndConfig.macaroon)
// Get a free port to use as the rpc listen address.
const rpcListen = await getPort({
host: 'localhost',
port: [10009, 10008, 10007, 10006, 10005, 10004, 10003, 10002, 10001]
})
this.lndConfig.host = `localhost:${rpcListen}`
// Get a free port to use as the p2p listen address.
const p2pListen = await getPort({
host: '0.0.0.0',
port: [9735, 9734, 9733, 9732, 9731, 9736, 9737, 9738, 9739]
})
//Configure lnd.
const neutrinoArgs = [
`--configfile=${this.lndConfig.configPath}`,
`--lnddir=${this.lndConfig.lndDir}`,
`--listen=0.0.0.0:${p2pListen}`,
`--rpclisten=localhost:${rpcListen}`,
`${this.lndConfig.autopilot ? '--autopilot.active' : ''}`,
`${this.lndConfig.alias ? `--alias=${this.lndConfig.alias}` : ''}`
]
// Configure neutrino backend.
if (this.lndConfig.network === 'mainnet') {
neutrinoArgs.push('--neutrino.connect=mainnet1-btcd.zaphq.io')
neutrinoArgs.push('--neutrino.connect=mainnet2-btcd.zaphq.io')
} else {
neutrinoArgs.push('--neutrino.connect=testnet1-btcd.zaphq.io')
neutrinoArgs.push('--neutrino.connect=testnet2-btcd.zaphq.io')
}
// Log the final config.
mainLog.info(
'Spawning Neutrino process: %s %s',
this.lndConfig.binaryPath,
neutrinoArgs.filter(v => v != '').join(' ')
)
// Spawn lnd process.
this.process = spawn(this.lndConfig.binaryPath, neutrinoArgs)
.on('error', error => {
mainLog.debug('Neutrino process received "error" event with error: %s', error)
this.emit(ERROR, error, this.lastError)
})
.on('exit', (code, signal) => {
mainLog.debug(
'Neutrino process received "exit" event with code %s and signal %s',
code,
signal
)
this.process = null
this.emit(EXIT, code, signal, this.lastError)
})
// Listen for when neutrino prints data to stderr.
this.process.stderr.pipe(split2()).on('data', line => {
lndLog.error(line)
if (line.startsWith('panic:')) {
this.lastError = line
}
})
// Listen for when neutrino prints data to stdout.
this.process.stdout.pipe(split2()).on('data', line => {
const level = lndLogGetLevel(line)
lndLog[level](line)
if (level === 'error') {
this.lastError = line.split('[ERR] LTND:')[1]
}
// password RPC server listening (wallet unlocker started).
if (!this.walletUnlockerGrpcActive && !this.lightningGrpcActive) {
if (line.includes('RPC server listening on') && line.includes('password')) {
this.walletUnlockerGrpcActive = true
this.emit(WALLET_UNLOCKER_GRPC_ACTIVE)
}
}
// RPC server listening (wallet unlocked).
if (!this.lightningGrpcActive) {
if (line.includes('RPC server listening on') && !line.includes('password')) {
this.lightningGrpcActive = true
this.emit(LIGHTNING_GRPC_ACTIVE)
}
}
// If the sync has already completed then we don't need to do anything else.
if (this.is(CHAIN_SYNC_COMPLETE)) {
return
}
if (this.is(CHAIN_SYNC_PENDING) || this.is(CHAIN_SYNC_IN_PROGRESS)) {
// If we cant get a connectionn to the backend.
if (
line.includes('Waiting for chain backend to finish sync') ||
line.includes('Waiting for block headers to sync, then will start cfheaders sync')
) {
this.setState(CHAIN_SYNC_WAITING)
}
// If we are still waiting for the back end to finish synncing.
if (line.includes('No sync peer candidates available')) {
this.setState(CHAIN_SYNC_WAITING)
}
}
// Lnd syncing has started or resumed.
if (this.is(CHAIN_SYNC_PENDING) || this.is(CHAIN_SYNC_WAITING)) {
const match =
line.match(/Syncing to block height (\d+)/) ||
line.match(/Starting cfilters sync at block_height=(\d+)/)
if (match) {
// Notify that chhain syncronisation has now started.
this.setState(CHAIN_SYNC_IN_PROGRESS)
// This is the latest block that BTCd is aware of.
const btcdHeight = match[1]
this.setCurrentBlockHeight(btcdHeight)
// The height returned from the LND log output may not be the actual current block height (this is the case
// when BTCD is still in the middle of syncing the blockchain) so try to fetch thhe current height from from
// some block explorers just incase.
fetchBlockHeight()
.then(height => (height > btcdHeight ? this.setCurrentBlockHeight(height) : null))
// If we were unable to fetch from bock explorers at least we already have what BTCd gave us so just warn.
.catch(err => mainLog.warn(`Unable to fetch block height: ${err.message}`))
}
}
// Lnd as received some updated block data.
if (this.is(CHAIN_SYNC_WAITING) || this.is(CHAIN_SYNC_IN_PROGRESS)) {
let height
let cfilter
let match
if ((match = line.match(/Caught up to height (\d+)/))) {
height = match[1]
} else if ((match = line.match(/Processed \d* blocks? in the last.+\(height (\d+)/))) {
height = match[1]
} else if ((match = line.match(/Difficulty retarget at block height (\d+)/))) {
height = match[1]
} else if ((match = line.match(/Fetching set of headers from tip \(height=(\d+)/))) {
height = match[1]
} else if ((match = line.match(/Waiting for filter headers \(height=(\d+)\) to catch/))) {
height = match[1]
} else if ((match = line.match(/Writing filter headers up to height=(\d+)/))) {
height = match[1]
} else if ((match = line.match(/Starting cfheaders sync at block_height=(\d+)/))) {
height = match[1]
} else if ((match = line.match(/Got cfheaders from height=(\d*) to height=(\d+)/))) {
cfilter = match[2]
} else if ((match = line.match(/Writing filter headers up to height=(\d*)/))) {
cfilter = match[1]
} else if ((match = line.match(/Verified \d* filter headers? in the.+\(height (\d+)/))) {
cfilter = match[1]
} else if ((match = line.match(/Fetching filter for height=(\d+)/))) {
cfilter = match[1]
}
if (height) {
this.setState(CHAIN_SYNC_IN_PROGRESS)
this.setLndBlockHeight(height)
}
if (cfilter) {
this.setState(CHAIN_SYNC_IN_PROGRESS)
this.setLndCfilterHeight(cfilter)
}
// Lnd syncing has completed.
if (line.includes('Chain backend is fully synced')) {
this.setState(CHAIN_SYNC_COMPLETE)
}
}
})
return this.process
}
/**
* Stop the Lnd process.
*/
kill(signalName: string = 'SIGINT') {
if (this.process) {
mainLog.info('Killing Neutrino process...')
this.process.kill(signalName)
}
}
/**
* Check if the current state matches the passted in state.
* @param {String} state State to compare against the current state.
* @return {Boolean} Boolean indicating if the current state matches the passed in state.
*/
is(state: string) {
return this.chainSyncStatus === state
}
/**
* Set the current state and emit an event to notify others if te state as canged.
* @param {String} state Target state.
*/
setState(state: string) {
if (state !== this.chainSyncStatus) {
this.chainSyncStatus = state
this.emit(state)
}
}
/**
* Set the current block height and emit an event to notify others if it has changed.
* @param {String|Number} height Block height
*/
setCurrentBlockHeight(height: number | string) {
const heightAsNumber = Number(height)
const changed = Neutrino.incrementIfHigher(this, 'currentBlockHeight', heightAsNumber)
if (changed) {
this.emit(GOT_CURRENT_BLOCK_HEIGHT, heightAsNumber)
}
}
/**
* Set the lnd block height and emit an event to notify others if it has changed.
* @param {String|Number} height Block height
*/
setLndBlockHeight(height: number | string) {
const heightAsNumber = Number(height)
const changed = Neutrino.incrementIfHigher(this, 'lndBlockHeight', heightAsNumber)
if (changed) {
this.emit(GOT_LND_BLOCK_HEIGHT, heightAsNumber)
this.setCurrentBlockHeight(heightAsNumber)
}
}
/**
* Set the lnd cfilter height and emit an event to notify others if it has changed.
* @param {String|Number} height Block height
*/
setLndCfilterHeight(height: number | string) {
const heightAsNumber = Number(height)
const changed = Neutrino.incrementIfHigher(this, 'lndCfilterHeight', heightAsNumber)
if (changed) {
this.emit(GOT_LND_CFILTER_HEIGHT, heightAsNumber)
this.setCurrentBlockHeight(heightAsNumber)
}
}
}
export default Neutrino