Compare commits

..

2 Commits

Author SHA1 Message Date
82a845429b
rename ops 2026-03-09 18:37:36 +08:00
563f480643 align mace writes with rocksdb put semantics 2026-03-09 15:58:12 +08:00
10 changed files with 138 additions and 231 deletions

View File

@ -46,7 +46,7 @@ mkdir -p "${KV_BENCH_STORAGE_ROOT}/basic_mace" "${KV_BENCH_STORAGE_ROOT}/basic_r
## What Is Compared ## What Is Compared
- Comparison unit: rows with identical `workload_id`, `threads`, `key_size`, `value_size`, `durability_mode`, `read_path` - Comparison unit: rows with identical `workload_id`, `threads`, `key_size`, `value_size`, `durability_mode`, `read_path`
- Fairness rule for read-heavy workloads: `get`, `scan`, and `W1`-`W6` run one GC/compaction pass after prefill and before warmup/measurement, so RocksDB is not compared with GC artificially disabled while reads may have to touch multiple SSTs - Fairness rule for read-heavy workloads: `get`, `scan`, and `W1`-`W6` run one GC/compaction pass after prefill and before warmup/measurement, so RocksDB is not compared with GC artificially disabled while reads may have to touch multiple SSTs
- Throughput metric: workload-level `ops_per_sec` (higher is better) - Throughput metric: workload-level `ops` (higher is better)
- Tail latency metric: workload-level `p99_us` (lower is better) - Tail latency metric: workload-level `p99_us` (lower is better)
- This is the workload-level p99 of all operations executed in that row, not per-op-type p99 - This is the workload-level p99 of all operations executed in that row, not per-op-type p99

View File

@ -177,9 +177,9 @@ Unified schema columns include:
- `workload_id` (`W1..W6`) - `workload_id` (`W1..W6`)
- `durability_mode` (`relaxed` / `durable`) - `durability_mode` (`relaxed` / `durable`)
- `threads,key_size,value_size,prefill_keys` - `threads,key_size,value_size,prefill_keys`
- `ops_per_sec` - `ops`
- `total_ops,ok_ops,err_ops`
- `p50_us,p95_us,p99_us,p999_us` - `p50_us,p95_us,p99_us,p999_us`
- `error_ops`
- `read_path` - `read_path`
## 7. Report Commands ## 7. Report Commands
@ -207,4 +207,4 @@ Only compare rows with identical:
- `read_path` - `read_path`
- read-heavy workload rows are expected to include the pre-run GC/compaction pass described above - read-heavy workload rows are expected to include the pre-run GC/compaction pass described above
If `error_ops > 0`, investigate that case before drawing conclusions. If `err_ops > 0`, investigate that case before drawing conclusions.

View File

