committed by
GitHub
25 changed files with 807 additions and 821 deletions
@ -1,9 +0,0 @@ |
|||||
use anyhow::Result; |
|
||||
use daemon::routes_maker; |
|
||||
|
|
||||
#[rocket::main] |
|
||||
async fn main() -> Result<()> { |
|
||||
routes_maker::start_http().await?; |
|
||||
|
|
||||
Ok(()) |
|
||||
} |
|
@ -1,9 +0,0 @@ |
|||||
use anyhow::Result; |
|
||||
use daemon::routes_taker; |
|
||||
|
|
||||
#[rocket::main] |
|
||||
async fn main() -> Result<()> { |
|
||||
routes_taker::start_http().await?; |
|
||||
|
|
||||
Ok(()) |
|
||||
} |
|
@ -1,17 +0,0 @@ |
|||||
use crate::db::do_run_migrations; |
|
||||
use rocket::{fairing, Build, Rocket}; |
|
||||
use rocket_db_pools::{sqlx, Database}; |
|
||||
|
|
||||
#[derive(Database)] |
|
||||
#[database("maker")] |
|
||||
pub struct Maker(sqlx::SqlitePool); |
|
||||
|
|
||||
pub async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result { |
|
||||
match Maker::fetch(&rocket) { |
|
||||
Some(db) => match do_run_migrations(&**db).await { |
|
||||
Ok(_) => Ok(rocket), |
|
||||
Err(_) => Err(rocket), |
|
||||
}, |
|
||||
None => Err(rocket), |
|
||||
} |
|
||||
} |
|
@ -1,17 +0,0 @@ |
|||||
use crate::db::do_run_migrations; |
|
||||
use rocket::{fairing, Build, Rocket}; |
|
||||
use rocket_db_pools::{sqlx, Database}; |
|
||||
|
|
||||
#[derive(Database)] |
|
||||
#[database("taker")] |
|
||||
pub struct Taker(sqlx::SqlitePool); |
|
||||
|
|
||||
pub async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result { |
|
||||
match Taker::fetch(&rocket) { |
|
||||
Some(db) => match do_run_migrations(&**db).await { |
|
||||
Ok(_) => Ok(rocket), |
|
||||
Err(_) => Err(rocket), |
|
||||
}, |
|
||||
None => Err(rocket), |
|
||||
} |
|
||||
} |
|
@ -1,9 +0,0 @@ |
|||||
#[macro_use] |
|
||||
extern crate rocket; |
|
||||
|
|
||||
pub mod db; |
|
||||
pub mod model; |
|
||||
pub mod routes_maker; |
|
||||
pub mod routes_taker; |
|
||||
pub mod socket; |
|
||||
pub mod state; |
|
@ -0,0 +1,98 @@ |
|||||
|
use anyhow::Result; |
||||
|
use bdk::bitcoin::Amount; |
||||
|
use model::cfd::{Cfd, CfdOffer}; |
||||
|
use rocket::fairing::AdHoc; |
||||
|
use rocket::figment::util::map; |
||||
|
use rocket::figment::value::{Map, Value}; |
||||
|
use rocket_db_pools::Database; |
||||
|
use tokio::sync::{mpsc, watch}; |
||||
|
|
||||
|
mod db; |
||||
|
mod maker_cfd_actor; |
||||
|
mod maker_inc_connections_actor; |
||||
|
mod model; |
||||
|
mod routes_maker; |
||||
|
mod send_wire_message_actor; |
||||
|
mod to_sse_event; |
||||
|
mod wire; |
||||
|
|
||||
|
#[derive(Database)] |
||||
|
#[database("maker")] |
||||
|
pub struct Db(sqlx::SqlitePool); |
||||
|
|
||||
|
#[rocket::main] |
||||
|
async fn main() -> Result<()> { |
||||
|
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); |
||||
|
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None); |
||||
|
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC); |
||||
|
|
||||
|
let db: Map<_, Value> = map! { |
||||
|
"url" => "./maker.sqlite".into(), |
||||
|
}; |
||||
|
|
||||
|
let figment = rocket::Config::figment() |
||||
|
.merge(("databases", map!["maker" => db])) |
||||
|
.merge(("port", 8001)); |
||||
|
|
||||
|
let listener = tokio::net::TcpListener::bind("0.0.0.0:9999").await?; |
||||
|
let local_addr = listener.local_addr().unwrap(); |
||||
|
|
||||
|
println!("Listening on {}", local_addr); |
||||
|
|
||||
|
rocket::custom(figment) |
||||
|
.manage(cfd_feed_receiver) |
||||
|
.manage(offer_feed_receiver) |
||||
|
.manage(balance_feed_receiver) |
||||
|
.attach(Db::init()) |
||||
|
.attach(AdHoc::try_on_ignite( |
||||
|
"SQL migrations", |
||||
|
|rocket| async move { |
||||
|
match Db::fetch(&rocket) { |
||||
|
Some(db) => match db::run_migrations(&**db).await { |
||||
|
Ok(_) => Ok(rocket), |
||||
|
Err(_) => Err(rocket), |
||||
|
}, |
||||
|
None => Err(rocket), |
||||
|
} |
||||
|
}, |
||||
|
)) |
||||
|
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move { |
||||
|
let db = match Db::fetch(&rocket) { |
||||
|
Some(db) => (**db).clone(), |
||||
|
None => return Err(rocket), |
||||
|
}; |
||||
|
|
||||
|
let (connections_actor_inbox_sender, connections_actor_inbox_recv) = |
||||
|
mpsc::unbounded_channel(); |
||||
|
|
||||
|
let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new( |
||||
|
db, |
||||
|
connections_actor_inbox_sender, |
||||
|
cfd_feed_sender, |
||||
|
offer_feed_sender, |
||||
|
); |
||||
|
let connections_actor = maker_inc_connections_actor::new( |
||||
|
listener, |
||||
|
cfd_maker_actor_inbox.clone(), |
||||
|
connections_actor_inbox_recv, |
||||
|
); |
||||
|
|
||||
|
tokio::spawn(cfd_maker_actor); |
||||
|
tokio::spawn(connections_actor); |
||||
|
|
||||
|
Ok(rocket.manage(cfd_maker_actor_inbox)) |
||||
|
})) |
||||
|
.mount( |
||||
|
"/", |
||||
|
rocket::routes![ |
||||
|
routes_maker::maker_feed, |
||||
|
routes_maker::post_sell_offer, |
||||
|
// routes_maker::post_confirm_offer,
|
||||
|
routes_maker::get_health_check |
||||
|
], |
||||
|
) |
||||
|
.launch() |
||||
|
.await?; |
||||
|
|
||||
|
Ok(()) |
||||
|
} |
@ -0,0 +1,145 @@ |
|||||
|
use std::time::SystemTime; |
||||
|
|
||||
|
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon}; |
||||
|
use crate::model::{TakerId, Usd}; |
||||
|
use crate::{db, maker_cfd_actor, maker_inc_connections_actor}; |
||||
|
use futures::Future; |
||||
|
use rust_decimal_macros::dec; |
||||
|
use tokio::sync::{mpsc, watch}; |
||||
|
|
||||
|
#[derive(Debug)] |
||||
|
pub enum Command { |
||||
|
TakeOffer { |
||||
|
taker_id: TakerId, |
||||
|
offer_id: CfdOfferId, |
||||
|
quantity: Usd, |
||||
|
}, |
||||
|
NewOffer(CfdOffer), |
||||
|
NewTakerOnline { |
||||
|
id: TakerId, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
pub fn new( |
||||
|
db: sqlx::SqlitePool, |
||||
|
takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>, |
||||
|
cfd_feed_sender: watch::Sender<Vec<Cfd>>, |
||||
|
offer_feed_sender: watch::Sender<Option<CfdOffer>>, |
||||
|
) -> ( |
||||
|
impl Future<Output = ()>, |
||||
|
mpsc::UnboundedSender<maker_cfd_actor::Command>, |
||||
|
) { |
||||
|
let (sender, mut receiver) = mpsc::unbounded_channel(); |
||||
|
|
||||
|
let mut current_offer_id = None; |
||||
|
|
||||
|
let actor = async move { |
||||
|
while let Some(message) = receiver.recv().await { |
||||
|
match message { |
||||
|
maker_cfd_actor::Command::TakeOffer { |
||||
|
taker_id, |
||||
|
offer_id, |
||||
|
quantity, |
||||
|
} => { |
||||
|
println!( |
||||
|
"Taker {} wants to take {} of offer {}", |
||||
|
taker_id, quantity, offer_id |
||||
|
); |
||||
|
|
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
|
||||
|
// 1. Validate if offer is still valid
|
||||
|
let current_offer = match current_offer_id { |
||||
|
Some(current_offer_id) if current_offer_id == offer_id => { |
||||
|
db::load_offer_by_id(current_offer_id, &mut conn) |
||||
|
.await |
||||
|
.unwrap() |
||||
|
} |
||||
|
_ => { |
||||
|
takers |
||||
|
.send(maker_inc_connections_actor::Command::NotifyInvalidOfferId { |
||||
|
id: offer_id, |
||||
|
taker_id, |
||||
|
}) |
||||
|
.unwrap(); |
||||
|
continue; |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
// 2. Insert CFD in DB
|
||||
|
// TODO: Don't auto-accept, present to user in UI instead
|
||||
|
let cfd = Cfd::new( |
||||
|
current_offer, |
||||
|
quantity, |
||||
|
CfdState::Accepted { |
||||
|
common: CfdStateCommon { |
||||
|
transition_timestamp: SystemTime::now(), |
||||
|
}, |
||||
|
}, |
||||
|
Usd(dec!(10001)), |
||||
|
) |
||||
|
.unwrap(); |
||||
|
db::insert_cfd(cfd, &mut conn).await.unwrap(); |
||||
|
|
||||
|
takers |
||||
|
.send(maker_inc_connections_actor::Command::NotifyOfferAccepted { |
||||
|
id: offer_id, |
||||
|
taker_id, |
||||
|
}) |
||||
|
.unwrap(); |
||||
|
cfd_feed_sender |
||||
|
.send(db::load_all_cfds(&mut conn).await.unwrap()) |
||||
|
.unwrap(); |
||||
|
|
||||
|
// 3. Remove current offer
|
||||
|
current_offer_id = None; |
||||
|
takers |
||||
|
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer( |
||||
|
None, |
||||
|
)) |
||||
|
.unwrap(); |
||||
|
offer_feed_sender.send(None).unwrap(); |
||||
|
} |
||||
|
maker_cfd_actor::Command::NewOffer(offer) => { |
||||
|
// 1. Save to DB
|
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
db::insert_cfd_offer(&offer, &mut conn).await.unwrap(); |
||||
|
|
||||
|
// 2. Update actor state to current offer
|
||||
|
current_offer_id.replace(offer.id); |
||||
|
|
||||
|
// 3. Notify UI via feed
|
||||
|
offer_feed_sender.send(Some(offer.clone())).unwrap(); |
||||
|
|
||||
|
// 4. Inform connected takers
|
||||
|
takers |
||||
|
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer( |
||||
|
Some(offer), |
||||
|
)) |
||||
|
.unwrap(); |
||||
|
} |
||||
|
maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => { |
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
|
||||
|
let current_offer = match current_offer_id { |
||||
|
Some(current_offer_id) => Some( |
||||
|
db::load_offer_by_id(current_offer_id, &mut conn) |
||||
|
.await |
||||
|
.unwrap(), |
||||
|
), |
||||
|
None => None, |
||||
|
}; |
||||
|
|
||||
|
takers |
||||
|
.send(maker_inc_connections_actor::Command::SendCurrentOffer { |
||||
|
offer: current_offer, |
||||
|
taker_id, |
||||
|
}) |
||||
|
.unwrap(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
(actor, sender) |
||||
|
} |
@ -0,0 +1,107 @@ |
|||||
|
use crate::model::cfd::{CfdOffer, CfdOfferId}; |
||||
|
use crate::model::TakerId; |
||||
|
use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire}; |
||||
|
use futures::{Future, StreamExt}; |
||||
|
use std::collections::HashMap; |
||||
|
use tokio::net::tcp::OwnedReadHalf; |
||||
|
use tokio::net::TcpListener; |
||||
|
use tokio::sync::mpsc; |
||||
|
use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; |
||||
|
|
||||
|
#[derive(Debug)] |
||||
|
pub enum Command { |
||||
|
BroadcastCurrentOffer(Option<CfdOffer>), |
||||
|
SendCurrentOffer { |
||||
|
offer: Option<CfdOffer>, |
||||
|
taker_id: TakerId, |
||||
|
}, |
||||
|
NotifyInvalidOfferId { |
||||
|
id: CfdOfferId, |
||||
|
taker_id: TakerId, |
||||
|
}, |
||||
|
NotifyOfferAccepted { |
||||
|
id: CfdOfferId, |
||||
|
taker_id: TakerId, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
pub fn new( |
||||
|
listener: TcpListener, |
||||
|
cfd_maker_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>, |
||||
|
mut our_inbox: mpsc::UnboundedReceiver<maker_inc_connections_actor::Command>, |
||||
|
) -> impl Future<Output = ()> { |
||||
|
let mut write_connections = |
||||
|
HashMap::<TakerId, mpsc::UnboundedSender<wire::MakerToTaker>>::new(); |
||||
|
|
||||
|
async move { |
||||
|
loop { |
||||
|
tokio::select! { |
||||
|
Ok((socket, remote_addr)) = listener.accept() => { |
||||
|
println!("Connected to {}", remote_addr); |
||||
|
let taker_id = TakerId::default(); |
||||
|
let (read, write) = socket.into_split(); |
||||
|
|
||||
|
let in_taker_actor = in_taker_messages(read, cfd_maker_actor_inbox.clone(), taker_id); |
||||
|
let (out_message_actor, out_message_actor_inbox) = send_wire_message_actor::new(write); |
||||
|
|
||||
|
tokio::spawn(in_taker_actor); |
||||
|
tokio::spawn(out_message_actor); |
||||
|
|
||||
|
cfd_maker_actor_inbox.send(maker_cfd_actor::Command::NewTakerOnline{id : taker_id}).unwrap(); |
||||
|
|
||||
|
write_connections.insert(taker_id, out_message_actor_inbox); |
||||
|
}, |
||||
|
Some(message) = our_inbox.recv() => { |
||||
|
match message { |
||||
|
maker_inc_connections_actor::Command::NotifyInvalidOfferId { id, taker_id } => { |
||||
|
let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); |
||||
|
conn.send(wire::MakerToTaker::InvalidOfferId(id)).unwrap(); |
||||
|
}, |
||||
|
maker_inc_connections_actor::Command::BroadcastCurrentOffer(offer) => { |
||||
|
for conn in write_connections.values() { |
||||
|
conn.send(wire::MakerToTaker::CurrentOffer(offer.clone())).unwrap(); |
||||
|
} |
||||
|
}, |
||||
|
maker_inc_connections_actor::Command::SendCurrentOffer {offer, taker_id} => { |
||||
|
let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); |
||||
|
conn.send(wire::MakerToTaker::CurrentOffer(offer)).unwrap(); |
||||
|
}, |
||||
|
maker_inc_connections_actor::Command::NotifyOfferAccepted { id, taker_id } => { |
||||
|
let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); |
||||
|
conn.send(wire::MakerToTaker::ConfirmTakeOffer(id)).unwrap(); |
||||
|
}, |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
fn in_taker_messages( |
||||
|
read: OwnedReadHalf, |
||||
|
cfd_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>, |
||||
|
taker_id: TakerId, |
||||
|
) -> impl Future<Output = ()> { |
||||
|
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| { |
||||
|
let message = serde_json::from_slice::<wire::TakerToMaker>(&result?)?; |
||||
|
anyhow::Result::<_>::Ok(message) |
||||
|
}); |
||||
|
|
||||
|
async move { |
||||
|
while let Some(message) = messages.next().await { |
||||
|
match message { |
||||
|
Ok(wire::TakerToMaker::TakeOffer { offer_id, quantity }) => cfd_actor_inbox |
||||
|
.send(maker_cfd_actor::Command::TakeOffer { |
||||
|
taker_id, |
||||
|
offer_id, |
||||
|
quantity, |
||||
|
}) |
||||
|
.unwrap(), |
||||
|
Ok(wire::TakerToMaker::StartContractSetup(_offer_id)) => {} |
||||
|
Err(error) => { |
||||
|
eprintln!("Error in reading message: {}", error); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,31 @@ |
|||||
|
use futures::{Future, SinkExt}; |
||||
|
use serde::Serialize; |
||||
|
use tokio::net::tcp::OwnedWriteHalf; |
||||
|
use tokio::sync::mpsc; |
||||
|
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; |
||||
|
|
||||
|
pub fn new<T>(write: OwnedWriteHalf) -> (impl Future<Output = ()>, mpsc::UnboundedSender<T>) |
||||
|
where |
||||
|
T: Serialize, |
||||
|
{ |
||||
|
let (sender, mut receiver) = mpsc::unbounded_channel::<T>(); |
||||
|
|
||||
|
let actor = async move { |
||||
|
let mut framed_write = FramedWrite::new(write, LengthDelimitedCodec::new()); |
||||
|
|
||||
|
while let Some(message) = receiver.recv().await { |
||||
|
match framed_write |
||||
|
.send(serde_json::to_vec(&message).unwrap().into()) |
||||
|
.await |
||||
|
{ |
||||
|
Ok(_) => {} |
||||
|
Err(_) => { |
||||
|
eprintln!("TCP connection error"); |
||||
|
break; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
(actor, sender) |
||||
|
} |
@ -1,41 +0,0 @@ |
|||||
use serde::{Deserialize, Serialize}; |
|
||||
use tokio::net::tcp::OwnedWriteHalf; |
|
||||
|
|
||||
use crate::model::cfd::{CfdOffer, CfdOfferId, CfdTakeRequest}; |
|
||||
|
|
||||
use futures::SinkExt; |
|
||||
use tokio::sync::mpsc::UnboundedSender; |
|
||||
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; |
|
||||
|
|
||||
// TODO: Implement messages used for confirming CFD offer between maker and taker
|
|
||||
#[derive(Debug, Serialize, Deserialize)] |
|
||||
#[serde(tag = "type", content = "payload")] |
|
||||
pub enum Message { |
|
||||
CurrentOffer(Option<CfdOffer>), |
|
||||
TakeOffer(CfdTakeRequest), |
|
||||
// TODO: Needs RejectOffer as well
|
|
||||
ConfirmTakeOffer(CfdOfferId), |
|
||||
// TODO: Currently the taker starts, can already send some stuff for signing over in the first message.
|
|
||||
StartContractSetup(CfdOfferId), |
|
||||
} |
|
||||
|
|
||||
pub fn spawn_sender(write: OwnedWriteHalf) -> UnboundedSender<Message> { |
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Message>(); |
|
||||
tokio::spawn(async move { |
|
||||
let mut framed_write = FramedWrite::new(write, LengthDelimitedCodec::new()); |
|
||||
|
|
||||
while let Some(message) = receiver.recv().await { |
|
||||
match framed_write |
|
||||
.send(serde_json::to_vec(&message).unwrap().into()) |
|
||||
.await |
|
||||
{ |
|
||||
Ok(_) => {} |
|
||||
Err(_) => { |
|
||||
eprintln!("TCP connection error"); |
|
||||
break; |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
}); |
|
||||
sender |
|
||||
} |
|
@ -1,56 +0,0 @@ |
|||||
use crate::db::{ |
|
||||
insert_cfd, insert_cfd_offer, insert_new_cfd_state, insert_new_cfd_state_by_offer_id, |
|
||||
load_all_cfds, load_offer_by_id, |
|
||||
}; |
|
||||
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState}; |
|
||||
use tokio::sync::watch; |
|
||||
|
|
||||
pub mod maker; |
|
||||
pub mod taker; |
|
||||
|
|
||||
#[derive(Debug)] |
|
||||
pub enum Command { |
|
||||
SaveOffer(CfdOffer), |
|
||||
SaveCfd(Cfd), |
|
||||
SaveNewCfdState(Cfd), |
|
||||
SaveNewCfdStateByOfferId(CfdOfferId, CfdState), |
|
||||
RefreshCfdFeed, |
|
||||
} |
|
||||
|
|
||||
pub async fn handle_command( |
|
||||
db: &sqlx::SqlitePool, |
|
||||
command: Command, |
|
||||
cfd_feed_sender: &watch::Sender<Vec<Cfd>>, |
|
||||
) -> anyhow::Result<()> { |
|
||||
println!("Handle command: {:?}", command); |
|
||||
|
|
||||
match command { |
|
||||
Command::SaveOffer(cfd_offer) => { |
|
||||
// Only save offer when it wasn't already saved (e.g. taker
|
|
||||
// can see the same "latest" offer when it comes back online)
|
|
||||
if let Ok(offer) = load_offer_by_id(cfd_offer.id, db).await { |
|
||||
println!("Offer with id {:?} already stored in the db.", offer.id); |
|
||||
} else { |
|
||||
insert_cfd_offer(cfd_offer, db).await? |
|
||||
} |
|
||||
} |
|
||||
Command::SaveCfd(cfd) => { |
|
||||
insert_cfd(cfd, db).await?; |
|
||||
cfd_feed_sender.send(load_all_cfds(db).await?)?; |
|
||||
} |
|
||||
Command::SaveNewCfdState(cfd) => { |
|
||||
insert_new_cfd_state(cfd, db).await?; |
|
||||
cfd_feed_sender.send(load_all_cfds(db).await?)?; |
|
||||
} |
|
||||
|
|
||||
Command::SaveNewCfdStateByOfferId(offer_id, state) => { |
|
||||
insert_new_cfd_state_by_offer_id(offer_id, state, db).await?; |
|
||||
cfd_feed_sender.send(load_all_cfds(db).await?)?; |
|
||||
} |
|
||||
Command::RefreshCfdFeed => { |
|
||||
cfd_feed_sender.send(load_all_cfds(db).await?)?; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
Ok(()) |
|
||||
} |
|
@ -1,65 +0,0 @@ |
|||||
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState}; |
|
||||
use crate::state; |
|
||||
use crate::state::handle_command; |
|
||||
use rocket::{fairing, Build, Rocket}; |
|
||||
use rocket_db_pools::Database; |
|
||||
use std::convert::{TryFrom, TryInto}; |
|
||||
use tokio::sync::{mpsc, watch}; |
|
||||
|
|
||||
#[derive(Debug, Clone)] |
|
||||
pub enum Command { |
|
||||
SaveOffer(CfdOffer), |
|
||||
SaveCfd(Cfd), |
|
||||
SaveNewCfdState(Cfd), |
|
||||
SaveNewCfdStateByOfferId(CfdOfferId, CfdState), |
|
||||
/// All other commands that are interacting with Cfds perform a refresh
|
|
||||
/// automatically - as such, RefreshCfdFeed should be used only on init
|
|
||||
RefreshCfdFeed, |
|
||||
} |
|
||||
|
|
||||
pub struct NotSharedCommand; |
|
||||
|
|
||||
impl TryFrom<Command> for state::Command { |
|
||||
type Error = NotSharedCommand; |
|
||||
|
|
||||
fn try_from(value: Command) -> Result<Self, Self::Error> { |
|
||||
let command = match value { |
|
||||
Command::SaveOffer(offer) => state::Command::SaveOffer(offer), |
|
||||
Command::SaveCfd(cfd) => state::Command::SaveCfd(cfd), |
|
||||
Command::SaveNewCfdState(cfd) => state::Command::SaveNewCfdState(cfd), |
|
||||
Command::SaveNewCfdStateByOfferId(uuid, cfd_state) => { |
|
||||
state::Command::SaveNewCfdStateByOfferId(uuid, cfd_state) |
|
||||
} |
|
||||
Command::RefreshCfdFeed => state::Command::RefreshCfdFeed, |
|
||||
}; |
|
||||
|
|
||||
Ok(command) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// TODO write more rocket wrapper functions to insert into db and create long-running tasks + channels handling the insert
|
|
||||
pub async fn maker_do_something( |
|
||||
rocket: Rocket<Build>, |
|
||||
mut db_command_receiver: mpsc::Receiver<Command>, |
|
||||
cfd_feed_sender: watch::Sender<Vec<Cfd>>, |
|
||||
) -> fairing::Result { |
|
||||
let db = match crate::db::maker::Maker::fetch(&rocket) { |
|
||||
Some(db) => (**db).clone(), |
|
||||
None => return Err(rocket), |
|
||||
}; |
|
||||
|
|
||||
tokio::spawn(async move { |
|
||||
while let Some(command) = db_command_receiver.recv().await { |
|
||||
match command.clone().try_into() { |
|
||||
Ok(shared_command) => { |
|
||||
handle_command(&db, shared_command, &cfd_feed_sender) |
|
||||
.await |
|
||||
.unwrap(); |
|
||||
} |
|
||||
Err(_) => unreachable!("currently there are only shared commands"), |
|
||||
} |
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
Ok(rocket) |
|
||||
} |
|
@ -1,64 +0,0 @@ |
|||||
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState}; |
|
||||
use crate::state; |
|
||||
use crate::state::handle_command; |
|
||||
use rocket::{fairing, Build, Rocket}; |
|
||||
use rocket_db_pools::Database; |
|
||||
use std::convert::{TryFrom, TryInto}; |
|
||||
use tokio::sync::{mpsc, watch}; |
|
||||
|
|
||||
#[derive(Debug, Clone)] |
|
||||
pub enum Command { |
|
||||
SaveOffer(CfdOffer), |
|
||||
SaveCfd(Cfd), |
|
||||
SaveNewCfdState(Cfd), |
|
||||
SaveNewCfdStateByOfferId(CfdOfferId, CfdState), |
|
||||
/// All other commands that are interacting with Cfds perform a refresh
|
|
||||
/// automatically - as such, RefreshCfdFeed should be used only on init
|
|
||||
RefreshCfdFeed, |
|
||||
} |
|
||||
|
|
||||
pub struct CommandError; |
|
||||
|
|
||||
impl TryFrom<Command> for state::Command { |
|
||||
type Error = CommandError; |
|
||||
|
|
||||
fn try_from(value: Command) -> Result<Self, Self::Error> { |
|
||||
let command = match value { |
|
||||
Command::SaveOffer(offer) => state::Command::SaveOffer(offer), |
|
||||
Command::SaveCfd(cfd) => state::Command::SaveCfd(cfd), |
|
||||
Command::SaveNewCfdState(cfd) => state::Command::SaveNewCfdState(cfd), |
|
||||
Command::SaveNewCfdStateByOfferId(uuid, cfd_state) => { |
|
||||
state::Command::SaveNewCfdStateByOfferId(uuid, cfd_state) |
|
||||
} |
|
||||
Command::RefreshCfdFeed => state::Command::RefreshCfdFeed, |
|
||||
}; |
|
||||
|
|
||||
Ok(command) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
pub async fn hey_db_do_something( |
|
||||
rocket: Rocket<Build>, |
|
||||
mut db_command_receiver: mpsc::Receiver<Command>, |
|
||||
cfd_feed_sender: watch::Sender<Vec<Cfd>>, |
|
||||
) -> fairing::Result { |
|
||||
let db = match crate::db::taker::Taker::fetch(&rocket) { |
|
||||
Some(db) => (**db).clone(), |
|
||||
None => return Err(rocket), |
|
||||
}; |
|
||||
|
|
||||
tokio::spawn(async move { |
|
||||
while let Some(command) = db_command_receiver.recv().await { |
|
||||
match command.clone().try_into() { |
|
||||
Ok(shared_command) => { |
|
||||
handle_command(&db, shared_command, &cfd_feed_sender) |
|
||||
.await |
|
||||
.unwrap(); |
|
||||
} |
|
||||
Err(_) => unreachable!("currently there are only shared commands"), |
|
||||
} |
|
||||
} |
|
||||
}); |
|
||||
|
|
||||
Ok(rocket) |
|
||||
} |
|
@ -0,0 +1,95 @@ |
|||||
|
use anyhow::Result; |
||||
|
use bdk::bitcoin::Amount; |
||||
|
use model::cfd::{Cfd, CfdOffer}; |
||||
|
use rocket::fairing::AdHoc; |
||||
|
use rocket::figment::util::map; |
||||
|
use rocket::figment::value::{Map, Value}; |
||||
|
use rocket_db_pools::Database; |
||||
|
use tokio::sync::watch; |
||||
|
|
||||
|
mod db; |
||||
|
mod model; |
||||
|
mod routes_taker; |
||||
|
mod send_wire_message_actor; |
||||
|
mod taker_cfd_actor; |
||||
|
mod taker_inc_message_actor; |
||||
|
mod to_sse_event; |
||||
|
mod wire; |
||||
|
|
||||
|
#[derive(Database)] |
||||
|
#[database("taker")] |
||||
|
pub struct Db(sqlx::SqlitePool); |
||||
|
|
||||
|
#[rocket::main] |
||||
|
async fn main() -> Result<()> { |
||||
|
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); |
||||
|
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None); |
||||
|
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC); |
||||
|
|
||||
|
let socket = tokio::net::TcpSocket::new_v4().unwrap(); |
||||
|
let connection = socket |
||||
|
.connect("0.0.0.0:9999".parse().unwrap()) |
||||
|
.await |
||||
|
.expect("Maker should be online first"); |
||||
|
|
||||
|
let (read, write) = connection.into_split(); |
||||
|
|
||||
|
let db: Map<_, Value> = map! { |
||||
|
"url" => "./taker.sqlite".into(), |
||||
|
}; |
||||
|
|
||||
|
let figment = rocket::Config::figment().merge(("databases", map!["taker" => db])); |
||||
|
|
||||
|
rocket::custom(figment) |
||||
|
.manage(cfd_feed_receiver) |
||||
|
.manage(offer_feed_receiver) |
||||
|
.manage(balance_feed_receiver) |
||||
|
.attach(Db::init()) |
||||
|
.attach(AdHoc::try_on_ignite( |
||||
|
"SQL migrations", |
||||
|
|rocket| async move { |
||||
|
match Db::fetch(&rocket) { |
||||
|
Some(db) => match db::run_migrations(&**db).await { |
||||
|
Ok(_) => Ok(rocket), |
||||
|
Err(_) => Err(rocket), |
||||
|
}, |
||||
|
None => Err(rocket), |
||||
|
} |
||||
|
}, |
||||
|
)) |
||||
|
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move { |
||||
|
let db = match Db::fetch(&rocket) { |
||||
|
Some(db) => (**db).clone(), |
||||
|
None => return Err(rocket), |
||||
|
}; |
||||
|
|
||||
|
let (out_maker_messages_actor, out_maker_actor_inbox) = |
||||
|
send_wire_message_actor::new(write); |
||||
|
let (cfd_actor, cfd_actor_inbox) = taker_cfd_actor::new( |
||||
|
db, |
||||
|
cfd_feed_sender, |
||||
|
offer_feed_sender, |
||||
|
out_maker_actor_inbox, |
||||
|
); |
||||
|
let inc_maker_messages_actor = |
||||
|
taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); |
||||
|
|
||||
|
tokio::spawn(cfd_actor); |
||||
|
tokio::spawn(inc_maker_messages_actor); |
||||
|
tokio::spawn(out_maker_messages_actor); |
||||
|
|
||||
|
Ok(rocket.manage(cfd_actor_inbox)) |
||||
|
})) |
||||
|
.mount( |
||||
|
"/", |
||||
|
rocket::routes![ |
||||
|
routes_taker::feed, |
||||
|
routes_taker::post_cfd, |
||||
|
routes_taker::get_health_check |
||||
|
], |
||||
|
) |
||||
|
.launch() |
||||
|
.await?; |
||||
|
|
||||
|
Ok(()) |
||||
|
} |
@ -0,0 +1,89 @@ |
|||||
|
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon}; |
||||
|
use crate::model::Usd; |
||||
|
use crate::{db, wire}; |
||||
|
use futures::Future; |
||||
|
use std::time::SystemTime; |
||||
|
use tokio::sync::{mpsc, watch}; |
||||
|
|
||||
|
#[derive(Debug)] |
||||
|
pub enum Command { |
||||
|
TakeOffer { offer_id: CfdOfferId, quantity: Usd }, |
||||
|
NewOffer(Option<CfdOffer>), |
||||
|
OfferAccepted(CfdOfferId), |
||||
|
} |
||||
|
|
||||
|
pub fn new( |
||||
|
db: sqlx::SqlitePool, |
||||
|
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, |
||||
|
offer_feed_actor_inbox: watch::Sender<Option<CfdOffer>>, |
||||
|
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>, |
||||
|
) -> (impl Future<Output = ()>, mpsc::UnboundedSender<Command>) { |
||||
|
let (sender, mut receiver) = mpsc::unbounded_channel(); |
||||
|
|
||||
|
let actor = async move { |
||||
|
while let Some(message) = receiver.recv().await { |
||||
|
match message { |
||||
|
Command::TakeOffer { offer_id, quantity } => { |
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
|
||||
|
let current_offer = db::load_offer_by_id(offer_id, &mut conn).await.unwrap(); |
||||
|
|
||||
|
println!("Accepting current offer: {:?}", ¤t_offer); |
||||
|
|
||||
|
let cfd = Cfd::new( |
||||
|
current_offer, |
||||
|
quantity, |
||||
|
CfdState::PendingTakeRequest { |
||||
|
common: CfdStateCommon { |
||||
|
transition_timestamp: SystemTime::now(), |
||||
|
}, |
||||
|
}, |
||||
|
Usd::ZERO, |
||||
|
) |
||||
|
.unwrap(); |
||||
|
|
||||
|
db::insert_cfd(cfd, &mut conn).await.unwrap(); |
||||
|
|
||||
|
cfd_feed_actor_inbox |
||||
|
.send(db::load_all_cfds(&mut conn).await.unwrap()) |
||||
|
.unwrap(); |
||||
|
out_msg_maker_inbox |
||||
|
.send(wire::TakerToMaker::TakeOffer { offer_id, quantity }) |
||||
|
.unwrap(); |
||||
|
} |
||||
|
Command::NewOffer(Some(offer)) => { |
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
db::insert_cfd_offer(&offer, &mut conn).await.unwrap(); |
||||
|
offer_feed_actor_inbox.send(Some(offer)).unwrap(); |
||||
|
} |
||||
|
|
||||
|
Command::NewOffer(None) => { |
||||
|
offer_feed_actor_inbox.send(None).unwrap(); |
||||
|
} |
||||
|
|
||||
|
Command::OfferAccepted(offer_id) => { |
||||
|
let mut conn = db.acquire().await.unwrap(); |
||||
|
db::insert_new_cfd_state_by_offer_id( |
||||
|
offer_id, |
||||
|
CfdState::ContractSetup { |
||||
|
common: CfdStateCommon { |
||||
|
transition_timestamp: SystemTime::now(), |
||||
|
}, |
||||
|
}, |
||||
|
&mut conn, |
||||
|
) |
||||
|
.await |
||||
|
.unwrap(); |
||||
|
|
||||
|
cfd_feed_actor_inbox |
||||
|
.send(db::load_all_cfds(&mut conn).await.unwrap()) |
||||
|
.unwrap(); |
||||
|
|
||||
|
// TODO: Contract signing/setup
|
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
(actor, sender) |
||||
|
} |
@ -0,0 +1,41 @@ |
|||||
|
use crate::{taker_cfd_actor, wire}; |
||||
|
use futures::{Future, StreamExt}; |
||||
|
use tokio::net::tcp::OwnedReadHalf; |
||||
|
use tokio::sync::mpsc; |
||||
|
use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; |
||||
|
|
||||
|
pub fn new( |
||||
|
read: OwnedReadHalf, |
||||
|
cfd_actor: mpsc::UnboundedSender<taker_cfd_actor::Command>, |
||||
|
) -> impl Future<Output = ()> { |
||||
|
let frame_read = FramedRead::new(read, LengthDelimitedCodec::new()); |
||||
|
|
||||
|
let mut messages = frame_read.map(|result| { |
||||
|
let message = serde_json::from_slice::<wire::MakerToTaker>(&result?)?; |
||||
|
anyhow::Result::<_>::Ok(message) |
||||
|
}); |
||||
|
|
||||
|
async move { |
||||
|
while let Some(message) = messages.next().await { |
||||
|
match message { |
||||
|
Ok(wire::MakerToTaker::CurrentOffer(offer)) => { |
||||
|
cfd_actor |
||||
|
.send(taker_cfd_actor::Command::NewOffer(offer)) |
||||
|
.unwrap(); |
||||
|
} |
||||
|
Ok(wire::MakerToTaker::ConfirmTakeOffer(offer_id)) => { |
||||
|
// TODO: This naming is not well aligned.
|
||||
|
cfd_actor |
||||
|
.send(taker_cfd_actor::Command::OfferAccepted(offer_id)) |
||||
|
.unwrap(); |
||||
|
} |
||||
|
Ok(wire::MakerToTaker::InvalidOfferId(_)) => { |
||||
|
todo!() |
||||
|
} |
||||
|
Err(error) => { |
||||
|
eprintln!("Error in reading message: {}", error); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
use crate::model::cfd::{Cfd, CfdOffer}; |
||||
|
use bdk::bitcoin::Amount; |
||||
|
use rocket::response::stream::Event; |
||||
|
|
||||
|
pub trait ToSseEvent { |
||||
|
fn to_sse_event(&self) -> Event; |
||||
|
} |
||||
|
|
||||
|
impl ToSseEvent for Vec<Cfd> { |
||||
|
fn to_sse_event(&self) -> Event { |
||||
|
Event::json(self).event("cfds") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
impl ToSseEvent for Option<CfdOffer> { |
||||
|
fn to_sse_event(&self) -> Event { |
||||
|
Event::json(self).event("offer") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
impl ToSseEvent for Amount { |
||||
|
fn to_sse_event(&self) -> Event { |
||||
|
Event::json(&self.as_btc()).event("balance") |
||||
|
} |
||||
|
} |
@ -0,0 +1,21 @@ |
|||||
|
use crate::model::cfd::CfdOfferId; |
||||
|
use crate::model::Usd; |
||||
|
use crate::CfdOffer; |
||||
|
use serde::{Deserialize, Serialize}; |
||||
|
|
||||
|
#[derive(Debug, Serialize, Deserialize)] |
||||
|
#[serde(tag = "type", content = "payload")] |
||||
|
pub enum TakerToMaker { |
||||
|
TakeOffer { offer_id: CfdOfferId, quantity: Usd }, |
||||
|
// TODO: Currently the taker starts, can already send some stuff for signing over in the first message.
|
||||
|
StartContractSetup(CfdOfferId), |
||||
|
} |
||||
|
|
||||
|
#[derive(Debug, Serialize, Deserialize)] |
||||
|
#[serde(tag = "type", content = "payload")] |
||||
|
pub enum MakerToTaker { |
||||
|
CurrentOffer(Option<CfdOffer>), |
||||
|
// TODO: Needs RejectOffer as well
|
||||
|
ConfirmTakeOffer(CfdOfferId), |
||||
|
InvalidOfferId(CfdOfferId), |
||||
|
} |
Loading…
Reference in new issue