diff --git a/examples/listener.rs b/examples/listener.rs index cc66748..6b1e704 100644 --- a/examples/listener.rs +++ b/examples/listener.rs @@ -1,41 +1,15 @@ -extern crate bitcoin; extern crate electrs; -extern crate error_chain; -use bitcoin::network::constants::Network; -use bitcoin::network::message::NetworkMessage; -use bitcoin::network::socket::Socket; +#[macro_use] +extern crate log; -use electrs::errors::*; - -fn run() -> Result<()> { - // Open socket - let mut sock = Socket::new(Network::Bitcoin); - sock.connect("127.0.0.1", 8333) - .chain_err(|| "failed to connect to node")?; - - let mut outgoing = vec![sock.version_message(0).unwrap()]; - loop { - for msg in outgoing.split_off(0) { - eprintln!("send {:?}", msg); - sock.send_message(msg.clone()) - .chain_err(|| format!("failed to send {:?}", msg))?; - } - // Receive new message - let msg = sock - .receive_message() - .chain_err(|| "failed to receive p2p message")?; - - match msg { - NetworkMessage::Alert(_) => continue, // deprecated - NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack), - NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)), - _ => (), - }; - eprintln!("recv {:?}", msg); - } -} +use electrs::config::Config; +use electrs::notify; fn main() { - run().expect("p2p listener failed"); + let _ = Config::from_args(); + let rx = notify::run().into_receiver(); + for msg in rx.iter() { + info!("{:?}", msg) + } } diff --git a/src/lib.rs b/src/lib.rs index 2e2bd1f..07e412f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,7 @@ pub mod fake; pub mod index; pub mod mempool; pub mod metrics; +pub mod notify; pub mod query; pub mod rpc; pub mod signal; diff --git a/src/notify.rs b/src/notify.rs new file mode 100644 index 0000000..d230e61 --- /dev/null +++ b/src/notify.rs @@ -0,0 +1,50 @@ +use bitcoin::network::constants::Network; +use bitcoin::network::message::NetworkMessage; +use bitcoin::network::socket::Socket; + +use util; + +pub fn run() -> util::Channel { + let chan = util::Channel::new(); + let tx = chan.sender(); + + util::spawn_thread("p2p", move || loop { + // TODO: support testnet and regtest as well. + let mut sock = Socket::new(Network::Bitcoin); + if let Err(e) = sock.connect("127.0.0.1", 8333) { + warn!("failed to connect to node: {}", e); + continue; + } + let mut outgoing = vec![sock.version_message(0).unwrap()]; + loop { + for msg in outgoing.split_off(0) { + debug!("send {:?}", msg); + if let Err(e) = sock.send_message(msg.clone()) { + warn!("failed to connect to node: {}", e); + break; + } + } + // Receive new message + let msg = match sock.receive_message() { + Ok(msg) => msg, + Err(e) => { + warn!("failed to receive p2p message: {}", e); + break; + } + }; + match msg { + NetworkMessage::Alert(_) => continue, // deprecated + NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack), + NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)), + _ => (), + }; + debug!("recv {:?}", msg); + if tx.send(msg).is_err() { + warn!("failed to connect to node"); + return; + } + } + }); + + chan +}