align rocksdb and mace
This commit is contained in:
parent
f71be68c38
commit
80b4745118
@ -135,11 +135,11 @@ int main(int argc, char *argv[]) {
|
|||||||
cfo.min_blob_size = args.blob_size;
|
cfo.min_blob_size = args.blob_size;
|
||||||
cfo.disable_auto_compactions = true;
|
cfo.disable_auto_compactions = true;
|
||||||
cfo.max_compaction_bytes = (1ULL << 60);
|
cfo.max_compaction_bytes = (1ULL << 60);
|
||||||
cfo.level0_stop_writes_trigger = 100000;
|
cfo.level0_stop_writes_trigger = 1000000;
|
||||||
cfo.level0_slowdown_writes_trigger = 100000;
|
cfo.level0_slowdown_writes_trigger = 1000000;
|
||||||
cfo.level0_file_num_compaction_trigger = 100000;
|
cfo.level0_file_num_compaction_trigger = 1000000;
|
||||||
cfo.write_buffer_size = 64 << 20;
|
cfo.write_buffer_size = 64 << 20;
|
||||||
cfo.max_write_buffer_number = 64;
|
cfo.max_write_buffer_number = 128;
|
||||||
|
|
||||||
// use 3GB block cache
|
// use 3GB block cache
|
||||||
auto cache = rocksdb::NewLRUCache(3 << 30);
|
auto cache = rocksdb::NewLRUCache(3 << 30);
|
||||||
@ -154,7 +154,8 @@ int main(int argc, char *argv[]) {
|
|||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
options.allow_concurrent_memtable_write = true;
|
options.allow_concurrent_memtable_write = true;
|
||||||
options.enable_pipelined_write = true;
|
options.enable_pipelined_write = true;
|
||||||
options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::HIGH);
|
options.max_background_flushes = 8;
|
||||||
|
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH);
|
||||||
|
|
||||||
auto wopt = rocksdb::WriteOptions();
|
auto wopt = rocksdb::WriteOptions();
|
||||||
wopt.no_slowdown = true;
|
wopt.no_slowdown = true;
|
||||||
@ -177,42 +178,36 @@ int main(int argc, char *argv[]) {
|
|||||||
auto *handle = handles[0];
|
auto *handle = handles[0];
|
||||||
|
|
||||||
if (args.mode == "get" || args.mode == "scan") {
|
if (args.mode == "get" || args.mode == "scan") {
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
std::vector<std::thread> fill_threads;
|
||||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||||
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
fill_threads.emplace_back([&, tid] {
|
||||||
for (size_t i = 0; i < count; ++i) {
|
bind_core(tid);
|
||||||
auto key = std::format("key_{}_{}", tid, i);
|
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||||
key.resize(args.key_size, 'x');
|
const size_t batch_size = 10000;
|
||||||
kv->Put(handle, key, val);
|
for (size_t i = 0; i < count; i += batch_size) {
|
||||||
}
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
|
for (size_t j = 0; j < batch_size && (i + j) < count; ++j) {
|
||||||
|
auto key = std::format("key_{}_{}", tid, i + j);
|
||||||
|
key.resize(args.key_size, 'x');
|
||||||
|
kv->Put(handle, key, val);
|
||||||
|
}
|
||||||
|
kv->Commit();
|
||||||
|
delete kv;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
kv->Commit();
|
for (auto &t: fill_threads)
|
||||||
delete kv;
|
t.join();
|
||||||
|
|
||||||
delete handle;
|
delete handle;
|
||||||
delete db;
|
delete db;
|
||||||
handles.clear();
|
handles.clear();
|
||||||
// re-open db
|
// re-open db
|
||||||
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
||||||
assert(s.ok());
|
assert(s.ok());
|
||||||
|
|
||||||
handle = handles[0];
|
handle = handles[0];
|
||||||
|
|
||||||
std::uniform_int_distribution<size_t> tid_dist(0, args.threads - 1);
|
|
||||||
for (size_t i = 0; i < args.iterations; ++i) {
|
|
||||||
auto tid = tid_dist(gen);
|
|
||||||
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
|
||||||
std::uniform_int_distribution<size_t> key_dist(0, count - 1);
|
|
||||||
auto idx = key_dist(gen);
|
|
||||||
auto key = std::format("key_{}_{}", tid, idx);
|
|
||||||
key.resize(args.key_size, 'x');
|
|
||||||
auto s = db->Get(rocksdb::ReadOptions(), key, &val);
|
|
||||||
if (!s.ok()) {
|
|
||||||
std::terminate();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto *snapshot = db->GetSnapshot();
|
|
||||||
auto base_seed = rd();
|
auto base_seed = rd();
|
||||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||||
wg.emplace_back([&, tid] {
|
wg.emplace_back([&, tid] {
|
||||||
@ -226,18 +221,22 @@ int main(int argc, char *argv[]) {
|
|||||||
ropt.iterate_upper_bound = &upper_bound_slice;
|
ropt.iterate_upper_bound = &upper_bound_slice;
|
||||||
}
|
}
|
||||||
ropt.prefix_same_as_start = true;
|
ropt.prefix_same_as_start = true;
|
||||||
ropt.snapshot = snapshot;
|
|
||||||
size_t round = 0;
|
size_t round = 0;
|
||||||
std::mt19937 mixed_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(0x9e3779b9U * (tid + 1)));
|
std::mt19937 thread_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(tid));
|
||||||
std::uniform_int_distribution<int> mixed_dist(0, 99);
|
std::uniform_int_distribution<int> mixed_dist(0, 99);
|
||||||
|
|
||||||
size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||||
|
std::vector<size_t> indices(key_count);
|
||||||
|
std::iota(indices.begin(), indices.end(), 0);
|
||||||
|
if (args.random) {
|
||||||
|
std::shuffle(indices.begin(), indices.end(), thread_gen);
|
||||||
|
}
|
||||||
|
|
||||||
ready_barrier.arrive_and_wait();
|
ready_barrier.arrive_and_wait();
|
||||||
start_barrier.arrive_and_wait();
|
start_barrier.arrive_and_wait();
|
||||||
|
|
||||||
if (args.mode == "insert") {
|
if (args.mode == "insert") {
|
||||||
for (size_t i = 0; i < key_count; ++i) {
|
for (size_t i: indices) {
|
||||||
auto key = std::format("key_{}_{}", tid, i);
|
auto key = std::format("key_{}_{}", tid, i);
|
||||||
key.resize(args.key_size, 'x');
|
key.resize(args.key_size, 'x');
|
||||||
round += 1;
|
round += 1;
|
||||||
@ -246,9 +245,8 @@ int main(int argc, char *argv[]) {
|
|||||||
kv->Commit();
|
kv->Commit();
|
||||||
delete kv;
|
delete kv;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (args.mode == "get") {
|
} else if (args.mode == "get") {
|
||||||
for (size_t i = 0; i < key_count; ++i) {
|
for (size_t i: indices) {
|
||||||
auto key = std::format("key_{}_{}", tid, i);
|
auto key = std::format("key_{}_{}", tid, i);
|
||||||
key.resize(args.key_size, 'x');
|
key.resize(args.key_size, 'x');
|
||||||
round += 1;
|
round += 1;
|
||||||
@ -258,11 +256,11 @@ int main(int argc, char *argv[]) {
|
|||||||
delete kv;
|
delete kv;
|
||||||
}
|
}
|
||||||
} else if (args.mode == "mixed") {
|
} else if (args.mode == "mixed") {
|
||||||
for (size_t i = 0; i < key_count; ++i) {
|
for (size_t i: indices) {
|
||||||
auto key = std::format("key_{}_{}", tid, i);
|
auto key = std::format("key_{}_{}", tid, i);
|
||||||
key.resize(args.key_size, 'x');
|
key.resize(args.key_size, 'x');
|
||||||
round += 1;
|
round += 1;
|
||||||
auto is_insert = mixed_dist(mixed_gen) < static_cast<int>(args.insert_ratio);
|
auto is_insert = mixed_dist(thread_gen) < static_cast<int>(args.insert_ratio);
|
||||||
auto *kv = db->BeginTransaction(wopt);
|
auto *kv = db->BeginTransaction(wopt);
|
||||||
if (is_insert) {
|
if (is_insert) {
|
||||||
kv->Put(handle, key, val);
|
kv->Put(handle, key, val);
|
||||||
@ -275,7 +273,6 @@ int main(int argc, char *argv[]) {
|
|||||||
} else if (args.mode == "scan") {
|
} else if (args.mode == "scan") {
|
||||||
auto *iter = db->NewIterator(ropt);
|
auto *iter = db->NewIterator(ropt);
|
||||||
iter->Seek(prefix);
|
iter->Seek(prefix);
|
||||||
size_t n = 0;
|
|
||||||
while (iter->Valid()) {
|
while (iter->Valid()) {
|
||||||
round += 1;
|
round += 1;
|
||||||
auto k = iter->key();
|
auto k = iter->key();
|
||||||
@ -283,7 +280,6 @@ int main(int argc, char *argv[]) {
|
|||||||
black_box(k);
|
black_box(k);
|
||||||
black_box(v);
|
black_box(v);
|
||||||
iter->Next();
|
iter->Next();
|
||||||
n += 1;
|
|
||||||
}
|
}
|
||||||
delete iter;
|
delete iter;
|
||||||
}
|
}
|
||||||
@ -313,7 +309,6 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops,
|
fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops,
|
||||||
(uint64_t) b.elapse_ms());
|
(uint64_t) b.elapse_ms());
|
||||||
db->ReleaseSnapshot(snapshot);
|
|
||||||
delete handle;
|
delete handle;
|
||||||
delete db;
|
delete db;
|
||||||
std::filesystem::remove_all(args.path);
|
std::filesystem::remove_all(args.path);
|
||||||
|
|||||||
@ -48,7 +48,7 @@ function samples() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv"
|
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv"
|
||||||
samples "$1" 2>> "${script_dir}/mace.csv"
|
samples "$1" 1>> "${script_dir}/mace.csv"
|
||||||
if [ -x "${script_dir}/bin/python" ]; then
|
if [ -x "${script_dir}/bin/python" ]; then
|
||||||
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv)
|
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv)
|
||||||
else
|
else
|
||||||
|
|||||||
71
src/main.rs
71
src/main.rs
@ -4,7 +4,6 @@ use logger::Logger;
|
|||||||
use mace::{Mace, Options};
|
use mace::{Mace, Options};
|
||||||
#[cfg(feature = "custom_alloc")]
|
#[cfg(feature = "custom_alloc")]
|
||||||
use myalloc::{MyAlloc, print_filtered_trace};
|
use myalloc::{MyAlloc, print_filtered_trace};
|
||||||
use rand::prelude::*;
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -85,17 +84,17 @@ fn main() {
|
|||||||
eprintln!("Error: Insert ratio must be between 0 and 100");
|
eprintln!("Error: Insert ratio must be between 0 and 100");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut opt = Options::new(path);
|
let mut opt = Options::new(path);
|
||||||
opt.sync_on_write = false;
|
opt.sync_on_write = false;
|
||||||
opt.over_provision = false;
|
|
||||||
opt.inline_size = args.blob_size;
|
opt.inline_size = args.blob_size;
|
||||||
opt.tmp_store = args.mode != "get" && args.mode != "scan";
|
opt.tmp_store = args.mode != "get" && args.mode != "scan";
|
||||||
opt.cache_capacity = 3 << 30;
|
opt.cache_capacity = 3 << 30;
|
||||||
opt.data_file_size = 64 << 20;
|
opt.data_file_size = 64 << 20;
|
||||||
opt.max_log_size = 1 << 30;
|
opt.max_log_size = 1 << 30;
|
||||||
|
opt.default_arenas = 128;
|
||||||
let mut saved = opt.clone();
|
let mut saved = opt.clone();
|
||||||
saved.tmp_store = false;
|
|
||||||
|
saved.tmp_store = true;
|
||||||
let mut db = Mace::new(opt.validate().unwrap()).unwrap();
|
let mut db = Mace::new(opt.validate().unwrap()).unwrap();
|
||||||
db.disable_gc();
|
db.disable_gc();
|
||||||
let mut bkt = db.new_bucket("default").unwrap();
|
let mut bkt = db.new_bucket("default").unwrap();
|
||||||
@ -103,42 +102,41 @@ fn main() {
|
|||||||
let value = Arc::new(vec![b'0'; args.value_size]);
|
let value = Arc::new(vec![b'0'; args.value_size]);
|
||||||
|
|
||||||
if args.mode == "get" || args.mode == "scan" {
|
if args.mode == "get" || args.mode == "scan" {
|
||||||
let pre_tx = bkt.begin().unwrap();
|
let mut fill_handles = vec![];
|
||||||
for tid in 0..args.threads {
|
for tid in 0..args.threads {
|
||||||
let count = args.iterations / args.threads
|
let bkt_clone = bkt.clone();
|
||||||
|
let val_clone = value.clone();
|
||||||
|
let key_count = args.iterations / args.threads
|
||||||
+ if tid < args.iterations % args.threads {
|
+ if tid < args.iterations % args.threads {
|
||||||
1
|
1
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
for i in 0..count {
|
let key_size = args.key_size;
|
||||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
fill_handles.push(std::thread::spawn(move || {
|
||||||
key.resize(args.key_size, b'x');
|
coreid::bind_core(tid);
|
||||||
pre_tx.put(&key, &*value).unwrap();
|
const BATCH_SIZE: usize = 10000;
|
||||||
}
|
for i in (0..key_count).step_by(BATCH_SIZE) {
|
||||||
|
let tx = bkt_clone.begin().unwrap();
|
||||||
|
for j in 0..BATCH_SIZE {
|
||||||
|
if i + j >= key_count {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut key = format!("key_{tid}_{}", i + j).into_bytes();
|
||||||
|
key.resize(key_size, b'x');
|
||||||
|
tx.put(&key, &*val_clone).unwrap();
|
||||||
|
}
|
||||||
|
tx.commit().unwrap();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
for h in fill_handles {
|
||||||
|
h.join().unwrap();
|
||||||
}
|
}
|
||||||
pre_tx.commit().unwrap();
|
|
||||||
drop(bkt);
|
drop(bkt);
|
||||||
drop(db);
|
drop(db);
|
||||||
saved.tmp_store = true;
|
|
||||||
db = Mace::new(saved.validate().unwrap()).unwrap();
|
db = Mace::new(saved.validate().unwrap()).unwrap();
|
||||||
bkt = db.get_bucket("default").unwrap();
|
bkt = db.get_bucket("default").unwrap();
|
||||||
|
|
||||||
let mut rng = rand::rng();
|
|
||||||
for _ in 0..args.iterations {
|
|
||||||
let tid = rng.random_range(0..args.threads);
|
|
||||||
let count = args.iterations / args.threads
|
|
||||||
+ if tid < args.iterations % args.threads {
|
|
||||||
1
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
let idx = rng.random_range(0..count);
|
|
||||||
let mut key = format!("key_{tid}_{idx}").into_bytes();
|
|
||||||
key.resize(args.key_size, b'x');
|
|
||||||
let view = bkt.view().unwrap();
|
|
||||||
view.get(&key).unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut key_counts = vec![args.iterations / args.threads; args.threads];
|
let mut key_counts = vec![args.iterations / args.threads; args.threads];
|
||||||
@ -162,15 +160,22 @@ fn main() {
|
|||||||
let key_count = key_counts[tid];
|
let key_count = key_counts[tid];
|
||||||
let key_size = args.key_size;
|
let key_size = args.key_size;
|
||||||
let prefix = format!("key_{tid}_");
|
let prefix = format!("key_{tid}_");
|
||||||
|
let is_random = args.random;
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
coreid::bind_core(tid);
|
coreid::bind_core(tid);
|
||||||
let mut round = 0;
|
let mut round = 0;
|
||||||
|
let mut indices: Vec<usize> = (0..key_count).collect();
|
||||||
|
if is_random {
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
|
indices.shuffle(&mut rand::rng());
|
||||||
|
}
|
||||||
|
|
||||||
ready_barrier.wait();
|
ready_barrier.wait();
|
||||||
start_barrier.wait();
|
start_barrier.wait();
|
||||||
match mode.as_str() {
|
match mode.as_str() {
|
||||||
"insert" => {
|
"insert" => {
|
||||||
for i in 0..key_count {
|
for i in indices {
|
||||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||||
key.resize(key_size, b'x');
|
key.resize(key_size, b'x');
|
||||||
round += 1;
|
round += 1;
|
||||||
@ -180,7 +185,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
"get" => {
|
"get" => {
|
||||||
for i in 0..key_count {
|
for i in indices {
|
||||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||||
key.resize(key_size, b'x');
|
key.resize(key_size, b'x');
|
||||||
round += 1;
|
round += 1;
|
||||||
@ -190,7 +195,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
"mixed" => {
|
"mixed" => {
|
||||||
for i in 0..key_count {
|
for i in indices {
|
||||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||||
key.resize(key_size, b'x');
|
key.resize(key_size, b'x');
|
||||||
let is_insert = rand::random_range(0..100) < insert_ratio;
|
let is_insert = rand::random_range(0..100) < insert_ratio;
|
||||||
@ -250,7 +255,7 @@ fn main() {
|
|||||||
mode = "sequential_insert".into();
|
mode = "sequential_insert".into();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
eprintln!(
|
println!(
|
||||||
"{},{},{},{},{},{},{}",
|
"{},{},{},{},{},{},{}",
|
||||||
mode,
|
mode,
|
||||||
args.threads,
|
args.threads,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user