compare mace remote store and rocksdb blobdb
This commit is contained in:
parent
07baaae51e
commit
e655952829
@ -1,10 +1,12 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
|
#include <memory>
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <rocksdb/cache.h>
|
#include <rocksdb/cache.h>
|
||||||
#include <rocksdb/db.h>
|
#include <rocksdb/db.h>
|
||||||
#include <rocksdb/options.h>
|
#include <rocksdb/options.h>
|
||||||
|
#include <rocksdb/table.h>
|
||||||
#include <rocksdb/utilities/optimistic_transaction_db.h>
|
#include <rocksdb/utilities/optimistic_transaction_db.h>
|
||||||
#include <rocksdb/utilities/transaction.h>
|
#include <rocksdb/utilities/transaction.h>
|
||||||
#include <rocksdb/utilities/transaction_db.h>
|
#include <rocksdb/utilities/transaction_db.h>
|
||||||
@ -76,17 +78,30 @@ int main(int argc, char *argv[]) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb::Options options;
|
rocksdb::ColumnFamilyOptions cfo{};
|
||||||
|
cfo.enable_blob_files = true;
|
||||||
|
cfo.min_blob_size = 1024;
|
||||||
|
// use 1GB block cache
|
||||||
|
auto cache = rocksdb::NewLRUCache(1 << 30);
|
||||||
|
rocksdb::BlockBasedTableOptions table_options{};
|
||||||
|
table_options.block_cache = cache;
|
||||||
|
cfo.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
// the following three options makes it not trigger GC in test
|
||||||
|
cfo.level0_file_num_compaction_trigger = 1000;
|
||||||
|
cfo.write_buffer_size = 1 << 30;
|
||||||
|
cfo.max_write_buffer_number = 5;
|
||||||
|
|
||||||
|
std::vector<rocksdb::ColumnFamilyDescriptor> cfd{};
|
||||||
|
cfd.push_back(rocksdb::ColumnFamilyDescriptor("default", cfo));
|
||||||
|
|
||||||
|
rocksdb::DBOptions options;
|
||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
options.allow_concurrent_memtable_write = true;
|
options.allow_concurrent_memtable_write = true;
|
||||||
options.enable_pipelined_write = true;
|
options.enable_pipelined_write = true;
|
||||||
// the following three options makes it not trigger GC in test
|
|
||||||
options.level0_file_num_compaction_trigger = 1000;
|
|
||||||
options.write_buffer_size = 1 << 30;
|
|
||||||
options.max_write_buffer_number = 5;
|
|
||||||
|
|
||||||
auto ropt = rocksdb::ReadOptions();
|
auto ropt = rocksdb::ReadOptions();
|
||||||
auto wopt = rocksdb::WriteOptions();
|
auto wopt = rocksdb::WriteOptions();
|
||||||
|
wopt.no_slowdown = true;
|
||||||
// wopt.disableWAL = true;
|
// wopt.disableWAL = true;
|
||||||
std::vector<std::thread> wg;
|
std::vector<std::thread> wg;
|
||||||
std::vector<std::vector<std::string>> keys{};
|
std::vector<std::vector<std::string>> keys{};
|
||||||
@ -94,7 +109,8 @@ int main(int argc, char *argv[]) {
|
|||||||
rocksdb::OptimisticTransactionDB *db;
|
rocksdb::OptimisticTransactionDB *db;
|
||||||
auto b = nm::Instant::now();
|
auto b = nm::Instant::now();
|
||||||
std::mutex mtx{};
|
std::mutex mtx{};
|
||||||
auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, &db);
|
std::vector<rocksdb::ColumnFamilyHandle *> handles{};
|
||||||
|
auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
||||||
assert(s.ok());
|
assert(s.ok());
|
||||||
std::barrier barrier{static_cast<ptrdiff_t>(args.threads)};
|
std::barrier barrier{static_cast<ptrdiff_t>(args.threads)};
|
||||||
|
|
||||||
@ -117,23 +133,27 @@ int main(int argc, char *argv[]) {
|
|||||||
keys.emplace_back(std::move(key));
|
keys.emplace_back(std::move(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto *handle = handles[0];
|
||||||
|
|
||||||
if (args.mode == "get") {
|
if (args.mode == "get") {
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||||
auto *tk = &keys[tid];
|
auto *tk = &keys[tid];
|
||||||
for (auto &key: *tk) {
|
for (auto &key: *tk) {
|
||||||
kv->Put(key, val);
|
kv->Put(handle, key, val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
kv->Commit();
|
kv->Commit();
|
||||||
delete kv;
|
delete kv;
|
||||||
|
delete handle;
|
||||||
delete db;
|
delete db;
|
||||||
|
handles.clear();
|
||||||
// re-open db
|
// re-open db
|
||||||
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, &db);
|
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
||||||
assert(s.ok());
|
assert(s.ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handle = handles[0];
|
||||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||||
auto *tk = &keys[tid];
|
auto *tk = &keys[tid];
|
||||||
wg.emplace_back([&] {
|
wg.emplace_back([&] {
|
||||||
@ -147,7 +167,7 @@ int main(int argc, char *argv[]) {
|
|||||||
if (args.mode == "insert") {
|
if (args.mode == "insert") {
|
||||||
for (auto &key: *tk) {
|
for (auto &key: *tk) {
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
kv->Put(key, val);
|
kv->Put(handle, key, val);
|
||||||
kv->Commit();
|
kv->Commit();
|
||||||
delete kv;
|
delete kv;
|
||||||
}
|
}
|
||||||
@ -155,7 +175,7 @@ int main(int argc, char *argv[]) {
|
|||||||
} else if (args.mode == "get") {
|
} else if (args.mode == "get") {
|
||||||
for (auto &key: *tk) {
|
for (auto &key: *tk) {
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
kv->Get(ropt, key, &rval);
|
kv->Get(ropt, handle, key, &rval);
|
||||||
kv->Commit();
|
kv->Commit();
|
||||||
delete kv;
|
delete kv;
|
||||||
}
|
}
|
||||||
@ -164,9 +184,9 @@ int main(int argc, char *argv[]) {
|
|||||||
auto is_insert = dist(gen) < args.insert_ratio;
|
auto is_insert = dist(gen) < args.insert_ratio;
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
if (is_insert) {
|
if (is_insert) {
|
||||||
kv->Put(key, val);
|
kv->Put(handle, key, val);
|
||||||
} else {
|
} else {
|
||||||
kv->Get(ropt, key, &rval); // not found
|
kv->Get(ropt, handle, key, &rval); // not found
|
||||||
}
|
}
|
||||||
kv->Commit();
|
kv->Commit();
|
||||||
delete kv;
|
delete kv;
|
||||||
@ -187,6 +207,7 @@ int main(int argc, char *argv[]) {
|
|||||||
double ops = static_cast<double>(total_op.load(std::memory_order_relaxed)) / b.elapse_sec();
|
double ops = static_cast<double>(total_op.load(std::memory_order_relaxed)) / b.elapse_sec();
|
||||||
fmt::println("{},{},{},{},{},{:.2f},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, ops,
|
fmt::println("{},{},{},{},{},{:.2f},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, ops,
|
||||||
b.elapse_ms());
|
b.elapse_ms());
|
||||||
|
delete handle;
|
||||||
delete db;
|
delete db;
|
||||||
std::filesystem::remove_all(args.path);
|
std::filesystem::remove_all(args.path);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -76,6 +76,7 @@ fn main() {
|
|||||||
let mut keys: Vec<Vec<Vec<u8>>> = Vec::with_capacity(args.threads);
|
let mut keys: Vec<Vec<Vec<u8>>> = Vec::with_capacity(args.threads);
|
||||||
let mut opt = Options::new(path);
|
let mut opt = Options::new(path);
|
||||||
opt.sync_on_write = false;
|
opt.sync_on_write = false;
|
||||||
|
opt.max_inline_size = 1024;
|
||||||
opt.tmp_store = args.mode != "get";
|
opt.tmp_store = args.mode != "get";
|
||||||
let mut saved = opt.clone();
|
let mut saved = opt.clone();
|
||||||
saved.tmp_store = false;
|
saved.tmp_store = false;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user