#!/usr/bin/env python3
"""Generate a single-page HTML report from a benchmark results CSV.

The CSV is grouped by (workload, engine, key size, value size, threads);
repeated measurements of one group are reduced to their median before the
chart payload is built.
"""
import argparse
import base64
import csv
import json
import math
import statistics
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any

# Fill styles handed out round-robin to engines that have no dedicated style.
ENGINE_STYLE_FALLBACK = [
    "solid",
    "hatch",
    "dot",
]

# Known workload ids and the human-readable labels shown for them.
WORKLOAD_TEMPLATE = [
    ("W1", "W1 (95R/5U, uniform)"),
    ("W2", "W2 (95R/5U, zipf)"),
    ("W3", "W3 (50R/50U)"),
    ("W4", "W4 (5R/95U)"),
    ("W5", "W5 (70R/25U/5S)"),
    ("W6", "W6 (100% scan)"),
]

WORKLOAD_LABELS = dict(WORKLOAD_TEMPLATE)


def parse_args() -> argparse.Namespace:
    """Parse the command line: a positional CSV path and an optional -o/--output."""
    parser = argparse.ArgumentParser(
        description=(
            "Generate a single-page HTML report from benchmark CSV. "
            "Each workload gets two line charts: ops and p99_us."
        )
    )
    parser.add_argument("csv_path", help="Input CSV path, e.g. benchmark_results.csv")
    parser.add_argument(
        "-o",
        "--output",
        # Kept in sync with main(), which falls back to csv_path.with_suffix(".html").
        help="Output HTML path (default: the input path with its suffix replaced by .html)",
    )
    return parser.parse_args()


def to_int(value: str) -> int:
    """Convert *value* to int via float, so text such as "4.0" is accepted.

    Raises:
        ValueError: if *value* is not numeric text (or is NaN).
        OverflowError: if *value* is an infinity.
    """
    return int(float(value))


def to_float(value: str) -> float:
    """Convert *value* to float; raises ValueError for non-numeric text."""
    return float(value)


def format_metric(metric: str, value: float) -> str:
    """Return the display label for a data point.

    Magnitudes of 100 or more are shown as thousands-separated integers,
    smaller ones with two decimals. *metric* is accepted for interface
    compatibility; every metric is currently formatted the same way.
    """
    if abs(value) >= 100:
        return f"{value:,.0f}"
    return f"{value:.2f}"


def workload_sort_key(workload: str) -> tuple[int, str, int]:
    """Return a sort key that orders ids like "W2" before "W10".

    Ids of the form <prefix><decimal digits> sort first, by prefix and then by
    numeric value; every other id sorts afterwards, by its text.
    """
    prefix = ""
    suffix = ""
    for idx, ch in enumerate(workload):
        if ch.isdigit():
            prefix = workload[:idx]
            suffix = workload[idx:]
            break
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects with ValueError.
    if suffix.isdecimal():
        return (0, prefix, int(suffix))
    return (1, workload, 0)


def workload_label(workload: str) -> str:
    """Return the label registered for *workload*, or the id itself if unknown."""
    return WORKLOAD_LABELS.get(workload, workload)


def engine_style(engine: str, index: int) -> str:
    """Return the fill style for *engine*.

    "mace" and "rocksdb" (case-insensitive, surrounding whitespace ignored)
    have fixed styles; any other engine cycles through ENGINE_STYLE_FALLBACK
    by *index*.
    """
    normalized = engine.strip().lower()
    if normalized == "mace":
        return "solid"
    if normalized == "rocksdb":
        return "hatch"
    return ENGINE_STYLE_FALLBACK[index % len(ENGINE_STYLE_FALLBACK)]


def color_for_index(index: int) -> str:
    """Return an HSL colour string whose hue depends on *index*.

    The hue advances by 137.508 degrees per index (modulo 360), so consecutive
    indices land far apart on the colour wheel.
    """
    hue = (index * 137.508) % 360.0
    return f"hsl({hue:.1f}, 70%, 45%)"


