You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
232 lines
6.8 KiB
232 lines
6.8 KiB
3 years ago
|
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role};
|
||
|
use crate::model::Usd;
|
||
|
use crate::oracle::Announcement;
|
||
|
use crate::setup_contract::{self, SetupParams};
|
||
|
use crate::tokio_ext::spawn_fallible;
|
||
|
use crate::wire::{self, SetupMsg};
|
||
|
use crate::{connection, wallet};
|
||
|
use anyhow::{Context, Result};
|
||
|
use async_trait::async_trait;
|
||
|
use futures::channel::mpsc::{self, UnboundedSender};
|
||
|
use futures::{future, SinkExt};
|
||
|
use maia::secp256k1_zkp::schnorrsig;
|
||
|
use xtra::prelude::*;
|
||
|
use xtra_productivity::xtra_productivity;
|
||
|
|
||
|
pub struct Actor {
|
||
|
order: Order,
|
||
|
quantity: Usd,
|
||
|
n_payouts: usize,
|
||
|
oracle_pk: schnorrsig::PublicKey,
|
||
|
announcement: Announcement,
|
||
|
build_party_params: Box<dyn MessageChannel<wallet::BuildPartyParams>>,
|
||
|
sign: Box<dyn MessageChannel<wallet::Sign>>,
|
||
|
maker: xtra::Address<connection::Actor>,
|
||
|
on_accepted: Box<dyn MessageChannel<Started>>,
|
||
|
on_completed: Box<dyn MessageChannel<Completed>>,
|
||
|
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
|
||
|
}
|
||
|
|
||
|
impl Actor {
|
||
|
#[allow(clippy::too_many_arguments)]
|
||
|
pub fn new(
|
||
|
(order, quantity, n_payouts): (Order, Usd, usize),
|
||
|
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
|
||
|
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
|
||
|
sign: &(impl MessageChannel<wallet::Sign> + 'static),
|
||
|
maker: xtra::Address<connection::Actor>,
|
||
|
on_accepted: &(impl MessageChannel<Started> + 'static),
|
||
|
on_completed: &(impl MessageChannel<Completed> + 'static),
|
||
|
) -> Self {
|
||
|
Self {
|
||
|
order,
|
||
|
quantity,
|
||
|
n_payouts,
|
||
|
oracle_pk,
|
||
|
announcement,
|
||
|
build_party_params: build_party_params.clone_channel(),
|
||
|
sign: sign.clone_channel(),
|
||
|
maker,
|
||
|
on_accepted: on_accepted.clone_channel(),
|
||
|
on_completed: on_completed.clone_channel(),
|
||
|
setup_msg_sender: None,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[xtra_productivity]
|
||
|
impl Actor {
|
||
|
fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> {
|
||
|
let order_id = self.order.id;
|
||
|
tracing::info!(%order_id, "Order got accepted");
|
||
|
|
||
|
// inform the `taker_cfd::Actor` about the start of contract
|
||
|
// setup, so that the db and UI can be updated accordingly
|
||
|
self.on_accepted.send(Started(order_id)).await?;
|
||
|
|
||
|
let cfd = Cfd::new(
|
||
|
self.order.clone(),
|
||
|
self.quantity,
|
||
|
CfdState::contract_setup(),
|
||
|
);
|
||
|
|
||
|
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
|
||
|
// store the writing end to forward messages from the taker to
|
||
|
// the spawned contract setup task
|
||
|
self.setup_msg_sender = Some(sender);
|
||
|
|
||
|
let contract_future = setup_contract::new(
|
||
|
xtra::message_channel::MessageChannel::sink(&self.maker)
|
||
|
.with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })),
|
||
|
receiver,
|
||
|
(self.oracle_pk, self.announcement.clone()),
|
||
|
SetupParams::new(
|
||
|
cfd.margin()?,
|
||
|
cfd.counterparty_margin()?,
|
||
|
cfd.order.price,
|
||
|
cfd.quantity_usd,
|
||
|
cfd.order.leverage,
|
||
|
cfd.refund_timelock_in_blocks(),
|
||
|
),
|
||
|
self.build_party_params.clone_channel(),
|
||
|
self.sign.clone_channel(),
|
||
|
Role::Taker,
|
||
|
self.n_payouts,
|
||
|
);
|
||
|
|
||
|
let this = ctx.address().expect("self to be alive");
|
||
|
spawn_fallible::<_, anyhow::Error>(async move {
|
||
|
let _ = match contract_future.await {
|
||
|
Ok(dlc) => this.send(SetupSucceeded { order_id, dlc }).await?,
|
||
|
Err(error) => this.send(SetupFailed { order_id, error }).await?,
|
||
|
};
|
||
|
|
||
|
Ok(())
|
||
|
});
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) -> Result<()> {
|
||
|
self.on_completed
|
||
|
.send(Completed::Rejected {
|
||
|
order_id: self.order.id,
|
||
|
})
|
||
|
.await?;
|
||
|
|
||
|
ctx.stop();
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
fn handle(&mut self, msg: wire::SetupMsg, _ctx: &mut xtra::Context<Self>) -> Result<()> {
|
||
|
let mut sender = self
|
||
|
.setup_msg_sender
|
||
|
.clone()
|
||
|
.context("Cannot forward message to contract setup task")?;
|
||
|
sender.send(msg).await?;
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) -> Result<()> {
|
||
|
self.on_completed
|
||
|
.send(Completed::NewContract {
|
||
|
order_id: msg.order_id,
|
||
|
dlc: msg.dlc,
|
||
|
})
|
||
|
.await?;
|
||
|
|
||
|
ctx.stop();
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context<Self>) -> Result<()> {
|
||
|
self.on_completed
|
||
|
.send(Completed::Failed {
|
||
|
order_id: msg.order_id,
|
||
|
error: msg.error,
|
||
|
})
|
||
|
.await?;
|
||
|
|
||
|
ctx.stop();
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[async_trait]
|
||
|
impl xtra::Actor for Actor {
|
||
|
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
|
||
|
let address = ctx
|
||
|
.address()
|
||
|
.expect("actor to be able to give address to itself");
|
||
|
let res = self
|
||
|
.maker
|
||
|
.send(connection::TakeOrder {
|
||
|
order_id: self.order.id,
|
||
|
quantity: self.quantity,
|
||
|
address,
|
||
|
})
|
||
|
.await;
|
||
|
|
||
|
if let Err(e) = res {
|
||
|
tracing::error!(%self.order.id, "Stopping setup_taker actor: {}", e);
|
||
|
ctx.stop()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Message sent from the `connection::Actor` to the
|
||
|
/// `setup_taker::Actor` to notify that the order taken was accepted
|
||
|
/// by the maker.
|
||
|
pub struct Accepted;
|
||
|
|
||
|
/// Message sent from the `setup_taker::Actor` to the
|
||
|
/// `taker_cfd::Actor` to notify that the contract setup has started.
|
||
|
pub struct Started(pub OrderId);
|
||
|
|
||
|
/// Message sent from the `connection::Actor` to the
|
||
|
/// `setup_taker::Actor` to notify that the order taken was rejected
|
||
|
/// by the maker.
|
||
|
pub struct Rejected;
|
||
|
|
||
|
/// Message sent from the spawned task to `setup_taker::Actor` to
|
||
|
/// notify that the contract setup has finished successfully.
|
||
|
pub struct SetupSucceeded {
|
||
|
order_id: OrderId,
|
||
|
dlc: Dlc,
|
||
|
}
|
||
|
|
||
|
/// Message sent from the spawned task to `setup_taker::Actor` to
|
||
|
/// notify that the contract setup has failed.
|
||
|
pub struct SetupFailed {
|
||
|
order_id: OrderId,
|
||
|
error: anyhow::Error,
|
||
|
}
|
||
|
|
||
|
/// Message sent from the `setup_taker::Actor` to the
|
||
|
/// `taker_cfd::Actor` to notify that the contract setup has finished.
|
||
|
pub enum Completed {
|
||
|
NewContract {
|
||
|
order_id: OrderId,
|
||
|
dlc: Dlc,
|
||
|
},
|
||
|
Rejected {
|
||
|
order_id: OrderId,
|
||
|
},
|
||
|
Failed {
|
||
|
order_id: OrderId,
|
||
|
error: anyhow::Error,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
impl xtra::Message for Started {
|
||
|
type Result = ();
|
||
|
}
|
||
|
|
||
|
impl xtra::Message for Completed {
|
||
|
type Result = ();
|
||
|
}
|