Compare commits

...

5 Commits

Author SHA1 Message Date
67f7108399
fix rocksdb test code 2025-12-19 19:51:14 +08:00
11ea5a1569
use snapshot for scan in rocksdb 2025-11-23 10:07:30 +08:00
90788721b2
update config 2025-11-22 16:41:22 +08:00
d5a9b1552a
fix rocksdb iteration calc 2025-11-22 16:00:50 +08:00
5917f83af2
change to hot scan 2025-11-21 17:56:07 +08:00
4 changed files with 107 additions and 34 deletions

View File

@ -4,12 +4,14 @@
#include <cstdio> #include <cstdio>
#include <fmt/base.h> #include <fmt/base.h>
#include <fmt/format.h> #include <fmt/format.h>
#include <iostream>
#include <memory> #include <memory>
#include <random> #include <random>
#include <rocksdb/cache.h> #include <rocksdb/cache.h>
#include <rocksdb/db.h> #include <rocksdb/db.h>
#include <rocksdb/env.h> #include <rocksdb/env.h>
#include <rocksdb/options.h> #include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/table.h> #include <rocksdb/table.h>
#include <rocksdb/utilities/optimistic_transaction_db.h> #include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h> #include <rocksdb/utilities/transaction.h>
@ -19,16 +21,23 @@
#include <filesystem> #include <filesystem>
#include <format> #include <format>
#include <string> #include <string>
#include <syncstream>
#include "CLI/CLI.hpp" #include "CLI/CLI.hpp"
#include "instant.h" #include "instant.h"
// Optimization barrier for benchmark loops.
// Feeds `value` to an empty inline-asm statement as a memory input operand and
// declares a "memory" clobber; the intent is to keep the compiler from
// discarding the computation that produced `value`.
template <typename T>
static void black_box(const T &value) {
    asm volatile(""
                 : /* no outputs */
                 : "m"(value)
                 : "memory");
}
struct Args { struct Args {
size_t threads; size_t threads;
size_t iterations; size_t iterations;
size_t key_size; size_t key_size;
size_t value_size; size_t value_size;
size_t insert_ratio; size_t insert_ratio;
size_t blob_size;
bool random; bool random;
std::string mode; std::string mode;
std::string path; std::string path;
@ -50,6 +59,7 @@ int main(int argc, char *argv[]) {
app.add_option("-t,--threads", args.threads, "Threads"); app.add_option("-t,--threads", args.threads, "Threads");
app.add_option("-k,--key-size", args.key_size, "Key Size"); app.add_option("-k,--key-size", args.key_size, "Key Size");
app.add_option("-v,--value-size", args.value_size, "Value Size"); app.add_option("-v,--value-size", args.value_size, "Value Size");
// Bind --blob-size to its own field (it feeds cfo.min_blob_size); binding it to
// args.value_size would silently overwrite the value size instead.
app.add_option("-b,--blob-size", args.blob_size, "Blob Size");
app.add_option("-i,--iterations", args.iterations, "Iterations"); app.add_option("-i,--iterations", args.iterations, "Iterations");
app.add_option("-r,--insert-ratio", args.insert_ratio, "Insert Ratio for mixed mode"); app.add_option("-r,--insert-ratio", args.insert_ratio, "Insert Ratio for mixed mode");
app.add_option("-p,--path", args.path, "DataBase Home"); app.add_option("-p,--path", args.path, "DataBase Home");
@ -82,9 +92,25 @@ int main(int argc, char *argv[]) {
return 1; return 1;
} }
// Derives a scan upper bound from `prefix`: trailing 0xff bytes are removed and
// the last remaining byte is incremented by one, e.g. "key_1" -> "key_2" and
// "a\xff" -> "b".
// Returns an empty string when no bound can be formed, i.e. `prefix` is empty
// or consists solely of 0xff bytes; callers treat that as "no upper bound".
auto find_upper_bound = [](std::string prefix) {
    // A trailing 0xff cannot be incremented without carrying, so drop it and
    // carry into the preceding byte instead.
    while (!prefix.empty() && static_cast<unsigned char>(prefix.back()) == 0xff) {
        prefix.pop_back();
    }
    if (!prefix.empty()) {
        const auto last = static_cast<unsigned char>(prefix.back());
        prefix.back() = static_cast<char>(last + 1);
    }
    return prefix;
};
rocksdb::ColumnFamilyOptions cfo{}; rocksdb::ColumnFamilyOptions cfo{};
cfo.enable_blob_files = true; cfo.enable_blob_files = true;
cfo.min_blob_size = 8192; cfo.min_blob_size = args.blob_size;
// use 1GB block cache // use 1GB block cache
auto cache = rocksdb::NewLRUCache(1 << 30); auto cache = rocksdb::NewLRUCache(1 << 30);
rocksdb::BlockBasedTableOptions table_options{}; rocksdb::BlockBasedTableOptions table_options{};
@ -131,7 +157,7 @@ int main(int argc, char *argv[]) {
tmp.resize(args.key_size, 'x'); tmp.resize(args.key_size, 'x');
key.emplace_back(std::move(tmp)); key.emplace_back(std::move(tmp));
} }
if (args.mode == "get" || args.random) { if (args.mode == "get" || args.random || args.mode == "scan") {
std::shuffle(keys.begin(), keys.end(), gen); std::shuffle(keys.begin(), keys.end(), gen);
} }
keys.emplace_back(std::move(key)); keys.emplace_back(std::move(key));
@ -150,21 +176,32 @@ int main(int argc, char *argv[]) {
} }
kv->Commit(); kv->Commit();
delete kv; delete kv;
delete handle; if (args.mode == "get") {
delete db; delete handle;
handles.clear(); delete db;
// re-open db handles.clear();
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); // re-open db
assert(s.ok()); s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
assert(s.ok());
}
} }
handle = handles[0]; handle = handles[0];
auto *snapshot = db->GetSnapshot();
for (size_t tid = 0; tid < args.threads; ++tid) { for (size_t tid = 0; tid < args.threads; ++tid) {
auto *tk = &keys[tid];
wg.emplace_back([&, tid] { wg.emplace_back([&, tid] {
std::string rval(args.value_size, '0'); std::string rval(args.value_size, '0');
auto prefix = std::format("key_{}", tid); auto prefix = std::format("key_{}", tid);
auto ropt = rocksdb::ReadOptions(); auto ropt = rocksdb::ReadOptions();
auto upper_bound = find_upper_bound(prefix);
auto upper_bound_slice = rocksdb::Slice(upper_bound);
if (!upper_bound.empty()) {
ropt.iterate_upper_bound = &upper_bound_slice;
}
auto *tk = &keys[tid];
ropt.prefix_same_as_start = true;
ropt.snapshot = snapshot;
size_t round = 0;
barrier.arrive_and_wait(); barrier.arrive_and_wait();
if (mtx.try_lock()) { if (mtx.try_lock()) {
@ -174,6 +211,7 @@ int main(int argc, char *argv[]) {
if (args.mode == "insert") { if (args.mode == "insert") {
for (auto &key: *tk) { for (auto &key: *tk) {
round += 1;
auto *kv = db->BeginTransaction(wopt); auto *kv = db->BeginTransaction(wopt);
kv->Put(handle, key, val); kv->Put(handle, key, val);
kv->Commit(); kv->Commit();
@ -182,6 +220,7 @@ int main(int argc, char *argv[]) {
} else if (args.mode == "get") { } else if (args.mode == "get") {
for (auto &key: *tk) { for (auto &key: *tk) {
round += 1;
auto *kv = db->BeginTransaction(wopt); auto *kv = db->BeginTransaction(wopt);
kv->Get(ropt, handle, key, &rval); kv->Get(ropt, handle, key, &rval);
kv->Commit(); kv->Commit();
@ -189,6 +228,7 @@ int main(int argc, char *argv[]) {
} }
} else if (args.mode == "mixed") { } else if (args.mode == "mixed") {
for (auto &key: *tk) { for (auto &key: *tk) {
round += 1;
auto is_insert = dist(gen) < args.insert_ratio; auto is_insert = dist(gen) < args.insert_ratio;
auto *kv = db->BeginTransaction(wopt); auto *kv = db->BeginTransaction(wopt);
if (is_insert) { if (is_insert) {
@ -202,12 +242,19 @@ int main(int argc, char *argv[]) {
} else if (args.mode == "scan") { } else if (args.mode == "scan") {
auto *iter = db->NewIterator(ropt); auto *iter = db->NewIterator(ropt);
iter->Seek(prefix); iter->Seek(prefix);
size_t n = 0;
while (iter->Valid()) { while (iter->Valid()) {
round += 1;
auto k = iter->key();
auto v = iter->value();
black_box(k);
black_box(v);
iter->Next(); iter->Next();
n += 1;
} }
delete iter; delete iter;
} }
total_op.fetch_add(args.iterations, std::memory_order::relaxed); total_op.fetch_add(round, std::memory_order::relaxed);
}); });
} }
@ -222,6 +269,7 @@ int main(int argc, char *argv[]) {
uint64_t ops = total_op.load(std::memory_order_relaxed) / b.elapse_sec(); uint64_t ops = total_op.load(std::memory_order_relaxed) / b.elapse_sec();
fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops, fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops,
(uint64_t) b.elapse_ms()); (uint64_t) b.elapse_ms());
db->ReleaseSnapshot(snapshot);
delete handle; delete handle;
delete db; delete db;
std::filesystem::remove_all(args.path); std::filesystem::remove_all(args.path);

View File

@ -1,5 +1,11 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Require exactly one argument: the database path forwarded to samples().
if [ "$#" -ne 1 ]
then
    # Pass the script name as a %s argument rather than embedding it in the
    # format string, so a '%' or '\' in the path is printed literally.
    printf '\033[m%s path\033[0m\n' "$0"
    exit 1
fi
pushd . pushd .
cd .. cd ..
cargo build --release 1>/dev/null 2> /dev/null cargo build --release 1>/dev/null 2> /dev/null
@ -8,31 +14,32 @@ function samples() {
export RUST_BACKTRACE=full export RUST_BACKTRACE=full
kv_sz=(16 16 100 1024 1024 1024 16 10240) kv_sz=(16 16 100 1024 1024 1024 16 10240)
# set -x # set -x
db_root=$1
cnt=10000 cnt=10000
for ((i = 1; i <= $(nproc); i *= 2)) for ((i = 1; i <= $(nproc); i *= 2))
do do
for ((j = 0; j < ${#kv_sz[@]}; j += 2)) for ((j = 0; j < ${#kv_sz[@]}; j += 2))
do do
./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
if test $? -ne 0 if test $? -ne 0
then then
echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
if test $? -ne 0 if test $? -ne 0
then then
echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30
if test $? -ne 0 if test $? -ne 0
then then
echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode scan --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 ./target/release/kv_bench --path $db_root --threads $i --iterations $cnt --mode scan --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30
if test $? -ne 0 if test $? -ne 0
then then
echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
@ -43,6 +50,6 @@ function samples() {
} }
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > scripts/mace.csv echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > scripts/mace.csv
samples 2>> scripts/mace.csv samples $1 2>> scripts/mace.csv
popd popd
./bin/python plot.py mace.csv ./bin/python plot.py mace.csv

View File

@ -1,5 +1,11 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Require exactly one argument: the database path forwarded to samples().
if [ "$#" -ne 1 ]
then
    # Pass the script name as a %s argument rather than embedding it in the
    # format string, so a '%' or '\' in the path is printed literally.
    printf '\033[m%s path\033[0m\n' "$0"
    exit 1
fi
pushd . pushd .
cd ../rocksdb cd ../rocksdb
cmake --preset release 1>/dev/null 2>/dev/null cmake --preset release 1>/dev/null 2>/dev/null
@ -8,30 +14,31 @@ cmake --build --preset release 1>/dev/null 2>/dev/null
function samples() { function samples() {
kv_sz=(16 16 100 1024 1024 1024 16 10240) kv_sz=(16 16 100 1024 1024 1024 16 10240)
# set -x # set -x
db_root=$1
cnt=10000 cnt=10000
for ((i = 1; i <= $(nproc); i *= 2)) for ((i = 1; i <= $(nproc); i *= 2))
do do
for ((j = 0; j < ${#kv_sz[@]}; j += 2)) for ((j = 0; j < ${#kv_sz[@]}; j += 2))
do do
./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
if test $? -ne 0 if test $? -ne 0
then then
echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]}
if test $? -ne 0 if test $? -ne 0
then then
echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30
if test $? -ne 0 if test $? -ne 0
then then
echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
exit 1 exit 1
fi fi
./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode scan --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 ./build/release/rocksdb_bench --path $db_root --threads $i --iterations $cnt --mode scan --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30
if test $? -ne 0 if test $? -ne 0
then then
echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail"
@ -42,6 +49,6 @@ function samples() {
} }
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > ../scripts/rocksdb.csv echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > ../scripts/rocksdb.csv
samples 1>> ../scripts/rocksdb.csv samples $1 1>> ../scripts/rocksdb.csv
popd popd
./bin/python plot.py rocksdb.csv ./bin/python plot.py rocksdb.csv

View File

@ -81,7 +81,7 @@ fn main() {
opt.sync_on_write = false; opt.sync_on_write = false;
opt.over_provision = true; // large value will use lots of memory opt.over_provision = true; // large value will use lots of memory
opt.inline_size = args.blob_size; opt.inline_size = args.blob_size;
opt.tmp_store = args.mode != "get" && args.mode != "scan"; opt.tmp_store = args.mode != "get";
let mut saved = opt.clone(); let mut saved = opt.clone();
saved.tmp_store = false; saved.tmp_store = false;
let mut db = Mace::new(opt.validate().unwrap()).unwrap(); let mut db = Mace::new(opt.validate().unwrap()).unwrap();
@ -96,7 +96,7 @@ fn main() {
key.resize(args.key_size, b'x'); key.resize(args.key_size, b'x');
tk.push(key); tk.push(key);
} }
if args.random || args.mode == "get" { if args.random || args.mode == "get" || args.mode == "scan" {
tk.shuffle(&mut rng); tk.shuffle(&mut rng);
} }
keys.push(tk); keys.push(tk);
@ -105,15 +105,17 @@ fn main() {
if args.mode == "get" || args.mode == "scan" { if args.mode == "get" || args.mode == "scan" {
let pre_tx = db.begin().unwrap(); let pre_tx = db.begin().unwrap();
(0..args.threads).for_each(|tid| { (0..args.threads).for_each(|tid| {
for i in 0..args.iterations { for k in &keys[tid] {
pre_tx.put(&keys[tid][i], &*value).unwrap(); pre_tx.put(k, &*value).unwrap();
} }
}); });
pre_tx.commit().unwrap(); pre_tx.commit().unwrap();
drop(db); if args.mode == "get" {
// re-open db drop(db);
saved.tmp_store = true; // re-open db
db = Mace::new(saved.validate().unwrap()).unwrap(); saved.tmp_store = true;
db = Mace::new(saved.validate().unwrap()).unwrap();
}
} }
let barrier = Arc::new(std::sync::Barrier::new(args.threads)); let barrier = Arc::new(std::sync::Barrier::new(args.threads));
@ -134,6 +136,7 @@ fn main() {
std::thread::spawn(move || { std::thread::spawn(move || {
// coreid::bind_core(tid); // coreid::bind_core(tid);
let mut round = 0;
barrier.wait(); barrier.wait();
{ {
@ -141,10 +144,10 @@ fn main() {
*guard = Instant::now(); *guard = Instant::now();
} }
} }
match mode.as_str() { match mode.as_str() {
"insert" => { "insert" => {
for key in tk { for key in tk {
round += 1;
let tx = db.begin().unwrap(); let tx = db.begin().unwrap();
tx.put(key.as_slice(), val.as_slice()).unwrap(); tx.put(key.as_slice(), val.as_slice()).unwrap();
tx.commit().unwrap(); tx.commit().unwrap();
@ -152,13 +155,16 @@ fn main() {
} }
"get" => { "get" => {
for key in tk { for key in tk {
round += 1;
let tx = db.view().unwrap(); let tx = db.view().unwrap();
tx.get(key).unwrap(); let x = tx.get(key).unwrap();
std::hint::black_box(x);
} }
} }
"mixed" => { "mixed" => {
for key in tk { for key in tk {
let is_insert = rand::random_range(0..100) < insert_ratio; let is_insert = rand::random_range(0..100) < insert_ratio;
round += 1;
if is_insert { if is_insert {
let tx = db.begin().unwrap(); let tx = db.begin().unwrap();
@ -166,7 +172,8 @@ fn main() {
tx.commit().unwrap(); tx.commit().unwrap();
} else { } else {
let tx = db.view().unwrap(); let tx = db.view().unwrap();
let _ = tx.get(key); // not found let x = tx.get(key); // not found
let _ = std::hint::black_box(x);
} }
} }
} }
@ -174,13 +181,17 @@ fn main() {
let view = db.view().unwrap(); let view = db.view().unwrap();
let iter = view.seek(prefix); let iter = view.seek(prefix);
for x in iter { for x in iter {
std::hint::black_box(x); round += 1;
let k = x.key();
let v = x.val();
std::hint::black_box(k);
std::hint::black_box(v);
} }
} }
_ => panic!("Invalid mode"), _ => panic!("Invalid mode"),
} }
total_ops.fetch_add(args.iterations, std::sync::atomic::Ordering::Relaxed); total_ops.fetch_add(round, std::sync::atomic::Ordering::Relaxed);
}) })
}) })
.collect(); .collect();
@ -213,7 +224,7 @@ fn main() {
}; };
// eprintln!("mode,threads,key_size,value_size,insert_ratio,ops"); // eprintln!("mode,threads,key_size,value_size,insert_ratio,ops");
eprintln!( eprintln!(
"{},{},{},{},{},{:.2},{}", "{},{},{},{},{},{},{}",
args.mode, args.mode,
args.threads, args.threads,
args.key_size, args.key_size,