Compare commits
2 Commits
fca68dddcf
...
82a845429b
| Author | SHA1 | Date | |
|---|---|---|---|
| 82a845429b | |||
| 563f480643 |
@ -46,7 +46,7 @@ mkdir -p "${KV_BENCH_STORAGE_ROOT}/basic_mace" "${KV_BENCH_STORAGE_ROOT}/basic_r
|
||||
## What Is Compared
|
||||
- Comparison unit: rows with identical `workload_id`, `threads`, `key_size`, `value_size`, `durability_mode`, `read_path`
|
||||
- Fairness rule for read-heavy workloads: `get`, `scan`, and `W1`-`W6` run one GC/compaction pass after prefill and before warmup/measurement, so RocksDB is not compared with GC artificially disabled while reads may have to touch multiple SSTs
|
||||
- Throughput metric: workload-level `ops_per_sec` (higher is better)
|
||||
- Throughput metric: workload-level `ops` (higher is better)
|
||||
- Tail latency metric: workload-level `p99_us` (lower is better)
|
||||
- This is the workload-level p99 of all operations executed in that row, not per-op-type p99
|
||||
|
||||
|
||||
@ -177,9 +177,9 @@ Unified schema columns include:
|
||||
- `workload_id` (`W1..W6`)
|
||||
- `durability_mode` (`relaxed` / `durable`)
|
||||
- `threads,key_size,value_size,prefill_keys`
|
||||
- `ops_per_sec`
|
||||
- `ops`
|
||||
- `total_ops,ok_ops,err_ops`
|
||||
- `p50_us,p95_us,p99_us,p999_us`
|
||||
- `error_ops`
|
||||
- `read_path`
|
||||
|
||||
## 7. Report Commands
|
||||
@ -207,4 +207,4 @@ Only compare rows with identical:
|
||||
- `read_path`
|
||||
- read-heavy workload rows are expected to include the pre-run GC/compaction pass described above
|
||||
|
||||
If `error_ops > 0`, investigate that case before drawing conclusions.
|
||||
If `err_ops > 0`, investigate that case before drawing conclusions.
|
||||
|
||||
250
rocksdb/main.cpp
250
rocksdb/main.cpp
@ -137,7 +137,7 @@ struct Quantiles {
|
||||
|
||||
struct ThreadStats {
|
||||
uint64_t total_ops = 0;
|
||||
uint64_t error_ops = 0;
|
||||
uint64_t err_ops = 0;
|
||||
std::array<uint64_t, kLatencyBuckets> hist{};
|
||||
};
|
||||
|
||||
@ -161,8 +161,9 @@ struct ResultRow {
|
||||
uint64_t warmup_secs;
|
||||
uint64_t measure_secs;
|
||||
uint64_t total_ops;
|
||||
uint64_t error_ops;
|
||||
double ops_per_sec;
|
||||
uint64_t ok_ops;
|
||||
uint64_t err_ops;
|
||||
double ops;
|
||||
Quantiles quantiles;
|
||||
uint64_t elapsed_us;
|
||||
MachineMeta meta;
|
||||
@ -283,12 +284,9 @@ static std::optional<WorkloadSpec> parse_workload(const Args &args, std::string
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
static bool workload_runs_gc(const WorkloadSpec &spec) {
|
||||
return spec.requires_prefill;
|
||||
}
|
||||
static bool workload_runs_gc(const WorkloadSpec &spec) { return spec.requires_prefill; }
|
||||
|
||||
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db,
|
||||
rocksdb::ColumnFamilyHandle *handle) {
|
||||
static void run_prefill_gc(rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle) {
|
||||
require_ok(db->EnableAutoCompaction({handle}), "enable auto compaction");
|
||||
|
||||
rocksdb::FlushOptions flush_options;
|
||||
@ -333,13 +331,9 @@ static std::string make_thread_key(size_t tid, size_t local_id, size_t key_size)
|
||||
return key;
|
||||
}
|
||||
|
||||
static std::string make_shared_prefix(size_t id) {
|
||||
return fmt::format("s{:03x}_", id % kPrefixGroups);
|
||||
}
|
||||
static std::string make_shared_prefix(size_t id) { return fmt::format("s{:03x}_", id % kPrefixGroups); }
|
||||
|
||||
static std::string make_thread_prefix(size_t tid) {
|
||||
return fmt::format("t{:03x}_", tid % kPrefixGroups);
|
||||
}
|
||||
static std::string make_thread_prefix(size_t tid) { return fmt::format("t{:03x}_", tid % kPrefixGroups); }
|
||||
|
||||
static size_t latency_bucket(uint64_t us) {
|
||||
auto v = std::max<uint64_t>(us, 1);
|
||||
@ -412,7 +406,7 @@ static MachineMeta gather_machine_meta() {
|
||||
std::snprintf(host_buf, sizeof(host_buf), "unknown");
|
||||
}
|
||||
|
||||
struct utsname uts {};
|
||||
struct utsname uts{};
|
||||
std::string kernel = "unknown";
|
||||
std::string os = "unknown";
|
||||
std::string arch = "unknown";
|
||||
@ -444,46 +438,23 @@ static std::string csv_escape(const std::string &v) {
|
||||
}
|
||||
|
||||
static const char *result_header() {
|
||||
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb";
|
||||
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
|
||||
"keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,"
|
||||
"measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_"
|
||||
"cores,mem_total_kb,mem_available_kb";
|
||||
}
|
||||
|
||||
static std::string result_row_csv(const ResultRow &r) {
|
||||
return fmt::format(
|
||||
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
|
||||
r.ts_epoch_ms,
|
||||
"rocksdb",
|
||||
csv_escape(r.workload_id),
|
||||
csv_escape(r.mode),
|
||||
durability_str(r.durability_mode),
|
||||
r.threads,
|
||||
r.key_size,
|
||||
r.value_size,
|
||||
r.prefill_keys,
|
||||
r.shared_keyspace,
|
||||
distribution_str(r.distribution),
|
||||
r.zipf_theta,
|
||||
r.read_pct,
|
||||
r.update_pct,
|
||||
r.scan_pct,
|
||||
r.scan_len,
|
||||
read_path_str(r.read_path),
|
||||
r.warmup_secs,
|
||||
r.measure_secs,
|
||||
r.total_ops,
|
||||
r.error_ops,
|
||||
static_cast<uint64_t>(r.ops_per_sec),
|
||||
r.quantiles.p50_us,
|
||||
r.quantiles.p95_us,
|
||||
r.quantiles.p99_us,
|
||||
r.quantiles.p999_us,
|
||||
r.elapsed_us,
|
||||
csv_escape(r.meta.host),
|
||||
csv_escape(r.meta.os),
|
||||
csv_escape(r.meta.arch),
|
||||
csv_escape(r.meta.kernel),
|
||||
r.meta.cpu_cores,
|
||||
r.meta.mem_total_kb,
|
||||
r.meta.mem_available_kb);
|
||||
return fmt::format("v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}"
|
||||
",{},{},{},{}",
|
||||
r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode),
|
||||
durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys,
|
||||
r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct,
|
||||
r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_ops,
|
||||
r.ok_ops, r.err_ops, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us,
|
||||
r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us, csv_escape(r.meta.host),
|
||||
csv_escape(r.meta.os), csv_escape(r.meta.arch), csv_escape(r.meta.kernel), r.meta.cpu_cores,
|
||||
r.meta.mem_total_kb, r.meta.mem_available_kb);
|
||||
}
|
||||
|
||||
static bool append_result_row(const std::string &path, const ResultRow &row) {
|
||||
@ -511,12 +482,8 @@ static size_t sample_zipf_like(std::mt19937_64 &rng, size_t n, double theta) {
|
||||
return std::min(idx, n - 1);
|
||||
}
|
||||
|
||||
static std::optional<size_t> pick_key_id(std::mt19937_64 &rng,
|
||||
Distribution distribution,
|
||||
double zipf_theta,
|
||||
bool shared_keyspace,
|
||||
size_t prefill_keys,
|
||||
size_t local_key_len) {
|
||||
static std::optional<size_t> pick_key_id(std::mt19937_64 &rng, Distribution distribution, double zipf_theta,
|
||||
bool shared_keyspace, size_t prefill_keys, size_t local_key_len) {
|
||||
if (shared_keyspace) {
|
||||
if (prefill_keys == 0) {
|
||||
return std::nullopt;
|
||||
@ -569,24 +536,11 @@ static std::string find_upper_bound(const std::string &prefix) {
|
||||
return "";
|
||||
}
|
||||
|
||||
static bool run_one_op(OpKind op,
|
||||
rocksdb::OptimisticTransactionDB *db,
|
||||
rocksdb::ColumnFamilyHandle *handle,
|
||||
const rocksdb::WriteOptions &wopt,
|
||||
const std::string &value,
|
||||
std::mt19937_64 &rng,
|
||||
const WorkloadSpec &spec,
|
||||
Distribution distribution,
|
||||
double zipf_theta,
|
||||
ReadPath read_path,
|
||||
size_t key_size,
|
||||
size_t scan_len,
|
||||
bool shared_keyspace,
|
||||
size_t prefill_keys,
|
||||
size_t local_key_len,
|
||||
size_t tid,
|
||||
std::atomic<size_t> &insert_counter,
|
||||
size_t &local_insert_idx,
|
||||
static bool run_one_op(OpKind op, rocksdb::OptimisticTransactionDB *db, rocksdb::ColumnFamilyHandle *handle,
|
||||
const rocksdb::WriteOptions &wopt, const std::string &value, std::mt19937_64 &rng,
|
||||
const WorkloadSpec &spec, Distribution distribution, double zipf_theta, ReadPath read_path,
|
||||
size_t key_size, size_t scan_len, bool shared_keyspace, size_t prefill_keys,
|
||||
size_t local_key_len, size_t tid, std::atomic<size_t> &insert_counter, size_t &local_insert_idx,
|
||||
std::optional<size_t> fixed_insert_id) {
|
||||
if (op == OpKind::Read) {
|
||||
auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len);
|
||||
@ -594,9 +548,8 @@ static bool run_one_op(OpKind op,
|
||||
return false;
|
||||
}
|
||||
|
||||
auto key = shared_keyspace
|
||||
? make_shared_key(maybe_id.value(), key_size)
|
||||
: make_thread_key(tid, maybe_id.value(), key_size);
|
||||
auto key = shared_keyspace ? make_shared_key(maybe_id.value(), key_size)
|
||||
: make_thread_key(tid, maybe_id.value(), key_size);
|
||||
|
||||
auto ropt = rocksdb::ReadOptions();
|
||||
std::string out;
|
||||
@ -791,8 +744,8 @@ int main(int argc, char *argv[]) {
|
||||
}
|
||||
|
||||
auto prefill_keys = workload_spec.requires_prefill
|
||||
? (args.prefill_keys > 0 ? args.prefill_keys : std::max<size_t>(args.iterations, 1))
|
||||
: args.prefill_keys;
|
||||
? (args.prefill_keys > 0 ? args.prefill_keys : std::max<size_t>(args.iterations, 1))
|
||||
: args.prefill_keys;
|
||||
|
||||
if (workload_spec.requires_prefill && prefill_keys == 0) {
|
||||
fmt::println(stderr, "prefill_keys must be > 0 for read/mixed/scan workloads");
|
||||
@ -821,12 +774,11 @@ int main(int argc, char *argv[]) {
|
||||
options.create_if_missing = true;
|
||||
options.allow_concurrent_memtable_write = true;
|
||||
options.enable_pipelined_write = true;
|
||||
options.max_background_flushes = 8;
|
||||
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH);
|
||||
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::LOW);
|
||||
|
||||
auto wopt = rocksdb::WriteOptions();
|
||||
wopt.no_slowdown = true;
|
||||
// allow backpressure under heavy write load so throughput reflects completed operations
|
||||
// instead of inflating qps with fast-failed requests
|
||||
wopt.no_slowdown = false;
|
||||
wopt.sync = (durability.value() == DurabilityMode::Durable);
|
||||
|
||||
rocksdb::OptimisticTransactionDB *db = nullptr;
|
||||
@ -890,8 +842,7 @@ int main(int argc, char *argv[]) {
|
||||
auto mark_measure_start = [&measure_start_ns]() {
|
||||
uint64_t expected = 0;
|
||||
auto now_ns = steady_now_ns();
|
||||
(void) measure_start_ns.compare_exchange_strong(
|
||||
expected, now_ns, std::memory_order_relaxed);
|
||||
(void) measure_start_ns.compare_exchange_strong(expected, now_ns, std::memory_order_relaxed);
|
||||
};
|
||||
|
||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||
@ -913,29 +864,13 @@ int main(int argc, char *argv[]) {
|
||||
ready_barrier.arrive_and_wait();
|
||||
|
||||
if (effective_warmup_secs > 0) {
|
||||
auto warmup_deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
|
||||
auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
|
||||
while (std::chrono::steady_clock::now() < warmup_deadline) {
|
||||
auto op = pick_op_kind(rng, workload_spec);
|
||||
(void) run_one_op(op,
|
||||
db,
|
||||
handle,
|
||||
wopt,
|
||||
value,
|
||||
rng,
|
||||
workload_spec,
|
||||
workload_spec.distribution,
|
||||
args.zipf_theta,
|
||||
read_path.value(),
|
||||
args.key_size,
|
||||
workload_spec.scan_len,
|
||||
args.shared_keyspace,
|
||||
prefill_keys,
|
||||
local_key_len,
|
||||
tid,
|
||||
insert_counter,
|
||||
local_insert_idx,
|
||||
std::nullopt);
|
||||
(void) run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
|
||||
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
|
||||
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
|
||||
local_insert_idx, std::nullopt);
|
||||
}
|
||||
}
|
||||
|
||||
@ -945,39 +880,24 @@ int main(int argc, char *argv[]) {
|
||||
auto record = [&](bool ok, uint64_t us) {
|
||||
stats.total_ops += 1;
|
||||
if (!ok) {
|
||||
stats.error_ops += 1;
|
||||
stats.err_ops += 1;
|
||||
}
|
||||
auto b = latency_bucket(us);
|
||||
stats.hist[b] += 1;
|
||||
};
|
||||
|
||||
if (effective_measure_secs > 0) {
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
|
||||
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
|
||||
while (std::chrono::steady_clock::now() < deadline) {
|
||||
auto op = pick_op_kind(rng, workload_spec);
|
||||
auto started = std::chrono::steady_clock::now();
|
||||
auto ok = run_one_op(op,
|
||||
db,
|
||||
handle,
|
||||
wopt,
|
||||
value,
|
||||
rng,
|
||||
workload_spec,
|
||||
workload_spec.distribution,
|
||||
args.zipf_theta,
|
||||
read_path.value(),
|
||||
args.key_size,
|
||||
workload_spec.scan_len,
|
||||
args.shared_keyspace,
|
||||
prefill_keys,
|
||||
local_key_len,
|
||||
tid,
|
||||
insert_counter,
|
||||
local_insert_idx,
|
||||
std::nullopt);
|
||||
auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
|
||||
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
|
||||
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
|
||||
local_insert_idx, std::nullopt);
|
||||
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now() - started).count());
|
||||
std::chrono::steady_clock::now() - started)
|
||||
.count());
|
||||
record(ok, us);
|
||||
}
|
||||
} else {
|
||||
@ -985,31 +905,16 @@ int main(int argc, char *argv[]) {
|
||||
auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec);
|
||||
std::optional<size_t> fixed_insert_id = std::nullopt;
|
||||
if (workload_spec.insert_only) {
|
||||
fixed_insert_id =
|
||||
args.shared_keyspace ? (local_op_start + i) : i;
|
||||
fixed_insert_id = args.shared_keyspace ? (local_op_start + i) : i;
|
||||
}
|
||||
auto started = std::chrono::steady_clock::now();
|
||||
auto ok = run_one_op(op,
|
||||
db,
|
||||
handle,
|
||||
wopt,
|
||||
value,
|
||||
rng,
|
||||
workload_spec,
|
||||
workload_spec.distribution,
|
||||
args.zipf_theta,
|
||||
read_path.value(),
|
||||
args.key_size,
|
||||
workload_spec.scan_len,
|
||||
args.shared_keyspace,
|
||||
prefill_keys,
|
||||
local_key_len,
|
||||
tid,
|
||||
insert_counter,
|
||||
local_insert_idx,
|
||||
fixed_insert_id);
|
||||
auto ok = run_one_op(op, db, handle, wopt, value, rng, workload_spec, workload_spec.distribution,
|
||||
args.zipf_theta, read_path.value(), args.key_size, workload_spec.scan_len,
|
||||
args.shared_keyspace, prefill_keys, local_key_len, tid, insert_counter,
|
||||
local_insert_idx, fixed_insert_id);
|
||||
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now() - started).count());
|
||||
std::chrono::steady_clock::now() - started)
|
||||
.count());
|
||||
record(ok, us);
|
||||
}
|
||||
}
|
||||
@ -1031,20 +936,20 @@ int main(int argc, char *argv[]) {
|
||||
}
|
||||
uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000;
|
||||
uint64_t total_ops = 0;
|
||||
uint64_t error_ops = 0;
|
||||
uint64_t err_ops = 0;
|
||||
std::array<uint64_t, kLatencyBuckets> merged_hist{};
|
||||
|
||||
for (const auto &s: thread_stats) {
|
||||
total_ops += s.total_ops;
|
||||
error_ops += s.error_ops;
|
||||
err_ops += s.err_ops;
|
||||
for (size_t i = 0; i < merged_hist.size(); ++i) {
|
||||
merged_hist[i] += s.hist[i];
|
||||
}
|
||||
}
|
||||
|
||||
auto ops_per_sec = elapsed_us == 0
|
||||
? 0.0
|
||||
: (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us));
|
||||
auto ops = elapsed_us == 0 ? 0.0 : (static_cast<double>(total_ops) * 1'000'000.0 / static_cast<double>(elapsed_us));
|
||||
|
||||
uint64_t ok_ops = total_ops >= err_ops ? (total_ops - err_ops) : 0;
|
||||
|
||||
auto row = ResultRow{
|
||||
.ts_epoch_ms = now_epoch_ms(),
|
||||
@ -1066,14 +971,16 @@ int main(int argc, char *argv[]) {
|
||||
.warmup_secs = effective_warmup_secs,
|
||||
.measure_secs = effective_measure_secs,
|
||||
.total_ops = total_ops,
|
||||
.error_ops = error_ops,
|
||||
.ops_per_sec = ops_per_sec,
|
||||
.quantiles = Quantiles{
|
||||
.p50_us = histogram_quantile_us(merged_hist, 0.50),
|
||||
.p95_us = histogram_quantile_us(merged_hist, 0.95),
|
||||
.p99_us = histogram_quantile_us(merged_hist, 0.99),
|
||||
.p999_us = histogram_quantile_us(merged_hist, 0.999),
|
||||
},
|
||||
.ok_ops = ok_ops,
|
||||
.err_ops = err_ops,
|
||||
.ops = ops,
|
||||
.quantiles =
|
||||
Quantiles{
|
||||
.p50_us = histogram_quantile_us(merged_hist, 0.50),
|
||||
.p95_us = histogram_quantile_us(merged_hist, 0.95),
|
||||
.p99_us = histogram_quantile_us(merged_hist, 0.99),
|
||||
.p999_us = histogram_quantile_us(merged_hist, 0.999),
|
||||
},
|
||||
.elapsed_us = elapsed_us,
|
||||
.meta = gather_machine_meta(),
|
||||
};
|
||||
@ -1085,15 +992,8 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
fmt::println(
|
||||
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}",
|
||||
row.workload_id,
|
||||
row.mode,
|
||||
durability_str(row.durability_mode),
|
||||
row.threads,
|
||||
row.total_ops,
|
||||
row.error_ops,
|
||||
static_cast<uint64_t>(row.ops_per_sec),
|
||||
row.quantiles.p99_us,
|
||||
args.result_file);
|
||||
row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.err_ops,
|
||||
static_cast<uint64_t>(row.ops), row.quantiles.p99_us, args.result_file);
|
||||
|
||||
delete handle;
|
||||
delete db;
|
||||
|
||||
@ -18,7 +18,7 @@ def main() -> int:
|
||||
parser.add_argument(
|
||||
"--filter-errors",
|
||||
action="store_true",
|
||||
help="Only compare rows with error_ops == 0 (default: include all rows)",
|
||||
help="Only compare rows with err_ops == 0 (default: include all rows)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
@ -32,9 +32,9 @@ def main() -> int:
|
||||
"value_size",
|
||||
"durability_mode",
|
||||
"read_path",
|
||||
"ops_per_sec",
|
||||
"ops",
|
||||
"p99_us",
|
||||
"error_ops",
|
||||
"err_ops",
|
||||
}
|
||||
missing = required - set(df.columns)
|
||||
if missing:
|
||||
@ -50,45 +50,45 @@ def main() -> int:
|
||||
]
|
||||
|
||||
if args.filter_errors:
|
||||
base = df[df["error_ops"] == 0].copy()
|
||||
base = df[df["err_ops"] == 0].copy()
|
||||
else:
|
||||
base = df.copy()
|
||||
|
||||
if base.empty:
|
||||
if args.filter_errors:
|
||||
print("No rows with error_ops == 0, cannot compare.")
|
||||
print("No rows with err_ops == 0, cannot compare.")
|
||||
else:
|
||||
print("No rows found in csv, cannot compare.")
|
||||
return 0
|
||||
|
||||
agg = base.groupby(keys + ["engine"], as_index=False).agg(
|
||||
ops_per_sec=("ops_per_sec", "median"),
|
||||
ops=("ops", "median"),
|
||||
p99_us=("p99_us", "median"),
|
||||
error_ops=("error_ops", "median"),
|
||||
err_ops=("err_ops", "median"),
|
||||
)
|
||||
|
||||
piv = agg.pivot_table(
|
||||
index=keys,
|
||||
columns="engine",
|
||||
values=["ops_per_sec", "p99_us", "error_ops"],
|
||||
values=["ops", "p99_us", "err_ops"],
|
||||
aggfunc="first",
|
||||
)
|
||||
piv.columns = [f"{metric}_{engine}" for metric, engine in piv.columns]
|
||||
out = piv.reset_index()
|
||||
|
||||
for col in [
|
||||
"ops_per_sec_mace",
|
||||
"ops_per_sec_rocksdb",
|
||||
"ops_mace",
|
||||
"ops_rocksdb",
|
||||
"p99_us_mace",
|
||||
"p99_us_rocksdb",
|
||||
"error_ops_mace",
|
||||
"error_ops_rocksdb",
|
||||
"err_ops_mace",
|
||||
"err_ops_rocksdb",
|
||||
]:
|
||||
if col not in out.columns:
|
||||
out[col] = pd.NA
|
||||
|
||||
out["qps_ratio_mace_over_rocksdb"] = (
|
||||
out["ops_per_sec_mace"] / out["ops_per_sec_rocksdb"]
|
||||
out["ops_mace"] / out["ops_rocksdb"]
|
||||
)
|
||||
out["p99_ratio_mace_over_rocksdb"] = out["p99_us_mace"] / out["p99_us_rocksdb"]
|
||||
out = out.sort_values(keys)
|
||||
|
||||
@ -180,7 +180,7 @@ def plot_results(
|
||||
thread_points: Sequence[int],
|
||||
) -> list[Path]:
|
||||
df = pd.read_csv(result_csv)
|
||||
required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"}
|
||||
required = {"engine", "mode", "threads", "key_size", "value_size", "ops"}
|
||||
missing = required - set(df.columns)
|
||||
if missing:
|
||||
raise ValueError(f"Missing required columns in csv: {sorted(missing)}")
|
||||
@ -191,7 +191,7 @@ def plot_results(
|
||||
|
||||
grouped = (
|
||||
df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[
|
||||
"ops_per_sec"
|
||||
"ops"
|
||||
]
|
||||
.mean()
|
||||
.sort_values(["engine", "mode", "key_size", "value_size", "threads"])
|
||||
@ -209,7 +209,7 @@ def plot_results(
|
||||
continue
|
||||
|
||||
plt.figure(figsize=(16, 10))
|
||||
y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0
|
||||
y_max = float(mode_df["ops"].max()) if not mode_df.empty else 0.0
|
||||
|
||||
for engine in ENGINE_ORDER:
|
||||
for key_size, value_size in KV_PROFILES:
|
||||
@ -222,7 +222,7 @@ def plot_results(
|
||||
continue
|
||||
|
||||
x = sub["threads"].tolist()
|
||||
y = sub["ops_per_sec"].tolist()
|
||||
y = sub["ops"].tolist()
|
||||
label = (
|
||||
f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})"
|
||||
)
|
||||
|
||||
@ -25,7 +25,7 @@ def main() -> int:
|
||||
"key_size",
|
||||
"value_size",
|
||||
"threads",
|
||||
"ops_per_sec",
|
||||
"ops",
|
||||
"p99_us",
|
||||
}
|
||||
missing = needed - set(df.columns)
|
||||
@ -41,10 +41,10 @@ def main() -> int:
|
||||
agg = (
|
||||
sub.groupby(grp_cols)
|
||||
.agg(
|
||||
repeats=("ops_per_sec", "count"),
|
||||
throughput_cv=("ops_per_sec", cv),
|
||||
repeats=("ops", "count"),
|
||||
throughput_cv=("ops", cv),
|
||||
p99_cv=("p99_us", cv),
|
||||
throughput_median=("ops_per_sec", "median"),
|
||||
throughput_median=("ops", "median"),
|
||||
p99_median=("p99_us", "median"),
|
||||
)
|
||||
.reset_index()
|
||||
|
||||
@ -32,7 +32,7 @@ def main() -> int:
|
||||
"value_size",
|
||||
"prefill_keys",
|
||||
"threads",
|
||||
"ops_per_sec",
|
||||
"ops",
|
||||
"p95_us",
|
||||
"p99_us",
|
||||
}
|
||||
@ -51,8 +51,8 @@ def main() -> int:
|
||||
summary = (
|
||||
sub.groupby(grp_cols)
|
||||
.agg(
|
||||
repeats=("ops_per_sec", "count"),
|
||||
throughput_median=("ops_per_sec", "median"),
|
||||
repeats=("ops", "count"),
|
||||
throughput_median=("ops", "median"),
|
||||
p95_median=("p95_us", "median"),
|
||||
p99_median=("p99_us", "median"),
|
||||
)
|
||||
|
||||
@ -17,7 +17,7 @@ def main() -> int:
|
||||
"workload_id",
|
||||
"threads",
|
||||
"durability_mode",
|
||||
"ops_per_sec",
|
||||
"ops",
|
||||
"p99_us",
|
||||
}
|
||||
missing = needed - set(df.columns)
|
||||
@ -39,8 +39,8 @@ def main() -> int:
|
||||
base = (
|
||||
sub.groupby(["engine", "workload_id", "threads", "durability_mode"])
|
||||
.agg(
|
||||
repeats=("ops_per_sec", "count"),
|
||||
throughput_median=("ops_per_sec", "median"),
|
||||
repeats=("ops", "count"),
|
||||
throughput_median=("ops", "median"),
|
||||
p99_median=("p99_us", "median"),
|
||||
)
|
||||
.reset_index()
|
||||
|
||||
@ -24,7 +24,7 @@ def main() -> int:
|
||||
"threads",
|
||||
"key_size",
|
||||
"value_size",
|
||||
"ops_per_sec",
|
||||
"ops",
|
||||
"p99_us",
|
||||
}
|
||||
missing = required - set(df.columns)
|
||||
@ -48,7 +48,7 @@ def main() -> int:
|
||||
if sub.empty:
|
||||
continue
|
||||
|
||||
for metric, ylabel in (("ops_per_sec", "OPS/s"), ("p99_us", "P99 Latency (us)")):
|
||||
for metric, ylabel in (("ops", "OPS/s"), ("p99_us", "P99 Latency (us)")):
|
||||
plt.figure(figsize=(12, 7))
|
||||
for workload in sorted(sub["workload_id"].unique()):
|
||||
wdf = sub[sub["workload_id"] == workload].sort_values("threads")
|
||||
|
||||
53
src/main.rs
53
src/main.rs
@ -218,8 +218,9 @@ struct ResultRow {
|
||||
warmup_secs: u64,
|
||||
measure_secs: u64,
|
||||
total_ops: u64,
|
||||
error_ops: u64,
|
||||
ops_per_sec: f64,
|
||||
ok_ops: u64,
|
||||
err_ops: u64,
|
||||
ops: f64,
|
||||
quantiles: Quantiles,
|
||||
elapsed_us: u64,
|
||||
meta: MachineMeta,
|
||||
@ -228,7 +229,7 @@ struct ResultRow {
|
||||
#[derive(Clone, Debug)]
|
||||
struct ThreadStats {
|
||||
total_ops: u64,
|
||||
error_ops: u64,
|
||||
err_ops: u64,
|
||||
hist: [u64; LAT_BUCKETS],
|
||||
}
|
||||
|
||||
@ -236,7 +237,7 @@ impl Default for ThreadStats {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
total_ops: 0,
|
||||
error_ops: 0,
|
||||
err_ops: 0,
|
||||
hist: [0; LAT_BUCKETS],
|
||||
}
|
||||
}
|
||||
@ -487,12 +488,12 @@ fn csv_escape(raw: &str) -> String {
|
||||
}
|
||||
|
||||
fn result_header() -> &'static str {
|
||||
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,error_ops,ops_per_sec,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
|
||||
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_ops,ok_ops,err_ops,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us,host,os,arch,kernel,cpu_cores,mem_total_kb,mem_available_kb"
|
||||
}
|
||||
|
||||
fn result_row_csv(row: &ResultRow) -> String {
|
||||
format!(
|
||||
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
|
||||
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
|
||||
row.ts_epoch_ms,
|
||||
row.engine,
|
||||
csv_escape(&row.workload_id),
|
||||
@ -513,8 +514,9 @@ fn result_row_csv(row: &ResultRow) -> String {
|
||||
row.warmup_secs,
|
||||
row.measure_secs,
|
||||
row.total_ops,
|
||||
row.error_ops,
|
||||
row.ops_per_sec,
|
||||
row.ok_ops,
|
||||
row.err_ops,
|
||||
row.ops,
|
||||
row.quantiles.p50_us,
|
||||
row.quantiles.p95_us,
|
||||
row.quantiles.p99_us,
|
||||
@ -712,7 +714,12 @@ fn main() {
|
||||
opt.cache_capacity = 3 << 30;
|
||||
opt.data_file_size = 64 << 20;
|
||||
opt.max_log_size = 1 << 30;
|
||||
opt.wal_buffer_size = 64 << 20;
|
||||
opt.wal_file_size = 128 << 20;
|
||||
opt.default_arenas = 128;
|
||||
opt.gc_timeout = 600 * 1000;
|
||||
opt.gc_eager = false;
|
||||
opt.data_garbage_ratio = 50;
|
||||
opt.tmp_store = cleanup;
|
||||
|
||||
let db = Mace::new(opt.validate().unwrap()).unwrap();
|
||||
@ -743,7 +750,7 @@ fn main() {
|
||||
} else {
|
||||
make_thread_key(tid, i, key_size)
|
||||
};
|
||||
tx.put(key.as_slice(), v.as_slice()).unwrap();
|
||||
tx.upsert(key.as_slice(), v.as_slice()).unwrap();
|
||||
in_batch += 1;
|
||||
if in_batch >= PREFILL_BATCH {
|
||||
tx.commit().unwrap();
|
||||
@ -926,12 +933,12 @@ fn main() {
|
||||
|
||||
let mut merged_hist = [0u64; LAT_BUCKETS];
|
||||
let mut total_ops = 0u64;
|
||||
let mut error_ops = 0u64;
|
||||
let mut err_ops = 0u64;
|
||||
|
||||
for h in handles {
|
||||
let s = h.join().unwrap();
|
||||
total_ops += s.total_ops;
|
||||
error_ops += s.error_ops;
|
||||
err_ops += s.err_ops;
|
||||
for (i, v) in s.hist.iter().enumerate() {
|
||||
merged_hist[i] += *v;
|
||||
}
|
||||
@ -940,12 +947,14 @@ fn main() {
|
||||
let measure_end = Instant::now();
|
||||
let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end);
|
||||
let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64;
|
||||
let ops_per_sec = if elapsed_us == 0 {
|
||||
let ops = if elapsed_us == 0 {
|
||||
0.0
|
||||
} else {
|
||||
(total_ops as f64) * 1_000_000.0 / (elapsed_us as f64)
|
||||
};
|
||||
|
||||
let ok_ops = total_ops.saturating_sub(err_ops);
|
||||
|
||||
let quantiles = Quantiles {
|
||||
p50_us: histogram_quantile_us(&merged_hist, 0.50),
|
||||
p95_us: histogram_quantile_us(&merged_hist, 0.95),
|
||||
@ -974,8 +983,9 @@ fn main() {
|
||||
warmup_secs: effective_warmup_secs,
|
||||
measure_secs: effective_measure_secs,
|
||||
total_ops,
|
||||
error_ops,
|
||||
ops_per_sec,
|
||||
ok_ops,
|
||||
err_ops,
|
||||
ops,
|
||||
quantiles,
|
||||
elapsed_us,
|
||||
meta: gather_machine_meta(),
|
||||
@ -987,14 +997,15 @@ fn main() {
|
||||
}
|
||||
|
||||
println!(
|
||||
"engine=mace workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}",
|
||||
"engine=mace workload={} mode={} durability={} threads={} total_ops={} ok_ops={} err_ops={} ops={:.2} p99_us={} result_file={}",
|
||||
row.workload_id,
|
||||
row.mode,
|
||||
row.durability_mode.as_str(),
|
||||
row.threads,
|
||||
row.total_ops,
|
||||
row.error_ops,
|
||||
row.ops_per_sec,
|
||||
row.ok_ops,
|
||||
row.err_ops,
|
||||
row.ops,
|
||||
row.quantiles.p99_us,
|
||||
args.result_file
|
||||
);
|
||||
@ -1099,11 +1110,7 @@ fn run_one_op(
|
||||
|
||||
if let Some(key) = key_opt {
|
||||
if let Ok(tx) = bucket.begin() {
|
||||
let write_ok = if spec.insert_only {
|
||||
tx.put(key.as_slice(), value.as_slice()).is_ok()
|
||||
} else {
|
||||
tx.upsert(key.as_slice(), value.as_slice()).is_ok()
|
||||
};
|
||||
let write_ok = tx.upsert(key.as_slice(), value.as_slice()).is_ok();
|
||||
if !write_ok {
|
||||
false
|
||||
} else {
|
||||
@ -1163,7 +1170,7 @@ fn run_one_op(
|
||||
if let Some(stats) = stats {
|
||||
stats.total_ops += 1;
|
||||
if !ok {
|
||||
stats.error_ops += 1;
|
||||
stats.err_ops += 1;
|
||||
}
|
||||
if let Some(start) = start {
|
||||
let us = start.elapsed().as_micros() as u64;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user