diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index 7d873e2..0df0aed 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -374,6 +374,12 @@ static uint64_t now_epoch_ms() { return static_cast(ms.count()); } +static uint64_t steady_now_ns() { + auto now = std::chrono::steady_clock::now(); + auto ns = std::chrono::duration_cast(now.time_since_epoch()); + return static_cast(ns.count()); +} + static uint64_t read_mem_kb(const char *key) { std::ifstream in("/proc/meminfo"); if (!in.is_open()) { @@ -433,7 +439,7 @@ static const char *result_header() { static std::string result_row_csv(const ResultRow &r) { return fmt::format( - "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}", + "v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), @@ -455,7 +461,7 @@ static std::string result_row_csv(const ResultRow &r) { r.measure_secs, r.total_ops, r.error_ops, - r.ops_per_sec, + static_cast(r.ops_per_sec), r.quantiles.p50_us, r.quantiles.p95_us, r.quantiles.p99_us, @@ -570,7 +576,8 @@ static bool run_one_op(OpKind op, size_t local_key_len, size_t tid, std::atomic &insert_counter, - size_t &local_insert_idx) { + size_t &local_insert_idx, + std::optional fixed_insert_id) { if (op == OpKind::Read) { auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len); if (!maybe_id.has_value()) { @@ -599,7 +606,14 @@ static bool run_one_op(OpKind op, if (op == OpKind::Update) { std::optional key; if (spec.insert_only) { - if (shared_keyspace) { + if (fixed_insert_id.has_value()) { + auto id = fixed_insert_id.value(); + if (shared_keyspace) { + key = make_shared_key(id, key_size); + } else { + key = make_thread_key(tid, id, key_size); + } + } else if (shared_keyspace) { auto id = insert_counter.fetch_add(1, std::memory_order_relaxed); key = make_shared_key(id, key_size); } else { @@ -762,6 +776,14 @@ int main(int argc, char *argv[]) { return 1; } auto workload_spec = workload_spec_opt.value(); + auto legacy_mode = workload_spec.id.rfind("LEGACY_", 0) == 0; + auto effective_warmup_secs = legacy_mode ? 0ULL : args.warmup_secs; + auto effective_measure_secs = legacy_mode ? 0ULL : args.measure_secs; + auto mixed_workload = workload_spec.read_pct > 0 && workload_spec.update_pct > 0; + if (mixed_workload && !args.shared_keyspace) { + fmt::println(stderr, "mixed workloads require shared keyspace"); + return 1; + } auto prefill_keys = workload_spec.requires_prefill ? (args.prefill_keys > 0 ? args.prefill_keys : std::max(args.iterations, 1)) @@ -853,11 +875,18 @@ int main(int argc, char *argv[]) { std::barrier measure_barrier(static_cast(args.threads + 1)); std::atomic insert_counter{0}; + std::atomic measure_start_ns{0}; std::vector workers; workers.reserve(args.threads); std::vector thread_stats(args.threads); auto seed_base = now_epoch_ms(); + auto mark_measure_start = [&measure_start_ns]() { + uint64_t expected = 0; + auto now_ns = steady_now_ns(); + (void) measure_start_ns.compare_exchange_strong( + expected, now_ns, std::memory_order_relaxed); + }; for (size_t tid = 0; tid < args.threads; ++tid) { workers.emplace_back([&, tid] { @@ -865,6 +894,7 @@ int main(int argc, char *argv[]) { auto &stats = thread_stats[tid]; std::mt19937_64 rng(seed_base ^ ((tid + 1) * 0x9E3779B97F4A7C15ULL)); auto local_key_len = prefill_ranges[tid].len; + auto local_op_start = op_ranges[tid].start; auto local_op_len = op_ranges[tid].len; size_t local_insert_idx = 0; @@ -876,8 +906,9 @@ int main(int argc, char *argv[]) { ready_barrier.arrive_and_wait(); - if (args.warmup_secs > 0) { - auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.warmup_secs); + if (effective_warmup_secs > 0) { + auto warmup_deadline = + std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs); while (std::chrono::steady_clock::now() < warmup_deadline) { auto op = pick_op_kind(rng, workload_spec); (void) run_one_op(op, @@ -897,11 +928,13 @@ int main(int argc, char *argv[]) { local_key_len, tid, insert_counter, - local_insert_idx); + local_insert_idx, + std::nullopt); } } measure_barrier.arrive_and_wait(); + mark_measure_start(); auto record = [&](bool ok, uint64_t us) { stats.total_ops += 1; @@ -912,8 +945,9 @@ int main(int argc, char *argv[]) { stats.hist[b] += 1; }; - if (args.measure_secs > 0) { - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.measure_secs); + if (effective_measure_secs > 0) { + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs); while (std::chrono::steady_clock::now() < deadline) { auto op = pick_op_kind(rng, workload_spec); auto started = std::chrono::steady_clock::now(); @@ -934,15 +968,20 @@ int main(int argc, char *argv[]) { local_key_len, tid, insert_counter, - local_insert_idx); + local_insert_idx, + std::nullopt); auto us = static_cast(std::chrono::duration_cast( std::chrono::steady_clock::now() - started).count()); record(ok, us); } } else { for (size_t i: count_indices) { - (void) i; auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec); + std::optional fixed_insert_id = std::nullopt; + if (workload_spec.insert_only) { + fixed_insert_id = + args.shared_keyspace ? (local_op_start + i) : i; + } auto started = std::chrono::steady_clock::now(); auto ok = run_one_op(op, db, @@ -961,7 +1000,8 @@ int main(int argc, char *argv[]) { local_key_len, tid, insert_counter, - local_insert_idx); + local_insert_idx, + fixed_insert_id); auto us = static_cast(std::chrono::duration_cast( std::chrono::steady_clock::now() - started).count()); record(ok, us); @@ -972,13 +1012,18 @@ int main(int argc, char *argv[]) { ready_barrier.arrive_and_wait(); measure_barrier.arrive_and_wait(); - auto measure_started = nm::Instant::now(); + mark_measure_start(); for (auto &w: workers) { w.join(); } - uint64_t elapsed_us = static_cast(measure_started.elapse_usec()); + auto measure_end_ns = steady_now_ns(); + auto measure_begin_ns = measure_start_ns.load(std::memory_order_relaxed); + if (measure_begin_ns == 0 || measure_end_ns < measure_begin_ns) { + measure_begin_ns = measure_end_ns; + } + uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000; uint64_t total_ops = 0; uint64_t error_ops = 0; std::array merged_hist{}; @@ -1012,8 +1057,8 @@ int main(int argc, char *argv[]) { .scan_pct = workload_spec.scan_pct, .scan_len = workload_spec.scan_len, .read_path = read_path.value(), - .warmup_secs = args.warmup_secs, - .measure_secs = args.measure_secs, + .warmup_secs = effective_warmup_secs, + .measure_secs = effective_measure_secs, .total_ops = total_ops, .error_ops = error_ops, .ops_per_sec = ops_per_sec, @@ -1033,14 +1078,14 @@ int main(int argc, char *argv[]) { } fmt::println( - "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}", + "engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}", row.workload_id, row.mode, durability_str(row.durability_mode), row.threads, row.total_ops, row.error_ops, - row.ops_per_sec, + static_cast(row.ops_per_sec), row.quantiles.p99_us, args.result_file); diff --git a/scripts/fast_test.py b/scripts/fast_test.py new file mode 100644 index 0000000..942b8a2 --- /dev/null +++ b/scripts/fast_test.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import os +import subprocess +import sys +import uuid +from pathlib import Path +from typing import Sequence + +def ensure_venv_python() -> None: + script_path = Path(__file__).resolve() + script_dir = script_path.parent + venv_python = script_dir / "bin" / "python" + if not venv_python.exists(): + return + + current = Path(sys.executable).absolute() + target = venv_python.absolute() + if current == target: + return + if os.environ.get("FAST_TEST_REEXEC") == "1": + return + + env = os.environ.copy() + env["FAST_TEST_REEXEC"] = "1" + os.execve(str(target), [str(target), str(script_path), *sys.argv[1:]], env) + + +ensure_venv_python() + +import matplotlib.pyplot as plt +import pandas as pd + +ENGINE_ORDER = ("mace", "rocksdb") +MODE_PLAN = ( + ("put", "insert"), + ("get", "get"), + ("mixed", "mixed"), + ("scan", "scan"), +) +KV_PROFILES = ( + (16, 128), + (32, 1024), + (32, 4096), + (32, 16384), +) +LINE_STYLES = {"mace": "-", "rocksdb": ":"} + + +def detect_logical_cpus() -> int: + count = os.cpu_count() or 1 + return max(1, int(count)) + + +def power_of_two_threads(cpu_count: int) -> list[int]: + points: list[int] = [] + t = 1 + while t <= cpu_count: + points.append(t) + t *= 2 + return points + + +def format_bytes(size: int) -> str: + if size >= 1024 and size % 1024 == 0: + return f"{size // 1024}KB" + return f"{size}B" + + +def repo_root_from_script(script_dir: Path) -> Path: + return script_dir.parent + + +def build_binaries(repo_root: Path) -> None: + subprocess.run( + ["cargo", "build", "--release", "--manifest-path", str(repo_root / "Cargo.toml")], + check=True, + ) + subprocess.run(["cmake", "--preset", "release"], check=True, cwd=repo_root / "rocksdb") + subprocess.run( + ["cmake", "--build", "--preset", "release"], check=True, cwd=repo_root / "rocksdb" + ) + + +def run_engine_cases( + *, + engine: str, + bench_bin: Path, + engine_storage_root: Path, + result_csv: Path, + thread_points: Sequence[int], + warmup_secs: int, + measure_secs: int, + iterations: int, + prefill_keys: int, + read_path: str, + durability: str, + insert_ratio: int, +) -> None: + for mode_display, mode_cli in MODE_PLAN: + for threads in thread_points: + for key_size, value_size in KV_PROFILES: + run_path = engine_storage_root / ( + f"{engine}_{mode_display}_t{threads}_k{key_size}_v{value_size}_" + f"{uuid.uuid4().hex[:8]}" + ) + args = [ + str(bench_bin), + "--path", + str(run_path), + "--mode", + mode_cli, + "--threads", + str(threads), + "--key-size", + str(key_size), + "--value-size", + str(value_size), + "--iterations", + str(iterations), + "--warmup-secs", + str(warmup_secs), + "--measure-secs", + str(measure_secs), + "--read-path", + read_path, + "--durability", + durability, + "--result-file", + str(result_csv), + ] + + if mode_cli != "insert": + args.extend(["--prefill-keys", str(prefill_keys)]) + if mode_cli == "mixed": + args.extend(["--insert-ratio", str(insert_ratio)]) + if engine == "mace": + args.append("--shared-keyspace") + + print( + f"[run] engine={engine} mode={mode_display} " + f"threads={threads} key={key_size} value={value_size}" + ) + print(f"{' '.join(args)}") + subprocess.run(args, check=True) + + +def annotate_points(x_values: Sequence[int], y_values: Sequence[float], y_max: float, color: str) -> None: + if y_max <= 0: + return + y_offset = 0.035 * y_max + for x, y in zip(x_values, y_values): + top_half = y >= y_max * 0.9 + y_pos = y - y_offset if top_half else y + y_offset + va = "top" if top_half else "bottom" + plt.text( + x, + y_pos, + f"{y:.0f}", + fontsize=8, + ha="center", + va=va, + color=color, + bbox={ + "facecolor": "white", + "alpha": 0.75, + "edgecolor": "none", + "boxstyle": "round,pad=0.2", + }, + ) + + +def mode_title(mode_display: str, insert_ratio: int) -> str: + if mode_display == "mixed": + return f"Mixed ({100 - insert_ratio}% Get, {insert_ratio}% Put)" + return mode_display.capitalize() + + +def plot_results( + *, + result_csv: Path, + output_dir: Path, + thread_points: Sequence[int], + insert_ratio: int, +) -> list[Path]: + df = pd.read_csv(result_csv) + required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"} + missing = required - set(df.columns) + if missing: + raise ValueError(f"Missing required columns in csv: {sorted(missing)}") + + df = df[df["engine"].isin(ENGINE_ORDER) & df["mode"].isin([x[1] for x in MODE_PLAN])] + if df.empty: + raise ValueError("No legacy rows found in csv for mace/rocksdb") + + grouped = ( + df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[ + "ops_per_sec" + ] + .mean() + .sort_values(["engine", "mode", "key_size", "value_size", "threads"]) + ) + + color_list = plt.get_cmap("tab10").colors + profile_colors = { + profile: color_list[idx % len(color_list)] for idx, profile in enumerate(KV_PROFILES) + } + + output_paths: list[Path] = [] + for mode_display, mode_cli in MODE_PLAN: + mode_df = grouped[grouped["mode"] == mode_cli] + if mode_df.empty: + continue + + plt.figure(figsize=(16, 10)) + y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0 + + for engine in ENGINE_ORDER: + for key_size, value_size in KV_PROFILES: + sub = mode_df[ + (mode_df["engine"] == engine) + & (mode_df["key_size"] == key_size) + & (mode_df["value_size"] == value_size) + ].sort_values("threads") + if sub.empty: + continue + + x = sub["threads"].tolist() + y = sub["ops_per_sec"].tolist() + label = ( + f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})" + ) + line_color = profile_colors[(key_size, value_size)] + plt.plot( + x, + y, + label=label, + linestyle=LINE_STYLES.get(engine, "-"), + marker="o", + markersize=6, + linewidth=2, + color=line_color, + ) + annotate_points(x, y, y_max, line_color) + + plt.title(mode_title(mode_display, insert_ratio), fontsize=16) + plt.xlabel("Threads", fontsize=14) + plt.ylabel("OPS/s", fontsize=14) + plt.xticks(list(thread_points), fontsize=12) + plt.yticks(fontsize=12) + plt.grid(True, linestyle="--", alpha=0.4) + plt.legend(fontsize=10) + plt.tight_layout() + + out = output_dir / f"fast_test_{mode_display}_ops.png" + plt.savefig(out, dpi=260, bbox_inches="tight") + plt.close() + output_paths.append(out) + + return output_paths + + +def parse_args(argv: Sequence[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run legacy fast benchmark for mace + rocksdb, then plot mode-level OPS charts." + ) + parser.add_argument("storage_root", type=Path, help="Root directory for benchmark temporary data") + parser.add_argument("--warmup-secs", type=int, default=3) + parser.add_argument("--measure-secs", type=int, default=10) + parser.add_argument("--iterations", type=int, default=500_000) + parser.add_argument("--prefill-keys", type=int, default=500_000) + parser.add_argument("--insert-ratio", type=int, default=30) + parser.add_argument("--read-path", choices=("snapshot", "rw_txn"), default="snapshot") + parser.add_argument("--durability", choices=("relaxed", "durable"), default="relaxed") + parser.add_argument("--csv-name", default="fast_test_results.csv") + parser.add_argument("--skip-build", action="store_true", help="Skip cargo/cmake build") + parser.add_argument("--plot-only", action="store_true", help="Skip run and only plot existing csv") + parser.add_argument("--append", action="store_true", help="Append to existing csv instead of overwrite") + return parser.parse_args(argv) + + +def main(argv: Sequence[str]) -> int: + args = parse_args(argv) + + script_dir = Path(__file__).resolve().parent + repo_root = repo_root_from_script(script_dir) + result_csv = script_dir / args.csv_name + + cpu_count = detect_logical_cpus() + thread_points = power_of_two_threads(cpu_count) + print(f"[info] detected logical cpus={cpu_count}, threads={thread_points}") + print(f"[info] csv={result_csv}") + + if not args.plot_only: + if not args.append and result_csv.exists(): + result_csv.unlink() + + args.storage_root.mkdir(parents=True, exist_ok=True) + mace_root = args.storage_root / "mace" + rocksdb_root = args.storage_root / "rocksdb" + mace_root.mkdir(parents=True, exist_ok=True) + rocksdb_root.mkdir(parents=True, exist_ok=True) + + if not args.skip_build: + build_binaries(repo_root) + + bench_bins = { + "mace": repo_root / "target" / "release" / "kv_bench", + "rocksdb": repo_root / "rocksdb" / "build" / "release" / "rocksdb_bench", + } + for engine in ENGINE_ORDER: + if not bench_bins[engine].exists(): + raise FileNotFoundError(f"benchmark binary not found: {bench_bins[engine]}") + + # Run sequentially: mace first, then rocksdb. + run_engine_cases( + engine="mace", + bench_bin=bench_bins["mace"], + engine_storage_root=mace_root, + result_csv=result_csv, + thread_points=thread_points, + warmup_secs=args.warmup_secs, + measure_secs=args.measure_secs, + iterations=args.iterations, + prefill_keys=args.prefill_keys, + read_path=args.read_path, + durability=args.durability, + insert_ratio=args.insert_ratio, + ) + run_engine_cases( + engine="rocksdb", + bench_bin=bench_bins["rocksdb"], + engine_storage_root=rocksdb_root, + result_csv=result_csv, + thread_points=thread_points, + warmup_secs=args.warmup_secs, + measure_secs=args.measure_secs, + iterations=args.iterations, + prefill_keys=args.prefill_keys, + read_path=args.read_path, + durability=args.durability, + insert_ratio=args.insert_ratio, + ) + + outputs = plot_results( + result_csv=result_csv, + output_dir=script_dir, + thread_points=thread_points, + insert_ratio=args.insert_ratio, + ) + print("[done] generated charts:") + for p in outputs: + print(f" - {p}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/src/main.rs b/src/main.rs index f5a29a0..e91a005 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::io::{BufWriter, Write}; use std::path::Path; use std::process::exit; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Barrier}; +use std::sync::{Arc, Barrier, Mutex}; use std::thread::JoinHandle; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -667,7 +667,10 @@ fn main() { let read_path = match ReadPath::parse(&args.read_path) { Some(r) => r, None => { - eprintln!("invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path); + eprintln!( + "invalid read_path `{}` (supported: snapshot, rw_txn)", + args.read_path + ); exit(1); } }; @@ -690,6 +693,9 @@ fn main() { exit(1); } }; + let legacy_mode = workload.id.starts_with("LEGACY_"); + let effective_warmup_secs = if legacy_mode { 0 } else { args.warmup_secs }; + let effective_measure_secs = if legacy_mode { 0 } else { args.measure_secs }; let mixed_workload = workload.read_pct > 0 && workload.update_pct > 0; if mixed_workload && !shared_keyspace { @@ -772,6 +778,7 @@ fn main() { let op_counts = split_ranges(args.iterations, args.threads); let ready_barrier = Arc::new(Barrier::new(args.threads + 1)); let measure_barrier = Arc::new(Barrier::new(args.threads + 1)); + let measure_start = Arc::new(Mutex::new(None::)); let insert_counter = Arc::new(AtomicUsize::new(0)); let handles: Vec> = (0..args.threads) @@ -781,18 +788,20 @@ fn main() { let spec = workload.clone(); let ready = Arc::clone(&ready_barrier); let measure = Arc::clone(&measure_barrier); + let measure_start_slot = Arc::clone(&measure_start); let ins_ctr = Arc::clone(&insert_counter); let key_size = args.key_size; let random_insert = args.random; let read_path_mode = read_path; - let warmup_secs = args.warmup_secs; - let measure_secs = args.measure_secs; + let warmup_secs = effective_warmup_secs; + let measure_secs = effective_measure_secs; let distribution = spec.distribution; let zipf_theta = args.zipf_theta; let scan_len = spec.scan_len; let shared = shared_keyspace; let prefill_key_count = prefill_keys; let local_key_len = thread_prefill_ranges[tid].len; + let local_op_start = op_counts[tid].start; let local_op_count = op_counts[tid].len; std::thread::spawn(move || { @@ -835,11 +844,19 @@ fn main() { &ins_ctr, &mut local_insert_idx, None, + None, ); } } measure.wait(); + { + let now = Instant::now(); + let mut slot = measure_start_slot.lock().unwrap(); + if slot.map_or(true, |prev| now < prev) { + *slot = Some(now); + } + } if measure_secs > 0 { let deadline = Instant::now() + Duration::from_secs(measure_secs); @@ -862,13 +879,18 @@ fn main() { tid, &ins_ctr, &mut local_insert_idx, + None, Some(&mut stats), ); } } else { for idx in count_indices { + let fixed_insert_id = if spec.insert_only { + Some(if shared { local_op_start + idx } else { idx }) + } else { + None + }; let op = if spec.insert_only { - let _ = idx; OpKind::Update } else { pick_op_kind(&mut rng, &spec) @@ -890,6 +912,7 @@ fn main() { tid, &ins_ctr, &mut local_insert_idx, + fixed_insert_id, Some(&mut stats), ); } @@ -902,7 +925,13 @@ fn main() { ready_barrier.wait(); measure_barrier.wait(); - let measure_started = Instant::now(); + { + let now = Instant::now(); + let mut slot = measure_start.lock().unwrap(); + if slot.map_or(true, |prev| now < prev) { + *slot = Some(now); + } + } let mut merged_hist = [0u64; LAT_BUCKETS]; let mut total_ops = 0u64; @@ -917,7 +946,9 @@ fn main() { } } - let elapsed_us = measure_started.elapsed().as_micros() as u64; + let measure_end = Instant::now(); + let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end); + let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64; let ops_per_sec = if elapsed_us == 0 { 0.0 } else { @@ -949,8 +980,8 @@ fn main() { scan_pct: workload.scan_pct, scan_len: workload.scan_len, read_path, - warmup_secs: args.warmup_secs, - measure_secs: args.measure_secs, + warmup_secs: effective_warmup_secs, + measure_secs: effective_measure_secs, total_ops, error_ops, ops_per_sec, @@ -1000,6 +1031,7 @@ fn run_one_op( tid: usize, insert_counter: &AtomicUsize, local_insert_idx: &mut usize, + fixed_insert_id: Option, stats: Option<&mut ThreadStats>, ) { let start = stats.as_ref().map(|_| Instant::now()); @@ -1021,11 +1053,9 @@ fn run_one_op( make_thread_key(tid, id, key_size) }; match read_path { - ReadPath::Snapshot => bucket - .view() - .ok() - .and_then(|tx| tx.get(key).ok()) - .is_some(), + ReadPath::Snapshot => { + bucket.view().ok().and_then(|tx| tx.get(key).ok()).is_some() + } ReadPath::RwTxn => { if let Ok(tx) = bucket.begin() { let get_ok = tx.get(key).is_ok(); @@ -1042,7 +1072,13 @@ fn run_one_op( } OpKind::Update => { let key_opt = if spec.insert_only { - if shared_keyspace { + if let Some(id) = fixed_insert_id { + if shared_keyspace { + Some(make_shared_key(id, key_size)) + } else { + Some(make_thread_key(tid, id, key_size)) + } + } else if shared_keyspace { let id = insert_counter.fetch_add(1, Ordering::Relaxed); Some(make_shared_key(id, key_size)) } else {