Browse Source

Merge #861

861: Phase out `log_error` macro r=luckysori a=luckysori

Fixes #553.

Co-authored-by: Lucas Soriano del Pino <lucas_soriano@fastmail.com>
update-blockstream-electrum-server-url
bors[bot] 3 years ago
committed by GitHub
parent
commit
8509814856
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      daemon/src/actors.rs
  2. 7
      daemon/src/collab_settlement_maker.rs
  3. 13
      daemon/src/connection.rs
  4. 2
      daemon/src/lib.rs
  5. 79
      daemon/src/maker_cfd.rs
  6. 9
      daemon/src/maker_inc_connections.rs
  7. 19
      daemon/src/monitor.rs
  8. 14
      daemon/src/oracle.rs
  9. 6
      daemon/src/rollover_maker.rs
  10. 7
      daemon/src/setup_maker.rs
  11. 13
      daemon/src/setup_taker.rs
  12. 21
      daemon/src/taker_cfd.rs
  13. 50
      daemon/src/xtra_ext.rs
  14. 5
      daemon/tests/harness/mocks/monitor.rs
  15. 4
      daemon/tests/harness/mod.rs

9
daemon/src/actors.rs

@ -1,9 +0,0 @@
/// Wrapper for handlers to log errors
#[macro_export]
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!("Message handler failed: {:#}", e);
}
};
}

7
daemon/src/collab_settlement_maker.rs

@ -9,6 +9,7 @@ use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal;
use crate::model::Identity;
use crate::projection;
use crate::xtra_ext::LogFailure;
use anyhow::Context;
use async_trait::async_trait;
use bdk::bitcoin::Script;
@ -172,7 +173,11 @@ impl Actor {
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
let _ = self
.on_completed
.send(completed)
.log_failure("Failed to inform about collab settlement completion")
.await;
ctx.stop();
}

13
daemon/src/connection.rs

