Browse Source

Remodel daemons to follow actor model more closely

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
integrate-protocol-maker-side
Thomas Eizinger 3 years ago
parent
commit
3112f1f7be
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 3
      .github/workflows/ci.yml
  2. 8
      daemon/Cargo.toml
  3. 9
      daemon/src/bin/maker.rs
  4. 9
      daemon/src/bin/taker.rs
  5. 93
      daemon/src/db.rs
  6. 17
      daemon/src/db/maker.rs
  7. 17
      daemon/src/db/taker.rs
  8. 9
      daemon/src/lib.rs
  9. 98
      daemon/src/maker.rs
  10. 145
      daemon/src/maker_cfd_actor.rs
  11. 107
      daemon/src/maker_inc_connections_actor.rs
  12. 25
      daemon/src/model.rs
  13. 62
      daemon/src/model/cfd.rs
  14. 264
      daemon/src/routes_maker.rs
  15. 234
      daemon/src/routes_taker.rs
  16. 31
      daemon/src/send_wire_message_actor.rs
  17. 41
      daemon/src/socket.rs
  18. 56
      daemon/src/state.rs
  19. 65
      daemon/src/state/maker.rs
  20. 64
      daemon/src/state/taker.rs
  21. 95
      daemon/src/taker.rs
  22. 89
      daemon/src/taker_cfd_actor.rs
  23. 41
      daemon/src/taker_inc_message_actor.rs
  24. 25
      daemon/src/to_sse_event.rs
  25. 21
      daemon/src/wire.rs

3
.github/workflows/ci.yml

@ -67,6 +67,7 @@ jobs:
- name: Smoke test ${{ matrix.target }} release binary
run: |
target/${{ matrix.target }}/release/maker &
sleep 5s # Wait for maker to start
target/${{ matrix.target }}/release/taker &
sleep 5s
sleep 5s # Wait for taker to start
curl --fail http://localhost:8000/alive

8
daemon/Cargo.toml

