Browse Source

Decouple wallet sync from cfd actors

Now that we have a cloneable wallet, we don't actually need to have
a single owner of it.
fix-bad-api-calls
Thomas Eizinger 3 years ago
parent
commit
81259e1b61
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 20
      daemon/src/maker.rs
  2. 24
      daemon/src/maker_cfd.rs
  3. 21
      daemon/src/taker.rs
  4. 24
      daemon/src/taker_cfd_actor.rs
  5. 24
      daemon/src/wallet_sync.rs

20
daemon/src/maker.rs

@ -12,7 +12,6 @@ use model::WalletInfo;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
@ -34,6 +33,7 @@ mod send_wire_message_actor;
mod setup_contract_actor;
mod to_sse_event;
mod wallet;
mod wallet_sync;
mod wire;
#[derive(Database)]
@ -162,11 +162,10 @@ async fn main() -> Result<()> {
let cfd_maker_actor_inbox = maker_cfd::Actor::new(
db,
wallet,
wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender,
order_feed_sender,
wallet_feed_sender,
maker_inc_connections_address.clone(),
)
.await
@ -213,20 +212,7 @@ async fn main() -> Result<()> {
}
});
// consecutive wallet syncs handled by task that triggers sync
let wallet_sync_interval = Duration::from_secs(10);
tokio::spawn({
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
cfd_actor_inbox
.do_send_async(maker_cfd::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await;
}
}
});
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox))
},

24
daemon/src/maker_cfd.rs

@ -5,7 +5,7 @@ use crate::db::{
};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd, WalletInfo};
use crate::model::{TakerId, Usd};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{maker_inc_connections, setup_contract_actor};
@ -43,15 +43,12 @@ pub struct CfdSetupCompleted {
pub dlc: Dlc,
}
pub struct SyncWallet;
pub struct Actor {
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
@ -67,7 +64,6 @@ impl Actor {
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
takers: Address<maker_inc_connections::Actor>,
) -> Result<Self> {
let mut conn = db.acquire().await?;
@ -81,7 +77,6 @@ impl Actor {
oracle_pk,
cfd_feed_actor_inbox,
order_feed_sender,
wallet_feed_sender,
takers,
current_order_id: None,
current_contract_setup: None,
@ -142,12 +137,6 @@ impl Actor {
Ok(())
}
async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
async fn handle_cfd_setup_completed(&mut self, msg: CfdSetupCompleted) -> Result<()> {
let mut conn = self.db.acquire().await?;
@ -447,13 +436,6 @@ impl Handler<CfdSetupCompleted> for Actor {
}
}
#[async_trait]
impl Handler<SyncWallet> for Actor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) {
log_error!(self.handle_sync_wallet());
}
}
impl Message for TakeOrder {
type Result = ();
}
@ -474,10 +456,6 @@ impl Message for CfdSetupCompleted {
type Result = ();
}
impl Message for SyncWallet {
type Result = ();
}
impl Message for AcceptOrder {
type Result = ();
}

21
daemon/src/taker.rs

@ -32,6 +32,7 @@ mod taker_cfd_actor;
mod taker_inc_message_actor;
mod to_sse_event;
mod wallet;
mod wallet_sync;
mod wire;
const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5);
@ -161,11 +162,10 @@ async fn main() -> Result<()> {
let cfd_actor_inbox = taker_cfd_actor::TakerCfdActor::new(
db,
wallet,
wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender,
order_feed_sender,
wallet_feed_sender,
out_maker_actor_inbox,
)
.await
@ -176,22 +176,7 @@ async fn main() -> Result<()> {
let inc_maker_messages_actor =
taker_inc_message_actor::new(read, cfd_actor_inbox.clone());
// consecutive wallet syncs handled by task that triggers sync
let wallet_sync_interval = Duration::from_secs(10);
tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone();
async move {
loop {
cfd_actor_inbox
.do_send_async(taker_cfd_actor::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await;
}
}
});
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(inc_maker_messages_actor);
tokio::spawn(out_maker_messages_actor);

24
daemon/src/taker_cfd_actor.rs

@ -5,7 +5,7 @@ use crate::db::{
use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{Usd, WalletInfo};
use crate::model::Usd;
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{setup_contract_actor, wire};
@ -16,8 +16,6 @@ use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
use xtra::prelude::*;
pub struct SyncWallet;
pub struct TakeOffer {
pub order_id: OrderId,
pub quantity: Usd,
@ -40,7 +38,6 @@ pub struct TakerCfdActor {
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that
@ -55,7 +52,6 @@ impl TakerCfdActor {
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
) -> Result<Self> {
let mut conn = db.acquire().await?;
@ -67,19 +63,12 @@ impl TakerCfdActor {
oracle_pk,
cfd_feed_actor_inbox,
order_feed_actor_inbox,
wallet_feed_sender,
out_msg_maker_inbox,
current_contract_setup: None,
contract_setup_message_buffer: vec![],
})
}
async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {
let mut conn = self.db.acquire().await?;
@ -250,13 +239,6 @@ impl TakerCfdActor {
}
}
#[async_trait]
impl Handler<SyncWallet> for TakerCfdActor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) {
log_error!(self.handle_sync_wallet());
}
}
#[async_trait]
impl Handler<TakeOffer> for TakerCfdActor {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
@ -299,10 +281,6 @@ impl Handler<CfdSetupCompleted> for TakerCfdActor {
}
}
impl Message for SyncWallet {
type Result = ();
}
impl Message for TakeOffer {
type Result = ();
}

24
daemon/src/wallet_sync.rs

@ -0,0 +1,24 @@
use crate::wallet::Wallet;
use crate::WalletInfo;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
pub async fn new(wallet: Wallet, sender: watch::Sender<WalletInfo>) {
loop {
sleep(Duration::from_secs(10)).await;
let info = match wallet.sync().await {
Ok(info) => info,
Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e);
continue;
}
};
if sender.send(info).is_err() {
tracing::warn!("Wallet feed receiver not available, stopping wallet sync");
break;
}
}
}
Loading…
Cancel
Save