@ -1,7 +1,6 @@
use crate::address_map::AddressMap;
use crate::address_map::Stopping;
use crate::collab_settlement_taker;
use crate::log_error;
use crate::model::cfd::OrderId;
use crate::model::Identity;
use crate::model::Price;
@ -17,6 +16,7 @@ use crate::wire;
use crate::wire::EncryptedJsonCodec;
use crate::wire::TakerToMaker;
use crate::wire::Version;
use crate::xtra_ext::LogFailure;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context;
@ -146,7 +146,10 @@ impl Actor {
#[xtra_productivity(message_impl = false)]
impl Actor {
async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) {
log_error!(self.send_to_maker.send(message));
let msg_str = message.to_string();
if self.send_to_maker.send(message).await.is_err() {
tracing::warn!("Failed to send wire message {} to maker", msg_str);
}
}
async fn handle_collab_settlement_actor_stopping(
@ -430,7 +433,11 @@ impl Actor {
}
}
wire::MakerToTaker::CurrentOrder(msg) => {
log_error!(self.current_order.send(CurrentOrder(msg)));
let _ = self
.current_order
.send(CurrentOrder(msg))
.log_failure("Failed to forward current order from maker")
.await;
}
wire::MakerToTaker::Hello(_) => {
tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.")

2
daemon/src/lib.rs

@ -26,7 +26,6 @@ use xtra::Address;
pub mod sqlx_ext; // Must come first because it is a macro.
pub mod actors;
pub mod address_map;
pub mod auth;
pub mod auto_rollover;
@ -67,6 +66,7 @@ pub mod try_continue;
pub mod tx;
pub mod wallet;
pub mod wire;
pub mod xtra_ext;
// Certain operations (e.g. contract setup) take long time in debug mode,
// causing us to lag behind in processing heartbeats.

79
daemon/src/maker_cfd.rs

@ -5,7 +5,6 @@ use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
use crate::collab_settlement_maker;
use crate::db::load_cfd;
use crate::log_error;
use crate::maker_inc_connections;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
@ -557,8 +556,10 @@ where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_settlement_completed(&mut self, msg: collab_settlement_maker::Completed) {
log_error!(async {
async fn handle_settlement_completed(
&mut self,
msg: collab_settlement_maker::Completed,
) -> Result<()> {
use collab_settlement_maker::Completed::*;
let (order_id, settlement, script_pubkey) = match msg {
Confirmed {
@ -598,8 +599,7 @@ where
})
.await?;
anyhow::Ok(())
});
Ok(())
}
}
@ -754,8 +754,7 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) {
log_error!(async {
async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) -> Result<()> {
use setup_maker::Completed::*;
let (order_id, dlc) = match msg {
NewContract { order_id, dlc } => (order_id, dlc),
@ -803,7 +802,6 @@ where
.await?;
Ok(())
});
}
}
@ -812,8 +810,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context<Self>) {
log_error!(self.handle_taker_connected(msg.id));
async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_taker_connected(msg.id).await
}
}
@ -823,8 +821,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerDisconnected>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context<Self>) {
log_error!(self.handle_taker_disconnected(msg.id));
async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_taker_disconnected(msg.id).await
}
}
@ -834,8 +832,8 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) {
log_error!(self.handle_roll_over_completed(msg));
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_roll_over_completed(msg).await
}
}
@ -844,8 +842,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<monitor::Event> for
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_monitoring_event(msg).await
}
}
@ -866,10 +864,15 @@ where
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, ctx: &mut Context<Self>) {
async fn handle(
&mut self,
FromTaker { taker_id, msg }: FromTaker,
ctx: &mut Context<Self>,
) -> Result<()> {
match msg {
wire::TakerToMaker::TakeOrder { order_id, quantity } => {
log_error!(self.handle_take_order(taker_id, order_id, quantity, ctx))
self.handle_take_order(taker_id, order_id, quantity, ctx)
.await?
}
wire::TakerToMaker::Settlement {
order_id,
@ -881,17 +884,22 @@ where
price,
},
} => {
log_error!(self.handle_propose_settlement(
if let Err(e) = self
.handle_propose_settlement(
taker_id,
SettlementProposal {
order_id,
timestamp,
taker,
maker,
price
price,
},
ctx
))
ctx,
)
.await
{
tracing::warn!("Failed ot handle settlement proposal: {:#}", e);
}
}
wire::TakerToMaker::Settlement {
msg: wire::taker_to_maker::Settlement::Initiate { .. },
@ -903,14 +911,19 @@ where
order_id,
timestamp,
} => {
log_error!(self.handle_propose_roll_over(
if let Err(e) = self
.handle_propose_roll_over(
RolloverProposal {
order_id,
timestamp,
},
taker_id,
ctx
))
ctx,
)
.await
{
tracing::warn!("Failed to handle rollover proposal: {:#}", e);
}
}
wire::TakerToMaker::RollOverProtocol { .. } => {
unreachable!("This kind of message should be sent to the rollover_maker::Actor`")
@ -921,7 +934,9 @@ where
TakerToMaker::Hello(_) => {
unreachable!("The Hello message is not sent to the cfd actor")
}
}
};
Ok(())
}
}
@ -932,24 +947,26 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
if let Err(e) = self.handle_oracle_attestation(msg).await {
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
impl Message for TakerConnected {
type Result = ();
type Result = Result<()>;
}
impl Message for TakerDisconnected {
type Result = ();
type Result = Result<()>;
}
impl Message for Completed {
type Result = ();
type Result = Result<()>;
}
impl Message for FromTaker {
type Result = ();
type Result = Result<()>;
}
impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}

9
daemon/src/maker_inc_connections.rs

