From 82a845429b40370c5bec917db39d2a67b833ca45 Mon Sep 17 00:00:00 2001 From: abbycin Date: Mon, 9 Mar 2026 18:37:36 +0800 Subject: [PATCH] rename ops --- README.md | 2 +- docs/repro.md | 6 +- rocksdb/main.cpp | 250 +++++++++++------------------------- scripts/compare_baseline.py | 26 ++-- scripts/fast_test.py | 8 +- scripts/phase1_eval.py | 8 +- scripts/phase2_report.py | 6 +- scripts/phase3_report.py | 6 +- scripts/plot.py | 4 +- src/main.rs | 45 ++++--- 10 files changed, 136 insertions(+), 225 deletions(-) diff --git a/README.md b/README.md index b40518c..5131882 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ mkdir -p "${KV_BENCH_STORAGE_ROOT}/basic_mace" "${KV_BENCH_STORAGE_ROOT}/basic_r ## What Is Compared - Comparison unit: rows with identical `workload_id`, `threads`, `key_size`, `value_size`, `durability_mode`, `read_path` - Fairness rule for read-heavy workloads: `get`, `scan`, and `W1`-`W6` run one GC/compaction pass after prefill and before warmup/measurement, so RocksDB is not compared with GC artificially disabled while reads may have to touch multiple SSTs -- Throughput metric: workload-level `ops_per_sec` (higher is better) +- Throughput metric: workload-level `ops` (higher is better) - Tail latency metric: workload-level `p99_us` (lower is better) - This is the workload-level p99 of all operations executed in that row, not per-op-type p99 diff --git a/docs/repro.md b/docs/repro.md index 4d44bc9..89f9312 100644 --- a/docs/repro.md +++ b/docs/repro.md @@ -177,9 +177,9 @@ Unified schema columns include: - `workload_id` (`W1..W6`) - `durability_mode` (`relaxed` / `durable`) - `threads,key_size,value_size,prefill_keys` -- `ops_per_sec` +- `ops` +- `total_ops,ok_ops,err_ops` - `p50_us,p95_us,p99_us,p999_us` -- `error_ops` - `read_path` ## 7. Report Commands @@ -207,4 +207,4 @@ Only compare rows with identical: - `read_path` - read-heavy workload rows are expected to include the pre-run GC/compaction pass described above -If `error_ops > 0`, investigate that case before drawing conclusions. +If `err_ops > 0`, investigate that case before drawing conclusions. diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 0b47574..61bb91c 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -137,7 +137,7 @@ struct Quantiles { struct ThreadStats { uint64_t total_ops = 0; - uint64_t error_ops = 0; + uint64_t err_ops = 0; std::array hist{}; }; @@ -161,8 +161,9 @@ struct ResultRow { uint64_t warmup_secs; uint64_t measure_secs; uint64_t total_ops; - uint64_t error_ops; - double ops_per_sec; + uint64_t ok_ops; + uint64_t err_ops; + double ops; Quantiles quantiles; uint64_t elapsed_us; MachineMeta meta; @@ -283,12 +284,9 @@ static std::optional parse_workload(const Args &args, std::string return std::nullopt; } -static bool workload_runs_gc(const WorkloadSpec &spec) { - return spec.requires_prefill; -} +static bool workload_runs_gc(const WorkloadSpec &spec) { return spec.requires_prefill; } -static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, - rocksdb::ColumnFamilyHandle *handle) { +static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle) { require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction"); rocksdb::FlushOptions flush_options; @@ -333,13 +331,9 @@ static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size) return key; } -static std::string make_shared_prefix(size_t id) { - return fmt::format("s{:03x}_", id % kPrefixGroups); -} +static std::string make_shared_prefix(size_t id) { return fmt::format("s{:03x}_", id % kPrefixGroups); } -static std::string make_thread_prefix(size_t tid) { - return fmt::format("t{:03x}_", tid % kPrefixGroups); -} +static std::string make_thread_prefix(size_t tid) { return fmt::format("t{:03x}_", tid % kPrefixGroups); } static size_t latency_bucket(uint64_t us) { auto v = std::max(us, 1); @@ -412,7 +406,7 @@ static MachineMeta gather_machine_meta() { std::snprintf(host_buf, sizeof(host_buf), "unknown"); } - struct utsname uts {}; + struct utsname uts{}; std::string kernel = "unknown"; std::string os = "unknown"; std::string arch = "unknown"; @@ -444,46 +438,23 @@ static std::string csv_escape(const std::string &v) { } static const char *result_header() { - return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; + return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_" + "keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs," + "measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_" + "cores,mem_total_kb,mem_available_kb"; } static std::string result_row_csv(const ResultRow &r) { - return fmt::format( - "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", - r.ts_epoch_ms, - "rocksdb", - csv_escape(r.workload_id), - csv_escape(r.mode), - durability_str(r.durability_mode), - r.threads, - r.key_size, - r.value_size, - r.prefill_keys, - r.shared_keyspace, - distribution_str(r.distribution), - r.zipf_theta, - r.read_pct, - r.update_pct, - r.scan_pct, - r.scan_len, - read_path_str(r.read_path), - r.warmup_secs, - r.measure_secs, - r.total_ops, - r.error_ops, - static_cast(r.ops_per_sec), - r.quantiles.p50_us, - r.quantiles.p95_us, - r.quantiles.p99_us, - r.quantiles.p999_us, - r.elapsed_us, - csv_escape(r.meta.host), - csv_escape(r.meta.os), - csv_escape(r.meta.arch), - csv_escape(r.meta.kernel), - r.meta.cpu_cores, - r.meta.mem_total_kb, - r.meta.mem_available_kb); + return fmt::format("v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}" + ",{},{},{},{}", + r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode), + durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys, + r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct, + r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops, + r.ok_ops, r.err_ops, static_cast(r.ops), r.quantiles.p50_us, r.quantiles.p95_us, + r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host), + csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores, + r.meta.mem_total_kb, r.meta.mem_available_kb); } static bool append_result_row(const std::string &path, const ResultRow &row) { @@ -511,12 +482,8 @@ static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) { return std::min(idx, n - 1); } -static std::optional pick_key_id(std::mt19937_64 &rng, - Distribution distribution, - double zipf_theta, - bool shared_keyspace, - size_t prefill_keys, - size_t local_key_len) { +static std::optional pick_key_id(std::mt19937_64 &rng, Distribution distribution, double zipf_theta, + bool shared_keyspace, size_t prefill_keys, size_t local_key_len) { if (shared_keyspace) { if (prefill_keys == 0) { return std::nullopt; @@ -569,24 +536,11 @@ static std::string find_upper_bound(const std::string &prefix) { return ""; } -static bool run_one_op(OpKind op, - rocksdb::OptimisticTransactionDB *db, - rocksdb::ColumnFamilyHandle *handle, - const rocksdb::WriteOptions &wopt, - const std::string &value, - std::mt19937_64 &rng, - const WorkloadSpec &spec, - Distribution distribution, - double zipf_theta, - ReadPath read_path, - size_t key_size, - size_t scan_len, - bool shared_keyspace, - size_t prefill_keys, - size_t local_key_len, - size_t tid, - std::atomic &insert_counter, - size_t &local_insert_idx, +static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle, + const rocksdb::WriteOptions &wopt, const std::string &value, std::mt19937_64 &rng, + const WorkloadSpec &spec, Distribution distribution, double zipf_theta, ReadPath read_path, + size_t key_size, size_t scan_len, bool shared_keyspace, size_t prefill_keys, + size_t local_key_len, size_t tid, std::atomic &insert_counter, size_t &local_insert_idx, std::optional fixed_insert_id) { if (op == OpKind::Read) { auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); @@ -594,9 +548,8 @@ static bool run_one_op(OpKind op, return false; } - auto key = shared_keyspace - ? make_shared_key(maybe_id.value(), key_size) - : make_thread_key(tid, maybe_id.value(), key_size); + auto key = shared_keyspace ? make_shared_key(maybe_id.value(), key_size) + : make_thread_key(tid, maybe_id.value(), key_size); auto ropt = rocksdb::ReadOptions(); std::string out; @@ -791,8 +744,8 @@ int main(int argc, char *argv[]) { } auto prefill_keys = workload_spec.requires_prefill - ? (args.prefill_keys > 0 ? args.prefill_keys : std::max(args.iterations, 1)) - : args.prefill_keys; + ? (args.prefill_keys > 0 ? args.prefill_keys : std::max(args.iterations, 1)) + : args.prefill_keys; if (workload_spec.requires_prefill && prefill_keys == 0) { fmt::println(stderr, "prefill_keys must be > 0 for read/mixed/scan workloads"); @@ -821,12 +774,11 @@ int main(int argc, char *argv[]) { options.create_if_missing = true; options.allow_concurrent_memtable_write = true; options.enable_pipelined_write = true; - options.max_background_flushes = 8; - options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH); - options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::LOW); auto wopt = rocksdb::WriteOptions(); - wopt.no_slowdown = true; + // allow backpressure under heavy write load so throughput reflects completed operations + // instead of inflating qps with fast-failed requests + wopt.no_slowdown = false; wopt.sync = (durability.value() == DurabilityMode::Durable); rocksdb::OptimisticTransactionDB *db = nullptr; @@ -890,8 +842,7 @@ int main(int argc, char *argv[]) { auto mark_measure_start = [&measure_start_ns]() { uint64_t expected = 0; auto now_ns = steady_now_ns(); - (void) measure_start_ns.compare_exchange_strong( - expected, now_ns, std::memory_order_relaxed); + (void) measure_start_ns.compare_exchange_strong(expected, now_ns, std::memory_order_relaxed); }; for (size_t tid = 0; tid < args.threads; ++tid) { @@ -913,29 +864,13 @@ int main(int argc, char *argv[]) { ready_barrier.arrive_and_wait(); if (effective_warmup_secs > 0) { - auto warmup_deadline = - std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs); + auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs); while (std::chrono::steady_clock::now() < warmup_deadline) { auto op = pick_op_kind(rng, workload_spec); - (void) run_one_op(op, - db, - handle, - wopt, - value, - rng, - workload_spec, - workload_spec.distribution, - args.zipf_theta, - read_path.value(), - args.key_size, - workload_spec.scan_len, - args.shared_keyspace, - prefill_keys, - local_key_len, - tid, - insert_counter, - local_insert_idx, - std::nullopt); + (void) run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, + args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, + args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, + local_insert_idx, std::nullopt); } } @@ -945,39 +880,24 @@ int main(int argc, char *argv[]) { auto record = [&](bool ok, uint64_t us) { stats.total_ops += 1; if (!ok) { - stats.error_ops += 1; + stats.err_ops += 1; } auto b = latency_bucket(us); stats.hist[b] += 1; }; if (effective_measure_secs > 0) { - auto deadline = - std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs); + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs); while (std::chrono::steady_clock::now() < deadline) { auto op = pick_op_kind(rng, workload_spec); auto started = std::chrono::steady_clock::now(); - auto ok = run_one_op(op, - db, - handle, - wopt, - value, - rng, - workload_spec, - workload_spec.distribution, - args.zipf_theta, - read_path.value(), - args.key_size, - workload_spec.scan_len, - args.shared_keyspace, - prefill_keys, - local_key_len, - tid, - insert_counter, - local_insert_idx, - std::nullopt); + auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, + args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, + args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, + local_insert_idx, std::nullopt); auto us = static_cast(std::chrono::duration_cast( - std::chrono::steady_clock::now() - started).count()); + std::chrono::steady_clock::now() - started) + .count()); record(ok, us); } } else { @@ -985,31 +905,16 @@ int main(int argc, char *argv[]) { auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec); std::optional fixed_insert_id = std::nullopt; if (workload_spec.insert_only) { - fixed_insert_id = - args.shared_keyspace ? (local_op_start + i) : i; + fixed_insert_id = args.shared_keyspace ? (local_op_start + i) : i; } auto started = std::chrono::steady_clock::now(); - auto ok = run_one_op(op, - db, - handle, - wopt, - value, - rng, - workload_spec, - workload_spec.distribution, - args.zipf_theta, - read_path.value(), - args.key_size, - workload_spec.scan_len, - args.shared_keyspace, - prefill_keys, - local_key_len, - tid, - insert_counter, - local_insert_idx, - fixed_insert_id); + auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution, + args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len, + args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter, + local_insert_idx, fixed_insert_id); auto us = static_cast(std::chrono::duration_cast( - std::chrono::steady_clock::now() - started).count()); + std::chrono::steady_clock::now() - started) + .count()); record(ok, us); } } @@ -1031,20 +936,20 @@ int main(int argc, char *argv[]) { } uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000; uint64_t total_ops = 0; - uint64_t error_ops = 0; + uint64_t err_ops = 0; std::array merged_hist{}; for (const auto &s: thread_stats) { total_ops += s.total_ops; - error_ops += s.error_ops; + err_ops += s.err_ops; for (size_t i = 0; i < merged_hist.size(); ++i) { merged_hist[i] += s.hist[i]; } } - auto ops_per_sec = elapsed_us == 0 - ? 0.0 - : (static_cast(total_ops) * 1'000'000.0 / static_cast(elapsed_us)); + auto ops = elapsed_us == 0 ? 0.0 : (static_cast(total_ops) * 1'000'000.0 / static_cast(elapsed_us)); + + uint64_t ok_ops = total_ops >= err_ops ? (total_ops - err_ops) : 0; auto row = ResultRow{ .ts_epoch_ms = now_epoch_ms(), @@ -1066,14 +971,16 @@ int main(int argc, char *argv[]) { .warmup_secs = effective_warmup_secs, .measure_secs = effective_measure_secs, .total_ops = total_ops, - .error_ops = error_ops, - .ops_per_sec = ops_per_sec, - .quantiles = Quantiles{ - .p50_us = histogram_quantile_us(merged_hist, 0.50), - .p95_us = histogram_quantile_us(merged_hist, 0.95), - .p99_us = histogram_quantile_us(merged_hist, 0.99), - .p999_us = histogram_quantile_us(merged_hist, 0.999), - }, + .ok_ops = ok_ops, + .err_ops = err_ops, + .ops = ops, + .quantiles = + Quantiles{ + .p50_us = histogram_quantile_us(merged_hist, 0.50), + .p95_us = histogram_quantile_us(merged_hist, 0.95), + .p99_us = histogram_quantile_us(merged_hist, 0.99), + .p999_us = histogram_quantile_us(merged_hist, 0.999), + }, .elapsed_us = elapsed_us, .meta = gather_machine_meta(), }; @@ -1085,15 +992,8 @@ int main(int argc, char *argv[]) { fmt::println( "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}", - row.workload_id, - row.mode, - durability_str(row.durability_mode), - row.threads, - row.total_ops, - row.error_ops, - static_cast(row.ops_per_sec), - row.quantiles.p99_us, - args.result_file); + row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.err_ops, + static_cast(row.ops), row.quantiles.p99_us, args.result_file); delete handle; delete db; diff --git a/scripts/compare_baseline.py b/scripts/compare_baseline.py index de10864..7277cc3 100644 --- a/scripts/compare_baseline.py +++ b/scripts/compare_baseline.py @@ -18,7 +18,7 @@ def main() -> int: parser.add_argument( "--filter-errors", action="store_true", - help="Only compare rows with error_ops == 0 (default: include all rows)", + help="Only compare rows with err_ops == 0 (default: include all rows)", ) args = parser.parse_args() @@ -32,9 +32,9 @@ def main() -> int: "value_size", "durability_mode", "read_path", - "ops_per_sec", + "ops", "p99_us", - "error_ops", + "err_ops", } missing = required - set(df.columns) if missing: @@ -50,45 +50,45 @@ def main() -> int: ] if args.filter_errors: - base = df[df["error_ops"] == 0].copy() + base = df[df["err_ops"] == 0].copy() else: base = df.copy() if base.empty: if args.filter_errors: - print("No rows with error_ops == 0, cannot compare.") + print("No rows with err_ops == 0, cannot compare.") else: print("No rows found in csv, cannot compare.") return 0 agg = base.groupby(keys + ["engine"], as_index=False).agg( - ops_per_sec=("ops_per_sec", "median"), + ops=("ops", "median"), p99_us=("p99_us", "median"), - error_ops=("error_ops", "median"), + err_ops=("err_ops", "median"), ) piv = agg.pivot_table( index=keys, columns="engine", - values=["ops_per_sec", "p99_us", "error_ops"], + values=["ops", "p99_us", "err_ops"], aggfunc="first", ) piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns] out = piv.reset_index() for col in [ - "ops_per_sec_mace", - "ops_per_sec_rocksdb", + "ops_mace", + "ops_rocksdb", "p99_us_mace", "p99_us_rocksdb", - "error_ops_mace", - "error_ops_rocksdb", + "err_ops_mace", + "err_ops_rocksdb", ]: if col not in out.columns: out[col] = pd.NA out["qps_ratio_mace_over_rocksdb"] = ( - out["ops_per_sec_mace"] / out["ops_per_sec_rocksdb"] + out["ops_mace"] / out["ops_rocksdb"] ) out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"] out = out.sort_values(keys) diff --git a/scripts/fast_test.py b/scripts/fast_test.py index 6cf71a6..a16db0c 100644 --- a/scripts/fast_test.py +++ b/scripts/fast_test.py @@ -180,7 +180,7 @@ def plot_results( thread_points: Sequence[int], ) -> list[Path]: df = pd.read_csv(result_csv) - required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"} + required = {"engine", "mode", "threads", "key_size", "value_size", "ops"} missing = required - set(df.columns) if missing: raise ValueError(f"Missing required columns in csv: {sorted(missing)}") @@ -191,7 +191,7 @@ def plot_results( grouped = ( df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[ - "ops_per_sec" + "ops" ] .mean() .sort_values(["engine", "mode", "key_size", "value_size", "threads"]) @@ -209,7 +209,7 @@ def plot_results( continue plt.figure(figsize=(16, 10)) - y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0 + y_max = float(mode_df["ops"].max()) if not mode_df.empty else 0.0 for engine in ENGINE_ORDER: for key_size, value_size in KV_PROFILES: @@ -222,7 +222,7 @@ def plot_results( continue x = sub["threads"].tolist() - y = sub["ops_per_sec"].tolist() + y = sub["ops"].tolist() label = ( f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})" ) diff --git a/scripts/phase1_eval.py b/scripts/phase1_eval.py index 5158948..23737e0 100755 --- a/scripts/phase1_eval.py +++ b/scripts/phase1_eval.py @@ -25,7 +25,7 @@ def main() -> int: "key_size", "value_size", "threads", - "ops_per_sec", + "ops", "p99_us", } missing = needed - set(df.columns) @@ -41,10 +41,10 @@ def main() -> int: agg = ( sub.groupby(grp_cols) .agg( - repeats=("ops_per_sec", "count"), - throughput_cv=("ops_per_sec", cv), + repeats=("ops", "count"), + throughput_cv=("ops", cv), p99_cv=("p99_us", cv), - throughput_median=("ops_per_sec", "median"), + throughput_median=("ops", "median"), p99_median=("p99_us", "median"), ) .reset_index() diff --git a/scripts/phase2_report.py b/scripts/phase2_report.py index 027e514..9a27cb4 100755 --- a/scripts/phase2_report.py +++ b/scripts/phase2_report.py @@ -32,7 +32,7 @@ def main() -> int: "value_size", "prefill_keys", "threads", - "ops_per_sec", + "ops", "p95_us", "p99_us", } @@ -51,8 +51,8 @@ def main() -> int: summary = ( sub.groupby(grp_cols) .agg( - repeats=("ops_per_sec", "count"), - throughput_median=("ops_per_sec", "median"), + repeats=("ops", "count"), + throughput_median=("ops", "median"), p95_median=("p95_us", "median"), p99_median=("p99_us", "median"), ) diff --git a/scripts/phase3_report.py b/scripts/phase3_report.py index c918857..a13e891 100755 --- a/scripts/phase3_report.py +++ b/scripts/phase3_report.py @@ -17,7 +17,7 @@ def main() -> int: "workload_id", "threads", "durability_mode", - "ops_per_sec", + "ops", "p99_us", } missing = needed - set(df.columns) @@ -39,8 +39,8 @@ def main() -> int: base = ( sub.groupby(["engine", "workload_id", "threads", "durability_mode"]) .agg( - repeats=("ops_per_sec", "count"), - throughput_median=("ops_per_sec", "median"), + repeats=("ops", "count"), + throughput_median=("ops", "median"), p99_median=("p99_us", "median"), ) .reset_index() diff --git a/scripts/plot.py b/scripts/plot.py index c7b84cd..5c56389 100644 --- a/scripts/plot.py +++ b/scripts/plot.py @@ -24,7 +24,7 @@ def main() -> int: "threads", "key_size", "value_size", - "ops_per_sec", + "ops", "p99_us", } missing = required - set(df.columns) @@ -48,7 +48,7 @@ def main() -> int: if sub.empty: continue - for metric, ylabel in (("ops_per_sec", "OPS/s"), ("p99_us", "P99 Latency (us)")): + for metric, ylabel in (("ops", "OPS/s"), ("p99_us", "P99 Latency (us)")): plt.figure(figsize=(12, 7)) for workload in sorted(sub["workload_id"].unique()): wdf = sub[sub["workload_id"] == workload].sort_values("threads") diff --git a/src/main.rs b/src/main.rs index 80b6c8e..b2cc01a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -218,8 +218,9 @@ struct ResultRow { warmup_secs: u64, measure_secs: u64, total_ops: u64, - error_ops: u64, - ops_per_sec: f64, + ok_ops: u64, + err_ops: u64, + ops: f64, quantiles: Quantiles, elapsed_us: u64, meta: MachineMeta, @@ -228,7 +229,7 @@ struct ResultRow { #[derive(Clone, Debug)] struct ThreadStats { total_ops: u64, - error_ops: u64, + err_ops: u64, hist: [u64; LAT_BUCKETS], } @@ -236,7 +237,7 @@ impl Default for ThreadStats { fn default() -> Self { Self { total_ops: 0, - error_ops: 0, + err_ops: 0, hist: [0; LAT_BUCKETS], } } @@ -487,12 +488,12 @@ fn csv_escape(raw: &str) -> String { } fn result_header() -> &'static str { - "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" + "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" } fn result_row_csv(row: &ResultRow) -> String { format!( - "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", row.ts_epoch_ms, row.engine, csv_escape(&row.workload_id), @@ -513,8 +514,9 @@ fn result_row_csv(row: &ResultRow) -> String { row.warmup_secs, row.measure_secs, row.total_ops, - row.error_ops, - row.ops_per_sec, + row.ok_ops, + row.err_ops, + row.ops, row.quantiles.p50_us, row.quantiles.p95_us, row.quantiles.p99_us, @@ -712,7 +714,12 @@ fn main() { opt.cache_capacity = 3 << 30; opt.data_file_size = 64 << 20; opt.max_log_size = 1 << 30; + opt.wal_buffer_size = 64 << 20; + opt.wal_file_size = 128 << 20; opt.default_arenas = 128; + opt.gc_timeout = 600 * 1000; + opt.gc_eager = false; + opt.data_garbage_ratio = 50; opt.tmp_store = cleanup; let db = Mace::new(opt.validate().unwrap()).unwrap(); @@ -926,12 +933,12 @@ fn main() { let mut merged_hist = [0u64; LAT_BUCKETS]; let mut total_ops = 0u64; - let mut error_ops = 0u64; + let mut err_ops = 0u64; for h in handles { let s = h.join().unwrap(); total_ops += s.total_ops; - error_ops += s.error_ops; + err_ops += s.err_ops; for (i, v) in s.hist.iter().enumerate() { merged_hist[i] += *v; } @@ -940,12 +947,14 @@ fn main() { let measure_end = Instant::now(); let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end); let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64; - let ops_per_sec = if elapsed_us == 0 { + let ops = if elapsed_us == 0 { 0.0 } else { (total_ops as f64) * 1_000_000.0 / (elapsed_us as f64) }; + let ok_ops = total_ops.saturating_sub(err_ops); + let quantiles = Quantiles { p50_us: histogram_quantile_us(&merged_hist, 0.50), p95_us: histogram_quantile_us(&merged_hist, 0.95), @@ -974,8 +983,9 @@ fn main() { warmup_secs: effective_warmup_secs, measure_secs: effective_measure_secs, total_ops, - error_ops, - ops_per_sec, + ok_ops, + err_ops, + ops, quantiles, elapsed_us, meta: gather_machine_meta(), @@ -987,14 +997,15 @@ fn main() { } println!( - "engine=mace workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + "engine=mace workload={} mode={} durability={} threads={} total_ops={} ok_ops={} err_ops={} ops={:.2} p99_us={} result_file={}", row.workload_id, row.mode, row.durability_mode.as_str(), row.threads, row.total_ops, - row.error_ops, - row.ops_per_sec, + row.ok_ops, + row.err_ops, + row.ops, row.quantiles.p99_us, args.result_file ); @@ -1159,7 +1170,7 @@ fn run_one_op( if let Some(stats) = stats { stats.total_ops += 1; if !ok { - stats.error_ops += 1; + stats.err_ops += 1; } if let Some(start) = start { let us = start.elapsed().as_micros() as u64;