def read_and_aggregate(csv_path: Path) -> tuple[list[dict[str, Any]], set[str], set[tuple[int, int]]]:
    """Read the benchmark CSV and reduce repeated measurements to medians.

    Rows are grouped by (workload, engine, key_size, value_size, threads).
    A row is skipped when a numeric field cannot be parsed, when engine or
    workload is empty, or when ops / p99_us is NaN or infinite; the number of
    skipped rows is reported on stderr.

    Args:
        csv_path: CSV file with at least the columns engine, workload_id,
            threads, key_size, value_size, ops and p99_us.

    Returns:
        (rows, engines, kv_pairs): one dict per group holding the median
        "ops" and "p99_us", the set of engine names seen, and the set of
        (key_size, value_size) pairs seen.

    Raises:
        ValueError: if the header is missing, a required column is absent,
            or no row could be used.
    """
    required = {
        "engine",
        "workload_id",
        "threads",
        "key_size",
        "value_size",
        "ops",
        "p99_us",
    }
    grouped: dict[tuple[str, str, int, int, int], dict[str, list[float]]] = defaultdict(
        lambda: {"ops": [], "p99_us": []}
    )
    engines: set[str] = set()
    kv_pairs: set[tuple[int, int]] = set()
    skipped = 0

    # utf-8-sig reads plain UTF-8 too, but also drops a leading BOM that would
    # otherwise become part of the first header name.
    with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            raise ValueError("CSV header is missing")
        missing = required - set(reader.fieldnames)
        if missing:
            raise ValueError(f"Missing required columns: {sorted(missing)}")

        for row in reader:
            try:
                # Short rows yield None for absent cells; treat that as empty
                # text instead of the literal string "None".
                engine = (row["engine"] or "").strip()
                workload = (row["workload_id"] or "").strip()
                threads = to_int(str(row["threads"]))
                key_size = to_int(str(row["key_size"]))
                value_size = to_int(str(row["value_size"]))
                ops = to_float(str(row["ops"]))
                p99 = to_float(str(row["p99_us"]))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")) for an integer column.
                skipped += 1
                continue

            if not engine or not workload:
                skipped += 1
                continue
            # NaN makes the median depend on row order, and neither NaN nor
            # infinity is a plottable measurement.
            if not (math.isfinite(ops) and math.isfinite(p99)):
                skipped += 1
                continue

            key = (workload, engine, key_size, value_size, threads)
            grouped[key]["ops"].append(ops)
            grouped[key]["p99_us"].append(p99)
            engines.add(engine)
            kv_pairs.add((key_size, value_size))

    if skipped:
        print(f"Warning: skipped {skipped} unusable row(s) in {csv_path}", file=sys.stderr)

    rows: list[dict[str, Any]] = []
    for (workload, engine, key_size, value_size, threads), values in grouped.items():
        if not values["ops"] or not values["p99_us"]:
            continue
        rows.append(
            {
                "workload": workload,
                "engine": engine,
                "key_size": key_size,
                "value_size": value_size,
                "threads": threads,
                "ops": float(statistics.median(values["ops"])),
                "p99_us": float(statistics.median(values["p99_us"])),
            }
        )

    if not rows:
        raise ValueError("No valid rows parsed from CSV")
    return rows, engines, kv_pairs


