Browse Source

Reduce code duplication in cfd actors

Extract shared handlers code into a common module
refactor/no-log-handler
Mariusz Klochowicz 3 years ago
parent
commit
3730cfae36
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 28
      daemon/src/actors.rs
  2. 149
      daemon/src/cfd_actors.rs
  3. 1
      daemon/src/lib.rs
  4. 151
      daemon/src/maker_cfd.rs
  5. 145
      daemon/src/taker_cfd.rs

28
daemon/src/actors.rs

@ -1,10 +1,3 @@
use crate::{
db,
model::cfd::{Cfd, CfdState, OrderId},
};
use sqlx::{pool::PoolConnection, Sqlite};
use tokio::sync::watch;
/// Wrapper for handlers to log errors
#[macro_export]
macro_rules! log_error {
@ -14,24 +7,3 @@ macro_rules! log_error {
}
};
}
pub async fn insert_cfd(
cfd: Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> anyhow::Result<()> {
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> anyhow::Result<()> {
db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}

149
daemon/src/cfd_actors.rs

@ -0,0 +1,149 @@
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet;
use crate::{db, monitor, oracle};
use anyhow::{bail, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use tokio::sync::watch;
pub async fn insert_cfd(
cfd: Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn insert_new_cfd_state_by_order_id(
order_id: OrderId,
new_state: &CfdState,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(())
}
pub async fn try_cet_publication(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
match cfd.cet()? {
Ok(cet) => {
let txid = wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
Ok(())
}
pub async fn handle_monitoring_event(
event: monitor::Event,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
let order_id = event.order_id();
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?;
if let CfdState::OpenCommitted { .. } = cfd.state {
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
pub async fn handle_commit(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
pub async fn handle_oracle_attestation(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut cfds = db::load_cfds_by_oracle_event_id(attestation.id, conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?;
if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}

1
daemon/src/lib.rs

@ -1,6 +1,7 @@
pub mod actors;
pub mod auth;
pub mod bitmex_price_feed;
pub mod cfd_actors;
pub mod db;
pub mod fan_out;
pub mod housekeeping;

151
daemon/src/maker_cfd.rs

@ -1,19 +1,16 @@
use crate::actors::insert_cfd;
use crate::actors::insert_new_cfd_state_by_order_id;
use crate::db::{
insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{
Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc,
Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal,
UpdateCfdProposal, UpdateCfdProposals,
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
};
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{bail, Context as _, Result};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use cfd_protocol::secp256k1_zkp::Signature;
@ -701,27 +698,13 @@ impl Actor {
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_commit_tx)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? {
insert_new_cfd_state_by_order_id(
cfd.order.id,
&new_state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
@ -879,114 +862,26 @@ impl Actor {
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(
order_id,
&cfd.state,
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(&mut cfd).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut conn = self.db.acquire().await?;
let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}

145
daemon/src/taker_cfd.rs

@ -1,19 +1,16 @@
use crate::actors::insert_cfd;
use crate::actors::insert_new_cfd_state_by_order_id;
use crate::db::{
insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc,
Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal,
UpdateCfdProposal, UpdateCfdProposals,
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
};
use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, send_to_socket, setup_contract, wire};
use anyhow::{bail, Context as _, Result};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -569,142 +566,38 @@ impl Actor {
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(
order_id,
&cfd.state,
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(&mut cfd).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
// TODO: code duplicateion maker/taker
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_commit_tx)
.await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut conn = self.db.acquire().await?;
let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}

Loading…
Cancel
Save