diff --git a/plan_exec.md b/plan_exec.md
index 8fafd39..5045ad9 100644
--- a/plan_exec.md
+++ b/plan_exec.md
@@ -70,4 +70,24 @@
 - `cmake --build --preset release -j` 通过
 - `bash -n scripts/phase3.sh` 通过
 - `python3 -m py_compile scripts/phase3_report.py` 通过
+- 提交:`4b7c511` (`phase3: add durability mode matrix and cost report`)
+
+## Phase 4(已完成)
+- 日期:2026-03-03
+- 范围:
+  - 扩展 `src/main.rs` 与 `rocksdb/main.cpp`:
+    - 新增 `--reuse-path`(允许复用已存在数据目录)
+    - 新增 `--skip-prefill`(重启后直接用既有数据集)
+  - 新增 `scripts/phase4_soak.sh`:
+    - baseline:`tier-m + W3 + P2 + 12 threads`
+    - 周期性 `kill -9` + restart 验证(默认每 30 分钟)
+    - 记录 `restart_ready_ms` 与退出状态到 `phase4_restart.csv`
+  - 新增 `scripts/phase4_report.py`:
+    - 汇总 restart 成功率
+    - 输出 restart ready 时间分位(p50/p95/p99/max)
+- 验证:
+  - `cargo check -q` 通过
+  - `cmake --build --preset release -j` 通过
+  - `bash -n scripts/phase4_soak.sh` 通过
+  - `python3 -m py_compile scripts/phase4_report.py` 通过
 - 提交:待本阶段 commit
diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp
index 6209b47..0649669 100644
--- a/rocksdb/main.cpp
+++ b/rocksdb/main.cpp
@@ -83,6 +83,8 @@ struct Args {
   std::string durability = "relaxed";
   std::string result_file = "benchmark_results.csv";
   bool cleanup = true;
+  bool skip_prefill = false;
+  bool reuse_path = false;
 };
 
 enum class Distribution {
@@ -706,6 +708,8 @@ int main(int argc, char *argv[]) {
   app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)");
   app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace");
   app.add_flag("--no-cleanup", disable_cleanup, "Keep db directory after run");
+  app.add_flag("--skip-prefill", args.skip_prefill, "Skip prefill and use existing dataset");
+  app.add_flag("--reuse-path", args.reuse_path, "Allow opening existing db path");
 
   CLI11_PARSE(app, argc, argv);
 
@@ -719,7 +723,7 @@ int main(int argc, char *argv[]) {
     fmt::println(stderr, "path is empty");
     return 1;
   }
-  if (std::filesystem::exists(args.path)) {
+  if (std::filesystem::exists(args.path) && !args.reuse_path) {
     fmt::println(stderr, "path `{}` already exists", args.path);
     return 1;
   }
@@ -809,7 +813,7 @@ int main(int argc, char *argv[]) {
 
   std::string value(args.value_size, '0');
 
-  if (workload_spec.requires_prefill) {
+  if (workload_spec.requires_prefill && !args.skip_prefill) {
     std::vector<std::thread> fill_threads;
     fill_threads.reserve(args.threads);
     for (size_t tid = 0; tid < args.threads; ++tid) {
diff --git a/scripts/phase4_report.py b/scripts/phase4_report.py
new file mode 100755
index 0000000..4612743
--- /dev/null
+++ b/scripts/phase4_report.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+
+import sys
+import pandas as pd
+
+
+def main() -> int:
+    if len(sys.argv) != 2:
+        print(f"Usage: {sys.argv[0]} <restart_csv>")
+        return 1
+
+    df = pd.read_csv(sys.argv[1])
+    needed = {"cycle", "restart_status", "restart_ready_ms", "worker_exit"}
+    missing = needed - set(df.columns)
+    if missing:
+        raise ValueError(f"Missing columns: {sorted(missing)}")
+
+    total = len(df)
+    restart_ok = int((df["restart_status"] == 0).sum())
+    worker_nonzero = int((df["worker_exit"] != 0).sum())
+
+    print(f"cycles={total}")
+    print(f"restart_success={restart_ok}/{total} ({(restart_ok / total * 100.0) if total else 0.0:.1f}%)")
+    print(f"worker_nonzero_exit={worker_nonzero}/{total}")
+
+    if total:
+        q = df["restart_ready_ms"].quantile([0.5, 0.95, 0.99]).to_dict()
+        print(
+            "restart_ready_ms: "
+            f"p50={q.get(0.5, 0):.1f}, "
+            f"p95={q.get(0.95, 0):.1f}, "
+            f"p99={q.get(0.99, 0):.1f}, "
+            f"max={df['restart_ready_ms'].max():.1f}"
+        )
+
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/scripts/phase4_soak.sh b/scripts/phase4_soak.sh
new file mode 100755
index 0000000..285cb2b
--- /dev/null
+++ b/scripts/phase4_soak.sh
@@ -0,0 +1,140 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+if [ "$#" -lt 2 ] || [ "$#" -gt 4 ]; then
+  printf "Usage: %s <engine> <db_path> [result_csv] [restart_csv]\n" "$0"
+  exit 1
+fi
+
+engine="$1"
+db_path="$2"
+
+script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
+root_dir="$(cd -- "${script_dir}/.." && pwd)"
+
+result_file="${3:-${script_dir}/phase4_results.csv}"
+restart_file="${4:-${script_dir}/phase4_restart.csv}"
+
+if [[ "${db_path}" != /nvme* ]]; then
+  printf "db_path must be under /nvme, got: %s\n" "${db_path}" >&2
+  exit 1
+fi
+
+if [[ "${engine}" != "mace" && "${engine}" != "rocksdb" ]]; then
+  printf "engine must be mace or rocksdb\n" >&2
+  exit 1
+fi
+
+mkdir -p "$(dirname -- "${db_path}")"
+mkdir -p "$(dirname -- "${result_file}")"
+mkdir -p "$(dirname -- "${restart_file}")"
+
+soak_hours="${SOAK_HOURS:-12}"
+crash_interval_secs="${CRASH_INTERVAL_SECS:-1800}"
+verify_measure_secs="${VERIFY_MEASURE_SECS:-30}"
+run_measure_secs="${RUN_MEASURE_SECS:-3600}"
+warmup_secs="${WARMUP_SECS:-30}"
+
+# baseline: tier-m + W3 + P2 + 12 threads
+workload_main="W3"
+workload_verify="W6"
+threads=12
+key_size=32
+value_size=1024
+prefill_keys=18302417
+read_path="${READ_PATH:-snapshot}"
+durability="${DURABILITY:-relaxed}"
+
+start_epoch="$(date +%s)"
+end_epoch="$((start_epoch + soak_hours * 3600))"
+
+run_cmd() {
+  local workload="$1"
+  local measure_secs="$2"
+  local skip_prefill="$3"
+
+  if [ "${engine}" = "mace" ]; then
+    local cleanup_flag
+    cleanup_flag="false"
+    "${root_dir}/target/release/kv_bench" \
+      --path "${db_path}" \
+      --workload "${workload}" \
+      --threads "${threads}" \
+      --key-size "${key_size}" \
+      --value-size "${value_size}" \
+      --prefill-keys "${prefill_keys}" \
+      --warmup-secs "${warmup_secs}" \
+      --measure-secs "${measure_secs}" \
+      --shared-keyspace true \
+      --read-path "${read_path}" \
+      --durability "${durability}" \
+      --cleanup "${cleanup_flag}" \
+      --reuse-path \
+      $( [ "${skip_prefill}" = "1" ] && printf '%s' "--skip-prefill" ) \
+      --result-file "${result_file}"
+  else
+    "${root_dir}/rocksdb/build/release/rocksdb_bench" \
+      --path "${db_path}" \
+      --workload "${workload}" \
+      --threads "${threads}" \
+      --key-size "${key_size}" \
+      --value-size "${value_size}" \
+      --prefill-keys "${prefill_keys}" \
+      --warmup-secs "${warmup_secs}" \
+      --measure-secs "${measure_secs}" \
+      --read-path "${read_path}" \
+      --durability "${durability}" \
+      --no-cleanup \
+      --reuse-path \
+      $( [ "${skip_prefill}" = "1" ] && printf '%s' "--skip-prefill" ) \
+      --result-file "${result_file}"
+  fi
+}
+
+if [ ! -f "${restart_file}" ]; then
+  printf "cycle,start_epoch,kill_sent,worker_exit,restart_status,restart_ready_ms\n" > "${restart_file}"
+fi
+
+# seed dataset once (with prefill)
+printf "[phase4][%s] seed dataset at %s\n" "${engine}" "${db_path}"
+run_cmd "${workload_main}" 5 0
+
+cycle=0
+while [ "$(date +%s)" -lt "${end_epoch}" ]; do
+  cycle="$((cycle + 1))"
+  cycle_start="$(date +%s)"
+  printf "[phase4][%s] cycle=%s start=%s\n" "${engine}" "${cycle}" "${cycle_start}"
+
+  # long run in background; kill after interval
+  (run_cmd "${workload_main}" "${run_measure_secs}" 1) &
+  runner_pid=$!
+
+  sleep "${crash_interval_secs}"
+
+  kill_sent=0
+  if kill -0 "${runner_pid}" 2>/dev/null; then
+    kill -9 "${runner_pid}" || true
+    kill_sent=1
+  fi
+
+  set +e
+  wait "${runner_pid}"
+  worker_exit=$?
+  set -e
+
+  restart_start_ms="$(date +%s%3N)"
+  restart_status=0
+  set +e
+  run_cmd "${workload_verify}" "${verify_measure_secs}" 1
+  restart_status=$?
+  set -e
+  restart_end_ms="$(date +%s%3N)"
+  restart_ready_ms="$((restart_end_ms - restart_start_ms))"
+
+  printf "%s,%s,%s,%s,%s,%s\n" \
+    "${cycle}" "${cycle_start}" "${kill_sent}" "${worker_exit}" "${restart_status}" "${restart_ready_ms}" >> "${restart_file}"
+done
+
+python3 "${script_dir}/phase4_report.py" "${restart_file}"
+printf "Phase 4 soak finished. Results: %s | Restart log: %s\n" "${result_file}" "${restart_file}"
diff --git a/src/main.rs b/src/main.rs
index 674f542..0cdd9b2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -86,6 +86,12 @@ struct Args {
 
     #[arg(long, default_value_t = true)]
     cleanup: bool,
+
+    #[arg(long, default_value_t = false)]
+    skip_prefill: bool,
+
+    #[arg(long, default_value_t = false)]
+    reuse_path: bool,
 }
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -619,7 +625,7 @@ fn main() {
         eprintln!("path is empty");
         exit(1);
     }
-    if path.exists() {
+    if path.exists() && !args.reuse_path {
        eprintln!("path {:?} already exists", args.path);
         exit(1);
     }
@@ -700,10 +706,16 @@ fn main() {
     let db = Mace::new(opt.validate().unwrap()).unwrap();
     db.disable_gc();
 
-    let bkt = db.new_bucket("default").unwrap();
+    let bkt = if args.reuse_path {
+        db.get_bucket("default")
+            .or_else(|_| db.new_bucket("default"))
+            .unwrap()
+    } else {
+        db.new_bucket("default").unwrap()
+    };
 
     let value = Arc::new(vec![b'0'; args.value_size]);
-    if workload.requires_prefill {
+    if workload.requires_prefill && !args.skip_prefill {
         let mut fill_handles = Vec::with_capacity(args.threads);
         for tid in 0..args.threads {
             let bucket = bkt.clone();