Browse Source

first version

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
86b16df9c2
  1. 4
      libwhisper/Interface.h
  2. 19
      libwhisper/WhisperDB.cpp
  3. 10
      libwhisper/WhisperDB.h
  4. 67
      libwhisper/WhisperHost.cpp
  5. 2
      libwhisper/WhisperHost.h
  6. 96
      test/libwhisper/whisperDB.cpp

4
libwhisper/Interface.h

@ -64,15 +64,15 @@ public:
virtual ~Interface();
virtual void inject(Envelope const& _m, WhisperPeer* _from = nullptr) = 0;
virtual Topics const& fullTopics(unsigned _id) const = 0;
virtual unsigned installWatch(Topics const& _filter) = 0;
virtual void uninstallWatch(unsigned _watchId) = 0;
virtual h256s peekWatch(unsigned _watchId) const = 0;
virtual h256s checkWatch(unsigned _watchId) = 0;
virtual h256s watchMessages(unsigned _watchId) = 0;
virtual Envelope envelope(h256 _m) const = 0;
virtual void saveTopicsToDB(std::string const& _app, std::string const& _password) = 0;
virtual std::vector<unsigned> restoreTopicsFromDB(std::string const& _app, std::string const& _password) = 0;
void post(bytes const& _payload, Topics _topics, unsigned _ttl = 50, unsigned _workToProve = 50) { inject(Message(_payload).seal(_topics, _ttl, _workToProve)); }
void post(Public _to, bytes const& _payload, Topics _topics, unsigned _ttl = 50, unsigned _workToProve = 50) { inject(Message(_payload).sealTo(_to, _topics, _ttl, _workToProve)); }

19
libwhisper/WhisperDB.cpp

