|
@ -11,7 +11,6 @@ use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; |
|
|
use std::sync::mpsc::{Sender, SyncSender, TrySendError}; |
|
|
use std::sync::mpsc::{Sender, SyncSender, TrySendError}; |
|
|
use std::sync::{Arc, Mutex}; |
|
|
use std::sync::{Arc, Mutex}; |
|
|
use std::thread; |
|
|
use std::thread; |
|
|
use std::time::Duration; |
|
|
|
|
|
|
|
|
|
|
|
use crate::errors::*; |
|
|
use crate::errors::*; |
|
|
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; |
|
|
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; |
|
@ -555,26 +554,46 @@ impl RPC { |
|
|
let acceptor = RPC::start_acceptor(addr); |
|
|
let acceptor = RPC::start_acceptor(addr); |
|
|
RPC::start_notifier(notification, senders.clone(), acceptor.sender()); |
|
|
RPC::start_notifier(notification, senders.clone(), acceptor.sender()); |
|
|
|
|
|
|
|
|
|
|
|
let mut threads = HashMap::new(); |
|
|
|
|
|
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); |
|
|
|
|
|
|
|
|
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { |
|
|
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { |
|
|
// explicitely scope the shadowed variables for the new thread
|
|
|
// explicitely scope the shadowed variables for the new thread
|
|
|
let query = Arc::clone(&query); |
|
|
let query = Arc::clone(&query); |
|
|
let senders = Arc::clone(&senders); |
|
|
let senders = Arc::clone(&senders); |
|
|
let stats = Arc::clone(&stats); |
|
|
let stats = Arc::clone(&stats); |
|
|
|
|
|
let garbage_sender = garbage_sender.clone(); |
|
|
|
|
|
|
|
|
// HACK: detach peer-handling threads
|
|
|
let spawned = spawn_thread("peer", move || { |
|
|
spawn_thread("peer", move || { |
|
|
|
|
|
info!("[{}] connected peer", addr); |
|
|
info!("[{}] connected peer", addr); |
|
|
let conn = Connection::new(query, stream, addr, stats, relayfee); |
|
|
let conn = Connection::new(query, stream, addr, stats, relayfee); |
|
|
senders.lock().unwrap().push(conn.chan.sender()); |
|
|
senders.lock().unwrap().push(conn.chan.sender()); |
|
|
conn.run(); |
|
|
conn.run(); |
|
|
info!("[{}] disconnected peer", addr); |
|
|
info!("[{}] disconnected peer", addr); |
|
|
|
|
|
let _ = garbage_sender.send(std::thread::current().id()); |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
trace!("[{}] spawned {:?}", addr, spawned.thread().id()); |
|
|
|
|
|
threads.insert(spawned.thread().id(), spawned); |
|
|
|
|
|
while let Ok(id) = garbage_receiver.try_recv() { |
|
|
|
|
|
if let Some(thread) = threads.remove(&id) { |
|
|
|
|
|
trace!("[{}] joining {:?}", addr, id); |
|
|
|
|
|
if let Err(error) = thread.join() { |
|
|
|
|
|
error!("failed to join {:?}: {:?}", id, error); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
trace!("closing {} RPC connections", senders.lock().unwrap().len()); |
|
|
trace!("closing {} RPC connections", senders.lock().unwrap().len()); |
|
|
for sender in senders.lock().unwrap().iter() { |
|
|
for sender in senders.lock().unwrap().iter() { |
|
|
let _ = sender.send(Message::Done); |
|
|
let _ = sender.send(Message::Done); |
|
|
} |
|
|
} |
|
|
thread::sleep(Duration::from_secs(1)); // TODO: actually wait for threads
|
|
|
for (id, thread) in threads { |
|
|
|
|
|
trace!("joining {:?}", id); |
|
|
|
|
|
if let Err(error) = thread.join() { |
|
|
|
|
|
error!("failed to join {:?}: {:?}", id, error); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
trace!("RPC connections are closed"); |
|
|
trace!("RPC connections are closed"); |
|
|
})), |
|
|
})), |
|
|