From 0649db54e79d76b12e31921caa4469a0f417f141 Mon Sep 17 00:00:00 2001 From: abbycin Date: Tue, 3 Mar 2026 22:12:17 +0800 Subject: [PATCH] phase0: align benchmark v2 workload protocol --- plan_exec.md | 20 + rocksdb/main.cpp | 1061 +++++++++++++++++++++++++++++++++-------- scripts/init.sh | 2 +- scripts/mace.sh | 92 ++-- scripts/plot.py | 136 +++--- scripts/rocksdb.sh | 92 ++-- src/main.rs | 1115 ++++++++++++++++++++++++++++++++++++++------ 7 files changed, 2026 insertions(+), 492 deletions(-) create mode 100644 plan_exec.md diff --git a/plan_exec.md b/plan_exec.md new file mode 100644 index 0000000..d0c54d5 --- /dev/null +++ b/plan_exec.md @@ -0,0 +1,20 @@ +# kv_bench 执行记录(benchmark_refactor) + +## Phase 0(已完成) +- 日期:2026-03-03 +- 范围: + - 重构 `src/main.rs` 与 `rocksdb/main.cpp`,完成 v2 方法学最小清单: + - workload preset:`W1..W6` + - mixed/read/scan 的 prefill + shared keyspace + - 时长模式:`--warmup-secs` / `--measure-secs` + - 显式 read path parity:`--read-path snapshot|rw_txn` + - 统一 schema 结果落盘(CSV)并自动附带机器/环境元数据 + - 更新脚本:`scripts/mace.sh`、`scripts/rocksdb.sh`、`scripts/plot.py`、`scripts/init.sh` + - 默认数据目录切换为 `/nvme` 体系(脚本强制 db_root 在 `/nvme` 下) +- 编译验证: + - `cargo check -q` 通过 + - `cargo build --release -q` 通过 + - `cmake --build --preset release -j` 通过 +- 运行烟测: + - `mace` 与 `rocksdb` 均可按新参数运行并写入统一 schema 结果文件 +- 提交:待本阶段 commit diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 6a4debf..98dd3bb 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -1,12 +1,22 @@ #include +#include +#include +#include #include #include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include -#include -#include + #include #include #include @@ -15,17 +25,9 @@ #include #include #include -#include - -#include -#include -#include -#include -#include -#include #include -#include +#include #include #include "CLI/CLI.hpp" @@ -33,7 +35,7 @@ template static void black_box(const T &t) { - asm volatile("" ::"m"(t) : "memory"); + asm 
volatile("" : : "m"(t) : "memory"); } static size_t cores_online() { @@ -56,88 +58,683 @@ static void require_ok(const rocksdb::Status &st, const char *what) { } } +constexpr size_t kLatencyBuckets = 64; +constexpr size_t kPrefixGroups = 1024; +constexpr size_t kPrefillBatch = 1024; + struct Args { + size_t threads = 4; + size_t iterations = 10000; + size_t key_size = 16; + size_t value_size = 1024; + size_t blob_size = 8192; + size_t insert_ratio = 30; + bool random = false; + std::string mode = "insert"; + std::optional workload; + std::string path = "/nvme/kv_bench_rocksdb"; + bool shared_keyspace = true; + size_t prefill_keys = 0; + uint64_t warmup_secs = 0; + uint64_t measure_secs = 0; + size_t scan_len = 100; + double zipf_theta = 0.99; + std::string read_path = "snapshot"; + std::string result_file = "benchmark_results.csv"; + bool cleanup = true; +}; + +enum class Distribution { + Uniform, + Zipf, +}; + +enum class ReadPath { + Snapshot, + RwTxn, +}; + +struct WorkloadSpec { + std::string id; + std::string mode_label; + Distribution distribution; + uint8_t read_pct; + uint8_t update_pct; + uint8_t scan_pct; + size_t scan_len; + bool requires_prefill; + bool insert_only; +}; + +struct ThreadRange { + size_t start; + size_t len; +}; + +struct MachineMeta { + std::string host; + std::string os; + std::string arch; + std::string kernel; + size_t cpu_cores; + uint64_t mem_total_kb; + uint64_t mem_available_kb; +}; + +struct Quantiles { + uint64_t p50_us = 0; + uint64_t p95_us = 0; + uint64_t p99_us = 0; + uint64_t p999_us = 0; +}; + +struct ThreadStats { + uint64_t total_ops = 0; + uint64_t error_ops = 0; + std::array hist{}; +}; + +struct ResultRow { + uint64_t ts_epoch_ms; + std::string workload_id; + std::string mode; size_t threads; - size_t iterations; size_t key_size; size_t value_size; - size_t blob_size; - size_t insert_ratio; - bool random; - std::string mode; - std::string path; + size_t prefill_keys; + bool shared_keyspace; + Distribution 
distribution; + double zipf_theta; + uint8_t read_pct; + uint8_t update_pct; + uint8_t scan_pct; + size_t scan_len; + ReadPath read_path; + uint64_t warmup_secs; + uint64_t measure_secs; + uint64_t total_ops; + uint64_t error_ops; + double ops_per_sec; + Quantiles quantiles; + uint64_t elapsed_us; + MachineMeta meta; }; +enum class OpKind { + Read, + Update, + Scan, +}; + +static std::string to_upper(std::string v) { + for (auto &c: v) { + if (c >= 'a' && c <= 'z') { + c = static_cast(c - 'a' + 'A'); + } + } + return v; +} + +static std::string to_lower(std::string v) { + for (auto &c: v) { + if (c >= 'A' && c <= 'Z') { + c = static_cast(c - 'A' + 'a'); + } + } + return v; +} + +static std::optional parse_read_path(const std::string &raw) { + auto v = to_lower(raw); + if (v == "snapshot") { + return ReadPath::Snapshot; + } + if (v == "rw_txn" || v == "rwtxn" || v == "txn") { + return ReadPath::RwTxn; + } + return std::nullopt; +} + +static const char *read_path_str(ReadPath p) { + switch (p) { + case ReadPath::Snapshot: + return "snapshot"; + case ReadPath::RwTxn: + return "rw_txn"; + } + return "snapshot"; +} + +static const char *distribution_str(Distribution d) { + switch (d) { + case Distribution::Uniform: + return "uniform"; + case Distribution::Zipf: + return "zipf"; + } + return "uniform"; +} + +static std::optional parse_workload(const Args &args, std::string &err) { + if (args.workload.has_value()) { + auto id = to_upper(args.workload.value()); + if (id == "W1") { + return WorkloadSpec{id, "mixed", Distribution::Uniform, 95, 5, 0, args.scan_len, true, false}; + } + if (id == "W2") { + return WorkloadSpec{id, "mixed", Distribution::Zipf, 95, 5, 0, args.scan_len, true, false}; + } + if (id == "W3") { + return WorkloadSpec{id, "mixed", Distribution::Uniform, 50, 50, 0, args.scan_len, true, false}; + } + if (id == "W4") { + return WorkloadSpec{id, "mixed", Distribution::Uniform, 5, 95, 0, args.scan_len, true, false}; + } + if (id == "W5") { + return 
WorkloadSpec{id, "mixed", Distribution::Uniform, 70, 25, 5, args.scan_len, true, false}; + } + if (id == "W6") { + return WorkloadSpec{id, "scan", Distribution::Uniform, 0, 0, 100, args.scan_len, true, false}; + } + err = fmt::format("invalid workload `{}` (supported: W1..W6)", args.workload.value()); + return std::nullopt; + } + + auto mode = to_lower(args.mode); + if (mode == "insert") { + return WorkloadSpec{"LEGACY_INSERT", "insert", Distribution::Uniform, 0, 100, 0, args.scan_len, false, true}; + } + if (mode == "get") { + return WorkloadSpec{"LEGACY_GET", "get", Distribution::Uniform, 100, 0, 0, args.scan_len, true, false}; + } + if (mode == "mixed") { + return WorkloadSpec{"LEGACY_MIXED", "mixed", Distribution::Uniform, + static_cast(100 - args.insert_ratio), + static_cast(args.insert_ratio), 0, args.scan_len, true, false}; + } + if (mode == "scan") { + return WorkloadSpec{"LEGACY_SCAN", "scan", Distribution::Uniform, 0, 0, 100, args.scan_len, true, false}; + } + err = fmt::format("invalid mode `{}` (supported: insert/get/mixed/scan)", args.mode); + return std::nullopt; +} + +static std::vector split_ranges(size_t total, size_t n) { + std::vector out; + out.reserve(n); + if (n == 0) { + return out; + } + auto base = total / n; + auto rem = total % n; + size_t start = 0; + for (size_t tid = 0; tid < n; ++tid) { + size_t len = base + (tid < rem ? 
1 : 0); + out.push_back(ThreadRange{start, len}); + start += len; + } + return out; +} + +static std::string make_shared_key(size_t id, size_t key_size) { + auto group = id % kPrefixGroups; + auto key = fmt::format("s{:03x}_{:010x}", group, id); + if (key.size() < key_size) { + key.resize(key_size, 'x'); + } + return key; +} + +static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size) { + auto key = fmt::format("t{:03x}_{:010x}", tid % kPrefixGroups, local_id); + if (key.size() < key_size) { + key.resize(key_size, 'x'); + } + return key; +} + +static std::string make_shared_prefix(size_t id) { + return fmt::format("s{:03x}_", id % kPrefixGroups); +} + +static std::string make_thread_prefix(size_t tid) { + return fmt::format("t{:03x}_", tid % kPrefixGroups); +} + +static size_t latency_bucket(uint64_t us) { + auto v = std::max(us, 1); + size_t idx = 0; + while (v >>= 1) { + idx += 1; + if (idx + 1 >= kLatencyBuckets) { + break; + } + } + return std::min(idx, kLatencyBuckets - 1); +} + +static uint64_t histogram_quantile_us(const std::array &hist, double q) { + uint64_t total = 0; + for (auto v: hist) { + total += v; + } + if (total == 0) { + return 0; + } + auto target = static_cast(std::ceil(static_cast(total) * q)); + uint64_t acc = 0; + for (size_t i = 0; i < hist.size(); ++i) { + acc += hist[i]; + if (acc >= target) { + if (i == 0) { + return 1; + } + if (i >= 63) { + return (1ULL << 63); + } + return (1ULL << i); + } + } + return (1ULL << (kLatencyBuckets - 1)); +} + +static uint64_t now_epoch_ms() { + auto now = std::chrono::system_clock::now(); + auto ms = std::chrono::duration_cast(now.time_since_epoch()); + return static_cast(ms.count()); +} + +static uint64_t read_mem_kb(const char *key) { + std::ifstream in("/proc/meminfo"); + if (!in.is_open()) { + return 0; + } + std::string k; + uint64_t val = 0; + std::string unit; + while (in >> k >> val >> unit) { + if (k == key) { + return val; + } + } + return 0; +} + +static MachineMeta 
gather_machine_meta() { + char host_buf[256] = {0}; + if (::gethostname(host_buf, sizeof(host_buf) - 1) != 0) { + std::snprintf(host_buf, sizeof(host_buf), "unknown"); + } + + struct utsname uts {}; + std::string kernel = "unknown"; + std::string os = "unknown"; + std::string arch = "unknown"; + if (::uname(&uts) == 0) { + kernel = uts.release; + os = uts.sysname; + arch = uts.machine; + } + + return MachineMeta{ + .host = host_buf, + .os = os, + .arch = arch, + .kernel = kernel, + .cpu_cores = cores_online(), + .mem_total_kb = read_mem_kb("MemTotal:"), + .mem_available_kb = read_mem_kb("MemAvailable:"), + }; +} + +static std::string csv_escape(const std::string &v) { + std::string out = v; + for (auto &c: out) { + if (c == ',' || c == '\n' || c == '\r') { + c = ' '; + } + } + return out; +} + +static const char *result_header() { + return "schema_version,ts_epoch_ms,engine,workload_id,mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; +} + +static std::string result_row_csv(const ResultRow &r) { + return fmt::format( + "v2,{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + r.ts_epoch_ms, + "rocksdb", + csv_escape(r.workload_id), + csv_escape(r.mode), + r.threads, + r.key_size, + r.value_size, + r.prefill_keys, + r.shared_keyspace, + distribution_str(r.distribution), + r.zipf_theta, + r.read_pct, + r.update_pct, + r.scan_pct, + r.scan_len, + read_path_str(r.read_path), + r.warmup_secs, + r.measure_secs, + r.total_ops, + r.error_ops, + r.ops_per_sec, + r.quantiles.p50_us, + r.quantiles.p95_us, + r.quantiles.p99_us, + r.quantiles.p999_us, + r.elapsed_us, + csv_escape(r.meta.host), + csv_escape(r.meta.os), + csv_escape(r.meta.arch), + csv_escape(r.meta.kernel), + 
r.meta.cpu_cores, + r.meta.mem_total_kb, + r.meta.mem_available_kb); +} + +static bool append_result_row(const std::string &path, const ResultRow &row) { + auto exists = std::filesystem::exists(path); + std::ofstream out(path, std::ios::out | std::ios::app); + if (!out.is_open()) { + return false; + } + if (!exists) { + out << result_header() << "\n"; + } + out << result_row_csv(row) << "\n"; + return true; +} + +static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) { + if (n <= 1) { + return 0; + } + auto t = std::clamp(theta, 0.0001, 0.9999); + std::uniform_real_distribution dist(0.0, 1.0); + auto u = dist(rng); + auto scaled = std::pow(u, 1.0 / (1.0 - t)); + auto idx = static_cast(scaled * static_cast(n)); + return std::min(idx, n - 1); +} + +static std::optional pick_key_id(std::mt19937_64 &rng, + Distribution distribution, + double zipf_theta, + bool shared_keyspace, + size_t prefill_keys, + size_t local_key_len) { + if (shared_keyspace) { + if (prefill_keys == 0) { + return std::nullopt; + } + if (distribution == Distribution::Uniform) { + std::uniform_int_distribution dist(0, prefill_keys - 1); + return dist(rng); + } + return sample_zipf_like(rng, prefill_keys, zipf_theta); + } + + if (local_key_len == 0) { + return std::nullopt; + } + + if (distribution == Distribution::Uniform) { + std::uniform_int_distribution dist(0, local_key_len - 1); + return dist(rng); + } + return sample_zipf_like(rng, local_key_len, zipf_theta); +} + +static OpKind pick_op_kind(std::mt19937_64 &rng, const WorkloadSpec &spec) { + if (spec.insert_only) { + return OpKind::Update; + } + if (spec.scan_pct == 100) { + return OpKind::Scan; + } + std::uniform_int_distribution dist(0, 99); + auto roll = static_cast(dist(rng)); + if (roll < spec.read_pct) { + return OpKind::Read; + } + if (roll < static_cast(spec.read_pct + spec.update_pct)) { + return OpKind::Update; + } + return OpKind::Scan; +} + +static std::string find_upper_bound(const std::string &prefix) { + 
std::string upper = prefix; + for (int i = static_cast(upper.size()) - 1; i >= 0; --i) { + if (static_cast(upper[i]) != 0xff) { + upper[i] = static_cast(static_cast(upper[i]) + 1); + upper.resize(static_cast(i + 1)); + return upper; + } + } + return ""; +} + +static bool run_one_op(OpKind op, + rocksdb::OptimisticTransactionDB *db, + rocksdb::ColumnFamilyHandle *handle, + const rocksdb::WriteOptions &wopt, + const std::string &value, + std::mt19937_64 &rng, + const WorkloadSpec &spec, + Distribution distribution, + double zipf_theta, + ReadPath read_path, + size_t key_size, + size_t scan_len, + bool shared_keyspace, + size_t prefill_keys, + size_t local_key_len, + size_t tid, + std::atomic &insert_counter, + size_t &local_insert_idx) { + if (op == OpKind::Read) { + auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); + if (!maybe_id.has_value()) { + return false; + } + + auto key = shared_keyspace + ? make_shared_key(maybe_id.value(), key_size) + : make_thread_key(tid, maybe_id.value(), key_size); + + auto ropt = rocksdb::ReadOptions(); + std::string out; + + if (read_path == ReadPath::Snapshot) { + auto st = db->Get(ropt, handle, key, &out); + return st.ok(); + } + + auto *txn = db->BeginTransaction(wopt); + auto st = txn->Get(ropt, handle, key, &out); + auto cst = txn->Commit(); + delete txn; + return st.ok() && cst.ok(); + } + + if (op == OpKind::Update) { + std::optional key; + if (spec.insert_only) { + if (shared_keyspace) { + auto id = insert_counter.fetch_add(1, std::memory_order_relaxed); + key = make_shared_key(id, key_size); + } else { + auto id = local_insert_idx; + local_insert_idx += 1; + key = make_thread_key(tid, id, key_size); + } + } else { + auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); + if (!maybe_id.has_value()) { + return false; + } + if (shared_keyspace) { + key = make_shared_key(maybe_id.value(), key_size); + } else { + key = 
make_thread_key(tid, maybe_id.value(), key_size); + } + } + + auto *txn = db->BeginTransaction(wopt); + auto pst = txn->Put(handle, key.value(), value); + auto cst = txn->Commit(); + delete txn; + return pst.ok() && cst.ok(); + } + + // scan + std::optional prefix; + if (shared_keyspace) { + auto maybe_id = pick_key_id(rng, distribution, zipf_theta, true, prefill_keys, local_key_len); + if (!maybe_id.has_value()) { + return false; + } + prefix = make_shared_prefix(maybe_id.value()); + } else { + prefix = make_thread_prefix(tid); + } + + auto ropt = rocksdb::ReadOptions(); + auto upper = find_upper_bound(prefix.value()); + rocksdb::Slice upper_slice(upper); + if (!upper.empty()) { + ropt.iterate_upper_bound = &upper_slice; + } + ropt.prefix_same_as_start = true; + + auto scan_limit = std::max(scan_len, 1); + + if (read_path == ReadPath::Snapshot) { + auto *iter = db->NewIterator(ropt, handle); + iter->Seek(prefix.value()); + size_t scanned = 0; + while (iter->Valid() && scanned < scan_limit) { + black_box(iter->key()); + black_box(iter->value()); + iter->Next(); + scanned += 1; + } + auto st = iter->status(); + delete iter; + return st.ok(); + } + + auto *txn = db->BeginTransaction(wopt); + auto *iter = txn->GetIterator(ropt); + iter->Seek(prefix.value()); + size_t scanned = 0; + while (iter->Valid() && scanned < scan_limit) { + black_box(iter->key()); + black_box(iter->value()); + iter->Next(); + scanned += 1; + } + auto ist = iter->status(); + auto cst = txn->Commit(); + delete iter; + delete txn; + return ist.ok() && cst.ok(); +} + int main(int argc, char *argv[]) { CLI::App app{"rocksdb bench"}; - Args args{ - .threads = 4, - .iterations = 100000, - .key_size = 16, - .value_size = 1024, - .blob_size = 8192, - .insert_ratio = 30, - .mode = "insert", - .path = "/tmp/rocksdb_tmp", - }; + Args args; + + bool disable_shared = false; + bool disable_cleanup = false; + std::string workload; app.add_option("-m,--mode", args.mode, "Mode: insert, get, mixed, scan"); + 
app.add_option("--workload", workload, "Workload preset: W1..W6"); app.add_option("-t,--threads", args.threads, "Threads"); app.add_option("-k,--key-size", args.key_size, "Key Size"); app.add_option("-v,--value-size", args.value_size, "Value Size"); app.add_option("-b,--blob-size", args.blob_size, "Blob Size"); app.add_option("-i,--iterations", args.iterations, "Iterations"); - app.add_option("-r,--insert-ratio", args.insert_ratio, "Insert Ratio for mixed mode"); - app.add_option("-p,--path", args.path, "DataBase Home"); - app.add_flag("--random", args.random, "Shuffle insert keys"); + app.add_option("-r,--insert-ratio", args.insert_ratio, "Update ratio for legacy mixed mode"); + app.add_option("-p,--path", args.path, "Database path"); + app.add_option("--prefill-keys", args.prefill_keys, "Prefill key count"); + app.add_option("--warmup-secs", args.warmup_secs, "Warmup duration seconds"); + app.add_option("--measure-secs", args.measure_secs, "Measure duration seconds"); + app.add_option("--scan-len", args.scan_len, "Scan length per scan op"); + app.add_option("--zipf-theta", args.zipf_theta, "Zipf theta"); + app.add_option("--read-path", args.read_path, "snapshot|rw_txn"); + app.add_option("--result-file", args.result_file, "Unified result csv"); + app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)"); + app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace"); + app.add_flag("--no-cleanup", disable_cleanup, "Keep db directory after run"); CLI11_PARSE(app, argc, argv); + if (!workload.empty()) { + args.workload = workload; + } + args.shared_keyspace = !disable_shared; + args.cleanup = !disable_cleanup; + if (args.path.empty()) { - fmt::println("path is empty"); + fmt::println(stderr, "path is empty"); return 1; } - if (std::filesystem::exists(args.path)) { - fmt::println("path `{}` already exists", args.path); + fmt::println(stderr, "path `{}` already exists", args.path); return 1; } - if (args.threads == 0) { - 
fmt::println("Error: threads must be greater than 0"); + fmt::println(stderr, "threads must be greater than 0"); return 1; } - - if (args.mode != "insert" && args.mode != "get" && args.mode != "mixed" && args.mode != "scan") { - fmt::println("Error: Invalid mode"); - return 1; - } - if (args.key_size < 16 || args.value_size < 16) { - fmt::println("Error: key_size or value_size too small, must >= 16"); + fmt::println(stderr, "key_size and value_size must be >= 16"); return 1; } - if (args.insert_ratio > 100) { - fmt::println("Error: Insert ratio must be between 0 and 100"); + fmt::println(stderr, "insert_ratio must be in [0,100]"); + return 1; + } + if (!(args.zipf_theta > 0.0 && args.zipf_theta < 1.0)) { + fmt::println(stderr, "zipf_theta must be in (0,1)"); return 1; } - auto find_upper_bound = [](std::string prefix) { - std::string upper_bound_key = prefix; - for (int i = upper_bound_key.length() - 1; i >= 0; --i) { - if ((unsigned char) upper_bound_key[i] != 0xff) { - upper_bound_key[i] = (unsigned char) upper_bound_key[i] + 1; - upper_bound_key.resize(i + 1); - break; - } - if (i == 0) { - upper_bound_key = ""; - break; - } - } - return upper_bound_key; - }; + auto read_path = parse_read_path(args.read_path); + if (!read_path.has_value()) { + fmt::println(stderr, "invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path); + return 1; + } + + std::string workload_err; + auto workload_spec_opt = parse_workload(args, workload_err); + if (!workload_spec_opt.has_value()) { + fmt::println(stderr, "{}", workload_err); + return 1; + } + auto workload_spec = workload_spec_opt.value(); + + auto prefill_keys = workload_spec.requires_prefill + ? (args.prefill_keys > 0 ? 
args.prefill_keys : std::max(args.iterations, 1)) + : args.prefill_keys; + + if (workload_spec.requires_prefill && prefill_keys == 0) { + fmt::println(stderr, "prefill_keys must be > 0 for read/mixed/scan workloads"); + return 1; + } + + auto prefill_ranges = split_ranges(prefill_keys, args.threads); + auto op_ranges = split_ranges(args.iterations, args.threads); rocksdb::ColumnFamilyOptions cfo{}; cfo.enable_blob_files = true; @@ -150,14 +747,13 @@ int main(int argc, char *argv[]) { cfo.write_buffer_size = 64 << 20; cfo.max_write_buffer_number = 128; - // use 3GB block cache auto cache = rocksdb::NewLRUCache(3 << 30); rocksdb::BlockBasedTableOptions table_options{}; table_options.block_cache = cache; cfo.table_factory.reset(NewBlockBasedTableFactory(table_options)); std::vector cfd{}; - cfd.push_back(rocksdb::ColumnFamilyDescriptor("default", cfo)); + cfd.emplace_back("default", cfo); rocksdb::DBOptions options; options.create_if_missing = true; @@ -168,164 +764,249 @@ int main(int argc, char *argv[]) { auto wopt = rocksdb::WriteOptions(); wopt.no_slowdown = true; - std::vector wg; - std::atomic total_op{0}; - rocksdb::OptimisticTransactionDB *db; - auto b = nm::Instant::now(); + + rocksdb::OptimisticTransactionDB *db = nullptr; std::vector handles{}; - auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); - require_ok(s, "open db"); - std::barrier ready_barrier{static_cast(args.threads + 1)}; - std::barrier start_barrier{static_cast(args.threads + 1)}; - - std::random_device rd{}; - - std::string val(args.value_size, 'x'); - + auto st = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); + require_ok(st, "open db"); auto *handle = handles[0]; - if (args.mode == "get" || args.mode == "scan") { + std::string value(args.value_size, '0'); + + if (workload_spec.requires_prefill) { std::vector fill_threads; + fill_threads.reserve(args.threads); for (size_t tid = 0; tid < args.threads; ++tid) { 
fill_threads.emplace_back([&, tid] { bind_core(tid); - size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); - const size_t batch_size = 10000; - for (size_t i = 0; i < count; i += batch_size) { - auto *kv = db->BeginTransaction(wopt); - for (size_t j = 0; j < batch_size && (i + j) < count; ++j) { - auto key = std::format("key_{}_{}", tid, i + j); - key.resize(args.key_size, 'x'); - require_ok(kv->Put(handle, key, val), "fill put"); + auto tr = prefill_ranges[tid]; + auto *txn = db->BeginTransaction(wopt); + size_t in_batch = 0; + for (size_t i = 0; i < tr.len; ++i) { + std::string key; + if (args.shared_keyspace) { + key = make_shared_key(tr.start + i, args.key_size); + } else { + key = make_thread_key(tid, i, args.key_size); + } + require_ok(txn->Put(handle, key, value), "prefill put"); + in_batch += 1; + if (in_batch >= kPrefillBatch) { + require_ok(txn->Commit(), "prefill commit"); + delete txn; + txn = db->BeginTransaction(wopt); + in_batch = 0; } - require_ok(kv->Commit(), "fill commit"); - delete kv; } + if (in_batch > 0) { + require_ok(txn->Commit(), "prefill tail commit"); + } + delete txn; }); } - for (auto &t: fill_threads) + for (auto &t: fill_threads) { t.join(); - - delete handle; - delete db; - handles.clear(); - // re-open db - s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db); - require_ok(s, "reopen db"); - handle = handles[0]; + } } - auto base_seed = rd(); - for (size_t tid = 0; tid < args.threads; ++tid) { - wg.emplace_back([&, tid] { - bind_core(tid); - std::string rval(args.value_size, '0'); - auto prefix = std::format("key_{}_", tid); - auto ropt = rocksdb::ReadOptions(); - auto upper_bound = find_upper_bound(prefix); - auto upper_bound_slice = rocksdb::Slice(upper_bound); - if (!upper_bound.empty()) { - ropt.iterate_upper_bound = &upper_bound_slice; - } - ropt.prefix_same_as_start = true; - size_t round = 0; - std::mt19937 thread_gen(static_cast(base_seed) ^ 
static_cast(tid)); - std::uniform_int_distribution mixed_dist(0, 99); + std::barrier ready_barrier(static_cast(args.threads + 1)); + std::barrier measure_barrier(static_cast(args.threads + 1)); - size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); - std::vector indices(key_count); - std::iota(indices.begin(), indices.end(), 0); - if (args.random) { - std::shuffle(indices.begin(), indices.end(), thread_gen); + std::atomic insert_counter{0}; + std::vector workers; + workers.reserve(args.threads); + std::vector thread_stats(args.threads); + + auto seed_base = now_epoch_ms(); + + for (size_t tid = 0; tid < args.threads; ++tid) { + workers.emplace_back([&, tid] { + bind_core(tid); + auto &stats = thread_stats[tid]; + std::mt19937_64 rng(seed_base ^ ((tid + 1) * 0x9E3779B97F4A7C15ULL)); + auto local_key_len = prefill_ranges[tid].len; + auto local_op_len = op_ranges[tid].len; + size_t local_insert_idx = 0; + + std::vector count_indices(local_op_len); + std::iota(count_indices.begin(), count_indices.end(), 0); + if (args.random && workload_spec.insert_only) { + std::shuffle(count_indices.begin(), count_indices.end(), rng); } ready_barrier.arrive_and_wait(); - start_barrier.arrive_and_wait(); - if (args.mode == "insert") { - for (size_t i: indices) { - auto key = std::format("key_{}_{}", tid, i); - key.resize(args.key_size, 'x'); - round += 1; - auto *kv = db->BeginTransaction(wopt); - require_ok(kv->Put(handle, key, val), "insert put"); - require_ok(kv->Commit(), "insert commit"); - delete kv; + if (args.warmup_secs > 0) { + auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.warmup_secs); + while (std::chrono::steady_clock::now() < warmup_deadline) { + auto op = pick_op_kind(rng, workload_spec); + (void) run_one_op(op, + db, + handle, + wopt, + value, + rng, + workload_spec, + workload_spec.distribution, + args.zipf_theta, + read_path.value(), + args.key_size, + workload_spec.scan_len, + 
args.shared_keyspace, + prefill_keys, + local_key_len, + tid, + insert_counter, + local_insert_idx); + } + } + + measure_barrier.arrive_and_wait(); + + auto record = [&](bool ok, uint64_t us) { + stats.total_ops += 1; + if (!ok) { + stats.error_ops += 1; + } + auto b = latency_bucket(us); + stats.hist[b] += 1; + }; + + if (args.measure_secs > 0) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.measure_secs); + while (std::chrono::steady_clock::now() < deadline) { + auto op = pick_op_kind(rng, workload_spec); + auto started = std::chrono::steady_clock::now(); + auto ok = run_one_op(op, + db, + handle, + wopt, + value, + rng, + workload_spec, + workload_spec.distribution, + args.zipf_theta, + read_path.value(), + args.key_size, + workload_spec.scan_len, + args.shared_keyspace, + prefill_keys, + local_key_len, + tid, + insert_counter, + local_insert_idx); + auto us = static_cast(std::chrono::duration_cast( + std::chrono::steady_clock::now() - started).count()); + record(ok, us); + } + } else { + for (size_t i: count_indices) { + (void) i; + auto op = workload_spec.insert_only ? 
OpKind::Update : pick_op_kind(rng, workload_spec); + auto started = std::chrono::steady_clock::now(); + auto ok = run_one_op(op, + db, + handle, + wopt, + value, + rng, + workload_spec, + workload_spec.distribution, + args.zipf_theta, + read_path.value(), + args.key_size, + workload_spec.scan_len, + args.shared_keyspace, + prefill_keys, + local_key_len, + tid, + insert_counter, + local_insert_idx); + auto us = static_cast(std::chrono::duration_cast( + std::chrono::steady_clock::now() - started).count()); + record(ok, us); } - } else if (args.mode == "get") { - // rocksdb has no dedicated read-only txn in this bench path, use direct get for fair read-path - // comparison with mace view - for (size_t i: indices) { - auto key = std::format("key_{}_{}", tid, i); - key.resize(args.key_size, 'x'); - round += 1; - require_ok(db->Get(ropt, handle, key, &rval), "get"); - } - } else if (args.mode == "mixed") { - for (size_t i: indices) { - auto key = std::format("key_{}_{}", tid, i); - key.resize(args.key_size, 'x'); - round += 1; - auto is_insert = mixed_dist(thread_gen) < static_cast(args.insert_ratio); - auto *kv = db->BeginTransaction(wopt); - if (is_insert) { - require_ok(kv->Put(handle, key, val), "mixed put"); - } else { - auto st = kv->Get(ropt, handle, key, &rval); - if (!st.ok() && !st.IsNotFound()) { - require_ok(st, "mixed get"); - } - } - require_ok(kv->Commit(), "mixed commit"); - delete kv; - } - } else if (args.mode == "scan") { - auto *iter = db->NewIterator(ropt); - iter->Seek(prefix); - while (iter->Valid()) { - round += 1; - auto k = iter->key(); - auto v = iter->value(); - black_box(k); - black_box(v); - iter->Next(); - } - require_ok(iter->status(), "scan iterate"); - delete iter; } - total_op.fetch_add(round, std::memory_order::relaxed); }); } ready_barrier.arrive_and_wait(); - b = nm::Instant::now(); - start_barrier.arrive_and_wait(); + measure_barrier.arrive_and_wait(); + auto measure_started = nm::Instant::now(); - for (auto &w: wg) { + for (auto 
&w: workers) { w.join(); } - size_t ratio = [&args] -> size_t { - if (args.mode == "mixed") - return args.insert_ratio; - return args.mode == "insert" ? 100 : 0; - }(); - const auto elapsed_us = b.elapse_usec(); - uint64_t ops = 0; - const auto total = total_op.load(std::memory_order_relaxed); - if (elapsed_us > 0) { - ops = static_cast(static_cast(total) * 1000000.0 / elapsed_us); - } - if (args.mode == "insert") { - if (args.random) { - args.mode = "random_insert"; - } else { - args.mode = "sequential_insert"; + uint64_t elapsed_us = static_cast(measure_started.elapse_usec()); + uint64_t total_ops = 0; + uint64_t error_ops = 0; + std::array merged_hist{}; + + for (const auto &s: thread_stats) { + total_ops += s.total_ops; + error_ops += s.error_ops; + for (size_t i = 0; i < merged_hist.size(); ++i) { + merged_hist[i] += s.hist[i]; } } - fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, ops, - static_cast(elapsed_us)); + + auto ops_per_sec = elapsed_us == 0 + ? 
0.0 + : (static_cast(total_ops) * 1'000'000.0 / static_cast(elapsed_us)); + + auto row = ResultRow{ + .ts_epoch_ms = now_epoch_ms(), + .workload_id = workload_spec.id, + .mode = workload_spec.mode_label, + .threads = args.threads, + .key_size = args.key_size, + .value_size = args.value_size, + .prefill_keys = prefill_keys, + .shared_keyspace = args.shared_keyspace, + .distribution = workload_spec.distribution, + .zipf_theta = args.zipf_theta, + .read_pct = workload_spec.read_pct, + .update_pct = workload_spec.update_pct, + .scan_pct = workload_spec.scan_pct, + .scan_len = workload_spec.scan_len, + .read_path = read_path.value(), + .warmup_secs = args.warmup_secs, + .measure_secs = args.measure_secs, + .total_ops = total_ops, + .error_ops = error_ops, + .ops_per_sec = ops_per_sec, + .quantiles = Quantiles{ + .p50_us = histogram_quantile_us(merged_hist, 0.50), + .p95_us = histogram_quantile_us(merged_hist, 0.95), + .p99_us = histogram_quantile_us(merged_hist, 0.99), + .p999_us = histogram_quantile_us(merged_hist, 0.999), + }, + .elapsed_us = elapsed_us, + .meta = gather_machine_meta(), + }; + + if (!append_result_row(args.result_file, row)) { + fmt::println(stderr, "failed to write result file {}", args.result_file); + return 1; + } + + fmt::println( + "engine=rocksdb workload={} mode={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + row.workload_id, + row.mode, + row.threads, + row.total_ops, + row.error_ops, + row.ops_per_sec, + row.quantiles.p99_us, + args.result_file); + delete handle; delete db; - std::filesystem::remove_all(args.path); + + if (args.cleanup) { + std::filesystem::remove_all(args.path); + } + return 0; } diff --git a/scripts/init.sh b/scripts/init.sh index 881ade4..74b0405 100755 --- a/scripts/init.sh +++ b/scripts/init.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash python3 -m venv . 
-./bin/pip3 install pandas matplotlib adjustText +./bin/pip3 install pandas matplotlib rm -f .gitignore diff --git a/scripts/mace.sh b/scripts/mace.sh index c06529c..796bc18 100755 --- a/scripts/mace.sh +++ b/scripts/mace.sh @@ -2,55 +2,61 @@ set -euo pipefail -if [ "$#" -ne 1 ] -then - printf "\033[m$0 path\033[0m\n" +if [ "$#" -lt 1 ] || [ "$#" -gt 2 ]; then + printf "Usage: %s [result_csv]\n" "$0" exit 1 fi script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" root_dir="$(cd -- "${script_dir}/.." && pwd)" -cargo build --release --manifest-path "${root_dir}/Cargo.toml" 1>/dev/null 2>/dev/null +# The runner creates per-case unique paths under this root; each path must not exist. +db_root="$1" +result_file="${2:-${script_dir}/benchmark_results.csv}" -function samples() { - export RUST_BACKTRACE=full - kv_sz=(16 16 100 1024 1024 1024 16 10240) - mode=(insert get mixed scan) - # set -x - db_root="$1" +warmup_secs="${WARMUP_SECS:-10}" +measure_secs="${MEASURE_SECS:-20}" +prefill_keys="${PREFILL_KEYS:-200000}" +read_path="${READ_PATH:-snapshot}" - cnt=100000 - for ((i = 1; i <= $(nproc); i *= 2)) - do - for ((j = 0; j < ${#kv_sz[@]}; j += 2)) - do - for ((k = 0; k < ${#mode[@]}; k += 1)) - do - if [ "${mode[k]}" == "insert" ] - then - "${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random - if test $? -ne 0 - then - echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail" - exit 1 - fi - fi - "${root_dir}/target/release/kv_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" - if test $? 
-ne 0 - then - echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" - exit 1 - fi - done - done - done -} - -echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed_us > "${script_dir}/mace.csv" -samples "$1" 1>> "${script_dir}/mace.csv" -if [ -x "${script_dir}/bin/python" ]; then - (cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv) -else - (cd "${script_dir}" && python3 plot.py mace.csv) +if [[ "${db_root}" != /nvme* ]]; then + printf "db_root must be under /nvme, got: %s\n" "${db_root}" >&2 + exit 1 fi + +mkdir -p "${db_root}" +mkdir -p "$(dirname -- "${result_file}")" + +cargo build --release --manifest-path "${root_dir}/Cargo.toml" + +workloads=(W1 W2 W3 W4 W5 W6) +threads=(1 6 12) +profiles=( + "32 1024" + "32 16384" +) + +for workload in "${workloads[@]}"; do + for t in "${threads[@]}"; do + for kv in "${profiles[@]}"; do + read -r key_size value_size <<< "${kv}" + run_path="$(mktemp -u -p "${db_root}" "mace_${workload}_${t}_${key_size}_${value_size}_XXXXXX")" + printf "[mace] workload=%s threads=%s key=%s value=%s path=%s\n" \ + "${workload}" "${t}" "${key_size}" "${value_size}" "${run_path}" + "${root_dir}/target/release/kv_bench" \ + --path "${run_path}" \ + --workload "${workload}" \ + --threads "${t}" \ + --key-size "${key_size}" \ + --value-size "${value_size}" \ + --prefill-keys "${prefill_keys}" \ + --warmup-secs "${warmup_secs}" \ + --measure-secs "${measure_secs}" \ + --shared-keyspace true \ + --read-path "${read_path}" \ + --result-file "${result_file}" + done + done +done + +printf "Mace runs finished. 
Results appended to: %s\n" "${result_file}" diff --git a/scripts/plot.py b/scripts/plot.py index 386ceff..c7b84cd 100644 --- a/scripts/plot.py +++ b/scripts/plot.py @@ -1,65 +1,81 @@ -import pandas as pd -import matplotlib.pyplot as plt -from adjustText import adjust_text +#!/usr/bin/env python3 + import sys +from pathlib import Path + +import matplotlib.pyplot as plt +import pandas as pd -def real_mode(m): - if m == "mixed": - return "Mixed (70% Get, 30% Insert)" - elif m == "get": - return "Random Get" - elif m == "scan": - return "Sequential Scan" - return m.capitalize() +def main() -> int: + if len(sys.argv) not in (2, 3): + print(f"Usage: {sys.argv[0]} [output_dir]") + return 1 + + result_csv = Path(sys.argv[1]) + output_dir = Path(sys.argv[2]) if len(sys.argv) == 3 else result_csv.parent + output_dir.mkdir(parents=True, exist_ok=True) + + df = pd.read_csv(result_csv) + + required = { + "engine", + "workload_id", + "threads", + "key_size", + "value_size", + "ops_per_sec", + "p99_us", + } + missing = required - set(df.columns) + if missing: + raise ValueError(f"Missing required columns: {sorted(missing)}") + + for engine in sorted(df["engine"].unique()): + engine_df = df[df["engine"] == engine] + profiles = ( + engine_df[["key_size", "value_size"]] + .drop_duplicates() + .sort_values(["key_size", "value_size"]) + .itertuples(index=False) + ) + + for key_size, value_size in profiles: + sub = engine_df[ + (engine_df["key_size"] == key_size) + & (engine_df["value_size"] == value_size) + ] + if sub.empty: + continue + + for metric, ylabel in (("ops_per_sec", "OPS/s"), ("p99_us", "P99 Latency (us)")): + plt.figure(figsize=(12, 7)) + for workload in sorted(sub["workload_id"].unique()): + wdf = sub[sub["workload_id"] == workload].sort_values("threads") + plt.plot( + wdf["threads"], + wdf[metric], + marker="o", + linewidth=2, + label=workload, + ) + + plt.title( + f"{engine.upper()} {metric} (key={key_size}, value={value_size})", + fontsize=14, + ) + 
plt.xlabel("Threads") + plt.ylabel(ylabel) + plt.grid(True, linestyle="--", alpha=0.5) + plt.legend() + plt.tight_layout() + out = output_dir / f"{engine}_{metric}_k{key_size}_v{value_size}.png" + plt.savefig(out) + plt.close() + + print(f"Charts written to: {output_dir}") + return 0 -name = sys.argv[1] -prefix = name.split(".")[0] - -# read benchmark data -# keep compatibility with older csv files that used elapsed/elasped -# and normalize to elapsed_us - -df = pd.read_csv(f"./{name}") -if "elapsed_us" not in df.columns: - if "elapsed" in df.columns: - df = df.rename(columns={"elapsed": "elapsed_us"}) - elif "elasped" in df.columns: - df = df.rename(columns={"elasped": "elapsed_us"}) - -# group by mode -modes = df["mode"].unique() - -for mode in modes: - plt.figure(figsize=(16, 9)) - subset = df[df["mode"] == mode] - - # group by key/value size - key_value_combinations = subset.groupby(["key_size", "value_size"]) - - texts = [] - for (key_size, value_size), group in key_value_combinations: - label = f"key={key_size}B, val={value_size}B" - x = group["threads"] - y = group["ops"] - - # draw line - line, = plt.plot(x, y, marker="o", label=label) - - # add labels - for xi, yi, ops in zip(x, y, group["ops"]): - texts.append( - plt.text(xi, yi, f"{int(ops)}", color=line.get_color(), fontsize=12) - ) - - adjust_text(texts, arrowprops=dict(arrowstyle="->", color="gray")) - - plt.title(f"{prefix.upper()}: {real_mode(mode)}", fontsize=16) - plt.xlabel("Threads", fontsize=14) - plt.ylabel("OPS", fontsize=14) - plt.grid(True, linestyle="--", alpha=0.6) - plt.legend() - plt.tight_layout() - plt.savefig(f"{prefix}_{mode}.png") - plt.close() +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/rocksdb.sh b/scripts/rocksdb.sh index 0bebf9d..7c85e92 100755 --- a/scripts/rocksdb.sh +++ b/scripts/rocksdb.sh @@ -2,55 +2,61 @@ set -euo pipefail -if [ "$#" -ne 1 ] -then - printf "\033[m$0 path\033[0m\n" - exit 1 +if [ "$#" -lt 1 ] || [ "$#" -gt 2 ]; then + printf 
"Usage: %s [result_csv]\n" "$0" + exit 1 fi script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" root_dir="$(cd -- "${script_dir}/.." && pwd)" rocksdb_dir="${root_dir}/rocksdb" -(cd "${rocksdb_dir}" && cmake --preset release 1>/dev/null 2>/dev/null) -(cd "${rocksdb_dir}" && cmake --build --preset release 1>/dev/null 2>/dev/null) +db_root="$1" +result_file="${2:-${script_dir}/benchmark_results.csv}" -function samples() { - kv_sz=(16 16 100 1024 1024 1024 16 10240) - mode=(insert get mixed scan) - # set -x - db_root="$1" - cnt=100000 - for ((i = 1; i <= $(nproc); i *= 2)) - do - for ((j = 0; j < ${#kv_sz[@]}; j += 2)) - do - for ((k = 0; k < ${#mode[@]}; k += 1)) - do - if [ "${mode[k]}" == "insert" ] - then - "${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" --random - if test $? -ne 0 - then - echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} random fail" - exit 1 - fi - fi - "${rocksdb_dir}/build/release/rocksdb_bench" --path "${db_root}" --threads "${i}" --iterations "${cnt}" --mode "${mode[k]}" --key-size "${kv_sz[j]}" --value-size "${kv_sz[j+1]}" - if test $? 
-ne 0 - then - echo "${mode[k]} threads $i ksz ${kv_sz[j]} vsz ${kv_sz[j+1]} fail" - exit 1 - fi - done - done - done -} +warmup_secs="${WARMUP_SECS:-10}" +measure_secs="${MEASURE_SECS:-20}" +prefill_keys="${PREFILL_KEYS:-200000}" +read_path="${READ_PATH:-snapshot}" -echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed_us > "${script_dir}/rocksdb.csv" -samples "$1" 1>> "${script_dir}/rocksdb.csv" -if [ -x "${script_dir}/bin/python" ]; then - (cd "${script_dir}" && "${script_dir}/bin/python" plot.py rocksdb.csv) -else - (cd "${script_dir}" && python3 plot.py rocksdb.csv) +if [[ "${db_root}" != /nvme* ]]; then + printf "db_root must be under /nvme, got: %s\n" "${db_root}" >&2 + exit 1 fi + +mkdir -p "${db_root}" +mkdir -p "$(dirname -- "${result_file}")" + +(cd "${rocksdb_dir}" && cmake --preset release) +(cd "${rocksdb_dir}" && cmake --build --preset release) + +workloads=(W1 W2 W3 W4 W5 W6) +threads=(1 6 12) +profiles=( + "32 1024" + "32 16384" +) + +for workload in "${workloads[@]}"; do + for t in "${threads[@]}"; do + for kv in "${profiles[@]}"; do + read -r key_size value_size <<< "${kv}" + run_path="$(mktemp -u -p "${db_root}" "rocksdb_${workload}_${t}_${key_size}_${value_size}_XXXXXX")" + printf "[rocksdb] workload=%s threads=%s key=%s value=%s path=%s\n" \ + "${workload}" "${t}" "${key_size}" "${value_size}" "${run_path}" + "${rocksdb_dir}/build/release/rocksdb_bench" \ + --path "${run_path}" \ + --workload "${workload}" \ + --threads "${t}" \ + --key-size "${key_size}" \ + --value-size "${value_size}" \ + --prefill-keys "${prefill_keys}" \ + --warmup-secs "${warmup_secs}" \ + --measure-secs "${measure_secs}" \ + --read-path "${read_path}" \ + --result-file "${result_file}" + done + done +done + +printf "RocksDB runs finished. 
Results appended to: %s\n" "${result_file}" diff --git a/src/main.rs b/src/main.rs index 0179ad4..f6b3a07 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,45 +4,577 @@ use logger::Logger; use mace::{Mace, Options}; #[cfg(feature = "custom_alloc")] use myalloc::{MyAlloc, print_filtered_trace}; +use rand::rngs::StdRng; +use rand::seq::SliceRandom; +use rand::{Rng, SeedableRng}; +use std::fs::OpenOptions; +use std::io::{BufWriter, Write}; use std::path::Path; use std::process::exit; -use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Barrier}; use std::thread::JoinHandle; -use std::time::Instant; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(feature = "custom_alloc")] #[global_allocator] static GLOBAL: MyAlloc = MyAlloc; -#[derive(Parser, Debug)] +const LAT_BUCKETS: usize = 64; +const PREFIX_GROUPS: usize = 1024; +const PREFILL_BATCH: usize = 1024; + +#[derive(Parser, Debug, Clone)] #[command(author, version, about, long_about = None)] struct Args { - #[arg(short = 'p', long, default_value = "/tmp/mace")] + #[arg(short = 'p', long, default_value = "/nvme/kv_bench_mace")] path: String, #[arg(short = 'm', long, default_value = "insert")] mode: String, - #[arg(short = 'k', long, default_value = "16")] + #[arg(long)] + workload: Option, + + #[arg(short = 'k', long, default_value_t = 16)] key_size: usize, - #[arg(short = 'v', long, default_value = "1024")] + #[arg(short = 'v', long, default_value_t = 1024)] value_size: usize, - #[arg(short = 't', long, default_value = "4")] + #[arg(short = 't', long, default_value_t = 4)] threads: usize, - #[arg(short = 'i', long, default_value = "10000")] + #[arg(short = 'i', long, default_value_t = 10000)] iterations: usize, - #[arg(short = 'r', long, default_value = "30")] + #[arg(short = 'r', long, default_value_t = 30)] insert_ratio: u8, - #[arg(long, default_value = "false")] + #[arg(long, default_value_t = false)] random: bool, - #[arg(long, default_value = "8192")] + 
#[arg(long, default_value_t = 8192)] blob_size: usize, + + #[arg(long, default_value_t = true)] + shared_keyspace: bool, + + #[arg(long, default_value_t = 0)] + prefill_keys: usize, + + #[arg(long, default_value_t = 0)] + warmup_secs: u64, + + #[arg(long, default_value_t = 0)] + measure_secs: u64, + + #[arg(long, default_value_t = 100)] + scan_len: usize, + + #[arg(long, default_value_t = 0.99)] + zipf_theta: f64, + + #[arg(long, default_value = "snapshot")] + read_path: String, + + #[arg(long, default_value = "benchmark_results.csv")] + result_file: String, + + #[arg(long, default_value_t = true)] + cleanup: bool, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Distribution { + Uniform, + Zipf, +} + +impl Distribution { + fn as_str(self) -> &'static str { + match self { + Distribution::Uniform => "uniform", + Distribution::Zipf => "zipf", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum ReadPath { + Snapshot, + RwTxn, +} + +impl ReadPath { + fn parse(raw: &str) -> Option { + match raw.trim().to_ascii_lowercase().as_str() { + "snapshot" => Some(Self::Snapshot), + "rw_txn" | "rwtxn" | "txn" => Some(Self::RwTxn), + _ => None, + } + } + + fn as_str(self) -> &'static str { + match self { + ReadPath::Snapshot => "snapshot", + ReadPath::RwTxn => "rw_txn", + } + } +} + +#[derive(Clone, Debug)] +struct WorkloadSpec { + id: String, + mode_label: String, + distribution: Distribution, + read_pct: u8, + update_pct: u8, + scan_pct: u8, + scan_len: usize, + requires_prefill: bool, + insert_only: bool, +} + +#[derive(Clone, Copy, Debug)] +struct ThreadRange { + start: usize, + len: usize, +} + +#[derive(Clone, Debug)] +struct MachineMeta { + host: String, + os: String, + arch: String, + kernel: String, + cpu_cores: usize, + mem_total_kb: u64, + mem_available_kb: u64, +} + +#[derive(Clone, Copy, Debug, Default)] +struct Quantiles { + p50_us: u64, + p95_us: u64, + p99_us: u64, + p999_us: u64, +} + +#[derive(Clone, Debug)] +struct ResultRow { + ts_epoch_ms: 
u128, + engine: &'static str, + workload_id: String, + mode: String, + threads: usize, + key_size: usize, + value_size: usize, + prefill_keys: usize, + shared_keyspace: bool, + distribution: Distribution, + zipf_theta: f64, + read_pct: u8, + update_pct: u8, + scan_pct: u8, + scan_len: usize, + read_path: ReadPath, + warmup_secs: u64, + measure_secs: u64, + total_ops: u64, + error_ops: u64, + ops_per_sec: f64, + quantiles: Quantiles, + elapsed_us: u64, + meta: MachineMeta, +} + +#[derive(Clone, Debug)] +struct ThreadStats { + total_ops: u64, + error_ops: u64, + hist: [u64; LAT_BUCKETS], +} + +impl Default for ThreadStats { + fn default() -> Self { + Self { + total_ops: 0, + error_ops: 0, + hist: [0; LAT_BUCKETS], + } + } +} + +fn parse_workload(args: &Args) -> Result { + if let Some(w) = args.workload.as_ref() { + let id = w.trim().to_ascii_uppercase(); + let spec = match id.as_str() { + "W1" => WorkloadSpec { + id, + mode_label: "mixed".into(), + distribution: Distribution::Uniform, + read_pct: 95, + update_pct: 5, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }, + "W2" => WorkloadSpec { + id, + mode_label: "mixed".into(), + distribution: Distribution::Zipf, + read_pct: 95, + update_pct: 5, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }, + "W3" => WorkloadSpec { + id, + mode_label: "mixed".into(), + distribution: Distribution::Uniform, + read_pct: 50, + update_pct: 50, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }, + "W4" => WorkloadSpec { + id, + mode_label: "mixed".into(), + distribution: Distribution::Uniform, + read_pct: 5, + update_pct: 95, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }, + "W5" => WorkloadSpec { + id, + mode_label: "mixed".into(), + distribution: Distribution::Uniform, + read_pct: 70, + update_pct: 25, + scan_pct: 5, + scan_len: args.scan_len, + 
requires_prefill: true, + insert_only: false, + }, + "W6" => WorkloadSpec { + id, + mode_label: "scan".into(), + distribution: Distribution::Uniform, + read_pct: 0, + update_pct: 0, + scan_pct: 100, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }, + _ => { + return Err(format!( + "invalid workload `{}` (supported: W1, W2, W3, W4, W5, W6)", + w + )); + } + }; + return Ok(spec); + } + + let mode = args.mode.trim().to_ascii_lowercase(); + match mode.as_str() { + "insert" => Ok(WorkloadSpec { + id: "LEGACY_INSERT".into(), + mode_label: "insert".into(), + distribution: Distribution::Uniform, + read_pct: 0, + update_pct: 100, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: false, + insert_only: true, + }), + "get" => Ok(WorkloadSpec { + id: "LEGACY_GET".into(), + mode_label: "get".into(), + distribution: Distribution::Uniform, + read_pct: 100, + update_pct: 0, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }), + "mixed" => Ok(WorkloadSpec { + id: "LEGACY_MIXED".into(), + mode_label: "mixed".into(), + distribution: Distribution::Uniform, + read_pct: 100u8.saturating_sub(args.insert_ratio), + update_pct: args.insert_ratio, + scan_pct: 0, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }), + "scan" => Ok(WorkloadSpec { + id: "LEGACY_SCAN".into(), + mode_label: "scan".into(), + distribution: Distribution::Uniform, + read_pct: 0, + update_pct: 0, + scan_pct: 100, + scan_len: args.scan_len, + requires_prefill: true, + insert_only: false, + }), + _ => Err(format!( + "invalid mode `{}` (supported: insert, get, mixed, scan)", + args.mode + )), + } +} + +fn split_ranges(total: usize, n: usize) -> Vec { + let mut ranges = Vec::with_capacity(n); + if n == 0 { + return ranges; + } + let base = total / n; + let rem = total % n; + let mut start = 0; + for tid in 0..n { + let len = base + usize::from(tid < rem); + ranges.push(ThreadRange { start, len }); + start += len; 
+ } + ranges +} + +fn make_shared_key(id: usize, key_size: usize) -> Vec { + let group = id % PREFIX_GROUPS; + let mut key = format!("s{:03x}_{:010x}", group, id).into_bytes(); + if key.len() < key_size { + key.resize(key_size, b'x'); + } + key +} + +fn make_thread_key(tid: usize, local_id: usize, key_size: usize) -> Vec { + let mut key = format!("t{:03x}_{:010x}", tid % PREFIX_GROUPS, local_id).into_bytes(); + if key.len() < key_size { + key.resize(key_size, b'x'); + } + key +} + +fn make_shared_prefix(key_id: usize) -> Vec { + let group = key_id % PREFIX_GROUPS; + format!("s{:03x}_", group).into_bytes() +} + +fn make_thread_prefix(tid: usize) -> Vec { + format!("t{:03x}_", tid % PREFIX_GROUPS).into_bytes() +} + +fn latency_bucket(us: u64) -> usize { + let v = us.max(1); + let idx = (63 - v.leading_zeros() as usize).min(LAT_BUCKETS - 1); + idx +} + +fn histogram_quantile_us(hist: &[u64; LAT_BUCKETS], q: f64) -> u64 { + let total: u64 = hist.iter().sum(); + if total == 0 { + return 0; + } + let target = ((total as f64) * q).ceil() as u64; + let mut acc = 0u64; + for (idx, cnt) in hist.iter().enumerate() { + acc += *cnt; + if acc >= target { + return if idx == 0 { 1 } else { 1u64 << idx }; + } + } + 1u64 << (LAT_BUCKETS - 1) +} + +fn now_epoch_ms() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() +} + +fn read_proc_value_kb(key: &str) -> u64 { + let Ok(content) = std::fs::read_to_string("/proc/meminfo") else { + return 0; + }; + for line in content.lines() { + if let Some(rest) = line.strip_prefix(key) { + let num = rest + .split_whitespace() + .next() + .unwrap_or("0") + .parse::() + .unwrap_or(0); + return num; + } + } + 0 +} + +fn gather_machine_meta() -> MachineMeta { + let host = std::fs::read_to_string("/proc/sys/kernel/hostname") + .ok() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .or_else(|| std::env::var("HOSTNAME").ok()) + .unwrap_or_else(|| "unknown".to_string()); + + let kernel = 
std::fs::read_to_string("/proc/sys/kernel/osrelease") + .ok() + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + MachineMeta { + host, + os: std::env::consts::OS.to_string(), + arch: std::env::consts::ARCH.to_string(), + kernel, + cpu_cores: std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1), + mem_total_kb: read_proc_value_kb("MemTotal:"), + mem_available_kb: read_proc_value_kb("MemAvailable:"), + } +} + +fn csv_escape(raw: &str) -> String { + raw.replace([',', '\n', '\r'], " ") +} + +fn result_header() -> &'static str { + "schema_version,ts_epoch_ms,engine,workload_id,mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" +} + +fn result_row_csv(row: &ResultRow) -> String { + format!( + "v2,{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + row.ts_epoch_ms, + row.engine, + csv_escape(&row.workload_id), + csv_escape(&row.mode), + row.threads, + row.key_size, + row.value_size, + row.prefill_keys, + row.shared_keyspace, + row.distribution.as_str(), + row.zipf_theta, + row.read_pct, + row.update_pct, + row.scan_pct, + row.scan_len, + row.read_path.as_str(), + row.warmup_secs, + row.measure_secs, + row.total_ops, + row.error_ops, + row.ops_per_sec, + row.quantiles.p50_us, + row.quantiles.p95_us, + row.quantiles.p99_us, + row.quantiles.p999_us, + row.elapsed_us, + csv_escape(&row.meta.host), + csv_escape(&row.meta.os), + csv_escape(&row.meta.arch), + csv_escape(&row.meta.kernel), + row.meta.cpu_cores, + row.meta.mem_total_kb, + row.meta.mem_available_kb, + ) +} + +fn append_result_row(path: &str, row: &ResultRow) -> std::io::Result<()> { + let exists = Path::new(path).exists(); + let file = 
OpenOptions::new().create(true).append(true).open(path)?; + let mut writer = BufWriter::new(file); + if !exists { + writer.write_all(result_header().as_bytes())?; + writer.write_all(b"\n")?; + } + writer.write_all(result_row_csv(row).as_bytes())?; + writer.write_all(b"\n")?; + writer.flush() +} + +fn sample_zipf_like(rng: &mut StdRng, n: usize, theta: f64) -> usize { + if n <= 1 { + return 0; + } + let t = theta.clamp(0.0001, 0.9999); + let u: f64 = rng.random(); + let scaled = u.powf(1.0 / (1.0 - t)); + let idx = (scaled * n as f64) as usize; + idx.min(n - 1) +} + +fn pick_key_id( + rng: &mut StdRng, + distribution: Distribution, + theta: f64, + shared: bool, + prefill_keys: usize, + local_len: usize, +) -> Option { + if shared { + if prefill_keys == 0 { + return None; + } + return Some(match distribution { + Distribution::Uniform => rng.random_range(0..prefill_keys), + Distribution::Zipf => sample_zipf_like(rng, prefill_keys, theta), + }); + } + + if local_len == 0 { + return None; + } + + Some(match distribution { + Distribution::Uniform => rng.random_range(0..local_len), + Distribution::Zipf => sample_zipf_like(rng, local_len, theta), + }) +} + +#[derive(Clone, Copy)] +enum OpKind { + Read, + Update, + Scan, +} + +fn pick_op_kind(rng: &mut StdRng, spec: &WorkloadSpec) -> OpKind { + if spec.insert_only { + return OpKind::Update; + } + if spec.scan_pct == 100 { + return OpKind::Scan; + } + let roll: u8 = rng.random_range(0..100); + if roll < spec.read_pct { + OpKind::Read + } else if roll < spec.read_pct.saturating_add(spec.update_pct) { + OpKind::Update + } else { + OpKind::Scan + } } fn main() { @@ -51,81 +583,114 @@ fn main() { Logger::init().add_file("/tmp/x.log", true); log::set_max_level(log::LevelFilter::Info); } - let args = Args::parse(); + let args = Args::parse(); let path = Path::new(&args.path); if args.path.is_empty() { eprintln!("path is empty"); exit(1); } - if path.exists() { eprintln!("path {:?} already exists", args.path); exit(1); } - if 
args.threads == 0 { - eprintln!("Error: threads must be greater than 0"); + eprintln!("threads must be greater than 0"); exit(1); } - - if !matches!(args.mode.as_str(), "insert" | "get" | "mixed" | "scan") { - eprintln!("Error: Invalid mode"); - exit(1); - } - if args.key_size < 16 || args.value_size < 16 { - eprintln!("Error: key_size or value_size too small, must >= 16"); + eprintln!("key_size and value_size must be >= 16"); + exit(1); + } + if args.insert_ratio > 100 { + eprintln!("insert ratio must be between 0 and 100"); + exit(1); + } + if !(0.0..1.0).contains(&args.zipf_theta) { + eprintln!("zipf_theta must be in range (0, 1)"); exit(1); } - if args.insert_ratio > 100 { - eprintln!("Error: Insert ratio must be between 0 and 100"); + let read_path = match ReadPath::parse(&args.read_path) { + Some(r) => r, + None => { + eprintln!("invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path); + exit(1); + } + }; + + let workload = match parse_workload(&args) { + Ok(w) => w, + Err(e) => { + eprintln!("{}", e); + exit(1); + } + }; + + let prefill_keys = if workload.requires_prefill { + if args.prefill_keys > 0 { + args.prefill_keys + } else { + args.iterations.max(1) + } + } else { + args.prefill_keys + }; + + if workload.requires_prefill && prefill_keys == 0 { + eprintln!("prefill_keys must be > 0 for read/mixed/scan workloads"); exit(1); } + + let thread_prefill_ranges = split_ranges(prefill_keys, args.threads); + let thread_op_ranges = if args.shared_keyspace { + thread_prefill_ranges.clone() + } else { + thread_prefill_ranges.clone() + }; + let mut opt = Options::new(path); opt.sync_on_write = false; opt.inline_size = args.blob_size; - opt.tmp_store = args.mode != "get" && args.mode != "scan"; opt.cache_capacity = 3 << 30; opt.data_file_size = 64 << 20; opt.max_log_size = 1 << 30; opt.default_arenas = 128; - let mut saved = opt.clone(); + opt.tmp_store = args.cleanup; - saved.tmp_store = true; - let mut db = 
Mace::new(opt.validate().unwrap()).unwrap(); + let db = Mace::new(opt.validate().unwrap()).unwrap(); db.disable_gc(); - let mut bkt = db.new_bucket("default").unwrap(); - + let bkt = db.new_bucket("default").unwrap(); let value = Arc::new(vec![b'0'; args.value_size]); - if args.mode == "get" || args.mode == "scan" { - let mut fill_handles = vec![]; + if workload.requires_prefill { + let mut fill_handles = Vec::with_capacity(args.threads); for tid in 0..args.threads { - let bkt_clone = bkt.clone(); - let val_clone = value.clone(); - let key_count = args.iterations / args.threads - + if tid < args.iterations % args.threads { - 1 - } else { - 0 - }; + let bucket = bkt.clone(); + let v = value.clone(); let key_size = args.key_size; + let shared = args.shared_keyspace; + let tr = thread_prefill_ranges[tid]; fill_handles.push(std::thread::spawn(move || { coreid::bind_core(tid); - const BATCH_SIZE: usize = 10000; - for i in (0..key_count).step_by(BATCH_SIZE) { - let tx = bkt_clone.begin().unwrap(); - for j in 0..BATCH_SIZE { - if i + j >= key_count { - break; - } - let mut key = format!("key_{tid}_{}", i + j).into_bytes(); - key.resize(key_size, b'x'); - tx.put(&key, &*val_clone).unwrap(); + let mut in_batch = 0usize; + let mut tx = bucket.begin().unwrap(); + for i in 0..tr.len { + let key = if shared { + make_shared_key(tr.start + i, key_size) + } else { + make_thread_key(tid, i, key_size) + }; + tx.put(key.as_slice(), v.as_slice()).unwrap(); + in_batch += 1; + if in_batch >= PREFILL_BATCH { + tx.commit().unwrap(); + tx = bucket.begin().unwrap(); + in_batch = 0; } + } + if in_batch > 0 { tx.commit().unwrap(); } })); @@ -133,137 +698,377 @@ fn main() { for h in fill_handles { h.join().unwrap(); } - drop(bkt); - drop(db); - db = Mace::new(saved.validate().unwrap()).unwrap(); - bkt = db.get_bucket("default").unwrap(); } - let mut key_counts = vec![args.iterations / args.threads; args.threads]; - for cnt in key_counts.iter_mut().take(args.iterations % args.threads) { - *cnt 
+= 1; - } + let op_counts = split_ranges(args.iterations, args.threads); + let ready_barrier = Arc::new(Barrier::new(args.threads + 1)); + let measure_barrier = Arc::new(Barrier::new(args.threads + 1)); + let insert_counter = Arc::new(AtomicUsize::new(0)); - let ready_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); - let start_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); - let total_ops = Arc::new(std::sync::atomic::AtomicUsize::new(0)); - - let h: Vec> = (0..args.threads) + let handles: Vec> = (0..args.threads) .map(|tid| { - let db = bkt.clone(); - let total_ops = total_ops.clone(); - let ready_barrier = Arc::clone(&ready_barrier); - let start_barrier = Arc::clone(&start_barrier); - let mode = args.mode.clone(); - let insert_ratio = args.insert_ratio; - let val = value.clone(); - let key_count = key_counts[tid]; + let bucket = bkt.clone(); + let v = value.clone(); + let spec = workload.clone(); + let ready = Arc::clone(&ready_barrier); + let measure = Arc::clone(&measure_barrier); + let ins_ctr = Arc::clone(&insert_counter); let key_size = args.key_size; - let prefix = format!("key_{tid}_"); - let is_random = args.random; + let random_insert = args.random; + let read_path = read_path; + let warmup_secs = args.warmup_secs; + let measure_secs = args.measure_secs; + let distribution = spec.distribution; + let zipf_theta = args.zipf_theta; + let scan_len = spec.scan_len; + let shared = args.shared_keyspace; + let prefill_keys = prefill_keys; + let local_key_len = thread_op_ranges[tid].len; + let local_op_count = op_counts[tid].len; std::thread::spawn(move || { coreid::bind_core(tid); - let mut round = 0; - let mut indices: Vec = (0..key_count).collect(); - if is_random { - use rand::seq::SliceRandom; - indices.shuffle(&mut rand::rng()); + let seed = (now_epoch_ms() as u64) + ^ ((tid as u64 + 1) * 0x9E37_79B9_7F4A_7C15) + ^ ((prefill_keys as u64) << 7); + let mut rng = StdRng::seed_from_u64(seed); + let mut stats = 
ThreadStats::default(); + let mut local_insert_idx = 0usize; + + let mut count_indices: Vec = (0..local_op_count).collect(); + if random_insert && spec.insert_only { + count_indices.shuffle(&mut rng); } - ready_barrier.wait(); - start_barrier.wait(); - match mode.as_str() { - "insert" => { - for i in indices { - let mut key = format!("key_{tid}_{i}").into_bytes(); - key.resize(key_size, b'x'); - round += 1; - let tx = db.begin().unwrap(); - tx.put(key.as_slice(), val.as_slice()).unwrap(); - tx.commit().unwrap(); - } - } - "get" => { - for i in indices { - let mut key = format!("key_{tid}_{i}").into_bytes(); - key.resize(key_size, b'x'); - round += 1; - let tx = db.view().unwrap(); - let x = tx.get(key).unwrap(); - std::hint::black_box(x); - } - } - "mixed" => { - for i in indices { - let mut key = format!("key_{tid}_{i}").into_bytes(); - key.resize(key_size, b'x'); - let is_insert = rand::random_range(0..100) < insert_ratio; - round += 1; + ready.wait(); - if is_insert { - let tx = db.begin().unwrap(); - tx.put(key, &*val).unwrap(); - tx.commit().unwrap(); - } else { - let tx = db.view().unwrap(); - let x = tx.get(key); - let _ = std::hint::black_box(x); - } - } + if warmup_secs > 0 { + let deadline = Instant::now() + Duration::from_secs(warmup_secs); + while Instant::now() < deadline { + let op = pick_op_kind(&mut rng, &spec); + run_one_op( + op, + &bucket, + &v, + &mut rng, + &spec, + distribution, + zipf_theta, + read_path, + key_size, + scan_len, + shared, + prefill_keys, + local_key_len, + tid, + &ins_ctr, + &mut local_insert_idx, + None, + ); } - "scan" => { - let view = db.view().unwrap(); - let iter = view.seek(prefix); - for x in iter { - round += 1; - std::hint::black_box(x); - } - } - _ => panic!("Invalid mode"), } - total_ops.fetch_add(round, std::sync::atomic::Ordering::Relaxed); + measure.wait(); + + if measure_secs > 0 { + let deadline = Instant::now() + Duration::from_secs(measure_secs); + while Instant::now() < deadline { + let op = 
pick_op_kind(&mut rng, &spec); + run_one_op( + op, + &bucket, + &v, + &mut rng, + &spec, + distribution, + zipf_theta, + read_path, + key_size, + scan_len, + shared, + prefill_keys, + local_key_len, + tid, + &ins_ctr, + &mut local_insert_idx, + Some(&mut stats), + ); + } + } else { + for idx in count_indices { + let op = if spec.insert_only { + let _ = idx; + OpKind::Update + } else { + pick_op_kind(&mut rng, &spec) + }; + run_one_op( + op, + &bucket, + &v, + &mut rng, + &spec, + distribution, + zipf_theta, + read_path, + key_size, + scan_len, + shared, + prefill_keys, + local_key_len, + tid, + &ins_ctr, + &mut local_insert_idx, + Some(&mut stats), + ); + } + } + + stats }) }) .collect(); ready_barrier.wait(); - let start_time = Instant::now(); - start_barrier.wait(); + measure_barrier.wait(); + let measure_started = Instant::now(); - for x in h { - x.join().unwrap(); - } + let mut merged_hist = [0u64; LAT_BUCKETS]; + let mut total_ops = 0u64; + let mut error_ops = 0u64; - let elapsed_us = start_time.elapsed().as_micros() as u64; - let total = total_ops.load(std::sync::atomic::Ordering::Relaxed); - let ops = if elapsed_us == 0 { - 0 - } else { - ((total as u128 * 1_000_000u128) / elapsed_us as u128) as usize - }; - - let ratio = if args.mode == "mixed" { - args.insert_ratio - } else if args.mode == "insert" { - 100 - } else { - 0 - }; - let mut mode = args.mode.clone(); - if mode == "insert" { - if args.random { - mode = "random_insert".into(); - } else { - mode = "sequential_insert".into(); + for h in handles { + let s = h.join().unwrap(); + total_ops += s.total_ops; + error_ops += s.error_ops; + for (i, v) in s.hist.iter().enumerate() { + merged_hist[i] += *v; } } + + let elapsed_us = measure_started.elapsed().as_micros() as u64; + let ops_per_sec = if elapsed_us == 0 { + 0.0 + } else { + (total_ops as f64) * 1_000_000.0 / (elapsed_us as f64) + }; + + let quantiles = Quantiles { + p50_us: histogram_quantile_us(&merged_hist, 0.50), + p95_us: 
histogram_quantile_us(&merged_hist, 0.95), + p99_us: histogram_quantile_us(&merged_hist, 0.99), + p999_us: histogram_quantile_us(&merged_hist, 0.999), + }; + + let row = ResultRow { + ts_epoch_ms: now_epoch_ms(), + engine: "mace", + workload_id: workload.id.clone(), + mode: workload.mode_label.clone(), + threads: args.threads, + key_size: args.key_size, + value_size: args.value_size, + prefill_keys, + shared_keyspace: args.shared_keyspace, + distribution: workload.distribution, + zipf_theta: args.zipf_theta, + read_pct: workload.read_pct, + update_pct: workload.update_pct, + scan_pct: workload.scan_pct, + scan_len: workload.scan_len, + read_path, + warmup_secs: args.warmup_secs, + measure_secs: args.measure_secs, + total_ops, + error_ops, + ops_per_sec, + quantiles, + elapsed_us, + meta: gather_machine_meta(), + }; + + if let Err(e) = append_result_row(&args.result_file, &row) { + eprintln!("failed to write result file {}: {}", args.result_file, e); + exit(1); + } + println!( - "{},{},{},{},{},{},{}", - mode, args.threads, args.key_size, args.value_size, ratio, ops, elapsed_us + "engine=mace workload={} mode={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + row.workload_id, + row.mode, + row.threads, + row.total_ops, + row.error_ops, + row.ops_per_sec, + row.quantiles.p99_us, + args.result_file ); + drop(db); #[cfg(feature = "custom_alloc")] print_filtered_trace(|x, y| log::info!("{}{}", x, y)); } + +/**
 * Runs exactly one benchmark operation of kind `op` against `bucket` and,
 * when `stats` is `Some`, records its outcome and latency.
 *
 * Operation kinds:
 * - `OpKind::Read`: picks a key id with `pick_key_id`, builds either a shared
 *   key or a per-thread key, and reads it through the path selected by
 *   `read_path` (`bucket.view()` for `Snapshot`; `bucket.begin()` followed by
 *   `commit()` for `RwTxn`).
 * - `OpKind::Update`: writes `value` inside a `bucket.begin()` transaction and
 *   commits it. Insert-only workloads (`spec.insert_only`) generate a fresh
 *   key id and call `put`; all other workloads pick an id with `pick_key_id`
 *   and call `upsert`.
 * - `OpKind::Scan`: seeks to a prefix (derived from a picked id in the shared
 *   keyspace, or from `tid` otherwise) and consumes up to `scan_len.max(1)`
 *   items, passing each to `std::hint::black_box`.
 *
 * An operation counts as failed when no key id / prefix could be picked, when
 * a transaction or view could not be opened, or when the get / write / commit
 * call returned an error. Failures are never propagated; they only increment
 * `error_ops`.
 *
 * Parameters:
 * - `insert_counter`: atomic source of key ids for insert-only workloads in
 *   the shared keyspace.
 * - `local_insert_idx`: per-thread key id cursor for insert-only workloads
 *   when the keyspace is not shared; incremented on every such insert.
 * - `stats`: optional per-thread collector. With `None` (as the warmup loop
 *   passes) nothing is counted and the clock is not read.
 */#[allow(clippy::too_many_arguments)] +fn run_one_op( + op: OpKind, + bucket: &mace::Bucket, + value: &Arc>, + rng: &mut StdRng, + spec: &WorkloadSpec, + distribution: Distribution, + zipf_theta: f64, + read_path: ReadPath, + key_size: usize, + scan_len: usize, + shared_keyspace: bool, + prefill_keys: usize, + local_key_len: usize, + tid: usize, + insert_counter: &AtomicUsize, + local_insert_idx: &mut usize, + stats: Option<&mut ThreadStats>, +) { + let start = stats.as_ref().map(|_| Instant::now());/* timestamp is taken only when stats will be recorded */ + + let ok = match op { + OpKind::Read => { + let
maybe_id = pick_key_id( + rng, + distribution, + zipf_theta, + shared_keyspace, + prefill_keys, + local_key_len, + ); + if let Some(id) = maybe_id { + let key = if shared_keyspace { + make_shared_key(id, key_size) + } else { + make_thread_key(tid, id, key_size) + }; + match read_path { + ReadPath::Snapshot => bucket + .view() + .ok() + .and_then(|tx| tx.get(key).ok()) + .is_some(),/* true only if both view() and get() returned Ok */ + ReadPath::RwTxn => { + if let Ok(tx) = bucket.begin() { + let get_ok = tx.get(key).is_ok(); + let commit_ok = tx.commit().is_ok();/* commit is attempted even when the get failed */ + get_ok && commit_ok + } else { + false + } + } + } + } else { + false + } + } + OpKind::Update => {/* insert-only: fresh id from the shared atomic counter or the per-thread cursor; otherwise an id chosen by pick_key_id */ + let key_opt = if spec.insert_only { + if shared_keyspace { + let id = insert_counter.fetch_add(1, Ordering::Relaxed); + Some(make_shared_key(id, key_size)) + } else { + let id = *local_insert_idx; + *local_insert_idx += 1; + Some(make_thread_key(tid, id, key_size)) + } + } else { + let maybe_id = pick_key_id( + rng, + distribution, + zipf_theta, + shared_keyspace, + prefill_keys, + local_key_len, + ); + if let Some(id) = maybe_id { + if shared_keyspace { + Some(make_shared_key(id, key_size)) + } else { + Some(make_thread_key(tid, id, key_size)) + } + } else { + None + } + }; + + if let Some(key) = key_opt { + if let Ok(tx) = bucket.begin() { + let write_ok = if spec.insert_only { + tx.put(key.as_slice(), value.as_slice()).is_ok() + } else { + tx.upsert(key.as_slice(), value.as_slice()).is_ok() + }; + if !write_ok {/* a failed write is reported without calling commit */ + false + } else { + tx.commit().is_ok() + } + } else { + false + } + } else { + false + } + } + OpKind::Scan => { + let prefix_opt = if shared_keyspace { + let maybe_id = pick_key_id( + rng, + distribution, + zipf_theta, + true, + prefill_keys, + local_key_len, + ); + maybe_id.map(make_shared_prefix) + } else { + Some(make_thread_prefix(tid)) + }; + + if let Some(prefix) = prefix_opt { + match read_path { + ReadPath::Snapshot => { + if let Ok(view) = bucket.view() { + for item in view.seek(prefix).take(scan_len.max(1)) {/* .max(1) keeps the item limit at 1 or more even if scan_len is 0 */ +
std::hint::black_box(item); + } + true + } else { + false + } + } + ReadPath::RwTxn => { + if let Ok(tx) = bucket.begin() { + for item in tx.seek(prefix).take(scan_len.max(1)) { + std::hint::black_box(item); + } + tx.commit().is_ok() + } else { + false + } + } + } + } else { + false + } + } + }; + + if let Some(stats) = stats {/* nothing is counted when the caller passed None */ + stats.total_ops += 1; + if !ok { + stats.error_ops += 1; + } + if let Some(start) = start { + let us = start.elapsed().as_micros() as u64; + let idx = latency_bucket(us); + stats.hist[idx] += 1; + } + } +}