Compare commits

..

2 Commits

Author SHA1 Message Date
82a845429b
rename ops 2026-03-09 18:37:36 +08:00
563f480643 align mace writes with rocksdb put semantics 2026-03-09 15:58:12 +08:00
10 changed files with 138 additions and 231 deletions

View File

@ -46,7 +46,7 @@ mkdir -p "${KV_BENCH_STORAGE_ROOT}/basic_mace" "${KV_BENCH_STORAGE_ROOT}/basic_r
## What Is Compared
- Comparison unit: rows with identical `workload_id`, `threads`, `key_size`, `value_size`, `durability_mode`, `read_path`
- Fairness rule for read-heavy workloads: `get`, `scan`, and `W1`-`W6` run one GC/compaction pass after prefill and before warmup/measurement, so RocksDB is not compared with GC artificially disabled while reads may have to touch multiple SSTs
- Throughput metric: workload-level `ops_per_sec` (higher is better)
- Throughput metric: workload-level `ops` (higher is better)
- Tail latency metric: workload-level `p99_us` (lower is better)
- This is the workload-level p99 of all operations executed in that row, not per-op-type p99

View File

@ -177,9 +177,9 @@ Unified schema columns include:
- `workload_id` (`W1..W6`)
- `durability_mode` (`relaxed` / `durable`)
- `threads,key_size,value_size,prefill_keys`
- `ops_per_sec`
- `ops`
- `total_ops,ok_ops,err_ops`
- `p50_us,p95_us,p99_us,p999_us`
- `error_ops`
- `read_path`
## 7. Report Commands
@ -207,4 +207,4 @@ Only compare rows with identical:
- `read_path`
- read-heavy workload rows are expected to include the pre-run GC/compaction pass described above
If `error_ops > 0`, investigate that case before drawing conclusions.
If `err_ops > 0`, investigate that case before drawing conclusions.

View File

