Browse Source

Handle existing Cfds upon daemon startup

- Cleanup and transaction (re-)publication
- Monitor actor monitors existing cfds upon startup
fix-olivia-event-id
Daniel Karzel 3 years ago
parent
commit
c3f8b4b35b
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 27
      daemon/src/cleanup.rs
  2. 18
      daemon/src/maker.rs
  3. 31
      daemon/src/maker_cfd.rs
  4. 92
      daemon/src/model/cfd.rs
  5. 170
      daemon/src/monitor.rs
  6. 22
      daemon/src/taker.rs
  7. 31
      daemon/src/taker_cfd.rs

27
daemon/src/cleanup.rs

@ -0,0 +1,27 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use anyhow::Result;
use sqlx::SqlitePool;
use std::time::SystemTime;
pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Result<()> {
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) {
insert_new_cfd_state_by_order_id(
cfd.order.id,
CfdState::SetupFailed {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
info: format!("Was in state {} which cannot be continued.", cfd.state),
},
&mut conn,
)
.await?;
}
Ok(())
}

18
daemon/src/maker.rs

@ -1,5 +1,6 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::cfd_feed::CfdFeed; use crate::cfd_feed::CfdFeed;
use crate::db::load_all_cfds;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -10,7 +11,6 @@ use model::cfd::Order;
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::task::Poll; use std::task::Poll;
use tokio::sync::watch; use tokio::sync::watch;
@ -22,6 +22,7 @@ mod actors;
mod auth; mod auth;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed; mod cfd_feed;
mod cleanup;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -150,11 +151,17 @@ async fn main() -> Result<()> {
None => return Err(rocket), None => return Err(rocket),
}; };
cleanup::transition_non_continue_cfds_to_setup_failed(db.clone())
.await
.unwrap();
let (maker_inc_connections_address, maker_inc_connections_context) = let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None); xtra::Context::new(None);
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfd_maker_actor_inbox = maker_cfd::Actor::new( let cfd_maker_actor_inbox = maker_cfd::Actor::new(
db, db,
wallet.clone(), wallet.clone(),
@ -163,6 +170,7 @@ async fn main() -> Result<()> {
order_feed_sender, order_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address, monitor_actor_address,
cfds.clone(),
) )
.await .await
.unwrap() .unwrap()
@ -174,11 +182,9 @@ async fn main() -> Result<()> {
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
)), )),
); );
tokio::spawn(monitor_actor_context.run(monitor::Actor::new( tokio::spawn(monitor_actor_context.run(
&opts.electrum, monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await,
HashMap::new(), ));
cfd_maker_actor_inbox.clone(),
)));
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) { let message = match futures::ready!(listener.poll_accept(ctx)) {

31
daemon/src/maker_cfd.rs

@ -67,6 +67,7 @@ enum SetupState {
} }
impl Actor { impl Actor {
#[allow(clippy::too_many_arguments)]
pub async fn new( pub async fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
@ -75,12 +76,26 @@ impl Actor {
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?; cfd_feed.update(&mut conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
@ -186,24 +201,10 @@ impl Actor {
// refund timelock // refund timelock
let cfd = load_cfd_by_order_id(msg.order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(msg.order_id, &mut conn).await?;
let script_pubkey = dlc.address.script_pubkey();
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {
id: msg.order_id, id: msg.order_id,
params: MonitorParams { params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
cfd.refund_timelock_in_blocks(),
),
},
}) })
.await?; .await?;

92
daemon/src/model/cfd.rs

@ -162,6 +162,7 @@ pub enum CfdState {
/// The maker rejected the CFD order. /// The maker rejected the CFD order.
/// ///
/// This state applies to taker and maker. /// This state applies to taker and maker.
/// This is a final state.
Rejected { Rejected {
common: CfdStateCommon, common: CfdStateCommon,
}, },
@ -206,9 +207,23 @@ pub enum CfdState {
dlc: Dlc, dlc: Dlc,
}, },
/// The Cfd was refunded and the refund transaction reached finality
///
/// This state applies to taker and maker.
/// This is a final state.
Refunded { Refunded {
common: CfdStateCommon, common: CfdStateCommon,
}, },
/// The Cfd was in a state that could not be continued after the application got interrupted
///
/// This state applies to taker and maker.
/// This is a final state.
/// It is safe to remove Cfds in this state from the database.
SetupFailed {
common: CfdStateCommon,
info: String,
},
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -233,6 +248,7 @@ impl CfdState {
CfdState::OpenCommitted { common, .. } => common, CfdState::OpenCommitted { common, .. } => common,
CfdState::MustRefund { common, .. } => common, CfdState::MustRefund { common, .. } => common,
CfdState::Refunded { common, .. } => common, CfdState::Refunded { common, .. } => common,
CfdState::SetupFailed { common, .. } => common,
}; };
*common *common
@ -276,6 +292,9 @@ impl Display for CfdState {
CfdState::Refunded { .. } => { CfdState::Refunded { .. } => {
write!(f, "Refunded") write!(f, "Refunded")
} }
CfdState::SetupFailed { .. } => {
write!(f, "Safely Aborted")
}
} }
} }
} }
@ -384,13 +403,15 @@ impl Cfd {
dlc, dlc,
}, },
OutgoingOrderRequest { .. } => unreachable!("taker-only state"), OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. } IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => {
| Accepted { .. } bail!("Did not expect lock finality yet: ignoring")
| Rejected { .. } }
| ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), Open { .. } | OpenCommitted { .. } | MustRefund { .. } => {
Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes lock finality: ignoring") bail!("State already assumes lock finality: ignoring")
} }
Rejected { .. } | Refunded { .. } | SetupFailed { .. } => {
bail!("The cfd is already in final state {}", self.state)
}
}, },
monitor::Event::CommitFinality(_) => { monitor::Event::CommitFinality(_) => {
let dlc = match self.state.clone() { let dlc = match self.state.clone() {
@ -400,15 +421,15 @@ impl Cfd {
dlc dlc
} }
OutgoingOrderRequest { .. } => unreachable!("taker-only state"), OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. } IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => {
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect commit finality yet: ignoring") bail!("Did not expect commit finality yet: ignoring")
} }
OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { OpenCommitted { .. } | MustRefund { .. } => {
bail!("State already assumes commit finality: ignoring") bail!("State already assumes commit finality: ignoring")
} }
Rejected { .. } | Refunded { .. } | SetupFailed { .. } => {
bail!("The cfd is already in final state {}", self.state)
}
}; };
OpenCommitted { OpenCommitted {
@ -463,10 +484,7 @@ impl Cfd {
} }
} }
OutgoingOrderRequest { .. } => unreachable!("taker-only state"), OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. } IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => {
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect CET timelock expiry yet: ignoring") bail!("Did not expect CET timelock expiry yet: ignoring")
} }
OpenCommitted { OpenCommitted {
@ -477,21 +495,21 @@ impl Cfd {
cet_status: CetStatus::Ready(_), cet_status: CetStatus::Ready(_),
.. ..
} => bail!("State already assumes CET timelock expiry: ignoring"), } => bail!("State already assumes CET timelock expiry: ignoring"),
MustRefund { .. } | Refunded { .. } => { MustRefund { .. } => {
bail!("Refund path does not care about CET timelock expiry: ignoring") bail!("Refund path does not care about CET timelock expiry: ignoring")
} }
Rejected { .. } | Refunded { .. } | SetupFailed { .. } => {
bail!("The cfd is already in final state {}", self.state)
}
}, },
monitor::Event::RefundTimelockExpired(_) => { monitor::Event::RefundTimelockExpired(_) => {
let dlc = match self.state.clone() { let dlc = match self.state.clone() {
OpenCommitted { dlc, .. } => dlc, OpenCommitted { dlc, .. } => dlc,
MustRefund { .. } | Refunded { .. } => { MustRefund { .. } => {
bail!("State already assumes refund timelock expiry: ignoring") bail!("State already assumes refund timelock expiry: ignoring")
} }
OutgoingOrderRequest { .. } => unreachable!("taker-only state"), OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. } IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => {
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund timelock expiry yet: ignoring") bail!("Did not expect refund timelock expiry yet: ignoring")
} }
PendingOpen { dlc, .. } => { PendingOpen { dlc, .. } => {
@ -502,6 +520,9 @@ impl Cfd {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
dlc dlc
} }
Rejected { .. } | Refunded { .. } | SetupFailed { .. } => {
bail!("The cfd is already in final state {}", self.state)
}
}; };
MustRefund { MustRefund {
@ -515,10 +536,7 @@ impl Cfd {
match self.state { match self.state {
MustRefund { .. } => (), MustRefund { .. } => (),
OutgoingOrderRequest { .. } => unreachable!("taker-only state"), OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. } IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => {
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund finality yet: ignoring") bail!("Did not expect refund finality yet: ignoring")
} }
PendingOpen { .. } => { PendingOpen { .. } => {
@ -530,7 +548,9 @@ impl Cfd {
OpenCommitted { .. } => { OpenCommitted { .. } => {
tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead");
} }
Refunded { .. } => bail!("State already assumes refund finality: ignoring"), Rejected { .. } | Refunded { .. } | SetupFailed { .. } => {
bail!("The cfd is already in final state {}", self.state)
}
} }
Refunded { Refunded {
@ -573,6 +593,28 @@ impl Cfd {
Ok(signed_refund_tx) Ok(signed_refund_tx)
} }
pub fn pending_open_dlc(&self) -> Option<Dlc> {
if let CfdState::PendingOpen { dlc, .. } = self.state.clone() {
Some(dlc)
} else {
None
}
}
pub fn is_must_refund(&self) -> bool {
matches!(self.state.clone(), CfdState::MustRefund { .. })
}
pub fn is_cleanup(&self) -> bool {
matches!(
self.state.clone(),
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::ContractSetup { .. }
)
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

170
daemon/src/monitor.rs

@ -1,5 +1,6 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::model::cfd::{Cfd, OrderId}; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::monitor::subscription::Subscription;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::{PublicKey, Script, Txid}; use bdk::bitcoin::{PublicKey, Script, Txid};
@ -15,35 +16,142 @@ const FINALITY_CONFIRMATIONS: u32 = 1;
#[derive(Clone)] #[derive(Clone)]
pub struct MonitorParams { pub struct MonitorParams {
pub lock: (Txid, Descriptor<PublicKey>), lock: (Txid, Descriptor<PublicKey>),
pub commit: (Txid, Descriptor<PublicKey>), commit: (Txid, Descriptor<PublicKey>),
pub cets: Vec<(Txid, Script, RangeInclusive<u64>)>, cets: Vec<(Txid, Script, RangeInclusive<u64>)>,
pub refund: (Txid, Script, u32), refund: (Txid, Script, u32),
}
impl MonitorParams {
pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self {
let script_pubkey = dlc.address.script_pubkey();
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
refund_timelock_in_blocks,
),
}
}
} }
impl<T> Actor<T> impl<T> Actor<T>
where where
T: xtra::Actor + xtra::Handler<Event>, T: xtra::Actor + xtra::Handler<Event>,
{ {
pub fn new( pub async fn new(
electrum_rpc_url: &str, electrum_rpc_url: &str,
cfds: HashMap<OrderId, MonitorParams>,
cfd_actor_addr: xtra::Address<T>, cfd_actor_addr: xtra::Address<T>,
cfds: Vec<Cfd>,
) -> Self { ) -> Self {
let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap(); let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap();
Self { let mut actor = Self {
monitor, monitor,
cfds, cfds: HashMap::new(),
cfd_actor_addr, cfd_actor_addr,
};
for cfd in cfds {
match cfd.state.clone() {
// In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published
CfdState::PendingOpen { dlc, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.monitor_all(&params, cfd.order.id).await;
}
CfdState::Open { dlc, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.monitor_commit_finality_and_timelocks(&params, cfd.order.id).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
}
CfdState::OpenCommitted { dlc, cet_status, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
let commit_subscription = actor
.monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await;
match cet_status {
CetStatus::Unprepared
| CetStatus::OracleSigned(_) => {
actor.monitor_commit_cet_timelock(cfd.order.id, &commit_subscription).await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
}
CetStatus::TimelockExpired => {
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
}
CetStatus::Ready(_price) => {
// TODO: monitor CET finality
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
}
}
}
CfdState::MustRefund { dlc, .. } => {
// TODO: CET monitoring (?) - note: would require to add CetStatus information to MustRefund
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
let commit_subscription = actor
.monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
}
// too early to monitor
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::ContractSetup { .. }
// final states
| CfdState::Rejected { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
} }
} }
actor
}
async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> { async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> {
let StartMonitoring { id, params } = msg; let StartMonitoring { id, params } = msg;
self.cfds.insert(id, params.clone()); self.cfds.insert(id, params.clone());
self.monitor_all(&params, id).await;
Ok(())
}
async fn monitor_all(&self, params: &MonitorParams, order_id: OrderId) {
self.monitor_lock_finality(params, order_id).await;
self.monitor_commit_finality_and_timelocks(params, order_id)
.await;
self.monitor_refund_finality(params.clone(), order_id).await;
}
async fn monitor_lock_finality(&self, params: &MonitorParams, order_id: OrderId) {
tokio::spawn({ tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone(); let cfd_actor_addr = self.cfd_actor_addr.clone();
let lock_subscription = self let lock_subscription = self
@ -54,17 +162,32 @@ where
lock_subscription.wait_until_final().await.unwrap(); lock_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(Event::LockFinality(id)) .do_send_async(Event::LockFinality(order_id))
.await .await
.unwrap(); .unwrap();
} }
}); });
}
async fn monitor_commit_finality_and_timelocks(
&self,
params: &MonitorParams,
order_id: OrderId,
) {
let commit_subscription = self let commit_subscription = self
.monitor .monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey())) .subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await; .await;
self.monitor_commit_finality(order_id, &commit_subscription)
.await;
self.monitor_commit_cet_timelock(order_id, &commit_subscription)
.await;
self.monitor_commit_refund_timelock(params, order_id, &commit_subscription)
.await;
}
async fn monitor_commit_finality(&self, order_id: OrderId, commit_subscription: &Subscription) {
tokio::spawn({ tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone(); let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone(); let commit_subscription = commit_subscription.clone();
@ -72,12 +195,18 @@ where
commit_subscription.wait_until_final().await.unwrap(); commit_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(Event::CommitFinality(id)) .do_send_async(Event::CommitFinality(order_id))
.await .await
.unwrap(); .unwrap();
} }
}); });
}
async fn monitor_commit_cet_timelock(
&self,
order_id: OrderId,
commit_subscription: &Subscription,
) {
tokio::spawn({ tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone(); let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone(); let commit_subscription = commit_subscription.clone();
@ -88,12 +217,19 @@ where
.unwrap(); .unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(Event::CetTimelockExpired(id)) .do_send_async(Event::CetTimelockExpired(order_id))
.await .await
.unwrap(); .unwrap();
} }
}); });
}
async fn monitor_commit_refund_timelock(
&self,
params: &MonitorParams,
order_id: OrderId,
commit_subscription: &Subscription,
) {
tokio::spawn({ tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone(); let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone(); let commit_subscription = commit_subscription.clone();
@ -105,12 +241,14 @@ where
.unwrap(); .unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(Event::RefundTimelockExpired(id)) .do_send_async(Event::RefundTimelockExpired(order_id))
.await .await
.unwrap(); .unwrap();
} }
}); });
}
async fn monitor_refund_finality(&self, params: MonitorParams, order_id: OrderId) {
tokio::spawn({ tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone(); let cfd_actor_addr = self.cfd_actor_addr.clone();
let refund_subscription = self let refund_subscription = self
@ -121,15 +259,11 @@ where
refund_subscription.wait_until_final().await.unwrap(); refund_subscription.wait_until_final().await.unwrap();
cfd_actor_addr cfd_actor_addr
.do_send_async(Event::RefundFinality(id)) .do_send_async(Event::RefundFinality(order_id))
.await .await
.unwrap(); .unwrap();
} }
}); });
// TODO: CET subscription => Requires information from Oracle
Ok(())
} }
} }

