From 08b1fcec9ad0bb6d68a22bb74ba126cc2e9ae114 Mon Sep 17 00:00:00 2001 From: abbycin Date: Sun, 16 Nov 2025 10:43:06 +0800 Subject: [PATCH] merge blodb --- rocksdb/main.cpp | 54 +++++++++++++++++++++++++++++++++------------- scripts/mace.sh | 10 +++++---- scripts/rocksdb.sh | 10 ++++----- src/main.rs | 8 +++++-- 4 files changed, 56 insertions(+), 26 deletions(-) diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index e160940..c2f8254 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -1,10 +1,14 @@ #include #include +#include #include +#include #include #include #include +#include #include +#include #include #include #include @@ -76,17 +80,31 @@ int main(int argc, char *argv[]) { return 1; } - rocksdb::Options options; + rocksdb::ColumnFamilyOptions cfo{}; + cfo.enable_blob_files = true; + cfo.min_blob_size = 8192; + // use 1GB block cache + auto cache = rocksdb::NewLRUCache(1 << 30); + rocksdb::BlockBasedTableOptions table_options{}; + table_options.block_cache = cache; + cfo.table_factory.reset(NewBlockBasedTableFactory(table_options)); + // the following three options makes it not trigger GC in test + cfo.level0_file_num_compaction_trigger = 10000; + cfo.write_buffer_size = 64 << 20; + cfo.max_write_buffer_number = 16; + + std::vector cfd{}; + cfd.push_back(rocksdb::ColumnFamilyDescriptor("default", cfo)); + + rocksdb::DBOptions options; options.create_if_missing = true; options.allow_concurrent_memtable_write = true; options.enable_pipelined_write = true; - // the following three options makes it not trigger GC in test - options.level0_file_num_compaction_trigger = 1000; - options.write_buffer_size = 1 << 30; - options.max_write_buffer_number = 5; + options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::HIGH); auto ropt = rocksdb::ReadOptions(); auto wopt = rocksdb::WriteOptions(); + wopt.no_slowdown = true; // wopt.disableWAL = true; std::vector wg; std::vector> keys{}; @@ -94,7 +112,8 @@ int main(int argc, char *argv[]) { rocksdb::OptimisticTransactionDB *db; auto b = nm::Instant::now(); std::mutex mtx{}; - auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, &db); + std::vector handles{}; + auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); assert(s.ok()); std::barrier barrier{static_cast(args.threads)}; @@ -117,23 +136,27 @@ int main(int argc, char *argv[]) { keys.emplace_back(std::move(key)); } + auto *handle = handles[0]; if (args.mode == "get") { auto *kv = db->BeginTransaction(wopt); for (size_t tid = 0; tid < args.threads; ++tid) { auto *tk = &keys[tid]; for (auto &key: *tk) { - kv->Put(key, val); + kv->Put(handle, key, val); } } kv->Commit(); delete kv; + delete handle; delete db; + handles.clear(); // re-open db - s = rocksdb::OptimisticTransactionDB::Open(options, args.path, &db); + s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); assert(s.ok()); } + handle = handles[0]; for (size_t tid = 0; tid < args.threads; ++tid) { auto *tk = &keys[tid]; wg.emplace_back([&] { @@ -147,7 +170,7 @@ int main(int argc, char *argv[]) { if (args.mode == "insert") { for (auto &key: *tk) { auto *kv = db->BeginTransaction(wopt); - kv->Put(key, val); + kv->Put(handle, key, val); kv->Commit(); delete kv; } @@ -155,7 +178,7 @@ int main(int argc, char *argv[]) { } else if (args.mode == "get") { for (auto &key: *tk) { auto *kv = db->BeginTransaction(wopt); - kv->Get(ropt, key, &rval); + kv->Get(ropt, handle, key, &rval); kv->Commit(); delete kv; } @@ -164,9 +187,9 @@ int main(int argc, char *argv[]) { auto is_insert = dist(gen) < args.insert_ratio; auto *kv = db->BeginTransaction(wopt); if (is_insert) { - kv->Put(key, val); + kv->Put(handle, key, val); } else { - kv->Get(ropt, key, &rval); // not found + kv->Get(ropt, handle, key, &rval); // not found } kv->Commit(); delete kv; @@ -184,9 +207,10 @@ int main(int argc, char *argv[]) { return args.insert_ratio; return args.mode == "insert" ? 100 : 0; }(); - double ops = static_cast(total_op.load(std::memory_order_relaxed)) / b.elapse_sec(); - fmt::println("{},{},{},{},{},{:.2f},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, ops, - b.elapse_ms()); + uint64_t ops = total_op.load(std::memory_order_relaxed) / b.elapse_sec(); + fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops, + (uint64_t) b.elapse_ms()); + delete handle; delete db; std::filesystem::remove_all(args.path); } diff --git a/scripts/mace.sh b/scripts/mace.sh index 5265d65..4ccd042 100755 --- a/scripts/mace.sh +++ b/scripts/mace.sh @@ -5,26 +5,28 @@ cd .. cargo build --release 1>/dev/null 2> /dev/null function samples() { - kv_sz=(16 16 100 1024 1024 1024) + export RUST_BACKTRACE=full + kv_sz=(16 16 100 1024 1024 1024 16 10240) # set -x + cnt=10000 for ((i = 1; i <= $(nproc); i *= 2)) do for ((j = 0; j < ${#kv_sz[@]}; j += 2)) do - ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations 100000 --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} if test $? -ne 0 then echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" exit 1 fi - ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations 100000 --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} if test $? -ne 0 then echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" exit 1 fi - ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations 100000 --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 + ./target/release/kv_bench --path /home/abby/mace_bench --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 if test $? -ne 0 then echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" diff --git a/scripts/rocksdb.sh b/scripts/rocksdb.sh index 79e18a7..35bcf93 100755 --- a/scripts/rocksdb.sh +++ b/scripts/rocksdb.sh @@ -6,26 +6,26 @@ cmake --preset release 1>/dev/null 2>/dev/null cmake --build --preset release 1>/dev/null 2>/dev/null function samples() { - kv_sz=(16 16 100 1024 1024 1024) + kv_sz=(16 16 100 1024 1024 1024 16 10240) # set -x - + cnt=10000 for ((i = 1; i <= $(nproc); i *= 2)) do for ((j = 0; j < ${#kv_sz[@]}; j += 2)) do - ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations 100000 --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode insert --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} if test $? -ne 0 then echo "insert threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" exit 1 fi - ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations 100000 --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} + ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode get --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} if test $? -ne 0 then echo "get threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" exit 1 fi - ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations 100000 --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 + ./build/release/rocksdb_bench --path /home/abby/rocksdb_tmp --threads $i --iterations $cnt --mode mixed --key-size ${kv_sz[j]} --value-size ${kv_sz[j+1]} --insert-ratio 30 if test $? -ne 0 then echo "mixed threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" diff --git a/src/main.rs b/src/main.rs index 52b6854..abdc862 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,6 +41,9 @@ struct Args { #[arg(long, default_value = "false")] random: bool, + + #[arg(long, default_value = "8192")] + blob_size: usize, } fn main() { @@ -76,10 +79,11 @@ fn main() { let mut keys: Vec>> = Vec::with_capacity(args.threads); let mut opt = Options::new(path); opt.sync_on_write = false; + opt.over_provision = true; // large value will use lots of memeory + opt.inline_size = args.blob_size; opt.tmp_store = args.mode != "get"; let mut saved = opt.clone(); saved.tmp_store = false; - // opt.cache_capacity = 3 << 30; // this is very important for large key-value store let mut db = Mace::new(opt.validate().unwrap()).unwrap(); db.disable_gc(); @@ -180,7 +184,7 @@ fn main() { let test_start = start_time.lock().unwrap(); let duration = test_start.elapsed(); let total = total_ops.load(std::sync::atomic::Ordering::Relaxed); - let ops = total as f64 / duration.as_secs_f64(); + let ops = (total as f64 / duration.as_secs_f64()) as usize; // println!("{:<20} {}", "Test Mode:", args.mode); // println!("{:<20} {}", "Threads:", args.threads);