Browse Source

bitcoind support, single source of chain data

patch-1
John Cantrell 3 years ago
parent
commit
a863e1be8c
  1. 826
      Cargo.lock
  2. 25
      Cargo.toml
  3. 300
      src/chain/bitcoind_client.rs
  4. 21
      src/chain/broadcaster.rs
  5. 65
      src/chain/listener.rs
  6. 251
      src/chain/listener_database.rs
  7. 97
      src/chain/manager.rs
  8. 6
      src/chain/mod.rs
  9. 128
      src/chain/wallet.rs
  10. 72
      src/config.rs
  11. 7
      src/database/mod.rs
  12. 61
      src/database/node.rs
  13. 21
      src/error.rs
  14. 56
      src/event_handler.rs
  15. 9
      src/grpc/admin.rs
  16. 4
      src/grpc/node.rs
  17. 20
      src/http/admin.rs
  18. 40
      src/http/node.rs
  19. 138
      src/main.rs
  20. 199
      src/node.rs
  21. 22
      src/services/admin.rs
  22. 9
      src/services/node.rs

826
Cargo.lock

File diff suppressed because it is too large

25
Cargo.toml

@ -12,18 +12,17 @@ name = "senseid"
path = "src/main.rs"
[dependencies]
lightning = { version = "0.0.104", features = ["max_level_trace"] }
lightning-block-sync = { version = "0.0.104", features = [ "rpc-client" ] }
lightning-invoice = { version = "0.12.0" }
lightning-net-tokio = { version = "0.0.104" }
lightning-persister = { version = "0.0.104" }
lightning-background-processor = { version = "0.0.104" }
lightning = { version = "0.0.104", features = ["max_level_trace"], path = "/Users/developer/Development/rust-lightning/lightning" }
lightning-block-sync = { version = "0.0.104", features = [ "rpc-client" ], path = "/Users/developer/Development/rust-lightning/lightning-block-sync" }
lightning-invoice = { version = "0.12.0", path = "/Users/developer/Development/rust-lightning/lightning-invoice" }
lightning-net-tokio = { version = "0.0.104", path = "/Users/developer/Development/rust-lightning/lightning-net-tokio" }
lightning-persister = { version = "0.0.104", path = "/Users/developer/Development/rust-lightning/lightning-persister" }
lightning-background-processor = { version = "0.0.104", path = "/Users/developer/Development/rust-lightning/lightning-background-processor" }
base64 = "0.13.0"
bitcoin = "0.27"
bitcoin-bech32 = "0.12"
bech32 = "0.8"
hex = "0.3"
futures = "0.3"
chrono = "0.4"
rand = "0.4"
@ -32,27 +31,27 @@ http = "0.2"
tower = { version = "0.4", features = ["full"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = { version = "1.0" }
ureq = { version = "~2.2.0", features = ["json"] }
tokio = { version = "1", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
log = "^0.4"
bdk = { git = "https://github.com/johncantrell97/bdk" }
bdk-ldk = { git = "https://github.com/johncantrell97/bdk-ldk", rev = "44a055721735779a6e225e8d4418660741d79dec" }
bitcoincore-rpc = "0.14"
bdk = { version = "0.16.0", default-features=false, features = ["sqlite"] }
tonic = "0.6"
prost = "0.9"
pin-project = "1.0"
hyper = "0.14"
clap = { version = "3.0", features = [ "derive" ] }
headers = "0.3"
tindercrypt = { version = "0.3.2", default-features = false }
portpicker = "0.1"
rusqlite = { version = "0.26", features = ["uuid", "bundled"] }
rusqlite = { version = "0.25.3", features = ["uuid", "bundled"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
tindercrypt = { version = "0.3.2", default-features = false }
macaroon = "0.2"
tower-http = { version = "0.2.0", features = ["fs", "trace", "cors"] }
tower-cookies = "0.4"
dirs = "4.0"
lazy_static = "1.4"
public-ip = "0.2"
rust-embed="6.3.0"
mime_guess = { version = "2" }
[build-dependencies]
tonic-build = "0.6"

300
src/chain/bitcoind_client.rs

@ -0,0 +1,300 @@
use base64;
use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode;
use bitcoin::hash_types::{BlockHash, Txid};
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning_block_sync::http::HttpEndpoint;
use lightning_block_sync::rpc::RpcClient;
use lightning_block_sync::{AsyncBlockSourceResult, BlockHeaderData, BlockSource};
use serde_json;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use bitcoin::hashes::hex::FromHex;
use lightning_block_sync::http::JsonResponse;
use std::convert::TryInto;
/// Parsed reply from bitcoind's `estimatesmartfee` RPC.
pub struct FeeResponse {
    /// Estimate converted to satoshis per 1000 weight units;
    /// `None` when bitcoind returned no `feerate` field.
    pub feerate_sat_per_kw: Option<u32>,
    /// True when the reply carried a non-null `errors` field.
    pub errored: bool,
}
impl TryInto<FeeResponse> for JsonResponse {
    type Error = std::io::Error;
    /// Converts the raw JSON reply; never fails — missing feerate maps to `None`.
    fn try_into(self) -> std::io::Result<FeeResponse> {
        let errored = !self.0["errors"].is_null();
        Ok(FeeResponse {
            errored,
            feerate_sat_per_kw: match self.0["feerate"].as_f64() {
                // Bitcoin Core gives us a feerate in BTC/KvB, which we need to convert to
                // satoshis/KW. Thus, we first multiply by 10^8 to get satoshis, then divide by 4
                // to convert virtual-bytes into weight units.
                Some(feerate_btc_per_kvbyte) => {
                    Some((feerate_btc_per_kvbyte * 100_000_000.0 / 4.0).round() as u32)
                }
                None => None,
            },
        })
    }
}
/// Subset of `getblockchaininfo` fields this daemon uses.
pub struct BlockchainInfo {
    pub latest_height: usize,
    pub latest_blockhash: BlockHash,
    pub chain: String,
}
impl TryInto<BlockchainInfo> for JsonResponse {
    type Error = std::io::Error;
    /// Parses the `getblockchaininfo` reply.
    ///
    /// Returns `InvalidData` on a missing or malformed field instead of
    /// panicking (the previous `unwrap()`s would abort the process on any
    /// unexpected RPC response).
    fn try_into(self) -> std::io::Result<BlockchainInfo> {
        let bad = |msg: &str| std::io::Error::new(std::io::ErrorKind::InvalidData, msg.to_string());
        let latest_height = self.0["blocks"]
            .as_u64()
            .ok_or_else(|| bad("getblockchaininfo: missing `blocks`"))? as usize;
        let latest_blockhash = self.0["bestblockhash"]
            .as_str()
            .and_then(|s| BlockHash::from_hex(s).ok())
            .ok_or_else(|| bad("getblockchaininfo: bad `bestblockhash`"))?;
        let chain = self.0["chain"]
            .as_str()
            .ok_or_else(|| bad("getblockchaininfo: missing `chain`"))?
            .to_string();
        Ok(BlockchainInfo {
            latest_height,
            latest_blockhash,
            chain,
        })
    }
}
/// Handle to bitcoind's JSON-RPC interface; also serves as LDK's
/// `FeeEstimator` and `BroadcasterInterface` (impls below).
pub struct BitcoindClient {
    // Shared RPC connection; every call locks this tokio mutex.
    bitcoind_rpc_client: Arc<Mutex<RpcClient>>,
    // Connection details retained so `get_new_rpc_client` can mint extra clients.
    host: String,
    port: u16,
    rpc_user: String,
    rpc_password: String,
    // Cached fee estimates (sat/kW) refreshed by `poll_for_fee_estimates`.
    fees: Arc<HashMap<Target, AtomicU32>>,
    // Runtime handle used to spawn background work (fee polling, broadcasts).
    handle: tokio::runtime::Handle,
}
/// Keys of the fee-estimate cache; mirrors LDK's `ConfirmationTarget` variants.
#[derive(Clone, Eq, Hash, PartialEq)]
pub enum Target {
    Background,
    Normal,
    HighPriority,
}
// Implemented on `&BitcoindClient` because `BlockSource` methods take
// `&mut self`; each method just locks the shared RPC client and delegates
// to `RpcClient`'s own `BlockSource` implementation.
impl BlockSource for &BitcoindClient {
    fn get_header<'a>(
        &'a mut self,
        header_hash: &'a BlockHash,
        height_hint: Option<u32>,
    ) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
        Box::pin(async move {
            let mut rpc = self.bitcoind_rpc_client.lock().await;
            rpc.get_header(header_hash, height_hint).await
        })
    }
    fn get_block<'a>(
        &'a mut self,
        header_hash: &'a BlockHash,
    ) -> AsyncBlockSourceResult<'a, Block> {
        Box::pin(async move {
            let mut rpc = self.bitcoind_rpc_client.lock().await;
            rpc.get_block(header_hash).await
        })
    }
    fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)> {
        Box::pin(async move {
            let mut rpc = self.bitcoind_rpc_client.lock().await;
            rpc.get_best_block().await
        })
    }
}
/// The minimum feerate we are allowed to send, as specified by LDK.
const MIN_FEERATE: u32 = 253;
impl BitcoindClient {
pub async fn new(
host: String,
port: u16,
rpc_user: String,
rpc_password: String,
handle: tokio::runtime::Handle,
) -> std::io::Result<Self> {
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
let rpc_credentials =
base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone()));
let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?;
let _dummy = bitcoind_rpc_client
.call_method::<BlockchainInfo>("getblockchaininfo", &vec![])
.await
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::PermissionDenied,
"Failed to make initial call to bitcoind - please check your RPC user/password and access settings")
})?;
let mut fees: HashMap<Target, AtomicU32> = HashMap::new();
fees.insert(Target::Background, AtomicU32::new(MIN_FEERATE));
fees.insert(Target::Normal, AtomicU32::new(2000));
fees.insert(Target::HighPriority, AtomicU32::new(5000));
let client = Self {
bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)),
host,
port,
rpc_user,
rpc_password,
fees: Arc::new(fees),
handle: handle.clone(),
};
BitcoindClient::poll_for_fee_estimates(
client.fees.clone(),
client.bitcoind_rpc_client.clone(),
handle,
);
Ok(client)
}
fn poll_for_fee_estimates(
fees: Arc<HashMap<Target, AtomicU32>>,
rpc_client: Arc<Mutex<RpcClient>>,
handle: tokio::runtime::Handle,
) {
handle.spawn(async move {
loop {
let background_estimate = {
let mut rpc = rpc_client.lock().await;
let background_conf_target = serde_json::json!(144);
let background_estimate_mode = serde_json::json!("ECONOMICAL");
let resp = rpc
.call_method::<FeeResponse>(
"estimatesmartfee",
&vec![background_conf_target, background_estimate_mode],
)
.await
.unwrap();
match resp.feerate_sat_per_kw {
Some(feerate) => std::cmp::max(feerate, MIN_FEERATE),
None => MIN_FEERATE,
}
};
let normal_estimate = {
let mut rpc = rpc_client.lock().await;
let normal_conf_target = serde_json::json!(18);
let normal_estimate_mode = serde_json::json!("ECONOMICAL");
let resp = rpc
.call_method::<FeeResponse>(
"estimatesmartfee",
&vec![normal_conf_target, normal_estimate_mode],
)
.await
.unwrap();
match resp.feerate_sat_per_kw {
Some(feerate) => std::cmp::max(feerate, MIN_FEERATE),
None => 2000,
}
};
let high_prio_estimate = {
let mut rpc = rpc_client.lock().await;
let high_prio_conf_target = serde_json::json!(6);
let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE");
let resp = rpc
.call_method::<FeeResponse>(
"estimatesmartfee",
&vec![high_prio_conf_target, high_prio_estimate_mode],
)
.await
.unwrap();
match resp.feerate_sat_per_kw {
Some(feerate) => std::cmp::max(feerate, MIN_FEERATE),
None => 5000,
}
};
fees.get(&Target::Background)
.unwrap()
.store(background_estimate, Ordering::Release);
fees.get(&Target::Normal)
.unwrap()
.store(normal_estimate, Ordering::Release);
fees.get(&Target::HighPriority)
.unwrap()
.store(high_prio_estimate, Ordering::Release);
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
pub fn get_new_rpc_client(&self) -> std::io::Result<RpcClient> {
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
let rpc_credentials = base64::encode(format!(
"{}:{}",
self.rpc_user.clone(),
self.rpc_password.clone()
));
RpcClient::new(&rpc_credentials, http_endpoint)
}
pub async fn send_raw_transaction(&self, raw_tx: String) {
let mut rpc = self.bitcoind_rpc_client.lock().await;
let raw_tx_json = serde_json::json!(raw_tx);
rpc.call_method::<Txid>("sendrawtransaction", &[raw_tx_json])
.await
.unwrap();
}
pub async fn get_blockchain_info(&self) -> BlockchainInfo {
let mut rpc = self.bitcoind_rpc_client.lock().await;
rpc.call_method::<BlockchainInfo>("getblockchaininfo", &vec![])
.await
.unwrap()
}
}
impl FeeEstimator for BitcoindClient {
    /// Serves the cached estimate for the requested confirmation target.
    /// Lock-free read; the cache is refreshed by the background poller.
    fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
        // Translate LDK's target into our cache key once, then do a single
        // map lookup instead of repeating the lookup chain per arm.
        let target = match confirmation_target {
            ConfirmationTarget::Background => Target::Background,
            ConfirmationTarget::Normal => Target::Normal,
            ConfirmationTarget::HighPriority => Target::HighPriority,
        };
        self.fees.get(&target).unwrap().load(Ordering::Acquire)
    }
}
impl BroadcasterInterface for BitcoindClient {
    /// Serializes the transaction and submits it via `sendrawtransaction`
    /// on a spawned task (the trait method is sync, the RPC call is async),
    /// so failures surface asynchronously — see the panic below.
    fn broadcast_transaction(&self, tx: &Transaction) {
        let bitcoind_rpc_client = self.bitcoind_rpc_client.clone();
        let tx_serialized = serde_json::json!(encode::serialize_hex(tx));
        self.handle.spawn(async move {
            let mut rpc = bitcoind_rpc_client.lock().await;
            // This may error due to RL calling `broadcast_transaction` with the same transaction
            // multiple times, but the error is safe to ignore.
            match rpc
                .call_method::<Txid>("sendrawtransaction", &vec![tx_serialized])
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    let err_str = e.get_ref().unwrap().to_string();
                    // Known-benign rebroadcast/replacement errors are matched
                    // by substring; anything else is treated as fatal.
                    if !err_str.contains("Transaction already in block chain")
                        && !err_str.contains("Inputs missing or spent")
                        && !err_str.contains("bad-txns-inputs-missingorspent")
                        && !err_str.contains("non-BIP68-final")
                        && !err_str.contains("insufficient fee, rejecting replacement ")
                    {
                        panic!("{}", e);
                    }
                }
            }
        });
    }
}

