Browse Source

Networking fix.

cl-refactor
Gav Wood 11 years ago
parent
commit
7ffa513586
  1. 2
      alethzero/MainWin.cpp
  2. 45
      libethereum/PeerSession.cpp
  3. 4
      libethereum/PeerSession.h

2
alethzero/MainWin.cpp

@ -372,6 +372,7 @@ void Main::writeSettings()
s.setValue("address", b); s.setValue("address", b);
s.setValue("upnp", ui->upnp->isChecked()); s.setValue("upnp", ui->upnp->isChecked());
s.setValue("forceAddress", ui->forceAddress->text());
s.setValue("usePast", ui->usePast->isChecked()); s.setValue("usePast", ui->usePast->isChecked());
s.setValue("paranoia", ui->paranoia->isChecked()); s.setValue("paranoia", ui->paranoia->isChecked());
s.setValue("showAll", ui->showAll->isChecked()); s.setValue("showAll", ui->showAll->isChecked());
@ -418,6 +419,7 @@ void Main::readSettings()
m_client->setAddress(m_myKeys.back().address()); m_client->setAddress(m_myKeys.back().address());
m_peers = s.value("peers").toByteArray(); m_peers = s.value("peers").toByteArray();
ui->upnp->setChecked(s.value("upnp", true).toBool()); ui->upnp->setChecked(s.value("upnp", true).toBool());
ui->forceAddress->setText(s.value("forceAddress", "").toString());
ui->usePast->setChecked(s.value("usePast", true).toBool()); ui->usePast->setChecked(s.value("usePast", true).toBool());
ui->paranoia->setChecked(s.value("paranoia", false).toBool()); ui->paranoia->setChecked(s.value("paranoia", false).toBool());
ui->showAll->setChecked(s.value("showAll", false).toBool()); ui->showAll->setChecked(s.value("showAll", false).toBool());

45
libethereum/PeerSession.cpp

@ -37,7 +37,6 @@ static const eth::uint c_maxBlocksAsk = 256; ///< Maximum number of blocks we as
PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort): PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort):
m_server(_s), m_server(_s),
m_strand(_socket.get_io_service()),
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
m_reqNetworkId(_rNId), m_reqNetworkId(_rNId),
m_listenPort(_peerPort), m_listenPort(_peerPort),
@ -50,6 +49,7 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi
PeerSession::~PeerSession() PeerSession::~PeerSession()
{ {
// Read-chain finished for one reason or another.
try try
{ {
if (m_socket.is_open()) if (m_socket.is_open())
@ -424,7 +424,7 @@ void PeerSession::sendDestroy(bytes& _msg)
} }
bytes buffer = bytes(std::move(_msg)); bytes buffer = bytes(std::move(_msg));
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); writeImpl(buffer);
} }
void PeerSession::send(bytesConstRef _msg) void PeerSession::send(bytesConstRef _msg)
@ -437,43 +437,51 @@ void PeerSession::send(bytesConstRef _msg)
} }
bytes buffer = bytes(_msg.toBytes()); bytes buffer = bytes(_msg.toBytes());
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); writeImpl(buffer);
} }
void PeerSession::writeImpl(bytes& _buffer) void PeerSession::writeImpl(bytes& _buffer)
{ {
m_writeQueue.push_back(_buffer); // cerr << (void*)this << " writeImpl" << endl;
if (m_writeQueue.size() > 1) if (!m_socket.is_open())
return; return;
lock_guard<recursive_mutex> l(m_writeLock);
m_writeQueue.push_back(_buffer);
if (m_writeQueue.size() == 1)
write(); write();
} }
void PeerSession::write() void PeerSession::write()
{ {
// cerr << (void*)this << " write" << endl;
lock_guard<recursive_mutex> l(m_writeLock);
if (m_writeQueue.empty()) if (m_writeQueue.empty())
return; return;
const bytes& bytes = m_writeQueue[0]; const bytes& bytes = m_writeQueue[0];
if (m_socket.is_open()) auto self(shared_from_this());
ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{ {
// must check queue, as write callback can occur following dropped() // cerr << (void*)this << " write.callback" << endl;
if (!m_writeQueue.empty())
m_writeQueue.pop_front();
// must check queue, as write callback can occur following dropped()
if (ec) if (ec)
{ {
cwarn << "Error sending: " << ec.message(); cwarn << "Error sending: " << ec.message();
dropped(); dropped();
} }
else else
m_strand.post(boost::bind(&PeerSession::write, this)); {
})); m_writeQueue.pop_front();
write();
}
});
} }
void PeerSession::dropped() void PeerSession::dropped()
{ {
// cerr << (void*)this << " dropped" << endl;
if (m_socket.is_open()) if (m_socket.is_open())
try try
{ {
@ -482,24 +490,13 @@ void PeerSession::dropped()
} }
catch (...) {} catch (...) {}
// block future writes by running in strand and clearing queue // Remove from peer server
m_strand.post([=]()
{
m_writeQueue.clear();
if (!m_willBeDeleted) // Don't want two deleters on the queue at once!
{
m_willBeDeleted = true;
m_strand.post([=]()
{
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i)
if (i->second.lock().get() == this) if (i->second.lock().get() == this)
{ {
m_server->m_peers.erase(i); m_server->m_peers.erase(i);
break; break;
} }
});
}
});
} }
void PeerSession::disconnect(int _reason) void PeerSession::disconnect(int _reason)

4
libethereum/PeerSession.h

@ -21,6 +21,7 @@
#pragma once #pragma once
#include <mutex>
#include <array> #include <array>
#include <set> #include <set>
#include <memory> #include <memory>
@ -65,7 +66,8 @@ private:
void writeImpl(bytes& _buffer); void writeImpl(bytes& _buffer);
void write(); void write();
PeerServer* m_server; PeerServer* m_server;
boost::asio::strand m_strand;
std::recursive_mutex m_writeLock;
std::deque<bytes> m_writeQueue; std::deque<bytes> m_writeQueue;
bi::tcp::socket m_socket; bi::tcp::socket m_socket;

Loading…
Cancel
Save