Browse Source

Initialize the addresses to other actors without `Option`

By creating the `Context` first, we can obtain an address without
actually instantiating the actor. This allows us to have two actors
own a reference to each other without the use of `Option`.
fix-bad-api-calls
Thomas Eizinger 3 years ago
parent
commit
e69ffc86b9
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 20
      daemon/src/maker.rs
  2. 46
      daemon/src/maker_cfd_actor.rs

20
daemon/src/maker.rs

@ -157,6 +157,9 @@ async fn main() -> Result<()> {
None => return Err(rocket),
};
let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None);
let cfd_maker_actor_inbox = maker_cfd_actor::MakerCfdActor::new(
db,
wallet,
@ -164,16 +167,18 @@ async fn main() -> Result<()> {
cfd_feed_sender,
order_feed_sender,
wallet_feed_sender,
maker_inc_connections_address.clone(),
)
.await
.unwrap()
.create(None)
.spawn_global();
let maker_inc_connections_address =
maker_inc_connections::Actor::new(cfd_maker_actor_inbox.clone())
.create(None)
.spawn_global();
tokio::spawn(
maker_inc_connections_context.run(maker_inc_connections::Actor::new(
cfd_maker_actor_inbox.clone(),
)),
);
tokio::spawn({
let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone();
@ -223,13 +228,6 @@ async fn main() -> Result<()> {
}
});
cfd_maker_actor_inbox
.do_send_async(maker_cfd_actor::Initialized(
maker_inc_connections_address.clone(),
))
.await
.expect("not to fail after actors were initialized");
Ok(rocket.manage(cfd_maker_actor_inbox))
},
))

46
daemon/src/maker_cfd_actor.rs

@ -9,15 +9,13 @@ use crate::model::{TakerId, Usd, WalletInfo};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{maker_inc_connections, setup_contract_actor};
use anyhow::{Context as AnyhowContext, Result};
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
use xtra::prelude::*;
pub struct Initialized(pub Address<maker_inc_connections::Actor>);
pub struct TakeOrder {
pub taker_id: TakerId,
pub order_id: OrderId,
@ -54,7 +52,7 @@ pub struct MakerCfdActor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
takers: Option<Address<maker_inc_connections::Actor>>,
takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that
@ -70,6 +68,7 @@ impl MakerCfdActor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
takers: Address<maker_inc_connections::Actor>,
) -> Result<Self> {
let mut conn = db.acquire().await?;
@ -83,7 +82,7 @@ impl MakerCfdActor {
cfd_feed_actor_inbox,
order_feed_sender,
wallet_feed_sender,
takers: None,
takers,
current_order_id: None,
current_contract_setup: None,
contract_setup_message_buffer: vec![],
@ -104,18 +103,12 @@ impl MakerCfdActor {
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers()?
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;
Ok(())
}
fn takers(&self) -> Result<&Address<maker_inc_connections::Actor>> {
self.takers
.as_ref()
.context("Maker inc connections actor to be initialised")
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
let mut conn = self.db.acquire().await?;
@ -124,7 +117,7 @@ impl MakerCfdActor {
None => None,
};
self.takers()?
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id: msg.id,
command: TakerCommand::SendOrder {
@ -202,7 +195,7 @@ impl MakerCfdActor {
load_order_by_id(current_order_id, &mut conn).await?
}
_ => {
self.takers()?
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: order_id },
@ -231,7 +224,7 @@ impl MakerCfdActor {
// 3. Remove current order
self.current_order_id = None;
self.takers()?
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
@ -275,7 +268,7 @@ impl MakerCfdActor {
.await
.unwrap();
self.takers()?
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: msg.order_id },
@ -284,7 +277,9 @@ impl MakerCfdActor {
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.takers()?
// 3. Remove current order
self.current_order_id = None;
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?;
self.current_order_id = None;
@ -303,7 +298,7 @@ impl MakerCfdActor {
let (actor, inbox) = setup_contract_actor::new(
{
let inbox = self.takers()?.clone();
let inbox = self.takers.clone();
move |msg| {
tokio::spawn(inbox.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
@ -383,7 +378,7 @@ impl MakerCfdActor {
.await
.unwrap();
self.takers()?
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderRejected { id: msg.order_id },
@ -394,7 +389,7 @@ impl MakerCfdActor {
// Remove order for all
self.current_order_id = None;
self.takers()?
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
@ -403,13 +398,6 @@ impl MakerCfdActor {
}
}
#[async_trait]
impl Handler<Initialized> for MakerCfdActor {
async fn handle(&mut self, msg: Initialized, _ctx: &mut Context<Self>) {
self.takers.replace(msg.0);
}
}
#[async_trait]
impl Handler<TakeOrder> for MakerCfdActor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) {
@ -466,10 +454,6 @@ impl Handler<SyncWallet> for MakerCfdActor {
}
}
impl Message for Initialized {
type Result = ();
}
impl Message for TakeOrder {
type Result = ();
}

Loading…
Cancel
Save