Browse Source

Merge pull request #128 from comit-network/misc-improvements

Misc improvments
fix-bad-api-calls
Daniel Karzel 3 years ago
committed by GitHub
parent
commit
edeb3565d4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      daemon/src/maker.rs
  2. 116
      daemon/src/maker_cfd.rs
  3. 34
      daemon/src/maker_inc_connections.rs
  4. 14
      daemon/src/routes_maker.rs
  5. 21
      daemon/src/taker.rs
  6. 24
      daemon/src/taker_cfd_actor.rs
  7. 24
      daemon/src/wallet_sync.rs

50
daemon/src/maker.rs

@ -1,5 +1,5 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::maker_inc_connections_actor::in_taker_messages; use crate::maker_inc_connections::in_taker_messages;
use crate::model::TakerId; use crate::model::TakerId;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
@ -12,7 +12,6 @@ use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*; use xtra::prelude::*;
@ -24,8 +23,8 @@ mod bitmex_price_feed;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
mod maker_cfd_actor; mod maker_cfd;
mod maker_inc_connections_actor; mod maker_inc_connections;
mod model; mod model;
mod routes; mod routes;
mod routes_maker; mod routes_maker;
@ -34,6 +33,7 @@ mod send_wire_message_actor;
mod setup_contract_actor; mod setup_contract_actor;
mod to_sse_event; mod to_sse_event;
mod wallet; mod wallet;
mod wallet_sync;
mod wire; mod wire;
#[derive(Database)] #[derive(Database)]
@ -157,25 +157,27 @@ async fn main() -> Result<()> {
None => return Err(rocket), None => return Err(rocket),
}; };
let cfd_maker_actor_inbox = maker_cfd_actor::MakerCfdActor::new( let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None);
let cfd_maker_actor_inbox = maker_cfd::Actor::new(
db, db,
wallet, wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
wallet_feed_sender, maker_inc_connections_address.clone(),
) )
.await .await
.unwrap() .unwrap()
.create(None) .create(None)
.spawn_global(); .spawn_global();
let maker_inc_connections_address = tokio::spawn(
maker_inc_connections_actor::MakerIncConnectionsActor::new( maker_inc_connections_context.run(maker_inc_connections::Actor::new(
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
) )),
.create(None) );
.spawn_global();
tokio::spawn({ tokio::spawn({
let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone(); let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone();
@ -199,7 +201,7 @@ async fn main() -> Result<()> {
tokio::spawn(out_msg_actor); tokio::spawn(out_msg_actor);
maker_inc_connections_address maker_inc_connections_address
.do_send_async(maker_inc_connections_actor::NewTakerOnline { .do_send_async(maker_inc_connections::NewTakerOnline {
taker_id, taker_id,
out_msg_actor_inbox, out_msg_actor_inbox,
}) })
@ -210,27 +212,7 @@ async fn main() -> Result<()> {
} }
}); });
// consecutive wallet syncs handled by task that triggers sync tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let wallet_sync_interval = Duration::from_secs(10);
tokio::spawn({
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await;
}
}
});
cfd_maker_actor_inbox
.do_send_async(maker_cfd_actor::Initialized(
maker_inc_connections_address.clone(),
))
.await
.expect("not to fail after actors were initialized");
Ok(rocket.manage(cfd_maker_actor_inbox)) Ok(rocket.manage(cfd_maker_actor_inbox))
}, },

116
daemon/src/maker_cfd_actor.rs → daemon/src/maker_cfd.rs

