Browse Source

Merge #684

684: Introduce short-lived `setup_taker::Actor` r=da-kami a=luckysori



Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
debug-collab-settlement
bors[bot] 3 years ago
committed by GitHub
parent
commit
171c70c11e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      daemon/src/connection.rs
  2. 3
      daemon/src/lib.rs
  3. 7
      daemon/src/maker_cfd.rs
  4. 17
      daemon/src/setup_contract.rs
  5. 231
      daemon/src/setup_taker.rs
  6. 395
      daemon/src/taker_cfd.rs
  7. 14
      daemon/src/wire.rs
  8. 6
      daemon/tests/happy_path.rs
  9. 2
      daemon/tests/harness/mocks/mod.rs

62
daemon/src/connection.rs

@ -1,6 +1,9 @@
use crate::{log_error, noise, send_to_socket, wire, Tasks}; use crate::model::cfd::OrderId;
use crate::model::Usd;
use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks};
use anyhow::Result; use anyhow::Result;
use futures::StreamExt; use futures::StreamExt;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -28,6 +31,7 @@ pub struct Actor {
/// Max duration since the last heartbeat until we die. /// Max duration since the last heartbeat until we die.
timeout: Duration, timeout: Duration,
connected_state: Option<ConnectedState>, connected_state: Option<ConnectedState>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>,
} }
pub struct Connect { pub struct Connect {
@ -48,6 +52,19 @@ pub enum ConnectionStatus {
Offline, Offline,
} }
/// Message sent from the `setup_taker::Actor` to the
/// `connection::Actor` so that it can forward it to the maker.
///
/// Additionally, the address of this instance of the
/// `setup_taker::Actor` is included so that the `connection::Actor`
/// knows where to forward the contract setup messages from the maker
/// about this particular order.
pub struct TakeOrder {
pub order_id: OrderId,
pub quantity: Usd,
pub address: xtra::Address<setup_taker::Actor>,
}
impl Actor { impl Actor {
pub fn new( pub fn new(
status_sender: watch::Sender<ConnectionStatus>, status_sender: watch::Sender<ConnectionStatus>,
@ -65,6 +82,7 @@ impl Actor {
maker_to_taker, maker_to_taker,
timeout, timeout,
connected_state: None, connected_state: None,
setup_actors: HashMap::new(),
} }
} }
} }
@ -76,6 +94,22 @@ impl Actor {
} }
} }
#[xtra_productivity]
impl Actor {
async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> {
self.send_to_maker
.send(wire::TakerToMaker::TakeOrder {
order_id: msg.order_id,
quantity: msg.quantity,
})
.await?;
self.setup_actors.insert(msg.order_id, msg.address);
Ok(())
}
}
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
async fn handle_connect( async fn handle_connect(
@ -147,6 +181,32 @@ impl Actor {
.expect("wire messages only to arrive in connected state") .expect("wire messages only to arrive in connected state")
.last_heartbeat = SystemTime::now(); .last_heartbeat = SystemTime::now();
} }
wire::MakerToTaker::ConfirmOrder(order_id) => match self.setup_actors.get(&order_id) {
Some(addr) => {
let _ = addr.send(setup_taker::Accepted).await;
}
None => {
tracing::warn!(%order_id, "No active contract setup");
}
},
wire::MakerToTaker::RejectOrder(order_id) => match self.setup_actors.get(&order_id) {
Some(addr) => {
let _ = addr.send(setup_taker::Rejected).await;
}
None => {
tracing::warn!(%order_id, "No active contract setup");
}
},
wire::MakerToTaker::Protocol { order_id, msg } => {
match self.setup_actors.get(&order_id) {
Some(addr) => {
let _ = addr.send(msg).await;
}
None => {
tracing::warn!(%order_id, "No active contract setup");
}
}
}
other => { other => {
// this one should go to the taker cfd actor // this one should go to the taker cfd actor
log_error!(self.maker_to_taker.send(other)); log_error!(self.maker_to_taker.send(other));

3
daemon/src/lib.rs

@ -43,6 +43,7 @@ pub mod routes;
pub mod seed; pub mod seed;
pub mod send_to_socket; pub mod send_to_socket;
pub mod setup_contract; pub mod setup_contract;
pub mod setup_taker;
pub mod taker_cfd; pub mod taker_cfd;
pub mod to_sse_event; pub mod to_sse_event;
pub mod tokio_ext; pub mod tokio_ext;
@ -250,7 +251,7 @@ where
wallet_addr, wallet_addr,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
Box::new(connection_actor_addr.clone()), connection_actor_addr.clone(),
monitor_addr.clone(), monitor_addr.clone(),
oracle_addr, oracle_addr,
n_payouts, n_payouts,

7
daemon/src/maker_cfd.rs

@ -538,7 +538,7 @@ where
self.takers.clone().into_sink().with(move |msg| { self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage { future::ok(maker_inc_connections::TakerMessage {
taker_id, taker_id,
msg: wire::MakerToTaker::Protocol(msg), msg: wire::MakerToTaker::Protocol { order_id, msg },
}) })
}), }),
receiver, receiver,
@ -551,7 +551,8 @@ where
cfd.order.leverage, cfd.order.leverage,
cfd.refund_timelock_in_blocks(), cfd.refund_timelock_in_blocks(),
), ),
self.wallet.clone(), Box::new(self.wallet.clone()),
Box::new(self.wallet.clone()),
Role::Maker, Role::Maker,
self.n_payouts, self.n_payouts,
); );
@ -1132,7 +1133,7 @@ where
} => { } => {
log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker)) log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker))
} }
wire::TakerToMaker::Protocol(msg) => { wire::TakerToMaker::Protocol { msg, .. } => {
log_error!(self.handle_inc_protocol_msg(taker_id, msg)) log_error!(self.handle_inc_protocol_msg(taker_id, msg))
} }
wire::TakerToMaker::ProposeRollOver { wire::TakerToMaker::ProposeRollOver {

17
daemon/src/setup_contract.rs

@ -23,7 +23,7 @@ use std::collections::HashMap;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::time::Duration; use std::time::Duration;
use xtra::Address; use xtra::prelude::MessageChannel;
pub struct SetupParams { pub struct SetupParams {
margin: Amount, margin: Amount,
@ -56,23 +56,22 @@ impl SetupParams {
/// Given an initial set of parameters, sets up the CFD contract with /// Given an initial set of parameters, sets up the CFD contract with
/// the other party. /// the other party.
pub async fn new<W>( #[allow(clippy::too_many_arguments)]
pub async fn new(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin, mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin, mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
(oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement),
setup_params: SetupParams, setup_params: SetupParams,
wallet: Address<W>, build_party_params_channel: Box<dyn MessageChannel<wallet::BuildPartyParams>>,
sign_channel: Box<dyn MessageChannel<wallet::Sign>>,
role: Role, role: Role,
n_payouts: usize, n_payouts: usize,
) -> Result<Dlc> ) -> Result<Dlc> {
where
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let own_params = wallet let own_params = build_party_params_channel
.send(wallet::BuildPartyParams { .send(wallet::BuildPartyParams {
amount: setup_params.margin, amount: setup_params.margin,
identity_pk: pk, identity_pk: pk,
@ -211,7 +210,7 @@ where
tracing::info!("Verified all signatures"); tracing::info!("Verified all signatures");
let mut signed_lock_tx = wallet let mut signed_lock_tx = sign_channel
.send(wallet::Sign { psbt: lock_tx }) .send(wallet::Sign { psbt: lock_tx })
.await .await
.context("Failed to send message to wallet actor")? .context("Failed to send message to wallet actor")?

231
daemon/src/setup_taker.rs

@ -0,0 +1,231 @@
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role};
use crate::model::Usd;
use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible;
use crate::wire::{self, SetupMsg};
use crate::{connection, wallet};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedSender};
use futures::{future, SinkExt};
use maia::secp256k1_zkp::schnorrsig;
use xtra::prelude::*;
use xtra_productivity::xtra_productivity;
pub struct Actor {
order: Order,
quantity: Usd,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
announcement: Announcement,
build_party_params: Box<dyn MessageChannel<wallet::BuildPartyParams>>,
sign: Box<dyn MessageChannel<wallet::Sign>>,
maker: xtra::Address<connection::Actor>,
on_accepted: Box<dyn MessageChannel<Started>>,
on_completed: Box<dyn MessageChannel<Completed>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
}
impl Actor {
#[allow(clippy::too_many_arguments)]
pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static),
maker: xtra::Address<connection::Actor>,
on_accepted: &(impl MessageChannel<Started> + 'static),
on_completed: &(impl MessageChannel<Completed> + 'static),
) -> Self {
Self {
order,
quantity,
n_payouts,
oracle_pk,
announcement,
build_party_params: build_party_params.clone_channel(),
sign: sign.clone_channel(),
maker,
on_accepted: on_accepted.clone_channel(),
on_completed: on_completed.clone_channel(),
setup_msg_sender: None,
}
}
}
#[xtra_productivity]
impl Actor {
fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.order.id;
tracing::info!(%order_id, "Order got accepted");
// inform the `taker_cfd::Actor` about the start of contract
// setup, so that the db and UI can be updated accordingly
self.on_accepted.send(Started(order_id)).await?;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
);
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the taker to
// the spawned contract setup task
self.setup_msg_sender = Some(sender);
let contract_future = setup_contract::new(
xtra::message_channel::MessageChannel::sink(&self.maker)
.with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })),
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
),
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
Role::Taker,
self.n_payouts,
);
let this = ctx.address().expect("self to be alive");
spawn_fallible::<_, anyhow::Error>(async move {
let _ = match contract_future.await {
Ok(dlc) => this.send(SetupSucceeded { order_id, dlc }).await?,
Err(error) => this.send(SetupFailed { order_id, error }).await?,
};
Ok(())
});
Ok(())
}
fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.on_completed
.send(Completed::Rejected {
order_id: self.order.id,
})
.await?;
ctx.stop();
Ok(())
}
fn handle(&mut self, msg: wire::SetupMsg, _ctx: &mut xtra::Context<Self>) -> Result<()> {
let mut sender = self
.setup_msg_sender
.clone()
.context("Cannot forward message to contract setup task")?;
sender.send(msg).await?;
Ok(())
}
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.on_completed
.send(Completed::NewContract {
order_id: msg.order_id,
dlc: msg.dlc,
})
.await?;
ctx.stop();
Ok(())
}
fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.on_completed
.send(Completed::Failed {
order_id: msg.order_id,
error: msg.error,
})
.await?;
ctx.stop();
Ok(())
}
}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let address = ctx
.address()
.expect("actor to be able to give address to itself");
let res = self
.maker
.send(connection::TakeOrder {
order_id: self.order.id,
quantity: self.quantity,
address,
})
.await;
if let Err(e) = res {
tracing::error!(%self.order.id, "Stopping setup_taker actor: {}", e);
ctx.stop()
}
}
}
/// Message sent from the `connection::Actor` to the
/// `setup_taker::Actor` to notify that the order taken was accepted
/// by the maker.
pub struct Accepted;
/// Message sent from the `setup_taker::Actor` to the
/// `taker_cfd::Actor` to notify that the contract setup has started.
pub struct Started(pub OrderId);
/// Message sent from the `connection::Actor` to the
/// `setup_taker::Actor` to notify that the order taken was rejected
/// by the maker.
pub struct Rejected;
/// Message sent from the spawned task to `setup_taker::Actor` to
/// notify that the contract setup has finished successfully.
pub struct SetupSucceeded {
order_id: OrderId,
dlc: Dlc,
}
/// Message sent from the spawned task to `setup_taker::Actor` to
/// notify that the contract setup has failed.
pub struct SetupFailed {
order_id: OrderId,
error: anyhow::Error,
}
/// Message sent from the `setup_taker::Actor` to the
/// `taker_cfd::Actor` to notify that the contract setup has finished.
pub enum Completed {
NewContract {
order_id: OrderId,
dlc: Dlc,
},
Rejected {
order_id: OrderId,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
impl xtra::Message for Started {
type Result = ();
}
impl xtra::Message for Completed {
type Result = ();
}

395
daemon/src/taker_cfd.rs

@ -6,18 +6,22 @@ use crate::model::cfd::{
}; };
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::setup_contract::{RolloverParams, SetupParams}; use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::wire::RollOverMsg;
use crate::{log_error, oracle, projection, setup_contract, wallet, wire}; use crate::{
connection, log_error, oracle, projection, setup_contract, setup_taker, wallet, wire, Tasks,
};
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::RemoteHandle; use futures::future::RemoteHandle;
use futures::{future, SinkExt}; use futures::{future, SinkExt};
use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::Actor as _;
pub struct TakeOffer { pub struct TakeOffer {
pub order_id: OrderId, pub order_id: OrderId,
@ -37,24 +41,11 @@ pub enum CfdAction {
}, },
} }
pub struct CfdSetupCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
pub struct CfdRollOverCompleted { pub struct CfdRollOverCompleted {
pub order_id: OrderId, pub order_id: OrderId,
pub dlc: Result<Dlc>, pub dlc: Result<Dlc>,
} }
enum SetupState {
Active {
sender: mpsc::UnboundedSender<SetupMsg>,
_task: RemoteHandle<()>,
},
None,
}
enum RollOverState { enum RollOverState {
Active { Active {
sender: mpsc::UnboundedSender<RollOverMsg>, sender: mpsc::UnboundedSender<RollOverMsg>,
@ -68,13 +59,14 @@ pub struct Actor<O, M, W> {
wallet: Address<W>, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>, conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_state: SetupState, setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>,
roll_over_state: RollOverState, roll_over_state: RollOverState,
oracle_actor: Address<O>, oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals, current_pending_proposals: UpdateCfdProposals,
n_payouts: usize, n_payouts: usize,
tasks: Tasks,
} }
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
@ -89,7 +81,7 @@ where
wallet: Address<W>, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>, conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
oracle_actor: Address<O>, oracle_actor: Address<O>,
n_payouts: usize, n_payouts: usize,
@ -99,13 +91,14 @@ where
wallet, wallet,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
send_to_maker, conn_actor,
monitor_actor, monitor_actor,
setup_state: SetupState::None,
roll_over_state: RollOverState::None, roll_over_state: RollOverState::None,
oracle_actor, oracle_actor,
current_pending_proposals: HashMap::new(), current_pending_proposals: HashMap::new(),
n_payouts, n_payouts,
setup_actors: HashMap::new(),
tasks: Tasks::default(),
} }
} }
} }
@ -139,33 +132,6 @@ impl<O, M, W> Actor<O, M, W> {
} }
} }
} }
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {
let mut conn = self.db.acquire().await?;
let current_order = load_order_by_id(order_id, &mut conn).await?;
tracing::info!("Taking current order: {:?}", &current_order);
let cfd = Cfd::new(
current_order.clone(),
quantity,
CfdState::outgoing_order_request(),
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// Cleanup own order feed, after inserting the cfd.
// Due to the 1:1 relationship between order and cfd we can never create another cfd for the
// same order id.
self.projection_actor.send(projection::Update(None)).await?;
self.send_to_maker
.send(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?;
Ok(())
}
} }
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
@ -218,7 +184,7 @@ where
); );
self.send_pending_update_proposals().await?; self.send_pending_update_proposals().await?;
self.send_to_maker self.conn_actor
.send(wire::TakerToMaker::ProposeSettlement { .send(wire::TakerToMaker::ProposeSettlement {
order_id: proposal.order_id, order_id: proposal.order_id,
timestamp: proposal.timestamp, timestamp: proposal.timestamp,
@ -230,12 +196,6 @@ where
Ok(()) Ok(())
} }
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
self.append_cfd_state_rejected(order_id).await?;
Ok(())
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected"); tracing::info!(%order_id, "Settlement proposal got rejected");
@ -254,19 +214,6 @@ where
Ok(()) Ok(())
} }
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender, .. } => {
sender.send(msg).await?;
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup")
}
}
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state { match &mut self.roll_over_state {
RollOverState::Active { sender, .. } => { RollOverState::Active { sender, .. } => {
@ -287,17 +234,6 @@ where
Ok(()) Ok(())
} }
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
} }
impl<O, M, W> Actor<O, M, W> { impl<O, M, W> Actor<O, M, W> {
@ -329,6 +265,67 @@ impl<O, M, W> Actor<O, M, W> {
Ok(()) Ok(())
} }
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
async fn append_cfd_state_setup_failed(
&mut self,
order_id: OrderId,
error: anyhow::Error,
) -> Result<()> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
/// Set the state of the CFD in the database to `ContractSetup`
/// and update the corresponding projection.
async fn handle_setup_started(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.projection_actor,
)
.await?;
Ok(())
}
async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> { async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> {
if self.current_pending_proposals.contains_key(&order_id) { if self.current_pending_proposals.contains_key(&order_id) {
anyhow::bail!("An update for order id {} is already in progress", order_id) anyhow::bail!("An update for order id {} is already in progress", order_id)
@ -348,7 +345,7 @@ impl<O, M, W> Actor<O, M, W> {
); );
self.send_pending_update_proposals().await?; self.send_pending_update_proposals().await?;
self.send_to_maker self.conn_actor
.send(wire::TakerToMaker::ProposeRollOver { .send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id, order_id: proposal.order_id,
timestamp: proposal.timestamp, timestamp: proposal.timestamp,
@ -360,24 +357,75 @@ impl<O, M, W> Actor<O, M, W> {
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction>, Self: xtra::Handler<setup_taker::Completed>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::BuildPartyParams> + xtra::Handler<wallet::Sign>,
{ {
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { async fn handle_take_offer(
&mut self,
order_id: OrderId,
quantity: Usd,
ctx: &mut Context<Self>,
) -> Result<()> {
let entry = self.setup_actors.entry(order_id);
if matches!(entry, Entry::Occupied(ref occupied) if occupied.get().is_connected()) {
bail!(
"A contract setup for order id {} is already in progress",
order_id
)
}
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation, let current_order = load_order_by_id(order_id, &mut conn).await?;
&mut conn,
tracing::info!("Taking current order: {:?}", &current_order);
let cfd = Cfd::new(
current_order.clone(),
quantity,
CfdState::outgoing_order_request(),
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// Cleanup own order feed, after inserting the cfd.
// Due to the 1:1 relationship between order and cfd we can never create another cfd for the
// same order id.
self.projection_actor.send(projection::Update(None)).await?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let (addr, fut) = setup_taker::Actor::new(
(current_order, quantity, self.n_payouts),
(self.oracle_pk, announcement),
&self.wallet, &self.wallet,
&self.projection_actor, &self.wallet,
self.conn_actor.clone(),
&this,
&this,
) )
.await?; .create(None)
Ok(()) .run();
match entry {
Entry::Occupied(mut disconnected) => {
disconnected.insert(addr);
}
Entry::Vacant(vacant) => {
vacant.insert(addr);
}
} }
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { self.tasks.add(fut);
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(()) Ok(())
} }
} }
@ -388,30 +436,22 @@ where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_cfd_setup_completed( async fn handle_setup_completed(&mut self, msg: setup_taker::Completed) -> Result<()> {
&mut self, let (order_id, dlc) = match msg {
order_id: OrderId, setup_taker::Completed::NewContract { order_id, dlc } => (order_id, dlc),
dlc: Result<Dlc>, setup_taker::Completed::Rejected { order_id } => {
) -> Result<()> { self.append_cfd_state_rejected(order_id).await?;
self.setup_state = SetupState::None; return Ok(());
}
setup_taker::Completed::Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return Ok(());
}
};
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = match dlc {
Ok(dlc) => dlc,
Err(e) => {
cfd.state = CfdState::SetupFailed {
common: CfdStateCommon::default(),
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
};
tracing::info!("Setup complete, publishing on chain now"); tracing::info!("Setup complete, publishing on chain now");
cfd.state = CfdState::PendingOpen { cfd.state = CfdState::PendingOpen {
@ -448,79 +488,6 @@ where
} }
} }
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_order_accepted(
&mut self,
order_id: OrderId,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Order got accepted");
let (sender, receiver) = mpsc::unbounded();
if let SetupState::Active { .. } = self.setup_state {
anyhow::bail!("Already setting up a contract!")
}
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
let contract_future = setup_contract::new(
self.send_to_maker
.sink()
.clone_message_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver,
(self.oracle_pk, offer_announcement),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
),
self.wallet.clone(),
Role::Taker,
self.n_payouts,
);
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let task = async move {
let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
}
.spawn_with_handle();
self.setup_state = SetupState::Active {
sender,
_task: task,
};
Ok(())
}
}
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where where
Self: xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdRollOverCompleted>,
@ -555,8 +522,7 @@ where
.with_context(|| format!("Announcement {} not found", oracle_event_id))?; .with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let contract_future = setup_contract::roll_over( let contract_future = setup_contract::roll_over(
self.send_to_maker xtra::message_channel::MessageChannel::sink(&self.conn_actor)
.sink()
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver, receiver,
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
@ -659,7 +625,7 @@ where
let proposal = self.get_settlement_proposal(order_id)?; let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?; let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker self.conn_actor
.send(wire::TakerToMaker::InitiateSettlement { .send(wire::TakerToMaker::InitiateSettlement {
order_id, order_id,
sig_taker, sig_taker,
@ -687,9 +653,15 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> { impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W>
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) -> Result<()> { where
self.handle_take_offer(msg.order_id, msg.quantity).await Self: xtra::Handler<setup_taker::Completed>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::BuildPartyParams> + xtra::Handler<wallet::Sign>,
{
async fn handle(&mut self, msg: TakeOffer, ctx: &mut Context<Self>) -> Result<()> {
self.handle_take_offer(msg.order_id, msg.quantity, ctx)
.await
} }
} }
@ -724,7 +696,7 @@ where
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O, M, W>
where where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>, M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction> W: xtra::Handler<wallet::TryBroadcastTransaction>
@ -733,18 +705,9 @@ where
{ {
async fn handle(&mut self, msg: wire::MakerToTaker, ctx: &mut Context<Self>) { async fn handle(&mut self, msg: wire::MakerToTaker, ctx: &mut Context<Self>) {
match msg { match msg {
wire::MakerToTaker::Heartbeat => {
unreachable!("Heartbeats should be handled somewhere else")
}
wire::MakerToTaker::CurrentOrder(current_order) => { wire::MakerToTaker::CurrentOrder(current_order) => {
log_error!(self.handle_new_order(current_order)) log_error!(self.handle_new_order(current_order))
} }
wire::MakerToTaker::ConfirmOrder(order_id) => {
log_error!(self.handle_order_accepted(order_id, ctx))
}
wire::MakerToTaker::RejectOrder(order_id) => {
log_error!(self.handle_order_rejected(order_id))
}
wire::MakerToTaker::ConfirmSettlement(order_id) => { wire::MakerToTaker::ConfirmSettlement(order_id) => {
log_error!(self.handle_settlement_accepted(order_id, ctx)) log_error!(self.handle_settlement_accepted(order_id, ctx))
} }
@ -754,9 +717,6 @@ where
wire::MakerToTaker::InvalidOrderId(order_id) => { wire::MakerToTaker::InvalidOrderId(order_id) => {
log_error!(self.handle_invalid_order_id(order_id)) log_error!(self.handle_invalid_order_id(order_id))
} }
wire::MakerToTaker::Protocol(setup_msg) => {
log_error!(self.handle_inc_protocol_msg(setup_msg))
}
wire::MakerToTaker::ConfirmRollOver { wire::MakerToTaker::ConfirmRollOver {
order_id, order_id,
oracle_event_id, oracle_event_id,
@ -766,22 +726,30 @@ where
wire::MakerToTaker::RejectRollOver(order_id) => { wire::MakerToTaker::RejectRollOver(order_id) => {
log_error!(self.handle_roll_over_rejected(order_id)) log_error!(self.handle_roll_over_rejected(order_id))
} }
MakerToTaker::RollOverProtocol(roll_over_msg) => { wire::MakerToTaker::RollOverProtocol(roll_over_msg) => {
log_error!(self.handle_inc_roll_over_msg(roll_over_msg)) log_error!(self.handle_inc_roll_over_msg(roll_over_msg))
} }
wire::MakerToTaker::Heartbeat => {
unreachable!("Heartbeats should be handled somewhere else")
}
wire::MakerToTaker::ConfirmOrder(_)
| wire::MakerToTaker::RejectOrder(_)
| wire::MakerToTaker::Protocol { .. } => {
unreachable!("These messages should be sent to the `setup_taker::Actor`")
}
} }
} }
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<CfdSetupCompleted> for Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Handler<setup_taker::Completed> for Actor<O, M, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: setup_taker::Completed, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); log_error!(self.handle_setup_completed(msg))
} }
} }
@ -816,6 +784,13 @@ where
} }
} }
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<setup_taker::Started> for Actor<O, M, W> {
async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context<Self>) {
log_error!(self.handle_setup_started(msg.0))
}
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = Result<()>; type Result = Result<()>;
} }
@ -824,10 +799,6 @@ impl Message for CfdAction {
type Result = Result<()>; type Result = Result<()>;
} }
impl Message for CfdSetupCompleted {
type Result = ();
}
impl Message for CfdRollOverCompleted { impl Message for CfdRollOverCompleted {
type Result = (); type Result = ();
} }

14
daemon/src/wire.rs

@ -43,7 +43,10 @@ pub enum TakerToMaker {
order_id: OrderId, order_id: OrderId,
timestamp: Timestamp, timestamp: Timestamp,
}, },
Protocol(SetupMsg), Protocol {
order_id: OrderId,
msg: SetupMsg,
},
RollOverProtocol(RollOverMsg), RollOverProtocol(RollOverMsg),
} }
@ -53,7 +56,7 @@ impl fmt::Display for TakerToMaker {
TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"), TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"),
TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"), TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"),
TakerToMaker::InitiateSettlement { .. } => write!(f, "InitiateSettlement"), TakerToMaker::InitiateSettlement { .. } => write!(f, "InitiateSettlement"),
TakerToMaker::Protocol(_) => write!(f, "Protocol"), TakerToMaker::Protocol { .. } => write!(f, "Protocol"),
TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"),
TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"),
} }
@ -72,7 +75,10 @@ pub enum MakerToTaker {
ConfirmSettlement(OrderId), ConfirmSettlement(OrderId),
RejectSettlement(OrderId), RejectSettlement(OrderId),
InvalidOrderId(OrderId), InvalidOrderId(OrderId),
Protocol(SetupMsg), Protocol {
order_id: OrderId,
msg: SetupMsg,
},
RollOverProtocol(RollOverMsg), RollOverProtocol(RollOverMsg),
ConfirmRollOver { ConfirmRollOver {
order_id: OrderId, order_id: OrderId,
@ -91,7 +97,7 @@ impl fmt::Display for MakerToTaker {
MakerToTaker::ConfirmSettlement(_) => write!(f, "ConfirmSettlement"), MakerToTaker::ConfirmSettlement(_) => write!(f, "ConfirmSettlement"),
MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"), MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"),
MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"), MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"),
MakerToTaker::Protocol(_) => write!(f, "Protocol"), MakerToTaker::Protocol { .. } => write!(f, "Protocol"),
MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"),
MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"),
MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"),

6
daemon/tests/happy_path.rs

@ -40,6 +40,7 @@ async fn taker_takes_order_and_maker_rejects() {
.await .await
.unwrap(); .unwrap();
taker.mocks.mock_oracle_announcement().await;
taker.take_order(received.clone(), Usd::new(dec!(10))).await; taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
@ -77,11 +78,12 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
.await .await
.unwrap(); .unwrap();
taker.mocks.mock_oracle_announcement().await;
taker.take_order(received.clone(), Usd::new(dec!(5))).await; taker.take_order(received.clone(), Usd::new(dec!(5))).await;
let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
maker.mocks.mock_oracle_annoucement().await; maker.mocks.mock_oracle_announcement().await;
taker.mocks.mock_oracle_annoucement().await; taker.mocks.mock_oracle_announcement().await;
maker.mocks.mock_party_params().await; maker.mocks.mock_party_params().await;
taker.mocks.mock_party_params().await; taker.mocks.mock_party_params().await;

2
daemon/tests/harness/mocks/mod.rs

@ -54,7 +54,7 @@ impl Mocks {
.in_sequence(&mut seq); .in_sequence(&mut seq);
} }
pub async fn mock_oracle_annoucement(&mut self) { pub async fn mock_oracle_announcement(&mut self) {
self.oracle() self.oracle()
.await .await
.expect_get_announcement() .expect_get_announcement()

Loading…
Cancel
Save