You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

373 lines
11 KiB

use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id,
};
use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::Usd;
use crate::monitor::{CfdMonitoringEvent, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{monitor, send_to_socket, setup_contract_actor, wire};
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
use xtra::prelude::*;
/// Message asking the actor to take an order.
///
/// Handling it creates a CFD in the `OutgoingOrderRequest` state, stores it and
/// sends a `wire::TakerToMaker::TakeOrder` request to the maker.
pub struct TakeOffer {
/// Identifier of the order that should be taken.
pub order_id: OrderId,
/// Quantity to take; forwarded to the maker together with `order_id`.
pub quantity: Usd,
}
/// Message carrying the latest order, or `None` when there is no order.
///
/// A `Some` order is stored in the database; in both cases the value is
/// forwarded to the order feed.
pub struct NewOrder(pub Option<Order>);
/// Message signalling that the order with the given id got accepted.
///
/// Handling it moves the CFD into `ContractSetup` and starts the contract setup.
pub struct OrderAccepted(pub OrderId);
/// Message signalling that the order with the given id got rejected.
///
/// Handling it records the `Rejected` state for the corresponding CFD.
pub struct OrderRejected(pub OrderId);
/// Incoming contract-setup protocol message.
///
/// It is handed to the running contract setup, or buffered if none is running yet.
pub struct IncProtocolMsg(pub SetupMsg);
/// Message the actor sends to itself once the spawned contract setup has finished.
pub struct CfdSetupCompleted {
/// Order whose CFD finished contract setup.
pub order_id: OrderId,
/// Result of the contract setup; stored with the `PendingOpen` state and used
/// to broadcast the lock transaction and to start monitoring.
pub dlc: Dlc,
}
/// Taker-side CFD actor.
///
/// Persists orders and CFD state changes, publishes them to the feed channels,
/// talks to the maker through `send_to_maker`, drives the contract setup and
/// reacts to monitoring events.
pub struct Actor {
// Connection pool; every handler acquires its own connection from it.
db: sqlx::SqlitePool,
// Used to build the taker's party params and to broadcast transactions.
wallet: Wallet,
// Oracle public key handed to the contract setup.
oracle_pk: schnorrsig::PublicKey,
// Receives the complete list of CFDs after a CFD was inserted or changed state.
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
// Receives the latest order (`None` when there is no order).
order_feed_actor_inbox: watch::Sender<Option<Order>>,
// Address of the actor that delivers `wire::TakerToMaker` messages to the maker.
send_to_maker: Address<send_to_socket::Actor>,
// Inbox of the contract setup that is currently running; `None` when no setup is running.
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that
// manages the state instead of this ugly buffer
// Holds setup messages that arrived before a contract setup was started; they are
// drained into the setup inbox once the order is accepted.
contract_setup_message_buffer: Vec<SetupMsg>,
// Address of the monitor actor that receives `monitor::StartMonitoring`.
monitor_actor: Address<monitor::Actor<Actor>>,
}
impl Actor {
/// Creates the actor and seeds the CFD feed with every CFD currently stored in `db`.
///
/// # Errors
///
/// Fails if no database connection can be acquired, if loading the CFDs fails,
/// or if the CFD feed channel rejects the initial value.
pub async fn new(
    db: sqlx::SqlitePool,
    wallet: Wallet,
    oracle_pk: schnorrsig::PublicKey,
    cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
    order_feed_actor_inbox: watch::Sender<Option<Order>>,
    send_to_maker: Address<send_to_socket::Actor>,
    monitor_actor: Address<monitor::Actor<Actor>>,
) -> Result<Self> {
    // Publish the persisted CFDs before the actor starts processing messages.
    let mut conn = db.acquire().await?;
    let stored_cfds = load_all_cfds(&mut conn).await?;
    cfd_feed_actor_inbox.send(stored_cfds)?;

    let actor = Self {
        db,
        wallet,
        oracle_pk,
        cfd_feed_actor_inbox,
        order_feed_actor_inbox,
        send_to_maker,
        // No contract setup is running and nothing is buffered at start-up.
        current_contract_setup: None,
        contract_setup_message_buffer: Vec::new(),
        monitor_actor,
    };

    Ok(actor)
}
/// Takes `quantity` of the order identified by `order_id`.
///
/// Loads the order, stores a new CFD in the `OutgoingOrderRequest` state,
/// republishes the CFD list and sends the take request to the maker.
///
/// # Errors
///
/// Fails if the database cannot be accessed, the order cannot be loaded, the
/// CFD feed rejects the update, or the request cannot be handed to
/// `send_to_maker`.
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {
    let mut conn = self.db.acquire().await?;

    let current_order = load_order_by_id(order_id, &mut conn).await?;
    tracing::info!("Taking current order: {:?}", &current_order);

    // The order is not used after this point, so it is moved into the CFD
    // instead of being cloned.
    let cfd = Cfd::new(
        current_order,
        quantity,
        CfdState::OutgoingOrderRequest {
            common: CfdStateCommon {
                transition_timestamp: SystemTime::now(),
            },
        },
    );
    insert_cfd(cfd, &mut conn).await?;

    self.cfd_feed_actor_inbox
        .send(load_all_cfds(&mut conn).await?)?;

    self.send_to_maker
        .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
        .await?;

    Ok(())
}
/// Stores a newly received order (if any) and forwards it to the order feed.
///
/// `None` is forwarded to the feed unchanged without touching the database.
///
/// # Errors
///
/// Fails if the order cannot be persisted or the order feed rejects the value.
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
    // Only an actual order is persisted.
    if let Some(new_order) = order.as_ref() {
        let mut conn = self.db.acquire().await?;
        insert_order(new_order, &mut conn).await?;
    }

    // The feed receives exactly what was passed in, `Some` or `None`.
    self.order_feed_actor_inbox.send(order)?;

    Ok(())
}
/// Reacts to an accepted order by starting the contract setup for its CFD.
///
/// Records the `ContractSetup` state, republishes the CFD list, builds the
/// taker's party params and creates the contract setup. Setup messages that
/// were buffered beforehand are replayed into the new setup inbox, and a task
/// is spawned that reports the resulting DLC back to this actor as
/// `CfdSetupCompleted`.
///
/// `ctx` is only used to obtain this actor's own address.
///
/// # Errors
///
/// Fails on database errors, if the feed rejects the update, if the margin or
/// party params cannot be computed, or if a buffered message cannot be
/// delivered to the setup inbox.
///
/// # Panics
///
/// Panics if `ctx` cannot provide the actor's own address.
async fn handle_order_accepted(
    &mut self,
    order_id: OrderId,
    ctx: &mut Context<Self>,
) -> Result<()> {
    tracing::info!(%order_id, "Order got accepted");

    let mut conn = self.db.acquire().await?;

    let contract_setup = CfdState::ContractSetup {
        common: CfdStateCommon {
            transition_timestamp: SystemTime::now(),
        },
    };
    insert_new_cfd_state_by_order_id(order_id, contract_setup, &mut conn).await?;

    let all_cfds = load_all_cfds(&mut conn).await?;
    self.cfd_feed_actor_inbox.send(all_cfds)?;

    let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());

    let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
    let margin = cfd.margin()?;
    let taker_params = self.wallet.build_party_params(margin, pk).await?;

    // Outgoing protocol messages are forwarded to the maker from a spawned task.
    let maker = self.send_to_maker.clone();
    let (setup, setup_inbox) = setup_contract_actor::new(
        move |msg| {
            tokio::spawn(maker.do_send_async(wire::TakerToMaker::Protocol(msg)));
        },
        setup_contract_actor::OwnParams::Taker(taker_params),
        sk,
        self.oracle_pk,
        cfd,
        self.wallet.clone(),
    );

    // Replay everything that arrived before the setup existed.
    self.contract_setup_message_buffer
        .drain(..)
        .try_for_each(|buffered| setup_inbox.send(buffered))?;

    let this = ctx
        .address()
        .expect("actor to be able to give address to itself");

    // Wait for the setup to finish in the background and report the DLC back.
    tokio::spawn(async move {
        let dlc = setup.await;
        this.do_send_async(CfdSetupCompleted { order_id, dlc }).await
    });

    self.current_contract_setup = Some(setup_inbox);

    Ok(())
}
/// Records the `Rejected` state for the CFD of `order_id` and republishes the CFD list.
///
/// # Errors
///
/// Fails on database errors or if the CFD feed rejects the update.
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
    tracing::debug!(%order_id, "Order rejected");

    let mut conn = self.db.acquire().await?;

    let rejected = CfdState::Rejected {
        common: CfdStateCommon {
            transition_timestamp: SystemTime::now(),
        },
    };
    insert_new_cfd_state_by_order_id(order_id, rejected, &mut conn).await?;

    let all_cfds = load_all_cfds(&mut conn).await?;
    self.cfd_feed_actor_inbox.send(all_cfds)?;

    Ok(())
}
/// Routes an incoming contract-setup message.
///
/// The message goes straight to the running contract setup if there is one;
/// otherwise it is buffered until a setup is started.
///
/// # Errors
///
/// Fails if the running setup's inbox does not accept the message.
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
    match &self.current_contract_setup {
        Some(setup_inbox) => setup_inbox.send(msg)?,
        // No setup yet: keep the message so it can be replayed later.
        None => self.contract_setup_message_buffer.push(msg),
    }

    Ok(())
}
/// Finalises a completed contract setup.
///
/// Clears the contract-setup bookkeeping, records the `PendingOpen` state
/// together with the DLC, republishes the CFD list, broadcasts the lock
/// transaction and asks the monitor actor to start monitoring the CFD's
/// transactions.
///
/// # Errors
///
/// Fails on database errors, if the feed rejects the update, if broadcasting
/// the lock transaction fails, or if the monitor actor cannot be reached.
async fn handle_cfd_setup_completed(&mut self, order_id: OrderId, dlc: Dlc) -> Result<()> {
    tracing::info!("Setup complete, publishing on chain now");

    // The setup is over: drop its inbox and anything still buffered for it.
    self.current_contract_setup = None;
    self.contract_setup_message_buffer = Vec::new();

    let mut conn = self.db.acquire().await?;

    let pending_open = CfdState::PendingOpen {
        common: CfdStateCommon {
            transition_timestamp: SystemTime::now(),
        },
        dlc: dlc.clone(),
    };
    insert_new_cfd_state_by_order_id(order_id, pending_open, &mut conn).await?;

    let all_cfds = load_all_cfds(&mut conn).await?;
    self.cfd_feed_actor_inbox.send(all_cfds)?;

    let lock_tx = dlc.lock.0.clone();
    let txid = self.wallet.try_broadcast_transaction(lock_tx).await?;
    tracing::info!("Lock transaction published with txid {}", txid);

    // TODO: It's a bit suspicious to load this just to get the
    // refund timelock
    let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;

    // The same script pubkey accompanies every CET and the refund transaction.
    let script_pubkey = dlc.address.script_pubkey();

    let params = MonitorParams {
        lock: (dlc.lock.0.txid(), dlc.lock.1),
        commit: (dlc.commit.0.txid(), dlc.commit.2),
        cets: dlc
            .cets
            .into_iter()
            .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
            .collect(),
        refund: (
            dlc.refund.0.txid(),
            script_pubkey,
            cfd.refund_timelock_in_blocks(),
        ),
    };

    self.monitor_actor
        .do_send_async(monitor::StartMonitoring {
            id: order_id,
            params,
        })
        .await?;

    Ok(())
}
async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?;
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
// TODO: Not sure that should be done here...
// Consider sending a message to ourselves to trigger broadcasting refund?
if let CfdState::MustRefund { .. } = new_state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
.try_broadcast_transaction(signed_refund_tx)
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
}
#[async_trait]
impl Handler<TakeOffer> for Actor {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity));
}
}
#[async_trait]
impl Handler<NewOrder> for Actor {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_order(msg.0));
}
}
#[async_trait]
impl Handler<OrderAccepted> for Actor {
async fn handle(&mut self, msg: OrderAccepted, ctx: &mut Context<Self>) {
log_error!(self.handle_order_accepted(msg.0, ctx));
}
}
#[async_trait]
impl Handler<OrderRejected> for Actor {
async fn handle(&mut self, msg: OrderRejected, _ctx: &mut Context<Self>) {
log_error!(self.handle_order_rejected(msg.0));
}
}
#[async_trait]
impl Handler<IncProtocolMsg> for Actor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) {
log_error!(self.handle_inc_protocol_msg(msg.0));
}
}
#[async_trait]
impl Handler<CfdSetupCompleted> for Actor {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<CfdMonitoringEvent> for Actor {
    /// Delegates to `handle_monitoring_event`; the outcome is passed to `log_error!`.
    async fn handle(&mut self, event: CfdMonitoringEvent, _ctx: &mut Context<Self>) {
        log_error!(self.handle_monitoring_event(event));
    }
}
/// Handling a `TakeOffer` produces no reply value.
impl Message for TakeOffer {
type Result = ();
}
/// Handling a `NewOrder` produces no reply value.
impl Message for NewOrder {
type Result = ();
}
/// Handling an `OrderAccepted` produces no reply value.
impl Message for OrderAccepted {
type Result = ();
}
/// Handling an `OrderRejected` produces no reply value.
impl Message for OrderRejected {
type Result = ();
}
/// Handling an `IncProtocolMsg` produces no reply value.
impl Message for IncProtocolMsg {
type Result = ();
}
/// Handling a `CfdSetupCompleted` produces no reply value.
impl Message for CfdSetupCompleted {
type Result = ();
}
// Registers `Actor` as an xtra actor; the empty body overrides none of the trait's items.
impl xtra::Actor for Actor {}