def build_report_payload(rows: list[dict[str, Any]], engines: set[str], kv_pairs: set[tuple[int, int]]) -> dict[str, Any]:
    """Turn aggregated rows into the chart payload.

    Each workload gets one dataset per (engine, key_size, value_size) series
    for each of the metrics "ops" and "p99_us", with points ordered by thread
    count. Colour is chosen per (key_size, value_size) pair and fill style per
    engine, both from their sorted order.

    Returns:
        A dict with "workloads" (id, label and per-metric datasets),
        "kvLegend" (colour per key/value pair) and "engineLegend" (style per
        engine).
    """
    engine_order = sorted(engines)
    kv_order = sorted(kv_pairs)
    engine_to_style = {
        engine: engine_style(engine, idx) for idx, engine in enumerate(engine_order)
    }
    kv_to_color = {kv: color_for_index(idx) for idx, kv in enumerate(kv_order)}

    # workload -> (engine, key_size, value_size) -> rows of that series
    by_workload: dict[str, dict[tuple[str, int, int], list[dict[str, Any]]]] = defaultdict(
        lambda: defaultdict(list)
    )
    for row in rows:
        series_key = (row["engine"], row["key_size"], row["value_size"])
        by_workload[row["workload"]][series_key].append(row)

    workload_items = []
    for workload in sorted(by_workload, key=workload_sort_key):
        metric_datasets: dict[str, list[dict[str, Any]]] = {"ops": [], "p99_us": []}
        series_map = by_workload[workload]
        # Order series by key/value size first, so one size's engines sit together.
        ordered_series = sorted(series_map, key=lambda s: (s[1], s[2], s[0]))
        for engine, key_size, value_size in ordered_series:
            points = sorted(
                series_map[(engine, key_size, value_size)],
                key=lambda p: p["threads"],
            )
            color = kv_to_color[(key_size, value_size)]
            style = engine_to_style[engine]
            for metric in ("ops", "p99_us"):
                data_points = [
                    {
                        "x": p["threads"],
                        "y": p[metric],
                        "label": format_metric(metric, p[metric]),
                    }
                    for p in points
                ]
                metric_datasets[metric].append(
                    {
                        "label": f"{engine} (k={key_size}, v={value_size})",
                        "data": data_points,
                        "borderColor": color,
                        "backgroundColor": color,
                        "borderWidth": 2,
                        "engineStyle": style,
                    }
                )
        workload_items.append(
            {
                "id": workload,
                "label": workload_label(workload),
                "charts": metric_datasets,
            }
        )

    legend_pairs = [
        {"key_size": k, "value_size": v, "color": kv_to_color[(k, v)]}
        for k, v in kv_order
    ]
    legend_engines = [
        {"engine": engine, "style": engine_to_style[engine]}
        for engine in engine_order
    ]
    return {
        "workloads": workload_items,
        "kvLegend": legend_pairs,
        "engineLegend": legend_engines,
    }


def render_html(payload: dict[str, Any], source_csv: str, source_csv_name: str, source_csv_b64: str) -> str:
    """Return the report text for *payload*.

    NOTE(review): the template below only interpolates source_csv; the
    serialized payload, source_csv_name and source_csv_b64 are not referenced
    by it. Presumably the surrounding markup/script was lost -- confirm
    against the intended template before relying on this output.
    """
    payload_json = json.dumps(payload, ensure_ascii=False)
    return f""" Benchmark 报告

Benchmark 报告

数据来源: {source_csv} (点击下载原始 CSV)
颜色: key/value 对 (跨 engine 保持一致)
填充样式: engine
"""


def main() -> int:
    """Run the CLI: read the CSV, build the payload and write the HTML file.

    Returns 0 on success.

    Raises:
        FileNotFoundError: if the input CSV does not exist.
    """
    args = parse_args()
    csv_path = Path(args.csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_path}")

    output_path = Path(args.output) if args.output else csv_path.with_suffix(".html")

    rows, engines, kv_pairs = read_and_aggregate(csv_path)
    payload = build_report_payload(rows, engines, kv_pairs)

    # The raw CSV is passed along base64-encoded together with its file name.
    csv_bytes = csv_path.read_bytes()
    csv_b64 = base64.b64encode(csv_bytes).decode("ascii")
    html = render_html(payload, str(csv_path), csv_path.name, csv_b64)
    output_path.write_text(html, encoding="utf-8")

    print(f"HTML written to: {output_path}")
    print(f"Workloads: {len(payload['workloads'])}, engines: {len(engines)}, kv pairs: {len(kv_pairs)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())