Browse Source

Merge pull request #172 from comit-network/directly-initialize-price-feed

fix-olivia-event-id
Thomas Eizinger 3 years ago
committed by GitHub
parent
commit
5d5347ce56
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      daemon/src/cleanup.rs
  2. 13
      daemon/src/maker.rs
  3. 3
      daemon/src/maker_cfd.rs
  4. 12
      daemon/src/taker.rs
  5. 3
      daemon/src/taker_cfd.rs

13
daemon/src/cleanup.rs

@ -1,13 +1,14 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use anyhow::Result;
use sqlx::SqlitePool;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::SystemTime;
pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Result<()> {
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) {
insert_new_cfd_state_by_order_id(
@ -18,7 +19,7 @@ pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Res
},
info: format!("Was in state {} which cannot be continued.", cfd.state),
},
&mut conn,
conn,
)
.await?;
}

13
daemon/src/maker.rs

@ -6,7 +6,7 @@ use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network;
use clap::Clap;
use model::cfd::{Cfd, Order};
use model::cfd::Order;
use model::WalletInfo;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
@ -108,7 +108,6 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -125,7 +124,6 @@ async fn main() -> Result<()> {
tokio::spawn(task);
rocket::custom(figment)
.manage(cfd_feed_receiver)
.manage(order_feed_receiver)
.manage(wallet_feed_receiver)
.manage(auth_password)
@ -150,11 +148,14 @@ async fn main() -> Result<()> {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
let mut conn = db.acquire().await.unwrap();
cleanup::transition_non_continue_cfds_to_setup_failed(db.clone())
cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None);
@ -216,7 +217,9 @@ async fn main() -> Result<()> {
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox))
Ok(rocket
.manage(cfd_maker_actor_inbox)
.manage(cfd_feed_receiver))
},
))
.mount(

3
daemon/src/maker_cfd.rs

@ -80,9 +80,6 @@ impl Actor {
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
// populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(cfds.clone())?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;

12
daemon/src/taker.rs

@ -6,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network;
use clap::Clap;
use futures::StreamExt;
use model::cfd::{Cfd, Order};
use model::cfd::Order;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use seed::Seed;
@ -103,7 +103,6 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -128,7 +127,6 @@ async fn main() -> Result<()> {
.merge(("port", opts.http_port));
rocket::custom(figment)
.manage(cfd_feed_receiver)
.manage(order_feed_receiver)
.manage(wallet_feed_receiver)
.manage(quote_updates)
@ -152,11 +150,14 @@ async fn main() -> Result<()> {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
let mut conn = db.acquire().await.unwrap();
cleanup::transition_non_continue_cfds_to_setup_failed(db.clone())
cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let send_to_maker = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
@ -193,7 +194,6 @@ async fn main() -> Result<()> {
),
);
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(
oracle_actor_context
.notify_interval(Duration::from_secs(60), || oracle::Sync)
@ -204,7 +204,7 @@ async fn main() -> Result<()> {
monitor_actor_address,
)));
Ok(rocket.manage(cfd_actor_inbox))
Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))
},
))
.mount(

3
daemon/src/taker_cfd.rs

@ -73,9 +73,6 @@ impl Actor {
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
// populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(cfds.clone())?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;

Loading…
Cancel
Save