@ -137,7 +137,7 @@ struct Quantiles {
struct ThreadStats {
uint64_t total_ops = 0;
uint64_t error_ops = 0;
uint64_t err_ops = 0;
std::array<uint64_t, kLatencyBuckets> hist{};
};
@ -161,8 +161,9 @@ struct ResultRow {
uint64_t warmup_secs;
uint64_t measure_secs;
uint64_t total_ops;
uint64_t error_ops;
double ops_per_sec;
uint64_t ok_ops;
uint64_t err_ops;
double ops;
Quantiles quantiles;
uint64_t elapsed_us;
MachineMeta meta;
@ -283,12 +284,9 @@ static std::optional<WorkloadSpec> parse_workload(const Args &args, std::string
return std::nullopt;
}
static bool workload_runs_gc(const WorkloadSpec &spec) {
return spec.requires_prefill;
}
static bool workload_runs_gc(const WorkloadSpec &spec) { return spec.requires_prefill; }
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db,
rocksdb::ColumnFamilyHandle *handle) {
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle) {
require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction");
rocksdb::FlushOptions flush_options;
@ -333,13 +331,9 @@ static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size)
return key;
}
static std::string make_shared_prefix(size_t id) {
return fmt::format("s{:03x}_", id % kPrefixGroups);
}
static std::string make_shared_prefix(size_t id) { return fmt::format("s{:03x}_", id % kPrefixGroups); }
static std::string make_thread_prefix(size_t tid) {
return fmt::format("t{:03x}_", tid % kPrefixGroups);
}
static std::string make_thread_prefix(size_t tid) { return fmt::format("t{:03x}_", tid % kPrefixGroups); }
static size_t latency_bucket(uint64_t us) {
auto v = std::max<uint64_t>(us, 1);
@ -412,7 +406,7 @@ static MachineMeta gather_machine_meta() {
std::snprintf(host_buf, sizeof(host_buf), "unknown");
}
struct utsname uts {};
struct utsname uts{};
std::string kernel = "unknown";
std::string os = "unknown";
std::string arch = "unknown";
@ -444,46 +438,23 @@ static std::string csv_escape(const std::string &v) {
}
static const char *result_header() {
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb";
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
"keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,"
"measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_"
"cores,mem_total_kb,mem_available_kb";
}
static std::string result_row_csv(const ResultRow &r) {
return fmt::format(
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
r.ts_epoch_ms,
"rocksdb",
csv_escape(r.workload_id),
csv_escape(r.mode),
durability_str(r.durability_mode),
r.threads,
r.key_size,
r.value_size,
r.prefill_keys,
r.shared_keyspace,
distribution_str(r.distribution),
r.zipf_theta,
r.read_pct,
r.update_pct,
r.scan_pct,
r.scan_len,
read_path_str(r.read_path),
r.warmup_secs,
r.measure_secs,
r.total_ops,
r.error_ops,
static_cast<uint64_t>(r.ops_per_sec),
r.quantiles.p50_us,
r.quantiles.p95_us,
r.quantiles.p99_us,
r.quantiles.p999_us,
r.elapsed_us,
csv_escape(r.meta.host),
csv_escape(r.meta.os),
csv_escape(r.meta.arch),
csv_escape(r.meta.kernel),
r.meta.cpu_cores,
r.meta.mem_total_kb,
r.meta.mem_available_kb);
return fmt::format("v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}"
",{},{},{},{}",
r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode),
durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys,
r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct,
r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops,
r.ok_ops, r.err_ops, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us,
r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host),
csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores,
r.meta.mem_total_kb, r.meta.mem_available_kb);
}
static bool append_result_row(const std::string &path, const ResultRow &row) {
@ -511,12 +482,8 @@ static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) {
return std::min(idx, n - 1);
}
static std::optional<size_t> pick_key_id(std::mt19937_64 &rng,
Distribution distribution,
double zipf_theta,
bool shared_keyspace,
size_t prefill_keys,
size_t local_key_len) {
static std::optional<size_t> pick_key_id(std::mt19937_64 &rng, Distribution distribution, double zipf_theta,
bool shared_keyspace, size_t prefill_keys, size_t local_key_len) {
if (shared_keyspace) {
if (prefill_keys == 0) {
return std::nullopt;
@ -569,24 +536,11 @@ static std::string find_upper_bound(const std::string &prefix) {
return "";
}
static bool run_one_op(OpKind op,
rocksdb::OptimisticTransactionDB *db,
rocksdb::ColumnFamilyHandle *handle,
const rocksdb::WriteOptions &wopt,
const std::string &value,
std::mt19937_64 &rng,
const WorkloadSpec &spec,
Distribution distribution,
double zipf_theta,
ReadPath read_path,
size_t key_size,
size_t scan_len,
bool shared_keyspace,
size_t prefill_keys,
size_t local_key_len,
size_t tid,
std::atomic<size_t> &insert_counter,
size_t &local_insert_idx,
static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle,
const rocksdb::WriteOptions &wopt, const std::string &value, std::mt19937_64 &rng,
const WorkloadSpec &spec, Distribution distribution, double zipf_theta, ReadPath read_path,
size_t key_size, size_t scan_len, bool shared_keyspace, size_t prefill_keys,
size_t local_key_len, size_t tid, std::atomic<size_t> &insert_counter, size_t &local_insert_idx,
std::optional<size_t> fixed_insert_id) {
if (op == OpKind::Read) {
auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len);
@ -594,9 +548,8 @@ static bool run_one_op(OpKind op,
return false;
}
auto key = shared_keyspace
? make_shared_key(maybe_id.value(), key_size)
: make_thread_key(tid, maybe_id.value(), key_size);
auto key = shared_keyspace ? make_shared_key(maybe_id.value(), key_size)
: make_thread_key(tid, maybe_id.value(), key_size);
auto ropt = rocksdb::ReadOptions();
std::string out;
@ -791,8 +744,8 @@ int main(int argc, char *argv[]) {
}
auto prefill_keys = workload_spec.requires_prefill
? (args.prefill_keys > 0 ? args.prefill_keys : std::max<size_t>(args.iterations, 1))
: args.prefill_keys;
? (args.prefill_keys > 0 ? args.prefill_keys : std::max<size_t>(args.iterations, 1))
: args.prefill_keys;
if (workload_spec.requires_prefill && prefill_keys == 0) {
fmt::println(stderr, "prefill_keys must be > 0 for read/mixed/scan workloads");
@ -821,12 +774,11 @@ int main(int argc, char *argv[]) {
options.create_if_missing = true;
options.allow_concurrent_memtable_write = true;
options.enable_pipelined_write = true;
options.max_background_flushes = 8;
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH);
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::LOW);
auto wopt = rocksdb::WriteOptions();
wopt.no_slowdown = true;
// allow backpressure under heavy write load so throughput reflects completed operations
// instead of inflating qps with fast-failed requests
wopt.no_slowdown = false;
wopt.sync = (durability.value() == DurabilityMode::Durable);
rocksdb::OptimisticTransactionDB *db = nullptr;
@ -890,8 +842,7 @@ int main(int argc, char *argv[]) {
auto mark_measure_start = [&measure_start_ns]() {
uint64_t expected = 0;
auto now_ns = steady_now_ns();
(void) measure_start_ns.compare_exchange_strong(
expected, now_ns, std::memory_order_relaxed);
(void) measure_start_ns.compare_exchange_strong(expected, now_ns, std::memory_order_relaxed);
};
for (size_t tid = 0; tid < args.threads; ++tid) {
@ -913,29 +864,13 @@ int main(int argc, char *argv[]) {
ready_barrier.arrive_and_wait();
if (effective_warmup_secs > 0) {
auto warmup_deadline =
std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
while (std::chrono::steady_clock::now() < warmup_deadline) {
auto op = pick_op_kind(rng, workload_spec);
(void) run_one_op(op,
db,
handle,
wopt,
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
std::nullopt);
(void) run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
local_insert_idx, std::nullopt);
}
}
@ -945,39 +880,24 @@ int main(int argc, char *argv[]) {
auto record = [&](bool ok, uint64_t us) {
stats.total_ops += 1;
if (!ok) {
stats.error_ops += 1;
stats.err_ops += 1;
}
auto b = latency_bucket(us);
stats.hist[b] += 1;
};
if (effective_measure_secs > 0) {
auto deadline =
std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
while (std::chrono::steady_clock::now() < deadline) {
auto op = pick_op_kind(rng, workload_spec);
auto started = std::chrono::steady_clock::now();
auto ok = run_one_op(op,
db,
handle,
wopt,
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
std::nullopt);
auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
local_insert_idx, std::nullopt);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count());
std::chrono::steady_clock::now() - started)
.count());
record(ok, us);
}
} else {
@ -985,31 +905,16 @@ int main(int argc, char *argv[]) {
auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec);
std::optional<size_t> fixed_insert_id = std::nullopt;
if (workload_spec.insert_only) {
fixed_insert_id =
args.shared_keyspace ? (local_op_start + i) : i;
fixed_insert_id = args.shared_keyspace ? (local_op_start + i) : i;
}
auto started = std::chrono::steady_clock::now();
auto ok = run_one_op(op,
db,
handle,
wopt,
value,
rng,
workload_spec,
workload_spec.distribution,
args.zipf_theta,
read_path.value(),
args.key_size,
workload_spec.scan_len,
args.shared_keyspace,
prefill_keys,
local_key_len,
tid,
insert_counter,
local_insert_idx,
fixed_insert_id);
auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
local_insert_idx, fixed_insert_id);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count());
std::chrono::steady_clock::now() - started)
.count());
record(ok, us);
}
}
@ -1031,20 +936,20 @@ int main(int argc, char *argv[]) {
}
uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000;
uint64_t total_ops = 0;
uint64_t error_ops = 0;
uint64_t err_ops = 0;
std::array<uint64_t, kLatencyBuckets> merged_hist{};
for (const auto &s: thread_stats) {
total_ops += s.total_ops;
error_ops += s.error_ops;
err_ops += s.err_ops;
for (size_t i = 0; i < merged_hist.size(); ++i) {
merged_hist[i] += s.hist[i];
}
}
auto ops_per_sec = elapsed_us == 0
? 0.0
: (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us));
auto ops = elapsed_us == 0 ? 0.0 : (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us));
uint64_t ok_ops = total_ops >= err_ops ? (total_ops - err_ops) : 0;
auto row = ResultRow{
.ts_epoch_ms = now_epoch_ms(),
@ -1066,14 +971,16 @@ int main(int argc, char *argv[]) {
.warmup_secs = effective_warmup_secs,
.measure_secs = effective_measure_secs,
.total_ops = total_ops,
.error_ops = error_ops,
.ops_per_sec = ops_per_sec,
.quantiles = Quantiles{
.p50_us = histogram_quantile_us(merged_hist, 0.50),
.p95_us = histogram_quantile_us(merged_hist, 0.95),
.p99_us = histogram_quantile_us(merged_hist, 0.99),
.p999_us = histogram_quantile_us(merged_hist, 0.999),
},
.ok_ops = ok_ops,
.err_ops = err_ops,
.ops = ops,
.quantiles =
Quantiles{
.p50_us = histogram_quantile_us(merged_hist, 0.50),
.p95_us = histogram_quantile_us(merged_hist, 0.95),
.p99_us = histogram_quantile_us(merged_hist, 0.99),
.p999_us = histogram_quantile_us(merged_hist, 0.999),
},
.elapsed_us = elapsed_us,
.meta = gather_machine_meta(),
};
@ -1085,15 +992,8 @@ int main(int argc, char *argv[]) {
fmt::println(
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}",
row.workload_id,
row.mode,
durability_str(row.durability_mode),
row.threads,
row.total_ops,
row.error_ops,
static_cast<uint64_t>(row.ops_per_sec),
row.quantiles.p99_us,
args.result_file);
row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.err_ops,
static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file);
delete handle;
delete db;

