Browse Source

Introduce short-lived setup_maker::Actor

feature/reconnect-button
Lucas Soriano del Pino 3 years ago
parent
commit
c6e59abcfc
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 7
      daemon/src/address_map.rs
  2. 6
      daemon/src/lib.rs
  3. 407
      daemon/src/maker_cfd.rs
  4. 60
      daemon/src/maker_inc_connections.rs
  5. 284
      daemon/src/setup_maker.rs
  6. 2
      daemon/src/setup_taker.rs
  7. 5
      daemon/tests/happy_path.rs

7
daemon/src/address_map.rs

@ -29,6 +29,13 @@ where
Ok(Disconnected { entry })
}
pub fn get_connected(&self, key: &K) -> Option<&xtra::Address<A>> {
match self.inner.get(key) {
Some(addr) if addr.is_connected() => Some(addr),
_ => None,
}
}
/// Garbage-collect an address that is no longer active.
pub fn gc(&mut self, stopping: Stopping<A>) {
self.inner.retain(|_, candidate| stopping.me != *candidate);

6
daemon/src/lib.rs

@ -5,6 +5,7 @@ use crate::maker_cfd::{FromTaker, TakerConnected};
use crate::model::cfd::{Cfd, Order, UpdateCfdProposals};
use crate::oracle::Attestation;
use crate::tokio_ext::FutureExt;
use address_map::Stopping;
use anyhow::Result;
use connection::ConnectionStatus;
use futures::future::RemoteHandle;
@ -44,6 +45,7 @@ pub mod routes;
pub mod seed;
pub mod send_to_socket;
pub mod setup_contract;
pub mod setup_maker;
pub mod setup_taker;
pub mod taker_cfd;
pub mod to_sse_event;
@ -112,7 +114,9 @@ where
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>

407
daemon/src/maker_cfd.rs

@ -1,3 +1,4 @@
use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
@ -10,10 +11,11 @@ use crate::projection::{
try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update,
UpdateRollOverProposal, UpdateSettlementProposal,
};
use crate::setup_contract::{RolloverParams, SetupParams};
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt;
use crate::{
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire,
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker,
wallet, wire, Tasks,
};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
@ -27,6 +29,7 @@ use sqlx::Sqlite;
use std::collections::{HashMap, HashSet};
use time::Duration;
use xtra::prelude::*;
use xtra::Actor as _;
use xtra_productivity::xtra_productivity;
pub struct AcceptOrder {
@ -64,11 +67,6 @@ pub struct TakerDisconnected {
pub id: Identity,
}
pub struct CfdSetupCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
pub struct CfdRollOverCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
@ -93,7 +91,7 @@ pub struct Actor<
takers: Address<T>,
current_order_id: Option<OrderId>,
monitor_actor: Address<M>,
setup_state: SetupState,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
roll_over_state: RollOverState,
oracle_actor: Address<O>,
// Maker needs to also store Identity to be able to send a reply back
@ -101,15 +99,7 @@ pub struct Actor<
current_agreed_proposals: HashMap<OrderId, (SettlementProposal, Identity)>,
connected_takers: HashSet<Identity>,
n_payouts: usize,
}
enum SetupState {
Active {
taker: Identity,
sender: mpsc::UnboundedSender<wire::SetupMsg>,
_task: RemoteHandle<()>,
},
None,
tasks: Tasks,
}
enum RollOverState {
@ -143,13 +133,14 @@ impl<O, M, T, W> Actor<O, M, T, W> {
takers,
current_order_id: None,
monitor_actor,
setup_state: SetupState::None,
setup_actors: AddressMap::default(),
roll_over_state: RollOverState::None,
oracle_actor,
current_pending_proposals: HashMap::new(),
current_agreed_proposals: HashMap::new(),
n_payouts,
connected_takers: HashSet::new(),
tasks: Tasks::default(),
}
}
@ -216,26 +207,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
Ok(())
}
async fn handle_inc_protocol_msg(
&mut self,
taker_id: Identity,
msg: wire::SetupMsg,
) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?;
}
SetupState::Active { taker, .. } => {
anyhow::bail!("Currently setting up contract with taker {}", taker)
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup");
}
}
Ok(())
}
async fn handle_inc_roll_over_protocol_msg(
&mut self,
taker_id: Identity,
@ -320,6 +291,30 @@ impl<O, M, T, W> Actor<O, M, T, W> {
.await?;
Ok(())
}
async fn append_cfd_state_setup_failed(
&mut self,
order_id: OrderId,
error: anyhow::Error,
) -> Result<()> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
@ -406,17 +401,35 @@ where
impl<O, M, T, W> Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_take_order(
&mut self,
taker_id: Identity,
order_id: OrderId,
quantity: Usd,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order");
let disconnected = self
.setup_actors
.get_disconnected(order_id)
.with_context(|| {
format!(
"Contract setup for order {} is already in progress",
order_id
)
})?;
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
@ -471,130 +484,69 @@ where
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. check if order has acceptable amounts and if not reject the cfd
// Since rejection is tied to the cfd state at the moment we can only do this after creating
// a cfd.
if quantity < current_order.min_quantity || quantity > current_order.max_quantity {
tracing::warn!(
"Order rejected because quantity {} was out of bounds. It was either <{} or >{}",
quantity,
current_order.min_quantity,
current_order.max_quantity
);
// 4. Try to get the oracle announcement, if that fails we should exit prior to changing any
// state
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await??;
self.reject_order(taker_id, cfd, conn).await?;
}
// 5. Start up contract setup actor
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let (addr, fut) = setup_maker::Actor::new(
(cfd.order, cfd.quantity_usd, self.n_payouts),
(self.oracle_pk, announcement),
&self.wallet,
&self.wallet,
(&self.takers, &self.takers, taker_id),
&this,
(&self.takers, &this),
)
.create(None)
.run();
disconnected.insert(addr);
self.tasks.add(fut);
Ok(())
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_accept_order(
&mut self,
msg: AcceptOrder,
ctx: &mut Context<Self>,
) -> Result<()> {
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_accept_order(&mut self, msg: AcceptOrder) -> Result<()> {
let AcceptOrder { order_id } = msg;
if let SetupState::Active { .. } = self.setup_state {
anyhow::bail!("Already setting up a contract!")
}
tracing::debug!(%order_id, "Maker accepts an order" );
tracing::debug!(%order_id, "Maker accepts order");
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let taker_id = match cfd {
Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. },
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
}
};
// 2. Try to get the oracle announcement, if that fails we should exit prior to changing any
// state
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await??;
// 3. Notify the taker that we are ready for contract setup
// Use `.send` here to ensure we only continue once the message has been sent
// Nothing done after this call should be able to fail, otherwise we notified the taker, but
// might not transition to `Active` ourselves!
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::ConfirmOrder(cfd.order.id),
})
.await??;
if !self
.setup_actors
.send(&order_id, setup_maker::Accepted)
.await
{
anyhow::bail!("No active contract setup for order {}", order_id);
}
// 4. Insert that we are in contract setup and refresh our own feed
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
// 5. Spawn away the contract setup
let (sender, receiver) = mpsc::unbounded();
let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::Protocol { order_id, msg },
})
}),
receiver,
(self.oracle_pk, offer_announcement),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
),
Box::new(self.wallet.clone()),
Box::new(self.wallet.clone()),
Role::Maker,
self.n_payouts,
);
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let task = async move {
let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc })
.await
.expect("always connected to ourselves");
}
.spawn_with_handle();
// 6. Record that we are in an active contract setup
self.setup_state = SetupState::Active {
sender,
taker: taker_id,
_task: task,
};
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
@ -603,7 +555,7 @@ where
async fn handle_reject_order(&mut self, msg: RejectOrder) -> Result<()> {
let RejectOrder { order_id } = msg;
tracing::debug!(%order_id, "Maker rejects an order" );
tracing::debug!(%order_id, "Maker rejects order");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
@ -614,7 +566,7 @@ where
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
anyhow::bail!("Order is in invalid state. Ignoring trying to reject it.")
}
};
@ -839,70 +791,6 @@ where
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = match dlc {
Ok(dlc) => dlc,
Err(e) => {
cfd.state = CfdState::SetupFailed {
common: CfdStateCommon::default(),
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
};
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
@ -1060,6 +948,66 @@ where
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) {
log_error!(async {
use setup_maker::Completed::*;
let (order_id, dlc) = match msg {
NewContract { order_id, dlc } => (order_id, dlc),
Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return anyhow::Ok(());
}
Rejected(order_id) => {
self.append_cfd_state_rejected(order_id).await?;
return anyhow::Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
});
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, M, T, W>
where
@ -1081,19 +1029,6 @@ where
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdSetupCompleted>
for Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdRollOverCompleted>
for Actor<O, M, T, W>
@ -1119,15 +1054,20 @@ where
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) {
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, ctx: &mut Context<Self>) {
match msg {
wire::TakerToMaker::TakeOrder { order_id, quantity } => {
log_error!(self.handle_take_order(taker_id, order_id, quantity))
log_error!(self.handle_take_order(taker_id, order_id, quantity, ctx))
}
wire::TakerToMaker::Settlement {
order_id,
@ -1156,9 +1096,6 @@ where
} => {
log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker))
}
wire::TakerToMaker::Protocol { msg, .. } => {
log_error!(self.handle_inc_protocol_msg(taker_id, msg))
}
wire::TakerToMaker::ProposeRollOver {
order_id,
timestamp,
@ -1171,10 +1108,12 @@ where
taker_id,
))
}
wire::TakerToMaker::RollOverProtocol(msg) => {
log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg))
}
wire::TakerToMaker::Protocol { .. } => {
unreachable!("This kind of message should be sent to the `setup_maker::Actor`")
}
}
}
}
@ -1198,10 +1137,6 @@ impl Message for TakerDisconnected {
type Result = ();
}
impl Message for CfdSetupCompleted {
type Result = ();
}
impl Message for CfdRollOverCompleted {
type Result = ();
}