22
daemon/src/taker.rs

@ -1,4 +1,5 @@
use crate::cfd_feed::CfdFeed; use crate::cfd_feed::CfdFeed;
use crate::db::load_all_cfds;
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -10,7 +11,6 @@ use model::cfd::Order;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread::sleep; use std::thread::sleep;
@ -25,6 +25,7 @@ use xtra::Actor;
mod actors; mod actors;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed; mod cfd_feed;
mod cleanup;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -153,20 +154,27 @@ async fn main() -> Result<()> {
None => return Err(rocket), None => return Err(rocket),
}; };
cleanup::transition_non_continue_cfds_to_setup_failed(db.clone())
.await
.unwrap();
let send_to_maker = send_to_socket::Actor::new(write) let send_to_maker = send_to_socket::Actor::new(write)
.create(None) .create(None)
.spawn_global(); .spawn_global();
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfd_actor_inbox = taker_cfd::Actor::new( let cfd_actor_inbox = taker_cfd::Actor::new(
db, db.clone(),
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_updater, cfd_feed_updater,
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address, monitor_actor_address,
cfds.clone(),
) )
.await .await
.unwrap() .unwrap()
@ -177,11 +185,11 @@ async fn main() -> Result<()> {
.map(move |item| taker_cfd::MakerStreamMessage { item }); .map(move |item| taker_cfd::MakerStreamMessage { item });
tokio::spawn(cfd_actor_inbox.clone().attach_stream(read)); tokio::spawn(cfd_actor_inbox.clone().attach_stream(read));
tokio::spawn(monitor_actor_context.run(monitor::Actor::new( tokio::spawn(
&opts.electrum, monitor_actor_context.run(
HashMap::new(), monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds).await,
cfd_actor_inbox.clone(), ),
))); );
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn({ tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone(); let cfd_actor_inbox = cfd_actor_inbox.clone();

31
daemon/src/taker_cfd.rs

@ -57,6 +57,7 @@ pub struct Actor {
} }
impl Actor { impl Actor {
#[allow(clippy::too_many_arguments)]
pub async fn new( pub async fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
@ -65,12 +66,26 @@ impl Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?; cfd_feed.update(&mut conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
@ -250,24 +265,10 @@ impl Actor {
// refund timelock // refund timelock
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let script_pubkey = dlc.address.script_pubkey();
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {
id: order_id, id: order_id,
params: MonitorParams { params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
cfd.refund_timelock_in_blocks(),
),
},
}) })
.await?; .await?;

Loading…
Cancel
Save