diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..32bfb77 --- /dev/null +++ b/clippy.toml @@ -0,0 +1,4 @@ +disallowed-methods = [ + "xtra::Address::do_send_async", # discards the return value, possibly swallowing an error + "xtra::Address::do_send", # discards the return value, possibly swallowing an error +] diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 716db47..7530aec 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,4 +1,6 @@ #![cfg_attr(not(test), warn(clippy::unwrap_used))] +#![warn(clippy::disallowed_method)] + use crate::db::load_all_cfds; use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; @@ -170,7 +172,7 @@ where .spawn_with_handle(), ); - oracle_addr.do_send_async(oracle::Sync).await?; + oracle_addr.send(oracle::Sync).await?; tracing::debug!("Maker actor system ready"); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index e3f7cd4..4f3afd2 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -329,6 +329,9 @@ where None => None, }; + // Need to use `do_send_async` here because we are being invoked from the + // `maker_inc_connections::Actor`. Using `send` would result in a deadlock. + #[allow(clippy::disallowed_method)] self.takers .do_send_async(maker_inc_connections::TakerMessage { taker_id, @@ -346,17 +349,27 @@ where let taker_id = self.get_taker_id_of_proposal(&order_id)?; - self.takers - .do_send_async(maker_inc_connections::TakerMessage { + match self + .takers + .send(maker_inc_connections::TakerMessage { taker_id, command: TakerCommand::NotifySettlementAccepted { id: order_id }, }) - .await?; + .await? + { + Ok(_) => { + self.current_agreed_proposals + .insert(order_id, self.get_settlement_proposal(order_id)?); + self.remove_pending_proposal(&order_id) + .context("accepted settlement")?; + } + Err(e) => { + tracing::warn!("Failed to notify taker of accepted settlement: {}", e); + self.remove_pending_proposal(&order_id) + .context("accepted settlement")?; + } + } - self.current_agreed_proposals - .insert(order_id, self.get_settlement_proposal(order_id)?); - self.remove_pending_proposal(&order_id) - .context("accepted settlement")?; Ok(()) } @@ -365,15 +378,18 @@ where let taker_id = self.get_taker_id_of_proposal(&order_id)?; + // clean-up state ahead of sending to ensure consistency in case we fail to deliver the + // message + self.remove_pending_proposal(&order_id) + .context("rejected settlement")?; + self.takers - .do_send_async(maker_inc_connections::TakerMessage { + .send(maker_inc_connections::TakerMessage { taker_id, command: TakerCommand::NotifySettlementRejected { id: order_id }, }) - .await?; + .await??; - self.remove_pending_proposal(&order_id) - .context("rejected settlement")?; Ok(()) } @@ -394,15 +410,18 @@ where } }; + // clean-up state ahead of sending to ensure consistency in case we fail to deliver the + // message + self.remove_pending_proposal(&order_id) + .context("rejected roll_over")?; + self.takers - .do_send_async(maker_inc_connections::TakerMessage { + .send(maker_inc_connections::TakerMessage { taker_id, command: TakerCommand::NotifyRollOverRejected { id: order_id }, }) - .await?; + .await??; - self.remove_pending_proposal(&order_id) - .context("rejected roll_over")?; Ok(()) } } @@ -428,19 +447,19 @@ where load_order_by_id(current_order_id, &mut conn).await? } _ => { - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::NotifyInvalidOrderId { id: order_id }, - }) - .await?; - // An outdated order on the taker side does not require any state change on the // maker. notifying the taker with a specific message should be sufficient. // Since this is a scenario that we should rarely see we log // a warning to be sure we don't trigger this code path frequently. tracing::warn!("Taker tried to take order with outdated id {}", order_id); + self.takers + .send(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifyInvalidOrderId { id: order_id }, + }) + .await??; + return Ok(()); } }; @@ -451,7 +470,7 @@ where // have to remove the current order. self.current_order_id = None; self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(None)) + .send(maker_inc_connections::BroadcastOrder(None)) .await?; self.order_feed_sender.send(None)?; @@ -517,11 +536,11 @@ where append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.takers - .do_send_async(maker_inc_connections::TakerMessage { + .send(maker_inc_connections::TakerMessage { taker_id, command: TakerCommand::NotifyOrderRejected { id: cfd.order.id }, }) - .await?; + .await??; Ok(()) } @@ -606,8 +625,9 @@ where tokio::spawn(async move { let dlc = contract_future.await; - this.do_send_async(CfdSetupCompleted { order_id, dlc }) + this.send(CfdSetupCompleted { order_id, dlc }) .await + .expect("always connected to ourselves"); }); // 6. Record that we are in an active contract setup @@ -668,7 +688,7 @@ where tracing::info!("Lock transaction published with txid {}", txid); self.monitor_actor - .do_send_async(monitor::StartMonitoring { + .send(monitor::StartMonitoring { id: order_id, params: MonitorParams::new( dlc, @@ -679,7 +699,7 @@ where .await?; self.oracle_actor - .do_send_async(oracle::MonitorAttestation { + .send(oracle::MonitorAttestation { event_id: cfd.order.oracle_event_id, }) .await?; @@ -740,7 +760,7 @@ where .await??; self.oracle_actor - .do_send_async(oracle::MonitorAttestation { + .send(oracle::MonitorAttestation { event_id: announcement.id, }) .await?; @@ -773,8 +793,9 @@ where tokio::spawn(async move { let dlc = contract_future.await; - this.do_send_async(CfdRollOverCompleted { order_id, dlc }) + this.send(CfdRollOverCompleted { order_id, dlc }) .await + .expect("always connected to ourselves") }); self.remove_pending_proposal(&order_id) @@ -806,7 +827,7 @@ where append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.monitor_actor - .do_send_async(monitor::StartMonitoring { + .send(monitor::StartMonitoring { id: order_id, params: MonitorParams::new( dlc, @@ -875,7 +896,7 @@ where tracing::info!("Close transaction published with txid {}", txid); self.monitor_actor - .do_send_async(monitor::CollaborativeSettlement { + .send(monitor::CollaborativeSettlement { order_id, tx: (txid, own_script_pubkey), }) @@ -954,7 +975,7 @@ where // 4. Inform connected takers self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) + .send(maker_inc_connections::BroadcastOrder(Some(order))) .await?; Ok(()) diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 3242dd8..3ac2ce1 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -441,7 +441,7 @@ where tracing::info!("Lock transaction published with txid {}", txid); self.monitor_actor - .do_send_async(monitor::StartMonitoring { + .send(monitor::StartMonitoring { id: order_id, params: MonitorParams::new( dlc, @@ -452,7 +452,7 @@ where .await?; self.oracle_actor - .do_send_async(oracle::MonitorAttestation { + .send(oracle::MonitorAttestation { event_id: cfd.order.oracle_event_id, }) .await?; @@ -493,7 +493,7 @@ where .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor - .do_send_async(oracle::MonitorAttestation { + .send(oracle::MonitorAttestation { event_id: offer_announcement.id, }) .await?; @@ -518,8 +518,9 @@ where tokio::spawn(async move { let dlc = contract_future.await; - this.do_send_async(CfdSetupCompleted { order_id, dlc }) + this.send(CfdSetupCompleted { order_id, dlc }) .await + .expect("always connected to ourselves") }); self.setup_state = SetupState::Active { sender }; @@ -582,8 +583,9 @@ where tokio::spawn(async move { let dlc = contract_future.await; - this.do_send_async(CfdRollOverCompleted { order_id, dlc }) + this.send(CfdRollOverCompleted { order_id, dlc }) .await + .expect("always connected to ourselves") }); self.remove_pending_proposal(&order_id) @@ -616,7 +618,7 @@ where append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.monitor_actor - .do_send_async(monitor::StartMonitoring { + .send(monitor::StartMonitoring { id: order_id, params: MonitorParams::new( dlc, @@ -670,7 +672,7 @@ where self.remove_pending_proposal(&order_id)?; self.monitor_actor - .do_send_async(monitor::CollaborativeSettlement { + .send(monitor::CollaborativeSettlement { order_id, tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)), }) diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 0a712d3..d22701c 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -85,6 +85,15 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { maker.mocks.mock_party_params().await; taker.mocks.mock_party_params().await; + maker.mocks.mock_monitor_oracle_attestation().await; + taker.mocks.mock_monitor_oracle_attestation().await; + + maker.mocks.mock_oracle_monitor_attestation().await; + taker.mocks.mock_oracle_monitor_attestation().await; + + maker.mocks.mock_monitor_start_monitoring().await; + taker.mocks.mock_monitor_start_monitoring().await; + maker.accept_take_request(received.clone()).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); diff --git a/daemon/tests/harness/mocks/mod.rs b/daemon/tests/harness/mocks/mod.rs index 50ae3cb..03d806f 100644 --- a/daemon/tests/harness/mocks/mod.rs +++ b/daemon/tests/harness/mocks/mod.rs @@ -32,13 +32,9 @@ impl Mocks { self.oracle.lock().await } - /// Mock message handlers that are not important for the test, but the cfd - /// actor still sends messages - pub async fn mock_common_empty_handlers(&mut self) { - // Sync methods need to be mocked before actors start + pub async fn mock_sync_handlers(&mut self) { self.oracle().await.expect_sync().return_const(()); self.monitor().await.expect_sync().return_const(()); - self.mock_monitor_oracle_attestation().await; } // Helper function setting up a "happy path" wallet mock @@ -65,6 +61,13 @@ impl Mocks { .return_const(Some(oracle::dummy_announcement())); } + pub async fn mock_oracle_monitor_attestation(&mut self) { + self.oracle() + .await + .expect_monitor_attestation() + .return_const(()); + } + pub async fn mock_party_params(&mut self) { #[allow(clippy::redundant_closure)] // clippy is in the wrong here self.wallet() @@ -79,6 +82,13 @@ impl Mocks { .expect_oracle_attestation() .return_const(()); } + + pub async fn mock_monitor_start_monitoring(&mut self) { + self.monitor() + .await + .expect_start_monitoring() + .return_const(()); + } } impl Default for Mocks { diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index f6f49d8..a564326 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -63,7 +63,6 @@ impl Maker { let mut mocks = mocks::Mocks::default(); let (oracle, monitor, wallet) = mocks::create_actors(&mocks); - mocks.mock_common_empty_handlers().await; let (wallet_addr, wallet_fut) = wallet.create(None).run(); tokio::spawn(wallet_fut); @@ -73,6 +72,8 @@ impl Maker { let seed = Seed::default(); let (identity_pk, identity_sk) = seed.derive_identity(); + // system startup sends sync messages, mock them + mocks.mock_sync_handlers().await; let maker = daemon::MakerActorSystem::new( db, wallet_addr, @@ -119,6 +120,8 @@ impl Maker { } pub async fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) { + self.mocks.mock_monitor_oracle_attestation().await; + self.system .cfd_actor_addr .send(new_order_params) @@ -178,11 +181,12 @@ impl Taker { let mut mocks = mocks::Mocks::default(); let (oracle, monitor, wallet) = mocks::create_actors(&mocks); - mocks.mock_common_empty_handlers().await; let (wallet_addr, wallet_fut) = wallet.create(None).run(); tokio::spawn(wallet_fut); + // system startup sends sync messages, mock them + mocks.mock_sync_handlers().await; let taker = daemon::TakerActorSystem::new( db, wallet_addr,