@ -137,7 +137,7 @@ struct Quantiles {
struct ThreadStats { struct ThreadStats {
uint64_t total_ops = 0; uint64_t total_ops = 0;
uint64_t error_ops = 0; uint64_t err_ops = 0;
std::array<uint64_t, kLatencyBuckets> hist{}; std::array<uint64_t, kLatencyBuckets> hist{};
}; };
@ -161,8 +161,9 @@ struct ResultRow {
uint64_t warmup_secs; uint64_t warmup_secs;
uint64_t measure_secs; uint64_t measure_secs;
uint64_t total_ops; uint64_t total_ops;
uint64_t error_ops; uint64_t ok_ops;
double ops_per_sec; uint64_t err_ops;
double ops;
Quantiles quantiles; Quantiles quantiles;
uint64_t elapsed_us; uint64_t elapsed_us;
MachineMeta meta; MachineMeta meta;
@ -283,12 +284,9 @@ static std::optional<WorkloadSpec> parse_workload(const Args &args, std::string
return std::nullopt; return std::nullopt;
} }
static bool workload_runs_gc(const WorkloadSpec &spec) { static bool workload_runs_gc(const WorkloadSpec &spec) { return spec.requires_prefill; }
return spec.requires_prefill;
}
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle) {
rocksdb::ColumnFamilyHandle *handle) {
require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction"); require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction");
rocksdb::FlushOptions flush_options; rocksdb::FlushOptions flush_options;
@ -333,13 +331,9 @@ static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size)
return key; return key;
} }
static std::string make_shared_prefix(size_t id) { static std::string make_shared_prefix(size_t id) { return fmt::format("s{:03x}_", id % kPrefixGroups); }
return fmt::format("s{:03x}_", id % kPrefixGroups);
}
static std::string make_thread_prefix(size_t tid) { static std::string make_thread_prefix(size_t tid) { return fmt::format("t{:03x}_", tid % kPrefixGroups); }
return fmt::format("t{:03x}_", tid % kPrefixGroups);
}
static size_t latency_bucket(uint64_t us) { static size_t latency_bucket(uint64_t us) {
auto v = std::max<uint64_t>(us, 1); auto v = std::max<uint64_t>(us, 1);
@ -444,46 +438,23 @@ static std::string csv_escape(const std::string &v) {
} }
static const char *result_header() { static const char *result_header() {
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"; return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
"keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,"
"measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_"
"cores,mem_total_kb,mem_available_kb";
} }
static std::string result_row_csv(const ResultRow &r) { static std::string result_row_csv(const ResultRow &r) {
return fmt::format( return fmt::format("v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}"
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", ",{},{},{},{}",
r.ts_epoch_ms, r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode),
"rocksdb", durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys,
csv_escape(r.workload_id), r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct,
csv_escape(r.mode), r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops,
durability_str(r.durability_mode), r.ok_ops, r.err_ops, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us,
r.threads, r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host),
r.key_size, csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores,
r.value_size, r.meta.mem_total_kb, r.meta.mem_available_kb);
r.prefill_keys,
r.shared_keyspace,
distribution_str(r.distribution),
r.zipf_theta,
r.read_pct,
r.update_pct,
r.scan_pct,
r.scan_len,
read_path_str(r.read_path),
r.warmup_secs,
r.measure_secs,
r.total_ops,
r.error_ops,
static_cast<uint64_t>(r.ops_per_sec),
r.quantiles.p50_us,
r.quantiles.p95_us,
r.quantiles.p99_us,
r.quantiles.p999_us,
r.elapsed_us,
csv_escape(r.meta.host),
csv_escape(r.meta.os),
csv_escape(r.meta.arch),
csv_escape(r.meta.kernel),
r.meta.cpu_cores,
r.meta.mem_total_kb,
r.meta.mem_available_kb);
} }
static bool append_result_row(const std::string &path, const ResultRow &row) { static bool append_result_row(const std::string &path, const ResultRow &row) {
@ -511,12 +482,8 @@ static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) {
return std::min(idx, n - 1); return std::min(idx, n - 1);
} }
static std::optional<size_t> pick_key_id(std::mt19937_64 &rng, static std::optional<size_t> pick_key_id(std::mt19937_64 &rng, Distribution distribution, double zipf_theta,
Distribution distribution, bool shared_keyspace, size_t prefill_keys, size_t local_key_len) {
double zipf_theta,
bool shared_keyspace,
size_t prefill_keys,
size_t local_key_len) {
if (shared_keyspace) { if (shared_keyspace) {
if (prefill_keys == 0) { if (prefill_keys == 0) {
return std::nullopt; return std::nullopt;
@ -569,24 +536,11 @@ static std::string find_upper_bound(const std::string &prefix) {
return ""; return "";
} }
static bool run_one_op(OpKind op, static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle,
rocksdb::OptimisticTransactionDB *db, const rocksdb::WriteOptions &wopt, const std::string &value, std::mt19937_64 &rng,
rocksdb::ColumnFamilyHandle *handle, const WorkloadSpec &spec, Distribution distribution, double zipf_theta, ReadPath read_path,
const rocksdb::WriteOptions &wopt, size_t key_size, size_t scan_len, bool shared_keyspace, size_t prefill_keys,
const std::string &value, size_t local_key_len, size_t tid, std::atomic<size_t> &insert_counter, size_t &local_insert_idx,
std::mt19937_64 &rng,
const WorkloadSpec &spec,
Distribution distribution,
double zipf_theta,
ReadPath read_path,
size_t key_size,
size_t scan_len,
bool shared_keyspace,
size_t prefill_keys,
size_t local_key_len,
size_t tid,
std::atomic<size_t> &insert_counter,
size_t &local_insert_idx,
std::optional<size_t> fixed_insert_id) { std::optional<size_t> fixed_insert_id) {
if (op == OpKind::Read) { if (op == OpKind::Read) {
auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len);
@ -594,8 +548,7 @@ static bool run_one_op(OpKind op,
return false; return false;
} }
auto key = shared_keyspace auto key = shared_keyspace ? make_shared_key(maybe_id.value(), key_size)
? make_shared_key(maybe_id.value(), key_size)
: make_thread_key(tid, maybe_id.value(), key_size); : make_thread_key(tid, maybe_id.value(), key_size);
auto ropt = rocksdb::ReadOptions(); auto ropt = rocksdb::ReadOptions();
@ -821,12 +774,11 @@ int main(int argc, char *argv[]) {
options.create_if_missing = true; options.create_if_missing = true;
options.allow_concurrent_memtable_write = true; options.allow_concurrent_memtable_write = true;
options.enable_pipelined_write = true; options.enable_pipelined_write = true;
options.max_background_flushes = 8;
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH);
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::LOW);
auto wopt = rocksdb::WriteOptions(); auto wopt = rocksdb::WriteOptions();
wopt.no_slowdown = true; // allow backpressure under heavy write load so throughput reflects completed operations
// instead of inflating qps with fast-failed requests
wopt.no_slowdown = false;
wopt.sync = (durability.value() == DurabilityMode::Durable); wopt.sync = (durability.value() == DurabilityMode::Durable);
rocksdb::OptimisticTransactionDB *db = nullptr; rocksdb::OptimisticTransactionDB *db = nullptr;
@ -890,8 +842,7 @@ int main(int argc, char *argv[]) {
auto mark_measure_start = [&measure_start_ns]() { auto mark_measure_start = [&measure_start_ns]() {
uint64_t expected = 0; uint64_t expected = 0;
auto now_ns = steady_now_ns(); auto now_ns = steady_now_ns();
(void) measure_start_ns.compare_exchange_strong( (void) measure_start_ns.compare_exchange_strong(expected, now_ns, std::memory_order_relaxed);
expected, now_ns, std::memory_order_relaxed);
}; };
for (size_t tid = 0; tid < args.threads; ++tid) { for (size_t tid = 0; tid < args.threads; ++tid) {
@ -913,29 +864,13 @@ int main(int argc, char *argv[]) {
ready_barrier.arrive_and_wait(); ready_barrier.arrive_and_wait();
if (effective_warmup_secs > 0) { if (effective_warmup_secs > 0) {
auto warmup_deadline = auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
while (std::chrono::steady_clock::now() < warmup_deadline) { while (std::chrono::steady_clock::now() < warmup_deadline) {
auto op = pick_op_kind(rng, workload_spec); auto op = pick_op_kind(rng, workload_spec);
(void) run_one_op(op, (void) run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
db, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
handle, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
wopt, local_insert_idx, std::nullopt);
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
std::nullopt);
} }
} }
@ -945,39 +880,24 @@ int main(int argc, char *argv[]) {
auto record = [&](bool ok, uint64_t us) { auto record = [&](bool ok, uint64_t us) {
stats.total_ops += 1; stats.total_ops += 1;
if (!ok) { if (!ok) {
stats.error_ops += 1; stats.err_ops += 1;
} }
auto b = latency_bucket(us); auto b = latency_bucket(us);
stats.hist[b] += 1; stats.hist[b] += 1;
}; };
if (effective_measure_secs > 0) { if (effective_measure_secs > 0) {
auto deadline = auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
while (std::chrono::steady_clock::now() < deadline) { while (std::chrono::steady_clock::now() < deadline) {
auto op = pick_op_kind(rng, workload_spec); auto op = pick_op_kind(rng, workload_spec);
auto started = std::chrono::steady_clock::now(); auto started = std::chrono::steady_clock::now();
auto ok = run_one_op(op, auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
db, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
handle, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
wopt, local_insert_idx, std::nullopt);
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
std::nullopt);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>( auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count()); std::chrono::steady_clock::now() - started)
.count());
record(ok, us); record(ok, us);
} }
} else { } else {
@ -985,31 +905,16 @@ int main(int argc, char *argv[]) {
auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec); auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec);
std::optional<size_t> fixed_insert_id = std::nullopt; std::optional<size_t> fixed_insert_id = std::nullopt;
if (workload_spec.insert_only) { if (workload_spec.insert_only) {
fixed_insert_id = fixed_insert_id = args.shared_keyspace ? (local_op_start + i) : i;
args.shared_keyspace ? (local_op_start + i) : i;
} }
auto started = std::chrono::steady_clock::now(); auto started = std::chrono::steady_clock::now();
auto ok = run_one_op(op, auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
db, args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
handle, args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
wopt, local_insert_idx, fixed_insert_id);
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
fixed_insert_id);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>( auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count()); std::chrono::steady_clock::now() - started)
.count());
record(ok, us); record(ok, us);
} }
} }
@ -1031,20 +936,20 @@ int main(int argc, char *argv[]) {
} }
uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000; uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000;
uint64_t total_ops = 0; uint64_t total_ops = 0;
uint64_t error_ops = 0; uint64_t err_ops = 0;
std::array<uint64_t, kLatencyBuckets> merged_hist{}; std::array<uint64_t, kLatencyBuckets> merged_hist{};
for (const auto &s: thread_stats) { for (const auto &s: thread_stats) {
total_ops += s.total_ops; total_ops += s.total_ops;
error_ops += s.error_ops; err_ops += s.err_ops;
for (size_t i = 0; i < merged_hist.size(); ++i) { for (size_t i = 0; i < merged_hist.size(); ++i) {
merged_hist[i] += s.hist[i]; merged_hist[i] += s.hist[i];
} }
} }
auto ops_per_sec = elapsed_us == 0 auto ops = elapsed_us == 0 ? 0.0 : (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us));
? 0.0
: (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us)); uint64_t ok_ops = total_ops >= err_ops ? (total_ops - err_ops) : 0;
auto row = ResultRow{ auto row = ResultRow{
.ts_epoch_ms = now_epoch_ms(), .ts_epoch_ms = now_epoch_ms(),
@ -1066,9 +971,11 @@ int main(int argc, char *argv[]) {
.warmup_secs = effective_warmup_secs, .warmup_secs = effective_warmup_secs,
.measure_secs = effective_measure_secs, .measure_secs = effective_measure_secs,
.total_ops = total_ops, .total_ops = total_ops,
.error_ops = error_ops, .ok_ops = ok_ops,
.ops_per_sec = ops_per_sec, .err_ops = err_ops,
.quantiles = Quantiles{ .ops = ops,
.quantiles =
Quantiles{
.p50_us = histogram_quantile_us(merged_hist, 0.50), .p50_us = histogram_quantile_us(merged_hist, 0.50),
.p95_us = histogram_quantile_us(merged_hist, 0.95), .p95_us = histogram_quantile_us(merged_hist, 0.95),
.p99_us = histogram_quantile_us(merged_hist, 0.99), .p99_us = histogram_quantile_us(merged_hist, 0.99),
@ -1085,15 +992,8 @@ int main(int argc, char *argv[]) {
fmt::println( fmt::println(
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}", "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}",
row.workload_id, row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.err_ops,
row.mode, static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file);
durability_str(row.durability_mode),
row.threads,
row.total_ops,
row.error_ops,
static_cast<uint64_t>(row.ops_per_sec),
row.quantiles.p99_us,
args.result_file);
delete handle; delete handle;
delete db; delete db;

View File

@ -18,7 +18,7 @@ def main() -> int:
parser.add_argument( parser.add_argument(
"--filter-errors", "--filter-errors",
action="store_true", action="store_true",
help="Only compare rows with error_ops == 0 (default: include all rows)", help="Only compare rows with err_ops == 0 (default: include all rows)",
) )
args = parser.parse_args() args = parser.parse_args()
@ -32,9 +32,9 @@ def main() -> int:
"value_size", "value_size",
"durability_mode", "durability_mode",
"read_path", "read_path",
"ops_per_sec", "ops",
"p99_us", "p99_us",
"error_ops", "err_ops",
} }
missing = required - set(df.columns) missing = required - set(df.columns)
if missing: if missing:
@ -50,45 +50,45 @@ def main() -> int:
] ]
if args.filter_errors: if args.filter_errors:
base = df[df["error_ops"] == 0].copy() base = df[df["err_ops"] == 0].copy()
else: else:
base = df.copy() base = df.copy()
if base.empty: if base.empty:
if args.filter_errors: if args.filter_errors:
print("No rows with error_ops == 0, cannot compare.") print("No rows with err_ops == 0, cannot compare.")
else: else:
print("No rows found in csv, cannot compare.") print("No rows found in csv, cannot compare.")
return 0 return 0
agg = base.groupby(keys + ["engine"], as_index=False).agg( agg = base.groupby(keys + ["engine"], as_index=False).agg(
ops_per_sec=("ops_per_sec", "median"), ops=("ops", "median"),
p99_us=("p99_us", "median"), p99_us=("p99_us", "median"),
error_ops=("error_ops", "median"), err_ops=("err_ops", "median"),
) )
piv = agg.pivot_table( piv = agg.pivot_table(
index=keys, index=keys,
columns="engine", columns="engine",
values=["ops_per_sec", "p99_us", "error_ops"], values=["ops", "p99_us", "err_ops"],
aggfunc="first", aggfunc="first",
) )
piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns] piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns]
out = piv.reset_index() out = piv.reset_index()
for col in [ for col in [
"ops_per_sec_mace", "ops_mace",
"ops_per_sec_rocksdb", "ops_rocksdb",
"p99_us_mace", "p99_us_mace",
"p99_us_rocksdb", "p99_us_rocksdb",
"error_ops_mace", "err_ops_mace",
"error_ops_rocksdb", "err_ops_rocksdb",
]: ]:
if col not in out.columns: if col not in out.columns:
out[col] = pd.NA out[col] = pd.NA
out["qps_ratio_mace_over_rocksdb"] = ( out["qps_ratio_mace_over_rocksdb"] = (
out["ops_per_sec_mace"] / out["ops_per_sec_rocksdb"] out["ops_mace"] / out["ops_rocksdb"]
) )
out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"] out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"]
out = out.sort_values(keys) out = out.sort_values(keys)

