Browse Source

Create wrapper functions to avoid the need for manually refreshing feed

Inserting cfd or updating its state requires a follow-up update on the Cfd feed
sent to the UI. Encapsulate the behaviour in functions that are shared
across Cfd actors.
refactor/no-log-handler
Mariusz Klochowicz 3 years ago
parent
commit
6779486f76
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 28
      daemon/src/actors.rs
  2. 1
      daemon/src/db.rs
  3. 80
      daemon/src/maker_cfd.rs
  4. 78
      daemon/src/taker_cfd.rs

28
daemon/src/actors.rs

@ -1,3 +1,10 @@
use crate::{
db,
model::cfd::{Cfd, CfdState, OrderId},
};
use sqlx::{pool::PoolConnection, Sqlite};
use tokio::sync::watch;
/// Wrapper for handlers to log errors
#[macro_export]
macro_rules! log_error {
@ -7,3 +14,24 @@ macro_rules! log_error {
}
};
}
pub async fn insert_cfd(
cfd: Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> anyhow::Result<()> {
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> anyhow::Result<()> {
db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}

1
daemon/src/db.rs

@ -148,7 +148,6 @@ pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::
Ok(())
}
#[allow(dead_code)]
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,

80
daemon/src/maker_cfd.rs

@ -1,6 +1,7 @@
use crate::actors::insert_cfd;
use crate::actors::insert_new_cfd_state_by_order_id;
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{
@ -310,9 +311,13 @@ impl Actor {
proposal.price,
),
))?;
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
@ -429,12 +434,10 @@ impl Actor {
attestation: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
@ -480,12 +483,10 @@ impl Actor {
collaborative_close: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
@ -547,10 +548,7 @@ impl Actor {
taker_id,
},
);
insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
// 4. Remove current order
self.current_order_id = None;
@ -604,12 +602,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// 4. Notify the taker that we are ready for contract setup
// Use `.send` here to ensure we only continue once the message has been sent
// Nothing done after this call should be able to fail, otherwise we notified the taker, but
@ -682,9 +678,9 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await
.unwrap();
.await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
@ -692,8 +688,6 @@ impl Actor {
command: TakerCommand::NotifyOrderRejected { id: order_id },
})
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// Remove order for all
self.current_order_id = None;
@ -719,11 +713,15 @@ impl Actor {
tracing::info!("Commit transaction published on chain: {}", txid);
if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? {
insert_new_cfd_state_by_order_id(cfd.order.id, &new_state, &mut conn).await?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&new_state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
Ok(())
}
@ -892,10 +890,13 @@ impl Actor {
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
order_id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
@ -942,9 +943,13 @@ impl Actor {
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
@ -968,10 +973,13 @@ impl Actor {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);

78
daemon/src/taker_cfd.rs

@ -1,6 +1,7 @@
use crate::actors::insert_cfd;
use crate::actors::insert_new_cfd_state_by_order_id;
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::model::cfd::{
Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc,
@ -158,10 +159,8 @@ impl Actor {
},
);
insert_cfd(cfd, &mut conn).await?;
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?;
@ -280,11 +279,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let offer_announcement = self
@ -339,12 +337,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
Ok(())
}
@ -373,10 +369,13 @@ impl Actor {
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price),
))?;
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.remove_pending_proposal(&order_id)?;
@ -504,12 +503,10 @@ impl Actor {
attestation: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
@ -555,12 +552,10 @@ impl Actor {
collaborative_close: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
@ -585,10 +580,13 @@ impl Actor {
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
order_id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
@ -622,10 +620,13 @@ impl Actor {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
@ -660,9 +661,13 @@ impl Actor {
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
@ -686,10 +691,13 @@ impl Actor {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);

Loading…
Cancel
Save