|
|
@ -1,4 +1,5 @@ |
|
|
|
use rocksdb; |
|
|
|
use rocksdb::Writable; |
|
|
|
|
|
|
|
use std::path::Path; |
|
|
|
|
|
|
@ -38,23 +39,25 @@ pub struct StoreOptions { |
|
|
|
impl DBStore { |
|
|
|
/// Opens a new RocksDB at the specified location.
|
|
|
|
pub fn open(path: &Path, opts: StoreOptions) -> DBStore { |
|
|
|
let path = path.to_str().unwrap(); |
|
|
|
debug!("opening {:?} with {:?}", path, &opts); |
|
|
|
let mut db_opts = rocksdb::Options::default(); |
|
|
|
let mut db_opts = rocksdb::DBOptions::default(); |
|
|
|
db_opts.create_if_missing(true); |
|
|
|
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); |
|
|
|
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); |
|
|
|
db_opts.set_target_file_size_base(128 << 20); |
|
|
|
db_opts.set_write_buffer_size(64 << 20); |
|
|
|
db_opts.increase_parallelism(2); |
|
|
|
db_opts.set_min_write_buffer_number(2); |
|
|
|
db_opts.set_max_write_buffer_number(3); |
|
|
|
db_opts.set_disable_auto_compactions(opts.bulk_import); |
|
|
|
db_opts.set_advise_random_on_open(!opts.bulk_import); |
|
|
|
|
|
|
|
let mut cf_opts = rocksdb::rocksdb_options::ColumnFamilyOptions::new(); |
|
|
|
cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); |
|
|
|
cf_opts.compression(rocksdb::DBCompressionType::Snappy); |
|
|
|
cf_opts.set_target_file_size_base(128 << 20); |
|
|
|
cf_opts.set_write_buffer_size(64 << 20); |
|
|
|
cf_opts.set_min_write_buffer_number(2); |
|
|
|
cf_opts.set_max_write_buffer_number(3); |
|
|
|
cf_opts.set_disable_auto_compactions(opts.bulk_import); |
|
|
|
|
|
|
|
let mut block_opts = rocksdb::BlockBasedOptions::default(); |
|
|
|
block_opts.set_block_size(256 << 10); |
|
|
|
DBStore { |
|
|
|
db: rocksdb::DB::open(&db_opts, &path).unwrap(), |
|
|
|
db: rocksdb::DB::open_cf(db_opts, path, vec![("default", cf_opts)]).unwrap(), |
|
|
|
opts: opts, |
|
|
|
} |
|
|
|
} |
|
|
@ -78,18 +81,13 @@ impl ReadStore for DBStore { |
|
|
|
// TODO: use generators
|
|
|
|
fn scan(&self, prefix: &[u8]) -> Vec<Row> { |
|
|
|
let mut rows = vec![]; |
|
|
|
let mut iter = self.db.raw_iterator(); |
|
|
|
iter.seek(prefix); |
|
|
|
while iter.valid() { |
|
|
|
let key = &iter.key().unwrap(); |
|
|
|
let mut iter = self.db.iter(); |
|
|
|
iter.seek(rocksdb::SeekKey::Key(prefix)); |
|
|
|
for (key, value) in &mut iter { |
|
|
|
if !key.starts_with(prefix) { |
|
|
|
break; |
|
|
|
} |
|
|
|
rows.push(Row { |
|
|
|
key: key.to_vec(), |
|
|
|
value: iter.value().unwrap().to_vec(), |
|
|
|
}); |
|
|
|
iter.next(); |
|
|
|
rows.push(Row { key, value }); |
|
|
|
} |
|
|
|
rows |
|
|
|
} |
|
|
@ -97,7 +95,7 @@ impl ReadStore for DBStore { |
|
|
|
|
|
|
|
impl WriteStore for DBStore { |
|
|
|
fn write(&self, rows_vec: Vec<Vec<Row>>) { |
|
|
|
let mut batch = rocksdb::WriteBatch::default(); |
|
|
|
let batch = rocksdb::WriteBatch::default(); |
|
|
|
for rows in rows_vec { |
|
|
|
for row in rows { |
|
|
|
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); |
|
|
|