View File

@ -180,7 +180,7 @@ def plot_results(
thread_points: Sequence[int], thread_points: Sequence[int],
) -> list[Path]: ) -> list[Path]:
df = pd.read_csv(result_csv) df = pd.read_csv(result_csv)
required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"} required = {"engine", "mode", "threads", "key_size", "value_size", "ops"}
missing = required - set(df.columns) missing = required - set(df.columns)
if missing: if missing:
raise ValueError(f"Missing required columns in csv: {sorted(missing)}") raise ValueError(f"Missing required columns in csv: {sorted(missing)}")
@ -191,7 +191,7 @@ def plot_results(
grouped = ( grouped = (
df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[ df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[
"ops_per_sec" "ops"
] ]
.mean() .mean()
.sort_values(["engine", "mode", "key_size", "value_size", "threads"]) .sort_values(["engine", "mode", "key_size", "value_size", "threads"])
@ -209,7 +209,7 @@ def plot_results(
continue continue
plt.figure(figsize=(16, 10)) plt.figure(figsize=(16, 10))
y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0 y_max = float(mode_df["ops"].max()) if not mode_df.empty else 0.0
for engine in ENGINE_ORDER: for engine in ENGINE_ORDER:
for key_size, value_size in KV_PROFILES: for key_size, value_size in KV_PROFILES:
@ -222,7 +222,7 @@ def plot_results(
continue continue
x = sub["threads"].tolist() x = sub["threads"].tolist()
y = sub["ops_per_sec"].tolist() y = sub["ops"].tolist()
label = ( label = (
f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})" f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})"
) )

View File

@ -25,7 +25,7 @@ def main() -> int:
"key_size", "key_size",
"value_size", "value_size",
"threads", "threads",
"ops_per_sec", "ops",
"p99_us", "p99_us",
} }
missing = needed - set(df.columns) missing = needed - set(df.columns)
@ -41,10 +41,10 @@ def main() -> int:
agg = ( agg = (
sub.groupby(grp_cols) sub.groupby(grp_cols)
.agg( .agg(
repeats=("ops_per_sec", "count"), repeats=("ops", "count"),
throughput_cv=("ops_per_sec", cv), throughput_cv=("ops", cv),
p99_cv=("p99_us", cv), p99_cv=("p99_us", cv),
throughput_median=("ops_per_sec", "median"), throughput_median=("ops", "median"),
p99_median=("p99_us", "median"), p99_median=("p99_us", "median"),
) )
.reset_index() .reset_index()

View File

@ -32,7 +32,7 @@ def main() -> int:
"value_size", "value_size",
"prefill_keys", "prefill_keys",
"threads", "threads",
"ops_per_sec", "ops",
"p95_us", "p95_us",
"p99_us", "p99_us",
} }
@ -51,8 +51,8 @@ def main() -> int:
summary = ( summary = (
sub.groupby(grp_cols) sub.groupby(grp_cols)
.agg( .agg(
repeats=("ops_per_sec", "count"), repeats=("ops", "count"),
throughput_median=("ops_per_sec", "median"), throughput_median=("ops", "median"),
p95_median=("p95_us", "median"), p95_median=("p95_us", "median"),
p99_median=("p99_us", "median"), p99_median=("p99_us", "median"),
) )

View File

@ -17,7 +17,7 @@ def main() -> int:
"workload_id", "workload_id",
"threads", "threads",
"durability_mode", "durability_mode",
"ops_per_sec", "ops",
"p99_us", "p99_us",
} }
missing = needed - set(df.columns) missing = needed - set(df.columns)
@ -39,8 +39,8 @@ def main() -> int:
base = ( base = (
sub.groupby(["engine", "workload_id", "threads", "durability_mode"]) sub.groupby(["engine", "workload_id", "threads", "durability_mode"])
.agg( .agg(
repeats=("ops_per_sec", "count"), repeats=("ops", "count"),
throughput_median=("ops_per_sec", "median"), throughput_median=("ops", "median"),
p99_median=("p99_us", "median"), p99_median=("p99_us", "median"),
) )
.reset_index() .reset_index()

View File

@ -24,7 +24,7 @@ def main() -> int:
"threads", "threads",
"key_size", "key_size",
"value_size", "value_size",
"ops_per_sec", "ops",
"p99_us", "p99_us",
} }
missing = required - set(df.columns) missing = required - set(df.columns)
@ -48,7 +48,7 @@ def main() -> int:
if sub.empty: if sub.empty:
continue continue
for metric, ylabel in (("ops_per_sec", "OPS/s"), ("p99_us", "P99 Latency (us)")): for metric, ylabel in (("ops", "OPS/s"), ("p99_us", "P99 Latency (us)")):
plt.figure(figsize=(12, 7)) plt.figure(figsize=(12, 7))
for workload in sorted(sub["workload_id"].unique()): for workload in sorted(sub["workload_id"].unique()):
wdf = sub[sub["workload_id"] == workload].sort_values("threads") wdf = sub[sub["workload_id"] == workload].sort_values("threads")

View File

@ -218,8 +218,9 @@ struct ResultRow {
warmup_secs: u64, warmup_secs: u64,
measure_secs: u64, measure_secs: u64,
total_ops: u64, total_ops: u64,
error_ops: u64, ok_ops: u64,
ops_per_sec: f64, err_ops: u64,
ops: f64,
quantiles: Quantiles, quantiles: Quantiles,
elapsed_us: u64, elapsed_us: u64,
meta: MachineMeta, meta: MachineMeta,
@ -228,7 +229,7 @@ struct ResultRow {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ThreadStats { struct ThreadStats {
total_ops: u64, total_ops: u64,
error_ops: u64, err_ops: u64,
hist: [u64; LAT_BUCKETS], hist: [u64; LAT_BUCKETS],
} }
@ -236,7 +237,7 @@ impl Default for ThreadStats {
fn default() -> Self { fn default() -> Self {
Self { Self {
total_ops: 0, total_ops: 0,
error_ops: 0, err_ops: 0,
hist: [0; LAT_BUCKETS], hist: [0; LAT_BUCKETS],
} }
} }
@ -487,12 +488,12 @@ fn csv_escape(raw: &str) -> String {
} }
fn result_header() -> &'static str { fn result_header() -> &'static str {
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb" "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
} }
fn result_row_csv(row: &ResultRow) -> String { fn result_row_csv(row: &ResultRow) -> String {
format!( format!(
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
row.ts_epoch_ms, row.ts_epoch_ms,
row.engine, row.engine,
csv_escape(&row.workload_id), csv_escape(&row.workload_id),
@ -513,8 +514,9 @@ fn result_row_csv(row: &ResultRow) -> String {
row.warmup_secs, row.warmup_secs,
row.measure_secs, row.measure_secs,
row.total_ops, row.total_ops,
row.error_ops, row.ok_ops,
row.ops_per_sec, row.err_ops,
row.ops,
row.quantiles.p50_us, row.quantiles.p50_us,
row.quantiles.p95_us, row.quantiles.p95_us,
row.quantiles.p99_us, row.quantiles.p99_us,
@ -712,7 +714,12 @@ fn main() {
opt.cache_capacity = 3 << 30; opt.cache_capacity = 3 << 30;
opt.data_file_size = 64 << 20; opt.data_file_size = 64 << 20;
opt.max_log_size = 1 << 30; opt.max_log_size = 1 << 30;
opt.wal_buffer_size = 64 << 20;
opt.wal_file_size = 128 << 20;
opt.default_arenas = 128; opt.default_arenas = 128;
opt.gc_timeout = 600 * 1000;
opt.gc_eager = false;
opt.data_garbage_ratio = 50;
opt.tmp_store = cleanup; opt.tmp_store = cleanup;
let db = Mace::new(opt.validate().unwrap()).unwrap(); let db = Mace::new(opt.validate().unwrap()).unwrap();
@ -743,7 +750,7 @@ fn main() {
} else { } else {
make_thread_key(tid, i, key_size) make_thread_key(tid, i, key_size)
}; };
tx.put(key.as_slice(), v.as_slice()).unwrap(); tx.upsert(key.as_slice(), v.as_slice()).unwrap();
in_batch += 1; in_batch += 1;
if in_batch >= PREFILL_BATCH { if in_batch >= PREFILL_BATCH {
tx.commit().unwrap(); tx.commit().unwrap();
@ -926,12 +933,12 @@ fn main() {
let mut merged_hist = [0u64; LAT_BUCKETS]; let mut merged_hist = [0u64; LAT_BUCKETS];
let mut total_ops = 0u64; let mut total_ops = 0u64;
let mut error_ops = 0u64; let mut err_ops = 0u64;
for h in handles { for h in handles {
let s = h.join().unwrap(); let s = h.join().unwrap();
total_ops += s.total_ops; total_ops += s.total_ops;
error_ops += s.error_ops; err_ops += s.err_ops;
for (i, v) in s.hist.iter().enumerate() { for (i, v) in s.hist.iter().enumerate() {
merged_hist[i] += *v; merged_hist[i] += *v;
} }
@ -940,12 +947,14 @@ fn main() {
let measure_end = Instant::now(); let measure_end = Instant::now();
let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end); let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end);
let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64; let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64;
let ops_per_sec = if elapsed_us == 0 { let ops = if elapsed_us == 0 {
0.0 0.0
} else { } else {
(total_ops as f64) * 1_000_000.0 / (elapsed_us as f64) (total_ops as f64) * 1_000_000.0 / (elapsed_us as f64)
}; };
let ok_ops = total_ops.saturating_sub(err_ops);
let quantiles = Quantiles { let quantiles = Quantiles {
p50_us: histogram_quantile_us(&merged_hist, 0.50), p50_us: histogram_quantile_us(&merged_hist, 0.50),
p95_us: histogram_quantile_us(&merged_hist, 0.95), p95_us: histogram_quantile_us(&merged_hist, 0.95),
@ -974,8 +983,9 @@ fn main() {
warmup_secs: effective_warmup_secs, warmup_secs: effective_warmup_secs,
measure_secs: effective_measure_secs, measure_secs: effective_measure_secs,
total_ops, total_ops,
error_ops, ok_ops,
ops_per_sec, err_ops,
ops,
quantiles, quantiles,
elapsed_us, elapsed_us,
meta: gather_machine_meta(), meta: gather_machine_meta(),
@ -987,14 +997,15 @@ fn main() {
} }
println!( println!(
"engine=mace workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", "engine=mace workload={} mode={} durability={} threads={} total_ops={} ok_ops={} err_ops={} ops={:.2} p99_us={} result_file={}",
row.workload_id, row.workload_id,
row.mode, row.mode,
row.durability_mode.as_str(), row.durability_mode.as_str(),
row.threads, row.threads,
row.total_ops, row.total_ops,
row.error_ops, row.ok_ops,
row.ops_per_sec, row.err_ops,
row.ops,
row.quantiles.p99_us, row.quantiles.p99_us,
args.result_file args.result_file
); );
@ -1099,11 +1110,7 @@ fn run_one_op(
if let Some(key) = key_opt { if let Some(key) = key_opt {
if let Ok(tx) = bucket.begin() { if let Ok(tx) = bucket.begin() {
let write_ok = if spec.insert_only { let write_ok = tx.upsert(key.as_slice(), value.as_slice()).is_ok();
tx.put(key.as_slice(), value.as_slice()).is_ok()
} else {
tx.upsert(key.as_slice(), value.as_slice()).is_ok()
};
if !write_ok { if !write_ok {
false false
} else { } else {
@ -1163,7 +1170,7 @@ fn run_one_op(
if let Some(stats) = stats { if let Some(stats) = stats {
stats.total_ops += 1; stats.total_ops += 1;
if !ok { if !ok {
stats.error_ops += 1; stats.err_ops += 1;
} }
if let Some(start) = start { if let Some(start) = start {
let us = start.elapsed().as_micros() as u64; let us = start.elapsed().as_micros() as u64;