phase4: add soak/restart runner and recovery checks
This commit is contained in:
parent
4b7c511703
commit
536a3f8fbe
20
plan_exec.md
20
plan_exec.md
@ -70,4 +70,24 @@
|
|||||||
- `cmake --build --preset release -j` 通过
|
- `cmake --build --preset release -j` 通过
|
||||||
- `bash -n scripts/phase3.sh` 通过
|
- `bash -n scripts/phase3.sh` 通过
|
||||||
- `python3 -m py_compile scripts/phase3_report.py` 通过
|
- `python3 -m py_compile scripts/phase3_report.py` 通过
|
||||||
|
- 提交:`4b7c511` (`phase3: add durability mode matrix and cost report`)
|
||||||
|
|
||||||
|
## Phase 4(已完成)
|
||||||
|
- 日期:2026-03-03
|
||||||
|
- 范围:
|
||||||
|
- 扩展 `src/main.rs` 与 `rocksdb/main.cpp`:
|
||||||
|
- 新增 `--reuse-path`(允许复用已存在数据目录)
|
||||||
|
- 新增 `--skip-prefill`(重启后直接用既有数据集)
|
||||||
|
- 新增 `scripts/phase4_soak.sh`:
|
||||||
|
- baseline:`tier-m + W3 + P2 + 12 threads`
|
||||||
|
- 周期性 `kill -9` + restart 验证(默认每 30 分钟)
|
||||||
|
- 记录 `restart_ready_ms` 与退出状态到 `phase4_restart.csv`
|
||||||
|
- 新增 `scripts/phase4_report.py`:
|
||||||
|
- 汇总 restart 成功率
|
||||||
|
- 输出 restart ready 时间分位(p50/p95/p99/max)
|
||||||
|
- 验证:
|
||||||
|
- `cargo check -q` 通过
|
||||||
|
- `cmake --build --preset release -j` 通过
|
||||||
|
- `bash -n scripts/phase4_soak.sh` 通过
|
||||||
|
- `python3 -m py_compile scripts/phase4_report.py` 通过
|
||||||
- 提交:待本阶段 commit
|
- 提交:待本阶段 commit
|
||||||
|
|||||||
@ -83,6 +83,8 @@ struct Args {
|
|||||||
std::string durability = "relaxed";
|
std::string durability = "relaxed";
|
||||||
std::string result_file = "benchmark_results.csv";
|
std::string result_file = "benchmark_results.csv";
|
||||||
bool cleanup = true;
|
bool cleanup = true;
|
||||||
|
bool skip_prefill = false;
|
||||||
|
bool reuse_path = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
enum class Distribution {
|
enum class Distribution {
|
||||||
@ -706,6 +708,8 @@ int main(int argc, char *argv[]) {
|
|||||||
app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)");
|
app.add_flag("--random", args.random, "Shuffle insert keys (legacy insert)");
|
||||||
app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace");
|
app.add_flag("--no-shared-keyspace", disable_shared, "Use per-thread keyspace");
|
||||||
app.add_flag("--no-cleanup", disable_cleanup, "Keep db directory after run");
|
app.add_flag("--no-cleanup", disable_cleanup, "Keep db directory after run");
|
||||||
|
app.add_flag("--skip-prefill", args.skip_prefill, "Skip prefill and use existing dataset");
|
||||||
|
app.add_flag("--reuse-path", args.reuse_path, "Allow opening existing db path");
|
||||||
|
|
||||||
CLI11_PARSE(app, argc, argv);
|
CLI11_PARSE(app, argc, argv);
|
||||||
|
|
||||||
@ -719,7 +723,7 @@ int main(int argc, char *argv[]) {
|
|||||||
fmt::println(stderr, "path is empty");
|
fmt::println(stderr, "path is empty");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
if (std::filesystem::exists(args.path)) {
|
if (std::filesystem::exists(args.path) && !args.reuse_path) {
|
||||||
fmt::println(stderr, "path `{}` already exists", args.path);
|
fmt::println(stderr, "path `{}` already exists", args.path);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -809,7 +813,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
std::string value(args.value_size, '0');
|
std::string value(args.value_size, '0');
|
||||||
|
|
||||||
if (workload_spec.requires_prefill) {
|
if (workload_spec.requires_prefill && !args.skip_prefill) {
|
||||||
std::vector<std::thread> fill_threads;
|
std::vector<std::thread> fill_threads;
|
||||||
fill_threads.reserve(args.threads);
|
fill_threads.reserve(args.threads);
|
||||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||||
|
|||||||
40
scripts/phase4_report.py
Executable file
40
scripts/phase4_report.py
Executable file
@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
if len(sys.argv) != 2:
|
||||||
|
print(f"Usage: {sys.argv[0]} <restart_csv>")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
df = pd.read_csv(sys.argv[1])
|
||||||
|
needed = {"cycle", "restart_status", "restart_ready_ms", "worker_exit"}
|
||||||
|
missing = needed - set(df.columns)
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"Missing columns: {sorted(missing)}")
|
||||||
|
|
||||||
|
total = len(df)
|
||||||
|
restart_ok = int((df["restart_status"] == 0).sum())
|
||||||
|
worker_nonzero = int((df["worker_exit"] != 0).sum())
|
||||||
|
|
||||||
|
print(f"cycles={total}")
|
||||||
|
print(f"restart_success={restart_ok}/{total} ({(restart_ok / total * 100.0) if total else 0.0:.1f}%)")
|
||||||
|
print(f"worker_nonzero_exit={worker_nonzero}/{total}")
|
||||||
|
|
||||||
|
if total:
|
||||||
|
q = df["restart_ready_ms"].quantile([0.5, 0.95, 0.99]).to_dict()
|
||||||
|
print(
|
||||||
|
"restart_ready_ms: "
|
||||||
|
f"p50={q.get(0.5, 0):.1f}, "
|
||||||
|
f"p95={q.get(0.95, 0):.1f}, "
|
||||||
|
f"p99={q.get(0.99, 0):.1f}, "
|
||||||
|
f"max={df['restart_ready_ms'].max():.1f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
140
scripts/phase4_soak.sh
Executable file
140
scripts/phase4_soak.sh
Executable file
@ -0,0 +1,140 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
if [ "$#" -lt 2 ] || [ "$#" -gt 4 ]; then
|
||||||
|
printf "Usage: %s <engine:mace|rocksdb> <db_path_under_/nvme> [result_csv] [restart_csv]\n" "$0"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
engine="$1"
|
||||||
|
db_path="$2"
|
||||||
|
|
||||||
|
script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
root_dir="$(cd -- "${script_dir}/.." && pwd)"
|
||||||
|
|
||||||
|
result_file="${3:-${script_dir}/phase4_results.csv}"
|
||||||
|
restart_file="${4:-${script_dir}/phase4_restart.csv}"
|
||||||
|
|
||||||
|
if [[ "${db_path}" != /nvme* ]]; then
|
||||||
|
printf "db_path must be under /nvme, got: %s\n" "${db_path}" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ "${engine}" != "mace" && "${engine}" != "rocksdb" ]]; then
|
||||||
|
printf "engine must be mace or rocksdb\n" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
mkdir -p "$(dirname -- "${db_path}")"
|
||||||
|
mkdir -p "$(dirname -- "${result_file}")"
|
||||||
|
mkdir -p "$(dirname -- "${restart_file}")"
|
||||||
|
|
||||||
|
soak_hours="${SOAK_HOURS:-12}"
|
||||||
|
crash_interval_secs="${CRASH_INTERVAL_SECS:-1800}"
|
||||||
|
verify_measure_secs="${VERIFY_MEASURE_SECS:-30}"
|
||||||
|
run_measure_secs="${RUN_MEASURE_SECS:-3600}"
|
||||||
|
warmup_secs="${WARMUP_SECS:-30}"
|
||||||
|
|
||||||
|
# baseline: tier-m + W3 + P2 + 12 threads
|
||||||
|
workload_main="W3"
|
||||||
|
workload_verify="W6"
|
||||||
|
threads=12
|
||||||
|
key_size=32
|
||||||
|
value_size=1024
|
||||||
|
prefill_keys=18302417
|
||||||
|
read_path="${READ_PATH:-snapshot}"
|
||||||
|
durability="${DURABILITY:-relaxed}"
|
||||||
|
|
||||||
|
start_epoch="$(date +%s)"
|
||||||
|
end_epoch="$((start_epoch + soak_hours * 3600))"
|
||||||
|
|
||||||
|
run_cmd() {
|
||||||
|
local workload="$1"
|
||||||
|
local measure_secs="$2"
|
||||||
|
local skip_prefill="$3"
|
||||||
|
|
||||||
|
if [ "${engine}" = "mace" ]; then
|
||||||
|
local cleanup_flag
|
||||||
|
cleanup_flag="false"
|
||||||
|
"${root_dir}/target/release/kv_bench" \
|
||||||
|
--path "${db_path}" \
|
||||||
|
--workload "${workload}" \
|
||||||
|
--threads "${threads}" \
|
||||||
|
--key-size "${key_size}" \
|
||||||
|
--value-size "${value_size}" \
|
||||||
|
--prefill-keys "${prefill_keys}" \
|
||||||
|
--warmup-secs "${warmup_secs}" \
|
||||||
|
--measure-secs "${measure_secs}" \
|
||||||
|
--shared-keyspace true \
|
||||||
|
--read-path "${read_path}" \
|
||||||
|
--durability "${durability}" \
|
||||||
|
--cleanup "${cleanup_flag}" \
|
||||||
|
--reuse-path \
|
||||||
|
$( [ "${skip_prefill}" = "1" ] && printf '%s' "--skip-prefill" ) \
|
||||||
|
--result-file "${result_file}"
|
||||||
|
else
|
||||||
|
"${root_dir}/rocksdb/build/release/rocksdb_bench" \
|
||||||
|
--path "${db_path}" \
|
||||||
|
--workload "${workload}" \
|
||||||
|
--threads "${threads}" \
|
||||||
|
--key-size "${key_size}" \
|
||||||
|
--value-size "${value_size}" \
|
||||||
|
--prefill-keys "${prefill_keys}" \
|
||||||
|
--warmup-secs "${warmup_secs}" \
|
||||||
|
--measure-secs "${measure_secs}" \
|
||||||
|
--read-path "${read_path}" \
|
||||||
|
--durability "${durability}" \
|
||||||
|
--no-cleanup \
|
||||||
|
--reuse-path \
|
||||||
|
$( [ "${skip_prefill}" = "1" ] && printf '%s' "--skip-prefill" ) \
|
||||||
|
--result-file "${result_file}"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
if [ ! -f "${restart_file}" ]; then
|
||||||
|
printf "cycle,start_epoch,kill_sent,worker_exit,restart_status,restart_ready_ms\n" > "${restart_file}"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# seed dataset once (with prefill)
|
||||||
|
printf "[phase4][%s] seed dataset at %s\n" "${engine}" "${db_path}"
|
||||||
|
run_cmd "${workload_main}" 5 0
|
||||||
|
|
||||||
|
cycle=0
|
||||||
|
while [ "$(date +%s)" -lt "${end_epoch}" ]; do
|
||||||
|
cycle="$((cycle + 1))"
|
||||||
|
cycle_start="$(date +%s)"
|
||||||
|
printf "[phase4][%s] cycle=%s start=%s\n" "${engine}" "${cycle}" "${cycle_start}"
|
||||||
|
|
||||||
|
# long run in background; kill after interval
|
||||||
|
(run_cmd "${workload_main}" "${run_measure_secs}" 1) &
|
||||||
|
runner_pid=$!
|
||||||
|
|
||||||
|
sleep "${crash_interval_secs}"
|
||||||
|
|
||||||
|
kill_sent=0
|
||||||
|
if kill -0 "${runner_pid}" 2>/dev/null; then
|
||||||
|
kill -9 "${runner_pid}" || true
|
||||||
|
kill_sent=1
|
||||||
|
fi
|
||||||
|
|
||||||
|
set +e
|
||||||
|
wait "${runner_pid}"
|
||||||
|
worker_exit=$?
|
||||||
|
set -e
|
||||||
|
|
||||||
|
restart_start_ms="$(date +%s%3N)"
|
||||||
|
restart_status=0
|
||||||
|
set +e
|
||||||
|
run_cmd "${workload_verify}" "${verify_measure_secs}" 1
|
||||||
|
restart_status=$?
|
||||||
|
set -e
|
||||||
|
restart_end_ms="$(date +%s%3N)"
|
||||||
|
restart_ready_ms="$((restart_end_ms - restart_start_ms))"
|
||||||
|
|
||||||
|
printf "%s,%s,%s,%s,%s,%s\n" \
|
||||||
|
"${cycle}" "${cycle_start}" "${kill_sent}" "${worker_exit}" "${restart_status}" "${restart_ready_ms}" >> "${restart_file}"
|
||||||
|
done
|
||||||
|
|
||||||
|
python3 "${script_dir}/phase4_report.py" "${restart_file}"
|
||||||
|
printf "Phase 4 soak finished. Results: %s | Restart log: %s\n" "${result_file}" "${restart_file}"
|
||||||
18
src/main.rs
18
src/main.rs
@ -86,6 +86,12 @@ struct Args {
|
|||||||
|
|
||||||
#[arg(long, default_value_t = true)]
|
#[arg(long, default_value_t = true)]
|
||||||
cleanup: bool,
|
cleanup: bool,
|
||||||
|
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
skip_prefill: bool,
|
||||||
|
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
reuse_path: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
@ -619,7 +625,7 @@ fn main() {
|
|||||||
eprintln!("path is empty");
|
eprintln!("path is empty");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
if path.exists() {
|
if path.exists() && !args.reuse_path {
|
||||||
eprintln!("path {:?} already exists", args.path);
|
eprintln!("path {:?} already exists", args.path);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
@ -700,10 +706,16 @@ fn main() {
|
|||||||
|
|
||||||
let db = Mace::new(opt.validate().unwrap()).unwrap();
|
let db = Mace::new(opt.validate().unwrap()).unwrap();
|
||||||
db.disable_gc();
|
db.disable_gc();
|
||||||
let bkt = db.new_bucket("default").unwrap();
|
let bkt = if args.reuse_path {
|
||||||
|
db.get_bucket("default")
|
||||||
|
.or_else(|_| db.new_bucket("default"))
|
||||||
|
.unwrap()
|
||||||
|
} else {
|
||||||
|
db.new_bucket("default").unwrap()
|
||||||
|
};
|
||||||
let value = Arc::new(vec![b'0'; args.value_size]);
|
let value = Arc::new(vec![b'0'; args.value_size]);
|
||||||
|
|
||||||
if workload.requires_prefill {
|
if workload.requires_prefill && !args.skip_prefill {
|
||||||
let mut fill_handles = Vec::with_capacity(args.threads);
|
let mut fill_handles = Vec::with_capacity(args.threads);
|
||||||
for tid in 0..args.threads {
|
for tid in 0..args.threads {
|
||||||
let bucket = bkt.clone();
|
let bucket = bkt.clone();
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user