Browse Source

Make maker_cfd::Actor generic over other actor addresses

testing
Lucas Soriano del Pino 3 years ago
committed by Thomas Eizinger
parent
commit
a2b2a04a3a
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 677
      daemon/src/maker_cfd.rs

677
daemon/src/maker_cfd.rs

@ -16,8 +16,8 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use cfd_protocol::secp256k1_zkp::Signature;
use futures::channel::mpsc;
use futures::{future, SinkExt};
use rocket_db_pools::sqlx::Sqlite;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::watch;
@ -58,19 +58,19 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker,
}
pub struct Actor {
pub struct Actor<O, M, T> {
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
takers: Address<maker_inc_connections::Actor>,
takers: Address<T>,
current_order_id: Option<OrderId>,
monitor_actor: Address<monitor::Actor>,
monitor_actor: Address<M>,
setup_state: SetupState,
roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor>,
oracle_actor: Address<O>,
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
// TODO: Persist instead of using an in-memory hashmap for resiliency?
@ -95,7 +95,7 @@ enum RollOverState {
None,
}
impl Actor {
impl<O, M, T> Actor<O, M, T> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
@ -104,9 +104,9 @@ impl Actor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
) -> Self {
Self {
db,
@ -126,104 +126,55 @@ impl Actor {
}
}
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals(&self) -> Result<()> {
Ok(self.update_cfd_feed_sender.send(
self.current_pending_proposals
.iter()
.map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone())))
.collect(),
)?)
}
fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result<TakerId> {
let (_, taker_id) = self
.current_pending_proposals
.get(order_id)
.context("Could not find proposal for given order id")?;
Ok(*taker_id)
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_proposals()?;
Ok(())
}
fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, TakerId)> {
let (update_proposal, taker_id) = self
.current_pending_proposals
.get(&order_id)
.context("have a proposal that is about to be accepted")?;
let proposal = match update_proposal {
UpdateCfdProposal::Settlement { proposal, .. } => proposal,
UpdateCfdProposal::RollOverProposal { .. } => {
anyhow::bail!("did not expect a rollover proposal");
}
};
Ok((proposal.clone(), *taker_id))
}
async fn handle_new_order(
async fn handle_propose_roll_over(
&mut self,
price: Usd,
min_quantity: Usd,
max_quantity: Usd,
proposal: RollOverProposal,
taker_id: TakerId,
) -> Result<()> {
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id))
.await?;
let order = Order::new(
price,
min_quantity,
max_quantity,
Origin::Ours,
oracle_event_id,
)?;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;
Ok(())
}
tracing::info!(
"Received proposal from the taker {}: {:?} to roll over order {}",
taker_id,
proposal,
proposal.order_id
);
async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> {
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
match cfd {
Cfd {
state: CfdState::Open { .. },
..
} => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::SendOrder {
order: current_order,
self.current_pending_proposals.insert(
proposal.order_id,
(
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
})
.await?;
taker_id,
),
);
self.send_pending_proposals()?;
Ok(())
}
@ -309,43 +260,27 @@ impl Actor {
Ok(())
}
async fn handle_propose_roll_over(
&mut self,
proposal: RollOverProposal,
taker_id: TakerId,
) -> Result<()> {
tracing::info!(
"Received proposal from the taker {}: {:?} to roll over order {}",
taker_id,
proposal,
proposal.order_id
);
// check if CFD is in open state, otherwise we should not proceed
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
match cfd {
Cfd {
state: CfdState::Open { .. },
..
} => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
self.current_pending_proposals.insert(
proposal.order_id,
(
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
),
);
self.send_pending_proposals()?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
@ -387,78 +322,146 @@ impl Actor {
Ok(())
}
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals(&self) -> Result<()> {
Ok(self.update_cfd_feed_sender.send(
self.current_pending_proposals
.iter()
.map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone())))
.collect(),
)?)
}
let dlc = dlc.context("Failed to setup contract with taker")?;
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_proposals()?;
Ok(())
}
let mut conn = self.db.acquire().await?;
fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result<TakerId> {
let (_, taker_id) = self
.current_pending_proposals
.get(order_id)
.context("Could not find proposal for given order id")?;
Ok(*taker_id)
}
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, TakerId)> {
let (update_proposal, taker_id) = self
.current_pending_proposals
.get(&order_id)
.context("have a proposal that is about to be accepted")?;
let proposal = match update_proposal {
UpdateCfdProposal::Settlement { proposal, .. } => proposal,
UpdateCfdProposal::RollOverProposal { .. } => {
anyhow::bail!("did not expect a rollover proposal");
}
};
Ok((proposal.clone(), *taker_id))
}
}
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
impl<O, M, T> Actor<O, M, T>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
};
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::SendOrder {
order: current_order,
},
})
.await?;
tracing::info!("Lock transaction published with txid {}", txid);
Ok(())
}
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementAccepted { id: order_id },
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.context("accepted settlement")?;
Ok(())
}
async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementRejected { id: order_id },
})
.await?;
self.remove_pending_proposal(&order_id)
.context("rejected settlement")?;
Ok(())
}
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with taker")?;
self.roll_over_state = RollOverState::None;
async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverRejected { id: order_id },
})
.await?;
self.remove_pending_proposal(&order_id)
.context("rejected roll_over")?;
Ok(())
}
}
impl<O, M, T> Actor<O, M, T>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle_take_order(
&mut self,
taker_id: TakerId,
@ -492,7 +495,7 @@ impl Actor {
}
};
// 2. Create a new CFD
// 2. Insert CFD in DB
let cfd = Cfd::new(
current_order.clone(),
quantity,
@ -515,6 +518,7 @@ impl Actor {
);
self.reject_order(taker_id, cfd, conn).await?;
return Ok(());
}
@ -528,6 +532,66 @@ impl Actor {
Ok(())
}
async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects an order" );
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let taker_id = match cfd {
Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. },
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
}
};
self.reject_order(taker_id, cfd, conn).await?;
Ok(())
}
/// Reject an order
///
/// Rejection includes removing the order and saving in the db that it was rejected.
/// In the current model it is essential to remove the order because a taker
/// that received a rejection cannot communicate with the maker until a new order is published.
async fn reject_order(
&mut self,
taker_id: TakerId,
mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>,
) -> Result<()> {
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderRejected { id: cfd.order.id },
})
.await?;
// Remove order for all
self.current_order_id = None;
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
Ok(())
}
}
impl<O, M, T> Actor<O, M, T>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_accept_order(
&mut self,
order_id: OrderId,
@ -612,108 +676,107 @@ impl Actor {
Ok(())
}
}
async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects an order" );
impl<O, M, T> Actor<O, M, T>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle_new_order(
&mut self,
price: Usd,
min_quantity: Usd,
max_quantity: Usd,
) -> Result<()> {
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id))
.await?;
let order = Order::new(
price,
min_quantity,
max_quantity,
Origin::Ours,
oracle_event_id,
)?;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
insert_order(&order, &mut conn).await?;
let taker_id = match cfd {
Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. },
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
}
};
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
self.reject_order(taker_id, cfd, conn).await?;
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;
Ok(())
}
}
/// Reject an order
///
/// Rejection includes removing the order and saving in the db that it was rejected.
/// In the current model it is essential to remove the order because a taker
/// that received a rejection cannot communicate with the maker until a new order is published.
async fn reject_order(
impl<O, M, T> Actor<O, M, T>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_setup_completed(
&mut self,
taker_id: TakerId,
mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
// Update order in db
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderRejected { id: cfd.order.id },
})
.await?;
// Remove order for all
self.current_order_id = None;
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
self.setup_state = SetupState::None;
Ok(())
}
let dlc = dlc.context("Failed to setup contract with taker")?;
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementAccepted { id: order_id },
})
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.context("accepted settlement")?;
Ok(())
}
async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
tracing::info!("Lock transaction published with txid {}", txid);
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementRejected { id: order_id },
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
})
.await?;
self.remove_pending_proposal(&order_id)
.context("rejected settlement")?;
Ok(())
}
}
impl<O, M, T> Actor<O, M, T>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_accept_roll_over(
&mut self,
order_id: OrderId,
@ -801,63 +864,49 @@ impl Actor {
.context("accepted roll_over")?;
Ok(())
}
}
async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
impl<O, M, T> Actor<O, M, T>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with taker")?;
self.roll_over_state = RollOverState::None;
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverRejected { id: order_id },
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
self.remove_pending_proposal(&order_id)
.context("rejected roll_over")?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
#[async_trait]
impl Handler<CfdAction> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<CfdAction> for Actor<O, M, T>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) {
use CfdAction::*;
if let Err(e) = match msg {
@ -875,42 +924,60 @@ impl Handler<CfdAction> for Actor {
}
#[async_trait]
impl Handler<NewOrder> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<NewOrder> for Actor<O, M, T>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity));
}
}
#[async_trait]
impl Handler<NewTakerOnline> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<NewTakerOnline> for Actor<O, M, T>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_taker_online(msg.id));
}
}
#[async_trait]
impl Handler<CfdSetupCompleted> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<CfdSetupCompleted> for Actor<O, M, T>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<CfdRollOverCompleted> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, T>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<monitor::Event> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<monitor::Event> for Actor<O, M, T> {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl Handler<FromTaker> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<FromTaker> for Actor<O, M, T>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) {
match msg {
wire::TakerToMaker::TakeOrder { order_id, quantity } => {
@ -964,7 +1031,7 @@ impl Handler<FromTaker> for Actor {
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
impl<O: 'static, M: 'static, T: 'static> Handler<oracle::Attestation> for Actor<O, M, T> {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
@ -994,4 +1061,4 @@ impl Message for FromTaker {
type Result = ();
}
impl xtra::Actor for Actor {}
impl<O: 'static, M: 'static, T: 'static> xtra::Actor for Actor<O, M, T> {}

Loading…
Cancel
Save