21
src/chain/broadcaster.rs

@ -0,0 +1,21 @@
use std::sync::Arc;
use bitcoin::Transaction;
use lightning::chain::chaininterface::BroadcasterInterface;
use super::{bitcoind_client::BitcoindClient, listener_database::ListenerDatabase};
/// Broadcaster handed to LDK: relays the transaction through bitcoind and
/// immediately feeds it to the wallet's listener database so the wallet sees
/// its own unconfirmed spends without waiting for the next block.
pub struct SenseiBroadcaster {
    pub bitcoind_client: Arc<BitcoindClient>,
    pub listener_database: ListenerDatabase,
}
impl BroadcasterInterface for SenseiBroadcaster {
    fn broadcast_transaction(&self, tx: &Transaction) {
        // Fire-and-forget: BitcoindClient spawns the RPC call on its runtime,
        // so a broadcast failure is not visible here.
        self.bitcoind_client.broadcast_transaction(tx);
        // TODO: there's a bug here if the broadcast fails
        // best solution is to probably setup a zmq listener
        self.listener_database.process_mempool_tx(tx);
    }
}

65
src/chain/listener.rs

@ -0,0 +1,65 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use crate::node::{ChainMonitor, ChannelManager};
use bitcoin::{Block, BlockHeader};
use lightning::chain::Listen;
use super::listener_database::ListenerDatabase;
/// Fan-out registry of per-node chain listeners, keyed by the node's public
/// key string, so a single chain poller can drive every running node.
pub struct SenseiChainListener {
    listeners: Mutex<HashMap<String, (Arc<ChainMonitor>, Arc<ChannelManager>, ListenerDatabase)>>,
}
impl SenseiChainListener {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            listeners: Mutex::new(HashMap::new()),
        }
    }
    /// Registry key: the node id of the listener's channel manager.
    fn get_key(
        &self,
        listener: &(Arc<ChainMonitor>, Arc<ChannelManager>, ListenerDatabase),
    ) -> String {
        let (_, channel_manager, _) = listener;
        channel_manager.get_our_node_id().to_string()
    }
    /// Registers (or replaces) a node's listener triple.
    pub fn add_listener(
        &self,
        listener: (Arc<ChainMonitor>, Arc<ChannelManager>, ListenerDatabase),
    ) {
        let key = self.get_key(&listener);
        self.listeners.lock().unwrap().insert(key, listener);
    }
    /// Deregisters a node's listener triple.
    pub fn remove_listener(
        &self,
        listener: (Arc<ChainMonitor>, Arc<ChannelManager>, ListenerDatabase),
    ) {
        let key = self.get_key(&listener);
        self.listeners.lock().unwrap().remove(&key);
    }
}
impl Listen for SenseiChainListener {
    /// Forwards a connected block to every registered node, in the same
    /// per-node order as before: channel manager, chain monitor, wallet db.
    fn block_connected(&self, block: &Block, height: u32) {
        for (monitor, manager, wallet_db) in self.listeners.lock().unwrap().values() {
            manager.block_connected(block, height);
            monitor.block_connected(block, height);
            wallet_db.block_connected(block, height);
        }
    }
    /// Forwards a disconnected block header to every registered node.
    fn block_disconnected(&self, header: &BlockHeader, height: u32) {
        for (monitor, manager, wallet_db) in self.listeners.lock().unwrap().values() {
            manager.block_disconnected(header, height);
            monitor.block_disconnected(header, height);
            wallet_db.block_disconnected(header, height);
        }
    }
}

251
src/chain/listener_database.rs

@ -0,0 +1,251 @@
use bdk::{
database::{BatchOperations, Database, SqliteDatabase},
BlockTime, KeychainKind, LocalUtxo, TransactionDetails,
};
use bitcoin::{Block, BlockHeader, OutPoint, Script, Transaction, TxOut, Txid};
use lightning::chain::Listen;
use crate::database::node::NodeDatabase;
/// Keeps the BDK wallet database in sync with chain events.
/// Holds only paths; SQLite connections are opened per call, which keeps
/// the type cheaply `Clone`able across listeners.
#[derive(Clone)]
pub struct ListenerDatabase {
    bdk_db_path: String,
    node_db_path: String,
}
impl ListenerDatabase {
    /// Creates a handle from database file paths; no connection is opened here.
    pub fn new(bdk_db_path: String, node_db_path: String) -> Self {
        Self {
            bdk_db_path,
            node_db_path,
        }
    }

    /// Processes an unconfirmed (mempool) transaction: records any wallet
    /// spends/receives and advances whichever keychain indexes it revealed.
    pub fn process_mempool_tx(&self, tx: &Transaction) {
        let mut database = SqliteDatabase::new(self.bdk_db_path.clone());
        let mut internal_max_deriv = None;
        let mut external_max_deriv = None;
        self.process_tx(
            tx,
            &mut database,
            None,
            None,
            &mut internal_max_deriv,
            &mut external_max_deriv,
        );
        Self::sync_last_indexes(&mut database, internal_max_deriv, external_max_deriv);
    }

    /// Persists `max_deriv + 1` as a keychain's last index when a deeper
    /// child than the stored one was observed. Extracted from the previously
    /// duplicated external/internal blocks in `process_mempool_tx`.
    fn sync_last_indexes(
        database: &mut SqliteDatabase,
        internal_max_deriv: Option<u32>,
        external_max_deriv: Option<u32>,
    ) {
        let current_ext = database
            .get_last_index(KeychainKind::External)
            .unwrap()
            .unwrap_or(0);
        let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_ext_new > current_ext {
            database
                .set_last_index(KeychainKind::External, first_ext_new)
                .unwrap();
        }
        let current_int = database
            .get_last_index(KeychainKind::Internal)
            .unwrap()
            .unwrap_or(0);
        let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_int_new > current_int {
            database
                .set_last_index(KeychainKind::Internal, first_int_new)
                .unwrap();
        }
    }