60
daemon/src/maker_inc_connections.rs

@ -1,8 +1,9 @@
use crate::address_map::{AddressMap, Stopping};
use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected};
use crate::model::cfd::Order;
use crate::model::cfd::{Order, OrderId};
use crate::model::Identity;
use crate::noise::TransportStateExt;
use crate::{maker_cfd, noise, send_to_socket, wire, Tasks};
use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks};
use anyhow::Result;
use futures::TryStreamExt;
use std::collections::HashMap;
@ -18,6 +19,20 @@ use xtra_productivity::xtra_productivity;
pub struct BroadcastOrder(pub Option<Order>);
/// Message sent from the `setup_maker::Actor` to the
/// `maker_inc_connections::Actor` so that it can forward it to the
/// taker.
///
/// Additionally, the address of this instance of the
/// `setup_maker::Actor` is included so that the
/// `maker_inc_connections::Actor` knows where to forward the contract
/// setup messages from the taker about this particular order.
pub struct ConfirmOrder {
pub taker_id: Identity,
pub order_id: OrderId,
pub address: xtra::Address<setup_maker::Actor>,
}
#[derive(Debug)]
pub struct TakerMessage {
pub taker_id: Identity,
@ -41,6 +56,7 @@ pub struct Actor {
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
connection_tasks: HashMap<Identity, Tasks>,
}
@ -59,6 +75,7 @@ impl Actor {
taker_msg_channel: taker_msg_channel.clone_channel(),
noise_priv_key,
heartbeat_interval,
setup_actors: AddressMap::default(),
connection_tasks: HashMap::new(),
}
}
@ -113,10 +130,9 @@ impl Actor {
FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone()));
let this = ctx.address().expect("self to be alive");
let taker_msg_channel = self.taker_msg_channel.clone_channel();
let read_fut = async move {
while let Ok(Some(msg)) = read.try_next().await {
let res = taker_msg_channel.send(FromTaker { taker_id, msg }).await;
let res = this.send(FromTaker { taker_id, msg }).await;
if res.is_err() {
break;
@ -166,6 +182,18 @@ impl Actor {
}
}
async fn handle_confirm_order(&mut self, msg: ConfirmOrder) -> Result<()> {
self.send_to_taker(
&msg.taker_id,
wire::MakerToTaker::ConfirmOrder(msg.order_id),
)
.await?;
self.setup_actors.insert(msg.order_id, msg.address);
Ok(())
}
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> {
self.send_to_taker(&msg.taker_id, msg.msg).await?;
@ -198,6 +226,30 @@ impl Actor {
}
}
#[xtra_productivity(message_impl = false)]
impl Actor {
async fn handle_msg_from_taker(&mut self, msg: FromTaker) {
use wire::TakerToMaker::*;
match msg.msg {
Protocol { order_id, msg } => match self.setup_actors.get_connected(&order_id) {
Some(addr) => {
let _ = addr.send(msg).await;
}
None => {
tracing::error!(%order_id, "No active contract setup");
}
},
_ => {
let _ = self.taker_msg_channel.send(msg);
}
}
}
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
}
struct ReadFail(Identity);
impl xtra::Actor for Actor {}

284
daemon/src/setup_maker.rs

@ -0,0 +1,284 @@
use crate::address_map::Stopping;
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role};
use crate::model::{Identity, Usd};
use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible;
use crate::wire::{self, SetupMsg};
use crate::{maker_inc_connections, wallet};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedSender};
use futures::{future, SinkExt};
use maia::secp256k1_zkp::schnorrsig;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
order: Order,
quantity: Usd,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
announcement: Announcement,
build_party_params: Box<dyn MessageChannel<wallet::BuildPartyParams>>,
sign: Box<dyn MessageChannel<wallet::Sign>>,
taker: Box<dyn MessageChannel<maker_inc_connections::TakerMessage>>,
confirm_order: Box<dyn MessageChannel<maker_inc_connections::ConfirmOrder>>,
taker_id: Identity,
on_completed: Box<dyn MessageChannel<Completed>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
}
impl Actor {
pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static),
(taker, confirm_order, taker_id): (
&(impl MessageChannel<maker_inc_connections::TakerMessage> + 'static),
&(impl MessageChannel<maker_inc_connections::ConfirmOrder> + 'static),
Identity,
),
on_completed: &(impl MessageChannel<Completed> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
),
) -> Self {
Self {
order,
quantity,
n_payouts,
oracle_pk,
announcement,
build_party_params: build_party_params.clone_channel(),
sign: sign.clone_channel(),
taker: taker.clone_channel(),
confirm_order: confirm_order.clone_channel(),
taker_id,
on_completed: on_completed.clone_channel(),
on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()],
setup_msg_sender: None,
}
}
async fn contract_setup(&mut self, this: xtra::Address<Self>) -> Result<()> {
let order_id = self.order.id;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
);
let (sender, receiver) = mpsc::unbounded();
// store the writing end to forward messages from the taker to
// the spawned contract setup task
self.setup_msg_sender = Some(sender);
let taker_id = self.taker_id;
let contract_future = setup_contract::new(
self.taker.sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::Protocol { order_id, msg },
})
}),
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
),
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
Role::Maker,
self.n_payouts,
);
spawn_fallible::<_, anyhow::Error>(async move {
let _ = match contract_future.await {
Ok(dlc) => this.send(SetupSucceeded { order_id, dlc }).await?,
Err(error) => this.send(SetupFailed { order_id, error }).await?,
};
Ok(())
});
Ok(())
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
ctx.stop();
}
}
#[xtra_productivity]
impl Actor {
fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.order.id;
tracing::info!(%order_id, "Maker accepts an order");
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let fut = async {
self.confirm_order
.send(maker_inc_connections::ConfirmOrder {
taker_id: self.taker_id,
order_id,
address: this.clone(),
})
.await
.context("Failed to deliver order confirmation")??;
self.contract_setup(this)
.await
.context("Failed to start contract setup")?;
Ok(())
};
if let Err(error) = fut.await {
tracing::error!(%order_id, "Stopping setup_maker actor: {}", error);
self.complete(Completed::Failed { order_id, error }, ctx)
.await;
return;
}
}
fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) {
self.complete(Completed::Rejected(self.order.id), ctx).await;
}
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) {
self.complete(
Completed::NewContract {
order_id: msg.order_id,
dlc: msg.dlc,
},
ctx,
)
.await
}
fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context<Self>) {
self.complete(
Completed::Failed {
order_id: msg.order_id,
error: msg.error,
},
ctx,
)
.await
}
}
#[xtra_productivity(message_impl = false)]
impl Actor {
fn handle(&mut self, msg: wire::SetupMsg, _ctx: &mut xtra::Context<Self>) -> Result<()> {
let mut sender = self
.setup_msg_sender
.clone()
.context("Cannot forward message to contract setup task")?;
sender.send(msg).await?;
Ok(())
}
}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let quantity = self.quantity;
let order = self.order.clone();
if quantity < order.min_quantity || quantity > order.max_quantity {
tracing::info!(
"Order rejected: quantity {} not in range [{}, {}]",
quantity,
order.min_quantity,
order.max_quantity
);
let _ = self
.taker
.send(maker_inc_connections::TakerMessage {
taker_id: self.taker_id,
msg: wire::MakerToTaker::RejectOrder(order.id),
})
.await;
self.complete(Completed::Rejected(order.id), ctx).await;
}
}
async fn stopping(&mut self, ctx: &mut xtra::Context<Self>) -> xtra::KeepRunning {
// inform other actors that we are stopping so that our
// address can be GCd from their AddressMaps
let me = ctx.address().expect("we are still alive");
for channel in self.on_stopping.iter() {
let _ = channel.send(Stopping { me: me.clone() }).await;
}
xtra::KeepRunning::StopAll
}
}
/// Message sent from the `maker_cfd::Actor` to the
/// `setup_maker::Actor` to inform that the maker user has accepted
/// the taker order request from the taker.
pub struct Accepted;
/// Message sent from the `maker_cfd::Actor` to the
/// `setup_maker::Actor` to inform that the maker user has rejected
/// the taker order request from the taker.
pub struct Rejected;
/// Message sent from the `setup_maker::Actor` to the
/// `maker_cfd::Actor` to notify that the contract setup has started.
pub struct Started(pub OrderId);
/// Message sent from the spawned task to `setup_maker::Actor` to
/// notify that the contract setup has finished successfully.
pub struct SetupSucceeded {
order_id: OrderId,
dlc: Dlc,
}
/// Message sent from the spawned task to `setup_maker::Actor` to
/// notify that the contract setup has failed.
pub struct SetupFailed {
order_id: OrderId,
error: anyhow::Error,
}
/// Message sent from the `setup_maker::Actor` to the
/// `maker_cfd::Actor` to notify that the contract setup has finished.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Completed {
Rejected(OrderId),
NewContract {
order_id: OrderId,
dlc: Dlc,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
impl xtra::Message for Started {
type Result = ();
}

2
daemon/src/setup_taker.rs

@ -70,7 +70,7 @@ impl Actor {
);
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the taker to
// store the writing end to forward messages from the maker to
// the spawned contract setup task
self.setup_msg_sender = Some(sender);

5
daemon/tests/happy_path.rs

@ -81,12 +81,11 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
.unwrap();
taker.mocks.mock_oracle_announcement().await;
maker.mocks.mock_oracle_announcement().await;
taker.take_order(received.clone(), Usd::new(dec!(5))).await;
let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
maker.mocks.mock_oracle_announcement().await;
taker.mocks.mock_oracle_announcement().await;
maker.mocks.mock_party_params().await;
taker.mocks.mock_party_params().await;

Loading…
Cancel
Save