@ -20,5 +20,13 @@ tokio = { version = "1", features = ["rt-multi-thread", "time", "macros", "sync"
tokio-util = { version = "0.6", features = ["codec"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
[[bin]]
name = "taker"
path = "src/taker.rs"
[[bin]]
name = "maker"
path = "src/maker.rs"
[dev-dependencies]
tempfile = "3"

9
daemon/src/bin/maker.rs

@ -1,9 +0,0 @@
use anyhow::Result;
use daemon::routes_maker;
#[rocket::main]
async fn main() -> Result<()> {
routes_maker::start_http().await?;
Ok(())
}

9
daemon/src/bin/taker.rs

@ -1,9 +0,0 @@
use anyhow::Result;
use daemon::routes_taker;
#[rocket::main]
async fn main() -> Result<()> {
routes_taker::start_http().await?;
Ok(())
}

93
daemon/src/db.rs

@ -1,24 +1,22 @@
use std::convert::TryInto;
use std::mem;
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState};
use crate::model::{Leverage, Usd};
use anyhow::Context;
use bdk::bitcoin::Amount;
use rocket_db_pools::sqlx;
use sqlx::pool::PoolConnection;
use sqlx::{Sqlite, SqlitePool};
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState};
use crate::model::{Leverage, Usd};
pub mod maker;
pub mod taker;
use sqlx::{Acquire, Sqlite, SqlitePool};
use std::convert::TryInto;
use std::mem;
pub async fn do_run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
sqlx::migrate!("./migrations").run(pool).await?;
Ok(())
}
pub async fn insert_cfd_offer(cfd_offer: CfdOffer, pool: &SqlitePool) -> anyhow::Result<()> {
pub async fn insert_cfd_offer(
cfd_offer: &CfdOffer,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<()> {
let uuid = serde_json::to_string(&cfd_offer.id).unwrap();
let trading_pair = serde_json::to_string(&cfd_offer.trading_pair).unwrap();
let position = serde_json::to_string(&cfd_offer.position).unwrap();
@ -56,15 +54,13 @@ pub async fn insert_cfd_offer(cfd_offer: CfdOffer, pool: &SqlitePool) -> anyhow:
creation_timestamp,
term
)
.execute(pool)
.execute(conn)
.await?;
Ok(())
}
// TODO: Consider refactor the API to consistently present PoolConnections
pub async fn load_offer_by_id_from_conn(
pub async fn load_offer_by_id(
id: CfdOfferId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<CfdOffer> {
@ -104,12 +100,9 @@ pub async fn load_offer_by_id_from_conn(
})
}
pub async fn load_offer_by_id(id: CfdOfferId, pool: &SqlitePool) -> anyhow::Result<CfdOffer> {
let mut connection = pool.acquire().await?;
load_offer_by_id_from_conn(id, &mut connection).await
}
pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let mut tx = conn.begin().await?;
pub async fn insert_cfd(cfd: Cfd, pool: &SqlitePool) -> anyhow::Result<()> {
let offer_uuid = serde_json::to_string(&cfd.offer_id)?;
let offer_row = sqlx::query!(
r#"
@ -117,7 +110,7 @@ pub async fn insert_cfd(cfd: Cfd, pool: &SqlitePool) -> anyhow::Result<()> {
"#,
offer_uuid
)
.fetch_one(pool)
.fetch_one(&mut tx)
.await?;
let offer_id = offer_row.id;
@ -126,7 +119,6 @@ pub async fn insert_cfd(cfd: Cfd, pool: &SqlitePool) -> anyhow::Result<()> {
let cfd_state = serde_json::to_string(&cfd.state)?;
// save cfd + state in a transaction to make sure the state is only inserted if the cfd was inserted
let mut tx = pool.begin().await?;
let cfd_id = sqlx::query!(
r#"
@ -162,13 +154,14 @@ pub async fn insert_cfd(cfd: Cfd, pool: &SqlitePool) -> anyhow::Result<()> {
Ok(())
}
#[allow(dead_code)] // This is only used by one binary.
pub async fn insert_new_cfd_state_by_offer_id(
offer_id: CfdOfferId,
new_state: CfdState,
pool: &SqlitePool,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<()> {
let cfd_id = load_cfd_id_by_offer_uuid(offer_id, pool).await?;
let latest_cfd_state_in_db = load_latest_cfd_state(cfd_id, pool)
let cfd_id = load_cfd_id_by_offer_uuid(offer_id, conn).await?;
let latest_cfd_state_in_db = load_latest_cfd_state(cfd_id, conn)
.await
.context("loading latest state failed")?;
@ -189,21 +182,15 @@ pub async fn insert_new_cfd_state_by_offer_id(
cfd_id,
cfd_state,
)
.execute(pool)
.execute(conn)
.await?;
Ok(())
}
pub async fn insert_new_cfd_state(cfd: Cfd, pool: &SqlitePool) -> anyhow::Result<()> {
insert_new_cfd_state_by_offer_id(cfd.offer_id, cfd.state, pool).await?;
Ok(())
}
async fn load_cfd_id_by_offer_uuid(
offer_uuid: CfdOfferId,
pool: &SqlitePool,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<i64> {
let offer_uuid = serde_json::to_string(&offer_uuid)?;
@ -216,7 +203,7 @@ async fn load_cfd_id_by_offer_uuid(
"#,
offer_uuid
)
.fetch_one(pool)
.fetch_one(conn)
.await?;
let cfd_id = cfd_id.id.context("No cfd found")?;
@ -224,7 +211,10 @@ async fn load_cfd_id_by_offer_uuid(
Ok(cfd_id)
}
async fn load_latest_cfd_state(cfd_id: i64, pool: &SqlitePool) -> anyhow::Result<CfdState> {
async fn load_latest_cfd_state(
cfd_id: i64,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<CfdState> {
let latest_cfd_state = sqlx::query!(
r#"
select
@ -236,7 +226,7 @@ async fn load_latest_cfd_state(cfd_id: i64, pool: &SqlitePool) -> anyhow::Result
"#,
cfd_id
)
.fetch_one(pool)
.fetch_one(conn)
.await?;
let latest_cfd_state_in_db: CfdState =
@ -246,7 +236,7 @@ async fn load_latest_cfd_state(cfd_id: i64, pool: &SqlitePool) -> anyhow::Result
}
/// Loads all CFDs with the latest state as the CFD state
pub async fn load_all_cfds(pool: &SqlitePool) -> anyhow::Result<Vec<Cfd>> {
pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> {
// TODO: Could be optimized with something like but not sure it's worth the complexity:
let rows = sqlx::query!(
@ -274,7 +264,7 @@ pub async fn load_all_cfds(pool: &SqlitePool) -> anyhow::Result<Vec<Cfd>> {
)
"#
)
.fetch_all(pool)
.fetch_all(conn)
.await?;
// TODO: We might want to separate the database model from the http model and properly map between them
@ -327,11 +317,12 @@ mod tests {
#[tokio::test]
async fn test_insert_and_load_offer() {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let cfd_offer = CfdOffer::from_default_with_price(Usd(dec!(10000))).unwrap();
insert_cfd_offer(cfd_offer.clone(), &pool).await.unwrap();
insert_cfd_offer(&cfd_offer, &mut conn).await.unwrap();
let cfd_offer_loaded = load_offer_by_id(cfd_offer.id, &pool).await.unwrap();
let cfd_offer_loaded = load_offer_by_id(cfd_offer.id, &mut conn).await.unwrap();
assert_eq!(cfd_offer, cfd_offer_loaded);
}
@ -339,6 +330,7 @@ mod tests {
#[tokio::test]
async fn test_insert_and_load_cfd() {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let cfd_offer = CfdOffer::from_default_with_price(Usd(dec!(10000))).unwrap();
let cfd = Cfd::new(
@ -354,10 +346,10 @@ mod tests {
.unwrap();
// the order ahs to exist in the db in order to be able to insert the cfd
insert_cfd_offer(cfd_offer, &pool).await.unwrap();
insert_cfd(cfd.clone(), &pool).await.unwrap();
insert_cfd_offer(&cfd_offer, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
let cfds_from_db = load_all_cfds(&pool).await.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
let cfd_from_db = cfds_from_db.first().unwrap().clone();
assert_eq!(cfd, cfd_from_db)
}
@ -365,6 +357,7 @@ mod tests {
#[tokio::test]
async fn test_insert_new_cfd_state() {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let cfd_offer = CfdOffer::from_default_with_price(Usd(dec!(10000))).unwrap();
let mut cfd = Cfd::new(
@ -380,17 +373,19 @@ mod tests {
.unwrap();
// the order ahs to exist in the db in order to be able to insert the cfd
insert_cfd_offer(cfd_offer, &pool).await.unwrap();
insert_cfd(cfd.clone(), &pool).await.unwrap();
insert_cfd_offer(&cfd_offer, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
cfd.state = CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
};
insert_new_cfd_state(cfd.clone(), &pool).await.unwrap();
insert_new_cfd_state_by_offer_id(cfd.offer_id, cfd.state, &mut conn)
.await
.unwrap();
let cfds_from_db = load_all_cfds(&pool).await.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
let cfd_from_db = cfds_from_db.first().unwrap().clone();
assert_eq!(cfd, cfd_from_db)
}
@ -407,7 +402,7 @@ mod tests {
.await
.unwrap();
do_run_migrations(&pool).await.unwrap();
run_migrations(&pool).await.unwrap();
pool
}

17
daemon/src/db/maker.rs

@ -1,17 +0,0 @@
use crate::db::do_run_migrations;
use rocket::{fairing, Build, Rocket};
use rocket_db_pools::{sqlx, Database};
#[derive(Database)]
#[database("maker")]
pub struct Maker(sqlx::SqlitePool);
pub async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
match Maker::fetch(&rocket) {
Some(db) => match do_run_migrations(&**db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
},
None => Err(rocket),
}
}

17
daemon/src/db/taker.rs

@ -1,17 +0,0 @@
use crate::db::do_run_migrations;
use rocket::{fairing, Build, Rocket};
use rocket_db_pools::{sqlx, Database};
#[derive(Database)]
#[database("taker")]
pub struct Taker(sqlx::SqlitePool);
pub async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
match Taker::fetch(&rocket) {
Some(db) => match do_run_migrations(&**db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
},
None => Err(rocket),
}
}

9
daemon/src/lib.rs

@ -1,9 +0,0 @@
#[macro_use]
extern crate rocket;
pub mod db;
pub mod model;
pub mod routes_maker;
pub mod routes_taker;
pub mod socket;
pub mod state;

98
daemon/src/maker.rs

@ -0,0 +1,98 @@
use anyhow::Result;
use bdk::bitcoin::Amount;
use model::cfd::{Cfd, CfdOffer};
use rocket::fairing::AdHoc;
use rocket::figment::util::map;
use rocket::figment::value::{Map, Value};
use rocket_db_pools::Database;
use tokio::sync::{mpsc, watch};
mod db;
mod maker_cfd_actor;
mod maker_inc_connections_actor;
mod model;
mod routes_maker;
mod send_wire_message_actor;
mod to_sse_event;
mod wire;
#[derive(Database)]
#[database("maker")]
pub struct Db(sqlx::SqlitePool);
#[rocket::main]
async fn main() -> Result<()> {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC);
let db: Map<_, Value> = map! {
"url" => "./maker.sqlite".into(),
};
let figment = rocket::Config::figment()
.merge(("databases", map!["maker" => db]))
.merge(("port", 8001));
let listener = tokio::net::TcpListener::bind("0.0.0.0:9999").await?;
let local_addr = listener.local_addr().unwrap();
println!("Listening on {}", local_addr);
rocket::custom(figment)
.manage(cfd_feed_receiver)
.manage(offer_feed_receiver)
.manage(balance_feed_receiver)
.attach(Db::init())
.attach(AdHoc::try_on_ignite(
"SQL migrations",
|rocket| async move {
match Db::fetch(&rocket) {
Some(db) => match db::run_migrations(&**db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
},
None => Err(rocket),
}
},
))
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move {
let db = match Db::fetch(&rocket) {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
let (connections_actor_inbox_sender, connections_actor_inbox_recv) =
mpsc::unbounded_channel();
let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new(
db,
connections_actor_inbox_sender,
cfd_feed_sender,
offer_feed_sender,
);
let connections_actor = maker_inc_connections_actor::new(
listener,
cfd_maker_actor_inbox.clone(),
connections_actor_inbox_recv,
);
tokio::spawn(cfd_maker_actor);
tokio::spawn(connections_actor);
Ok(rocket.manage(cfd_maker_actor_inbox))
}))
.mount(
"/",
rocket::routes![
routes_maker::maker_feed,
routes_maker::post_sell_offer,
// routes_maker::post_confirm_offer,
routes_maker::get_health_check
],
)
.launch()
.await?;
Ok(())
}

145
daemon/src/maker_cfd_actor.rs

@ -0,0 +1,145 @@
use std::time::SystemTime;
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon};
use crate::model::{TakerId, Usd};
use crate::{db, maker_cfd_actor, maker_inc_connections_actor};
use futures::Future;
use rust_decimal_macros::dec;
use tokio::sync::{mpsc, watch};
#[derive(Debug)]
pub enum Command {
TakeOffer {
taker_id: TakerId,
offer_id: CfdOfferId,
quantity: Usd,
},
NewOffer(CfdOffer),
NewTakerOnline {
id: TakerId,
},
}
pub fn new(
db: sqlx::SqlitePool,
takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
offer_feed_sender: watch::Sender<Option<CfdOffer>>,
) -> (
impl Future<Output = ()>,
mpsc::UnboundedSender<maker_cfd_actor::Command>,
) {
let (sender, mut receiver) = mpsc::unbounded_channel();
let mut current_offer_id = None;
let actor = async move {
while let Some(message) = receiver.recv().await {
match message {
maker_cfd_actor::Command::TakeOffer {
taker_id,
offer_id,
quantity,
} => {
println!(
"Taker {} wants to take {} of offer {}",
taker_id, quantity, offer_id
);
let mut conn = db.acquire().await.unwrap();
// 1. Validate if offer is still valid
let current_offer = match current_offer_id {
Some(current_offer_id) if current_offer_id == offer_id => {
db::load_offer_by_id(current_offer_id, &mut conn)
.await
.unwrap()
}
_ => {
takers
.send(maker_inc_connections_actor::Command::NotifyInvalidOfferId {
id: offer_id,
taker_id,
})
.unwrap();
continue;
}
};
// 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_offer,
quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
Usd(dec!(10001)),
)
.unwrap();
db::insert_cfd(cfd, &mut conn).await.unwrap();
takers
.send(maker_inc_connections_actor::Command::NotifyOfferAccepted {
id: offer_id,
taker_id,
})
.unwrap();
cfd_feed_sender
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
// 3. Remove current offer
current_offer_id = None;
takers
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer(
None,
))
.unwrap();
offer_feed_sender.send(None).unwrap();
}
maker_cfd_actor::Command::NewOffer(offer) => {
// 1. Save to DB
let mut conn = db.acquire().await.unwrap();
db::insert_cfd_offer(&offer, &mut conn).await.unwrap();
// 2. Update actor state to current offer
current_offer_id.replace(offer.id);
// 3. Notify UI via feed
offer_feed_sender.send(Some(offer.clone())).unwrap();
// 4. Inform connected takers
takers
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer(
Some(offer),
))
.unwrap();
}
maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => {
let mut conn = db.acquire().await.unwrap();
let current_offer = match current_offer_id {
Some(current_offer_id) => Some(
db::load_offer_by_id(current_offer_id, &mut conn)
.await
.unwrap(),
),
None => None,
};
takers
.send(maker_inc_connections_actor::Command::SendCurrentOffer {
offer: current_offer,
taker_id,
})
.unwrap();
}
}
}
};
(actor, sender)
}

107
daemon/src/maker_inc_connections_actor.rs

@ -0,0 +1,107 @@
use crate::model::cfd::{CfdOffer, CfdOfferId};
use crate::model::TakerId;
use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire};
use futures::{Future, StreamExt};
use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
#[derive(Debug)]
pub enum Command {
BroadcastCurrentOffer(Option<CfdOffer>),
SendCurrentOffer {
offer: Option<CfdOffer>,
taker_id: TakerId,
},
NotifyInvalidOfferId {
id: CfdOfferId,
taker_id: TakerId,
},
NotifyOfferAccepted {
id: CfdOfferId,
taker_id: TakerId,
},
}
pub fn new(
listener: TcpListener,
cfd_maker_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>,
mut our_inbox: mpsc::UnboundedReceiver<maker_inc_connections_actor::Command>,
) -> impl Future<Output = ()> {
let mut write_connections =
HashMap::<TakerId, mpsc::UnboundedSender<wire::MakerToTaker>>::new();
async move {
loop {
tokio::select! {
Ok((socket, remote_addr)) = listener.accept() => {
println!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages(read, cfd_maker_actor_inbox.clone(), taker_id);
let (out_message_actor, out_message_actor_inbox) = send_wire_message_actor::new(write);
tokio::spawn(in_taker_actor);
tokio::spawn(out_message_actor);
cfd_maker_actor_inbox.send(maker_cfd_actor::Command::NewTakerOnline{id : taker_id}).unwrap();
write_connections.insert(taker_id, out_message_actor_inbox);
},
Some(message) = our_inbox.recv() => {
match message {
maker_inc_connections_actor::Command::NotifyInvalidOfferId { id, taker_id } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::InvalidOfferId(id)).unwrap();
},
maker_inc_connections_actor::Command::BroadcastCurrentOffer(offer) => {
for conn in write_connections.values() {
conn.send(wire::MakerToTaker::CurrentOffer(offer.clone())).unwrap();
}
},
maker_inc_connections_actor::Command::SendCurrentOffer {offer, taker_id} => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::CurrentOffer(offer)).unwrap();
},
maker_inc_connections_actor::Command::NotifyOfferAccepted { id, taker_id } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::ConfirmTakeOffer(id)).unwrap();
},
}
}
}
}
}
}
fn in_taker_messages(
read: OwnedReadHalf,
cfd_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>,
taker_id: TakerId,
) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| {
let message = serde_json::from_slice::<wire::TakerToMaker>(&result?)?;
anyhow::Result::<_>::Ok(message)
});
async move {
while let Some(message) = messages.next().await {
match message {
Ok(wire::TakerToMaker::TakeOffer { offer_id, quantity }) => cfd_actor_inbox
.send(maker_cfd_actor::Command::TakeOffer {
taker_id,
offer_id,
quantity,
})
.unwrap(),
Ok(wire::TakerToMaker::StartContractSetup(_offer_id)) => {}
Err(error) => {
eprintln!("Error in reading message: {}", error);
}
}
}
}
}

25
daemon/src/model.rs

@ -1,6 +1,10 @@
use std::fmt::{Display, Formatter};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub mod cfd;
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
@ -10,9 +14,9 @@ impl Usd {
pub const ZERO: Self = Self(Decimal::ZERO);
}
impl Usd {
pub fn to_sat_precision(&self) -> Self {
Self(self.0.round_dp(8))
impl Display for Usd {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
@ -29,3 +33,18 @@ pub enum Position {
Buy,
Sell,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct TakerId(Uuid);
impl Default for TakerId {
fn default() -> Self {
Self(Uuid::new_v4())
}
}
impl Display for TakerId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

62
daemon/src/model/cfd.rs

@ -1,15 +1,13 @@
use std::fmt::{Display, Formatter};
use std::time::{Duration, SystemTime};
use crate::model::{Leverage, Position, TradingPair, Usd};
use anyhow::{Context, Result};
use bdk::bitcoin::Amount;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::time::{Duration, SystemTime};
use uuid::Uuid;
use crate::model::{Leverage, Position, TradingPair, Usd};
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
pub struct CfdOfferId(Uuid);
@ -49,6 +47,7 @@ pub struct CfdOffer {
pub term: Duration,
}
#[allow(dead_code)] // Only one binary and the tests use this.
impl CfdOffer {
pub fn from_default_with_price(price: Usd) -> Result<Self> {
let leverage = Leverage(5);
@ -69,13 +68,6 @@ impl CfdOffer {
term: Duration::from_secs(60 * 60 * 8), // 8 hours
})
}
/// FIXME: For quick prototyping, remove when price is known
pub fn from_default_with_dummy_price() -> Result<Self> {
let price = Usd(dec!(49_000));
Self::from_default_with_price(price)
}
pub fn with_min_quantity(mut self, min_quantity: Usd) -> CfdOffer {
self.min_quantity = min_quantity;
self
@ -206,26 +198,26 @@ pub enum CfdState {
}
impl CfdState {
fn get_common(&self) -> CfdStateCommon {
let common = match self {
CfdState::TakeRequested { common } => common,
CfdState::PendingTakeRequest { common } => common,
CfdState::Accepted { common } => common,
CfdState::Rejected { common } => common,
CfdState::ContractSetup { common } => common,
CfdState::Open { common, .. } => common,
CfdState::CloseRequested { common } => common,
CfdState::PendingClose { common } => common,
CfdState::Closed { common } => common,
CfdState::Error { common } => common,
};
*common
}
pub fn get_transition_timestamp(&self) -> SystemTime {
self.get_common().transition_timestamp
}
// fn get_common(&self) -> CfdStateCommon {
// let common = match self {
// CfdState::TakeRequested { common } => common,
// CfdState::PendingTakeRequest { common } => common,
// CfdState::Accepted { common } => common,
// CfdState::Rejected { common } => common,
// CfdState::ContractSetup { common } => common,
// CfdState::Open { common, .. } => common,
// CfdState::CloseRequested { common } => common,
// CfdState::PendingClose { common } => common,
// CfdState::Closed { common } => common,
// CfdState::Error { common } => common,
// };
// *common
// }
// pub fn get_transition_timestamp(&self) -> SystemTime {
// self.get_common().transition_timestamp
// }
}
impl Display for CfdState {
@ -323,11 +315,9 @@ fn calculate_profit(
#[cfg(test)]
mod tests {
use std::time::UNIX_EPOCH;
use rust_decimal_macros::dec;
use super::*;
use rust_decimal_macros::dec;
use std::time::UNIX_EPOCH;
#[test]
fn given_default_values_then_expected_liquidation_price() {

264
daemon/src/routes_maker.rs

@ -1,55 +1,17 @@
use std::time::SystemTime;
use crate::maker_cfd_actor;
use crate::model::cfd::{Cfd, CfdNewOfferRequest, CfdOffer};
use crate::to_sse_event::ToSseEvent;
use anyhow::Result;
use bdk::bitcoin::Amount;
use futures::stream::SelectAll;
use futures::StreamExt;
use rocket::fairing::AdHoc;
use rocket::figment::util::map;
use rocket::figment::value::{Map, Value};
use rocket::response::status;
use rocket::response::stream::{Event, EventStream};
use rocket::response::stream::EventStream;
use rocket::serde::json::Json;
use rocket::State;
use rocket_db_pools::{Connection, Database};
use rust_decimal_macros::dec;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use uuid::Uuid;
use crate::db;
use crate::model::cfd::{
Cfd, CfdNewOfferRequest, CfdOffer, CfdState, CfdStateCommon, CfdTakeRequest,
};
use crate::model::Usd;
use crate::socket::*;
use crate::state::maker::{maker_do_something, Command};
trait ToSseEvent {
fn to_sse_event(&self) -> Event;
}
impl ToSseEvent for Vec<Cfd> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("cfds")
}
}
impl ToSseEvent for Option<CfdOffer> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("offer")
}
}
impl ToSseEvent for Amount {
fn to_sse_event(&self) -> Event {
Event::json(&self.as_btc()).event("balance")
}
}
#[get("/maker-feed")]
async fn maker_feed(
#[rocket::get("/maker-feed")]
pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_offer: &State<watch::Receiver<Option<CfdOffer>>>,
rx_balance: &State<watch::Receiver<Amount>>,
@ -87,201 +49,47 @@ async fn maker_feed(
}
}
#[post("/offer/sell", data = "<cfd_confirm_offer_request>")]
async fn post_sell_offer(
cfd_confirm_offer_request: Json<CfdNewOfferRequest>,
queue: &State<mpsc::Sender<CfdOffer>>,
#[rocket::post("/offer/sell", data = "<offer>")]
pub async fn post_sell_offer(
offer: Json<CfdNewOfferRequest>,
cfd_actor_inbox: &State<mpsc::UnboundedSender<maker_cfd_actor::Command>>,
) -> Result<status::Accepted<()>, status::BadRequest<String>> {
let offer = CfdOffer::from_default_with_price(cfd_confirm_offer_request.price)
let offer = CfdOffer::from_default_with_price(offer.price)
.map_err(|e| status::BadRequest(Some(e.to_string())))?
.with_min_quantity(cfd_confirm_offer_request.min_quantity)
.with_max_quantity(cfd_confirm_offer_request.max_quantity);
.with_min_quantity(offer.min_quantity)
.with_max_quantity(offer.max_quantity);
let _res = queue
.send(offer)
.await
.map_err(|_| status::BadRequest(Some("internal server error".to_string())))?;
cfd_actor_inbox
.send(maker_cfd_actor::Command::NewOffer(offer))
.expect("actor to always be available");
Ok(status::Accepted(None))
}
// TODO: Shall we use a simpler struct for verification? AFAICT quantity is not
// needed, no need to send the whole CFD either as the other fields can be generated from the offer
#[post("/offer/confirm", data = "<cfd_confirm_offer_request>")]
async fn post_confirm_offer(
cfd_confirm_offer_request: Json<CfdTakeRequest>,
queue: &State<mpsc::Sender<CfdOffer>>,
mut conn: Connection<db::maker::Maker>,
) -> Result<status::Accepted<()>, status::BadRequest<String>> {
dbg!(&cfd_confirm_offer_request);
// // TODO: Shall we use a simpler struct for verification? AFAICT quantity is not
// // needed, no need to send the whole CFD either as the other fields can be generated from the offer
// #[rocket::post("/offer/confirm", data = "<cfd_confirm_offer_request>")]
// pub async fn post_confirm_offer(
// cfd_confirm_offer_request: Json<CfdTakeRequest>,
// queue: &State<mpsc::Sender<CfdOffer>>,
// mut conn: Connection<Db>,
// ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
// dbg!(&cfd_confirm_offer_request);
let offer = db::load_offer_by_id_from_conn(cfd_confirm_offer_request.offer_id, &mut conn)
.await
.map_err(|e| status::BadRequest(Some(e.to_string())))?;
// let offer = db::load_offer_by_id_from_conn(cfd_confirm_offer_request.offer_id, &mut conn)
// .await
// .map_err(|e| status::BadRequest(Some(e.to_string())))?;
let _res = queue
.send(offer)
.await
.map_err(|_| status::BadRequest(Some("internal server error".to_string())))?;
// let _res = queue
// .send(offer)
// .await
// .map_err(|_| status::BadRequest(Some("internal server error".to_string())))?;
Ok(status::Accepted(None))
}
// Ok(status::Accepted(None))
// }
#[get("/alive")]
fn get_health_check() {}
#[rocket::get("/alive")]
pub fn get_health_check() {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct RetrieveCurrentOffer;
pub async fn start_http() -> Result<()> {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC);
let (new_cfd_offer_sender, mut new_cfd_offer_receiver) = mpsc::channel::<CfdOffer>(1024);
let (db_command_sender, db_command_receiver) = mpsc::channel::<Command>(1024);
// init the CFD feed, this will be picked up by the receiver managed by rocket once started
db_command_sender
.send(Command::RefreshCfdFeed)
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("0.0.0.0:9999").await?;
let local_addr = listener.local_addr().unwrap();
println!("Listening on {}", local_addr);
tokio::spawn({
let db_command_sender = db_command_sender.clone();
let offer_feed_receiver = offer_feed_receiver.clone();
async move {
let mut read_connections = SelectAll::new();
let mut write_connections = std::collections::HashMap::new();
loop {
select! {
Ok((socket, remote_addr)) = listener.accept() => {
println!("Connected to {}", remote_addr);
let uuid = Uuid::new_v4();
let (read, write) = socket.into_split();
let messages = FramedRead::new(read, LengthDelimitedCodec::new())
.map(|result| {
let message = serde_json::from_slice::<Message>(&result?)?;
anyhow::Result::<_>::Ok(message)
})
.map(move |message_result| (uuid, remote_addr, message_result));
read_connections.push(messages);
let sender = spawn_sender(write);
if let Some(latest_offer) = &*offer_feed_receiver.borrow() {
sender.send(Message::CurrentOffer(Some(latest_offer.clone()))).expect("Could not communicate with taker");
}
write_connections.insert(uuid, sender);
},
Some(cfd_offer) = new_cfd_offer_receiver.recv() => {
db_command_sender.send(Command::SaveOffer(cfd_offer.clone())).await.unwrap();
offer_feed_sender.send(Some(cfd_offer.clone())).unwrap();
for sender in write_connections.values() {
sender.send(Message::CurrentOffer(Some(cfd_offer.clone()))).expect("Could not communicate with taker");
}
},
Some((uuid, _peer, message)) = read_connections.next() => {
match message {
Ok(Message::TakeOffer(cfd_take_request)) => {
println!("Received a CFD offer take request {:?}", cfd_take_request);
if offer_feed_receiver.borrow().as_ref().is_none() {
eprintln!("Maker has no current offer anymore - can't handle a take request");
return;
}
let current_offer = offer_feed_receiver.borrow().as_ref().unwrap().clone();
assert_eq!(current_offer.id, cfd_take_request.offer_id, "You can only confirm the current offer");
let cfd = Cfd::new(current_offer, cfd_take_request.quantity, CfdState::PendingTakeRequest{
common: CfdStateCommon {
transition_timestamp: SystemTime::now()
},
},
Usd(dec!(10001))).unwrap();
db_command_sender.send(Command::SaveCfd(cfd.clone())).await.unwrap();
// FIXME: Use a button on the CFD tile to
// confirm instead of auto-confirmation
write_connections.get(&uuid).expect("taker to still online")
.send(Message::ConfirmTakeOffer(cfd_take_request.offer_id)).unwrap();
db_command_sender.send(Command::SaveNewCfdStateByOfferId(cfd.offer_id, CfdState::Accepted { common: CfdStateCommon { transition_timestamp: SystemTime::now()}})).await.unwrap();
// Remove the current offer as it got accepted
offer_feed_sender.send(None).unwrap();
for sender in write_connections.values() {
sender.send(Message::CurrentOffer(None)).expect("Could not communicate with taker");
}
},
Ok(Message::StartContractSetup(offer_id)) => {
db_command_sender.send(Command::SaveNewCfdStateByOfferId(offer_id, CfdState::ContractSetup { common: CfdStateCommon { transition_timestamp: SystemTime::now()}})).await.unwrap();
db_command_sender.send(Command::RefreshCfdFeed).await.unwrap();
}
Ok(Message::CurrentOffer(_)) => {
panic!("Maker should not receive current offer");
},
Ok(Message::ConfirmTakeOffer(_)) => {
panic!("Maker should not receive offer confirmations");
},
Err(error) => {
eprintln!("Error in reading message: {}", error );
}
}
}
}
}
}
});
let db: Map<_, Value> = map! {
"url" => "./maker.sqlite".into(),
};
let figment = rocket::Config::figment()
.merge(("databases", map!["maker" => db]))
.merge(("port", 8001));
rocket::custom(figment)
.manage(cfd_feed_receiver)
.manage(offer_feed_receiver)
.manage(new_cfd_offer_sender)
.manage(balance_feed_receiver)
.manage(db_command_sender)
.attach(db::maker::Maker::init())
.attach(AdHoc::try_on_ignite(
"SQL migrations",
db::maker::run_migrations,
))
.attach(AdHoc::try_on_ignite("send command to the db", |rocket| {
maker_do_something(rocket, db_command_receiver, cfd_feed_sender)
}))
.mount(
"/",
routes![
maker_feed,
post_sell_offer,
post_confirm_offer,
get_health_check
],
)
.launch()
.await?;
Ok(())
}

234
daemon/src/routes_taker.rs

@ -1,50 +1,15 @@
use std::time::SystemTime;
use anyhow::Result;
use crate::model::cfd::{Cfd, CfdOffer, CfdTakeRequest};
use crate::taker_cfd_actor;
use crate::to_sse_event::ToSseEvent;
use bdk::bitcoin::Amount;
use futures::StreamExt;
use rocket::fairing::AdHoc;
use rocket::figment::util::map;
use rocket::figment::value::{Map, Value};
use rocket::response::stream::{Event, EventStream};
use rocket::response::stream::EventStream;
use rocket::serde::json::Json;
use rocket::State;
use rocket_db_pools::{Connection, Database};
use rust_decimal::Decimal;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use crate::db;
use crate::model::cfd::{Cfd, CfdOffer, CfdState, CfdStateCommon, CfdTakeRequest};
use crate::model::{Position, Usd};
use crate::socket::*;
use crate::state::taker::{hey_db_do_something, Command};
trait ToSseEvent {
fn to_sse_event(&self) -> Event;
}
impl ToSseEvent for Vec<Cfd> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("cfds")
}
}
impl ToSseEvent for Option<CfdOffer> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("offer")
}
}
impl ToSseEvent for Amount {
fn to_sse_event(&self) -> Event {
Event::json(&self.as_btc()).event("balance")
}
}
#[get("/feed")]
async fn feed(
#[rocket::get("/feed")]
pub async fn feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_offer: &State<watch::Receiver<Option<CfdOffer>>>,
rx_balance: &State<watch::Receiver<Amount>>,
@ -82,183 +47,18 @@ async fn feed(
}
}
#[post("/cfd", data = "<cfd_take_request>")]
async fn post_cfd(
#[rocket::post("/cfd", data = "<cfd_take_request>")]
pub async fn post_cfd(
cfd_take_request: Json<CfdTakeRequest>,
cfd_sender: &State<mpsc::Sender<Cfd>>,
db_command_sender: &State<mpsc::Sender<Command>>,
mut conn: Connection<db::taker::Taker>,
cfd_actor_inbox: &State<mpsc::UnboundedSender<taker_cfd_actor::Command>>,
) {
let current_offer = db::load_offer_by_id_from_conn(cfd_take_request.offer_id, &mut conn)
.await
.unwrap();
println!("Accepting current offer: {:?}", &current_offer);
let cfd = Cfd {
offer_id: current_offer.id,
initial_price: current_offer.price,
leverage: current_offer.leverage,
trading_pair: current_offer.trading_pair,
liquidation_price: current_offer.liquidation_price,
position: Position::Buy,
quantity_usd: cfd_take_request.quantity,
profit_btc: Amount::ZERO,
profit_usd: Usd(Decimal::ZERO),
state: CfdState::TakeRequested {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
};
db_command_sender
.send(Command::SaveCfd(cfd.clone()))
.await
.unwrap();
// TODO: remove unwrap
cfd_sender.send(cfd).await.unwrap();
cfd_actor_inbox
.send(taker_cfd_actor::Command::TakeOffer {
offer_id: cfd_take_request.offer_id,
quantity: cfd_take_request.quantity,
})
.expect("actor to never disappear");
}
#[get("/alive")]
fn get_health_check() {}
pub async fn start_http() -> Result<()> {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC);
let (take_cfd_sender, mut take_cfd_receiver) = mpsc::channel::<Cfd>(1024);
let (db_command_sender, db_command_receiver) = mpsc::channel::<Command>(1024);
// init the CFD feed, this will be picked up by the receiver managed by rocket once started
db_command_sender
.send(Command::RefreshCfdFeed)
.await
.unwrap();
let socket = tokio::net::TcpSocket::new_v4().unwrap();
let connection = socket
.connect("0.0.0.0:9999".parse().unwrap())
.await
.expect("Maker should be online first");
let (read, write) = connection.into_split();
tokio::spawn({
let db_command_sender = db_command_sender.clone();
let mut cfd_feed_receiver = cfd_feed_receiver.clone();
async move {
let frame_read = FramedRead::new(read, LengthDelimitedCodec::new());
let mut messages = frame_read.map(|result| {
let message = serde_json::from_slice::<Message>(&result?)?;
anyhow::Result::<_>::Ok(message)
});
let sender = spawn_sender(write);
loop {
select! {
Some(cfd) = take_cfd_receiver.recv() => {
sender.send(Message::TakeOffer(CfdTakeRequest { offer_id : cfd.offer_id, quantity : cfd.quantity_usd})).unwrap();
let cfd_with_new_state = Cfd {
state : CfdState::PendingTakeRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
..cfd
};
db_command_sender.send(Command::SaveNewCfdState(cfd_with_new_state)).await.unwrap();
},
Some(message) = messages.next() => {
match message {
Ok(Message::TakeOffer(_)) => {
eprintln!("Taker should not receive take requests");
},
Ok(Message::CurrentOffer(offer)) => {
if let Some(offer) = &offer {
println!("Received new offer from the maker: {:?}", offer );
db_command_sender.send(Command::SaveOffer(offer.clone())).await.unwrap();
}
else {
println!("Maker does not have an offer anymore");
}
offer_feed_sender.send(offer).unwrap();
},
Ok(Message::StartContractSetup(_)) => {
eprintln!("Taker should not receive start contract setup message as the taker sends it");
}
Ok(Message::ConfirmTakeOffer(offer_id)) => {
println!("The maker has accepted your take request for offer: {:?}", offer_id );
let new_state : CfdState=
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
};
db_command_sender.send(Command::SaveNewCfdStateByOfferId(offer_id, new_state)).await.unwrap();
},
Err(error) => {
eprintln!("Error in reading message: {}", error );
}
}
},
Ok(()) = cfd_feed_receiver.changed() => {
let cfds = cfd_feed_receiver.borrow().clone();
let to_be_accepted = cfds.into_iter().filter(|by| matches!(by.state, CfdState::Accepted {..})).collect::<Vec<Cfd>>();
for mut cfd in to_be_accepted {
let new_state : CfdState=
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
};
cfd.state = new_state;
db_command_sender.send(Command::SaveNewCfdState(cfd)).await.unwrap();
// TODO: Send message to Maker for contract setup (and transition Maker into that state upon receiving the message)
}
}
}
}
}
});
let db: Map<_, Value> = map! {
"url" => "./taker.sqlite".into(),
};
let figment = rocket::Config::figment().merge(("databases", map!["taker" => db]));
rocket::custom(figment)
.manage(offer_feed_receiver)
.manage(cfd_feed_receiver)
.manage(take_cfd_sender)
.manage(balance_feed_receiver)
.manage(db_command_sender)
.attach(db::taker::Taker::init())
.attach(AdHoc::try_on_ignite(
"SQL migrations",
db::taker::run_migrations,
))
.attach(AdHoc::try_on_ignite("send command to the db", |rocket| {
hey_db_do_something(rocket, db_command_receiver, cfd_feed_sender)
}))
.mount("/", routes![feed, post_cfd, get_health_check])
.launch()
.await?;
Ok(())
}
#[rocket::get("/alive")]
pub fn get_health_check() {}

31
daemon/src/send_wire_message_actor.rs

@ -0,0 +1,31 @@
use futures::{Future, SinkExt};
use serde::Serialize;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub fn new<T>(write: OwnedWriteHalf) -> (impl Future<Output = ()>, mpsc::UnboundedSender<T>)
where
T: Serialize,
{
let (sender, mut receiver) = mpsc::unbounded_channel::<T>();
let actor = async move {
let mut framed_write = FramedWrite::new(write, LengthDelimitedCodec::new());
while let Some(message) = receiver.recv().await {
match framed_write
.send(serde_json::to_vec(&message).unwrap().into())
.await
{
Ok(_) => {}
Err(_) => {
eprintln!("TCP connection error");
break;
}
}
}
};
(actor, sender)
}

41
daemon/src/socket.rs

@ -1,41 +0,0 @@
use serde::{Deserialize, Serialize};
use tokio::net::tcp::OwnedWriteHalf;
use crate::model::cfd::{CfdOffer, CfdOfferId, CfdTakeRequest};
use futures::SinkExt;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
// TODO: Implement messages used for confirming CFD offer between maker and taker
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum Message {
CurrentOffer(Option<CfdOffer>),
TakeOffer(CfdTakeRequest),
// TODO: Needs RejectOffer as well
ConfirmTakeOffer(CfdOfferId),
// TODO: Currently the taker starts, can already send some stuff for signing over in the first message.
StartContractSetup(CfdOfferId),
}
pub fn spawn_sender(write: OwnedWriteHalf) -> UnboundedSender<Message> {
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Message>();
tokio::spawn(async move {
let mut framed_write = FramedWrite::new(write, LengthDelimitedCodec::new());
while let Some(message) = receiver.recv().await {
match framed_write
.send(serde_json::to_vec(&message).unwrap().into())
.await
{
Ok(_) => {}
Err(_) => {
eprintln!("TCP connection error");
break;
}
}
}
});
sender
}

56
daemon/src/state.rs

@ -1,56 +0,0 @@
use crate::db::{
insert_cfd, insert_cfd_offer, insert_new_cfd_state, insert_new_cfd_state_by_offer_id,
load_all_cfds, load_offer_by_id,
};
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState};
use tokio::sync::watch;
pub mod maker;
pub mod taker;
#[derive(Debug)]
pub enum Command {
SaveOffer(CfdOffer),
SaveCfd(Cfd),
SaveNewCfdState(Cfd),
SaveNewCfdStateByOfferId(CfdOfferId, CfdState),
RefreshCfdFeed,
}
pub async fn handle_command(
db: &sqlx::SqlitePool,
command: Command,
cfd_feed_sender: &watch::Sender<Vec<Cfd>>,
) -> anyhow::Result<()> {
println!("Handle command: {:?}", command);
match command {
Command::SaveOffer(cfd_offer) => {
// Only save offer when it wasn't already saved (e.g. taker
// can see the same "latest" offer when it comes back online)
if let Ok(offer) = load_offer_by_id(cfd_offer.id, db).await {
println!("Offer with id {:?} already stored in the db.", offer.id);
} else {
insert_cfd_offer(cfd_offer, db).await?
}
}
Command::SaveCfd(cfd) => {
insert_cfd(cfd, db).await?;
cfd_feed_sender.send(load_all_cfds(db).await?)?;
}
Command::SaveNewCfdState(cfd) => {
insert_new_cfd_state(cfd, db).await?;
cfd_feed_sender.send(load_all_cfds(db).await?)?;
}
Command::SaveNewCfdStateByOfferId(offer_id, state) => {
insert_new_cfd_state_by_offer_id(offer_id, state, db).await?;
cfd_feed_sender.send(load_all_cfds(db).await?)?;
}
Command::RefreshCfdFeed => {
cfd_feed_sender.send(load_all_cfds(db).await?)?;
}
}
Ok(())
}

65
daemon/src/state/maker.rs

@ -1,65 +0,0 @@
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState};
use crate::state;
use crate::state::handle_command;
use rocket::{fairing, Build, Rocket};
use rocket_db_pools::Database;
use std::convert::{TryFrom, TryInto};
use tokio::sync::{mpsc, watch};
#[derive(Debug, Clone)]
pub enum Command {
SaveOffer(CfdOffer),
SaveCfd(Cfd),
SaveNewCfdState(Cfd),
SaveNewCfdStateByOfferId(CfdOfferId, CfdState),
/// All other commands that are interacting with Cfds perform a refresh
/// automatically - as such, RefreshCfdFeed should be used only on init
RefreshCfdFeed,
}
pub struct NotSharedCommand;
impl TryFrom<Command> for state::Command {
type Error = NotSharedCommand;
fn try_from(value: Command) -> Result<Self, Self::Error> {
let command = match value {
Command::SaveOffer(offer) => state::Command::SaveOffer(offer),
Command::SaveCfd(cfd) => state::Command::SaveCfd(cfd),
Command::SaveNewCfdState(cfd) => state::Command::SaveNewCfdState(cfd),
Command::SaveNewCfdStateByOfferId(uuid, cfd_state) => {
state::Command::SaveNewCfdStateByOfferId(uuid, cfd_state)
}
Command::RefreshCfdFeed => state::Command::RefreshCfdFeed,
};
Ok(command)
}
}
// TODO write more rocket wrapper functions to insert into db and create long-running tasks + channels handling the insert
pub async fn maker_do_something(
rocket: Rocket<Build>,
mut db_command_receiver: mpsc::Receiver<Command>,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
) -> fairing::Result {
let db = match crate::db::maker::Maker::fetch(&rocket) {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
tokio::spawn(async move {
while let Some(command) = db_command_receiver.recv().await {
match command.clone().try_into() {
Ok(shared_command) => {
handle_command(&db, shared_command, &cfd_feed_sender)
.await
.unwrap();
}
Err(_) => unreachable!("currently there are only shared commands"),
}
}
});
Ok(rocket)
}

64
daemon/src/state/taker.rs

@ -1,64 +0,0 @@
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState};
use crate::state;
use crate::state::handle_command;
use rocket::{fairing, Build, Rocket};
use rocket_db_pools::Database;
use std::convert::{TryFrom, TryInto};
use tokio::sync::{mpsc, watch};
#[derive(Debug, Clone)]
pub enum Command {
SaveOffer(CfdOffer),
SaveCfd(Cfd),
SaveNewCfdState(Cfd),
SaveNewCfdStateByOfferId(CfdOfferId, CfdState),
/// All other commands that are interacting with Cfds perform a refresh
/// automatically - as such, RefreshCfdFeed should be used only on init
RefreshCfdFeed,
}
pub struct CommandError;
impl TryFrom<Command> for state::Command {
type Error = CommandError;
fn try_from(value: Command) -> Result<Self, Self::Error> {
let command = match value {
Command::SaveOffer(offer) => state::Command::SaveOffer(offer),
Command::SaveCfd(cfd) => state::Command::SaveCfd(cfd),
Command::SaveNewCfdState(cfd) => state::Command::SaveNewCfdState(cfd),
Command::SaveNewCfdStateByOfferId(uuid, cfd_state) => {
state::Command::SaveNewCfdStateByOfferId(uuid, cfd_state)
}
Command::RefreshCfdFeed => state::Command::RefreshCfdFeed,
};
Ok(command)
}
}
pub async fn hey_db_do_something(
rocket: Rocket<Build>,
mut db_command_receiver: mpsc::Receiver<Command>,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
) -> fairing::Result {
let db = match crate::db::taker::Taker::fetch(&rocket) {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
tokio::spawn(async move {
while let Some(command) = db_command_receiver.recv().await {
match command.clone().try_into() {
Ok(shared_command) => {
handle_command(&db, shared_command, &cfd_feed_sender)
.await
.unwrap();
}
Err(_) => unreachable!("currently there are only shared commands"),
}
}
});
Ok(rocket)
}

95
daemon/src/taker.rs

@ -0,0 +1,95 @@
use anyhow::Result;
use bdk::bitcoin::Amount;
use model::cfd::{Cfd, CfdOffer};
use rocket::fairing::AdHoc;
use rocket::figment::util::map;
use rocket::figment::value::{Map, Value};
use rocket_db_pools::Database;
use tokio::sync::watch;
mod db;
mod model;
mod routes_taker;
mod send_wire_message_actor;
mod taker_cfd_actor;
mod taker_inc_message_actor;
mod to_sse_event;
mod wire;
#[derive(Database)]
#[database("taker")]
pub struct Db(sqlx::SqlitePool);
#[rocket::main]
async fn main() -> Result<()> {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC);
let socket = tokio::net::TcpSocket::new_v4().unwrap();
let connection = socket
.connect("0.0.0.0:9999".parse().unwrap())
.await
.expect("Maker should be online first");
let (read, write) = connection.into_split();
let db: Map<_, Value> = map! {
"url" => "./taker.sqlite".into(),
};
let figment = rocket::Config::figment().merge(("databases", map!["taker" => db]));
rocket::custom(figment)
.manage(cfd_feed_receiver)
.manage(offer_feed_receiver)
.manage(balance_feed_receiver)
.attach(Db::init())
.attach(AdHoc::try_on_ignite(
"SQL migrations",
|rocket| async move {
match Db::fetch(&rocket) {
Some(db) => match db::run_migrations(&**db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
},
None => Err(rocket),
}
},
))
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move {
let db = match Db::fetch(&rocket) {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
let (out_maker_messages_actor, out_maker_actor_inbox) =
send_wire_message_actor::new(write);
let (cfd_actor, cfd_actor_inbox) = taker_cfd_actor::new(
db,
cfd_feed_sender,
offer_feed_sender,
out_maker_actor_inbox,
);
let inc_maker_messages_actor =
taker_inc_message_actor::new(read, cfd_actor_inbox.clone());
tokio::spawn(cfd_actor);
tokio::spawn(inc_maker_messages_actor);
tokio::spawn(out_maker_messages_actor);
Ok(rocket.manage(cfd_actor_inbox))
}))
.mount(
"/",
rocket::routes![
routes_taker::feed,
routes_taker::post_cfd,
routes_taker::get_health_check
],
)
.launch()
.await?;
Ok(())
}

89
daemon/src/taker_cfd_actor.rs

@ -0,0 +1,89 @@
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon};
use crate::model::Usd;
use crate::{db, wire};
use futures::Future;
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
#[derive(Debug)]
pub enum Command {
TakeOffer { offer_id: CfdOfferId, quantity: Usd },
NewOffer(Option<CfdOffer>),
OfferAccepted(CfdOfferId),
}
pub fn new(
db: sqlx::SqlitePool,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
offer_feed_actor_inbox: watch::Sender<Option<CfdOffer>>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
) -> (impl Future<Output = ()>, mpsc::UnboundedSender<Command>) {
let (sender, mut receiver) = mpsc::unbounded_channel();
let actor = async move {
while let Some(message) = receiver.recv().await {
match message {
Command::TakeOffer { offer_id, quantity } => {
let mut conn = db.acquire().await.unwrap();
let current_offer = db::load_offer_by_id(offer_id, &mut conn).await.unwrap();
println!("Accepting current offer: {:?}", &current_offer);
let cfd = Cfd::new(
current_offer,
quantity,
CfdState::PendingTakeRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
Usd::ZERO,
)
.unwrap();
db::insert_cfd(cfd, &mut conn).await.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
out_msg_maker_inbox
.send(wire::TakerToMaker::TakeOffer { offer_id, quantity })
.unwrap();
}
Command::NewOffer(Some(offer)) => {
let mut conn = db.acquire().await.unwrap();
db::insert_cfd_offer(&offer, &mut conn).await.unwrap();
offer_feed_actor_inbox.send(Some(offer)).unwrap();
}
Command::NewOffer(None) => {
offer_feed_actor_inbox.send(None).unwrap();
}
Command::OfferAccepted(offer_id) => {
let mut conn = db.acquire().await.unwrap();
db::insert_new_cfd_state_by_offer_id(
offer_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await
.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
// TODO: Contract signing/setup
}
}
}
};
(actor, sender)
}

41
daemon/src/taker_inc_message_actor.rs

@ -0,0 +1,41 @@
use crate::{taker_cfd_actor, wire};
use futures::{Future, StreamExt};
use tokio::net::tcp::OwnedReadHalf;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
pub fn new(
read: OwnedReadHalf,
cfd_actor: mpsc::UnboundedSender<taker_cfd_actor::Command>,
) -> impl Future<Output = ()> {
let frame_read = FramedRead::new(read, LengthDelimitedCodec::new());
let mut messages = frame_read.map(|result| {
let message = serde_json::from_slice::<wire::MakerToTaker>(&result?)?;
anyhow::Result::<_>::Ok(message)
});
async move {
while let Some(message) = messages.next().await {
match message {
Ok(wire::MakerToTaker::CurrentOffer(offer)) => {
cfd_actor
.send(taker_cfd_actor::Command::NewOffer(offer))
.unwrap();
}
Ok(wire::MakerToTaker::ConfirmTakeOffer(offer_id)) => {
// TODO: This naming is not well aligned.
cfd_actor
.send(taker_cfd_actor::Command::OfferAccepted(offer_id))
.unwrap();
}
Ok(wire::MakerToTaker::InvalidOfferId(_)) => {
todo!()
}
Err(error) => {
eprintln!("Error in reading message: {}", error);
}
}
}
}
}

25
daemon/src/to_sse_event.rs

@ -0,0 +1,25 @@
use crate::model::cfd::{Cfd, CfdOffer};
use bdk::bitcoin::Amount;
use rocket::response::stream::Event;
pub trait ToSseEvent {
fn to_sse_event(&self) -> Event;
}
impl ToSseEvent for Vec<Cfd> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("cfds")
}
}
impl ToSseEvent for Option<CfdOffer> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("offer")
}
}
impl ToSseEvent for Amount {
fn to_sse_event(&self) -> Event {
Event::json(&self.as_btc()).event("balance")
}
}

21
daemon/src/wire.rs

@ -0,0 +1,21 @@
use crate::model::cfd::CfdOfferId;
use crate::model::Usd;
use crate::CfdOffer;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum TakerToMaker {
TakeOffer { offer_id: CfdOfferId, quantity: Usd },
// TODO: Currently the taker starts, can already send some stuff for signing over in the first message.
StartContractSetup(CfdOfferId),
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum MakerToTaker {
CurrentOffer(Option<CfdOffer>),
// TODO: Needs RejectOffer as well
ConfirmTakeOffer(CfdOfferId),
InvalidOfferId(CfdOfferId),
}
Loading…
Cancel
Save