refine test & support mace-0.0.27

This commit is contained in:
abbycin 2026-02-13 17:07:31 +08:00
parent 7bd02bb652
commit fe6b1de1f6
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
16 changed files with 130 additions and 70 deletions

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
mace-kv = "0.0.25"
mace-kv = "0.0.27"
clap = { version = "4.5.48", features = ["derive"] }
rand = "0.9.2"
log = "0.4.22"

View File

@ -1,6 +1,4 @@
# mace 0.0.24 vs rocksdb 10.4.2
**mace 0.0.24 traded a slight dip in query performance for a 10+% boost in insertion performance.**
# mace 0.0.27 vs rocksdb 10.4.2
## sequential insert
![mace_sequential_insert](./scripts/mace_sequential_insert.png)

View File

@ -22,6 +22,10 @@
#include <format>
#include <string>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include "CLI/CLI.hpp"
#include "instant.h"
@ -30,6 +34,19 @@ static void black_box(const T &t) {
asm volatile("" ::"m"(t) : "memory");
}
static size_t cores_online() {
auto n = ::sysconf(_SC_NPROCESSORS_ONLN);
return n > 0 ? static_cast<size_t>(n) : 1;
}
static void bind_core(size_t tid) {
cpu_set_t set;
CPU_ZERO(&set);
auto core = static_cast<int>(tid % cores_online());
CPU_SET(core, &set);
(void) pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &set);
}
struct Args {
size_t threads;
size_t iterations;
@ -59,11 +76,11 @@ int main(int argc, char *argv[]) {
app.add_option("-t,--threads", args.threads, "Threads");
app.add_option("-k,--key-size", args.key_size, "Key Size");
app.add_option("-v,--value-size", args.value_size, "Value Size");
app.add_option("-b,--blob-size", args.value_size, "Blob Size");
app.add_option("-b,--blob-size", args.blob_size, "Blob Size");
app.add_option("-i,--iterations", args.iterations, "Iterations");
app.add_option("-r,--insert-ratio", args.insert_ratio, "Insert Ratio for mixed mode");
app.add_option("-p,--path", args.path, "DataBase Home");
app.add_option("--random", args.random, "Shuffle insert keys");
app.add_flag("--random", args.random, "Shuffle insert keys");
CLI11_PARSE(app, argc, argv);
@ -77,6 +94,11 @@ int main(int argc, char *argv[]) {
return 1;
}
if (args.threads == 0) {
fmt::println("Error: threads must be greater than 0");
return 1;
}
if (args.mode != "insert" && args.mode != "get" && args.mode != "mixed" && args.mode != "scan") {
fmt::println("Error: Invalid mode");
return 1;
@ -142,27 +164,31 @@ int main(int argc, char *argv[]) {
std::atomic<uint64_t> total_op{0};
rocksdb::OptimisticTransactionDB *db;
auto b = nm::Instant::now();
std::mutex mtx{};
std::vector<rocksdb::ColumnFamilyHandle *> handles{};
auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
assert(s.ok());
std::barrier barrier{static_cast<ptrdiff_t>(args.threads)};
std::barrier ready_barrier{static_cast<ptrdiff_t>(args.threads + 1)};
std::barrier start_barrier{static_cast<ptrdiff_t>(args.threads + 1)};
std::random_device rd{};
std::mt19937 gen(rd());
std::uniform_int_distribution<int> dist(0, 100);
std::string val(args.value_size, 'x');
auto keys_per_thread = args.iterations / args.threads;
std::vector<size_t> key_counts(args.threads, args.iterations / args.threads);
for (size_t i = 0; i < args.iterations % args.threads; ++i) {
key_counts[i] += 1;
}
keys.reserve(args.threads);
for (size_t tid = 0; tid < args.threads; ++tid) {
std::vector<std::string> key{};
for (size_t i = 0; i < keys_per_thread; ++i) {
key.reserve(key_counts[tid]);
for (size_t i = 0; i < key_counts[tid]; ++i) {
auto tmp = std::format("key_{}_{}", tid, i);
tmp.resize(args.key_size, 'x');
key.emplace_back(std::move(tmp));
}
if (args.mode == "get" || args.random) {
std::shuffle(keys.begin(), keys.end(), gen);
std::shuffle(key.begin(), key.end(), gen);
}
keys.emplace_back(std::move(key));
}
@ -189,11 +215,14 @@ int main(int argc, char *argv[]) {
handle = handles[0];
// simulate common use cases
std::uniform_int_distribution<int> dist(0, args.threads - 1);
for (size_t i = 0; i < keys_per_thread; ++i) {
auto tid = dist(gen);
auto k = std::format("key_{}_{}", tid, i);
k.resize(args.key_size, 'x');
std::uniform_int_distribution<size_t> tid_dist(0, args.threads - 1);
for (size_t i = 0; i < args.iterations; ++i) {
auto tid = tid_dist(gen);
if (keys[tid].empty()) {
continue;
}
std::uniform_int_distribution<size_t> key_dist(0, keys[tid].size() - 1);
const auto &k = keys[tid][key_dist(gen)];
auto s = db->Get(rocksdb::ReadOptions(), k, &val);
if (!s.ok()) {
std::terminate();
@ -202,10 +231,12 @@ int main(int argc, char *argv[]) {
}
auto *snapshot = db->GetSnapshot();
auto base_seed = rd();
for (size_t tid = 0; tid < args.threads; ++tid) {
wg.emplace_back([&, tid] {
bind_core(tid);
std::string rval(args.value_size, '0');
auto prefix = std::format("key_{}", tid);
auto prefix = std::format("key_{}_", tid);
auto ropt = rocksdb::ReadOptions();
auto upper_bound = find_upper_bound(prefix);
auto upper_bound_slice = rocksdb::Slice(upper_bound);
@ -216,12 +247,11 @@ int main(int argc, char *argv[]) {
ropt.prefix_same_as_start = true;
ropt.snapshot = snapshot;
size_t round = 0;
std::mt19937 mixed_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(0x9e3779b9U * (tid + 1)));
std::uniform_int_distribution<int> mixed_dist(0, 99);
barrier.arrive_and_wait();
if (mtx.try_lock()) {
b = nm::Instant::now();
mtx.unlock();
}
ready_barrier.arrive_and_wait();
start_barrier.arrive_and_wait();
if (args.mode == "insert") {
for (auto &key: *tk) {
@ -243,7 +273,7 @@ int main(int argc, char *argv[]) {
} else if (args.mode == "mixed") {
for (auto &key: *tk) {
round += 1;
auto is_insert = dist(gen) < args.insert_ratio;
auto is_insert = mixed_dist(mixed_gen) < static_cast<int>(args.insert_ratio);
auto *kv = db->BeginTransaction(wopt);
if (is_insert) {
kv->Put(handle, key, val);
@ -273,6 +303,10 @@ int main(int argc, char *argv[]) {
});
}
ready_barrier.arrive_and_wait();
b = nm::Instant::now();
start_barrier.arrive_and_wait();
for (auto &w: wg) {
w.join();
}

View File

@ -1,21 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
if [ "$#" -ne 1 ]
then
printf "\033[m$0 path\033[0m\n"
exit 1
fi
pushd .
cd ..
cargo build --release 1>/dev/null 2> /dev/null
script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
root_dir="$(cd -- "${script_dir}/.." && pwd)"
cargo build --release --manifest-path "${root_dir}/Cargo.toml" 1>/dev/null 2>/dev/null
function samples() {
export RUST_BACKTRACE=full
kv_sz=(16 16 100 1024 1024 1024 16 10240)
mode=(insert get mixed scan)
# set -x
db_root=$1
db_root="$1"
cnt=100000
for ((i = 1; i <= $(nproc); i *= 2))
@ -26,14 +29,14 @@ function samples() {
do
if [ "${mode[k]}" == "insert" ]
then
./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --random
"${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random
if test $? -ne 0
then
echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail"
exit 1
fi
fi
./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
"${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}"
if test $? -ne 0
then
echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
@ -44,7 +47,10 @@ function samples() {
done
}
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > scripts/mace.csv
samples $1 2>> scripts/mace.csv
popd
./bin/python plot.py mace.csv
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv"
samples "$1" 2>> "${script_dir}/mace.csv"
if [ -x "${script_dir}/bin/python" ]; then
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv)
else
(cd "${script_dir}" && python3 plot.py mace.csv)
fi

Binary file not shown.

Before

Width:  |  Height:  |  Size: 137 KiB

After

Width:  |  Height:  |  Size: 133 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 131 KiB

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 127 KiB

After

Width:  |  Height:  |  Size: 130 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 151 KiB

After

Width:  |  Height:  |  Size: 143 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 121 KiB

After

Width:  |  Height:  |  Size: 125 KiB

View File

@ -1,21 +1,25 @@
#!/usr/bin/env bash
set -euo pipefail
if [ "$#" -ne 1 ]
then
printf "\033[m$0 path\033[0m\n"
exit 1
fi
pushd .
cd ../rocksdb
cmake --preset release 1>/dev/null 2>/dev/null
cmake --build --preset release 1>/dev/null 2>/dev/null
script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
root_dir="$(cd -- "${script_dir}/.." && pwd)"
rocksdb_dir="${root_dir}/rocksdb"
(cd "${rocksdb_dir}" && cmake --preset release 1>/dev/null 2>/dev/null)
(cd "${rocksdb_dir}" && cmake --build --preset release 1>/dev/null 2>/dev/null)
function samples() {
kv_sz=(16 16 100 1024 1024 1024 16 10240)
mode=(insert get mixed scan)
# set -x
db_root=$1
db_root="$1"
cnt=100000
for ((i = 1; i <= $(nproc); i *= 2))
do
@ -25,14 +29,14 @@ function samples() {
do
if [ "${mode[k]}" == "insert" ]
then
./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --random 1
"${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random
if test $? -ne 0
then
echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail"
exit 1
fi
fi
./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode ${mode[k]} --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
"${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}"
if test $? -ne 0
then
echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
@ -43,7 +47,10 @@ function samples() {
done
}
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > ../scripts/rocksdb.csv
samples $1 1>> ../scripts/rocksdb.csv
popd
./bin/python plot.py rocksdb.csv
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > "${script_dir}/rocksdb.csv"
samples "$1" 1>> "${script_dir}/rocksdb.csv"
if [ -x "${script_dir}/bin/python" ]; then
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py rocksdb.csv)
else
(cd "${script_dir}" && python3 plot.py rocksdb.csv)
fi

Binary file not shown.

Before

Width:  |  Height:  |  Size: 135 KiB

After

Width:  |  Height:  |  Size: 143 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 140 KiB

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 125 KiB

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 115 KiB

After

Width:  |  Height:  |  Size: 127 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 126 KiB

After

Width:  |  Height:  |  Size: 126 KiB

View File

@ -66,6 +66,16 @@ fn main() {
exit(1);
}
if args.threads == 0 {
eprintln!("Error: threads must be greater than 0");
exit(1);
}
if !matches!(args.mode.as_str(), "insert" | "get" | "mixed" | "scan") {
eprintln!("Error: Invalid mode");
exit(1);
}
if args.key_size < 16 || args.value_size < 16 {
eprintln!("Error: key_size or value_size too small, must >= 16");
exit(1);
@ -87,13 +97,17 @@ fn main() {
saved.tmp_store = false;
let mut db = Mace::new(opt.validate().unwrap()).unwrap();
db.disable_gc();
let mut bkt = db.new_bucket("default").unwrap();
let mut rng = rand::rng();
let value = Arc::new(vec![b'0'; args.value_size]);
let keys_per_thread = args.iterations / args.threads;
let mut key_counts = vec![args.iterations / args.threads; args.threads];
for cnt in key_counts.iter_mut().take(args.iterations % args.threads) {
*cnt += 1;
}
for tid in 0..args.threads {
let mut tk = Vec::with_capacity(keys_per_thread);
for i in 0..keys_per_thread {
let mut tk = Vec::with_capacity(key_counts[tid]);
for i in 0..key_counts[tid] {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(args.key_size, b'x');
tk.push(key);
@ -105,54 +119,52 @@ fn main() {
}
if args.mode == "get" || args.mode == "scan" {
let pre_tx = db.begin().unwrap();
let pre_tx = bkt.begin().unwrap();
(0..args.threads).for_each(|tid| {
for k in &keys[tid] {
pre_tx.put(k, &*value).unwrap();
}
});
pre_tx.commit().unwrap();
drop(bkt);
drop(db);
// re-open db
saved.tmp_store = true;
db = Mace::new(saved.validate().unwrap()).unwrap();
bkt = db.get_bucket("default").unwrap();
// simulate common use cases
for i in 0..keys_per_thread {
for _ in 0..args.iterations {
let tid = rng.random_range(0..args.threads);
let mut k = format!("key_{tid}_{i}").into_bytes();
k.resize(args.key_size, b'x');
let view = db.view().unwrap();
view.get(&k).unwrap();
let Some(k) = keys[tid].choose(&mut rng) else {
continue;
};
let view = bkt.view().unwrap();
view.get(k).unwrap();
}
}
let barrier = Arc::new(std::sync::Barrier::new(args.threads));
let ready_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1));
let start_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1));
let total_ops = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let start_time = Arc::new(std::sync::Mutex::new(Instant::now()));
let h: Vec<JoinHandle<()>> = (0..args.threads)
.map(|tid| {
let db = db.clone();
let db = bkt.clone();
let tk: &Vec<Vec<u8>> = unsafe { std::mem::transmute(&keys[tid]) };
let total_ops = total_ops.clone();
let barrier = Arc::clone(&barrier);
let ready_barrier = Arc::clone(&ready_barrier);
let start_barrier = Arc::clone(&start_barrier);
let mode = args.mode.clone();
let insert_ratio = args.insert_ratio;
let st = start_time.clone();
let val = value.clone();
let prefix = format!("key_{tid}");
let prefix = format!("key_{tid}_");
std::thread::spawn(move || {
// coreid::bind_core(tid);
coreid::bind_core(tid);
let mut round = 0;
barrier.wait();
{
if let Ok(mut guard) = st.try_lock() {
*guard = Instant::now();
}
}
ready_barrier.wait();
start_barrier.wait();
match mode.as_str() {
"insert" => {
for key in tk {
@ -202,12 +214,15 @@ fn main() {
})
.collect();
ready_barrier.wait();
let start_time = Instant::now();
start_barrier.wait();
for x in h {
x.join().unwrap();
}
let test_start = start_time.lock().unwrap();
let duration = test_start.elapsed();
let duration = start_time.elapsed();
let total = total_ops.load(std::sync::atomic::Ordering::Relaxed);
let ops = (total as f64 / duration.as_secs_f64()) as usize;