From 4b7c51170376308b45601d76615a5fa5a9d6cdc8 Mon Sep 17 00:00:00 2001 From: abbycin Date: Tue, 3 Mar 2026 22:18:11 +0800 Subject: [PATCH] phase3: add durability mode matrix and cost report --- plan_exec.md | 27 ++++++++++- rocksdb/main.cpp | 44 +++++++++++++++-- scripts/phase3.sh | 95 ++++++++++++++++++++++++++++++++++++ scripts/phase3_report.py | 101 +++++++++++++++++++++++++++++++++++++++ src/main.rs | 49 +++++++++++++++++-- 5 files changed, 307 insertions(+), 9 deletions(-) create mode 100755 scripts/phase3.sh create mode 100755 scripts/phase3_report.py diff --git a/plan_exec.md b/plan_exec.md index c854a28..8fafd39 100644 --- a/plan_exec.md +++ b/plan_exec.md @@ -17,7 +17,7 @@ - `cmake --build --preset release -j` 通过 - 运行烟测: - `mace` 与 `rocksdb` 均可按新参数运行并写入统一 schema 结果文件 -- 提交:待本阶段 commit +- 提交:`0649db5` (`phase0: align benchmark v2 workload protocol`) ## Phase 1(已完成) - 日期:2026-03-03 @@ -35,7 +35,7 @@ - 验证: - `bash -n scripts/phase1.sh` 通过 - `python3 -m py_compile scripts/phase1_eval.py` 通过 -- 提交:待本阶段 commit +- 提交:`436e813` (`phase1: add trial matrix runner and cv evaluator`) ## Phase 2(已完成) - 日期:2026-03-03 @@ -47,4 +47,27 @@ - 验证: - `bash -n scripts/phase2.sh` 通过 - `python3 -m py_compile scripts/phase2_report.py` 通过 +- 提交:`f0fd573` (`phase2: add steady-state matrix runner and report`) + +## Phase 3(已完成) +- 日期:2026-03-03 +- 范围: + - 扩展 `src/main.rs` 与 `rocksdb/main.cpp`: + - 新增 `--durability relaxed|durable` + - 统一结果 schema 新增 `durability_mode` 字段 + - 新增 `scripts/phase3.sh`:耐久性成本矩阵执行器 + - dataset:`tier-m` + - workload:`W1/W3/W6` + - profile:`P2` + - threads:`1/12` + - repeats:默认 `5` + - durability:`relaxed` 与 `durable` + - 新增 `scripts/phase3_report.py`: + - 输出 `throughput_drop_pct` + - 输出 `p99_inflation_pct` +- 验证: + - `cargo check -q` 通过 + - `cmake --build --preset release -j` 通过 + - `bash -n scripts/phase3.sh` 通过 + - `python3 -m py_compile scripts/phase3_report.py` 通过 - 提交:待本阶段 commit diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 98dd3bb..6209b47 
100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -80,6 +80,7 @@ struct Args { size_t scan_len = 100; double zipf_theta = 0.99; std::string read_path = "snapshot"; + std::string durability = "relaxed"; std::string result_file = "benchmark_results.csv"; bool cleanup = true; }; @@ -94,6 +95,11 @@ enum class ReadPath { RwTxn, }; +enum class DurabilityMode { + Relaxed, + Durable, +}; + struct WorkloadSpec { std::string id; std::string mode_label; @@ -138,6 +144,7 @@ struct ResultRow { uint64_t ts_epoch_ms; std::string workload_id; std::string mode; + DurabilityMode durability_mode; size_t threads; size_t key_size; size_t value_size; @@ -205,6 +212,27 @@ static const char *read_path_str(ReadPath p) { return "snapshot"; } +static std::optional<DurabilityMode> parse_durability(const std::string &raw) { + auto v = to_lower(raw); + if (v == "relaxed") { + return DurabilityMode::Relaxed; + } + if (v == "durable") { + return DurabilityMode::Durable; + } + return std::nullopt; +} + +static const char *durability_str(DurabilityMode mode) { + switch (mode) { + case DurabilityMode::Relaxed: + return "relaxed"; + case DurabilityMode::Durable: + return "durable"; + } + return "relaxed"; +} + static const char *distribution_str(Distribution d) { switch (d) { case Distribution::Uniform: @@ -398,16 +426,17 @@ static std::string csv_escape(const std::string &v) { } static const char *result_header() { - return "schema_version,ts_epoch_ms,engine,workload_id,mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; + return
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; } static std::string result_row_csv(const ResultRow &r) { return fmt::format( - "v2,{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode), + durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, @@ -672,6 +701,7 @@ int main(int argc, char *argv[]) { app.add_option("--scan-len", args.scan_len, "Scan length per scan op"); app.add_option("--zipf-theta", args.zipf_theta, "Zipf theta"); app.add_option("--read-path", args.read_path, "snapshot|rw_txn"); + app.add_option("--durability", args.durability, "relaxed|durable"); app.add_option("--result-file", args.result_file, "Unified result csv"); app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)"); app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace"); @@ -715,6 +745,11 @@ int main(int argc, char *argv[]) { fmt::println(stderr, "invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path); return 1; } + auto durability = parse_durability(args.durability); + if (!durability.has_value()) { + fmt::println(stderr, "invalid durability `{}` (supported: relaxed, durable)", args.durability); + return 1; + } std::string workload_err; auto workload_spec_opt = parse_workload(args, workload_err); @@ -764,6 +799,7 @@ int main(int argc, char *argv[]) { auto wopt = rocksdb::WriteOptions(); wopt.no_slowdown = true; + wopt.sync = (durability.value() == DurabilityMode::Durable); 
rocksdb::OptimisticTransactionDB *db = nullptr; std::vector<rocksdb::ColumnFamilyHandle *> handles{}; @@ -959,6 +995,7 @@ int main(int argc, char *argv[]) { .ts_epoch_ms = now_epoch_ms(), .workload_id = workload_spec.id, .mode = workload_spec.mode_label, + .durability_mode = durability.value(), .threads = args.threads, .key_size = args.key_size, .value_size = args.value_size, @@ -992,9 +1029,10 @@ } fmt::println( - "engine=rocksdb workload={} mode={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", row.workload_id, row.mode, + durability_str(row.durability_mode), row.threads, row.total_ops, row.error_ops, diff --git a/scripts/phase3.sh b/scripts/phase3.sh new file mode 100755 index 0000000..35a8f1b --- /dev/null +++ b/scripts/phase3.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash + +set -euo pipefail + +if [ "$#" -lt 1 ] || [ "$#" -gt 2 ]; then + printf "Usage: %s <db_root> [result_csv]\n" "$0" + exit 1 +fi + +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +root_dir="$(cd -- "${script_dir}/.."
&& pwd)" + +if [[ "$1" != /nvme* ]]; then + printf "db_root must be under /nvme, got: %s\n" "$1" >&2 + exit 1 +fi + +db_root="$1" +result_file="${2:-${script_dir}/phase3_results.csv}" + +warmup_secs="${WARMUP_SECS:-120}" +measure_secs="${MEASURE_SECS:-300}" +repeats="${REPEATS:-5}" +read_path="${READ_PATH:-snapshot}" + +mkdir -p "${db_root}" +mkdir -p "$(dirname -- "${result_file}")" + +cargo build --release --manifest-path "${root_dir}/Cargo.toml" +(cd "${root_dir}/rocksdb" && cmake --preset release) +(cd "${root_dir}/rocksdb" && cmake --build --preset release) + +workloads=(W1 W3 W6) +threads=(1 12) +durabilities=(relaxed durable) +key_size=32 +value_size=1024 +prefill_keys=18302417 # tier-m P2 + +run_case() { + local engine="$1" + local workload="$2" + local t="$3" + local durability="$4" + local repeat="$5" + local run_path + + run_path="$(mktemp -u -p "${db_root}" "${engine}_phase3_${workload}_t${t}_${durability}_r${repeat}_XXXXXX")" + + printf "[phase3][%s] repeat=%s workload=%s threads=%s durability=%s path=%s\n" \ + "${engine}" "${repeat}" "${workload}" "${t}" "${durability}" "${run_path}" + + if [ "${engine}" = "mace" ]; then + "${root_dir}/target/release/kv_bench" \ + --path "${run_path}" \ + --workload "${workload}" \ + --threads "${t}" \ + --key-size "${key_size}" \ + --value-size "${value_size}" \ + --prefill-keys "${prefill_keys}" \ + --warmup-secs "${warmup_secs}" \ + --measure-secs "${measure_secs}" \ + --shared-keyspace true \ + --read-path "${read_path}" \ + --durability "${durability}" \ + --result-file "${result_file}" + else + "${root_dir}/rocksdb/build/release/rocksdb_bench" \ + --path "${run_path}" \ + --workload "${workload}" \ + --threads "${t}" \ + --key-size "${key_size}" \ + --value-size "${value_size}" \ + --prefill-keys "${prefill_keys}" \ + --warmup-secs "${warmup_secs}" \ + --measure-secs "${measure_secs}" \ + --read-path "${read_path}" \ + --durability "${durability}" \ + --result-file "${result_file}" + fi +} + +for repeat in $(seq 
1 "${repeats}"); do + for workload in "${workloads[@]}"; do + for t in "${threads[@]}"; do + for durability in "${durabilities[@]}"; do + run_case mace "${workload}" "${t}" "${durability}" "${repeat}" + run_case rocksdb "${workload}" "${t}" "${durability}" "${repeat}" + done + done + done +done + +python3 "${script_dir}/phase3_report.py" "${result_file}" +printf "Phase 3 finished. Results: %s\n" "${result_file}" diff --git a/scripts/phase3_report.py b/scripts/phase3_report.py new file mode 100755 index 0000000..da77ec6 --- /dev/null +++ b/scripts/phase3_report.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 + +import sys +import pandas as pd + + +def main() -> int: + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} <result_csv>") + return 1 + + df = pd.read_csv(sys.argv[1]) + + needed = { + "engine", + "workload_id", + "threads", + "durability_mode", + "ops_per_sec", + "p99_us", + } + missing = needed - set(df.columns) + if missing: + raise ValueError(f"Missing columns: {sorted(missing)}") + + sub = df[ + (df["workload_id"].isin(["W1", "W3", "W6"])) + & (df["key_size"] == 32) + & (df["value_size"] == 1024) + ].copy() + + if sub.empty: + print("No phase3 rows found in csv") + return 0 + + base = ( + sub.groupby(["engine", "workload_id", "threads", "durability_mode"]) + .agg( + repeats=("ops_per_sec", "count"), + throughput_median=("ops_per_sec", "median"), + p99_median=("p99_us", "median"), + ) + .reset_index() + ) + + with pd.option_context("display.max_rows", None, "display.max_columns", None): + print(base.to_string(index=False)) + + piv_tput = base.pivot_table( + index=["engine", "workload_id", "threads"], + columns="durability_mode", + values="throughput_median", + aggfunc="first", + ).reset_index() + + piv_p99 = base.pivot_table( + index=["engine", "workload_id", "threads"], + columns="durability_mode", + values="p99_median", + aggfunc="first", + ).reset_index() + + merged = piv_tput.merge( + piv_p99, + on=["engine", "workload_id", "threads"], + suffixes=("_tput",
"_p99"), + ) + + for col in ["relaxed_tput", "durable_tput", "relaxed_p99", "durable_p99"]: + if col not in merged.columns: + merged[col] = pd.NA + + merged["throughput_drop_pct"] = ( + (1.0 - (merged["durable_tput"] / merged["relaxed_tput"])) * 100.0 + ) + merged["p99_inflation_pct"] = ( + ((merged["durable_p99"] / merged["relaxed_p99"]) - 1.0) * 100.0 + ) + + print("\nDurability cost summary:") + print( + merged[ + [ + "engine", + "workload_id", + "threads", + "relaxed_tput", + "durable_tput", + "throughput_drop_pct", + "relaxed_p99", + "durable_p99", + "p99_inflation_pct", + ] + ].to_string(index=False) + ) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/main.rs b/src/main.rs index f6b3a07..674f542 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,6 +78,9 @@ struct Args { #[arg(long, default_value = "snapshot")] read_path: String, + #[arg(long, default_value = "relaxed")] + durability: String, + #[arg(long, default_value = "benchmark_results.csv")] result_file: String, @@ -106,6 +109,29 @@ enum ReadPath { RwTxn, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum DurabilityMode { + Relaxed, + Durable, +} + +impl DurabilityMode { + fn parse(raw: &str) -> Option<Self> { + match raw.trim().to_ascii_lowercase().as_str() { + "relaxed" => Some(Self::Relaxed), + "durable" => Some(Self::Durable), + _ => None, + } + } + + fn as_str(self) -> &'static str { + match self { + DurabilityMode::Relaxed => "relaxed", + DurabilityMode::Durable => "durable", + } + } +} + impl ReadPath { fn parse(raw: &str) -> Option<Self> { match raw.trim().to_ascii_lowercase().as_str() { @@ -167,6 +193,7 @@ struct ResultRow { engine: &'static str, workload_id: String, mode: String, + durability_mode: DurabilityMode, threads: usize, key_size: usize, value_size: usize, @@ -459,16 +486,17 @@ fn csv_escape(raw: &str) -> String { } fn result_header() -> &'static str { -
"schema_version,ts_epoch_ms,engine,workload_id,mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" + "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" } fn result_row_csv(row: &ResultRow) -> String { format!( - "v2,{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", row.ts_epoch_ms, row.engine, csv_escape(&row.workload_id), csv_escape(&row.mode), + row.durability_mode.as_str(), row.threads, row.key_size, row.value_size, @@ -620,6 +648,17 @@ fn main() { } }; + let durability_mode = match DurabilityMode::parse(&args.durability) { + Some(v) => v, + None => { + eprintln!( + "invalid durability `{}` (supported: relaxed, durable)", + args.durability + ); + exit(1); + } + }; + let workload = match parse_workload(&args) { Ok(w) => w, Err(e) => { @@ -651,7 +690,7 @@ fn main() { }; let mut opt = Options::new(path); - opt.sync_on_write = false; + opt.sync_on_write = durability_mode == DurabilityMode::Durable; opt.inline_size = args.blob_size; opt.cache_capacity = 3 << 30; opt.data_file_size = 64 << 20; @@ -865,6 +904,7 @@ fn main() { engine: "mace", workload_id: workload.id.clone(), mode: workload.mode_label.clone(), + durability_mode, threads: args.threads, key_size: args.key_size, value_size: args.value_size, @@ -893,9 +933,10 @@ fn main() { } println!( - "engine=mace 
workload={} mode={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + "engine=mace workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", row.workload_id, row.mode, + row.durability_mode.as_str(), row.threads, row.total_ops, row.error_ops,