Browse Source

Merge #322

322: Cleanup in Cfd actors r=klochowicz a=klochowicz

I started out trying to fix a common gotcha (forgetting to refresh Cfd feed),
and then realised that I was not far off of addressing a bunch of TODOs in the code

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
e38aecc097
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 149
      daemon/src/cfd_actors.rs
  2. 1
      daemon/src/db.rs
  3. 1
      daemon/src/lib.rs
  4. 173
      daemon/src/maker_cfd.rs
  5. 1
      daemon/src/model/cfd.rs
  6. 173
      daemon/src/taker_cfd.rs

149
daemon/src/cfd_actors.rs

@ -0,0 +1,149 @@
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet;
use crate::{db, monitor, oracle};
use anyhow::{bail, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use tokio::sync::watch;
pub async fn insert_cfd(
cfd: Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn try_cet_publication(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
match cfd.cet()? {
Ok(cet) => {
let txid = wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
Ok(())
}
pub async fn handle_monitoring_event(
event: monitor::Event,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
let order_id = event.order_id();
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?;
if let CfdState::OpenCommitted { .. } = cfd.state {
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
pub async fn handle_commit(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
pub async fn handle_oracle_attestation(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut cfds = db::load_cfds_by_oracle_event_id(attestation.id, conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}

1
daemon/src/db.rs

@ -148,7 +148,6 @@ pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::
Ok(())
}
#[allow(dead_code)]
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,

1
daemon/src/lib.rs

@ -1,6 +1,7 @@
pub mod actors;
pub mod auth;
pub mod bitmex_price_feed;
pub mod cfd_actors;
pub mod db;
pub mod fan_out;
pub mod housekeeping;

173
daemon/src/maker_cfd.rs

@ -1,18 +1,16 @@
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{
Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc,
Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal,
UpdateCfdProposal, UpdateCfdProposals,
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
};
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{bail, Context as _, Result};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use cfd_protocol::secp256k1_zkp::Signature;
@ -310,9 +308,13 @@ impl Actor {
proposal.price,
),
))?;
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
@ -429,12 +431,10 @@ impl Actor {
attestation: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
@ -480,12 +480,10 @@ impl Actor {
collaborative_close: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
@ -547,10 +545,7 @@ impl Actor {
taker_id,
},
);
insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
// 4. Remove current order
self.current_order_id = None;
@ -604,12 +599,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// 4. Notify the taker that we are ready for contract setup
// Use `.send` here to ensure we only continue once the message has been sent
// Nothing done after this call should be able to fail, otherwise we notified the taker, but
@ -682,9 +675,9 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await
.unwrap();
.await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
@ -692,8 +685,6 @@ impl Actor {
command: TakerCommand::NotifyOrderRejected { id: order_id },
})
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// Remove order for all
self.current_order_id = None;
@ -707,23 +698,13 @@ impl Actor {
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_commit_tx)
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? {
insert_new_cfd_state_by_order_id(cfd.order.id, &new_state, &mut conn).await?;
}
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
Ok(())
}
@ -881,104 +862,26 @@ impl Actor {
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(&mut cfd).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut conn = self.db.acquire().await?;
let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}

1
daemon/src/model/cfd.rs

@ -262,7 +262,6 @@ pub enum CfdState {
/// commit + cet).
Closed {
common: CfdStateCommon,
// TODO: Use an enum of either Attestation or CollaborativeSettlement
payout: Payout,
},

173
daemon/src/taker_cfd.rs

@ -1,18 +1,16 @@
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc,
Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal,
UpdateCfdProposal, UpdateCfdProposals,
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
};
use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, send_to_socket, setup_contract, wire};
use anyhow::{bail, Context as _, Result};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -158,10 +156,8 @@ impl Actor {
},
);
insert_cfd(cfd, &mut conn).await?;
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?;
@ -280,11 +276,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let offer_announcement = self
@ -339,12 +334,10 @@ impl Actor {
},
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
Ok(())
}
@ -373,10 +366,13 @@ impl Actor {
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price),
))?;
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.remove_pending_proposal(&order_id)?;
@ -504,12 +500,10 @@ impl Actor {
attestation: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
@ -555,12 +549,10 @@ impl Actor {
collaborative_close: None,
},
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
@ -574,129 +566,38 @@ impl Actor {
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(&mut cfd).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
// TODO: code duplicateion maker/taker
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_commit_tx)
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut conn = self.db.acquire().await?;
let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}

Loading…
Cancel
Save