Browse Source

Replace `Announcements` with requesting specific `Announcement`

Instead of the oracle actor sending out the `Announcements` we only store them internally and let the cfd actors chose what `Announcement` is relevant given the term of the offer.
The specific `Announcement` is then fetched from the oracle actor.
The monitoring logic for pending attestations remains the same.
compile-for-aarch64
Daniel Karzel 3 years ago
parent
commit
971b9f9368
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 54
      daemon/src/maker_cfd.rs
  2. 60
      daemon/src/oracle.rs
  3. 49
      daemon/src/taker_cfd.rs

54
daemon/src/maker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{OracleEventId, TakerId, Usd};
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire};
@ -17,7 +17,7 @@ use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::{future, SinkExt};
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::watch;
use xtra::prelude::*;
@ -88,7 +88,6 @@ pub struct Actor {
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
roll_over_state: RollOverState,
latest_announcements: Option<BTreeMap<OracleEventId, oracle::Announcement>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
@ -135,7 +134,6 @@ impl Actor {
monitor_actor,
setup_state: SetupState::None,
roll_over_state: RollOverState::None,
latest_announcements: None,
oracle_actor,
current_pending_proposals: HashMap::new(),
}
@ -513,13 +511,11 @@ impl Actor {
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let offer_announcements = self
.latest_announcements
.clone()
.context("No oracle announcements available")?;
let offer_announcement = offer_announcements
.get(&cfd.order.oracle_event_id)
.context("Order's announcement not found in current oracle announcements")?;
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
@ -694,13 +690,13 @@ impl Actor {
// TODO: do we want to store in the db that we rolled over?
let (oracle_event_id, announcement) = self
.latest_announcements
.clone()
.context("Cannot roll over because no announcement from oracle was found")?
.into_iter()
.next_back()
.context("Empty list of announcements")?;
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM);
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
self.takers
.send(maker_inc_connections::TakerMessage {
@ -810,21 +806,6 @@ impl Actor {
Ok(())
}
async fn handle_oracle_announcements(
&mut self,
announcements: oracle::Announcements,
) -> Result<()> {
self.latest_announcements.replace(
announcements
.0
.iter()
.map(|announcement| (announcement.id.clone(), announcement.clone()))
.collect(),
);
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
@ -1014,13 +995,6 @@ impl Handler<TakerStreamMessage> for Actor {
}
}
#[async_trait]
impl Handler<oracle::Announcements> for Actor {
async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_announcements(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {

60
daemon/src/oracle.rs

@ -11,8 +11,7 @@ use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
use rocket::time::{Duration, OffsetDateTime, Time};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::TryFrom;
use std::collections::{HashMap, HashSet};
/// Where `olivia` is located.
const OLIVIA_URL: &str = "https://h00.ooo";
@ -22,10 +21,10 @@ const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] =
pub struct Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
latest_announcements: Option<[Announcement; 24]>,
latest_announcements: HashMap<OracleEventId, Announcement>,
pending_attestations: HashSet<OracleEventId>,
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
@ -33,7 +32,7 @@ where
impl<CFD, M> Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
pub fn new(
@ -69,7 +68,7 @@ where
}
Self {
latest_announcements: None,
latest_announcements: HashMap::new(),
pending_attestations,
cfd_actor_address,
monitor_actor_address,
@ -103,21 +102,13 @@ where
.json::<Announcement>()
.await
.context("Failed to deserialize as Announcement")?;
Result::<_, anyhow::Error>::Ok(announcement)
Result::<_, anyhow::Error>::Ok((OracleEventId(event_url), announcement))
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.try_collect::<HashMap<OracleEventId, Announcement>>()
.await?;
let new_announcements = <[Announcement; 24]>::try_from(new_announcements)
.map_err(|vec| anyhow::anyhow!("wrong number of announcements: {}", vec.len()))?;
if self.latest_announcements.as_ref() != Some(&new_announcements) {
self.latest_announcements = Some(new_announcements.clone());
self.cfd_actor_address
.do_send_async(Announcements(new_announcements))
.await?;
}
self.latest_announcements = new_announcements;
Ok(())
}
@ -165,11 +156,15 @@ where
tracing::trace!("Event {} already being monitored", event_id);
}
}
pub fn handle_get_announcement(&self, event_id: OracleEventId) -> Option<Announcement> {
self.latest_announcements.get(&event_id).cloned()
}
}
impl<CFD, M> xtra::Actor for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
}
@ -183,7 +178,7 @@ impl xtra::Message for Sync {
#[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
@ -202,7 +197,7 @@ impl xtra::Message for MonitorEvent {
#[async_trait]
impl<CFD, M> xtra::Handler<MonitorEvent> for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) {
@ -210,6 +205,24 @@ where
}
}
#[derive(Debug, Clone)]
pub struct GetAnnouncement(pub OracleEventId);
#[async_trait]
impl<CFD, M> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(
&mut self,
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
) -> Option<Announcement> {
self.handle_get_announcement(msg.0)
}
}
/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events
/// `olivia` will attest to.
fn next_urls() -> Vec<String> {
@ -265,9 +278,6 @@ impl From<Announcement> for cfd_protocol::Announcement {
}
}
#[derive(Debug, Clone)]
pub struct Announcements(pub [Announcement; 24]);
// TODO: Implement real deserialization once price attestation is
// implemented in `olivia`
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -278,8 +288,8 @@ pub struct Attestation {
pub scalars: Vec<SecretKey>,
}
impl xtra::Message for Announcements {
type Result = ();
impl xtra::Message for GetAnnouncement {
type Result = Option<Announcement>;
}
impl xtra::Message for Attestation {

49
daemon/src/taker_cfd.rs

@ -17,7 +17,7 @@ use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::{future, SinkExt};
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::watch;
use xtra::prelude::*;
@ -80,7 +80,6 @@ pub struct Actor {
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
roll_over_state: RollOverState,
latest_announcements: Option<BTreeMap<OracleEventId, oracle::Announcement>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
current_pending_proposals: UpdateCfdProposals,
}
@ -110,7 +109,6 @@ impl Actor {
setup_state: SetupState::None,
roll_over_state: RollOverState::None,
oracle_actor,
latest_announcements: None,
current_pending_proposals: HashMap::new(),
}
}
@ -271,13 +269,11 @@ impl Actor {
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let offer_announcements = self
.latest_announcements
.clone()
.context("No oracle announcements available")?;
let offer_announcement = offer_announcements
.get(&cfd.order.oracle_event_id)
.context("Order's announcement not found in current oracle announcements")?;
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
@ -367,14 +363,11 @@ impl Actor {
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
// TODO: we need to get multiple announcements for the next 24h
let announcement = self
.latest_announcements
.clone()
.context("Cannot roll over because no announcement from oracle was found")?
.get(&oracle_event_id)
.context("Empty list of announcements")?
.clone();
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
@ -594,21 +587,6 @@ impl Actor {
Ok(())
}
async fn handle_oracle_announcements(
&mut self,
announcements: oracle::Announcements,
) -> Result<()> {
self.latest_announcements.replace(
announcements
.0
.iter()
.map(|announcement| (announcement.id.clone(), announcement.clone()))
.collect(),
);
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
@ -748,13 +726,6 @@ impl Handler<monitor::Event> for Actor {
}
}
#[async_trait]
impl Handler<oracle::Announcements> for Actor {
async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_announcements(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {

Loading…
Cancel
Save