Browse Source

Merge #419

419: Ensure we always know about Order::Term * announcements r=da-kami a=bonomat

When looking into https://github.com/comit-network/hermes/issues/360 I realized we forgot to trigger `FetchAnnouncement` when receiving a roll-over request. 
This is fixed with this PR.

I found this a bit cumbersome to have to do this manually. Instead, we should sync regularly and ensure we have attestations for the next 24h. 

This is also related to the discussion `@da-kami` and `@thomaseizinger` had in https://github.com/comit-network/hermes/issues/349

Co-authored-by: Philipp Hoenisch <philipp@hoenisch.at>
contact-taker-before-changing-cfd-state
bors[bot] 3 years ago
committed by GitHub
parent
commit
ba8d74e914
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      daemon/src/lib.rs
  2. 3
      daemon/src/maker.rs
  3. 6
      daemon/src/maker_cfd.rs
  4. 63
      daemon/src/oracle.rs
  5. 4
      daemon/src/taker.rs
  6. 31
      daemon/src/taker_cfd.rs
  7. 2
      daemon/tests/happy_path.rs

2
daemon/src/lib.rs

@ -57,7 +57,6 @@ impl<O, M, T, W> MakerActorSystem<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
@ -162,7 +161,6 @@ impl<O, M, W> TakerActorSystem<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>

3
daemon/src/maker.rs

@ -190,6 +190,7 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let term = time::Duration::hours(opts.term as i64);
let MakerActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
@ -200,7 +201,7 @@ async fn main() -> Result<()> {
db.clone(),
wallet.clone(),
oracle,
|cfds, channel| oracle::Actor::new(cfds, channel),
|cfds, channel| oracle::Actor::new(cfds, channel, term),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();

6
daemon/src/maker_cfd.rs

@ -629,7 +629,6 @@ where
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle_new_order(
@ -641,10 +640,6 @@ where
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + self.term)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id))
.await?;
let order = Order::new(
price,
min_quantity,
@ -949,7 +944,6 @@ where
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewOrder> for Actor<O, M, T, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) {

63
daemon/src/oracle.rs

@ -1,6 +1,6 @@
use crate::model::cfd::{Cfd, CfdState};
use crate::model::BitMexPriceEventId;
use crate::{log_error, tokio_ext};
use crate::{log_error, tokio_ext, try_continue};
use anyhow::{Context, Result};
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
@ -9,26 +9,19 @@ use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
use time::Duration;
use xtra::prelude::StrongMessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>,
pending_attestations: HashSet<BitMexPriceEventId>,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
term: Duration,
}
pub struct Sync;
/// Message used to tell the `oracle::Actor` to fetch an
/// `Announcement` from `olivia`.
///
/// The `Announcement` corresponds to the `OracleEventId` included in
/// the message.
#[derive(Debug, Clone)]
pub struct FetchAnnouncement(pub BitMexPriceEventId);
pub struct MonitorAttestation {
pub event_id: BitMexPriceEventId,
}
@ -69,6 +62,7 @@ impl Actor {
pub fn new(
cfds: Vec<Cfd>,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
term: Duration,
) -> Self {
let mut pending_attestations = HashSet::new();
@ -100,14 +94,23 @@ impl Actor {
Self {
announcements: HashMap::new(),
pending_announcements: HashSet::new(),
pending_attestations,
attestation_channel,
term,
}
}
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_announcements.iter().cloned() {
fn ensure_having_announcements(&mut self, term: Duration, ctx: &mut xtra::Context<Self>) {
// we want inclusive the term length hence +1
for hour in 1..term.whole_hours() + 1 {
let event_id = try_continue!(next_announcement_after(
time::OffsetDateTime::now_utc() + Duration::hours(hour)
));
if self.announcements.get(&event_id).is_some() {
tracing::trace!("Announcement already known: {}", event_id,);
continue;
}
let this = ctx.address().expect("self to be alive");
tokio_ext::spawn_fallible(async move {
@ -208,35 +211,18 @@ impl Actor {
}
}
fn handle_fetch_announcement(
&mut self,
msg: FetchAnnouncement,
_ctx: &mut xtra::Context<Self>,
) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
fn handle_get_announcement(
&mut self,
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
) -> Option<Announcement> {
let announcement =
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
id: *id,
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
});
if announcement.is_none() {
self.pending_announcements.insert(msg.0);
}
announcement
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
id: *id,
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
})
}
fn handle_new_announcement_fetched(
@ -244,13 +230,12 @@ impl Actor {
msg: NewAnnouncementFetched,
_ctx: &mut xtra::Context<Self>,
) {
self.pending_announcements.remove(&msg.id);
self.announcements
.insert(msg.id, (msg.expected_outcome_time, msg.nonce_pks));
}
fn handle_sync(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.ensure_having_announcements(self.term, ctx);
self.update_pending_attestations(ctx);
}
}

4
daemon/src/taker.rs

@ -27,6 +27,8 @@ use xtra::Actor;
mod routes_taker;
pub const TERM: time::Duration = time::Duration::hours(24);
#[derive(Clap)]
struct Opts {
/// The IP address of the other party (i.e. the maker).
@ -179,7 +181,7 @@ async fn main() -> Result<()> {
oracle,
send_to_maker,
read_from_maker,
|cfds, channel| oracle::Actor::new(cfds, channel),
|cfds, channel| oracle::Actor::new(cfds, channel, TERM),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();

31
daemon/src/taker_cfd.rs

@ -16,7 +16,6 @@ use futures::channel::mpsc;
use futures::{future, SinkExt};
use std::collections::HashMap;
use std::time::SystemTime;
use time::OffsetDateTime;
use tokio::sync::watch;
use xtra::prelude::*;
use xtra::KeepRunning;
@ -300,19 +299,12 @@ where
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
{
impl<O, M, W> Actor<O, M, W> {
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
match order {
Some(mut order) => {
order.origin = Origin::Theirs;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id))
.await?;
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
self.order_feed_actor_inbox.send(Some(order))?;
@ -356,7 +348,6 @@ where
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> {
@ -364,8 +355,6 @@ where
anyhow::bail!("An update for order id {} is already in progress", order_id)
}
let order = load_order_by_id(order_id, &mut self.db.acquire().await?).await?;
let proposal = RollOverProposal {
order_id,
timestamp: SystemTime::now(),
@ -380,13 +369,6 @@ where
);
self.send_pending_update_proposals()?;
// we are likely going to need this one
self.oracle_actor
.send(oracle::FetchAnnouncement(oracle::next_announcement_after(
OffsetDateTime::now_utc() + order.term,
)?))
.await?;
self.send_to_maker
.do_send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id,
@ -395,12 +377,10 @@ where
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
impl<O, M, W> Actor<O, M, W> where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
+ xtra::Handler<wallet::BuildPartyParams>
{
}
@ -681,7 +661,6 @@ impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> {
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
@ -709,9 +688,7 @@ where
impl<O: 'static, M: 'static, W: 'static> Handler<MakerStreamMessage> for Actor<O, M, W>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::MonitorAttestation>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>

2
daemon/tests/happy_path.rs

@ -86,8 +86,6 @@ impl xtra::Actor for Oracle {}
#[xtra_productivity(message_impl = false)]
impl Oracle {
async fn handle_fetch_announcement(&mut self, _msg: oracle::FetchAnnouncement) {}
async fn handle_get_announcement(
&mut self,
_msg: oracle::GetAnnouncement,

Loading…
Cancel
Save