@ -27,7 +27,7 @@ using namespace dev;
using namespace dev::shh;
namespace fs = boost::filesystem;
WhisperDB::WhisperDB()
WhisperDB::WhisperDB(DBType _t): m_type(_t)
{
m_readOptions.verify_checksums = true;
string path = dev::getDataDir("shh");
@ -36,13 +36,25 @@ WhisperDB::WhisperDB()
leveldb::Options op;
op.create_if_missing = true;
op.max_open_files = 256;
string suffix = getTypeSuffix();
leveldb::DB* p = nullptr;
leveldb::Status status = leveldb::DB::Open(op, path + "/messages", &p);
leveldb::Status status = leveldb::DB::Open(op, path + suffix, &p);
m_db.reset(p);
if (!status.ok())
BOOST_THROW_EXCEPTION(FailedToOpenLevelDB(status.ToString()));
}
string WhisperDB::getTypeSuffix()
{
switch(m_type)
{
case Messages: return "\\messages";
case Filters: return "\\filters";
}
return "\\misc";
}
string WhisperDB::lookup(dev::h256 const& _key) const
{
string ret;
@ -137,6 +149,9 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
void WhisperDB::save(h256 const& _key, Envelope const& _e)
{
if (m_type != Messages)
BOOST_THROW_EXCEPTION(WrongTypeLevelDB());
try
{
RLPStream rlp;

10
libwhisper/WhisperDB.h

@ -31,6 +31,7 @@ namespace dev
namespace shh
{
struct WrongTypeLevelDB: virtual Exception {};
struct FailedToOpenLevelDB: virtual Exception { FailedToOpenLevelDB(std::string const& _message): Exception(_message) {} };
struct FailedInsertInLevelDB: virtual Exception { FailedInsertInLevelDB(std::string const& _message): Exception(_message) {} };
struct FailedLookupInLevelDB: virtual Exception { FailedLookupInLevelDB(std::string const& _message): Exception(_message) {} };
@ -38,8 +39,10 @@ struct FailedDeleteInLevelDB: virtual Exception { FailedDeleteInLevelDB(std::str
class WhisperDB
{
public:
WhisperDB();
public:
enum DBType { Messages, Filters };
WhisperDB(DBType _t);
~WhisperDB() {}
std::string lookup(dev::h256 const& _key) const;
@ -50,9 +53,12 @@ class WhisperDB
void save(dev::h256 const& _key, Envelope const& _e);
private:
std::string getTypeSuffix();
leveldb::ReadOptions m_readOptions;
leveldb::WriteOptions m_writeOptions;
std::unique_ptr<leveldb::DB> m_db;
DBType m_type;
enum MetaInformation { StoreForeverFlag = 1, WatchedFlag = 2 };
};

67
libwhisper/WhisperHost.cpp

@ -223,7 +223,7 @@ void WhisperHost::saveMessagesToBD()
try
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
ReadGuard g(x_messages);
unsigned now = (unsigned)time(0);
for (auto const& m: m_messages)
@ -253,7 +253,7 @@ void WhisperHost::loadMessagesFromBD()
try
{
map<h256, Envelope> m;
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
db.loadAll(m);
WriteGuard g(x_messages);
m_messages.swap(m);
@ -269,3 +269,66 @@ void WhisperHost::loadMessagesFromBD()
cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
}
}
void WhisperHost::saveTopicsToDB(string const& _app, string const& _password)
{
bytes plain;
DEV_GUARDED(m_filterLock)
{
RLPStream rlp(m_filters.size());
for (auto const& x: m_filters)
{
Topics const& topics = x.second.full;
unsigned const RawDataSize = topics.size() * h256::size;
unique_ptr<byte> p(new byte[RawDataSize]);
unsigned i = 0;
for (auto const& t: topics)
memcpy(p.get() + i * h256::size, t.data(), h256::size);
bytesConstRef ref(p.get(), RawDataSize);
rlp.append(ref);
}
rlp.swapOut(plain);
}
// todo: encrypt after tests
h256 h = sha3(_app);
WhisperDB db(WhisperDB::Messages);
db.insert(h, plain);
}
vector<unsigned> WhisperHost::restoreTopicsFromDB(string const& _app, string const& _password)
{
vector<unsigned> ret;
h256 h = sha3(_app);
WhisperDB db(WhisperDB::Messages);
string raw = db.lookup(h);
// todo: decrypt after tests
RLP rlp(raw);
auto sz = rlp.itemCountStrict();
for (unsigned i = 0; i < sz; ++i)
{
RLP r = rlp[i];
bytesConstRef ref(r.data());
Topics topics;
unsigned num = ref.size() / h256::size;
for (unsigned j = 0; j < num; ++j)
{
h256 topic(ref.data() + j * h256::size, h256::ConstructFromPointerType());
topics.push_back(topic);
}
unsigned w = installWatch(topics);
ret.push_back(w);
}
return ret;
}

2
libwhisper/WhisperHost.h

@ -63,6 +63,8 @@ public:
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
virtual void saveTopicsToDB(std::string const& _app, std::string const& _password);
virtual std::vector<unsigned> restoreTopicsFromDB(std::string const& _app, std::string const& _password);
protected:
virtual void doWork() override;

96
test/libwhisper/whisperDB.cpp

@ -28,6 +28,7 @@ along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
using namespace std;
using namespace dev;
using namespace dev::shh;
using namespace dev::p2p;
struct P2PFixture
{
@ -47,7 +48,7 @@ BOOST_AUTO_TEST_CASE(basic)
string const text2 = "ipsum";
h256 h1(0xBEEF);
h256 h2(0xC0FFEE);
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
db.kill(h1);
db.kill(h2);
@ -94,7 +95,7 @@ BOOST_AUTO_TEST_CASE(persistence)
h256 const h2(0xBADD00DE);
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
db.kill(h1);
db.kill(h2);
s = db.lookup(h1);
@ -109,7 +110,7 @@ BOOST_AUTO_TEST_CASE(persistence)
this_thread::sleep_for(chrono::milliseconds(20));
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
db.insert(h1, text1);
db.insert(h2, text2);
}
@ -117,7 +118,7 @@ BOOST_AUTO_TEST_CASE(persistence)
this_thread::sleep_for(chrono::milliseconds(20));
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
s = db.lookup(h2);
BOOST_REQUIRE(!s.compare(text2));
s = db.lookup(h1);
@ -169,7 +170,7 @@ BOOST_AUTO_TEST_CASE(messages)
}
}
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
unsigned x = 0;
for (auto i: m1)
@ -190,7 +191,7 @@ BOOST_AUTO_TEST_CASE(corruptedData)
h256 x = h256::random();
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
db.insert(x, "this is a test input, representing corrupt data");
}
@ -202,10 +203,91 @@ BOOST_AUTO_TEST_CASE(corruptedData)
}
{
WhisperDB db;
WhisperDB db(WhisperDB::Messages);
string s = db.lookup(x);
BOOST_REQUIRE(s.empty());
}
}
BOOST_AUTO_TEST_CASE(filters)
{
cnote << "Testing filters saving...";
VerbosityHolder setTemporaryLevel(2);
string const app("test suite whisperDB/filters");
string const password("some pseudorandom stuff");
{
p2p::Host h("Test");
auto wh = h.registerCapability(new WhisperHost());
wh->installWatch(BuildTopic("t1"));
wh->installWatch(BuildTopic("t2"));
wh->saveTopicsToDB(app, password);
}
short unsigned port1 = 30313;
Host host1("Test", NetworkPreferences("127.0.0.1", port1, false));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
auto ids = whost1->restoreTopicsFromDB(app, password);
bool host1Ready = false;
unsigned result = 0;
unsigned messageCount = 0;
std::thread listener([&]()
{
setThreadName("other");
host1Ready = true;
for (int j = 0; j < 200 && messageCount < 2; ++j)
{
for (unsigned id: ids)
for (auto e: whost1->checkWatch(id))
{
Message msg = whost1->envelope(e).open(whost1->fullTopics(id));
unsigned x = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message:" << x;
result += x;
++messageCount;
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
Host host2("Test", NetworkPreferences("127.0.0.1", 30314, false));
host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost());
host2.start();
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(5));
host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
while (!host1Ready)
this_thread::sleep_for(chrono::milliseconds(10));
while (!host1.peerCount() && !host2.peerCount())
this_thread::sleep_for(chrono::milliseconds(10));
unsigned ttl = 1000000;
whost2->post(RLPStream().append(8).out(), BuildTopic("t8"), ttl);
this_thread::sleep_for(chrono::milliseconds(10));
whost2->post(RLPStream().append(4).out(), BuildTopic("t4"), ttl);
this_thread::sleep_for(chrono::milliseconds(10));
whost2->post(RLPStream().append(1).out(), BuildTopic("t1"), ttl);
this_thread::sleep_for(chrono::milliseconds(10));
whost2->post(RLPStream().append(2).out(), BuildTopic("t2"), ttl);
this_thread::sleep_for(chrono::milliseconds(10));
whost2->post(RLPStream().append(16).out(), BuildTopic("t16"), ttl);
while (messageCount < 2)
this_thread::sleep_for(chrono::milliseconds(10));
listener.join();
BOOST_REQUIRE_EQUAL(messageCount, 2);
BOOST_REQUIRE_EQUAL(result, 3);
}
BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save