Unify benchmark op counters

This commit is contained in:
abbycin 2026-03-12 08:36:22 +08:00
parent b9b4306b5d
commit cbc1e24b08
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
4 changed files with 69 additions and 60 deletions

View File

@ -178,7 +178,7 @@ Unified schema columns include:
- `durability_mode` (`relaxed` / `durable`) - `durability_mode` (`relaxed` / `durable`)
- `threads,key_size,value_size,prefill_keys` - `threads,key_size,value_size,prefill_keys`
- `ops` - `ops`
- `total_ops,ok_ops,err_ops` - `total_op,ok_op,err_op`
- `p50_us,p95_us,p99_us,p999_us` - `p50_us,p95_us,p99_us,p999_us`
- `read_path` - `read_path`
@ -207,4 +207,4 @@ Only compare rows with identical:
- `read_path` - `read_path`
- read-heavy workload rows are expected to include the pre-run GC/compaction pass described above - read-heavy workload rows are expected to include the pre-run GC/compaction pass described above
If `err_ops > 0`, investigate that case before drawing conclusions. If `err_op > 0`, investigate that case before drawing conclusions.

View File

@ -136,8 +136,8 @@ struct Quantiles {
}; };
struct ThreadStats { struct ThreadStats {
uint64_t total_ops = 0; uint64_t total_op = 0;
uint64_t err_ops = 0; uint64_t err_op = 0;
std::array<uint64_t, kLatencyBuckets> hist{}; std::array<uint64_t, kLatencyBuckets> hist{};
}; };
@ -160,9 +160,9 @@ struct ResultRow {
ReadPath read_path; ReadPath read_path;
uint64_t warmup_secs; uint64_t warmup_secs;
uint64_t measure_secs; uint64_t measure_secs;
uint64_t total_ops; uint64_t total_op;
uint64_t ok_ops; uint64_t ok_op;
uint64_t err_ops; uint64_t err_op;
double ops; double ops;
Quantiles quantiles; Quantiles quantiles;
uint64_t elapsed_us; uint64_t elapsed_us;
@ -440,7 +440,7 @@ static std::string csv_escape(const std::string &v) {
static const char *result_header() { static const char *result_header() {
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_" return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
"keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs," "keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,"
"measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_" "measure_secs,total_op,ok_op,err_op,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_"
"cores,mem_total_kb,mem_available_kb"; "cores,mem_total_kb,mem_available_kb";
} }
@ -450,8 +450,8 @@ static std::string result_row_csv(const ResultRow &r) {
r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode), r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode),
durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys, durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys,
r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct, r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct,
r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops, r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_op,
r.ok_ops, r.err_ops, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us, r.ok_op, r.err_op, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us,
r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host), r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host),
csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores, csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores,
r.meta.mem_total_kb, r.meta.mem_available_kb); r.meta.mem_total_kb, r.meta.mem_available_kb);
@ -897,9 +897,9 @@ int main(int argc, char *argv[]) {
mark_measure_start(); mark_measure_start();
auto record = [&](bool ok, uint64_t us) { auto record = [&](bool ok, uint64_t us) {
stats.total_ops += 1; stats.total_op += 1;
if (!ok) { if (!ok) {
stats.err_ops += 1; stats.err_op += 1;
} }
auto b = latency_bucket(us); auto b = latency_bucket(us);
stats.hist[b] += 1; stats.hist[b] += 1;
@ -954,21 +954,21 @@ int main(int argc, char *argv[]) {
measure_begin_ns = measure_end_ns; measure_begin_ns = measure_end_ns;
} }
uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000; uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000;
uint64_t total_ops = 0; uint64_t total_op = 0;
uint64_t err_ops = 0; uint64_t err_op = 0;
std::array<uint64_t, kLatencyBuckets> merged_hist{}; std::array<uint64_t, kLatencyBuckets> merged_hist{};
for (const auto &s: thread_stats) { for (const auto &s: thread_stats) {
total_ops += s.total_ops; total_op += s.total_op;
err_ops += s.err_ops; err_op += s.err_op;
for (size_t i = 0; i < merged_hist.size(); ++i) { for (size_t i = 0; i < merged_hist.size(); ++i) {
merged_hist[i] += s.hist[i]; merged_hist[i] += s.hist[i];
} }
} }
auto ops = elapsed_us == 0 ? 0.0 : (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us)); auto ops = elapsed_us == 0 ? 0.0 : (static_cast<double>(total_op) * 1'000'000.0 / static_cast<double>(elapsed_us));
uint64_t ok_ops = total_ops >= err_ops ? (total_ops - err_ops) : 0; uint64_t ok_op = total_op >= err_op ? (total_op - err_op) : 0;
auto row = ResultRow{ auto row = ResultRow{
.ts_epoch_ms = now_epoch_ms(), .ts_epoch_ms = now_epoch_ms(),
@ -989,9 +989,9 @@ int main(int argc, char *argv[]) {
.read_path = read_path.value(), .read_path = read_path.value(),
.warmup_secs = effective_warmup_secs, .warmup_secs = effective_warmup_secs,
.measure_secs = effective_measure_secs, .measure_secs = effective_measure_secs,
.total_ops = total_ops, .total_op = total_op,
.ok_ops = ok_ops, .ok_op = ok_op,
.err_ops = err_ops, .err_op = err_op,
.ops = ops, .ops = ops,
.quantiles = .quantiles =
Quantiles{ Quantiles{
@ -1010,9 +1010,9 @@ int main(int argc, char *argv[]) {
} }
fmt::println( fmt::println(
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}", "engine=rocksdb workload={} mode={} durability={} threads={} total_op={} ok_op={} err_op={} ops={} p99_us={} result_file={}",
row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.err_ops, row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_op, row.ok_op,
static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file); row.err_op, static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file);
delete handle; delete handle;
delete db; delete db;

View File

@ -18,12 +18,21 @@ def main() -> int:
parser.add_argument( parser.add_argument(
"--filter-errors", "--filter-errors",
action="store_true", action="store_true",
help="Only compare rows with err_ops == 0 (default: include all rows)", help="Only compare rows with err_op == 0 (default: include all rows)",
) )
args = parser.parse_args() args = parser.parse_args()
df = pd.read_csv(args.csv_path) df = pd.read_csv(args.csv_path)
legacy_columns = {
"total_ops": "total_op",
"ok_ops": "ok_op",
"err_ops": "err_op",
}
for legacy, current in legacy_columns.items():
if legacy in df.columns and current not in df.columns:
df = df.rename(columns={legacy: current})
required = { required = {
"engine", "engine",
"workload_id", "workload_id",
@ -34,7 +43,7 @@ def main() -> int:
"read_path", "read_path",
"ops", "ops",
"p99_us", "p99_us",
"err_ops", "err_op",
} }
missing = required - set(df.columns) missing = required - set(df.columns)
if missing: if missing:
@ -50,13 +59,13 @@ def main() -> int:
] ]
if args.filter_errors: if args.filter_errors:
base = df[df["err_ops"] == 0].copy() base = df[df["err_op"] == 0].copy()
else: else:
base = df.copy() base = df.copy()
if base.empty: if base.empty:
if args.filter_errors: if args.filter_errors:
print("No rows with err_ops == 0, cannot compare.") print("No rows with err_op == 0, cannot compare.")
else: else:
print("No rows found in csv, cannot compare.") print("No rows found in csv, cannot compare.")
return 0 return 0
@ -64,13 +73,13 @@ def main() -> int:
agg = base.groupby(keys + ["engine"], as_index=False).agg( agg = base.groupby(keys + ["engine"], as_index=False).agg(
ops=("ops", "median"), ops=("ops", "median"),
p99_us=("p99_us", "median"), p99_us=("p99_us", "median"),
err_ops=("err_ops", "median"), err_op=("err_op", "median"),
) )
piv = agg.pivot_table( piv = agg.pivot_table(
index=keys, index=keys,
columns="engine", columns="engine",
values=["ops", "p99_us", "err_ops"], values=["ops", "p99_us", "err_op"],
aggfunc="first", aggfunc="first",
) )
piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns] piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns]
@ -81,13 +90,13 @@ def main() -> int:
"ops_rocksdb", "ops_rocksdb",
"p99_us_mace", "p99_us_mace",
"p99_us_rocksdb", "p99_us_rocksdb",
"err_ops_mace", "err_op_mace",
"err_ops_rocksdb", "err_op_rocksdb",
]: ]:
if col not in out.columns: if col not in out.columns:
out[col] = pd.NA out[col] = pd.NA
out["qps_ratio_mace_over_rocksdb"] = ( out["ops_ratio_mace_over_rocksdb"] = (
out["ops_mace"] / out["ops_rocksdb"] out["ops_mace"] / out["ops_rocksdb"]
) )
out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"] out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"]
@ -95,7 +104,7 @@ def main() -> int:
print(out.to_string(index=False)) print(out.to_string(index=False))
print("\nInterpretation:") print("\nInterpretation:")
print("- qps_ratio_mace_over_rocksdb > 1: mace has higher throughput") print("- ops_ratio_mace_over_rocksdb > 1: mace has higher throughput")
print("- p99_ratio_mace_over_rocksdb < 1: mace has lower p99 latency") print("- p99_ratio_mace_over_rocksdb < 1: mace has lower p99 latency")
return 0 return 0

