|
|
@ -18,22 +18,14 @@ use time::ext::NumericalDuration; |
|
|
|
const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] = |
|
|
|
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"); |
|
|
|
|
|
|
|
pub struct Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
pub struct Actor<CFD, M> { |
|
|
|
latest_announcements: HashMap<OracleEventId, Announcement>, |
|
|
|
pending_attestations: HashSet<OracleEventId>, |
|
|
|
cfd_actor_address: xtra::Address<CFD>, |
|
|
|
monitor_actor_address: xtra::Address<M>, |
|
|
|
} |
|
|
|
|
|
|
|
impl<CFD, M> Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
impl<CFD, M> Actor<CFD, M> { |
|
|
|
pub fn new( |
|
|
|
cfd_actor_address: xtra::Address<CFD>, |
|
|
|
monitor_actor_address: xtra::Address<M>, |
|
|
@ -73,7 +65,13 @@ where |
|
|
|
monitor_actor_address, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl<CFD, M> Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
async fn update_state(&mut self) -> Result<()> { |
|
|
|
self.update_latest_announcements() |
|
|
|
.await |
|
|
@ -86,7 +84,7 @@ where |
|
|
|
} |
|
|
|
|
|
|
|
async fn update_latest_announcements(&mut self) -> Result<()> { |
|
|
|
let new_announcements = next_ids() |
|
|
|
self.latest_announcements = next_ids() |
|
|
|
.into_iter() |
|
|
|
.map(|event_id| async move { |
|
|
|
let url = event_id.to_olivia_url(); |
|
|
@ -105,11 +103,9 @@ where |
|
|
|
Result::<_, anyhow::Error>::Ok((event_id, announcement)) |
|
|
|
}) |
|
|
|
.collect::<FuturesOrdered<_>>() |
|
|
|
.try_collect::<HashMap<OracleEventId, Announcement>>() |
|
|
|
.try_collect() |
|
|
|
.await?; |
|
|
|
|
|
|
|
self.latest_announcements = new_announcements; |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
@ -150,24 +146,9 @@ where |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
fn monitor_event(&mut self, event_id: OracleEventId) { |
|
|
|
if !self.pending_attestations.insert(event_id.clone()) { |
|
|
|
tracing::trace!("Event {} already being monitored", event_id); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
pub fn handle_get_announcement(&self, event_id: OracleEventId) -> Option<Announcement> { |
|
|
|
self.latest_announcements.get(&event_id).cloned() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl<CFD, M> xtra::Actor for Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
} |
|
|
|
impl<CFD: 'static, M: 'static> xtra::Actor for Actor<CFD, M> {} |
|
|
|
|
|
|
|
pub struct Sync; |
|
|
|
|
|
|
@ -195,13 +176,11 @@ impl xtra::Message for MonitorEvent { |
|
|
|
} |
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
impl<CFD, M> xtra::Handler<MonitorEvent> for Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorEvent> for Actor<CFD, M> { |
|
|
|
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) { |
|
|
|
self.monitor_event(msg.event_id) |
|
|
|
if !self.pending_attestations.insert(msg.event_id.clone()) { |
|
|
|
tracing::trace!("Event {} already being monitored", msg.event_id); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -209,17 +188,13 @@ where |
|
|
|
pub struct GetAnnouncement(pub OracleEventId); |
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
impl<CFD, M> xtra::Handler<GetAnnouncement> for Actor<CFD, M> |
|
|
|
where |
|
|
|
CFD: xtra::Handler<Attestation>, |
|
|
|
M: xtra::Handler<Attestation>, |
|
|
|
{ |
|
|
|
impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M> { |
|
|
|
async fn handle( |
|
|
|
&mut self, |
|
|
|
msg: GetAnnouncement, |
|
|
|
_ctx: &mut xtra::Context<Self>, |
|
|
|
) -> Option<Announcement> { |
|
|
|
self.handle_get_announcement(msg.0) |
|
|
|
self.latest_announcements.get(&msg.0).cloned() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|