View File

@ -18,7 +18,7 @@ def main() -> int:
parser.add_argument(
"--filter-errors",
action="store_true",
help="Only compare rows with error_ops == 0 (default: include all rows)",
help="Only compare rows with err_ops == 0 (default: include all rows)",
)
args = parser.parse_args()
@ -32,9 +32,9 @@ def main() -> int:
"value_size",
"durability_mode",
"read_path",
"ops_per_sec",
"ops",
"p99_us",
"error_ops",
"err_ops",
}
missing = required - set(df.columns)
if missing:
@ -50,45 +50,45 @@ def main() -> int:
]
if args.filter_errors:
base = df[df["error_ops"] == 0].copy()
base = df[df["err_ops"] == 0].copy()
else:
base = df.copy()
if base.empty:
if args.filter_errors:
print("No rows with error_ops == 0, cannot compare.")
print("No rows with err_ops == 0, cannot compare.")
else:
print("No rows found in csv, cannot compare.")
return 0
agg = base.groupby(keys + ["engine"], as_index=False).agg(
ops_per_sec=("ops_per_sec", "median"),
ops=("ops", "median"),
p99_us=("p99_us", "median"),
error_ops=("error_ops", "median"),
err_ops=("err_ops", "median"),
)
piv = agg.pivot_table(
index=keys,
columns="engine",
values=["ops_per_sec", "p99_us", "error_ops"],
values=["ops", "p99_us", "err_ops"],
aggfunc="first",
)
piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns]
out = piv.reset_index()
for col in [
"ops_per_sec_mace",
"ops_per_sec_rocksdb",
"ops_mace",
"ops_rocksdb",
"p99_us_mace",
"p99_us_rocksdb",
"error_ops_mace",
"error_ops_rocksdb",
"err_ops_mace",
"err_ops_rocksdb",
]:
if col not in out.columns:
out[col] = pd.NA
out["qps_ratio_mace_over_rocksdb"] = (
out["ops_per_sec_mace"] / out["ops_per_sec_rocksdb"]
out["ops_mace"] / out["ops_rocksdb"]
)
out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"]
out = out.sort_values(keys)

