Compare commits
2 Commits
cbc1e24b08
...
6c5ec881f6
| Author | SHA1 | Date | |
|---|---|---|---|
| 6c5ec881f6 | |||
| 0fe2673791 |
93
plan_exec.md
93
plan_exec.md
@ -1,93 +0,0 @@
|
||||
# kv_bench 执行记录(benchmark_refactor)
|
||||
|
||||
## Phase 0(已完成)
|
||||
- 日期:2026-03-03
|
||||
- 范围:
|
||||
- 重构 `src/main.rs` 与 `rocksdb/main.cpp`,完成 v2 方法学最小清单:
|
||||
- workload preset:`W1..W6`
|
||||
- mixed/read/scan 的 prefill + shared keyspace
|
||||
- 时长模式:`--warmup-secs` / `--measure-secs`
|
||||
- 显式 read path parity:`--read-path snapshot|rw_txn`
|
||||
- 统一 schema 结果落盘(CSV)并自动附带机器/环境元数据
|
||||
- 更新脚本:`scripts/mace.sh`、`scripts/rocksdb.sh`、`scripts/plot.py`、`scripts/init.sh`
|
||||
- 默认数据目录切换为 `/nvme` 体系(脚本强制 db_root 在 `/nvme` 下)
|
||||
- 编译验证:
|
||||
- `cargo check -q` 通过
|
||||
- `cargo build --release -q` 通过
|
||||
- `cmake --build --preset release -j` 通过
|
||||
- 运行烟测:
|
||||
- `mace` 与 `rocksdb` 均可按新参数运行并写入统一 schema 结果文件
|
||||
- 提交:`0649db5` (`phase0: align benchmark v2 workload protocol`)
|
||||
|
||||
## Phase 1(已完成)
|
||||
- 日期:2026-03-03
|
||||
- 范围:
|
||||
- 新增 `scripts/phase1.sh`:按文档矩阵执行小规模试跑
|
||||
- dataset:`tier-s`
|
||||
- workload:`W1/W3/W6`
|
||||
- profile:`P2/P3`
|
||||
- threads:`1/12`
|
||||
- repeats:默认 `3`(可由 `REPEATS` 覆盖)
|
||||
- 新增 `scripts/phase1_eval.py`:按 case 聚合并计算
|
||||
- throughput CV
|
||||
- p99 CV
|
||||
- 稳定性通过率(门槛:throughput CV<=10%, p99 CV<=15%)
|
||||
- 验证:
|
||||
- `bash -n scripts/phase1.sh` 通过
|
||||
- `python3 -m py_compile scripts/phase1_eval.py` 通过
|
||||
- 提交:`436e813` (`phase1: add trial matrix runner and cv evaluator`)
|
||||
|
||||
## Phase 2(已完成)
|
||||
- 日期:2026-03-03
|
||||
- 范围:
|
||||
- 新增 `scripts/phase2.sh`:稳态核心报告矩阵执行器
|
||||
- `tier-m` 全量:`W1/W2/W3/W4/W6` × `P2/P3` × `threads(1/6/12)` × `repeats(默认5)`
|
||||
- 可选 `tier-l` 代表集:`RUN_TIER_L_REPRESENTATIVE=1` 启用,默认 `TIER_L_REPEATS=1`
|
||||
- 新增 `scripts/phase2_report.py`:输出按 case 的 `throughput/p95/p99 median`,并给出慢场景对比表
|
||||
- 验证:
|
||||
- `bash -n scripts/phase2.sh` 通过
|
||||
- `python3 -m py_compile scripts/phase2_report.py` 通过
|
||||
- 提交:`f0fd573` (`phase2: add steady-state matrix runner and report`)
|
||||
|
||||
## Phase 3(已完成)
|
||||
- 日期:2026-03-03
|
||||
- 范围:
|
||||
- 扩展 `src/main.rs` 与 `rocksdb/main.cpp`:
|
||||
- 新增 `--durability relaxed|durable`
|
||||
- 统一结果 schema 新增 `durability_mode` 字段
|
||||
- 新增 `scripts/phase3.sh`:耐久性成本矩阵执行器
|
||||
- dataset:`tier-m`
|
||||
- workload:`W1/W3/W6`
|
||||
- profile:`P2`
|
||||
- threads:`1/12`
|
||||
- repeats:默认 `5`
|
||||
- durability:`relaxed` 与 `durable`
|
||||
- 新增 `scripts/phase3_report.py`:
|
||||
- 输出 `throughput_drop_pct`
|
||||
- 输出 `p99_inflation_pct`
|
||||
- 验证:
|
||||
- `cargo check -q` 通过
|
||||
- `cmake --build --preset release -j` 通过
|
||||
- `bash -n scripts/phase3.sh` 通过
|
||||
- `python3 -m py_compile scripts/phase3_report.py` 通过
|
||||
- 提交:`4b7c511` (`phase3: add durability mode matrix and cost report`)
|
||||
|
||||
## Phase 4(已完成)
|
||||
- 日期:2026-03-03
|
||||
- 范围:
|
||||
- 扩展 `src/main.rs` 与 `rocksdb/main.cpp`:
|
||||
- 新增 `--reuse-path`(允许复用已存在数据目录)
|
||||
- 新增 `--skip-prefill`(重启后直接用既有数据集)
|
||||
- 新增 `scripts/phase4_soak.sh`:
|
||||
- baseline:`tier-m + W3 + P2 + 12 threads`
|
||||
- 周期性 `kill -9` + restart 验证(默认每 30 分钟)
|
||||
- 记录 `restart_ready_ms` 与退出状态到 `phase4_restart.csv`
|
||||
- 新增 `scripts/phase4_report.py`:
|
||||
- 汇总 restart 成功率
|
||||
- 输出 restart ready 时间分位(p50/p95/p99/max)
|
||||
- 验证:
|
||||
- `cargo check -q` 通过
|
||||
- `cmake --build --preset release -j` 通过
|
||||
- `bash -n scripts/phase4_soak.sh` 通过
|
||||
- `python3 -m py_compile scripts/phase4_report.py` 通过
|
||||
- 提交:`536a3f8` (`phase4: add soak/restart runner and recovery checks`)
|
||||
@ -288,13 +288,6 @@ static bool workload_runs_gc(const WorkloadSpec &spec) { return spec.requires_pr
|
||||
|
||||
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle) {
|
||||
require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction");
|
||||
|
||||
rocksdb::FlushOptions flush_options;
|
||||
flush_options.wait = true;
|
||||
require_ok(db->Flush(flush_options, handle), "prefill flush");
|
||||
|
||||
rocksdb::CompactRangeOptions compact_options;
|
||||
require_ok(db->CompactRange(compact_options, handle, nullptr, nullptr), "prefill compaction");
|
||||
}
|
||||
|
||||
static std::vector<ThreadRange> split_ranges(size_t total, size_t n) {
|
||||
@ -555,7 +548,10 @@ static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb:
|
||||
std::string out;
|
||||
|
||||
if (read_path == ReadPath::Snapshot) {
|
||||
auto *snapshot = db->GetSnapshot();
|
||||
ropt.snapshot = snapshot;
|
||||
auto st = db->Get(ropt, handle, key, &out);
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
return st.ok();
|
||||
}
|
||||
|
||||
@ -643,6 +639,8 @@ static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb:
|
||||
auto scan_limit = std::max<size_t>(scan_len, 1);
|
||||
|
||||
if (read_path == ReadPath::Snapshot) {
|
||||
auto *snapshot = db->GetSnapshot();
|
||||
ropt.snapshot = snapshot;
|
||||
auto *iter = db->NewIterator(ropt, handle);
|
||||
iter->Seek(prefix.value());
|
||||
size_t scanned = 0;
|
||||
@ -654,6 +652,7 @@ static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb:
|
||||
}
|
||||
auto st = iter->status();
|
||||
delete iter;
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
return st.ok();
|
||||
}
|
||||
|
||||
@ -779,7 +778,7 @@ int main(int argc, char *argv[]) {
|
||||
cfo.min_blob_size = args.blob_size;
|
||||
cfo.disable_auto_compactions = true;
|
||||
cfo.write_buffer_size = 64 << 20;
|
||||
cfo.max_write_buffer_number = 128;
|
||||
cfo.max_write_buffer_number = 16;
|
||||
|
||||
auto cache = rocksdb::NewLRUCache(3 << 30);
|
||||
rocksdb::BlockBasedTableOptions table_options{};
|
||||
@ -1009,8 +1008,8 @@ int main(int argc, char *argv[]) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
fmt::println(
|
||||
"engine=rocksdb workload={} mode={} durability={} threads={} total_op={} ok_op={} err_op={} ops={} p99_us={} result_file={}",
|
||||
fmt::println("engine=rocksdb workload={} mode={} durability={} threads={} total_op={} ok_op={} err_op={} ops={} "
|
||||
"p99_us={} result_file={}",
|
||||
row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_op, row.ok_op,
|
||||
row.err_op, static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file);
|
||||
|
||||
|
||||
@ -5,6 +5,70 @@ import sys
|
||||
import pandas as pd
|
||||
|
||||
|
||||
WORKLOAD_TEMPLATE = [
|
||||
("W1", "`W1` (95R/5U, uniform)"),
|
||||
("W2", "`W2` (95R/5U, zipf)"),
|
||||
("W3", "`W3` (50R/50U)"),
|
||||
("W4", "`W4` (5R/95U)"),
|
||||
("W5", "`W5` (70R/25U/5S)"),
|
||||
("W6", "`W6` (100% scan)"),
|
||||
]
|
||||
WORKLOAD_LABELS = dict(WORKLOAD_TEMPLATE)
|
||||
|
||||
|
||||
def format_ratio(v: object) -> str:
|
||||
if pd.isna(v):
|
||||
return "N/A"
|
||||
return f"**{float(v):.1f}x**"
|
||||
|
||||
|
||||
def print_workload_summary_table(out_df: pd.DataFrame) -> None:
|
||||
template_order = [workload_id for workload_id, _ in WORKLOAD_TEMPLATE]
|
||||
observed = sorted(
|
||||
out_df["workload_id"].dropna().astype(str).unique().tolist()
|
||||
)
|
||||
workload_order = template_order + [w for w in observed if w not in WORKLOAD_LABELS]
|
||||
|
||||
print("\nSummary table (template format):")
|
||||
print(
|
||||
"| Workload | Mace wins (ops) | ops median ratio (Mace/RocksDB) | "
|
||||
"Mace wins (p99) | p99 median ratio (Mace/RocksDB) |"
|
||||
)
|
||||
print("|---|---:|---:|---:|--:|")
|
||||
|
||||
for workload_id in workload_order:
|
||||
sub = out_df[out_df["workload_id"] == workload_id]
|
||||
|
||||
ops_ratio = (
|
||||
pd.to_numeric(
|
||||
sub["ops_ratio_mace_over_rocksdb"], errors="coerce"
|
||||
)
|
||||
.replace([float("inf"), float("-inf")], pd.NA)
|
||||
.dropna()
|
||||
)
|
||||
p99_ratio = (
|
||||
pd.to_numeric(
|
||||
sub["p99_ratio_mace_over_rocksdb"], errors="coerce"
|
||||
)
|
||||
.replace([float("inf"), float("-inf")], pd.NA)
|
||||
.dropna()
|
||||
)
|
||||
|
||||
ops_win = int((ops_ratio > 1.0).sum())
|
||||
p99_win = int((p99_ratio < 1.0).sum())
|
||||
ops_total = int(len(ops_ratio))
|
||||
p99_total = int(len(p99_ratio))
|
||||
ops_median = ops_ratio.median() if ops_total > 0 else pd.NA
|
||||
p99_median = p99_ratio.median() if p99_total > 0 else pd.NA
|
||||
|
||||
workload_label = WORKLOAD_LABELS.get(workload_id, f"`{workload_id}`")
|
||||
print(
|
||||
f"| {workload_label} | {ops_win} / {ops_total} | "
|
||||
f"{format_ratio(ops_median)} | {p99_win} / {p99_total} | "
|
||||
f"{format_ratio(p99_median)} |"
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Compare mace vs rocksdb from benchmark_results.csv"
|
||||
@ -106,6 +170,7 @@ def main() -> int:
|
||||
print("\nInterpretation:")
|
||||
print("- ops_ratio_mace_over_rocksdb > 1: mace has higher throughput")
|
||||
print("- p99_ratio_mace_over_rocksdb < 1: mace has lower p99 latency")
|
||||
print_workload_summary_table(out)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
18
src/main.rs
18
src/main.rs
@ -711,13 +711,12 @@ fn main() {
|
||||
let mut opt = Options::new(path);
|
||||
opt.sync_on_write = durability_mode == DurabilityMode::Durable;
|
||||
opt.inline_size = args.blob_size;
|
||||
opt.cache_capacity = 3 << 30;
|
||||
opt.data_file_size = 64 << 20;
|
||||
opt.max_log_size = 1 << 30;
|
||||
opt.wal_buffer_size = 64 << 20;
|
||||
opt.wal_file_size = 128 << 20;
|
||||
opt.default_arenas = 128;
|
||||
opt.gc_timeout = 600 * 1000;
|
||||
opt.checkpoint_size = 128 << 20;
|
||||
opt.cache_capacity = 1 << 30;
|
||||
opt.lru_capacity = 1 << 30;
|
||||
opt.pool_capacity = 1 << 30;
|
||||
opt.enable_backpressure = true;
|
||||
opt.gc_timeout = 5 * 1000;
|
||||
opt.gc_eager = false;
|
||||
opt.data_garbage_ratio = 50;
|
||||
opt.tmp_store = cleanup;
|
||||
@ -770,7 +769,6 @@ fn main() {
|
||||
|
||||
if workload_runs_gc(&workload) {
|
||||
db.enable_gc();
|
||||
db.start_gc();
|
||||
}
|
||||
|
||||
let op_counts = split_ranges(args.iterations, args.threads);
|
||||
@ -851,7 +849,7 @@ fn main() {
|
||||
{
|
||||
let now = Instant::now();
|
||||
let mut slot = measure_start_slot.lock().unwrap();
|
||||
if slot.map_or(true, |prev| now < prev) {
|
||||
if slot.map_or_else(|| true, |prev| now < prev) {
|
||||
*slot = Some(now);
|
||||
}
|
||||
}
|
||||
@ -926,7 +924,7 @@ fn main() {
|
||||
{
|
||||
let now = Instant::now();
|
||||
let mut slot = measure_start.lock().unwrap();
|
||||
if slot.map_or(true, |prev| now < prev) {
|
||||
if slot.map_or_else(|| true, |prev| now < prev) {
|
||||
*slot = Some(now);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user