share iterations among threads

This commit is contained in:
abbycin 2025-12-20 16:12:34 +08:00
parent 67f7108399
commit 7448a39eef
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
12 changed files with 46 additions and 42 deletions

View File

@ -1,10 +1,10 @@
#include <algorithm>
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <cstdlib>
#include <exception>
#include <fmt/base.h> #include <fmt/base.h>
#include <fmt/format.h> #include <fmt/format.h>
#include <iostream>
#include <memory> #include <memory>
#include <random> #include <random>
#include <rocksdb/cache.h> #include <rocksdb/cache.h>
@ -21,7 +21,6 @@
#include <filesystem> #include <filesystem>
#include <format> #include <format>
#include <string> #include <string>
#include <syncstream>
#include "CLI/CLI.hpp" #include "CLI/CLI.hpp"
#include "instant.h" #include "instant.h"
@ -36,8 +35,8 @@ struct Args {
size_t iterations; size_t iterations;
size_t key_size; size_t key_size;
size_t value_size; size_t value_size;
size_t insert_ratio;
size_t blob_size; size_t blob_size;
size_t insert_ratio;
bool random; bool random;
std::string mode; std::string mode;
std::string path; std::string path;
@ -50,6 +49,7 @@ int main(int argc, char *argv[]) {
.iterations = 100000, .iterations = 100000,
.key_size = 16, .key_size = 16,
.value_size = 1024, .value_size = 1024,
.blob_size = 8192,
.insert_ratio = 30, .insert_ratio = 30,
.mode = "insert", .mode = "insert",
.path = "/tmp/rocksdb_tmp", .path = "/tmp/rocksdb_tmp",
@ -148,22 +148,21 @@ int main(int argc, char *argv[]) {
std::mt19937 gen(rd()); std::mt19937 gen(rd());
std::uniform_int_distribution<int> dist(0, 100); std::uniform_int_distribution<int> dist(0, 100);
std::string val(args.value_size, 'x'); std::string val(args.value_size, 'x');
auto keys_per_thread = args.iterations / args.threads;
for (size_t tid = 0; tid < args.threads; ++tid) { for (size_t tid = 0; tid < args.threads; ++tid) {
std::vector<std::string> key{}; std::vector<std::string> key{};
for (size_t i = 0; i < args.iterations; ++i) { for (size_t i = 0; i < keys_per_thread; ++i) {
auto tmp = std::format("key_{}_{}", tid, i); auto tmp = std::format("key_{}_{}", tid, i);
tmp.resize(args.key_size, 'x'); tmp.resize(args.key_size, 'x');
key.emplace_back(std::move(tmp)); key.emplace_back(std::move(tmp));
} }
if (args.mode == "get" || args.random || args.mode == "scan") { if (args.mode == "get" || args.random) {
std::shuffle(keys.begin(), keys.end(), gen); std::shuffle(keys.begin(), keys.end(), gen);
} }
keys.emplace_back(std::move(key)); keys.emplace_back(std::move(key));
} }
auto *handle = handles[0]; auto *handle = handles[0];
if (args.mode == "get" || args.mode == "scan") { if (args.mode == "get" || args.mode == "scan") {
@ -176,17 +175,28 @@ int main(int argc, char *argv[]) {
} }
kv->Commit(); kv->Commit();
delete kv; delete kv;
if (args.mode == "get") { delete handle;
delete handle; delete db;
delete db; handles.clear();
handles.clear(); // re-open db
// re-open db s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); assert(s.ok());
assert(s.ok());
handle = handles[0];
// simulate common use cases
std::uniform_int_distribution<int> dist(0, args.threads - 1);
for (size_t i = 0; i < keys_per_thread; ++i) {
auto tid = dist(gen);
auto k = std::format("key_{}_{}", tid, i);
k.resize(args.key_size, 'x');
auto s = db->Get(rocksdb::ReadOptions(), k, &val);
if (!s.ok()) {
std::terminate();
}
} }
} }
handle = handles[0];
auto *snapshot = db->GetSnapshot(); auto *snapshot = db->GetSnapshot();
for (size_t tid = 0; tid < args.threads; ++tid) { for (size_t tid = 0; tid < args.threads; ++tid) {
wg.emplace_back([&, tid] { wg.emplace_back([&, tid] {

View File

@ -16,7 +16,7 @@ function samples() {
# set -x # set -x
db_root=$1 db_root=$1
cnt=10000 cnt=100000
for ((i = 1; i <= $(nproc); i *= 2)) for ((i = 1; i <= $(nproc); i *= 2))
do do
for ((j = 0; j < ${#kv_sz[@]}; j += 2)) for ((j = 0; j < ${#kv_sz[@]}; j += 2))

Binary file not shown.

Before

Width:  |  Height:  |  Size: 108 KiB

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 131 KiB

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 133 KiB

After

Width:  |  Height:  |  Size: 131 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 97 KiB

After

Width:  |  Height:  |  Size: 149 KiB

View File

@ -15,7 +15,7 @@ function samples() {
kv_sz=(16 16 100 1024 1024 1024 16 10240) kv_sz=(16 16 100 1024 1024 1024 16 10240)
# set -x # set -x
db_root=$1 db_root=$1
cnt=10000 cnt=100000
for ((i = 1; i <= $(nproc); i *= 2)) for ((i = 1; i <= $(nproc); i *= 2))
do do
for ((j = 0; j < ${#kv_sz[@]}; j += 2)) for ((j = 0; j < ${#kv_sz[@]}; j += 2))

Binary file not shown.

Before

Width:  |  Height:  |  Size: 147 KiB

After

Width:  |  Height:  |  Size: 138 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 115 KiB

After

Width:  |  Height:  |  Size: 133 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 128 KiB

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 105 KiB

After

Width:  |  Height:  |  Size: 116 KiB

View File

@ -81,7 +81,7 @@ fn main() {
opt.sync_on_write = false; opt.sync_on_write = false;
opt.over_provision = true; // large value will use lots of memeory opt.over_provision = true; // large value will use lots of memeory
opt.inline_size = args.blob_size; opt.inline_size = args.blob_size;
opt.tmp_store = args.mode != "get"; opt.tmp_store = args.mode != "get" && args.mode != "scan";
let mut saved = opt.clone(); let mut saved = opt.clone();
saved.tmp_store = false; saved.tmp_store = false;
let mut db = Mace::new(opt.validate().unwrap()).unwrap(); let mut db = Mace::new(opt.validate().unwrap()).unwrap();
@ -89,14 +89,15 @@ fn main() {
let mut rng = rand::rng(); let mut rng = rand::rng();
let value = Arc::new(vec![b'0'; args.value_size]); let value = Arc::new(vec![b'0'; args.value_size]);
let keys_per_thread = args.iterations / args.threads;
for tid in 0..args.threads { for tid in 0..args.threads {
let mut tk = Vec::with_capacity(args.iterations); let mut tk = Vec::with_capacity(keys_per_thread);
for i in 0..args.iterations { for i in 0..keys_per_thread {
let mut key = format!("key_{tid}_{i}").into_bytes(); let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(args.key_size, b'x'); key.resize(args.key_size, b'x');
tk.push(key); tk.push(key);
} }
if args.random || args.mode == "get" || args.mode == "scan" { if args.random || args.mode == "get" {
tk.shuffle(&mut rng); tk.shuffle(&mut rng);
} }
keys.push(tk); keys.push(tk);
@ -110,11 +111,18 @@ fn main() {
} }
}); });
pre_tx.commit().unwrap(); pre_tx.commit().unwrap();
if args.mode == "get" { drop(db);
drop(db); // re-open db
// re-open db saved.tmp_store = true;
saved.tmp_store = true; db = Mace::new(saved.validate().unwrap()).unwrap();
db = Mace::new(saved.validate().unwrap()).unwrap();
// simulate common use cases
for i in 0..keys_per_thread {
let tid = rng.random_range(0..args.threads);
let mut k = format!("key_{tid}_{i}").into_bytes();
k.resize(args.key_size, b'x');
let view = db.view().unwrap();
view.get(&k).unwrap();
} }
} }
@ -182,10 +190,7 @@ fn main() {
let iter = view.seek(prefix); let iter = view.seek(prefix);
for x in iter { for x in iter {
round += 1; round += 1;
let k = x.key(); std::hint::black_box(x);
let v = x.val();
std::hint::black_box(k);
std::hint::black_box(v);
} }
} }
_ => panic!("Invalid mode"), _ => panic!("Invalid mode"),
@ -205,16 +210,6 @@ fn main() {
let total = total_ops.load(std::sync::atomic::Ordering::Relaxed); let total = total_ops.load(std::sync::atomic::Ordering::Relaxed);
let ops = (total as f64 / duration.as_secs_f64()) as usize; let ops = (total as f64 / duration.as_secs_f64()) as usize;
// println!("{:<20} {}", "Test Mode:", args.mode);
// println!("{:<20} {}", "Threads:", args.threads);
// println!("{:<20} {}", "Iterations", args.iterations);
// println!("{:<20} {}B", "Key Size:", args.key_size);
// println!("{:<20} {}B", "Value Size:", args.value_size);
// println!("{:<20} {ops}", "Total Ops:");
// if args.mode == "mixed" {
// println!("{:<20} {}%", "Insert Ratio:", args.insert_ratio);
// }
let ratio = if args.mode == "mixed" { let ratio = if args.mode == "mixed" {
args.insert_ratio args.insert_ratio
} else if args.mode == "insert" { } else if args.mode == "insert" {
@ -222,7 +217,6 @@ fn main() {
} else { } else {
0 0
}; };
// eprintln!("mode,threads,key_size,value_size,insert_ratio,ops");
eprintln!( eprintln!(
"{},{},{},{},{},{},{}", "{},{},{},{},{},{},{}",
args.mode, args.mode,