Browse Source

Merge pull request #150 from comit-network/monitor-taker

Share code for handling monitoring events to enable taker
fix-bad-api-calls
Daniel Karzel 3 years ago
committed by GitHub
parent
commit
1c7d28f6f4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      daemon/src/db.rs
  2. 312
      daemon/src/maker_cfd.rs
  3. 221
      daemon/src/model/cfd.rs
  4. 67
      daemon/src/monitor.rs
  5. 12
      daemon/src/taker.rs
  6. 72
      daemon/src/taker_cfd.rs

6
daemon/src/db.rs

@ -170,7 +170,11 @@ pub async fn insert_new_cfd_state_by_order_id(
// make sure that the new state is different than the current one to avoid that we save the same // make sure that the new state is different than the current one to avoid that we save the same
// state twice // state twice
if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) { if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) {
anyhow::bail!("Cannot insert new state {} for cfd with order_id {} because it currently already is in state {}", new_state, order_id, latest_cfd_state_in_db); tracing::warn!(
"Same state transition for cfd with order_id {}: {}",
order_id,
latest_cfd_state_in_db
);
} }
let cfd_state = serde_json::to_string(&new_state)?; let cfd_state = serde_json::to_string(&new_state)?;

312
daemon/src/maker_cfd.rs

@ -4,21 +4,15 @@ use crate::db::{
load_cfd_by_order_id, load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::{ use crate::monitor::MonitorParams;
CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality,
RefundTimelockExpired,
};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{maker_inc_connections, monitor, setup_contract_actor}; use crate::{maker_inc_connections, monitor, setup_contract_actor};
use anyhow::{bail, Result}; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use bdk::bitcoin::{Amount, PublicKey};
use cfd_protocol::secp256k1_zkp::SECP256K1;
use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash};
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*; use xtra::prelude::*;
@ -414,277 +408,29 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> { async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = msg.0; let order_id = event.order_id();
tracing::debug!(%order_id, "Lock transaction has reached finality");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
PendingOpen { dlc, .. } => dlc,
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"),
Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes lock finality: ignoring")
}
};
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
},
&mut conn,
)
.await?;
Ok(())
}
async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Commit transaction has reached finality");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
Open { dlc, .. } => dlc,
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"),
OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes commit finality: ignoring")
}
};
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Unprepared,
},
&mut conn,
)
.await?;
Ok(())
}
async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "CET timelock has expired");
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*; let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
let new_state = match cfd.state {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(price),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(price),
},
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"),
OpenCommitted {
cet_status: CetStatus::TimelockExpired,
..
}
| OpenCommitted {
cet_status: CetStatus::Ready(_),
..
} => bail!("State already assumes CET timelock expiry: ignoring"),
MustRefund { .. } | Refunded { .. } => {
bail!("Refund path does not care about CET timelock expiry: ignoring")
}
};
insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?;
Ok(()) insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
}
async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Refund timelock has expired");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
OpenCommitted { dlc, .. } => {
insert_new_cfd_state_by_order_id(
msg.0,
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
},
&mut conn,
)
.await?;
dlc
}
MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes refund timelock expiry: ignoring")
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"),
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
dlc
}
};
insert_new_cfd_state_by_order_id(
msg.0,
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
},
&mut conn,
)
.await?;
let sig_hash = spending_tx_sighash(
&dlc.refund.0,
&dlc.commit.2,
Amount::from_sat(dlc.commit.0.output[0].value),
);
let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity);
let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key(
SECP256K1,
&dlc.identity,
));
let counterparty_sig = dlc.refund.1;
let counterparty_pubkey = dlc.identity_counterparty;
let signed_refund_tx = finalize_spend_transaction(
dlc.refund.0,
&dlc.commit.2,
(our_pubkey, our_sig),
(counterparty_pubkey, counterparty_sig),
)?;
// TODO: Not sure that should be done here...
// Consider bubbling the refund availability up to the user, and let user trigger
// transaction publication
if let CfdState::MustRefund { .. } = new_state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self let txid = self
.wallet .wallet
.try_broadcast_transaction(signed_refund_tx) .try_broadcast_transaction(signed_refund_tx)
.await?; .await?;
tracing::info!("Refund transaction published on chain: {}", txid); tracing::info!("Refund transaction published on chain: {}", txid);
Ok(())
} }
async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Refund transaction has reached finality");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
match cfd.state {
MustRefund { .. } => (),
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"),
PendingOpen { .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
}
Open { .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
}
OpenCommitted { .. } => {
tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead");
}
Refunded { .. } => bail!("State already assumes refund finality: ignoring"),
};
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::Refunded {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await?;
Ok(()) Ok(())
} }
} }
@ -739,37 +485,9 @@ impl Handler<CfdSetupCompleted> for Actor {
} }
#[async_trait] #[async_trait]
impl Handler<LockFinality> for Actor { impl Handler<monitor::Event> for Actor {
async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_lock_finality(msg)) log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl Handler<CommitFinality> for Actor {
async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context<Self>) {
log_error!(self.handle_commit_finality(msg))
}
}
#[async_trait]
impl Handler<CetTimelockExpired> for Actor {
async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context<Self>) {
log_error!(self.handle_cet_timelock_expired(msg))
}
}
#[async_trait]
impl Handler<RefundTimelockExpired> for Actor {
async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context<Self>) {
log_error!(self.handle_refund_timelock_expired(msg))
}
}
#[async_trait]
impl Handler<RefundFinality> for Actor {
async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context<Self>) {
log_error!(self.handle_refund_finality(msg))
} }
} }