    /// Scans one transaction against the wallet database.
    ///
    /// Spent wallet UTXOs are deleted, received outputs are inserted, and a
    /// `TransactionDetails` row is written when the tx touches the wallet at
    /// all. `internal_max_deriv` / `external_max_deriv` accumulate the
    /// deepest derivation child seen so callers can bump keychain indexes.
    pub fn process_tx(
        &self,
        tx: &Transaction,
        database: &mut SqliteDatabase,
        confirmation_height: Option<u32>,
        confirmation_time: Option<u64>,
        internal_max_deriv: &mut Option<u32>,
        external_max_deriv: &mut Option<u32>,
    ) {
        let mut incoming: u64 = 0;
        let mut outgoing: u64 = 0;
        let mut inputs_sum: u64 = 0;
        let mut outputs_sum: u64 = 0;
        // look for our own inputs (the enumerate index was unused here)
        for input in tx.input.iter() {
            if let Some(previous_output) = database
                .get_previous_output(&input.previous_output)
                .unwrap()
            {
                inputs_sum += previous_output.value;
                if database.is_mine(&previous_output.script_pubkey).unwrap() {
                    outgoing += previous_output.value;
                    database.del_utxo(&input.previous_output).unwrap();
                }
            }
        }
        for (i, output) in tx.output.iter().enumerate() {
            // to compute the fees later
            outputs_sum += output.value;
            // this output is ours, we have a path to derive it
            if let Some((keychain, child)) = database
                .get_path_from_script_pubkey(&output.script_pubkey)
                .unwrap()
            {
                database
                    .set_utxo(&LocalUtxo {
                        outpoint: OutPoint::new(tx.txid(), i as u32),
                        txout: output.clone(),
                        keychain,
                    })
                    .unwrap();
                incoming += output.value;
                // track the deepest derivation child revealed per keychain
                if keychain == KeychainKind::Internal
                    && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0))
                {
                    *internal_max_deriv = Some(child);
                } else if keychain == KeychainKind::External
                    && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0))
                {
                    *external_max_deriv = Some(child);
                }
            }
        }
        if incoming > 0 || outgoing > 0 {
            let tx = TransactionDetails {
                txid: tx.txid(),
                transaction: Some(tx.clone()),
                received: incoming,
                sent: outgoing,
                confirmation_time: BlockTime::new(confirmation_height, confirmation_time),
                verified: true,
                // NOTE(review): inputs_sum only counts inputs whose previous
                // output is known to the wallet db, so for txs with foreign
                // inputs this "fee" saturates to 0 rather than being accurate.
                fee: Some(inputs_sum.saturating_sub(outputs_sum)),
            };
            database.set_tx(&tx).unwrap();
        }
    }
}
impl Listen for ListenerDatabase {
    /// Scans every transaction in a newly connected block for wallet
    /// relevance, updates keychain indexes, then records the new sync tip.
    fn block_connected(&self, block: &Block, height: u32) {
        let mut database = SqliteDatabase::new(self.bdk_db_path.clone());
        let mut internal_max_deriv = None;
        let mut external_max_deriv = None;
        // iterate all transactions in the block, looking for ones we care about
        for tx in &block.txdata {
            self.process_tx(
                tx,
                &mut database,
                Some(height),
                Some(block.header.time.into()),
                &mut internal_max_deriv,
                &mut external_max_deriv,
            )
        }
        // Bump each keychain's stored last index if this block revealed a
        // deeper derivation child than what's on record.
        let current_ext = database
            .get_last_index(KeychainKind::External)
            .unwrap()
            .unwrap_or(0);
        let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_ext_new > current_ext {
            database
                .set_last_index(KeychainKind::External, first_ext_new)
                .unwrap();
        }
        let current_int = database
            .get_last_index(KeychainKind::Internal)
            .unwrap()
            .unwrap_or(0);
        let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_int_new > current_int {
            database
                .set_last_index(KeychainKind::Internal, first_int_new)
                .unwrap();
        }
        // TODO: there's probably a bug here.
        // need to atomically update bdk database and this last_sync
        let mut node_database = NodeDatabase::new(self.node_db_path.clone());
        node_database.update_last_sync(block.block_hash()).unwrap();
    }
    /// Rolls the wallet database back past a disconnected block: deletes
    /// every transaction confirmed at `height` or above (and any
    /// unconfirmed ones) plus their UTXOs, then points last_sync at the
    /// disconnected header's parent.
    fn block_disconnected(&self, header: &BlockHeader, height: u32) {
        let mut database = SqliteDatabase::new(self.bdk_db_path.clone());
        let mut deleted_txids = vec![];
        // delete all transactions with this height or above
        for details in database.iter_txs(false).unwrap() {
            match details.confirmation_time {
                Some(c) if c.height < height => continue,
                _ => {
                    database.del_tx(&details.txid, false).unwrap();
                    deleted_txids.push(details.txid)
                }
            };
        }
        // delete all utxos created by the deleted txs
        if !deleted_txids.is_empty() {
            for utxo in database.iter_utxos().unwrap() {
                if deleted_txids.contains(&utxo.outpoint.txid) {
                    database.del_utxo(&utxo.outpoint).unwrap();
                }
            }
        }
        // TODO: update the keychain indexes?
        //
        // TODO: there's probably a bug here.
        // need to atomically update bdk database and this last_sync
        let mut node_database = NodeDatabase::new(self.node_db_path.clone());
        node_database
            .update_last_sync(header.prev_blockhash)
            .unwrap();
    }
}
pub(crate) trait DatabaseUtils: Database {
fn is_mine(&self, script: &Script) -> Result<bool, bdk::Error> {
self.get_path_from_script_pubkey(script)
.map(|o| o.is_some())
}
fn get_raw_tx_or<D>(&self, txid: &Txid, default: D) -> Result<Option<Transaction>, bdk::Error>
where
D: FnOnce() -> Result<Option<Transaction>, bdk::Error>,
{
self.get_tx(txid, true)?
.map(|t| t.transaction)
.flatten()
.map_or_else(default, |t| Ok(Some(t)))
}
fn get_previous_output(&self, outpoint: &OutPoint) -> Result<Option<TxOut>, bdk::Error> {
self.get_raw_tx(&outpoint.txid)?
.map(|previous_tx| {
if outpoint.vout as usize >= previous_tx.output.len() {
Err(bdk::Error::InvalidOutpoint(*outpoint))
} else {
Ok(previous_tx.output[outpoint.vout as usize].clone())
}
})
.transpose()
}
}
impl<T: Database> DatabaseUtils for T {}

97
src/chain/manager.rs

@ -0,0 +1,97 @@
use std::{sync::Arc, time::Duration};
use crate::{
config::SenseiConfig,
node::{ChainMonitor, ChannelManager},
};
use bitcoin::BlockHash;
use lightning::chain::{BestBlock, Listen};
use lightning_block_sync::poll::ValidatedBlockHeader;
use lightning_block_sync::SpvClient;
use lightning_block_sync::{init, poll, UnboundedCache};
use std::ops::Deref;
use super::{
bitcoind_client::BitcoindClient, listener::SenseiChainListener,
listener_database::ListenerDatabase,
};
/// Single shared chain-data source: owns the bitcoind client plus the
/// listener registry that fans chain events out to every running node.
pub struct SenseiChainManager {
    config: SenseiConfig,
    pub listener: Arc<SenseiChainListener>,
    pub bitcoind_client: Arc<BitcoindClient>,
}
impl SenseiChainManager {
    /// Connects to bitcoind and spawns the single SPV polling task that
    /// drives every listener registered via `keep_in_sync`.
    pub async fn new(config: SenseiConfig) -> Result<Self, crate::error::Error> {
        let listener = Arc::new(SenseiChainListener::new());
        let bitcoind_client = Arc::new(
            BitcoindClient::new(
                config.bitcoind_rpc_host.clone(),
                config.bitcoind_rpc_port,
                config.bitcoind_rpc_username.clone(),
                config.bitcoind_rpc_password.clone(),
                tokio::runtime::Handle::current(),
            )
            .await
            .expect("invalid bitcoind rpc config"),
        );
        let block_source_poller = bitcoind_client.clone();
        let listener_poller = listener.clone();
        tokio::spawn(async move {
            let derefed = &mut block_source_poller.deref();
            let mut cache = UnboundedCache::new();
            // NOTE(review): the unwraps below abort the polling task if
            // bitcoind is unreachable at startup or mid-poll — consider a
            // retry loop so a transient outage doesn't stop chain sync.
            let chain_tip = init::validate_best_block_header(derefed).await.unwrap();
            let chain_poller = poll::ChainPoller::new(derefed, config.network);
            let mut spv_client =
                SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller);
            loop {
                spv_client.poll_best_tip().await.unwrap();
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });
        Ok(Self {
            config,
            listener,
            bitcoind_client,
        })
    }
    /// Replays blocks so each `(last_synced_hash, listener)` pair catches up
    /// to the current tip; returns the validated tip header.
    pub async fn synchronize_to_tip(
        &self,
        chain_listeners: Vec<(BlockHash, &(dyn Listen + Send + Sync))>,
    ) -> Result<ValidatedBlockHeader, crate::error::Error> {
        let chain_tip = init::synchronize_listeners(
            &mut self.bitcoind_client.deref(),
            self.config.network,
            &mut UnboundedCache::new(),
            chain_listeners,
        )
        .await
        .unwrap();
        Ok(chain_tip)
    }
    /// Registers a node with the shared poller; returns immediately — the
    /// background task spawned in `new` performs the actual syncing.
    pub async fn keep_in_sync(
        &self,
        channel_manager: Arc<ChannelManager>,
        chain_monitor: Arc<ChainMonitor>,
        listener_database: ListenerDatabase,
    ) -> Result<(), crate::error::Error> {
        let chain_listener = (chain_monitor, channel_manager, listener_database);
        self.listener.add_listener(chain_listener);
        Ok(())
    }
    /// Queries bitcoind for the current best block (hash + height).
    pub async fn get_best_block(&self) -> Result<BestBlock, crate::error::Error> {
        let blockchain_info = self.bitcoind_client.get_blockchain_info().await;
        Ok(BestBlock::new(
            blockchain_info.latest_blockhash,
            blockchain_info.latest_height as u32,
        ))
    }
}

6
src/chain/mod.rs