View File

@ -180,7 +180,7 @@ def plot_results(
thread_points: Sequence[int],
) -> list[Path]:
df = pd.read_csv(result_csv)
required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"}
required = {"engine", "mode", "threads", "key_size", "value_size", "ops"}
missing = required - set(df.columns)
if missing:
raise ValueError(f"Missing required columns in csv: {sorted(missing)}")
@ -191,7 +191,7 @@ def plot_results(
grouped = (
df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[
"ops_per_sec"
"ops"
]
.mean()
.sort_values(["engine", "mode", "key_size", "value_size", "threads"])
@ -209,7 +209,7 @@ def plot_results(
continue
plt.figure(figsize=(16, 10))
y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0
y_max = float(mode_df["ops"].max()) if not mode_df.empty else 0.0
for engine in ENGINE_ORDER:
for key_size, value_size in KV_PROFILES:
@ -222,7 +222,7 @@ def plot_results(
continue
x = sub["threads"].tolist()
y = sub["ops_per_sec"].tolist()
y = sub["ops"].tolist()
label = (
f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})"
)

View File

@ -25,7 +25,7 @@ def main() -> int:
"key_size",
"value_size",
"threads",
"ops_per_sec",
"ops",
"p99_us",
}
missing = needed - set(df.columns)
@ -41,10 +41,10 @@ def main() -> int:
agg = (
sub.groupby(grp_cols)
.agg(
repeats=("ops_per_sec", "count"),
throughput_cv=("ops_per_sec", cv),
repeats=("ops", "count"),
throughput_cv=("ops", cv),
p99_cv=("p99_us", cv),
throughput_median=("ops_per_sec", "median"),
throughput_median=("ops", "median"),
p99_median=("p99_us", "median"),
)
.reset_index()

View File

@ -32,7 +32,7 @@ def main() -> int:
"value_size",
"prefill_keys",
"threads",
"ops_per_sec",
"ops",
"p95_us",
"p99_us",
}
@ -51,8 +51,8 @@ def main() -> int:
summary = (
sub.groupby(grp_cols)
.agg(
repeats=("ops_per_sec", "count"),
throughput_median=("ops_per_sec", "median"),
repeats=("ops", "count"),
throughput_median=("ops", "median"),
p95_median=("p95_us", "median"),
p99_median=("p99_us", "median"),
)

View File

@ -17,7 +17,7 @@ def main() -> int:
"workload_id",
"threads",
"durability_mode",
"ops_per_sec",
"ops",
"p99_us",
}
missing = needed - set(df.columns)
@ -39,8 +39,8 @@ def main() -> int:
base = (
sub.groupby(["engine", "workload_id", "threads", "durability_mode"])
.agg(
repeats=("ops_per_sec", "count"),
throughput_median=("ops_per_sec", "median"),
repeats=("ops", "count"),
throughput_median=("ops", "median"),
p99_median=("p99_us", "median"),
)
.reset_index()

View File

@ -24,7 +24,7 @@ def main() -> int:
"threads",
"key_size",
"value_size",
"ops_per_sec",
"ops",
"p99_us",
}
missing = required - set(df.columns)
@ -48,7 +48,7 @@ def main() -> int:
if sub.empty:
continue
for metric, ylabel in (("ops_per_sec", "OPS/s"), ("p99_us", "P99 Latency (us)")):
for metric, ylabel in (("ops", "OPS/s"), ("p99_us", "P99 Latency (us)")):
plt.figure(figsize=(12, 7))
for workload in sorted(sub["workload_id"].unique()):
wdf = sub[sub["workload_id"] == workload].sort_values("threads")

View File

@ -218,8 +218,9 @@ struct ResultRow {
warmup_secs: u64,
measure_secs: u64,
total_ops: u64,
error_ops: u64,
ops_per_sec: f64,
ok_ops: u64,
err_ops: u64,
ops: f64,
quantiles: Quantiles,
elapsed_us: u64,
meta: MachineMeta,
@ -228,7 +229,7 @@ struct ResultRow {
#[derive(Clone, Debug)]
struct ThreadStats {
total_ops: u64,
error_ops: u64,
err_ops: u64,
hist: [u64; LAT_BUCKETS],
}
@ -236,7 +237,7 @@ impl Default for ThreadStats {
fn default() -> Self {
Self {
total_ops: 0,
error_ops: 0,
err_ops: 0,
hist: [0; LAT_BUCKETS],
}
}
@ -487,12 +488,12 @@ fn csv_escape(raw: &str) -> String {
}
fn result_header() -> &'static str {
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
}
fn result_row_csv(row: &ResultRow) -> String {
format!(
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
row.ts_epoch_ms,
row.engine,
csv_escape(&row.workload_id),
@ -513,8 +514,9 @@ fn result_row_csv(row: &ResultRow) -> String {
row.warmup_secs,
row.measure_secs,
row.total_ops,
row.error_ops,
row.ops_per_sec,
row.ok_ops,
row.err_ops,
row.ops,
row.quantiles.p50_us,
row.quantiles.p95_us,
row.quantiles.p99_us,
@ -712,7 +714,12 @@ fn main() {
opt.cache_capacity = 3 << 30;
opt.data_file_size = 64 << 20;
opt.max_log_size = 1 << 30;
opt.wal_buffer_size = 64 << 20;
opt.wal_file_size = 128 << 20;
opt.default_arenas = 128;
opt.gc_timeout = 600 * 1000;
opt.gc_eager = false;
opt.data_garbage_ratio = 50;
opt.tmp_store = cleanup;
let db = Mace::new(opt.validate().unwrap()).unwrap();
@ -743,7 +750,7 @@ fn main() {
} else {
make_thread_key(tid, i, key_size)
};
tx.put(key.as_slice(), v.as_slice()).unwrap();
tx.upsert(key.as_slice(), v.as_slice()).unwrap();
in_batch += 1;
if in_batch >= PREFILL_BATCH {
tx.commit().unwrap();
@ -926,12 +933,12 @@ fn main() {
let mut merged_hist = [0u64; LAT_BUCKETS];
let mut total_ops = 0u64;
let mut error_ops = 0u64;
let mut err_ops = 0u64;
for h in handles {
let s = h.join().unwrap();
total_ops += s.total_ops;
error_ops += s.error_ops;
err_ops += s.err_ops;
for (i, v) in s.hist.iter().enumerate() {
merged_hist[i] += *v;
}
@ -940,12 +947,14 @@ fn main() {
let measure_end = Instant::now();
let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end);
let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64;
let ops_per_sec = if elapsed_us == 0 {
let ops = if elapsed_us == 0 {
0.0
} else {
(total_ops as f64) * 1_000_000.0 / (elapsed_us as f64)
};
let ok_ops = total_ops.saturating_sub(err_ops);
let quantiles = Quantiles {
p50_us: histogram_quantile_us(&merged_hist, 0.50),
p95_us: histogram_quantile_us(&merged_hist, 0.95),
@ -974,8 +983,9 @@ fn main() {
warmup_secs: effective_warmup_secs,
measure_secs: effective_measure_secs,
total_ops,
error_ops,
ops_per_sec,
ok_ops,
err_ops,
ops,
quantiles,
elapsed_us,
meta: gather_machine_meta(),
@ -987,14 +997,15 @@ fn main() {
}
println!(
"engine=mace workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}",
"engine=mace workload={} mode={} durability={} threads={} total_ops={} ok_ops={} err_ops={} ops={:.2} p99_us={} result_file={}",
row.workload_id,
row.mode,
row.durability_mode.as_str(),
row.threads,
row.total_ops,
row.error_ops,
row.ops_per_sec,
row.ok_ops,
row.err_ops,
row.ops,
row.quantiles.p99_us,
args.result_file
);
@ -1099,11 +1110,7 @@ fn run_one_op(
if let Some(key) = key_opt {
if let Ok(tx) = bucket.begin() {
let write_ok = if spec.insert_only {
tx.put(key.as_slice(), value.as_slice()).is_ok()
} else {
tx.upsert(key.as_slice(), value.as_slice()).is_ok()
};
let write_ok = tx.upsert(key.as_slice(), value.as_slice()).is_ok();
if !write_ok {
false
} else {
@ -1163,7 +1170,7 @@ fn run_one_op(
if let Some(stats) = stats {
stats.total_ops += 1;
if !ok {
stats.error_ops += 1;
stats.err_ops += 1;
}
if let Some(start) = start {
let us = start.elapsed().as_micros() as u64;