221
daemon/src/model/cfd.rs

@ -1,9 +1,11 @@
use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; use crate::model::{Leverage, Position, TakerId, TradingPair, Usd};
use anyhow::Result; use crate::monitor;
use anyhow::{bail, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction};
use bdk::descriptor::Descriptor; use bdk::descriptor::Descriptor;
use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SECP256K1};
use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash};
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -363,6 +365,221 @@ impl Cfd {
#[allow(dead_code)] #[allow(dead_code)]
pub const CET_TIMELOCK: u32 = 12; pub const CET_TIMELOCK: u32 = 12;
pub fn handle(&self, event: CfdStateChangeEvent) -> Result<CfdState> {
use CfdState::*;
// TODO: Display impl
tracing::info!("Cfd state change event {:?}", event);
let order_id = self.order.id;
let new_state = match event {
CfdStateChangeEvent::Monitor(event) => match event {
monitor::Event::LockFinality(_) => match self.state.clone() {
PendingOpen { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
},
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"),
Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes lock finality: ignoring")
}
},
monitor::Event::CommitFinality(_) => {
let dlc = match self.state.clone() {
Open { dlc, .. } => dlc,
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect commit finality yet: ignoring")
}
OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes commit finality: ignoring")
}
};
OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Unprepared,
}
}
monitor::Event::CetTimelockExpired(_) => match self.state.clone() {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(price),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(price),
},
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect CET timelock expiry yet: ignoring")
}
OpenCommitted {
cet_status: CetStatus::TimelockExpired,
..
}
| OpenCommitted {
cet_status: CetStatus::Ready(_),
..
} => bail!("State already assumes CET timelock expiry: ignoring"),
MustRefund { .. } | Refunded { .. } => {
bail!("Refund path does not care about CET timelock expiry: ignoring")
}
},
monitor::Event::RefundTimelockExpired(_) => {
let dlc = match self.state.clone() {
OpenCommitted { dlc, .. } => dlc,
MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes refund timelock expiry: ignoring")
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund timelock expiry yet: ignoring")
}
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
dlc
}
};
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
}
}
monitor::Event::RefundFinality(_) => {
match self.state {
MustRefund { .. } => (),
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund finality yet: ignoring")
}
PendingOpen { .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
}
Open { .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
}
OpenCommitted { .. } => {
tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead");
}
Refunded { .. } => bail!("State already assumes refund finality: ignoring"),
}
Refunded {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
}
}
},
};
Ok(new_state)
}
pub fn refund_tx(&self) -> Result<Transaction> {
let dlc = if let CfdState::MustRefund { dlc, .. } = self.state.clone() {
dlc
} else {
bail!("Refund transaction can only be constructed when in state MustRefund, but we are currently in {}", self.state.clone())
};
let sig_hash = spending_tx_sighash(
&dlc.refund.0,
&dlc.commit.2,
Amount::from_sat(dlc.commit.0.output[0].value),
);
let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity);
let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key(
SECP256K1,
&dlc.identity,
));
let counterparty_sig = dlc.refund.1;
let counterparty_pubkey = dlc.identity_counterparty;
let signed_refund_tx = finalize_spend_transaction(
dlc.refund.0,
&dlc.commit.2,
(our_pubkey, our_sig),
(counterparty_pubkey, counterparty_sig),
)?;
Ok(signed_refund_tx)
}
}
#[derive(Debug, Clone)]
pub enum CfdStateChangeEvent {
// TODO: groupd other events by actors into enums and add them here so we can bundle all
// transitions into cfd.transition_to(...)
Monitor(monitor::Event),
} }
fn calculate_profit( fn calculate_profit(

67
daemon/src/monitor.rs

@ -23,12 +23,7 @@ pub struct MonitorParams {
impl<T> Actor<T> impl<T> Actor<T>
where where
T: xtra::Actor T: xtra::Actor + xtra::Handler<Event>,
+ xtra::Handler<LockFinality>
+ xtra::Handler<CommitFinality>
+ xtra::Handler<CetTimelockExpired>
+ xtra::Handler<RefundTimelockExpired>
+ xtra::Handler<RefundFinality>,
{ {
pub fn new( pub fn new(
electrum_rpc_url: &str, electrum_rpc_url: &str,
@ -59,7 +54,7 @@ where
lock_subscription.wait_until_final().await.unwrap(); lock_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(LockFinality(id)) .do_send_async(Event::LockFinality(id))
.await .await
.unwrap(); .unwrap();
} }
@ -77,7 +72,7 @@ where
commit_subscription.wait_until_final().await.unwrap(); commit_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(CommitFinality(id)) .do_send_async(Event::CommitFinality(id))
.await .await
.unwrap(); .unwrap();
} }
@ -93,7 +88,7 @@ where
.unwrap(); .unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(CetTimelockExpired(id)) .do_send_async(Event::CetTimelockExpired(id))
.await .await
.unwrap(); .unwrap();
} }
@ -110,7 +105,7 @@ where
.unwrap(); .unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(RefundTimelockExpired(id)) .do_send_async(Event::RefundTimelockExpired(id))
.await .await
.unwrap(); .unwrap();
} }
@ -126,7 +121,7 @@ where
refund_subscription.wait_until_final().await.unwrap(); refund_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(RefundFinality(id)) .do_send_async(Event::RefundFinality(id))
.await .await
.unwrap(); .unwrap();
} }
@ -147,33 +142,30 @@ impl xtra::Message for StartMonitoring {
type Result = (); type Result = ();
} }
pub struct LockFinality(pub OrderId); #[derive(Debug, Clone)]
pub enum Event {
impl xtra::Message for LockFinality { LockFinality(OrderId),
type Result = (); CommitFinality(OrderId),
} CetTimelockExpired(OrderId),
RefundTimelockExpired(OrderId),
pub struct CommitFinality(pub OrderId); RefundFinality(OrderId),
impl xtra::Message for CommitFinality {
type Result = ();
}
pub struct CetTimelockExpired(pub OrderId);
impl xtra::Message for CetTimelockExpired {
type Result = ();
} }
pub struct RefundTimelockExpired(pub OrderId); impl Event {
pub fn order_id(&self) -> OrderId {
impl xtra::Message for RefundTimelockExpired { let order_id = match self {
type Result = (); Event::LockFinality(order_id) => order_id,
Event::CommitFinality(order_id) => order_id,
Event::CetTimelockExpired(order_id) => order_id,
Event::RefundTimelockExpired(order_id) => order_id,
Event::RefundFinality(order_id) => order_id,
};
*order_id
}
} }
pub struct RefundFinality(pub OrderId); impl xtra::Message for Event {
impl xtra::Message for RefundFinality {
type Result = (); type Result = ();
} }
@ -192,12 +184,7 @@ impl<T> xtra::Actor for Actor<T> where T: xtra::Actor {}
#[async_trait] #[async_trait]
impl<T> xtra::Handler<StartMonitoring> for Actor<T> impl<T> xtra::Handler<StartMonitoring> for Actor<T>
where where
T: xtra::Actor T: xtra::Actor + xtra::Handler<Event>,
+ xtra::Handler<LockFinality>
+ xtra::Handler<CommitFinality>
+ xtra::Handler<CetTimelockExpired>
+ xtra::Handler<RefundTimelockExpired>
+ xtra::Handler<RefundFinality>,
{ {
async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_start_monitoring(msg)); log_error!(self.handle_start_monitoring(msg));

12
daemon/src/taker.rs

@ -8,6 +8,7 @@ use model::cfd::{Cfd, Order};
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread::sleep; use std::thread::sleep;
@ -24,6 +25,7 @@ mod db;
mod keypair; mod keypair;
mod logger; mod logger;
mod model; mod model;
mod monitor;
mod routes; mod routes;
mod routes_taker; mod routes_taker;
mod seed; mod seed;
@ -77,6 +79,7 @@ async fn main() -> Result<()> {
let data_dir = opts let data_dir = opts
.data_dir .data_dir
.clone()
.unwrap_or_else(|| std::env::current_dir().expect("unable to get cwd")); .unwrap_or_else(|| std::env::current_dir().expect("unable to get cwd"));
if !data_dir.exists() { if !data_dir.exists() {
@ -162,6 +165,8 @@ async fn main() -> Result<()> {
.create(None) .create(None)
.spawn_global(); .spawn_global();
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let cfd_actor_inbox = taker_cfd::Actor::new( let cfd_actor_inbox = taker_cfd::Actor::new(
db, db,
wallet.clone(), wallet.clone(),
@ -169,6 +174,7 @@ async fn main() -> Result<()> {
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address,
) )
.await .await
.unwrap() .unwrap()
@ -178,6 +184,12 @@ async fn main() -> Result<()> {
let inc_maker_messages_actor = let inc_maker_messages_actor =
taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); taker_inc_message_actor::new(read, cfd_actor_inbox.clone());
tokio::spawn(monitor_actor_context.run(monitor::Actor::new(
&opts.electrum,
HashMap::new(),
cfd_actor_inbox.clone(),
)));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(inc_maker_messages_actor); tokio::spawn(inc_maker_messages_actor);

72
daemon/src/taker_cfd.rs

@ -4,11 +4,12 @@ use crate::db::{
}; };
use crate::actors::log_error; use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::Usd; use crate::model::Usd;
use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{send_to_socket, setup_contract_actor, wire}; use crate::{monitor, send_to_socket, setup_contract_actor, wire};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -43,6 +44,7 @@ pub struct Actor {
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that // TODO: Move the contract setup into a dedicated actor and send messages to that actor that
// manages the state instead of this ugly buffer // manages the state instead of this ugly buffer
contract_setup_message_buffer: Vec<SetupMsg>, contract_setup_message_buffer: Vec<SetupMsg>,
monitor_actor: Address<monitor::Actor<Actor>>,
} }
impl Actor { impl Actor {
@ -53,6 +55,7 @@ impl Actor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor>, send_to_maker: Address<send_to_socket::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?;
@ -66,6 +69,7 @@ impl Actor {
send_to_maker, send_to_maker,
current_contract_setup: None, current_contract_setup: None,
contract_setup_message_buffer: vec![], contract_setup_message_buffer: vec![],
monitor_actor,
}) })
} }
@ -231,12 +235,63 @@ impl Actor {
self.cfd_feed_actor_inbox self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?; .send(load_all_cfds(&mut conn).await?)?;
let txid = self.wallet.try_broadcast_transaction(dlc.lock.0).await?; let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
// TODO: tx monitoring, once confirmed with x blocks transition the Cfd to // TODO: It's a bit suspicious to load this just to get the
// Open // refund timelock
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let script_pubkey = dlc.address.script_pubkey();
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
cfd.refund_timelock_in_blocks(),
),
},
})
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
// TODO: Not sure that should be done here...
// Consider bubbling the refund availability up to the user, and let user trigger
// transaction publication
if let CfdState::MustRefund { .. } = new_state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(()) Ok(())
} }
@ -284,6 +339,13 @@ impl Handler<CfdSetupCompleted> for Actor {
} }
} }
#[async_trait]
impl Handler<monitor::Event> for Actor {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = ();
} }

Loading…
Cancel
Save