bench: add fast legacy runner and fix legacy mode semantics

This commit is contained in:
abbycin 2026-03-08 15:07:50 +08:00
parent 1bdc932c51
commit ddc3f8af7e
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
3 changed files with 475 additions and 33 deletions

View File

@ -374,6 +374,12 @@ static uint64_t now_epoch_ms() {
return static_cast<uint64_t>(ms.count());
}
// Monotonic clock reading in nanoseconds (steady_clock, since its unspecified
// epoch). Suitable only for measuring elapsed intervals, not wall-clock time.
static uint64_t steady_now_ns() {
auto now = std::chrono::steady_clock::now();
auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch());
return static_cast<uint64_t>(ns.count());
}
static uint64_t read_mem_kb(const char *key) {
std::ifstream in("/proc/meminfo");
if (!in.is_open()) {
@ -433,7 +439,7 @@ static const char *result_header() {
static std::string result_row_csv(const ResultRow &r) {
return fmt::format(
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{},{},{},{},{},{},{}",
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
r.ts_epoch_ms,
"rocksdb",
csv_escape(r.workload_id),
@ -455,7 +461,7 @@ static std::string result_row_csv(const ResultRow &r) {
r.measure_secs,
r.total_ops,
r.error_ops,
r.ops_per_sec,
static_cast<uint64_t>(r.ops_per_sec),
r.quantiles.p50_us,
r.quantiles.p95_us,
r.quantiles.p99_us,
@ -570,7 +576,8 @@ static bool run_one_op(OpKind op,
size_t local_key_len,
size_t tid,
std::atomic<size_t> &insert_counter,
size_t &local_insert_idx) {
size_t &local_insert_idx,
std::optional<size_t> fixed_insert_id) {
if (op == OpKind::Read) {
auto maybe_id = pick_key_id(rng, distribution, zipf_theta, shared_keyspace, prefill_keys, local_key_len);
if (!maybe_id.has_value()) {
@ -599,7 +606,14 @@ static bool run_one_op(OpKind op,
if (op == OpKind::Update) {
std::optional<std::string> key;
if (spec.insert_only) {
if (shared_keyspace) {
if (fixed_insert_id.has_value()) {
auto id = fixed_insert_id.value();
if (shared_keyspace) {
key = make_shared_key(id, key_size);
} else {
key = make_thread_key(tid, id, key_size);
}
} else if (shared_keyspace) {
auto id = insert_counter.fetch_add(1, std::memory_order_relaxed);
key = make_shared_key(id, key_size);
} else {
@ -762,6 +776,14 @@ int main(int argc, char *argv[]) {
return 1;
}
auto workload_spec = workload_spec_opt.value();
auto legacy_mode = workload_spec.id.rfind("LEGACY_", 0) == 0;
auto effective_warmup_secs = legacy_mode ? 0ULL : args.warmup_secs;
auto effective_measure_secs = legacy_mode ? 0ULL : args.measure_secs;
auto mixed_workload = workload_spec.read_pct > 0 && workload_spec.update_pct > 0;
if (mixed_workload && !args.shared_keyspace) {
fmt::println(stderr, "mixed workloads require shared keyspace");
return 1;
}
auto prefill_keys = workload_spec.requires_prefill
? (args.prefill_keys > 0 ? args.prefill_keys : std::max<size_t>(args.iterations, 1))
@ -853,11 +875,18 @@ int main(int argc, char *argv[]) {
std::barrier measure_barrier(static_cast<ptrdiff_t>(args.threads + 1));
std::atomic<size_t> insert_counter{0};
std::atomic<uint64_t> measure_start_ns{0};
std::vector<std::thread> workers;
workers.reserve(args.threads);
std::vector<ThreadStats> thread_stats(args.threads);
auto seed_base = now_epoch_ms();
auto mark_measure_start = [&measure_start_ns]() {
uint64_t expected = 0;
auto now_ns = steady_now_ns();
(void) measure_start_ns.compare_exchange_strong(
expected, now_ns, std::memory_order_relaxed);
};
for (size_t tid = 0; tid < args.threads; ++tid) {
workers.emplace_back([&, tid] {
@ -865,6 +894,7 @@ int main(int argc, char *argv[]) {
auto &stats = thread_stats[tid];
std::mt19937_64 rng(seed_base ^ ((tid + 1) * 0x9E3779B97F4A7C15ULL));
auto local_key_len = prefill_ranges[tid].len;
auto local_op_start = op_ranges[tid].start;
auto local_op_len = op_ranges[tid].len;
size_t local_insert_idx = 0;
@ -876,8 +906,9 @@ int main(int argc, char *argv[]) {
ready_barrier.arrive_and_wait();
if (args.warmup_secs > 0) {
auto warmup_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.warmup_secs);
if (effective_warmup_secs > 0) {
auto warmup_deadline =
std::chrono::steady_clock::now() + std::chrono::seconds(effective_warmup_secs);
while (std::chrono::steady_clock::now() < warmup_deadline) {
auto op = pick_op_kind(rng, workload_spec);
(void) run_one_op(op,
@ -897,11 +928,13 @@ int main(int argc, char *argv[]) {
local_key_len,
tid,
insert_counter,
local_insert_idx);
local_insert_idx,
std::nullopt);
}
}
measure_barrier.arrive_and_wait();
mark_measure_start();
auto record = [&](bool ok, uint64_t us) {
stats.total_ops += 1;
@ -912,8 +945,9 @@ int main(int argc, char *argv[]) {
stats.hist[b] += 1;
};
if (args.measure_secs > 0) {
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(args.measure_secs);
if (effective_measure_secs > 0) {
auto deadline =
std::chrono::steady_clock::now() + std::chrono::seconds(effective_measure_secs);
while (std::chrono::steady_clock::now() < deadline) {
auto op = pick_op_kind(rng, workload_spec);
auto started = std::chrono::steady_clock::now();
@ -934,15 +968,20 @@ int main(int argc, char *argv[]) {
local_key_len,
tid,
insert_counter,
local_insert_idx);
local_insert_idx,
std::nullopt);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count());
record(ok, us);
}
} else {
for (size_t i: count_indices) {
(void) i;
auto op = workload_spec.insert_only ? OpKind::Update : pick_op_kind(rng, workload_spec);
std::optional<size_t> fixed_insert_id = std::nullopt;
if (workload_spec.insert_only) {
fixed_insert_id =
args.shared_keyspace ? (local_op_start + i) : i;
}
auto started = std::chrono::steady_clock::now();
auto ok = run_one_op(op,
db,
@ -961,7 +1000,8 @@ int main(int argc, char *argv[]) {
local_key_len,
tid,
insert_counter,
local_insert_idx);
local_insert_idx,
fixed_insert_id);
auto us = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - started).count());
record(ok, us);
@ -972,13 +1012,18 @@ int main(int argc, char *argv[]) {
ready_barrier.arrive_and_wait();
measure_barrier.arrive_and_wait();
auto measure_started = nm::Instant::now();
mark_measure_start();
for (auto &w: workers) {
w.join();
}
uint64_t elapsed_us = static_cast<uint64_t>(measure_started.elapse_usec());
auto measure_end_ns = steady_now_ns();
auto measure_begin_ns = measure_start_ns.load(std::memory_order_relaxed);
if (measure_begin_ns == 0 || measure_end_ns < measure_begin_ns) {
measure_begin_ns = measure_end_ns;
}
uint64_t elapsed_us = (measure_end_ns - measure_begin_ns) / 1000;
uint64_t total_ops = 0;
uint64_t error_ops = 0;
std::array<uint64_t, kLatencyBuckets> merged_hist{};
@ -1012,8 +1057,8 @@ int main(int argc, char *argv[]) {
.scan_pct = workload_spec.scan_pct,
.scan_len = workload_spec.scan_len,
.read_path = read_path.value(),
.warmup_secs = args.warmup_secs,
.measure_secs = args.measure_secs,
.warmup_secs = effective_warmup_secs,
.measure_secs = effective_measure_secs,
.total_ops = total_ops,
.error_ops = error_ops,
.ops_per_sec = ops_per_sec,
@ -1033,14 +1078,14 @@ int main(int argc, char *argv[]) {
}
fmt::println(
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={:.2} p99_us={} result_file={}",
"engine=rocksdb workload={} mode={} durability={} threads={} ops={} err={} qps={} p99_us={} result_file={}",
row.workload_id,
row.mode,
durability_str(row.durability_mode),
row.threads,
row.total_ops,
row.error_ops,
row.ops_per_sec,
static_cast<uint64_t>(row.ops_per_sec),
row.quantiles.p99_us,
args.result_file);

361
scripts/fast_test.py Normal file
View File

@ -0,0 +1,361 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import uuid
from pathlib import Path
from typing import Sequence
def ensure_venv_python() -> None:
    """Re-exec this script under the sibling venv interpreter (scripts/bin/python).

    No-op when that interpreter is missing, is already the running one, or when
    the FAST_TEST_REEXEC marker shows a re-exec already happened (loop guard).
    """
    this_script = Path(__file__).resolve()
    interpreter = this_script.parent / "bin" / "python"
    if not interpreter.exists():
        return
    if Path(sys.executable).absolute() == interpreter.absolute():
        return
    if os.environ.get("FAST_TEST_REEXEC") == "1":
        return
    child_env = dict(os.environ, FAST_TEST_REEXEC="1")
    # execve replaces the current process image; nothing after this line runs.
    os.execve(str(interpreter), [str(interpreter), str(this_script), *sys.argv[1:]], child_env)
ensure_venv_python()
import matplotlib.pyplot as plt
import pandas as pd
# Engines benchmarked, in run and plot order.
ENGINE_ORDER = ("mace", "rocksdb")
# (display name, CLI --mode value) pairs for each benchmark mode.
MODE_PLAN = (
    ("put", "insert"),
    ("get", "get"),
    ("mixed", "mixed"),
    ("scan", "scan"),
)
# (key_size, value_size) byte profiles exercised for every mode/thread point.
KV_PROFILES = (
    (16, 128),
    (32, 1024),
    (32, 4096),
    (32, 16384),
)
# matplotlib line style per engine; the two engines share per-profile colors
# and are told apart by solid vs dotted lines.
LINE_STYLES = {"mace": "-", "rocksdb": ":"}
def detect_logical_cpus() -> int:
    """Return the logical CPU count, never less than one."""
    return max(1, os.cpu_count() or 1)
def power_of_two_threads(cpu_count: int) -> list[int]:
    """Return every power of two from 1 up to and including cpu_count.

    Returns an empty list when cpu_count is below one.
    """
    if cpu_count < 1:
        return []
    # 2**(bit_length-1) is the largest power of two not exceeding cpu_count.
    return [1 << k for k in range(cpu_count.bit_length())]
def format_bytes(size: int) -> str:
    """Render a byte count as whole kilobytes when >= 1024 and evenly
    divisible, otherwise as raw bytes (e.g. 16384 -> "16KB", 128 -> "128B")."""
    kb, remainder = divmod(size, 1024)
    if size >= 1024 and remainder == 0:
        return f"{kb}KB"
    return f"{size}B"
def repo_root_from_script(script_dir: Path) -> Path:
    """Map the scripts/ directory to its parent, which is the repository root."""
    root = script_dir.parent
    return root
def build_binaries(repo_root: Path) -> None:
    """Build the mace (cargo) and rocksdb (cmake) benchmark binaries in release mode.

    Raises subprocess.CalledProcessError when any build step fails (check=True).
    """
    cargo_cmd = [
        "cargo",
        "build",
        "--release",
        "--manifest-path",
        str(repo_root / "Cargo.toml"),
    ]
    subprocess.run(cargo_cmd, check=True)
    # rocksdb_bench uses cmake presets; configure first, then build.
    rocksdb_dir = repo_root / "rocksdb"
    for cmake_cmd in (
        ["cmake", "--preset", "release"],
        ["cmake", "--build", "--preset", "release"],
    ):
        subprocess.run(cmake_cmd, check=True, cwd=rocksdb_dir)
def run_engine_cases(
    *,
    engine: str,
    bench_bin: Path,
    engine_storage_root: Path,
    result_csv: Path,
    thread_points: Sequence[int],
    warmup_secs: int,
    measure_secs: int,
    iterations: int,
    prefill_keys: int,
    read_path: str,
    durability: str,
    insert_ratio: int,
) -> None:
    """Run every (mode, thread count, kv profile) benchmark case for one engine.

    Each case runs the benchmark binary against a unique directory under
    engine_storage_root and appends its result row to result_csv.
    Raises subprocess.CalledProcessError when a benchmark run fails.
    """
    for mode_display, mode_cli in MODE_PLAN:
        for threads in thread_points:
            for key_size, value_size in KV_PROFILES:
                # Unique per-case directory so no two runs share on-disk state.
                case_name = (
                    f"{engine}_{mode_display}_t{threads}_k{key_size}_v{value_size}_"
                    f"{uuid.uuid4().hex[:8]}"
                )
                run_path = engine_storage_root / case_name
                cmd = [str(bench_bin)]
                cmd += ["--path", str(run_path)]
                cmd += ["--mode", mode_cli]
                cmd += ["--threads", str(threads)]
                cmd += ["--key-size", str(key_size)]
                cmd += ["--value-size", str(value_size)]
                cmd += ["--iterations", str(iterations)]
                cmd += ["--warmup-secs", str(warmup_secs)]
                cmd += ["--measure-secs", str(measure_secs)]
                cmd += ["--read-path", read_path]
                cmd += ["--durability", durability]
                cmd += ["--result-file", str(result_csv)]
                if mode_cli != "insert":
                    # Read-bearing modes need a pre-populated keyspace.
                    cmd += ["--prefill-keys", str(prefill_keys)]
                if mode_cli == "mixed":
                    cmd += ["--insert-ratio", str(insert_ratio)]
                if engine == "mace":
                    cmd.append("--shared-keyspace")
                print(
                    f"[run] engine={engine} mode={mode_display} "
                    f"threads={threads} key={key_size} value={value_size}"
                )
                print(f"{' '.join(cmd)}")
                subprocess.run(cmd, check=True)
def annotate_points(x_values: Sequence[int], y_values: Sequence[float], y_max: float, color: str) -> None:
    """Label each (x, y) data point with its rounded OPS value, nudged off the line.

    Points in the top 10% of the axis get their label below the marker so it
    stays inside the plot area; all others are labeled above. No-op when the
    axis has no positive extent.
    """
    if y_max <= 0:
        return
    offset = y_max * 0.035
    for x, y in zip(x_values, y_values):
        near_top = y >= 0.9 * y_max
        if near_top:
            y_pos, va = y - offset, "top"
        else:
            y_pos, va = y + offset, "bottom"
        plt.text(
            x,
            y_pos,
            f"{y:.0f}",
            fontsize=8,
            ha="center",
            va=va,
            color=color,
            bbox={
                "facecolor": "white",
                "alpha": 0.75,
                "edgecolor": "none",
                "boxstyle": "round,pad=0.2",
            },
        )
def mode_title(mode_display: str, insert_ratio: int) -> str:
    """Chart title for a mode; mixed mode shows its get/put percentage split."""
    if mode_display != "mixed":
        return mode_display.capitalize()
    return f"Mixed ({100 - insert_ratio}% Get, {insert_ratio}% Put)"
def plot_results(
    *,
    result_csv: Path,
    output_dir: Path,
    thread_points: Sequence[int],
    insert_ratio: int,
) -> list[Path]:
    """Render one OPS-vs-threads chart per benchmark mode from the result csv.

    Returns the list of PNG paths written into output_dir.
    Raises ValueError when required columns are missing or no matching rows exist.
    """
    df = pd.read_csv(result_csv)
    required = {"engine", "mode", "threads", "key_size", "value_size", "ops_per_sec"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns in csv: {sorted(missing)}")
    # Keep only engines/modes this script knows how to plot.
    df = df[df["engine"].isin(ENGINE_ORDER) & df["mode"].isin([x[1] for x in MODE_PLAN])]
    if df.empty:
        raise ValueError("No legacy rows found in csv for mace/rocksdb")
    # Average repeated runs of the same case into a single point per thread count.
    grouped = (
        df.groupby(["engine", "mode", "key_size", "value_size", "threads"], as_index=False)[
            "ops_per_sec"
        ]
        .mean()
        .sort_values(["engine", "mode", "key_size", "value_size", "threads"])
    )
    # One stable color per kv profile; engines are told apart by line style.
    color_list = plt.get_cmap("tab10").colors
    profile_colors = {
        profile: color_list[idx % len(color_list)] for idx, profile in enumerate(KV_PROFILES)
    }
    output_paths: list[Path] = []
    for mode_display, mode_cli in MODE_PLAN:
        mode_df = grouped[grouped["mode"] == mode_cli]
        if mode_df.empty:
            continue
        plt.figure(figsize=(16, 10))
        # y_max scales the point-label offsets in annotate_points.
        y_max = float(mode_df["ops_per_sec"].max()) if not mode_df.empty else 0.0
        for engine in ENGINE_ORDER:
            for key_size, value_size in KV_PROFILES:
                sub = mode_df[
                    (mode_df["engine"] == engine)
                    & (mode_df["key_size"] == key_size)
                    & (mode_df["value_size"] == value_size)
                ].sort_values("threads")
                if sub.empty:
                    continue
                x = sub["threads"].tolist()
                y = sub["ops_per_sec"].tolist()
                label = (
                    f"{engine} ({format_bytes(key_size)}/{format_bytes(value_size)})"
                )
                line_color = profile_colors[(key_size, value_size)]
                plt.plot(
                    x,
                    y,
                    label=label,
                    linestyle=LINE_STYLES.get(engine, "-"),
                    marker="o",
                    markersize=6,
                    linewidth=2,
                    color=line_color,
                )
                annotate_points(x, y, y_max, line_color)
        plt.title(mode_title(mode_display, insert_ratio), fontsize=16)
        plt.xlabel("Threads", fontsize=14)
        plt.ylabel("OPS/s", fontsize=14)
        plt.xticks(list(thread_points), fontsize=12)
        plt.yticks(fontsize=12)
        plt.grid(True, linestyle="--", alpha=0.4)
        plt.legend(fontsize=10)
        plt.tight_layout()
        out = output_dir / f"fast_test_{mode_display}_ops.png"
        plt.savefig(out, dpi=260, bbox_inches="tight")
        plt.close()
        output_paths.append(out)
    return output_paths
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse command-line options for the fast benchmark runner.

    Raises SystemExit (via argparse) on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="Run legacy fast benchmark for mace + rocksdb, then plot mode-level OPS charts."
    )
    add = parser.add_argument
    add("storage_root", type=Path, help="Root directory for benchmark temporary data")
    add("--warmup-secs", type=int, default=3)
    add("--measure-secs", type=int, default=10)
    add("--iterations", type=int, default=500_000)
    add("--prefill-keys", type=int, default=500_000)
    add("--insert-ratio", type=int, default=30)
    add("--read-path", choices=("snapshot", "rw_txn"), default="snapshot")
    add("--durability", choices=("relaxed", "durable"), default="relaxed")
    add("--csv-name", default="fast_test_results.csv")
    add("--skip-build", action="store_true", help="Skip cargo/cmake build")
    add("--plot-only", action="store_true", help="Skip run and only plot existing csv")
    add("--append", action="store_true", help="Append to existing csv instead of overwrite")
    return parser.parse_args(argv)
def main(argv: Sequence[str]) -> int:
    """Entry point: optionally build and benchmark both engines, then plot.

    With --plot-only the run phase is skipped and only an existing csv is
    plotted. Returns 0 on success; build/benchmark failures propagate as
    subprocess.CalledProcessError.
    """
    args = parse_args(argv)
    script_dir = Path(__file__).resolve().parent
    repo_root = repo_root_from_script(script_dir)
    # Results and charts are written next to the script, not into storage_root.
    result_csv = script_dir / args.csv_name
    cpu_count = detect_logical_cpus()
    thread_points = power_of_two_threads(cpu_count)
    print(f"[info] detected logical cpus={cpu_count}, threads={thread_points}")
    print(f"[info] csv={result_csv}")
    if not args.plot_only:
        # Start from a fresh csv unless the caller asked to append.
        if not args.append and result_csv.exists():
            result_csv.unlink()
        args.storage_root.mkdir(parents=True, exist_ok=True)
        mace_root = args.storage_root / "mace"
        rocksdb_root = args.storage_root / "rocksdb"
        mace_root.mkdir(parents=True, exist_ok=True)
        rocksdb_root.mkdir(parents=True, exist_ok=True)
        if not args.skip_build:
            build_binaries(repo_root)
        bench_bins = {
            "mace": repo_root / "target" / "release" / "kv_bench",
            "rocksdb": repo_root / "rocksdb" / "build" / "release" / "rocksdb_bench",
        }
        # Fail fast before any long benchmark run if a binary is missing.
        for engine in ENGINE_ORDER:
            if not bench_bins[engine].exists():
                raise FileNotFoundError(f"benchmark binary not found: {bench_bins[engine]}")
        # Run sequentially: mace first, then rocksdb.
        run_engine_cases(
            engine="mace",
            bench_bin=bench_bins["mace"],
            engine_storage_root=mace_root,
            result_csv=result_csv,
            thread_points=thread_points,
            warmup_secs=args.warmup_secs,
            measure_secs=args.measure_secs,
            iterations=args.iterations,
            prefill_keys=args.prefill_keys,
            read_path=args.read_path,
            durability=args.durability,
            insert_ratio=args.insert_ratio,
        )
        run_engine_cases(
            engine="rocksdb",
            bench_bin=bench_bins["rocksdb"],
            engine_storage_root=rocksdb_root,
            result_csv=result_csv,
            thread_points=thread_points,
            warmup_secs=args.warmup_secs,
            measure_secs=args.measure_secs,
            iterations=args.iterations,
            prefill_keys=args.prefill_keys,
            read_path=args.read_path,
            durability=args.durability,
            insert_ratio=args.insert_ratio,
        )
    outputs = plot_results(
        result_csv=result_csv,
        output_dir=script_dir,
        thread_points=thread_points,
        insert_ratio=args.insert_ratio,
    )
    print("[done] generated charts:")
    for p in outputs:
        print(f" - {p}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main(sys.argv[1:]))

View File

@ -12,7 +12,7 @@ use std::io::{BufWriter, Write};
use std::path::Path;
use std::process::exit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::sync::{Arc, Barrier, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@ -667,7 +667,10 @@ fn main() {
let read_path = match ReadPath::parse(&args.read_path) {
Some(r) => r,
None => {
eprintln!("invalid read_path `{}` (supported: snapshot, rw_txn)", args.read_path);
eprintln!(
"invalid read_path `{}` (supported: snapshot, rw_txn)",
args.read_path
);
exit(1);
}
};
@ -690,6 +693,9 @@ fn main() {
exit(1);
}
};
let legacy_mode = workload.id.starts_with("LEGACY_");
let effective_warmup_secs = if legacy_mode { 0 } else { args.warmup_secs };
let effective_measure_secs = if legacy_mode { 0 } else { args.measure_secs };
let mixed_workload = workload.read_pct > 0 && workload.update_pct > 0;
if mixed_workload && !shared_keyspace {
@ -772,6 +778,7 @@ fn main() {
let op_counts = split_ranges(args.iterations, args.threads);
let ready_barrier = Arc::new(Barrier::new(args.threads + 1));
let measure_barrier = Arc::new(Barrier::new(args.threads + 1));
let measure_start = Arc::new(Mutex::new(None::<Instant>));
let insert_counter = Arc::new(AtomicUsize::new(0));
let handles: Vec<JoinHandle<ThreadStats>> = (0..args.threads)
@ -781,18 +788,20 @@ fn main() {
let spec = workload.clone();
let ready = Arc::clone(&ready_barrier);
let measure = Arc::clone(&measure_barrier);
let measure_start_slot = Arc::clone(&measure_start);
let ins_ctr = Arc::clone(&insert_counter);
let key_size = args.key_size;
let random_insert = args.random;
let read_path_mode = read_path;
let warmup_secs = args.warmup_secs;
let measure_secs = args.measure_secs;
let warmup_secs = effective_warmup_secs;
let measure_secs = effective_measure_secs;
let distribution = spec.distribution;
let zipf_theta = args.zipf_theta;
let scan_len = spec.scan_len;
let shared = shared_keyspace;
let prefill_key_count = prefill_keys;
let local_key_len = thread_prefill_ranges[tid].len;
let local_op_start = op_counts[tid].start;
let local_op_count = op_counts[tid].len;
std::thread::spawn(move || {
@ -835,11 +844,19 @@ fn main() {
&ins_ctr,
&mut local_insert_idx,
None,
None,
);
}
}
measure.wait();
{
let now = Instant::now();
let mut slot = measure_start_slot.lock().unwrap();
if slot.map_or(true, |prev| now < prev) {
*slot = Some(now);
}
}
if measure_secs > 0 {
let deadline = Instant::now() + Duration::from_secs(measure_secs);
@ -862,13 +879,18 @@ fn main() {
tid,
&ins_ctr,
&mut local_insert_idx,
None,
Some(&mut stats),
);
}
} else {
for idx in count_indices {
let fixed_insert_id = if spec.insert_only {
Some(if shared { local_op_start + idx } else { idx })
} else {
None
};
let op = if spec.insert_only {
let _ = idx;
OpKind::Update
} else {
pick_op_kind(&mut rng, &spec)
@ -890,6 +912,7 @@ fn main() {
tid,
&ins_ctr,
&mut local_insert_idx,
fixed_insert_id,
Some(&mut stats),
);
}
@ -902,7 +925,13 @@ fn main() {
ready_barrier.wait();
measure_barrier.wait();
let measure_started = Instant::now();
{
let now = Instant::now();
let mut slot = measure_start.lock().unwrap();
if slot.map_or(true, |prev| now < prev) {
*slot = Some(now);
}
}
let mut merged_hist = [0u64; LAT_BUCKETS];
let mut total_ops = 0u64;
@ -917,7 +946,9 @@ fn main() {
}
}
let elapsed_us = measure_started.elapsed().as_micros() as u64;
let measure_end = Instant::now();
let measure_started = (*measure_start.lock().unwrap()).unwrap_or(measure_end);
let elapsed_us = measure_end.duration_since(measure_started).as_micros() as u64;
let ops_per_sec = if elapsed_us == 0 {
0.0
} else {
@ -949,8 +980,8 @@ fn main() {
scan_pct: workload.scan_pct,
scan_len: workload.scan_len,
read_path,
warmup_secs: args.warmup_secs,
measure_secs: args.measure_secs,
warmup_secs: effective_warmup_secs,
measure_secs: effective_measure_secs,
total_ops,
error_ops,
ops_per_sec,
@ -1000,6 +1031,7 @@ fn run_one_op(
tid: usize,
insert_counter: &AtomicUsize,
local_insert_idx: &mut usize,
fixed_insert_id: Option<usize>,
stats: Option<&mut ThreadStats>,
) {
let start = stats.as_ref().map(|_| Instant::now());
@ -1021,11 +1053,9 @@ fn run_one_op(
make_thread_key(tid, id, key_size)
};
match read_path {
ReadPath::Snapshot => bucket
.view()
.ok()
.and_then(|tx| tx.get(key).ok())
.is_some(),
ReadPath::Snapshot => {
bucket.view().ok().and_then(|tx| tx.get(key).ok()).is_some()
}
ReadPath::RwTxn => {
if let Ok(tx) = bucket.begin() {
let get_ok = tx.get(key).is_ok();
@ -1042,7 +1072,13 @@ fn run_one_op(
}
OpKind::Update => {
let key_opt = if spec.insert_only {
if shared_keyspace {
if let Some(id) = fixed_insert_id {
if shared_keyspace {
Some(make_shared_key(id, key_size))
} else {
Some(make_thread_key(tid, id, key_size))
}
} else if shared_keyspace {
let id = insert_counter.fetch_add(1, Ordering::Relaxed);
Some(make_shared_key(id, key_size))
} else {