Browse Source

Make taker_cfd::Actor generic over other actor addresses

debug-statements
Lucas Soriano del Pino 3 years ago
parent
commit
4b9dae8c7d
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 407
      daemon/src/taker_cfd.rs

407
daemon/src/taker_cfd.rs

@ -67,7 +67,7 @@ enum RollOverState {
None,
}
pub struct Actor {
pub struct Actor<O, M> {
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
@ -75,14 +75,14 @@ pub struct Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor>,
monitor_actor: Address<M>,
setup_state: SetupState,
roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor>,
oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals,
}
impl Actor {
impl<O, M> Actor<O, M> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
@ -92,8 +92,8 @@ impl Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker> + Send>,
monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
) -> Self {
Self {
db,
@ -227,6 +227,103 @@ impl Actor {
Ok(())
}
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
Ok(())
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Roll over request rejected");
// TODO: tell UI that roll over was rejected
// this is not too bad as we are still monitoring for the CFD to expiry
// the taker can just try to ask again :)
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
sender.send(msg).await?;
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup")
}
}
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M> Actor<O, M>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
{
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
match order {
Some(mut order) => {
@ -246,7 +343,62 @@ impl Actor {
}
Ok(())
}
}
impl<O, M> Actor<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let dlc = dlc.context("Failed to setup contract with maker")?;
tracing::info!("Setup complete, publishing on chain now");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
})
.await?;
Ok(())
}
}
impl<O: 'static, M: 'static> Actor<O, M>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_order_accepted(
&mut self,
order_id: OrderId,
@ -305,61 +457,13 @@ impl Actor {
Ok(())
}
}
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
Ok(())
}
async fn handle_settlement_accepted(
&mut self,
order_id: OrderId,
_ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got accepted");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker
.do_send(wire::TakerToMaker::InitiateSettlement {
order_id,
sig_taker,
})?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(())
}
impl<O: 'static, M: 'static> Actor<O, M>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_roll_over_accepted(
&mut self,
order_id: OrderId,
@ -413,78 +517,31 @@ impl Actor {
.context("Could not remove accepted roll over")?;
Ok(())
}
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Roll over request rejected");
// TODO: tell UI that roll over was rejected
// this is not too bad as we are still monitoring for the CFD to expiry
// the taker can just try to ask again :)
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
sender.send(msg).await?;
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup")
}
}
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
async fn handle_cfd_setup_completed(
impl<O: 'static, M: 'static> Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let dlc = dlc.context("Failed to setup contract with maker")?;
tracing::info!("Setup complete, publishing on chain now");
let dlc = dlc.context("Failed to roll over contract with maker")?;
self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
@ -492,90 +549,66 @@ impl Actor {
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
})
.await?;
Ok(())
}
}
async fn handle_cfd_roll_over_completed(
impl<O: 'static, M: 'static> Actor<O, M>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_settlement_accepted(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
_ctx: &mut Context<Self>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with maker")?;
self.roll_over_state = RollOverState::None;
tracing::info!(%order_id, "Settlement proposal got accepted");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker
.do_send(wire::TakerToMaker::InitiateSettlement {
order_id,
sig_taker,
})?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
#[async_trait]
impl Handler<TakeOffer> for Actor {
impl<O: 'static, M: 'static> Handler<TakeOffer> for Actor<O, M> {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity));
}
}
#[async_trait]
impl Handler<CfdAction> for Actor {
impl<O: 'static, M: 'static> Handler<CfdAction> for Actor<O, M> {
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) {
use CfdAction::*;
@ -596,7 +629,14 @@ impl Handler<CfdAction> for Actor {
}
#[async_trait]
impl Handler<MakerStreamMessage> for Actor {
impl<O: 'static, M: 'static> Handler<MakerStreamMessage> for Actor<O, M>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle(
&mut self,
message: MakerStreamMessage,
@ -649,28 +689,35 @@ impl Handler<MakerStreamMessage> for Actor {
}
#[async_trait]
impl Handler<CfdSetupCompleted> for Actor {
impl<O: 'static, M: 'static> Handler<CfdSetupCompleted> for Actor<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<CfdRollOverCompleted> for Actor {
impl<O: 'static, M: 'static> Handler<CfdRollOverCompleted> for Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<monitor::Event> for Actor {
impl<O: 'static, M: 'static> Handler<monitor::Event> for Actor<O, M> {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
impl<O: 'static, M: 'static> Handler<oracle::Attestation> for Actor<O, M> {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
@ -697,4 +744,4 @@ impl Message for CfdRollOverCompleted {
type Result = ();
}
impl xtra::Actor for Actor {}
impl<O: 'static, M: 'static> xtra::Actor for Actor<O, M> {}

Loading…
Cancel
Save