@ -20,6 +20,7 @@ use crate::wire::EncryptedJsonCodec;
use crate::wire::MakerToTaker;
use crate::wire::TakerToMaker;
use crate::wire::Version;
use crate::xtra_ext::LogFailure;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context;
@ -138,6 +139,7 @@ impl Actor {
let _ = self
.taker_disconnected_channel
.send(maker_cfd::TakerDisconnected { id: *taker_id })
.log_failure("Failed to inform about taker disconnect")
.await;
let _ = self.connection_tasks.remove(taker_id);
}
@ -219,7 +221,7 @@ impl Actor {
let this = ctx.address().expect("self to be alive");
let read_fut = async move {
while let Ok(Some(msg)) = read.try_next().await {
let res = this.send(FromTaker { taker_id, msg }).await;
let res = this.send(FromTaker { taker_id, msg }).log_failure("").await;
if res.is_err() {
break;
@ -249,6 +251,7 @@ impl Actor {
let _ = self
.taker_connected_channel
.send(maker_cfd::TakerConnected { id: taker_id })
.log_failure("Failed to report new taker connection")
.await;
Ok(())
@ -342,7 +345,7 @@ impl Actor {
#[xtra_productivity(message_impl = false)]
impl Actor {
async fn handle_msg_from_taker(&mut self, msg: FromTaker) {
async fn handle_msg_from_taker(&mut self, msg: FromTaker) -> Result<()> {
use wire::TakerToMaker::*;
match msg.msg {
Protocol { order_id, msg } => match self.setup_actors.get_connected(&order_id) {
@ -380,6 +383,8 @@ impl Actor {
let _ = self.taker_msg_channel.send(msg);
}
}
Ok(())
}
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {

19
daemon/src/monitor.rs

@ -1,4 +1,3 @@
use crate::log_error;
use crate::model;
use crate::model::cfd::CetStatus;
use crate::model::cfd::Cfd;
@ -333,7 +332,7 @@ where
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) {
for (order_id, MonitorParams { cets, .. }) in self
.cfds
.clone()
@ -342,8 +341,6 @@ where
{
try_continue!(self.monitor_cet_finality(cets, attestation.clone(), order_id))
}
Ok(())
}
async fn update_state(
@ -433,7 +430,7 @@ where
for (target_status, event) in reached_monitoring_target {
tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target");
self.event_channel.send(event).await?;
self.event_channel.send(event).await??;
}
}
}
@ -628,7 +625,7 @@ fn map_cets(
}
impl xtra::Message for Event {
type Result = ();
type Result = Result<()>;
}
impl xtra::Message for Sync {
@ -669,14 +666,16 @@ where
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
log_error!(self.sync());
if let Err(e) = self.sync().await {
tracing::warn!("Sync failed: {:#}", e);
}
}
}
#[async_trait]
impl xtra::Handler<oracle::Attestation> for Actor {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_oracle_attestation(msg));
self.handle_oracle_attestation(msg).await
}
}
@ -852,8 +851,10 @@ mod tests {
#[async_trait]
impl xtra::Handler<Event> for MessageRecordingActor {
async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context<Self>) {
async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context<Self>) -> Result<()> {
self.events.push(message);
Ok(())
}
}

14
daemon/src/oracle.rs

@ -1,9 +1,9 @@
use crate::log_error;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::BitMexPriceEventId;
use crate::tokio_ext;
use crate::try_continue;
use crate::xtra_ext::LogFailure;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -188,6 +188,7 @@ impl Actor {
id: event_id,
attestation,
})
.log_failure("Failed to send attestation to oracle::Actor")
.await?;
Ok(())
@ -257,8 +258,13 @@ pub struct NoAnnouncement(pub BitMexPriceEventId);
#[async_trait]
impl xtra::Handler<NewAttestationFetched> for Actor {
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation));
async fn handle(
&mut self,
msg: NewAttestationFetched,
_ctx: &mut xtra::Context<Self>,
) -> Result<()> {
self.handle_new_attestation_fetched(msg.id, msg.attestation)
.await
}
}
@ -305,7 +311,7 @@ impl xtra::Message for Attestation {
}
impl xtra::Message for NewAttestationFetched {
type Result = ();
type Result = Result<()>;
}
mod olivia_api {

6
daemon/src/rollover_maker.rs

@ -20,6 +20,7 @@ use crate::tokio_ext::spawn_fallible;
use crate::wire;
use crate::wire::MakerToTaker;
use crate::wire::RollOverMsg;
use crate::xtra_ext::LogFailure;
use crate::Cfd;
use crate::Stopping;
use anyhow::Context as _;
@ -140,7 +141,10 @@ impl Actor {
order_id: self.cfd.id(),
dlc,
};
self.maker_cfd_actor.send(msg).await?;
self.maker_cfd_actor
.send(msg)
.log_failure("Failed to report rollover completion")
.await?;
ctx.stop();
Ok(())
}

7
daemon/src/setup_maker.rs

@ -14,6 +14,7 @@ use crate::tokio_ext::spawn_fallible;
use crate::wallet;
use crate::wire;
use crate::wire::SetupMsg;
use crate::xtra_ext::LogFailure;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -121,7 +122,11 @@ impl Actor {
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
let _ = self
.on_completed
.send(completed)
.log_failure("Failed to inform about contract setup completion")
.await;
ctx.stop();
}

13
daemon/src/setup_taker.rs

@ -13,6 +13,7 @@ use crate::tokio_ext::spawn_fallible;
use crate::wallet;
use crate::wire;
use crate::wire::SetupMsg;
use crate::xtra_ext::LogFailure;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -74,7 +75,10 @@ impl Actor {
// inform the `taker_cfd::Actor` about the start of contract
// setup, so that the db and UI can be updated accordingly
self.on_accepted.send(Started(order_id)).await?;
self.on_accepted
.send(Started(order_id))
.log_failure("Failed to inform about contract setup start")
.await?;
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the maker to
@ -124,6 +128,7 @@ impl Actor {
self.on_completed
.send(Completed::Rejected { order_id })
.log_failure("Failed to inform about contract setup rejection")
.await?;
ctx.stop();
@ -147,6 +152,7 @@ impl Actor {
order_id: msg.order_id,
dlc: msg.dlc,
})
.log_failure("Failed to inform about contract setup completion")
.await?;
ctx.stop();
@ -160,6 +166,7 @@ impl Actor {
order_id: msg.order_id,
error: msg.error,
})
.log_failure("Failed to inform about contract setup failure")
.await?;
ctx.stop();
@ -240,11 +247,11 @@ impl Rejected {
}
impl xtra::Message for Started {
type Result = ();
type Result = Result<()>;
}
impl xtra::Message for Completed {
type Result = ();
type Result = Result<()>;
}
impl address_map::ActorName for Actor {

21
daemon/src/taker_cfd.rs

@ -5,7 +5,6 @@ use crate::cfd_actors::{self};
use crate::collab_settlement_taker;
use crate::connection;
use crate::db::load_cfd;
use crate::log_error;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
@ -412,8 +411,8 @@ where
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> {
async fn handle_current_order(&mut self, msg: CurrentOrder) {
log_error!(self.handle_new_order(msg.0));
async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> {
self.handle_new_order(msg.0).await
}
}
@ -424,8 +423,8 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) {
log_error!(self.handle_setup_completed(msg))
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_completed(msg).await
}
}
@ -434,8 +433,8 @@ impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M,
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_monitoring_event(msg).await
}
}
@ -445,14 +444,16 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
if let Err(e) = self.handle_oracle_attestation(msg).await {
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<setup_taker::Started> for Actor<O, M, W> {
async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context<Self>) {
log_error!(self.handle_setup_started(msg.0))
async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_started(msg.0).await
}
}