@ -0,0 +1,6 @@
//! Chain-data plumbing: a bitcoind-backed single source of chain data shared
//! by every node — RPC client, broadcaster, listener fan-out, wallet sync.
pub mod bitcoind_client;
pub mod broadcaster;
pub mod listener;
pub mod listener_database;
pub mod manager;
pub mod wallet;

128
src/chain/wallet.rs

@ -0,0 +1,128 @@
use bdk::bitcoin::{Address, Script, Transaction};
use bdk::blockchain::{noop_progress, Blockchain};
use bdk::database::BatchDatabase;
use bdk::wallet::{AddressIndex, Wallet};
use bdk::SignOptions;
use lightning::chain::chaininterface::BroadcasterInterface;
use lightning::chain::chaininterface::{ConfirmationTarget, FeeEstimator};
use std::sync::{Mutex, MutexGuard};
use crate::error::Error;
/// Lightning Wallet
///
/// A wrapper around a bdk::Wallet to fulfill many of the requirements
/// needed to use lightning with LDK.
pub struct LightningWallet<B, D> {
    // All access to the BDK wallet funnels through this mutex.
    inner: Mutex<Wallet<B, D>>,
}
impl<B, D> LightningWallet<B, D>
where
B: Blockchain,
D: BatchDatabase,
{
/// create a new lightning wallet from your bdk wallet
pub fn new(wallet: Wallet<B, D>) -> Self {
LightningWallet {
inner: Mutex::new(wallet),
}
}
pub fn get_unused_address(&self) -> Result<Address, Error> {
let wallet = self.inner.lock().unwrap();
let address_info = wallet.get_address(AddressIndex::LastUnused)?;
Ok(address_info.address)
}
pub fn construct_funding_transaction(
&self,
output_script: &Script,
value: u64,
target_blocks: usize,
) -> Result<Transaction, Error> {
let wallet = self.inner.lock().unwrap();
let mut tx_builder = wallet.build_tx();
let fee_rate = wallet.client().estimate_fee(target_blocks)?;
tx_builder
.add_recipient(output_script.clone(), value)
.fee_rate(fee_rate)
.enable_rbf();
let (mut psbt, _tx_details) = tx_builder.finish()?;
let _finalized = wallet.sign(&mut psbt, SignOptions::default())?;
Ok(psbt.extract_tx())
}
pub fn get_balance(&self) -> Result<u64, Error> {
let wallet = self.inner.lock().unwrap();
wallet.get_balance().map_err(Error::Bdk)
}
pub fn get_wallet(&self) -> MutexGuard<Wallet<B, D>> {
self.inner.lock().unwrap()
}
fn sync(&self) -> Result<(), Error> {
let wallet = self.inner.lock().unwrap();
wallet.sync(noop_progress(), None)?;
Ok(())
}
}
impl<B, D> From<Wallet<B, D>> for LightningWallet<B, D>
where
B: Blockchain,
D: BatchDatabase,
{
fn from(wallet: Wallet<B, D>) -> Self {
Self::new(wallet)
}
}
impl<B, D> FeeEstimator for LightningWallet<B, D>
where
    B: Blockchain,
    D: BatchDatabase,
{
    /// Estimates a feerate in sat per 1000 weight units for LDK.
    ///
    /// Fix: when `estimate_fee` fails, `unwrap_or_default()` produced a zero
    /// rate, i.e. 0 sat/kW — below the 253 sat/kW minimum LDK requires.
    /// The result is now clamped to that floor.
    fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
        let wallet = self.inner.lock().unwrap();
        let target_blocks = match confirmation_target {
            ConfirmationTarget::Background => 6,
            ConfirmationTarget::Normal => 3,
            ConfirmationTarget::HighPriority => 1,
        };
        let estimate = wallet
            .client()
            .estimate_fee(target_blocks)
            .unwrap_or_default();
        let sats_per_vbyte = estimate.as_sat_vb() as u32;
        // 1 vbyte = 4 WU, so sat/vB -> sat/kW is nominally *250; using 253
        // keeps 1 sat/vB at or above LDK's 253 sat/kW floor. Clamp so a
        // failed/zero estimate can never fall below that minimum.
        (sats_per_vbyte * 253).max(253)
    }
}
impl<B, D> BroadcasterInterface for LightningWallet<B, D>
where
    B: Blockchain,
    D: BatchDatabase,
{
    /// Pushes the transaction through the wallet's blockchain backend.
    /// NOTE(review): the broadcast result is silently discarded — the LDK
    /// trait offers no error path, but a log line here would aid debugging.
    fn broadcast_transaction(&self, tx: &Transaction) {
        let wallet = self.inner.lock().unwrap();
        let _result = wallet.client().broadcast(tx);
    }
}
#[cfg(test)]
mod tests {
    /// Placeholder sanity check (template leftover); real wallet tests TBD.
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }
}

72
src/config.rs

