diff --git a/Cargo.toml b/Cargo.toml index c67cb96..d5a492a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -mace-kv = "0.0.25" +mace-kv = "0.0.27" clap = { version = "4.5.48", features = ["derive"] } rand = "0.9.2" log = "0.4.22" diff --git a/README.md b/README.md index 0d6598b..43c5ec7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,4 @@ -# mace 0.0.24 vs rocksdb 10.4.2 - -**mace 0.0.24 traded a slight dip in query performance for a 10+% boost in insertion performance.** +# mace 0.0.27 vs rocksdb 10.4.2 ## sequential insert ![mace_sequential_insert](./scripts/mace_sequential_insert.png) diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 01f2ca6..b683aa9 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -22,6 +22,10 @@ #include #include +#include +#include +#include + #include "CLI/CLI.hpp" #include "instant.h" @@ -30,6 +34,19 @@ static void black_box(const T &t) { asm volatile("" ::"m"(t) : "memory"); } +static size_t cores_online() { + auto n = ::sysconf(_SC_NPROCESSORS_ONLN); + return n > 0 ? static_cast(n) : 1; +} + +static void bind_core(size_t tid) { + cpu_set_t set; + CPU_ZERO(&set); + auto core = static_cast(tid % cores_online()); + CPU_SET(core, &set); + (void) pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &set); +} + struct Args { size_t threads; size_t iterations; @@ -59,11 +76,11 @@ int main(int argc, char *argv[]) { app.add_option("-t,--threads", args.threads, "Threads"); app.add_option("-k,--key-size", args.key_size, "Key Size"); app.add_option("-v,--value-size", args.value_size, "Value Size"); - app.add_option("-b,--blob-size", args.value_size, "Blob Size"); + app.add_option("-b,--blob-size", args.blob_size, "Blob Size"); app.add_option("-i,--iterations", args.iterations, "Iterations"); app.add_option("-r,--insert-ratio", args.insert_ratio, "Insert Ratio for mixed mode"); app.add_option("-p,--path", args.path, "DataBase Home"); - app.add_option("--random", args.random, "Shuffle insert keys"); + app.add_flag("--random", args.random, "Shuffle insert keys"); CLI11_PARSE(app, argc, argv); @@ -77,6 +94,11 @@ int main(int argc, char *argv[]) { return 1; } + if (args.threads == 0) { + fmt::println("Error: threads must be greater than 0"); + return 1; + } + if (args.mode != "insert" && args.mode != "get" && args.mode != "mixed" && args.mode != "scan") { fmt::println("Error: Invalid mode"); return 1; @@ -142,27 +164,31 @@ int main(int argc, char *argv[]) { std::atomic total_op{0}; rocksdb::OptimisticTransactionDB *db; auto b = nm::Instant::now(); - std::mutex mtx{}; std::vector handles{}; auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); assert(s.ok()); - std::barrier barrier{static_cast(args.threads)}; + std::barrier ready_barrier{static_cast(args.threads + 1)}; + std::barrier start_barrier{static_cast(args.threads + 1)}; std::random_device rd{}; std::mt19937 gen(rd()); - std::uniform_int_distribution dist(0, 100); std::string val(args.value_size, 'x'); - auto keys_per_thread = args.iterations / args.threads; + std::vector key_counts(args.threads, args.iterations / args.threads); + for (size_t i = 0; i < args.iterations % args.threads; ++i) { + key_counts[i] += 1; + } + keys.reserve(args.threads); for (size_t tid = 0; tid < args.threads; ++tid) { std::vector key{}; - for (size_t i = 0; i < keys_per_thread; ++i) { + key.reserve(key_counts[tid]); + for (size_t i = 0; i < key_counts[tid]; ++i) { auto tmp = std::format("key_{}_{}", tid, i); tmp.resize(args.key_size, 'x'); key.emplace_back(std::move(tmp)); } if (args.mode == "get" || args.random) { - std::shuffle(keys.begin(), keys.end(), gen); + std::shuffle(key.begin(), key.end(), gen); } keys.emplace_back(std::move(key)); } @@ -189,11 +215,14 @@ int main(int argc, char *argv[]) { handle = handles[0]; // simulate common use cases - std::uniform_int_distribution dist(0, args.threads - 1); - for (size_t i = 0; i < keys_per_thread; ++i) { - auto tid = dist(gen); - auto k = std::format("key_{}_{}", tid, i); - k.resize(args.key_size, 'x'); + std::uniform_int_distribution tid_dist(0, args.threads - 1); + for (size_t i = 0; i < args.iterations; ++i) { + auto tid = tid_dist(gen); + if (keys[tid].empty()) { + continue; + } + std::uniform_int_distribution key_dist(0, keys[tid].size() - 1); + const auto &k = keys[tid][key_dist(gen)]; auto s = db->Get(rocksdb::ReadOptions(), k, &val); if (!s.ok()) { std::terminate(); @@ -202,10 +231,12 @@ int main(int argc, char *argv[]) { } auto *snapshot = db->GetSnapshot(); + auto base_seed = rd(); for (size_t tid = 0; tid < args.threads; ++tid) { wg.emplace_back([&, tid] { + bind_core(tid); std::string rval(args.value_size, '0'); - auto prefix = std::format("key_{}", tid); + auto prefix = std::format("key_{}_", tid); auto ropt = rocksdb::ReadOptions(); auto upper_bound = find_upper_bound(prefix); auto upper_bound_slice = rocksdb::Slice(upper_bound); @@ -216,12 +247,11 @@ int main(int argc, char *argv[]) { ropt.prefix_same_as_start = true; ropt.snapshot = snapshot; size_t round = 0; + std::mt19937 mixed_gen(static_cast(base_seed) ^ static_cast(0x9e3779b9U * (tid + 1))); + std::uniform_int_distribution mixed_dist(0, 99); - barrier.arrive_and_wait(); - if (mtx.try_lock()) { - b = nm::Instant::now(); - mtx.unlock(); - } + ready_barrier.arrive_and_wait(); + start_barrier.arrive_and_wait(); if (args.mode == "insert") { for (auto &key: *tk) { @@ -243,7 +273,7 @@ int main(int argc, char *argv[]) { } else if (args.mode == "mixed") { for (auto &key: *tk) { round += 1; - auto is_insert = dist(gen) < args.insert_ratio; + auto is_insert = mixed_dist(mixed_gen) < static_cast(args.insert_ratio); auto *kv = db->BeginTransaction(wopt); if (is_insert) { kv->Put(handle, key, val); @@ -273,6 +303,10 @@ int main(int argc, char *argv[]) { }); } + ready_barrier.arrive_and_wait(); + b = nm::Instant::now(); + start_barrier.arrive_and_wait(); + for (auto &w: wg) { w.join(); } diff --git a/scripts/mace.sh b/scripts/mace.sh index 6cb0a8c..07b4d19 100755 --- a/scripts/mace.sh +++ b/scripts/mace.sh @@ -1,21 +1,24 @@ #!/usr/bin/env bash +set -euo pipefail + if [ "$#" -ne 1 ] then printf "\033[m$0 path\033[0m\n" exit 1 fi -pushd . -cd .. -cargo build --release 1>/dev/null 2> /dev/null +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +root_dir="$(cd -- "${script_dir}/.." && pwd)" + +cargo build --release --manifest-path "${root_dir}/Cargo.toml" 1>/dev/null 2>/dev/null function samples() { export RUST_BACKTRACE=full kv_sz=(16 16 100 1024 1024 1024 16 10240) mode=(insert get mixed scan) # set -x - db_root=$1 + db_root="$1" cnt=100000 for ((i = 1; i <= $(nproc); i *= 2)) @@ -26,14 +29,14 @@ function samples() { do if [ "${mode[k]}" == "insert" ] then - ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --random + "${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random if test $? -ne 0 then echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail" exit 1 fi fi - ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + "${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" if test $? -ne 0 then echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" @@ -44,7 +47,10 @@ function samples() { done } -echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > scripts/mace.csv -samples $1 2>> scripts/mace.csv -popd -./bin/python plot.py mace.csv +echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv" +samples "$1" 2>> "${script_dir}/mace.csv" +if [ -x "${script_dir}/bin/python" ]; then + (cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv) +else + (cd "${script_dir}" && python3 plot.py mace.csv) +fi diff --git a/scripts/mace_get.png b/scripts/mace_get.png index 4348333..20c409b 100644 Binary files a/scripts/mace_get.png and b/scripts/mace_get.png differ diff --git a/scripts/mace_mixed.png b/scripts/mace_mixed.png index f609baa..e3bad3e 100644 Binary files a/scripts/mace_mixed.png and b/scripts/mace_mixed.png differ diff --git a/scripts/mace_random_insert.png b/scripts/mace_random_insert.png index e82473d..9ef9fa6 100644 Binary files a/scripts/mace_random_insert.png and b/scripts/mace_random_insert.png differ diff --git a/scripts/mace_scan.png b/scripts/mace_scan.png index 877c962..914e631 100644 Binary files a/scripts/mace_scan.png and b/scripts/mace_scan.png differ diff --git a/scripts/mace_sequential_insert.png b/scripts/mace_sequential_insert.png index 63bdc84..158e0bf 100644 Binary files a/scripts/mace_sequential_insert.png and b/scripts/mace_sequential_insert.png differ diff --git a/scripts/rocksdb.sh b/scripts/rocksdb.sh index fb3031a..068fc13 100755 --- a/scripts/rocksdb.sh +++ b/scripts/rocksdb.sh @@ -1,21 +1,25 @@ #!/usr/bin/env bash +set -euo pipefail + if [ "$#" -ne 1 ] then printf "\033[m$0 path\033[0m\n" exit 1 fi -pushd . -cd ../rocksdb -cmake --preset release 1>/dev/null 2>/dev/null -cmake --build --preset release 1>/dev/null 2>/dev/null +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +root_dir="$(cd -- "${script_dir}/.." && pwd)" +rocksdb_dir="${root_dir}/rocksdb" + +(cd "${rocksdb_dir}" && cmake --preset release 1>/dev/null 2>/dev/null) +(cd "${rocksdb_dir}" && cmake --build --preset release 1>/dev/null 2>/dev/null) function samples() { kv_sz=(16 16 100 1024 1024 1024 16 10240) mode=(insert get mixed scan) # set -x - db_root=$1 + db_root="$1" cnt=100000 for ((i = 1; i <= $(nproc); i *= 2)) do @@ -25,14 +29,14 @@ function samples() { do if [ "${mode[k]}" == "insert" ] then - ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --random 1 + "${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random if test $? -ne 0 then echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail" exit 1 fi fi - ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + "${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" if test $? -ne 0 then echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" @@ -43,7 +47,10 @@ function samples() { done } -echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > ../scripts/rocksdb.csv -samples $1 1>> ../scripts/rocksdb.csv -popd -./bin/python plot.py rocksdb.csv +echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > "${script_dir}/rocksdb.csv" +samples "$1" 1>> "${script_dir}/rocksdb.csv" +if [ -x "${script_dir}/bin/python" ]; then + (cd "${script_dir}" && "${script_dir}/bin/python" plot.py rocksdb.csv) +else + (cd "${script_dir}" && python3 plot.py rocksdb.csv) +fi diff --git a/scripts/rocksdb_get.png b/scripts/rocksdb_get.png index 63efa0d..24a886b 100644 Binary files a/scripts/rocksdb_get.png and b/scripts/rocksdb_get.png differ diff --git a/scripts/rocksdb_mixed.png b/scripts/rocksdb_mixed.png index 296b2f9..006477d 100644 Binary files a/scripts/rocksdb_mixed.png and b/scripts/rocksdb_mixed.png differ diff --git a/scripts/rocksdb_random_insert.png b/scripts/rocksdb_random_insert.png index c76af41..31c1cd5 100644 Binary files a/scripts/rocksdb_random_insert.png and b/scripts/rocksdb_random_insert.png differ diff --git a/scripts/rocksdb_scan.png b/scripts/rocksdb_scan.png index eeaf557..7ce38cf 100644 Binary files a/scripts/rocksdb_scan.png and b/scripts/rocksdb_scan.png differ diff --git a/scripts/rocksdb_sequential_insert.png b/scripts/rocksdb_sequential_insert.png index 4933dc9..eed3970 100644 Binary files a/scripts/rocksdb_sequential_insert.png and b/scripts/rocksdb_sequential_insert.png differ diff --git a/src/main.rs b/src/main.rs index 9f881ab..8c66ed5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,16 @@ fn main() { exit(1); } + if args.threads == 0 { + eprintln!("Error: threads must be greater than 0"); + exit(1); + } + + if !matches!(args.mode.as_str(), "insert" | "get" | "mixed" | "scan") { + eprintln!("Error: Invalid mode"); + exit(1); + } + if args.key_size < 16 || args.value_size < 16 { eprintln!("Error: key_size or value_size too small, must >= 16"); exit(1); @@ -87,13 +97,17 @@ fn main() { saved.tmp_store = false; let mut db = Mace::new(opt.validate().unwrap()).unwrap(); db.disable_gc(); + let mut bkt = db.new_bucket("default").unwrap(); let mut rng = rand::rng(); let value = Arc::new(vec![b'0'; args.value_size]); - let keys_per_thread = args.iterations / args.threads; + let mut key_counts = vec![args.iterations / args.threads; args.threads]; + for cnt in key_counts.iter_mut().take(args.iterations % args.threads) { + *cnt += 1; + } for tid in 0..args.threads { - let mut tk = Vec::with_capacity(keys_per_thread); - for i in 0..keys_per_thread { + let mut tk = Vec::with_capacity(key_counts[tid]); + for i in 0..key_counts[tid] { let mut key = format!("key_{tid}_{i}").into_bytes(); key.resize(args.key_size, b'x'); tk.push(key); @@ -105,54 +119,52 @@ fn main() { } if args.mode == "get" || args.mode == "scan" { - let pre_tx = db.begin().unwrap(); + let pre_tx = bkt.begin().unwrap(); (0..args.threads).for_each(|tid| { for k in &keys[tid] { pre_tx.put(k, &*value).unwrap(); } }); pre_tx.commit().unwrap(); + drop(bkt); drop(db); // re-open db saved.tmp_store = true; db = Mace::new(saved.validate().unwrap()).unwrap(); + bkt = db.get_bucket("default").unwrap(); // simulate common use cases - for i in 0..keys_per_thread { + for _ in 0..args.iterations { let tid = rng.random_range(0..args.threads); - let mut k = format!("key_{tid}_{i}").into_bytes(); - k.resize(args.key_size, b'x'); - let view = db.view().unwrap(); - view.get(&k).unwrap(); + let Some(k) = keys[tid].choose(&mut rng) else { + continue; + }; + let view = bkt.view().unwrap(); + view.get(k).unwrap(); } } - let barrier = Arc::new(std::sync::Barrier::new(args.threads)); + let ready_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); + let start_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); let total_ops = Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let start_time = Arc::new(std::sync::Mutex::new(Instant::now())); let h: Vec> = (0..args.threads) .map(|tid| { - let db = db.clone(); + let db = bkt.clone(); let tk: &Vec> = unsafe { std::mem::transmute(&keys[tid]) }; let total_ops = total_ops.clone(); - let barrier = Arc::clone(&barrier); + let ready_barrier = Arc::clone(&ready_barrier); + let start_barrier = Arc::clone(&start_barrier); let mode = args.mode.clone(); let insert_ratio = args.insert_ratio; - let st = start_time.clone(); let val = value.clone(); - let prefix = format!("key_{tid}"); + let prefix = format!("key_{tid}_"); std::thread::spawn(move || { - // coreid::bind_core(tid); + coreid::bind_core(tid); let mut round = 0; - barrier.wait(); - - { - if let Ok(mut guard) = st.try_lock() { - *guard = Instant::now(); - } - } + ready_barrier.wait(); + start_barrier.wait(); match mode.as_str() { "insert" => { for key in tk { @@ -202,12 +214,15 @@ fn main() { }) .collect(); + ready_barrier.wait(); + let start_time = Instant::now(); + start_barrier.wait(); + for x in h { x.join().unwrap(); } - let test_start = start_time.lock().unwrap(); - let duration = test_start.elapsed(); + let duration = start_time.elapsed(); let total = total_ops.load(std::sync::atomic::Ordering::Relaxed); let ops = (total as f64 / duration.as_secs_f64()) as usize;