View File

@ -217,9 +217,9 @@ struct ResultRow {
read_path: ReadPath, read_path: ReadPath,
warmup_secs: u64, warmup_secs: u64,
measure_secs: u64, measure_secs: u64,
total_ops: u64, total_op: u64,
ok_ops: u64, ok_op: u64,
err_ops: u64, err_op: u64,
ops: f64, ops: f64,
quantiles: Quantiles, quantiles: Quantiles,
elapsed_us: u64, elapsed_us: u64,
@ -228,16 +228,16 @@ struct ResultRow {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ThreadStats { struct ThreadStats {
total_ops: u64, total_op: u64,
err_ops: u64, err_op: u64,
hist: [u64; LAT_BUCKETS], hist: [u64; LAT_BUCKETS],
} }
impl Default for ThreadStats { impl Default for ThreadStats {
fn default() -> Self { fn default() -> Self {
Self { Self {
total_ops: 0, total_op: 0,
err_ops: 0, err_op: 0,
hist: [0; LAT_BUCKETS], hist: [0; LAT_BUCKETS],
} }
} }
@ -488,7 +488,7 @@ fn csv_escape(raw: &str) -> String {
} }
fn result_header() -> &'static str { fn result_header() -> &'static str {
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_op,ok_op,err_op,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
} }
fn result_row_csv(row: &ResultRow) -> String { fn result_row_csv(row: &ResultRow) -> String {
@ -513,9 +513,9 @@ fn result_row_csv(row: &ResultRow) -> String {
row.read_path.as_str(), row.read_path.as_str(),
row.warmup_secs, row.warmup_secs,
row.measure_secs, row.measure_secs,
row.total_ops, row.total_op,
row.ok_ops, row.ok_op,
row.err_ops, row.err_op,
row.ops, row.ops,
row.quantiles.p50_us, row.quantiles.p50_us,
row.quantiles.p95_us, row.quantiles.p95_us,
@ -932,13 +932,13 @@ fn main() {
} }
let mut merged_hist = [0u64; LAT_BUCKETS]; let mut merged_hist = [0u64; LAT_BUCKETS];
let mut total_ops = 0u64; let mut total_op = 0u64;
let mut err_ops = 0u64; let mut err_op = 0u64;
for h in handles { for h in handles {
let s = h.join().unwrap(); let s = h.join().unwrap();
total_ops += s.total_ops; total_op += s.total_op;
err_ops += s.err_ops; err_op += s.err_op;
for (i, v) in s.hist.iter().enumerate() { for (i, v) in s.hist.iter().enumerate() {
merged_hist[i] += *v; merged_hist[i] += *v;
} }
@ -950,10 +950,10 @@ fn main() {
let ops = if elapsed_us == 0 { let ops = if elapsed_us == 0 {
0.0 0.0
} else { } else {
(total_ops as f64) * 1_000_000.0 / (elapsed_us as f64) (total_op as f64) * 1_000_000.0 / (elapsed_us as f64)
}; };
let ok_ops = total_ops.saturating_sub(err_ops); let ok_op = total_op.saturating_sub(err_op);
let quantiles = Quantiles { let quantiles = Quantiles {
p50_us: histogram_quantile_us(&merged_hist, 0.50), p50_us: histogram_quantile_us(&merged_hist, 0.50),
@ -982,9 +982,9 @@ fn main() {
read_path, read_path,
warmup_secs: effective_warmup_secs, warmup_secs: effective_warmup_secs,
measure_secs: effective_measure_secs, measure_secs: effective_measure_secs,
total_ops, total_op,
ok_ops, ok_op,
err_ops, err_op,
ops, ops,
quantiles, quantiles,
elapsed_us, elapsed_us,
@ -997,14 +997,14 @@ fn main() {
} }
println!( println!(
"engine=mace workload={} mode={} durability={} threads={} total_ops={} ok_ops={} err_ops={} ops={:.2} p99_us={} result_file={}", "engine=mace workload={} mode={} durability={} threads={} total_op={} ok_op={} err_op={} ops={:.2} p99_us={} result_file={}",
row.workload_id, row.workload_id,
row.mode, row.mode,
row.durability_mode.as_str(), row.durability_mode.as_str(),
row.threads, row.threads,
row.total_ops, row.total_op,
row.ok_ops, row.ok_op,
row.err_ops, row.err_op,
row.ops, row.ops,
row.quantiles.p99_us, row.quantiles.p99_us,
args.result_file args.result_file
@ -1172,9 +1172,9 @@ fn run_one_op(
}; };
if let Some(stats) = stats { if let Some(stats) = stats {
stats.total_ops += 1; stats.total_op += 1;
if !ok { if !ok {
stats.err_ops += 1; stats.err_op += 1;
} }
if let Some(start) = start { if let Some(start) = start {
let us = start.elapsed().as_micros() as u64; let us = start.elapsed().as_micros() as u64;