Browse Source

Merge #366

366: Introduce taker ActorSystem r=klochowicz a=klochowicz

The connection actor is a real actor yet, however it should be enough to
abstract away from the production I/O.

Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
feature/actor-custom-derive
bors[bot] 3 years ago
committed by GitHub
parent
commit
ccefd53892
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      daemon/src/connection.rs
  2. 58
      daemon/src/maker.rs
  3. 211
      daemon/src/taker.rs
  4. 2
      daemon/src/taker_cfd.rs

44
daemon/src/connection.rs

@ -0,0 +1,44 @@
use daemon::{send_to_socket, taker_cfd, wire};
use futures::{Stream, StreamExt};
use std::net::SocketAddr;
use std::time::Duration;
use tokio_util::codec::FramedRead;
use xtra::prelude::MessageChannel;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor as _;
const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5);
pub struct Actor {
pub send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
pub read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
}
impl Actor {
pub async fn new(maker: SocketAddr) -> Self {
let (read, write) = loop {
let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket");
if let Ok(connection) = socket.connect(maker).await {
break connection.into_split();
} else {
tracing::warn!(
"Could not connect to the maker, retrying in {}s ...",
CONNECTION_RETRY_INTERVAL.as_secs()
);
tokio::time::sleep(CONNECTION_RETRY_INTERVAL).await;
}
};
let send_to_maker = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
let read = FramedRead::new(read, wire::JsonCodec::default())
.map(move |item| taker_cfd::MakerStreamMessage { item });
Self {
send_to_maker: Box::new(send_to_maker),
read_from_maker: Box::new(read),
}
}
}

58
daemon/src/maker.rs

@ -25,7 +25,7 @@ use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::watch::{self, Receiver};
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
@ -205,30 +205,25 @@ async fn main() -> Result<()> {
.unwrap();
let ActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} =
ActorSystem::new(
db,
wallet.clone(),
oracle,
|cfds, channel| oracle::Actor::new(cfds, channel),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();
async move {
monitor::Actor::new(electrum, channel, cfds.clone()).await
}
}
},
|channel0, channel1| {
maker_inc_connections::Actor::new(channel0, channel1)
},
listener,
)
.await;
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
db,
wallet.clone(),
oracle,
|cfds, channel| oracle::Actor::new(cfds, channel),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds)
}
},
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
listener,
)
.await;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
@ -270,9 +265,9 @@ async fn main() -> Result<()> {
pub struct ActorSystem<O, M, T> {
cfd_actor_addr: Address<maker_cfd::Actor<O, M, T>>,
cfd_feed_receiver: Receiver<Vec<Cfd>>,
order_feed_receiver: Receiver<Option<Order>>,
update_cfd_feed_receiver: Receiver<UpdateCfdProposals>,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M, T> ActorSystem<O, M, T>
@ -283,7 +278,8 @@ where
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>,
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ListenerMessage>,
@ -324,7 +320,7 @@ where
order_feed_sender,
update_cfd_feed_sender,
inc_conn_addr.clone(),
monitor_addr,
monitor_addr.clone(),
oracle_addr.clone(),
)
.create(None)
@ -353,7 +349,7 @@ where
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &cfd_actor_addr])
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();

211
daemon/src/taker.rs

@ -3,15 +3,15 @@ use bdk::bitcoin;
use bdk::bitcoin::secp256k1::schnorrsig;
use clap::Clap;
use daemon::db::{self, load_all_cfds};
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::oracle::Attestation;
use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, send_to_socket, taker_cfd,
wallet_sync, wire,
bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire,
};
use futures::StreamExt;
use futures::{Future, Stream};
use rocket::fairing::AdHoc;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -19,19 +19,16 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use tokio::sync::watch;
use tokio_util::codec::FramedRead;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::MessageChannel;
use xtra::prelude::{MessageChannel, StrongMessageChannel};
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
use xtra::{Actor, Address};
mod connection;
mod routes_taker;
const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Clap)]
struct Opts {
/// The IP address of the other party (i.e. the maker).
@ -140,23 +137,7 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?;
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let (update_cfd_feed_sender, update_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (read, write) = loop {
let socket = tokio::net::TcpSocket::new_v4()?;
if let Ok(connection) = socket.connect(opts.maker).await {
break connection.into_split();
} else {
tracing::warn!(
"Could not connect to the maker, retrying in {}s ...",
CONNECTION_RETRY_INTERVAL.as_secs()
);
sleep(CONNECTION_RETRY_INTERVAL);
}
};
let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task);
@ -173,9 +154,7 @@ async fn main() -> Result<()> {
.await?;
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(wallet_feed_receiver)
.manage(update_feed_receiver)
.manage(quote_updates)
.manage(bitcoin_network)
.attach(AdHoc::try_on_ignite("SQL migrations", {
@ -200,72 +179,41 @@ async fn main() -> Result<()> {
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let send_to_maker = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let cfd_actor_inbox = taker_cfd::Actor::new(
db.clone(),
let connection::Actor {
send_to_maker,
read_from_maker,
} = connection::Actor::new(opts.maker).await;
let ActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
db,
wallet.clone(),
oracle,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
Box::new(send_to_maker),
monitor_actor_address.clone(),
oracle_actor_address.clone(),
send_to_maker,
read_from_maker,
|cfds, channel| oracle::Actor::new(cfds, channel),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds)
}
},
)
.create(None)
.spawn_global();
let read = FramedRead::new(read, wire::JsonCodec::default())
.map(move |item| taker_cfd::MakerStreamMessage { item });
tokio::spawn(cfd_actor_inbox.clone().attach_stream(read));
tokio::spawn(
monitor_actor_context
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_actor_context.run(
monitor::Actor::new(
opts.network.electrum().to_string(),
Box::new(cfd_actor_inbox.clone()),
cfds.clone(),
)
.await
.unwrap(),
),
);
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(
oracle_actor_context
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let actor = fan_out::Actor::new(&[&cfd_actor_inbox, &monitor_actor_address])
.create(None)
.spawn_global();
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, Box::new(actor))));
oracle_actor_address
.do_send_async(oracle::Sync)
.await
.unwrap();
.await;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let take_offer_channel =
MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_inbox);
MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel =
MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_inbox);
MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
Ok(rocket
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(take_offer_channel)
.manage(cfd_action_channel)
.manage(cfd_feed_receiver))
@ -292,3 +240,94 @@ async fn main() -> Result<()> {
Ok(())
}
pub struct ActorSystem<O, M> {
cfd_actor_addr: Address<taker_cfd::Actor<O, M>>,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M> ActorSystem<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Self
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
monitor_addr.clone(),
oracle_addr,
)
.create(None)
.spawn_global();
tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_ctx.run(
monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone())
.await
.unwrap(),
),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
}
}
}

2
daemon/src/taker_cfd.rs

@ -92,7 +92,7 @@ impl<O, M> Actor<O, M> {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker> + Send>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
) -> Self {

Loading…
Cancel
Save