@ -3,21 +3,19 @@ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::maker_inc_connections_actor::{MakerIncConnectionsActor, TakerCommand}; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd, WalletInfo}; use crate::model::{TakerId, Usd};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{maker_inc_connections_actor, setup_contract_actor}; use crate::{maker_inc_connections, setup_contract_actor};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*; use xtra::prelude::*;
pub struct Initialized(pub Address<MakerIncConnectionsActor>);
pub struct TakeOrder { pub struct TakeOrder {
pub taker_id: TakerId, pub taker_id: TakerId,
pub order_id: OrderId, pub order_id: OrderId,
@ -45,16 +43,13 @@ pub struct CfdSetupCompleted {
pub dlc: Dlc, pub dlc: Dlc,
} }
pub struct SyncWallet; pub struct Actor {
pub struct MakerCfdActor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>, takers: Address<maker_inc_connections::Actor>,
takers: Option<Address<MakerIncConnectionsActor>>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>, current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that // TODO: Move the contract setup into a dedicated actor and send messages to that actor that
@ -62,14 +57,14 @@ pub struct MakerCfdActor {
contract_setup_message_buffer: Vec<SetupMsg>, contract_setup_message_buffer: Vec<SetupMsg>,
} }
impl MakerCfdActor { impl Actor {
pub async fn new( pub async fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>, takers: Address<maker_inc_connections::Actor>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
@ -82,8 +77,7 @@ impl MakerCfdActor {
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed_actor_inbox,
order_feed_sender, order_feed_sender,
wallet_feed_sender, takers,
takers: None,
current_order_id: None, current_order_id: None,
current_contract_setup: None, current_contract_setup: None,
contract_setup_message_buffer: vec![], contract_setup_message_buffer: vec![],
@ -104,18 +98,12 @@ impl MakerCfdActor {
self.order_feed_sender.send(Some(order.clone()))?; self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers // 4. Inform connected takers
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::BroadcastOrder(Some(order))) .do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?; .await?;
Ok(()) Ok(())
} }
fn takers(&self) -> Result<&Address<MakerIncConnectionsActor>> {
self.takers
.as_ref()
.context("Maker inc connections actor to be initialised")
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -124,8 +112,8 @@ impl MakerCfdActor {
None => None, None => None,
}; };
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id: msg.id, taker_id: msg.id,
command: TakerCommand::SendOrder { command: TakerCommand::SendOrder {
order: current_order, order: current_order,
@ -149,12 +137,6 @@ impl MakerCfdActor {
Ok(()) Ok(())
} }
async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
async fn handle_cfd_setup_completed(&mut self, msg: CfdSetupCompleted) -> Result<()> { async fn handle_cfd_setup_completed(&mut self, msg: CfdSetupCompleted) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -202,8 +184,8 @@ impl MakerCfdActor {
load_order_by_id(current_order_id, &mut conn).await? load_order_by_id(current_order_id, &mut conn).await?
} }
_ => { _ => {
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id, taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: order_id }, command: TakerCommand::NotifyInvalidOrderId { id: order_id },
}) })
@ -231,8 +213,8 @@ impl MakerCfdActor {
// 3. Remove current order // 3. Remove current order
self.current_order_id = None; self.current_order_id = None;
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::BroadcastOrder(None)) .do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?; .await?;
self.order_feed_sender.send(None)?; self.order_feed_sender.send(None)?;
@ -275,8 +257,8 @@ impl MakerCfdActor {
.await .await
.unwrap(); .unwrap();
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id, taker_id,
command: TakerCommand::NotifyOrderAccepted { id: msg.order_id }, command: TakerCommand::NotifyOrderAccepted { id: msg.order_id },
}) })
@ -284,8 +266,10 @@ impl MakerCfdActor {
self.cfd_feed_actor_inbox self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?; .send(load_all_cfds(&mut conn).await?)?;
self.takers()? // 3. Remove current order
.do_send_async(maker_inc_connections_actor::BroadcastOrder(None)) self.current_order_id = None;
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?; .await?;
self.current_order_id = None; self.current_order_id = None;
self.order_feed_sender.send(None)?; self.order_feed_sender.send(None)?;
@ -303,14 +287,12 @@ impl MakerCfdActor {
let (actor, inbox) = setup_contract_actor::new( let (actor, inbox) = setup_contract_actor::new(
{ {
let inbox = self.takers()?.clone(); let inbox = self.takers.clone();
move |msg| { move |msg| {
tokio::spawn( tokio::spawn(inbox.do_send_async(maker_inc_connections::TakerMessage {
inbox.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id, taker_id,
command: TakerCommand::OutProtocolMsg { setup_msg: msg }, command: TakerCommand::OutProtocolMsg { setup_msg: msg },
}), }));
);
} }
}, },
setup_contract_actor::OwnParams::Maker(maker_params), setup_contract_actor::OwnParams::Maker(maker_params),
@ -385,8 +367,8 @@ impl MakerCfdActor {
.await .await
.unwrap(); .unwrap();
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id, taker_id,
command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, command: TakerCommand::NotifyOrderRejected { id: msg.order_id },
}) })
@ -396,8 +378,8 @@ impl MakerCfdActor {
// Remove order for all // Remove order for all
self.current_order_id = None; self.current_order_id = None;
self.takers()? self.takers
.do_send_async(maker_inc_connections_actor::BroadcastOrder(None)) .do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?; .await?;
self.order_feed_sender.send(None)?; self.order_feed_sender.send(None)?;
@ -406,72 +388,54 @@ impl MakerCfdActor {
} }
#[async_trait] #[async_trait]
impl Handler<Initialized> for MakerCfdActor { impl Handler<TakeOrder> for Actor {
async fn handle(&mut self, msg: Initialized, _ctx: &mut Context<Self>) {
self.takers.replace(msg.0);
}
}
#[async_trait]
impl Handler<TakeOrder> for MakerCfdActor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_order(msg)) log_error!(self.handle_take_order(msg))
} }
} }
#[async_trait] #[async_trait]
impl Handler<AcceptOrder> for MakerCfdActor { impl Handler<AcceptOrder> for Actor {
async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context<Self>) { async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context<Self>) {
log_error!(self.handle_accept_order(msg, ctx)) log_error!(self.handle_accept_order(msg, ctx))
} }
} }
#[async_trait] #[async_trait]
impl Handler<RejectOrder> for MakerCfdActor { impl Handler<RejectOrder> for Actor {
async fn handle(&mut self, msg: RejectOrder, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: RejectOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_reject_order(msg)) log_error!(self.handle_reject_order(msg))
} }
} }
#[async_trait] #[async_trait]
impl Handler<NewOrder> for MakerCfdActor { impl Handler<NewOrder> for Actor {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_order(msg)); log_error!(self.handle_new_order(msg));
} }
} }
#[async_trait] #[async_trait]
impl Handler<NewTakerOnline> for MakerCfdActor { impl Handler<NewTakerOnline> for Actor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_taker_online(msg)); log_error!(self.handle_new_taker_online(msg));
} }
} }
#[async_trait] #[async_trait]
impl Handler<IncProtocolMsg> for MakerCfdActor { impl Handler<IncProtocolMsg> for Actor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) {
log_error!(self.handle_inc_protocol_msg(msg)); log_error!(self.handle_inc_protocol_msg(msg));
} }
} }
#[async_trait] #[async_trait]
impl Handler<CfdSetupCompleted> for MakerCfdActor { impl Handler<CfdSetupCompleted> for Actor {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg)); log_error!(self.handle_cfd_setup_completed(msg));
} }
} }
#[async_trait]
impl Handler<SyncWallet> for MakerCfdActor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) {
log_error!(self.handle_sync_wallet());
}
}
impl Message for Initialized {
type Result = ();
}
impl Message for TakeOrder { impl Message for TakeOrder {
type Result = (); type Result = ();
} }
@ -492,10 +456,6 @@ impl Message for CfdSetupCompleted {
type Result = (); type Result = ();
} }
impl Message for SyncWallet {
type Result = ();
}
impl Message for AcceptOrder { impl Message for AcceptOrder {
type Result = (); type Result = ();
} }
@ -504,4 +464,4 @@ impl Message for RejectOrder {
type Result = (); type Result = ();
} }
impl xtra::Actor for MakerCfdActor {} impl xtra::Actor for Actor {}

