#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "CLI/CLI.hpp" #include "instant.h" template static void black_box(const T &t) { asm volatile("" : : "m"(t) : "memory"); } static size_t cores_online() { auto n = ::sysconf(_SC_NPROCESSORS_ONLN); return n > 0 ? static_cast(n) : 1; } static void bind_core(size_t tid) { cpu_set_t set; CPU_ZERO(&set); auto core = static_cast(tid % cores_online()); CPU_SET(core, &set); (void) pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &set); } static void require_ok(const rocksdb::Status &st, const char *what) { if (!st.ok()) { fmt::println(stderr, "{} failed: {}", what, st.ToString()); std::abort(); } } constexpr size_t kLatencyBuckets = 64; constexpr size_t kPrefixGroups = 1024; constexpr size_t kPrefillBatch = 1024; struct Args { size_t threads = 4; size_t iterations = 10000; size_t key_size = 16; size_t value_size = 1024; size_t blob_size = 8192; size_t insert_ratio = 30; bool random = false; std::string mode = "insert"; std::optional workload; std::string path = "/nvme/kv_bench_rocksdb"; bool shared_keyspace = true; size_t prefill_keys = 0; uint64_t warmup_secs = 0; uint64_t measure_secs = 0; size_t scan_len = 100; double zipf_theta = 0.99; std::string read_path = "snapshot"; std::string durability = "relaxed"; std::string result_file = "benchmark_results.csv"; bool cleanup = true; }; enum class Distribution { Uniform, Zipf, }; enum class ReadPath { Snapshot, RwTxn, }; enum class DurabilityMode { Relaxed, Durable, }; struct WorkloadSpec { std::string id; std::string mode_label; Distribution distribution; uint8_t read_pct; uint8_t update_pct; uint8_t scan_pct; size_t scan_len; bool requires_prefill; bool insert_only; }; struct ThreadRange { size_t start; size_t len; }; struct MachineMeta { std::string host; std::string os; std::string arch; std::string kernel; size_t cpu_cores; uint64_t mem_total_kb; uint64_t mem_available_kb; }; struct Quantiles { uint64_t p50_us = 0; uint64_t p95_us = 0; uint64_t p99_us = 0; uint64_t p999_us = 0; }; struct ThreadStats { uint64_t total_ops = 0; uint64_t error_ops = 0; std::array hist{}; }; struct ResultRow { uint64_t ts_epoch_ms; std::string workload_id; std::string mode; DurabilityMode durability_mode; size_t threads; size_t key_size; size_t value_size; size_t prefill_keys; bool shared_keyspace; Distribution distribution; double zipf_theta; uint8_t read_pct; uint8_t update_pct; uint8_t scan_pct; size_t scan_len; ReadPath read_path; uint64_t warmup_secs; uint64_t measure_secs; uint64_t total_ops; uint64_t error_ops; double ops_per_sec; Quantiles quantiles; uint64_t elapsed_us; MachineMeta meta; }; enum class OpKind { Read, Update, Scan, }; static std::string to_upper(std::string v) { for (auto &c: v) { if (c >= 'a' && c <= 'z') { c = static_cast(c - 'a' + 'A'); } } return v; } static std::string to_lower(std::string v) { for (auto &c: v) { if (c >= 'A' && c <= 'Z') { c = static_cast(c - 'A' + 'a'); } } return v; } static std::optional parse_read_path(const std::string &raw) { auto v = to_lower(raw); if (v == "snapshot") { return ReadPath::Snapshot; } if (v == "rw_txn" || v == "rwtxn" || v == "txn") { return ReadPath::RwTxn; } return std::nullopt; } static const char *read_path_str(ReadPath p) { switch (p) { case ReadPath::Snapshot: return "snapshot"; case ReadPath::RwTxn: return "rw_txn"; } return "snapshot"; } static std::optional parse_durability(const std::string &raw) { auto v = to_lower(raw); if (v == "relaxed") { return DurabilityMode::Relaxed; } if (v == "durable") { return DurabilityMode::Durable; } return std::nullopt; } static const char *durability_str(DurabilityMode mode) { switch (mode) { case DurabilityMode::Relaxed: return "relaxed"; case DurabilityMode::Durable: return "durable"; } return "relaxed"; } static const char *distribution_str(Distribution d) { switch (d) { case Distribution::Uniform: return "uniform"; case Distribution::Zipf: return "zipf"; } return "uniform"; } static std::optional parse_workload(const Args &args, std::string &err) { if (args.workload.has_value()) { auto id = to_upper(args.workload.value()); if (id == "W1") { return WorkloadSpec{id, "mixed", Distribution::Uniform, 95, 5, 0, args.scan_len, true, false}; } if (id == "W2") { return WorkloadSpec{id, "mixed", Distribution::Zipf, 95, 5, 0, args.scan_len, true, false}; } if (id == "W3") { return WorkloadSpec{id, "mixed", Distribution::Uniform, 50, 50, 0, args.scan_len, true, false}; } if (id == "W4") { return WorkloadSpec{id, "mixed", Distribution::Uniform, 5, 95, 0, args.scan_len, true, false}; } if (id == "W5") { return WorkloadSpec{id, "mixed", Distribution::Uniform, 70, 25, 5, args.scan_len, true, false}; } if (id == "W6") { return WorkloadSpec{id, "scan", Distribution::Uniform, 0, 0, 100, args.scan_len, true, false}; } err = fmt::format("invalid workload `{}` (supported: W1..W6)", args.workload.value()); return std::nullopt; } auto mode = to_lower(args.mode); if (mode == "insert") { return WorkloadSpec{"LEGACY_INSERT", "insert", Distribution::Uniform, 0, 100, 0, args.scan_len, false, true}; } if (mode == "get") { return WorkloadSpec{"LEGACY_GET", "get", Distribution::Uniform, 100, 0, 0, args.scan_len, true, false}; } if (mode == "mixed") { return WorkloadSpec{"LEGACY_MIXED", "mixed", Distribution::Uniform, static_cast(100 - args.insert_ratio), static_cast(args.insert_ratio), 0, args.scan_len, true, false}; } if (mode == "scan") { return WorkloadSpec{"LEGACY_SCAN", "scan", Distribution::Uniform, 0, 0, 100, args.scan_len, true, false}; } err = fmt::format("invalid mode `{}` (supported: insert/get/mixed/scan)", args.mode); return std::nullopt; } static std::vector split_ranges(size_t total, size_t n) { std::vector out; out.reserve(n); if (n == 0) { return out; } auto base = total / n; auto rem = total % n; size_t start = 0; for (size_t tid = 0; tid < n; ++tid) { size_t len = base + (tid < rem ? 1 : 0); out.push_back(ThreadRange{start, len}); start += len; } return out; } static std::string make_shared_key(size_t id, size_t key_size) { auto group = id % kPrefixGroups; auto key = fmt::format("s{:03x}_{:010x}", group, id); if (key.size() < key_size) { key.resize(key_size, 'x'); } return key; } static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size) { auto key = fmt::format("t{:03x}_{:010x}", tid % kPrefixGroups, local_id); if (key.size() < key_size) { key.resize(key_size, 'x'); } return key; } static std::string make_shared_prefix(size_t id) { return fmt::format("s{:03x}_", id % kPrefixGroups); } static std::string make_thread_prefix(size_t tid) { return fmt::format("t{:03x}_", tid % kPrefixGroups); } static size_t latency_bucket(uint64_t us) { auto v = std::max(us, 1); size_t idx = 0; while (v >>= 1) { idx += 1; if (idx + 1 >= kLatencyBuckets) { break; } } return std::min(idx, kLatencyBuckets - 1); } static uint64_t histogram_quantile_us(const std::array &hist, double q) { uint64_t total = 0; for (auto v: hist) { total += v; } if (total == 0) { return 0; } auto target = static_cast(std::ceil(static_cast(total) * q)); uint64_t acc = 0; for (size_t i = 0; i < hist.size(); ++i) { acc += hist[i]; if (acc >= target) { if (i == 0) { return 1; } if (i >= 63) { return (1ULL << 63); } return (1ULL << i); } } return (1ULL << (kLatencyBuckets - 1)); } static uint64_t now_epoch_ms() { auto now = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast(now.time_since_epoch()); return static_cast(ms.count()); } static uint64_t read_mem_kb(const char *key) { std::ifstream in("/proc/meminfo"); if (!in.is_open()) { return 0; } std::string k; uint64_t val = 0; std::string unit; while (in >> k >> val >> unit) { if (k == key) { return val; } } return 0; } static MachineMeta gather_machine_meta() { char host_buf[256] = {0}; if (::gethostname(host_buf, sizeof(host_buf) - 1) != 0) { std::snprintf(host_buf, sizeof(host_buf), "unknown"); } struct utsname uts {}; std::string kernel = "unknown"; std::string os = "unknown"; std::string arch = "unknown"; if (::uname(&uts) == 0) { kernel = uts.release; os = uts.sysname; arch = uts.machine; } return MachineMeta{ .host = host_buf, .os = os, .arch = arch, .kernel = kernel, .cpu_cores = cores_online(), .mem_total_kb = read_mem_kb("MemTotal:"), .mem_available_kb = read_mem_kb("MemAvailable:"), }; } static std::string csv_escape(const std::string &v) { std::string out = v; for (auto &c: out) { if (c == ',' || c == '\n' || c == '\r') { c = ' '; } } return out; } static const char *result_header() { return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; } static std::string result_row_csv(const ResultRow &r) { return fmt::format( "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode), durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys, r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct, r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops, r.error_ops, r.ops_per_sec, r.quantiles.p50_us, r.quantiles.p95_us, r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host), csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores, r.meta.mem_total_kb, r.meta.mem_available_kb); } static bool append_result_row(const std::string &path, const ResultRow &row) { auto exists = std::filesystem::exists(path); std::ofstream out(path, std::ios::out | std::ios::app); if (!out.is_open()) { return false; } if (!exists) { out << result_header() << "\n"; } out << result_row_csv(row) << "\n"; return true; } static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) { if (n <= 1) { return 0; } auto t = std::clamp(theta, 0.0001, 0.9999); std::uniform_real_distribution dist(0.0, 1.0); auto u = dist(rng); auto scaled = std::pow(u, 1.0 / (1.0 - t)); auto idx = static_cast(scaled * static_cast(n)); return std::min(idx, n - 1); } static std::optional pick_key_id(std::mt19937_64 &rng, Distribution distribution, double zipf_theta, bool shared_keyspace, size_t prefill_keys, size_t local_key_len) { if (shared_keyspace) { if (prefill_keys == 0) { return std::nullopt; } if (distribution == Distribution::Uniform) { std::uniform_int_distribution dist(0, prefill_keys - 1); return dist(rng); } return sample_zipf_like(rng, prefill_keys, zipf_theta); } if (local_key_len == 0) { return std::nullopt; } if (distribution == Distribution::Uniform) { std::uniform_int_distribution dist(0, local_key_len - 1); return dist(rng); } return sample_zipf_like(rng, local_key_len, zipf_theta); } static OpKind pick_op_kind(std::mt19937_64 &rng, const WorkloadSpec &spec) { if (spec.insert_only) { return OpKind::Update; } if (spec.scan_pct == 100) { return OpKind::Scan; } std::uniform_int_distribution dist(0, 99); auto roll = static_cast(dist(rng)); if (roll < spec.read_pct) { return OpKind::Read; } if (roll < static_cast(spec.read_pct + spec.update_pct)) { return OpKind::Update; } return OpKind::Scan; } static std::string find_upper_bound(const std::string &prefix) { std::string upper = prefix; for (int i = static_cast(upper.size()) - 1; i >= 0; --i) { if (static_cast(upper[i]) != 0xff) { upper[i] = static_cast(static_cast(upper[i]) + 1); upper.resize(static_cast(i + 1)); return upper; } } return ""; } static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle, const rocksdb::WriteOptions &wopt, const std::string &value, std::mt19937_64 &rng, const WorkloadSpec &spec, Distribution distribution, double zipf_theta, ReadPath read_path, size_t key_size, size_t scan_len, bool shared_keyspace, size_t prefill_keys, size_t local_key_len, size_t tid, std::atomic &insert_counter, size_t &local_insert_idx) { if (op == OpKind::Read) { auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); if (!maybe_id.has_value()) { return false; } auto key = shared_keyspace ? make_shared_key(maybe_id.value(), key_size) : make_thread_key(tid, maybe_id.value(), key_size); auto ropt = rocksdb::ReadOptions(); std::string out; if (read_path == ReadPath::Snapshot) { auto st = db->Get(ropt, handle, key, &out); return st.ok(); } auto *txn = db->BeginTransaction(wopt); auto st = txn->Get(ropt, handle, key, &out); auto cst = txn->Commit(); delete txn; return st.ok() && cst.ok(); } if (op == OpKind::Update) { std::optional key; if (spec.insert_only) { if (shared_keyspace) { auto id = insert_counter.fetch_add(1, std::memory_order_relaxed); key = make_shared_key(id, key_size); } else { auto id = local_insert_idx; local_insert_idx += 1; key = make_thread_key(tid, id, key_size); } } else { auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); if (!maybe_id.has_value()) { return false; } if (shared_keyspace) { key = make_shared_key(maybe_id.value(), key_size); } else { key = make_thread_key(tid, maybe_id.value(), key_size); } } auto *txn = db->BeginTransaction(wopt); auto pst = txn->Put(handle, key.value(), value); auto cst = txn->Commit(); delete txn; return pst.ok() && cst.ok(); } // scan std::optional prefix; if (shared_keyspace) { auto maybe_id = pick_key_id(rng, distribution, zipf_theta, true, prefill_keys, local_key_len); if (!maybe_id.has_value()) { return false; } prefix = make_shared_prefix(maybe_id.value()); } else { prefix = make_thread_prefix(tid); } auto ropt = rocksdb::ReadOptions(); auto upper = find_upper_bound(prefix.value()); rocksdb::Slice upper_slice(upper); if (!upper.empty()) { ropt.iterate_upper_bound = &upper_slice; } ropt.prefix_same_as_start = true; auto scan_limit = std::max(scan_len, 1); if (read_path == ReadPath::Snapshot) { auto *iter = db->NewIterator(ropt, handle); iter->Seek(prefix.value()); size_t scanned = 0; while (iter->Valid() && scanned < scan_limit) { black_box(iter->key()); black_box(iter->value()); iter->Next(); scanned += 1; } auto st = iter->status(); delete iter; return st.ok(); } auto *txn = db->BeginTransaction(wopt); auto *iter = txn->GetIterator(ropt); iter->Seek(prefix.value()); size_t scanned = 0; while (iter->Valid() && scanned < scan_limit) { black_box(iter->key()); black_box(iter->value()); iter->Next(); scanned += 1; } auto ist = iter->status(); auto cst = txn->Commit(); delete iter; delete txn; return ist.ok() && cst.ok(); } int main(int argc, char *argv[]) { CLI::App app{"rocksdb bench"}; Args args; bool disable_shared = false; bool disable_cleanup = false; std::string workload; app.add_option("-m,--mode", args.mode, "Mode: insert, get, mixed, scan"); app.add_option("--workload", workload, "Workload preset: W1..W6"); app.add_option("-t,--threads", args.threads, "Threads"); app.add_option("-k,--key-size", args.key_size, "Key Size"); app.add_option("-v,--value-size", args.value_size, "Value Size"); app.add_option("-b,--blob-size", args.blob_size, "Blob Size"); app.add_option("-i,--iterations", args.iterations, "Iterations"); app.add_option("-r,--insert-ratio", args.insert_ratio, "Update ratio for legacy mixed mode"); app.add_option("-p,--path", args.path, "Database path"); app.add_option("--prefill-keys", args.prefill_keys, "Prefill key count"); app.add_option("--warmup-secs", args.warmup_secs, "Warmup duration seconds"); app.add_option("--measure-secs", args.measure_secs, "Measure duration seconds"); app.add_option("--scan-len", args.scan_len, "Scan length per scan op"); app.add_option("--zipf-theta", args.zipf_theta, "Zipf theta"); app.add_option("--read-path", args.read_path, "snapshot|rw_txn"); app.add_option("--durability", args.durability, "relaxed|durable"); app.add_option("--result-file", args.result_file, "Unified result csv"); app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)"); app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace"); app.add_flag("--no-cleanup", disable_cleanup, "Keep db directory after run"); CLI11_PARSE(app, argc, argv); if (!workload.empty()) { args.workload = workload; } args.shared_keyspace = !disable_shared; args.cleanup = !disable_cleanup; if (args.path.empty()) { fmt::println(stderr, "path is empty"); return 1; } if (std::filesystem::exists(args.path)) { fmt::println(stderr, "path `{}` already exists", args.path); return 1; } if (args.threads == 0) { fmt::println(stderr, "threads must be greater than 0"); return 1; } if (args.key_size < 16 || args.value_size < 16) { fmt::println(stderr, "key_size and value_size must be >= 16"); return 1; } if (args.insert_ratio > 100) { fmt::println(stderr, "insert_ratio must be in [0,100]"); return 1; } if (!(args.zipf_theta > 0.0 && args.zipf_theta < 1.0)) { fmt::println(stderr, "zipf_theta must be in (0,1)"); return 1; } auto read_path = parse_read_path(args.read_path); if (!read_path.has_value()) { fmt::println(stderr, "invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path); return 1; } auto durability = parse_durability(args.durability); if (!durability.has_value()) { fmt::println(stderr, "invalid durability `{}` (supported: relaxed, durable)", args.durability); return 1; } std::string workload_err; auto workload_spec_opt = parse_workload(args, workload_err); if (!workload_spec_opt.has_value()) { fmt::println(stderr, "{}", workload_err); return 1; } auto workload_spec = workload_spec_opt.value(); auto prefill_keys = workload_spec.requires_prefill ? (args.prefill_keys > 0 ? args.prefill_keys : std::max(args.iterations, 1)) : args.prefill_keys; if (workload_spec.requires_prefill && prefill_keys == 0) { fmt::println(stderr, "prefill_keys must be > 0 for read/mixed/scan workloads"); return 1; } auto prefill_ranges = split_ranges(prefill_keys, args.threads); auto op_ranges = split_ranges(args.iterations, args.threads); rocksdb::ColumnFamilyOptions cfo{}; cfo.enable_blob_files = true; cfo.min_blob_size = args.blob_size; cfo.disable_auto_compactions = true; cfo.max_compaction_bytes = (1ULL << 60); cfo.level0_stop_writes_trigger = 1000000; cfo.level0_slowdown_writes_trigger = 1000000; cfo.level0_file_num_compaction_trigger = 1000000; cfo.write_buffer_size = 64 << 20; cfo.max_write_buffer_number = 128; auto cache = rocksdb::NewLRUCache(3 << 30); rocksdb::BlockBasedTableOptions table_options{}; table_options.block_cache = cache; cfo.table_factory.reset(NewBlockBasedTableFactory(table_options)); std::vector cfd{}; cfd.emplace_back("default", cfo); rocksdb::DBOptions options; options.create_if_missing = true; options.allow_concurrent_memtable_write = true; options.enable_pipelined_write = true; options.max_background_flushes = 8; options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH); auto wopt = rocksdb::WriteOptions(); wopt.no_slowdown = true; wopt.sync = (durability.value() == DurabilityMode::Durable); rocksdb::OptimisticTransactionDB *db = nullptr; std::vector handles{}; auto st = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); require_ok(st, "open db"); auto *handle = handles[0]; std::string value(args.value_size, '0'); if (workload_spec.requires_prefill) { std::vector fill_threads; fill_threads.reserve(args.threads); for (size_t tid = 0; tid < args.threads; ++tid) { fill_threads.emplace_back([&, tid] { bind_core(tid); auto tr = prefill_ranges[tid]; auto *txn = db->BeginTransaction(wopt); size_t in_batch = 0; for (size_t i = 0; i < tr.len; ++i) { std::string key; if (args.shared_keyspace) { key = make_shared_key(tr.start + i, args.key_size); } else { key = make_thread_key(tid, i, args.key_size); } require_ok(txn->Put(handle, key, value), "prefill put"); in_batch += 1; if (in_batch >= kPrefillBatch) { require_ok(txn->Commit(), "prefill commit"); delete txn; txn = db->BeginTransaction(wopt); in_batch = 0; } } if (in_batch > 0) { require_ok(txn->Commit(), "prefill tail commit"); } delete txn; }); } for (auto &t: fill_threads) { t.join(); } } std::barrier ready_barrier(static_cast(args.threads + 1)); std::barrier measure_barrier(static_cast(args.threads + 1)); std::atomic insert_counter{0}; std::vector workers; workers.reserve(args.threads); std::vector thread_stats(args.threads); auto seed_base = now_epoch_ms(); for (size_t tid = 0; tid < args.threads; ++tid) { workers.emplace_back([&, tid] { bind_core(tid); auto &stats = thread_stats[tid]; std::mt19937_64 rng(seed_base ^ ((tid + 1) * 0x9E3779B97F4A7C15ULL)); auto local_key_len = prefill_ranges[tid].len; auto local_op_len = op_ranges[tid].len; size_t local_insert_idx = 0; std::vector count_indices(local_op_len); std::iota(count_indices.begin(), count_indices.end(), 0); if (args.random && workload_spec.insert_only) { std::shuffle(count_indices.begin(), count_indices.end(), rng); } ready_barrier.arrive_and_wait(); if (args.warmup_secs > 0) { auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.warmup_secs); while (std::chrono::steady_clock::now() < warmup_deadline) { auto op = pick_op_kind(rng, workload_spec); (void) run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, local_insert_idx); } } measure_barrier.arrive_and_wait(); auto record = [&](bool ok, uint64_t us) { stats.total_ops += 1; if (!ok) { stats.error_ops += 1; } auto b = latency_bucket(us); stats.hist[b] += 1; }; if (args.measure_secs > 0) { auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.measure_secs); while (std::chrono::steady_clock::now() < deadline) { auto op = pick_op_kind(rng, workload_spec); auto started = std::chrono::steady_clock::now(); auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, local_insert_idx); auto us = static_cast(std::chrono::duration_cast( std::chrono::steady_clock::now() - started).count()); record(ok, us); } } else { for (size_t i: count_indices) { (void) i; auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec); auto started = std::chrono::steady_clock::now(); auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, local_insert_idx); auto us = static_cast(std::chrono::duration_cast( std::chrono::steady_clock::now() - started).count()); record(ok, us); } } }); } ready_barrier.arrive_and_wait(); measure_barrier.arrive_and_wait(); auto measure_started = nm::Instant::now(); for (auto &w: workers) { w.join(); } uint64_t elapsed_us = static_cast(measure_started.elapse_usec()); uint64_t total_ops = 0; uint64_t error_ops = 0; std::array merged_hist{}; for (const auto &s: thread_stats) { total_ops += s.total_ops; error_ops += s.error_ops; for (size_t i = 0; i < merged_hist.size(); ++i) { merged_hist[i] += s.hist[i]; } } auto ops_per_sec = elapsed_us == 0 ? 0.0 : (static_cast(total_ops) * 1'000'000.0 / static_cast(elapsed_us)); auto row = ResultRow{ .ts_epoch_ms = now_epoch_ms(), .workload_id = workload_spec.id, .mode = workload_spec.mode_label, .durability_mode = durability.value(), .threads = args.threads, .key_size = args.key_size, .value_size = args.value_size, .prefill_keys = prefill_keys, .shared_keyspace = args.shared_keyspace, .distribution = workload_spec.distribution, .zipf_theta = args.zipf_theta, .read_pct = workload_spec.read_pct, .update_pct = workload_spec.update_pct, .scan_pct = workload_spec.scan_pct, .scan_len = workload_spec.scan_len, .read_path = read_path.value(), .warmup_secs = args.warmup_secs, .measure_secs = args.measure_secs, .total_ops = total_ops, .error_ops = error_ops, .ops_per_sec = ops_per_sec, .quantiles = Quantiles{ .p50_us = histogram_quantile_us(merged_hist, 0.50), .p95_us = histogram_quantile_us(merged_hist, 0.95), .p99_us = histogram_quantile_us(merged_hist, 0.99), .p999_us = histogram_quantile_us(merged_hist, 0.999), }, .elapsed_us = elapsed_us, .meta = gather_machine_meta(), }; if (!append_result_row(args.result_file, row)) { fmt::println(stderr, "failed to write result file {}", args.result_file); return 1; } fmt::println( "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.error_ops, row.ops_per_sec, row.quantiles.p99_us, args.result_file); delete handle; delete db; if (args.cleanup) { std::filesystem::remove_all(args.path); } return 0; }