50
daemon/src/xtra_ext.rs

@ -0,0 +1,50 @@
use async_trait::async_trait;
use xtra::address;
use xtra::message_channel;
use xtra::Actor;
use xtra::Disconnected;
use xtra::Message;
#[async_trait]
pub trait LogFailure {
async fn log_failure(self, context: &str) -> Result<(), Disconnected>;
}
#[async_trait]
impl<A, M> LogFailure for address::SendFuture<A, M>
where
A: Actor,
M: Message<Result = anyhow::Result<()>>,
{
async fn log_failure(self, context: &str) -> Result<(), Disconnected> {
if let Err(e) = self.await? {
tracing::warn!(
"{}: Message handler for message {} failed: {:#}",
context,
std::any::type_name::<M>(),
e
);
}
Ok(())
}
}
#[async_trait]
impl<M> LogFailure for message_channel::SendFuture<M>
where
M: xtra::Message<Result = anyhow::Result<()>>,
{
async fn log_failure(self, context: &str) -> Result<(), Disconnected> {
if let Err(e) = self.await? {
tracing::warn!(
"{}: Message handler for message {} failed: {:#}",
context,
std::any::type_name::<M>(),
e
);
}
Ok(())
}
}

5
daemon/tests/harness/mocks/monitor.rs

@ -1,8 +1,7 @@
use std::sync::Arc;
use daemon::monitor;
use daemon::oracle;
use mockall::*;
use std::sync::Arc;
use tokio::sync::Mutex;
use xtra_productivity::xtra_productivity;
@ -30,7 +29,7 @@ impl MonitorActor {
}
async fn handle(&mut self, msg: oracle::Attestation) {
self.mock.lock().await.oracle_attestation(msg)
self.mock.lock().await.oracle_attestation(msg);
}
}

4
daemon/tests/harness/mod.rs

@ -367,9 +367,13 @@ impl Taker {
#[macro_export]
macro_rules! deliver_event {
($maker:expr, $taker:expr, $event:expr) => {
#[allow(unused_must_use)]
{
tracing::debug!("Delivering event: {:?}", $event);
$taker.system.cfd_actor_addr.send($event).await.unwrap();
$maker.system.cfd_actor_addr.send($event).await.unwrap();
}
};
}

Loading…
Cancel
Save