From 80b4745118ed8de70f3d79f8318b2d7c2a2e5199 Mon Sep 17 00:00:00 2001 From: abbycin Date: Tue, 3 Mar 2026 13:00:01 +0800 Subject: [PATCH] align rocksdb and mace --- rocksdb/main.cpp | 75 ++++++++++++++++++++++-------------------------- scripts/mace.sh | 2 +- src/main.rs | 71 ++++++++++++++++++++++++--------------------- 3 files changed, 74 insertions(+), 74 deletions(-) diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 2b97e3e..05e0272 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -135,11 +135,11 @@ int main(int argc, char *argv[]) { cfo.min_blob_size = args.blob_size; cfo.disable_auto_compactions = true; cfo.max_compaction_bytes = (1ULL << 60); - cfo.level0_stop_writes_trigger = 100000; - cfo.level0_slowdown_writes_trigger = 100000; - cfo.level0_file_num_compaction_trigger = 100000; + cfo.level0_stop_writes_trigger = 1000000; + cfo.level0_slowdown_writes_trigger = 1000000; + cfo.level0_file_num_compaction_trigger = 1000000; cfo.write_buffer_size = 64 << 20; - cfo.max_write_buffer_number = 64; + cfo.max_write_buffer_number = 128; // use 3GB block cache auto cache = rocksdb::NewLRUCache(3 << 30); @@ -154,7 +154,8 @@ int main(int argc, char *argv[]) { options.create_if_missing = true; options.allow_concurrent_memtable_write = true; options.enable_pipelined_write = true; - options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::HIGH); + options.max_background_flushes = 8; + options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH); auto wopt = rocksdb::WriteOptions(); wopt.no_slowdown = true; @@ -177,42 +178,36 @@ int main(int argc, char *argv[]) { auto *handle = handles[0]; if (args.mode == "get" || args.mode == "scan") { - auto *kv = db->BeginTransaction(wopt); + std::vector fill_threads; for (size_t tid = 0; tid < args.threads; ++tid) { - size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); - for (size_t i = 0; i < count; ++i) { - auto key = std::format("key_{}_{}", tid, i); - key.resize(args.key_size, 'x'); - kv->Put(handle, key, val); - } + fill_threads.emplace_back([&, tid] { + bind_core(tid); + size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); + const size_t batch_size = 10000; + for (size_t i = 0; i < count; i += batch_size) { + auto *kv = db->BeginTransaction(wopt); + for (size_t j = 0; j < batch_size && (i + j) < count; ++j) { + auto key = std::format("key_{}_{}", tid, i + j); + key.resize(args.key_size, 'x'); + kv->Put(handle, key, val); + } + kv->Commit(); + delete kv; + } + }); } - kv->Commit(); - delete kv; + for (auto &t: fill_threads) + t.join(); + delete handle; delete db; handles.clear(); // re-open db s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); assert(s.ok()); - handle = handles[0]; - - std::uniform_int_distribution tid_dist(0, args.threads - 1); - for (size_t i = 0; i < args.iterations; ++i) { - auto tid = tid_dist(gen); - size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); - std::uniform_int_distribution key_dist(0, count - 1); - auto idx = key_dist(gen); - auto key = std::format("key_{}_{}", tid, idx); - key.resize(args.key_size, 'x'); - auto s = db->Get(rocksdb::ReadOptions(), key, &val); - if (!s.ok()) { - std::terminate(); - } - } } - auto *snapshot = db->GetSnapshot(); auto base_seed = rd(); for (size_t tid = 0; tid < args.threads; ++tid) { wg.emplace_back([&, tid] { @@ -226,18 +221,22 @@ int main(int argc, char *argv[]) { ropt.iterate_upper_bound = &upper_bound_slice; } ropt.prefix_same_as_start = true; - ropt.snapshot = snapshot; size_t round = 0; - std::mt19937 mixed_gen(static_cast(base_seed) ^ static_cast(0x9e3779b9U * (tid + 1))); + std::mt19937 thread_gen(static_cast(base_seed) ^ static_cast(tid)); std::uniform_int_distribution mixed_dist(0, 99); size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); + std::vector indices(key_count); + std::iota(indices.begin(), indices.end(), 0); + if (args.random) { + std::shuffle(indices.begin(), indices.end(), thread_gen); + } ready_barrier.arrive_and_wait(); start_barrier.arrive_and_wait(); if (args.mode == "insert") { - for (size_t i = 0; i < key_count; ++i) { + for (size_t i: indices) { auto key = std::format("key_{}_{}", tid, i); key.resize(args.key_size, 'x'); round += 1; @@ -246,9 +245,8 @@ int main(int argc, char *argv[]) { kv->Commit(); delete kv; } - } else if (args.mode == "get") { - for (size_t i = 0; i < key_count; ++i) { + for (size_t i: indices) { auto key = std::format("key_{}_{}", tid, i); key.resize(args.key_size, 'x'); round += 1; @@ -258,11 +256,11 @@ int main(int argc, char *argv[]) { delete kv; } } else if (args.mode == "mixed") { - for (size_t i = 0; i < key_count; ++i) { + for (size_t i: indices) { auto key = std::format("key_{}_{}", tid, i); key.resize(args.key_size, 'x'); round += 1; - auto is_insert = mixed_dist(mixed_gen) < static_cast(args.insert_ratio); + auto is_insert = mixed_dist(thread_gen) < static_cast(args.insert_ratio); auto *kv = db->BeginTransaction(wopt); if (is_insert) { kv->Put(handle, key, val); @@ -275,7 +273,6 @@ int main(int argc, char *argv[]) { } else if (args.mode == "scan") { auto *iter = db->NewIterator(ropt); iter->Seek(prefix); - size_t n = 0; while (iter->Valid()) { round += 1; auto k = iter->key(); @@ -283,7 +280,6 @@ int main(int argc, char *argv[]) { black_box(k); black_box(v); iter->Next(); - n += 1; } delete iter; } @@ -313,7 +309,6 @@ int main(int argc, char *argv[]) { } fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops, (uint64_t) b.elapse_ms()); - db->ReleaseSnapshot(snapshot); delete handle; delete db; std::filesystem::remove_all(args.path); diff --git a/scripts/mace.sh b/scripts/mace.sh index 07b4d19..1c5ab77 100755 --- a/scripts/mace.sh +++ b/scripts/mace.sh @@ -48,7 +48,7 @@ function samples() { } echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv" -samples "$1" 2>> "${script_dir}/mace.csv" +samples "$1" 1>> "${script_dir}/mace.csv" if [ -x "${script_dir}/bin/python" ]; then (cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv) else diff --git a/src/main.rs b/src/main.rs index ef124cd..1b211f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ use logger::Logger; use mace::{Mace, Options}; #[cfg(feature = "custom_alloc")] use myalloc::{MyAlloc, print_filtered_trace}; -use rand::prelude::*; use std::path::Path; use std::process::exit; use std::sync::Arc; @@ -85,17 +84,17 @@ fn main() { eprintln!("Error: Insert ratio must be between 0 and 100"); exit(1); } - let mut opt = Options::new(path); opt.sync_on_write = false; - opt.over_provision = false; opt.inline_size = args.blob_size; opt.tmp_store = args.mode != "get" && args.mode != "scan"; opt.cache_capacity = 3 << 30; opt.data_file_size = 64 << 20; opt.max_log_size = 1 << 30; + opt.default_arenas = 128; let mut saved = opt.clone(); - saved.tmp_store = false; + + saved.tmp_store = true; let mut db = Mace::new(opt.validate().unwrap()).unwrap(); db.disable_gc(); let mut bkt = db.new_bucket("default").unwrap(); @@ -103,42 +102,41 @@ fn main() { let value = Arc::new(vec![b'0'; args.value_size]); if args.mode == "get" || args.mode == "scan" { - let pre_tx = bkt.begin().unwrap(); + let mut fill_handles = vec![]; for tid in 0..args.threads { - let count = args.iterations / args.threads + let bkt_clone = bkt.clone(); + let val_clone = value.clone(); + let key_count = args.iterations / args.threads + if tid < args.iterations % args.threads { 1 } else { 0 }; - for i in 0..count { - let mut key = format!("key_{tid}_{i}").into_bytes(); - key.resize(args.key_size, b'x'); - pre_tx.put(&key, &*value).unwrap(); - } + let key_size = args.key_size; + fill_handles.push(std::thread::spawn(move || { + coreid::bind_core(tid); + const BATCH_SIZE: usize = 10000; + for i in (0..key_count).step_by(BATCH_SIZE) { + let tx = bkt_clone.begin().unwrap(); + for j in 0..BATCH_SIZE { + if i + j >= key_count { + break; + } + let mut key = format!("key_{tid}_{}", i + j).into_bytes(); + key.resize(key_size, b'x'); + tx.put(&key, &*val_clone).unwrap(); + } + tx.commit().unwrap(); + } + })); + } + for h in fill_handles { + h.join().unwrap(); } - pre_tx.commit().unwrap(); drop(bkt); drop(db); - saved.tmp_store = true; db = Mace::new(saved.validate().unwrap()).unwrap(); bkt = db.get_bucket("default").unwrap(); - - let mut rng = rand::rng(); - for _ in 0..args.iterations { - let tid = rng.random_range(0..args.threads); - let count = args.iterations / args.threads - + if tid < args.iterations % args.threads { - 1 - } else { - 0 - }; - let idx = rng.random_range(0..count); - let mut key = format!("key_{tid}_{idx}").into_bytes(); - key.resize(args.key_size, b'x'); - let view = bkt.view().unwrap(); - view.get(&key).unwrap(); - } } let mut key_counts = vec![args.iterations / args.threads; args.threads]; @@ -162,15 +160,22 @@ fn main() { let key_count = key_counts[tid]; let key_size = args.key_size; let prefix = format!("key_{tid}_"); + let is_random = args.random; std::thread::spawn(move || { coreid::bind_core(tid); let mut round = 0; + let mut indices: Vec = (0..key_count).collect(); + if is_random { + use rand::seq::SliceRandom; + indices.shuffle(&mut rand::rng()); + } + ready_barrier.wait(); start_barrier.wait(); match mode.as_str() { "insert" => { - for i in 0..key_count { + for i in indices { let mut key = format!("key_{tid}_{i}").into_bytes(); key.resize(key_size, b'x'); round += 1; @@ -180,7 +185,7 @@ fn main() { } } "get" => { - for i in 0..key_count { + for i in indices { let mut key = format!("key_{tid}_{i}").into_bytes(); key.resize(key_size, b'x'); round += 1; @@ -190,7 +195,7 @@ fn main() { } } "mixed" => { - for i in 0..key_count { + for i in indices { let mut key = format!("key_{tid}_{i}").into_bytes(); key.resize(key_size, b'x'); let is_insert = rand::random_range(0..100) < insert_ratio; @@ -250,7 +255,7 @@ fn main() { mode = "sequential_insert".into(); } } - eprintln!( + println!( "{},{},{},{},{},{},{}", mode, args.threads,