@ -9,52 +9,17 @@
use std::{fs, io};
use bdk::blockchain::ElectrumBlockchainConfig;
use bitcoin::Network;
use serde::{Deserialize, Serialize};
pub const ELECTRUM_MAINNET_URL: &str = "ssl://blockstream.info:700";
pub const DEFAULT_SOCKS5_PROXY: Option<String> = None;
pub const DEFAULT_RETRY_ATTEMPTS: u8 = 3;
pub const DEFAULT_REQUEST_TIMEOUT_SECONDS: Option<u8> = Some(10);
pub const DEFAULT_STOP_GAP: usize = 20;
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum LightningNodeBackendConfig {
#[serde(rename = "electrum")]
Electrum(ElectrumBlockchainConfig),
}
impl Default for LightningNodeBackendConfig {
fn default() -> Self {
LightningNodeBackendConfig::Electrum(ElectrumBlockchainConfig {
url: ELECTRUM_MAINNET_URL.into(),
socks5: DEFAULT_SOCKS5_PROXY,
retry: DEFAULT_RETRY_ATTEMPTS,
timeout: DEFAULT_REQUEST_TIMEOUT_SECONDS,
stop_gap: DEFAULT_STOP_GAP,
})
}
}
impl LightningNodeBackendConfig {
pub fn electrum_from_url(url: String) -> Self {
LightningNodeBackendConfig::Electrum( ElectrumBlockchainConfig {
url,
socks5: DEFAULT_SOCKS5_PROXY,
retry: DEFAULT_RETRY_ATTEMPTS,
timeout: DEFAULT_REQUEST_TIMEOUT_SECONDS,
stop_gap: DEFAULT_STOP_GAP,
})
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct SenseiConfig {
#[serde(skip)]
pub path: String,
pub backend: LightningNodeBackendConfig,
pub bitcoind_rpc_host: String,
pub bitcoind_rpc_port: u16,
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub network: Network,
pub api_port: u16,
}
@ -65,7 +30,10 @@ impl Default for SenseiConfig {
let path = format!("{}/.sensei/config.json", home_dir.to_str().unwrap());
Self {
path,
backend: LightningNodeBackendConfig::default(),
bitcoind_rpc_host: String::from("127.0.0.1"),
bitcoind_rpc_port: 8133,
bitcoind_rpc_username: String::from("bitcoin"),
bitcoind_rpc_password: String::from("bitcoin"),
network: Network::Bitcoin,
api_port: 5401,
}
@ -83,7 +51,10 @@ impl SenseiConfig {
serde_json::from_str(&config_str).expect("failed to parse configuration file");
// merge all of `config` properties into `merge_config`
// return `merge_config`
merge_config.set_backend(config.backend);
merge_config.bitcoind_rpc_host = config.bitcoind_rpc_host;
merge_config.bitcoind_rpc_port = config.bitcoind_rpc_port;
merge_config.bitcoind_rpc_username = config.bitcoind_rpc_username;
merge_config.bitcoind_rpc_password = config.bitcoind_rpc_password;
merge_config
}
Err(e) => match e.kind() {
@ -108,10 +79,6 @@ impl SenseiConfig {
self.network = network;
}
pub fn set_backend(&mut self, backend: LightningNodeBackendConfig) {
self.backend = backend;
}
pub fn save(&mut self) {
fs::write(
self.path.clone(),
@ -123,7 +90,10 @@ impl SenseiConfig {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct LightningNodeConfig {
pub backend: LightningNodeBackendConfig,
pub bitcoind_rpc_host: String,
pub bitcoind_rpc_port: u16,
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub data_dir: String,
pub ldk_peer_listening_port: u16,
pub ldk_announced_listen_addr: Vec<String>,
@ -136,7 +106,10 @@ pub struct LightningNodeConfig {
impl Default for LightningNodeConfig {
fn default() -> Self {
LightningNodeConfig {
backend: LightningNodeBackendConfig::default(),
bitcoind_rpc_host: String::from("127.0.0.1"),
bitcoind_rpc_port: 8133,
bitcoind_rpc_username: String::from("bitcoin"),
bitcoind_rpc_password: String::from("bitcoin"),
data_dir: ".".into(),
ldk_peer_listening_port: 9735,
ldk_announced_listen_addr: vec![],
@ -155,6 +128,11 @@ impl LightningNodeConfig {
pub fn node_database_path(&self) -> String {
format!("{}/node.db", self.data_dir())
}
pub fn bdk_database_path(&self) -> String {
format!("{}/bdk.db", self.data_dir())
}
pub fn admin_macaroon_path(&self) -> String {
format!("{}/admin.macaroon", self.data_dir())
}

7
src/database/mod.rs

@ -13,4 +13,11 @@ pub mod node;
#[derive(Debug)]
pub enum Error {
Generic(String),
Encode(bitcoin::consensus::encode::Error),
}
/// Lets `?` convert consensus-(de)serialization failures (e.g. a corrupt
/// stored blockhash) into the database error type.
impl From<bitcoin::consensus::encode::Error> for Error {
    fn from(e: bitcoin::consensus::encode::Error) -> Error {
        Error::Encode(e)
    }
}

61
src/database/node.rs

@ -13,8 +13,11 @@ use crate::{
node::PaymentInfo,
services::{PaginationRequest, PaginationResponse, PaymentsFilter},
};
use bitcoin::consensus::encode::{deserialize, serialize};
use std::str::FromStr;
use super::Error;
use bitcoin::BlockHash;
use rusqlite::{named_params, Connection};
use serde::Serialize;
@ -74,8 +77,9 @@ static MIGRATIONS: &[&str] = &[
"CREATE TRIGGER tg_forwarded_payments_updated AFTER UPDATE ON forwarded_payments FOR EACH ROW BEGIN UPDATE forwarded_payments SET updated_at = current_timestamp, total_payments = old.total_payments + 1 WHERE id=old.id; END;",
"CREATE INDEX idx_from_channel_id ON forwarded_payments(from_channel_id)",
"CREATE INDEX idx_to_channel_id ON forwarded_payments(to_channel_id)",
"CREATE UNIQUE INDEX idx_hours_since_epoch ON forwarded_payments(hours_since_epoch, from_channel_id, to_channel_id)"
];
"CREATE UNIQUE INDEX idx_hours_since_epoch ON forwarded_payments(hours_since_epoch, from_channel_id, to_channel_id)",
"CREATE TABLE last_sync (blockhash BLOB)",
];
pub struct NodeDatabase {
pub path: String,
@ -123,6 +127,59 @@ impl NodeDatabase {
}
}
/// Returns the persisted sync tip, seeding the `last_sync` row with
/// `current_blockhash` on first run (when no row exists yet).
pub fn find_or_create_last_sync(
    &mut self,
    current_blockhash: BlockHash,
) -> Result<BlockHash, Error> {
    match self.get_last_sync()? {
        Some(last_sync) => Ok(last_sync),
        None => {
            self.create_last_sync(current_blockhash)?;
            Ok(current_blockhash)
        }
    }
}
/// Inserts the `last_sync` row, storing the hash consensus-serialized.
/// Intended to run once; callers go through `find_or_create_last_sync`.
pub fn create_last_sync(&mut self, blockhash: BlockHash) -> Result<(), Error> {
    let mut statement = self
        .connection
        .prepare_cached("INSERT INTO last_sync (blockhash) VALUES (:blockhash)")?;
    statement.execute(named_params! {
        ":blockhash": serialize(&blockhash),
    })?;
    Ok(())
}
/// Overwrites the stored sync tip.
/// The UPDATE has no WHERE clause — it assumes `last_sync` holds exactly
/// one row (created by `create_last_sync`); it would rewrite all rows
/// otherwise.
pub fn update_last_sync(&mut self, blockhash: BlockHash) -> Result<(), Error> {
    let mut statement = self
        .connection
        .prepare_cached("UPDATE last_sync SET blockhash=:blockhash")?;
    statement.execute(named_params! {
        ":blockhash": serialize(&blockhash),
    })?;
    Ok(())
}
/// Reads the stored sync tip, if any. Only the first row is consulted
/// (the table is expected to hold at most one); the blob is
/// consensus-deserialized back into a `BlockHash`.
pub fn get_last_sync(&mut self) -> Result<Option<BlockHash>, Error> {
    let mut statement = self
        .connection
        .prepare_cached("SELECT blockhash FROM last_sync")?;
    let mut rows = statement.query(named_params! {})?;
    let row = rows.next()?;
    match row {
        Some(row) => {
            let blockhash: Vec<u8> = row.get(0)?;
            Ok(Some(deserialize(&blockhash)?))
        }
        None => Ok(None),
    }
}
pub fn create_macaroon(&mut self, identifier: Vec<u8>) -> Result<(), Error> {
let mut statement = self
.connection

21
src/error.rs

@ -19,7 +19,7 @@ pub enum Error {
Io(std::io::Error),
Secp256k1(bitcoin::secp256k1::Error),
Bdk(bdk::Error),
BdkLdk(bdk_ldk::Error),
BitcoinRpc(bitcoincore_rpc::Error),
LdkApi(lightning::util::errors::APIError),
LdkMsg(lightning::ln::msgs::LightningError),
LdkInvoice(lightning_invoice::payment::PaymentError),
@ -38,15 +38,14 @@ impl Display for Error {
let str = match self {
Error::Db(e) => match e {
database::Error::Generic(str) => str.clone(),
database::Error::Encode(e) => e.to_string(),
},
Error::Macaroon(_e) => "macaroon error".to_string(),
Error::TinderCrypt(e) => e.to_string(),
Error::Io(e) => e.to_string(),
Error::Secp256k1(e) => e.to_string(),
Error::Bdk(e) => e.to_string(),
Error::BdkLdk(e) => match e {
bdk_ldk::Error::Bdk(e) => e.to_string(),
},
Error::BitcoinRpc(e) => e.to_string(),
Error::LdkApi(e) => format!("{:?}", e),
Error::LdkMsg(e) => format!("{:?}", e),
Error::LdkInvoice(e) => format!("{:?}", e),
@ -81,6 +80,12 @@ impl From<bdk::Error> for Error {
}
}
// Lets `?` lift bitcoind RPC failures into the crate-wide error type.
impl From<bitcoincore_rpc::Error> for Error {
    fn from(e: bitcoincore_rpc::Error) -> Error {
        Self::BitcoinRpc(e)
    }
}
impl From<lightning_invoice::payment::PaymentError> for Error {
fn from(e: lightning_invoice::payment::PaymentError) -> Self {
Error::LdkInvoice(e)
@ -128,11 +133,3 @@ impl From<macaroon::MacaroonError> for Error {
Error::Macaroon(e)
}
}
// TODO: since this is our library maybe we just want to map
// these to the underlying errors instead of being wrapped again
// Lets `?` lift bdk-ldk wallet errors into the crate-wide error type.
impl From<bdk_ldk::Error> for Error {
    fn from(e: bdk_ldk::Error) -> Error {
        Error::BdkLdk(e)
    }
}

56
src/event_handler.rs

@ -7,13 +7,15 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use crate::chain::manager::SenseiChainManager;
use crate::database::node::{ForwardedPayment, NodeDatabase};
use crate::node::{
ChannelManager, HTLCStatus, LightningWallet, MillisatAmount, PaymentInfo, PaymentOrigin,
};
use crate::node::{ChannelManager, HTLCStatus, MillisatAmount, PaymentInfo, PaymentOrigin};
use crate::utils;
use crate::{config::LightningNodeConfig, hex_utils};
use bdk::database::SqliteDatabase;
use bdk::wallet::AddressIndex;
use bdk::{FeeRate, SignOptions};
use bitcoin::{secp256k1::Secp256k1, Network};
use bitcoin_bech32::WitnessProgram;
use lightning::{
@ -30,10 +32,11 @@ use tokio::runtime::Handle;
pub struct LightningNodeEventHandler {
pub config: LightningNodeConfig,
pub wallet: Arc<LightningWallet>,
pub wallet: Arc<Mutex<bdk::Wallet<(), SqliteDatabase>>>,
pub channel_manager: Arc<ChannelManager>,
pub keys_manager: Arc<KeysManager>,
pub database: Arc<Mutex<NodeDatabase>>,
pub chain_manager: Arc<SenseiChainManager>,
pub tokio_handle: Handle,
}
@ -60,18 +63,29 @@ impl EventHandler for LightningNodeEventHandler {
.expect("Lightning funding tx should always be to a SegWit output")
.to_address();
let target_blocks = 100;
// Have wallet put the inputs into the transaction such that the output
// is satisfied and then sign the funding transaction
let funding_tx = self
.wallet
.construct_funding_transaction(
output_script,
*channel_value_satoshis,
target_blocks,
)
.unwrap();
let wallet = self.wallet.lock().unwrap();
let mut tx_builder = wallet.build_tx();
let fee_sats_per_1000_wu = self
.chain_manager
.bitcoind_client
.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
// TODO: is this the correct conversion??
let fee_rate = FeeRate::from_sat_per_vb(2.0);
tx_builder
.add_recipient(output_script.clone(), *channel_value_satoshis)
.fee_rate(fee_rate)
.enable_rbf();
let (mut psbt, _tx_details) = tx_builder.finish().unwrap();
let _finalized = wallet.sign(&mut psbt, SignOptions::default()).unwrap();
let funding_tx = psbt.extract_tx();
// Give the funding transaction back to LDK for opening the channel.
if self
@ -249,11 +263,16 @@ impl EventHandler for LightningNodeEventHandler {
});
}
Event::SpendableOutputs { outputs } => {
let destination_address = self.wallet.get_unused_address().unwrap();
let wallet = self.wallet.lock().unwrap();
let address_info = wallet.get_address(AddressIndex::LastUnused).unwrap();
let destination_address = address_info.address;
let output_descriptors = &outputs.iter().collect::<Vec<_>>();
let tx_feerate = self
.wallet
.chain_manager
.bitcoind_client
.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
let spending_tx = self
.keys_manager
.spend_spendable_outputs(
@ -264,7 +283,10 @@ impl EventHandler for LightningNodeEventHandler {
&Secp256k1::new(),
)
.unwrap();
self.wallet.broadcast_transaction(&spending_tx);
self.chain_manager
.bitcoind_client
.broadcast_transaction(&spending_tx);
}
Event::ChannelClosed {
channel_id,

9
src/grpc/admin.rs

@ -7,13 +7,14 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use std::sync::Arc;
pub use super::sensei::admin_server::{Admin, AdminServer};
use super::sensei::{
AdminStartNodeRequest, AdminStartNodeResponse, AdminStopNodeRequest, AdminStopNodeResponse,
CreateAdminRequest, CreateAdminResponse, CreateNodeRequest, CreateNodeResponse,
DeleteNodeRequest, DeleteNodeResponse, GetStatusRequest,
GetStatusResponse, ListNode, ListNodesRequest, ListNodesResponse, StartAdminRequest,
StartAdminResponse
DeleteNodeRequest, DeleteNodeResponse, GetStatusRequest, GetStatusResponse, ListNode,
ListNodesRequest, ListNodesResponse, StartAdminRequest, StartAdminResponse,
};
use crate::{
services::admin::{AdminRequest, AdminResponse},
@ -213,7 +214,7 @@ impl TryFrom<AdminResponse> for DeleteNodeResponse {
}
}
pub struct AdminService {
pub request_context: crate::RequestContext,
pub request_context: Arc<crate::RequestContext>,
}
impl AdminService {

4
src/grpc/node.rs

@ -7,6 +7,8 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use std::sync::Arc;
pub use super::sensei::node_server::{Node, NodeServer};
use super::sensei::{
@ -30,7 +32,7 @@ use crate::{
use tonic::{metadata::MetadataMap, Response, Status};
pub struct NodeService {
pub request_context: crate::RequestContext,
pub request_context: Arc<crate::RequestContext>,
}
impl NodeService {
async fn authenticated_request(

20
src/http/admin.rs

@ -7,6 +7,8 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use std::sync::Arc;
use axum::{
extract::{Extension, Query},
routing::{get, post},
@ -208,7 +210,7 @@ pub fn add_routes(router: Router) -> Router {
}
pub async fn list_nodes(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
cookies: Cookies,
Query(pagination): Query<PaginationRequest>,
MacaroonHeader(macaroon): MacaroonHeader,
@ -229,7 +231,7 @@ pub async fn list_nodes(
}
pub async fn login(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
cookies: Cookies,
Json(payload): Json<Value>,
) -> Result<Json<Value>, StatusCode> {
@ -280,7 +282,7 @@ pub async fn logout(cookies: Cookies) -> Result<Json<Value>, StatusCode> {
}
pub async fn init_sensei(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
cookies: Cookies,
Json(payload): Json<Value>,
) -> Result<Json<AdminResponse>, StatusCode> {
@ -317,7 +319,7 @@ pub async fn init_sensei(
}
pub async fn get_status(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
cookies: Cookies,
MacaroonHeader(macaroon): MacaroonHeader,
) -> Result<Json<AdminResponse>, StatusCode> {
@ -344,7 +346,7 @@ pub async fn get_status(
}
pub async fn start_sensei(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
cookies: Cookies,
Json(payload): Json<Value>,
) -> Result<Json<AdminResponse>, StatusCode> {
@ -381,7 +383,7 @@ pub async fn start_sensei(
}
pub async fn create_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
cookies: Cookies,
MacaroonHeader(macaroon): MacaroonHeader,
@ -406,7 +408,7 @@ pub async fn create_node(
}
pub async fn start_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
cookies: Cookies,
MacaroonHeader(macaroon): MacaroonHeader,
@ -431,7 +433,7 @@ pub async fn start_node(
}
pub async fn stop_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
cookies: Cookies,
MacaroonHeader(macaroon): MacaroonHeader,
@ -456,7 +458,7 @@ pub async fn stop_node(
}
pub async fn delete_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
cookies: Cookies,
MacaroonHeader(macaroon): MacaroonHeader,

40
src/http/node.rs

@ -7,6 +7,8 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use std::sync::Arc;
use crate::http::macaroon_header::MacaroonHeader;
use crate::services::admin::AdminRequest;
use crate::services::node::{NodeRequest, NodeRequestError, NodeResponse};
@ -185,7 +187,7 @@ pub fn add_routes(router: Router) -> Router {
}
pub async fn get_unused_address(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
) -> Result<Json<NodeResponse>, StatusCode> {
@ -199,7 +201,7 @@ pub async fn get_unused_address(
}
pub async fn get_wallet_balance(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
) -> Result<Json<NodeResponse>, StatusCode> {
@ -213,7 +215,7 @@ pub async fn get_wallet_balance(
}
pub async fn handle_get_payments(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Query(params): Query<ListPaymentsParams>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -227,7 +229,7 @@ pub async fn handle_get_payments(
}
pub async fn get_channels(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Query(params): Query<ListChannelsParams>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -240,7 +242,7 @@ pub async fn get_channels(
}
pub async fn get_transactions(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Query(params): Query<ListTransactionsParams>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -253,7 +255,7 @@ pub async fn get_transactions(
}
pub async fn get_info(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
) -> Result<Json<NodeResponse>, StatusCode> {
@ -261,7 +263,7 @@ pub async fn get_info(
}
pub async fn get_peers(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
) -> Result<Json<NodeResponse>, StatusCode> {
@ -275,7 +277,7 @@ pub async fn get_peers(
}
pub async fn stop_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
) -> Result<Json<NodeResponse>, StatusCode> {
@ -283,7 +285,7 @@ pub async fn stop_node(
}
pub async fn handle_authenticated_request(
request_context: RequestContext,
request_context: Arc<RequestContext>,
request: NodeRequest,
macaroon: Option<HeaderValue>,
cookies: Cookies,
@ -362,7 +364,7 @@ pub async fn handle_authenticated_request(
}
pub async fn start_node(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -378,7 +380,7 @@ pub async fn start_node(
}
pub async fn create_invoice(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -394,7 +396,7 @@ pub async fn create_invoice(
}
pub async fn label_payment(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -410,7 +412,7 @@ pub async fn label_payment(
}
pub async fn delete_payment(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -426,7 +428,7 @@ pub async fn delete_payment(
}
pub async fn pay_invoice(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -442,7 +444,7 @@ pub async fn pay_invoice(
}
pub async fn open_channel(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -458,7 +460,7 @@ pub async fn open_channel(
}
pub async fn close_channel(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -474,7 +476,7 @@ pub async fn close_channel(
}
pub async fn keysend(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -490,7 +492,7 @@ pub async fn keysend(
}
pub async fn connect_peer(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,
@ -506,7 +508,7 @@ pub async fn connect_peer(
}
pub async fn sign_message(
Extension(request_context): Extension<RequestContext>,
Extension(request_context): Extension<Arc<RequestContext>>,
Json(payload): Json<Value>,
MacaroonHeader(macaroon): MacaroonHeader,
cookies: Cookies,

138
src/main.rs

@ -7,6 +7,7 @@
// You may not use this file except in accordance with one or both of these
// licenses.
mod chain;
mod config;
mod database;
mod disk;
@ -21,21 +22,24 @@ mod node;
mod services;
mod utils;
use crate::config::{LightningNodeBackendConfig, SenseiConfig};
use crate::config::SenseiConfig;
use crate::database::admin::AdminDatabase;
use crate::http::admin::add_routes as add_admin_routes;
use crate::http::node::add_routes as add_node_routes;
use ::http::{
header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, COOKIE},
Method,
header::{self, ACCEPT, AUTHORIZATION, CONTENT_TYPE, COOKIE},
Method, Uri,
};
use axum::{
body::{boxed, Full},
handler::Handler,
http::StatusCode,
response::Html,
response::{Html, IntoResponse, Response},
routing::{get, get_service},
AddExtensionLayer, Router,
};
use clap::Parser;
use rust_embed::RustEmbed;
use std::net::SocketAddr;
use tower_cookies::CookieManagerLayer;
@ -45,7 +49,7 @@ use grpc::admin::{AdminServer, AdminService as GrpcAdminService};
use grpc::node::{NodeServer, NodeService as GrpcNodeService};
use lightning_background_processor::BackgroundProcessor;
use node::LightningNode;
use services::admin::AdminService;
use services::admin::{AdminRequest, AdminResponse, AdminService};
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
@ -54,12 +58,7 @@ use tokio::task::JoinHandle;
use tonic::transport::Server;
use tower_http::cors::{CorsLayer, Origin};
#[macro_use]
extern crate lazy_static;
lazy_static! {
static ref INDEX_HTML: String = fs::read_to_string("./web-admin/build/index.html").unwrap();
}
use tokio::sync::mpsc::Sender;
pub struct NodeHandle {
pub node: Arc<LightningNode>,
@ -69,7 +68,6 @@ pub struct NodeHandle {
pub type NodeDirectory = Arc<Mutex<HashMap<String, NodeHandle>>>;
#[derive(Clone)]
pub struct RequestContext {
pub node_directory: NodeDirectory,
pub admin_service: AdminService,
@ -80,14 +78,22 @@ pub struct RequestContext {
#[clap(version)]
struct SenseiArgs {
/// Sensei data directory, defaults to home directory
#[clap(short, long)]
#[clap(long)]
data_dir: Option<String>,
#[clap(short, long)]
#[clap(long)]
network: Option<String>,
#[clap(short, long)]
electrum_url: Option<String>,
#[clap(long)]
bitcoind_rpc_host: Option<String>,
#[clap(long)]
bitcoind_rpc_port: Option<u16>,
#[clap(long)]
bitcoind_rpc_username: Option<String>,
#[clap(long)]
bitcoind_rpc_password: Option<String>,
}
pub type AdminRequestResponse = (AdminRequest, Sender<AdminResponse>);
#[tokio::main]
async fn main() {
macaroon::initialize().expect("failed to initialize macaroons");
@ -116,8 +122,18 @@ async fn main() {
let network_config_path = format!("{}/{}/config.json", sensei_dir, root_config.network);
let mut config = SenseiConfig::from_file(network_config_path, Some(root_config));
if let Some(electrum_url) = args.electrum_url {
config.set_backend(LightningNodeBackendConfig::electrum_from_url(electrum_url));
// override config with command line arguments or ENV vars
if let Some(bitcoind_rpc_host) = args.bitcoind_rpc_host {
config.bitcoind_rpc_host = bitcoind_rpc_host
}
if let Some(bitcoind_rpc_port) = args.bitcoind_rpc_port {
config.bitcoind_rpc_port = bitcoind_rpc_port
}
if let Some(bitcoind_rpc_username) = args.bitcoind_rpc_username {
config.bitcoind_rpc_username = bitcoind_rpc_username
}
if let Some(bitcoind_rpc_password) = args.bitcoind_rpc_password {
config.bitcoind_rpc_password = bitcoind_rpc_password
}
let sqlite_path = format!("{}/{}/admin.db", sensei_dir, config.network);
@ -132,29 +148,19 @@ async fn main() {
config.clone(),
node_directory.clone(),
database,
);
)
.await;
// TODO: this seems odd too, maybe just pass around the 'admin service'
// and the servers will use it to get the node from the directory
let request_context = RequestContext {
let request_context = Arc::new(RequestContext {
node_directory: node_directory.clone(),
admin_service,
};
});
let router = Router::new()
.route("/admin", get(live))
.nest(
"/admin/static",
get_service(ServeDir::new("./web-admin/build/static")).handle_error(
|error: std::io::Error| async move {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled internal error: {}", error),
)
},
),
)
.fallback(get(live));
.route("/admin/*path", static_handler.into_service())
.fallback(get(not_found));
let router = add_admin_routes(router);
let router = add_node_routes(router);
@ -188,7 +194,9 @@ async fn main() {
.add_service(NodeServer::new(GrpcNodeService {
request_context: request_context.clone(),
}))
.add_service(AdminServer::new(GrpcAdminService { request_context }))
.add_service(AdminServer::new(GrpcAdminService {
request_context: request_context.clone(),
}))
.into_service();
let hybrid_service = hybrid::hybrid(http_service, grpc_service);
@ -205,11 +213,61 @@ async fn main() {
}
}
async fn live() -> Html<String> {
let index = fs::read_to_string("./web-admin/build/index.html").unwrap();
Html(index)
// Static route matchers ("/" and "/index.html") serve the SPA home page
// by delegating to the embedded-asset handler.
async fn index_handler() -> impl IntoResponse {
    let uri = "/index.html".parse::<Uri>().unwrap();
    static_handler(uri).await
}
// Wildcard matcher ("/admin/*path") for the embedded web-admin assets.
// Requests under `admin/static/` are remapped into the embedded build's
// `static/` directory; every other path falls back to `index.html` so the
// client-side router can resolve it.
//
// Fixes: removed leftover debug `println!` calls that logged every request
// path to stdout, and replaced a stale comment copied from an axum example
// (it referenced `examples/public/` rather than the actual embedded
// `web-admin/build/` folder on the `Asset` struct).
async fn static_handler(uri: Uri) -> impl IntoResponse {
    let path = uri.path().trim_start_matches('/').to_string();
    let path = if path.starts_with("admin/static/") {
        path.replace("admin/static/", "static/")
    } else {
        String::from("index.html")
    };
    StaticFile(path)
}
// Finally, we use a fallback route for anything that didn't match.
// Serves a minimal static 404 page.
async fn not_found() -> Html<&'static str> {
    Html("<h1>404</h1><p>Not Found</p>")
}
async fn _handler() -> Html<&'static str> {
Html(&INDEX_HTML)
// Embeds the compiled web-admin frontend (the `web-admin/build/` directory)
// into the binary at compile time via `rust-embed`, so the server needs no
// on-disk assets at runtime.
#[derive(RustEmbed)]
#[folder = "web-admin/build/"]
struct Asset;
pub struct StaticFile<T>(pub T);
impl<T> IntoResponse for StaticFile<T>
where
    T: Into<String>,
{
    /// Looks the path up in the embedded asset bundle and serves it with a
    /// MIME type guessed from the file extension; replies 404 when absent.
    fn into_response(self) -> Response {
        let path = self.0.into();
        if let Some(content) = Asset::get(path.as_str()) {
            let mime = mime_guess::from_path(path).first_or_octet_stream();
            Response::builder()
                .header(header::CONTENT_TYPE, mime.as_ref())
                .body(boxed(Full::from(content.data)))
                .unwrap()
        } else {
            Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(boxed(Full::from("404")))
                .unwrap()
        }
    }
}

199
src/node.rs

@ -7,7 +7,11 @@
// You may not use this file except in accordance with one or both of these
// licenses.
use crate::config::{LightningNodeBackendConfig, LightningNodeConfig};
use crate::chain::bitcoind_client::BitcoindClient;
use crate::chain::broadcaster::SenseiBroadcaster;
use crate::chain::listener_database::ListenerDatabase;
use crate::chain::manager::SenseiChainManager;
use crate::config::LightningNodeConfig;
use crate::database::node::NodeDatabase;
use crate::disk::FilesystemLogger;
use crate::error::Error;
@ -18,13 +22,19 @@ use crate::services::{PaginationRequest, PaginationResponse, PaymentsFilter};
use crate::utils::PagedVec;
use crate::{database, disk, hex_utils};
use bdk::blockchain::ConfigurableBlockchain;
use bdk::database::SqliteDatabase;
use bdk::keys::ExtendedKey;
use bdk::wallet::AddressIndex;
use bdk::TransactionDetails;
use bdk::{blockchain::ElectrumBlockchain, database::MemoryDatabase};
use bitcoin::hashes::Hash;
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::{Confirm, Listen};
use lightning_block_sync::BlockSource;
use lightning_block_sync::UnboundedCache;
use lightning::ln::msgs::NetAddress;
use lightning::routing::router::{self, Payee, RouteParameters};
use lightning_block_sync::init;
use lightning_invoice::payment::PaymentError;
use tindercrypt::cryptors::RingCryptor;
@ -134,13 +144,11 @@ pub struct PaymentInfo {
pub invoice: Option<String>,
}
pub type LightningWallet = bdk_ldk::LightningWallet<ElectrumBlockchain, MemoryDatabase>;
pub type ChainMonitor = chainmonitor::ChainMonitor<
InMemorySigner,
Arc<dyn Filter + Send + Sync>,
Arc<LightningWallet>,
Arc<LightningWallet>,
Arc<SenseiBroadcaster>,
Arc<BitcoindClient>,
Arc<FilesystemLogger>,
Arc<FilesystemPersister>,
>;
@ -158,13 +166,13 @@ pub type SimpleArcPeerManager<SD, M, T, F, L> = LdkPeerManager<
pub type PeerManager = SimpleArcPeerManager<
SocketDescriptor,
ChainMonitor,
LightningWallet,
LightningWallet,
SenseiBroadcaster,
BitcoindClient,
FilesystemLogger,
>;
pub type ChannelManager =
SimpleArcChannelManager<ChainMonitor, LightningWallet, LightningWallet, FilesystemLogger>;
SimpleArcChannelManager<ChainMonitor, SenseiBroadcaster, BitcoindClient, FilesystemLogger>;
pub type Router = DefaultRouter<Arc<NetworkGraph>, Arc<FilesystemLogger>>;
@ -176,10 +184,10 @@ pub type InvoicePayer = payment::InvoicePayer<
Arc<LightningNodeEventHandler>,
>;
pub type ConfirmableMonitor = (
pub type SyncableMonitor = (
ChannelMonitor<InMemorySigner>,
Arc<LightningWallet>,
Arc<LightningWallet>,
Arc<SenseiBroadcaster>,
Arc<BitcoindClient>,
Arc<FilesystemLogger>,
);
@ -241,9 +249,10 @@ pub struct LightningNode {
pub macaroon: Macaroon,
pub seed: [u8; 32],
pub database: Arc<Mutex<NodeDatabase>>,
pub wallet: Arc<LightningWallet>,
pub wallet: Arc<Mutex<bdk::Wallet<(), SqliteDatabase>>>,
pub channel_manager: Arc<ChannelManager>,
pub chain_monitor: Arc<ChainMonitor>,
pub chain_manager: Arc<SenseiChainManager>,
pub peer_manager: Arc<PeerManager>,
pub network_graph: Arc<NetworkGraph>,
pub network_graph_msg_handler: Arc<NetworkGraphMessageHandler>,
@ -352,16 +361,17 @@ impl LightningNode {
.map_err(|_e| Error::InvalidMacaroon)
}
pub fn new(
pub async fn new(
config: LightningNodeConfig,
network_graph: Option<Arc<NetworkGraph>>,
network_graph_msg_handler: Option<Arc<NetworkGraphMessageHandler>>,
chain_manager: Arc<SenseiChainManager>,
) -> Result<Self, Error> {
let data_dir = config.data_dir();
fs::create_dir_all(data_dir.clone())?;
let mut node_database = NodeDatabase::new(config.node_database_path());
let network = config.network;
let backend = config.backend.clone();
let channel_manager_path = config.channel_manager_path();
let admin_macaroon_path = config.admin_macaroon_path();
@ -384,53 +394,48 @@ impl LightningNode {
account_number,
);
// TODO: this needs to be replaced with SqliteDatabase
// it's waiting on my changes being merged upstream to bdk though
let database = MemoryDatabase::default();
let database = SqliteDatabase::new(config.bdk_database_path());
let blockchain = match backend {
LightningNodeBackendConfig::Electrum(config) => {
ElectrumBlockchain::from_config(&config)?
}
};
let bdk_wallet = bdk::Wallet::new(
let bdk_wallet = Arc::new(Mutex::new(bdk::Wallet::new_offline(
receive_descriptor_template,
Some(change_descriptor_template),
network,
database,
blockchain,
)?;
)?));
let lightning_wallet = Arc::new(LightningWallet::new(bdk_wallet));
let fee_estimator = lightning_wallet.clone();
let listener_database =
ListenerDatabase::new(config.bdk_database_path(), config.node_database_path());
let fee_estimator = chain_manager.bitcoind_client.clone();
let logger = Arc::new(FilesystemLogger::new(data_dir.clone()));
let broadcaster = lightning_wallet.clone();
let broadcaster = Arc::new(SenseiBroadcaster {
bitcoind_client: chain_manager.bitcoind_client.clone(),
listener_database: listener_database.clone(),
});
let persister = Arc::new(FilesystemPersister::new(data_dir));
let filter = lightning_wallet.clone();
let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos()));
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(filter.clone()),
None,
broadcaster.clone(),
logger.clone(),
fee_estimator.clone(),
persister.clone(),
));
let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos()));
let mut channelmonitors = persister.read_channelmonitors(keys_manager.clone())?;
for (_, monitor) in channelmonitors.iter() {
monitor.load_outputs_to_watch(&filter.clone());
}
// TODO: likely expose a lot of this config to our LightningNodeConfig
let mut user_config = UserConfig::default();
user_config
.peer_channel_config_limits
.force_announced_channel_preference = false;
let (_channel_manager_blockhash, channel_manager) = {
let best_block = chain_manager.get_best_block().await?;
let (channel_manager_blockhash, mut channel_manager) = {
if let Ok(mut f) = fs::File::open(channel_manager_path) {
let mut channel_monitor_mut_references = Vec::new();
for (_, channel_monitor) in channelmonitors.iter_mut() {
@ -451,12 +456,11 @@ impl LightningNode {
// an existing channel manager. need to handle this the same way we do for seed file
// really should extract to generic error handle for io where we really want to know if
// the file exists or not.
let (tip_height, tip_header) = lightning_wallet.get_tip()?;
let tip_hash = tip_header.block_hash();
let tip_hash = best_block.block_hash();
let chain_params = ChainParameters {
network: config.network,
best_block: BestBlock::new(tip_hash, tip_height),
best_block,
};
let fresh_channel_manager = channelmanager::ChannelManager::new(
fee_estimator.clone(),
@ -471,42 +475,58 @@ impl LightningNode {
}
};
let channel_manager: Arc<ChannelManager> = Arc::new(channel_manager);
// `Confirm` trait is not implemented on an individual ChannelMonitor
// but on a tuple consisting of (channel_monitor, broadcaster, fee_estimator, logger)
// this maps our channel monitors into a tuple that implements Confirm
let mut confirmable_monitors = channelmonitors
.into_iter()
.map(|(_monitor_hash, channel_monitor)| {
let mut bundled_channel_monitors = Vec::new();
for (blockhash, channel_monitor) in channelmonitors.drain(..) {
let outpoint = channel_monitor.get_funding_txo().0;
bundled_channel_monitors.push((
blockhash,
(
channel_monitor,
broadcaster.clone(),
fee_estimator.clone(),
logger.clone(),
)
})
.collect::<Vec<ConfirmableMonitor>>();
),
outpoint,
));
}
let confirmables = confirmable_monitors
.iter()
.map(|cm| cm as &dyn chain::Confirm)
.chain(iter::once(&*channel_manager as &dyn chain::Confirm))
.collect::<Vec<&dyn chain::Confirm>>();
let channel_manager_info = (channel_manager_blockhash, &mut channel_manager);
let monitor_info = bundled_channel_monitors
.iter_mut()
.map(|monitor_bundle| (monitor_bundle.0, &mut monitor_bundle.1))
.collect::<Vec<(BlockHash, &mut SyncableMonitor)>>();
// save a sync if there are no monitors
if confirmables.len() > 1 {
lightning_wallet.sync(confirmables)?;
let mut chain_listeners = vec![(
channel_manager_info.0,
channel_manager_info.1 as &(dyn chain::Listen + Send + Sync),
)];
for (block_hash, monitor) in monitor_info.into_iter() {
chain_listeners.push((block_hash, monitor as &(dyn chain::Listen + Send + Sync)));
}
for confirmable_monitor in confirmable_monitors.drain(..) {
let channel_monitor = confirmable_monitor.0;
let funding_txo = channel_monitor.get_funding_txo().0;
let bdk_database_last_sync =
node_database.find_or_create_last_sync(best_block.block_hash())?;
chain_listeners.push((
bdk_database_last_sync,
&listener_database as &(dyn chain::Listen + Send + Sync),
));
chain_manager
.synchronize_to_tip(chain_listeners)
.await
.unwrap();
for confirmable_monitor in bundled_channel_monitors.drain(..) {
chain_monitor
.watch_channel(funding_txo, channel_monitor)
.watch_channel(confirmable_monitor.2, confirmable_monitor.1 .0)
.unwrap();
}
let channel_manager: Arc<ChannelManager> = Arc::new(channel_manager);
let network_graph = match network_graph {
Some(network_graph) => network_graph,
None => {
@ -573,11 +593,12 @@ impl LightningNode {
let event_handler = Arc::new(LightningNodeEventHandler {
config: config.clone(),
wallet: lightning_wallet.clone(),
wallet: bdk_wallet.clone(),
channel_manager: channel_manager.clone(),
keys_manager: keys_manager.clone(),
database: database.clone(),
tokio_handle: Handle::current(),
chain_manager: chain_manager.clone(),
});
let invoice_payer = Arc::new(InvoicePayer::new(
@ -594,9 +615,10 @@ impl LightningNode {
database,
seed,
macaroon,
wallet: lightning_wallet,
wallet: bdk_wallet,
channel_manager,
chain_monitor,
chain_manager,
peer_manager,
network_graph,
network_graph_msg_handler,
@ -607,10 +629,22 @@ impl LightningNode {
})
}
pub fn start(self) -> (Vec<JoinHandle<()>>, BackgroundProcessor) {
pub async fn start(self) -> (Vec<JoinHandle<()>>, BackgroundProcessor) {
let mut handles = vec![];
let config = self.config.clone();
// is it safe to start this now instead of in `start`
// need to better understand separation; will depend on actual creation and startup flows
let listener_database =
ListenerDatabase::new(config.bdk_database_path(), config.node_database_path());
let channel_manager_sync = self.channel_manager.clone();
let chain_monitor_sync = self.chain_monitor.clone();
self.chain_manager
.keep_in_sync(channel_manager_sync, chain_monitor_sync, listener_database)
.await
.unwrap();
if !config.external_router {
let network_graph = self.network_graph.clone();
let network_graph_path = config.network_graph_path();
@ -654,25 +688,6 @@ impl LightningNode {
}
}));
let wallet_sync = self.wallet.clone();
let channel_manager_sync = self.channel_manager.clone();
let chain_monitor_sync = self.chain_monitor.clone();
handles.push(tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let confirmables = vec![
&*channel_manager_sync as &dyn chain::Confirm,
&*chain_monitor_sync as &dyn chain::Confirm,
];
if let Err(e) = wallet_sync.sync(confirmables) {
println!("sync failed: {:?}", e);
}
}
}));
let scorer_path = self.config.scorer_path();
let scorer_persist = Arc::clone(&self.scorer);
@ -1070,7 +1085,7 @@ impl LightningNode {
let page: usize = pagination.page.try_into().unwrap();
let index = page * per_page;
let bdk_wallet = self.wallet.get_wallet();
let bdk_wallet = self.wallet.lock().unwrap();
let transaction_details = bdk_wallet
.list_transactions(false)?
@ -1203,13 +1218,15 @@ impl LightningNode {
NodeRequest::StartNode { passphrase: _ } => Ok(NodeResponse::StartNode {}),
NodeRequest::StopNode {} => Ok(NodeResponse::StopNode {}),
NodeRequest::GetUnusedAddress {} => {
let address = self.wallet.get_unused_address()?;
let wallet = self.wallet.lock().unwrap();
let address_info = wallet.get_address(AddressIndex::LastUnused)?;
Ok(NodeResponse::GetUnusedAddress {
address: address.to_string(),
address: address_info.address.to_string(),
})
}
NodeRequest::GetBalance {} => {
let balance = self.wallet.get_balance()?;
let wallet = self.wallet.lock().unwrap();
let balance = wallet.get_balance().map_err(Error::Bdk)?;
Ok(NodeResponse::GetBalance {
balance_satoshis: balance,
})

22
src/services/admin.rs

@ -8,11 +8,13 @@
// licenses.
use super::{PaginationRequest, PaginationResponse};
use crate::chain::manager::SenseiChainManager;
use crate::database::{
self,
admin::{AdminDatabase, Node, Role, Status},
};
use crate::error::Error as SenseiError;
use crate::AdminRequestResponse;
use crate::{
config::{LightningNodeConfig, SenseiConfig},
hex_utils,
@ -20,7 +22,9 @@ use crate::{
NodeDirectory, NodeHandle,
};
use lightning_block_sync::BlockSource;
use serde::Serialize;
use std::sync::mpsc::Receiver;
use std::{collections::hash_map::Entry, fs, sync::Arc};
use tokio::sync::Mutex;
pub enum AdminRequest {
@ -101,10 +105,11 @@ pub struct AdminService {
pub config: Arc<Mutex<SenseiConfig>>,
pub node_directory: NodeDirectory,
pub database: Arc<Mutex<AdminDatabase>>,
pub chain_manager: Arc<SenseiChainManager>,
}
impl AdminService {
pub fn new(
pub async fn new(
data_dir: &str,
config: SenseiConfig,
node_directory: NodeDirectory,
@ -112,9 +117,10 @@ impl AdminService {
) -> Self {
Self {
data_dir: String::from(data_dir),
config: Arc::new(Mutex::new(config)),
config: Arc::new(Mutex::new(config.clone())),
node_directory,
database: Arc::new(Mutex::new(database)),
chain_manager: Arc::new(SenseiChainManager::new(config).await.unwrap()),
}
}
}
@ -140,6 +146,7 @@ impl From<database::Error> for Error {
fn from(e: database::Error) -> Self {
match e {
database::Error::Generic(str) => Self::Generic(str),
database::Error::Encode(e) => Self::Generic(e.to_string()),
}
}
}
@ -398,7 +405,9 @@ impl AdminService {
node_config,
Some(network_graph),
Some(network_graph_message_handler),
self.chain_manager.clone(),
)
.await
}
Entry::Vacant(_entry) => Err(crate::error::Error::AdminNodeNotStarted),
}
@ -406,7 +415,7 @@ impl AdminService {
None => Err(crate::error::Error::AdminNodeNotCreated),
}
} else {
LightningNode::new(node_config, None, None)
LightningNode::new(node_config, None, None, self.chain_manager.clone()).await
}
}
@ -414,7 +423,10 @@ impl AdminService {
let external_router = node.is_user();
let config = self.config.lock().await;
LightningNodeConfig {
backend: config.backend.clone(),
bitcoind_rpc_host: config.bitcoind_rpc_host.clone(),
bitcoind_rpc_port: config.bitcoind_rpc_port,
bitcoind_rpc_username: config.bitcoind_rpc_username.clone(),
bitcoind_rpc_password: config.bitcoind_rpc_password.clone(),
data_dir: format!("{}/{}/{}", self.data_dir, config.network, node.external_id),
ldk_peer_listening_port: node.listen_port,
ldk_announced_listen_addr: vec![],
@ -447,7 +459,7 @@ impl AdminService {
node.pubkey.clone(),
node.listen_port
);
let (handles, background_processor) = start_lightning_node.start();
let (handles, background_processor) = start_lightning_node.start().await;
entry.insert(NodeHandle {
node: Arc::new(lightning_node.clone()),
background_processor,

9
src/services/node.rs

@ -217,19 +217,10 @@ pub enum NodeResponse {
/// Errors surfaced to callers of the node request API.
///
/// Each variant wraps a human-readable message string so the error can be
/// serialized (via `Serialize`) straight into an API response.
#[derive(Serialize, Debug)]
pub enum NodeRequestError {
// Error originating from Sensei's own node logic.
Sensei(String),
// Error from the bdk-ldk bridge layer (legacy; removed in this change).
BdkLdk(String),
// Error from the underlying BDK wallet.
Bdk(String),
// Filesystem / I/O failure.
Io(String),
}
/// Converts a `bdk_ldk` bridge error into a `NodeRequestError`.
///
/// Only the `Bdk` variant is matched here; the inner BDK error is
/// stringified via `Display` and carried in `NodeRequestError::BdkLdk`.
/// NOTE(review): this impl is removed by this commit along with the
/// `bdk_ldk` dependency.
impl From<bdk_ldk::Error> for NodeRequestError {
fn from(e: bdk_ldk::Error) -> Self {
match e {
// assumes `bdk_ldk::Error` has only this variant — the match is
// exhaustive as written; TODO confirm against the bdk_ldk crate
bdk_ldk::Error::Bdk(e) => Self::BdkLdk(e.to_string()),
}
}
}
impl From<bdk::Error> for NodeRequestError {
fn from(e: bdk::Error) -> Self {
Self::Bdk(e.to_string())

Loading…
Cancel
Save