34
daemon/src/maker_inc_connections_actor.rs → daemon/src/maker_inc_connections.rs

@ -1,17 +1,15 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::maker_cfd_actor::MakerCfdActor;
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::TakerId; use crate::model::TakerId;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, wire}; use crate::{maker_cfd, wire};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf; use tokio::net::tcp::OwnedReadHalf;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use async_trait::async_trait;
use xtra::prelude::*; use xtra::prelude::*;
type MakerToTakerSender = mpsc::UnboundedSender<wire::MakerToTaker>; type MakerToTakerSender = mpsc::UnboundedSender<wire::MakerToTaker>;
@ -49,18 +47,18 @@ impl Message for NewTakerOnline {
type Result = Result<()>; type Result = Result<()>;
} }
pub struct MakerIncConnectionsActor { pub struct Actor {
write_connections: HashMap<TakerId, MakerToTakerSender>, write_connections: HashMap<TakerId, MakerToTakerSender>,
cfd_maker_actor_address: Address<MakerCfdActor>, cfd_actor: Address<maker_cfd::Actor>,
} }
impl Actor for MakerIncConnectionsActor {} impl xtra::Actor for Actor {}
impl MakerIncConnectionsActor { impl Actor {
pub fn new(cfd_maker_actor_address: Address<MakerCfdActor>) -> Self { pub fn new(cfd_actor: Address<maker_cfd::Actor>) -> Self {
Self { Self {
write_connections: HashMap::<TakerId, MakerToTakerSender>::new(), write_connections: HashMap::<TakerId, MakerToTakerSender>::new(),
cfd_maker_actor_address, cfd_actor,
} }
} }
@ -103,8 +101,8 @@ impl MakerIncConnectionsActor {
} }
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
self.cfd_maker_actor_address self.cfd_actor
.do_send_async(maker_cfd_actor::NewTakerOnline { id: msg.taker_id }) .do_send_async(maker_cfd::NewTakerOnline { id: msg.taker_id })
.await?; .await?;
self.write_connections self.write_connections
@ -122,7 +120,7 @@ macro_rules! log_error {
} }
#[async_trait] #[async_trait]
impl Handler<BroadcastOrder> for MakerIncConnectionsActor { impl Handler<BroadcastOrder> for Actor {
async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context<Self>) -> Result<()> { async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_broadcast_order(msg)); log_error!(self.handle_broadcast_order(msg));
Ok(()) Ok(())
@ -130,7 +128,7 @@ impl Handler<BroadcastOrder> for MakerIncConnectionsActor {
} }
#[async_trait] #[async_trait]
impl Handler<TakerMessage> for MakerIncConnectionsActor { impl Handler<TakerMessage> for Actor {
async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context<Self>) -> Result<()> { async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_taker_message(msg)); log_error!(self.handle_taker_message(msg));
Ok(()) Ok(())
@ -138,7 +136,7 @@ impl Handler<TakerMessage> for MakerIncConnectionsActor {
} }
#[async_trait] #[async_trait]
impl Handler<NewTakerOnline> for MakerIncConnectionsActor { impl Handler<NewTakerOnline> for Actor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> { async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_taker_online(msg)); log_error!(self.handle_new_taker_online(msg));
Ok(()) Ok(())
@ -149,7 +147,7 @@ impl Handler<NewTakerOnline> for MakerIncConnectionsActor {
pub fn in_taker_messages( pub fn in_taker_messages(
read: OwnedReadHalf, read: OwnedReadHalf,
cfd_actor_inbox: Address<MakerCfdActor>, cfd_actor_inbox: Address<maker_cfd::Actor>,
taker_id: TakerId, taker_id: TakerId,
) -> impl Future<Output = ()> { ) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| { let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| {
@ -162,7 +160,7 @@ pub fn in_taker_messages(
match message { match message {
Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => { Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => {
cfd_actor_inbox cfd_actor_inbox
.do_send_async(maker_cfd_actor::TakeOrder { .do_send_async(maker_cfd::TakeOrder {
taker_id, taker_id,
order_id, order_id,
quantity, quantity,
@ -172,7 +170,7 @@ pub fn in_taker_messages(
} }
Ok(wire::TakerToMaker::Protocol(msg)) => { Ok(wire::TakerToMaker::Protocol(msg)) => {
cfd_actor_inbox cfd_actor_inbox
.do_send_async(maker_cfd_actor::IncProtocolMsg(msg)) .do_send_async(maker_cfd::IncProtocolMsg(msg))
.await .await
.unwrap(); .unwrap();
} }

14
daemon/src/routes_maker.rs

@ -1,5 +1,5 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::maker_cfd_actor::{self, MakerCfdActor}; use crate::maker_cfd;
use crate::model::cfd::{Cfd, Order, OrderId, Origin}; use crate::model::cfd::{Cfd, Order, OrderId, Origin};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
@ -72,7 +72,7 @@ pub struct CfdNewOrderRequest {
#[rocket::post("/order/sell", data = "<order>")] #[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order( pub async fn post_sell_order(
order: Json<CfdNewOrderRequest>, order: Json<CfdNewOrderRequest>,
cfd_actor_address: &State<Address<MakerCfdActor>>, cfd_actor_address: &State<Address<maker_cfd::Actor>>,
_auth: Authenticated, _auth: Authenticated,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
let order = Order::from_default_with_price(order.price, Origin::Ours) let order = Order::from_default_with_price(order.price, Origin::Ours)
@ -81,7 +81,7 @@ pub async fn post_sell_order(
.with_max_quantity(order.max_quantity); .with_max_quantity(order.max_quantity);
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd_actor::NewOrder(order)) .do_send_async(maker_cfd::NewOrder(order))
.await .await
.expect("actor to always be available"); .expect("actor to always be available");
@ -131,11 +131,11 @@ pub struct AcceptOrRejectOrderRequest {
#[rocket::post("/order/accept", data = "<cfd_accept_order_request>")] #[rocket::post("/order/accept", data = "<cfd_accept_order_request>")]
pub async fn post_accept_order( pub async fn post_accept_order(
cfd_accept_order_request: Json<AcceptOrRejectOrderRequest>, cfd_accept_order_request: Json<AcceptOrRejectOrderRequest>,
cfd_actor_address: &State<Address<MakerCfdActor>>, cfd_actor_address: &State<Address<maker_cfd::Actor>>,
_auth: Authenticated, _auth: Authenticated,
) -> status::Accepted<()> { ) -> status::Accepted<()> {
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd_actor::AcceptOrder { .do_send_async(maker_cfd::AcceptOrder {
order_id: cfd_accept_order_request.order_id, order_id: cfd_accept_order_request.order_id,
}) })
.await .await
@ -146,11 +146,11 @@ pub async fn post_accept_order(
#[rocket::post("/order/reject", data = "<cfd_reject_order_request>")] #[rocket::post("/order/reject", data = "<cfd_reject_order_request>")]
pub async fn post_reject_order( pub async fn post_reject_order(
cfd_reject_order_request: Json<AcceptOrRejectOrderRequest>, cfd_reject_order_request: Json<AcceptOrRejectOrderRequest>,
cfd_actor_address: &State<Address<MakerCfdActor>>, cfd_actor_address: &State<Address<maker_cfd::Actor>>,
_auth: Authenticated, _auth: Authenticated,
) -> status::Accepted<()> { ) -> status::Accepted<()> {
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd_actor::RejectOrder { .do_send_async(maker_cfd::RejectOrder {
order_id: cfd_reject_order_request.order_id, order_id: cfd_reject_order_request.order_id,
}) })
.await .await

21
daemon/src/taker.rs

@ -32,6 +32,7 @@ mod taker_cfd_actor;
mod taker_inc_message_actor; mod taker_inc_message_actor;
mod to_sse_event; mod to_sse_event;
mod wallet; mod wallet;
mod wallet_sync;
mod wire; mod wire;
const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5); const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5);
@ -161,11 +162,10 @@ async fn main() -> Result<()> {
let cfd_actor_inbox = taker_cfd_actor::TakerCfdActor::new( let cfd_actor_inbox = taker_cfd_actor::TakerCfdActor::new(
db, db,
wallet, wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
wallet_feed_sender,
out_maker_actor_inbox, out_maker_actor_inbox,
) )
.await .await
@ -176,22 +176,7 @@ async fn main() -> Result<()> {
let inc_maker_messages_actor = let inc_maker_messages_actor =
taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); taker_inc_message_actor::new(read, cfd_actor_inbox.clone());
// consecutive wallet syncs handled by task that triggers sync tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let wallet_sync_interval = Duration::from_secs(10);
tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone();
async move {
loop {
cfd_actor_inbox
.do_send_async(taker_cfd_actor::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await;
}
}
});
tokio::spawn(inc_maker_messages_actor); tokio::spawn(inc_maker_messages_actor);
tokio::spawn(out_maker_messages_actor); tokio::spawn(out_maker_messages_actor);

24
daemon/src/taker_cfd_actor.rs

@ -5,7 +5,7 @@ use crate::db::{
use crate::actors::log_error; use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{Usd, WalletInfo}; use crate::model::Usd;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{setup_contract_actor, wire}; use crate::{setup_contract_actor, wire};
@ -16,8 +16,6 @@ use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*; use xtra::prelude::*;
pub struct SyncWallet;
pub struct TakeOffer { pub struct TakeOffer {
pub order_id: OrderId, pub order_id: OrderId,
pub quantity: Usd, pub quantity: Usd,
@ -40,7 +38,6 @@ pub struct TakerCfdActor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>, out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>, current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that // TODO: Move the contract setup into a dedicated actor and send messages to that actor that
@ -55,7 +52,6 @@ impl TakerCfdActor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>, out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
@ -67,19 +63,12 @@ impl TakerCfdActor {
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed_actor_inbox,
order_feed_actor_inbox, order_feed_actor_inbox,
wallet_feed_sender,
out_msg_maker_inbox, out_msg_maker_inbox,
current_contract_setup: None, current_contract_setup: None,
contract_setup_message_buffer: vec![], contract_setup_message_buffer: vec![],
}) })
} }
async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -250,13 +239,6 @@ impl TakerCfdActor {
} }
} }
#[async_trait]
impl Handler<SyncWallet> for TakerCfdActor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) {
log_error!(self.handle_sync_wallet());
}
}
#[async_trait] #[async_trait]
impl Handler<TakeOffer> for TakerCfdActor { impl Handler<TakeOffer> for TakerCfdActor {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
@ -299,10 +281,6 @@ impl Handler<CfdSetupCompleted> for TakerCfdActor {
} }
} }
impl Message for SyncWallet {
type Result = ();
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = ();
} }

24
daemon/src/wallet_sync.rs

@ -0,0 +1,24 @@
use crate::wallet::Wallet;
use crate::WalletInfo;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
pub async fn new(wallet: Wallet, sender: watch::Sender<WalletInfo>) {
loop {
sleep(Duration::from_secs(10)).await;
let info = match wallet.sync().await {
Ok(info) => info,
Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e);
continue;
}
};
if sender.send(info).is_err() {
tracing::warn!("Wallet feed receiver not available, stopping wallet sync");
break;
}
}
}
Loading…
Cancel
Save