|
|
@ -1,18 +1,19 @@ |
|
|
|
use crate::address_map::{AddressMap, Stopping}; |
|
|
|
use crate::model::cfd::OrderId; |
|
|
|
use crate::model::{Price, Timestamp, Usd}; |
|
|
|
use crate::model::{Identity, Price, Timestamp, Usd}; |
|
|
|
use crate::tokio_ext::FutureExt; |
|
|
|
use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; |
|
|
|
use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; |
|
|
|
use anyhow::{Context, Result}; |
|
|
|
use anyhow::{bail, Context, Result}; |
|
|
|
use bdk::bitcoin::Amount; |
|
|
|
use futures::StreamExt; |
|
|
|
use futures::{SinkExt, StreamExt, TryStreamExt}; |
|
|
|
use std::collections::HashMap; |
|
|
|
use std::net::SocketAddr; |
|
|
|
use std::sync::{Arc, Mutex}; |
|
|
|
use std::time::{Duration, SystemTime}; |
|
|
|
use tokio::net::TcpStream; |
|
|
|
use tokio::sync::watch; |
|
|
|
use tokio_util::codec::FramedRead; |
|
|
|
use tokio_util::codec::{FramedRead, FramedWrite}; |
|
|
|
use xtra::prelude::MessageChannel; |
|
|
|
use xtra::KeepRunning; |
|
|
|
use xtra_productivity::xtra_productivity; |
|
|
@ -40,7 +41,7 @@ pub struct Actor { |
|
|
|
} |
|
|
|
|
|
|
|
pub struct Connect { |
|
|
|
pub maker_identity_pk: x25519_dalek::PublicKey, |
|
|
|
pub maker_identity: Identity, |
|
|
|
pub maker_addr: SocketAddr, |
|
|
|
} |
|
|
|
|
|
|
@ -54,7 +55,17 @@ struct MeasurePulse; |
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
pub enum ConnectionStatus { |
|
|
|
Online, |
|
|
|
Offline, |
|
|
|
Offline { |
|
|
|
reason: Option<ConnectionCloseReason>, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
pub enum ConnectionCloseReason { |
|
|
|
VersionMismatch { |
|
|
|
taker_version: Version, |
|
|
|
maker_version: Version, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
/// Message sent from the `setup_taker::Actor` to the
|
|
|
@ -167,7 +178,7 @@ impl Actor { |
|
|
|
&mut self, |
|
|
|
Connect { |
|
|
|
maker_addr, |
|
|
|
maker_identity_pk, |
|
|
|
maker_identity, |
|
|
|
}: Connect, |
|
|
|
ctx: &mut xtra::Context<Self>, |
|
|
|
) -> Result<()> { |
|
|
@ -185,25 +196,69 @@ impl Actor { |
|
|
|
) |
|
|
|
})? |
|
|
|
.with_context(|| format!("Failed to connect to {}", maker_addr))?; |
|
|
|
let noise = |
|
|
|
noise::initiator_handshake(&mut connection, &self.identity_sk, &maker_identity_pk) |
|
|
|
.await?; |
|
|
|
let noise = noise::initiator_handshake( |
|
|
|
&mut connection, |
|
|
|
&self.identity_sk, |
|
|
|
&maker_identity.pk(), |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
let (read, write) = connection.into_split(); |
|
|
|
(read, write, Arc::new(Mutex::new(noise))) |
|
|
|
}; |
|
|
|
|
|
|
|
let send_to_socket = send_to_socket::Actor::new(write, noise.clone()); |
|
|
|
let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); |
|
|
|
let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); |
|
|
|
|
|
|
|
let mut tasks = Tasks::default(); |
|
|
|
tasks.add(self.send_to_maker_ctx.attach(send_to_socket)); |
|
|
|
let our_version = Version::current(); |
|
|
|
write.send(TakerToMaker::Hello(our_version.clone())).await?; |
|
|
|
|
|
|
|
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise)) |
|
|
|
.map(move |item| MakerStreamMessage { item }); |
|
|
|
match read |
|
|
|
.try_next() |
|
|
|
.timeout(Duration::from_secs(10)) |
|
|
|
.await |
|
|
|
.with_context(|| { |
|
|
|
format!( |
|
|
|
"Maker {} did not send Hello within 10 seconds, dropping connection", |
|
|
|
maker_identity |
|
|
|
) |
|
|
|
})? { |
|
|
|
Ok(Some(wire::MakerToTaker::Hello(maker_version))) => { |
|
|
|
if our_version != maker_version { |
|
|
|
self.status_sender |
|
|
|
.send(ConnectionStatus::Offline { |
|
|
|
reason: Some(ConnectionCloseReason::VersionMismatch { |
|
|
|
taker_version: our_version.clone(), |
|
|
|
maker_version: maker_version.clone(), |
|
|
|
}), |
|
|
|
}) |
|
|
|
.expect("receiver to outlive the actor"); |
|
|
|
|
|
|
|
bail!( |
|
|
|
"Network version mismatch, we are on version {} but taker is on version {}", |
|
|
|
our_version, |
|
|
|
maker_version, |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
unexpected_message => { |
|
|
|
bail!( |
|
|
|
"Unexpected message {:?} from maker {}", |
|
|
|
unexpected_message, |
|
|
|
maker_identity |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
tracing::info!(address = %maker_addr, "Established connection to maker"); |
|
|
|
|
|
|
|
let this = ctx.address().expect("self to be alive"); |
|
|
|
tasks.add(this.attach_stream(read)); |
|
|
|
|
|
|
|
let send_to_socket = send_to_socket::Actor::new(write); |
|
|
|
|
|
|
|
let mut tasks = Tasks::default(); |
|
|
|
tasks.add(self.send_to_maker_ctx.attach(send_to_socket)); |
|
|
|
tasks.add(this.attach_stream(read.map(move |item| MakerStreamMessage { item }))); |
|
|
|
tasks.add( |
|
|
|
ctx.notify_interval(self.heartbeat_timeout, || MeasurePulse) |
|
|
|
.expect("we just started"), |
|
|
@ -217,8 +272,6 @@ impl Actor { |
|
|
|
.send(ConnectionStatus::Online) |
|
|
|
.expect("receiver to outlive the actor"); |
|
|
|
|
|
|
|
tracing::info!(address = %maker_addr, "Established connection to maker"); |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
@ -285,6 +338,9 @@ impl Actor { |
|
|
|
tracing::warn!(%order_id, "No active collaborative settlement"); |
|
|
|
} |
|
|
|
} |
|
|
|
wire::MakerToTaker::Hello(_) => { |
|
|
|
tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") |
|
|
|
} |
|
|
|
other => { |
|
|
|
// this one should go to the taker cfd actor
|
|
|
|
log_error!(self.maker_to_taker.send(other)); |
|
|
@ -305,7 +361,7 @@ impl Actor { |
|
|
|
|
|
|
|
if time_since_last_heartbeat > self.heartbeat_timeout { |
|
|
|
self.status_sender |
|
|
|
.send(ConnectionStatus::Offline) |
|
|
|
.send(ConnectionStatus::Offline { reason: None }) |
|
|
|
.expect("watch receiver to outlive the actor"); |
|
|
|
self.connected_state = None; |
|
|
|
} |
|
|
@ -319,16 +375,17 @@ impl xtra::Actor for Actor {} |
|
|
|
pub async fn connect( |
|
|
|
mut maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>, |
|
|
|
connection_actor_addr: xtra::Address<Actor>, |
|
|
|
maker_identity_pk: x25519_dalek::PublicKey, |
|
|
|
maker_identity: Identity, |
|
|
|
maker_addresses: Vec<SocketAddr>, |
|
|
|
) { |
|
|
|
loop { |
|
|
|
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { |
|
|
|
let connection_status = maker_online_status_feed_receiver.borrow().clone(); |
|
|
|
if matches!(connection_status, ConnectionStatus::Offline { .. }) { |
|
|
|
tracing::debug!("No connection to the maker"); |
|
|
|
'connect: loop { |
|
|
|
for address in &maker_addresses { |
|
|
|
let connect_msg = Connect { |
|
|
|
maker_identity_pk, |
|
|
|
maker_identity, |
|
|
|
maker_addr: *address, |
|
|
|
}; |
|
|
|
|
|
|
@ -337,7 +394,7 @@ pub async fn connect( |
|
|
|
.await |
|
|
|
.expect("Taker actor to be present") |
|
|
|
{ |
|
|
|
tracing::trace!(%address, "Failed to establish connection: {:#}", e); |
|
|
|
tracing::warn!(%address, "Failed to establish connection: {:#}", e); |
|
|
|
continue; |
|
|
|
} |
|
|
|
break 'connect; |
|
|
|