Browse Source

Clean up RPC threads after the connection is closed

This was causing memory leaks when an RPC connection was closed for any
reason.
android
Juan Pablo Civile 5 years ago
committed by Roman Zeyde
parent
commit
44adde467f
  1. 74
      src/rpc.rs

74
src/rpc.rs

@ -8,7 +8,7 @@ use serde_json::{from_str, Value};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError}; use std::sync::mpsc::{Sender, SyncSender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
@ -485,21 +485,18 @@ struct Stats {
impl RPC { impl RPC {
fn start_notifier( fn start_notifier(
notification: Channel<Notification>, notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>, senders: Arc<Mutex<HashMap<i32, SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>, acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) { ) {
spawn_thread("notification", move || { spawn_thread("notification", move || {
for msg in notification.receiver().iter() { for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap(); let senders = senders.lock().unwrap();
match msg { match msg {
Notification::Periodic => { Notification::Periodic => {
for sender in senders.split_off(0) { for (i, sender) in senders.iter() {
if let Err(TrySendError::Disconnected(_)) = if let Err(e) = sender.try_send(Message::PeriodicUpdate) {
sender.try_send(Message::PeriodicUpdate) warn!("failed to send PeriodicUpdate to peer {}: {}", i, e);
{
continue;
} }
senders.push(sender);
} }
} }
Notification::Exit => acceptor.send(None).unwrap(), Notification::Exit => acceptor.send(None).unwrap(),
@ -544,29 +541,56 @@ impl RPC {
RPC { RPC {
notification: notification.sender(), notification: notification.sender(),
server: Some(spawn_thread("rpc", move || { server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new())); let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
let handles = Arc::new(Mutex::new(
HashMap::<i32, std::thread::JoinHandle<()>>::new(),
));
let acceptor = RPC::start_acceptor(addr); let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender()); RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let mut children = vec![];
let mut handle_count = 0;
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone(); let handle_id = handle_count;
let senders = senders.clone(); handle_count += 1;
let stats = stats.clone();
children.push(spawn_thread("peer", move || { // explicitely scope the shadowed variables for the new thread
info!("[{}] connected peer", addr); let handle: thread::JoinHandle<()> = {
let conn = Connection::new(query, stream, addr, stats); let query = Arc::clone(&query);
senders.lock().unwrap().push(conn.chan.sender()); let senders = Arc::clone(&senders);
conn.run(); let stats = Arc::clone(&stats);
info!("[{}] disconnected peer", addr); let handles = Arc::clone(&handles);
}));
spawn_thread("peer", move || {
info!("[{}] connected peer #{}", addr, handle_id);
let conn = Connection::new(query, stream, addr, stats);
senders
.lock()
.unwrap()
.insert(handle_id, conn.chan.sender());
conn.run();
info!("[{}] disconnected peer #{}", addr, handle_id);
senders.lock().unwrap().remove(&handle_id);
handles.lock().unwrap().remove(&handle_id);
})
};
handles.lock().unwrap().insert(handle_id, handle);
} }
trace!("closing {} RPC connections", senders.lock().unwrap().len()); trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().iter() { for sender in senders.lock().unwrap().values() {
let _ = sender.send(Message::Done); let _ = sender.send(Message::Done);
} }
trace!("waiting for {} RPC handling threads", children.len());
for child in children { trace!(
let _ = child.join(); "waiting for {} RPC handling threads",
handles.lock().unwrap().len()
);
for (_, handle) in handles.lock().unwrap().drain() {
if let Err(e) = handle.join() {
warn!("failed to join thread: {:?}", e);
}
} }
trace!("RPC connections are closed"); trace!("RPC connections are closed");
})), })),

Loading…
Cancel
Save