Compare commits
2 Commits
f71be68c38
...
abf82f735c
| Author | SHA1 | Date | |
|---|---|---|---|
| abf82f735c | |||
| 80b4745118 |
123
rocksdb/main.cpp
123
rocksdb/main.cpp
@ -17,9 +17,11 @@
|
||||
#include <rocksdb/utilities/transaction.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <barrier>
|
||||
#include <filesystem>
|
||||
#include <format>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
|
||||
#include <pthread.h>
|
||||
@ -47,6 +49,13 @@ static void bind_core(size_t tid) {
|
||||
(void) pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &set);
|
||||
}
|
||||
|
||||
static void require_ok(const rocksdb::Status &st, const char *what) {
|
||||
if (!st.ok()) {
|
||||
fmt::println(stderr, "{} failed: {}", what, st.ToString());
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
|
||||
struct Args {
|
||||
size_t threads;
|
||||
size_t iterations;
|
||||
@ -135,11 +144,11 @@ int main(int argc, char *argv[]) {
|
||||
cfo.min_blob_size = args.blob_size;
|
||||
cfo.disable_auto_compactions = true;
|
||||
cfo.max_compaction_bytes = (1ULL << 60);
|
||||
cfo.level0_stop_writes_trigger = 100000;
|
||||
cfo.level0_slowdown_writes_trigger = 100000;
|
||||
cfo.level0_file_num_compaction_trigger = 100000;
|
||||
cfo.level0_stop_writes_trigger = 1000000;
|
||||
cfo.level0_slowdown_writes_trigger = 1000000;
|
||||
cfo.level0_file_num_compaction_trigger = 1000000;
|
||||
cfo.write_buffer_size = 64 << 20;
|
||||
cfo.max_write_buffer_number = 64;
|
||||
cfo.max_write_buffer_number = 128;
|
||||
|
||||
// use 3GB block cache
|
||||
auto cache = rocksdb::NewLRUCache(3 << 30);
|
||||
@ -154,65 +163,58 @@ int main(int argc, char *argv[]) {
|
||||
options.create_if_missing = true;
|
||||
options.allow_concurrent_memtable_write = true;
|
||||
options.enable_pipelined_write = true;
|
||||
options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::HIGH);
|
||||
options.max_background_flushes = 8;
|
||||
options.env->SetBackgroundThreads(8, rocksdb::Env::Priority::HIGH);
|
||||
|
||||
auto wopt = rocksdb::WriteOptions();
|
||||
wopt.no_slowdown = true;
|
||||
// wopt.disableWAL = true;
|
||||
std::vector<std::thread> wg;
|
||||
std::atomic<uint64_t> total_op{0};
|
||||
rocksdb::OptimisticTransactionDB *db;
|
||||
auto b = nm::Instant::now();
|
||||
std::vector<rocksdb::ColumnFamilyHandle *> handles{};
|
||||
auto s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
||||
assert(s.ok());
|
||||
require_ok(s, "open db");
|
||||
std::barrier ready_barrier{static_cast<ptrdiff_t>(args.threads + 1)};
|
||||
std::barrier start_barrier{static_cast<ptrdiff_t>(args.threads + 1)};
|
||||
|
||||
std::random_device rd{};
|
||||
std::mt19937 gen(rd());
|
||||
|
||||
std::string val(args.value_size, 'x');
|
||||
|
||||
auto *handle = handles[0];
|
||||
|
||||
if (args.mode == "get" || args.mode == "scan") {
|
||||
auto *kv = db->BeginTransaction(wopt);
|
||||
std::vector<std::thread> fill_threads;
|
||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
auto key = std::format("key_{}_{}", tid, i);
|
||||
key.resize(args.key_size, 'x');
|
||||
kv->Put(handle, key, val);
|
||||
}
|
||||
fill_threads.emplace_back([&, tid] {
|
||||
bind_core(tid);
|
||||
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||
const size_t batch_size = 10000;
|
||||
for (size_t i = 0; i < count; i += batch_size) {
|
||||
auto *kv = db->BeginTransaction(wopt);
|
||||
for (size_t j = 0; j < batch_size && (i + j) < count; ++j) {
|
||||
auto key = std::format("key_{}_{}", tid, i + j);
|
||||
key.resize(args.key_size, 'x');
|
||||
require_ok(kv->Put(handle, key, val), "fill put");
|
||||
}
|
||||
require_ok(kv->Commit(), "fill commit");
|
||||
delete kv;
|
||||
}
|
||||
});
|
||||
}
|
||||
kv->Commit();
|
||||
delete kv;
|
||||
for (auto &t: fill_threads)
|
||||
t.join();
|
||||
|
||||
delete handle;
|
||||
delete db;
|
||||
handles.clear();
|
||||
// re-open db
|
||||
s = rocksdb::OptimisticTransactionDB::Open(options, args.path, cfd, &handles, &db);
|
||||
assert(s.ok());
|
||||
|
||||
require_ok(s, "reopen db");
|
||||
handle = handles[0];
|
||||
|
||||
std::uniform_int_distribution<size_t> tid_dist(0, args.threads - 1);
|
||||
for (size_t i = 0; i < args.iterations; ++i) {
|
||||
auto tid = tid_dist(gen);
|
||||
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||
std::uniform_int_distribution<size_t> key_dist(0, count - 1);
|
||||
auto idx = key_dist(gen);
|
||||
auto key = std::format("key_{}_{}", tid, idx);
|
||||
key.resize(args.key_size, 'x');
|
||||
auto s = db->Get(rocksdb::ReadOptions(), key, &val);
|
||||
if (!s.ok()) {
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto *snapshot = db->GetSnapshot();
|
||||
auto base_seed = rd();
|
||||
for (size_t tid = 0; tid < args.threads; ++tid) {
|
||||
wg.emplace_back([&, tid] {
|
||||
@ -226,56 +228,60 @@ int main(int argc, char *argv[]) {
|
||||
ropt.iterate_upper_bound = &upper_bound_slice;
|
||||
}
|
||||
ropt.prefix_same_as_start = true;
|
||||
ropt.snapshot = snapshot;
|
||||
size_t round = 0;
|
||||
std::mt19937 mixed_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(0x9e3779b9U * (tid + 1)));
|
||||
std::mt19937 thread_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(tid));
|
||||
std::uniform_int_distribution<int> mixed_dist(0, 99);
|
||||
|
||||
size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
|
||||
std::vector<size_t> indices(key_count);
|
||||
std::iota(indices.begin(), indices.end(), 0);
|
||||
if (args.random) {
|
||||
std::shuffle(indices.begin(), indices.end(), thread_gen);
|
||||
}
|
||||
|
||||
ready_barrier.arrive_and_wait();
|
||||
start_barrier.arrive_and_wait();
|
||||
|
||||
if (args.mode == "insert") {
|
||||
for (size_t i = 0; i < key_count; ++i) {
|
||||
for (size_t i: indices) {
|
||||
auto key = std::format("key_{}_{}", tid, i);
|
||||
key.resize(args.key_size, 'x');
|
||||
round += 1;
|
||||
auto *kv = db->BeginTransaction(wopt);
|
||||
kv->Put(handle, key, val);
|
||||
kv->Commit();
|
||||
require_ok(kv->Put(handle, key, val), "insert put");
|
||||
require_ok(kv->Commit(), "insert commit");
|
||||
delete kv;
|
||||
}
|
||||
|
||||
} else if (args.mode == "get") {
|
||||
for (size_t i = 0; i < key_count; ++i) {
|
||||
// rocksdb has no dedicated read-only txn in this bench path, use direct get for fair read-path
|
||||
// comparison with mace view
|
||||
for (size_t i: indices) {
|
||||
auto key = std::format("key_{}_{}", tid, i);
|
||||
key.resize(args.key_size, 'x');
|
||||
round += 1;
|
||||
auto *kv = db->BeginTransaction(wopt);
|
||||
kv->Get(ropt, handle, key, &rval);
|
||||
kv->Commit();
|
||||
delete kv;
|
||||
require_ok(db->Get(ropt, handle, key, &rval), "get");
|
||||
}
|
||||
} else if (args.mode == "mixed") {
|
||||
for (size_t i = 0; i < key_count; ++i) {
|
||||
for (size_t i: indices) {
|
||||
auto key = std::format("key_{}_{}", tid, i);
|
||||
key.resize(args.key_size, 'x');
|
||||
round += 1;
|
||||
auto is_insert = mixed_dist(mixed_gen) < static_cast<int>(args.insert_ratio);
|
||||
auto is_insert = mixed_dist(thread_gen) < static_cast<int>(args.insert_ratio);
|
||||
auto *kv = db->BeginTransaction(wopt);
|
||||
if (is_insert) {
|
||||
kv->Put(handle, key, val);
|
||||
require_ok(kv->Put(handle, key, val), "mixed put");
|
||||
} else {
|
||||
kv->Get(ropt, handle, key, &rval);
|
||||
auto st = kv->Get(ropt, handle, key, &rval);
|
||||
if (!st.ok() && !st.IsNotFound()) {
|
||||
require_ok(st, "mixed get");
|
||||
}
|
||||
}
|
||||
kv->Commit();
|
||||
require_ok(kv->Commit(), "mixed commit");
|
||||
delete kv;
|
||||
}
|
||||
} else if (args.mode == "scan") {
|
||||
auto *iter = db->NewIterator(ropt);
|
||||
iter->Seek(prefix);
|
||||
size_t n = 0;
|
||||
while (iter->Valid()) {
|
||||
round += 1;
|
||||
auto k = iter->key();
|
||||
@ -283,8 +289,8 @@ int main(int argc, char *argv[]) {
|
||||
black_box(k);
|
||||
black_box(v);
|
||||
iter->Next();
|
||||
n += 1;
|
||||
}
|
||||
require_ok(iter->status(), "scan iterate");
|
||||
delete iter;
|
||||
}
|
||||
total_op.fetch_add(round, std::memory_order::relaxed);
|
||||
@ -303,7 +309,13 @@ int main(int argc, char *argv[]) {
|
||||
return args.insert_ratio;
|
||||
return args.mode == "insert" ? 100 : 0;
|
||||
}();
|
||||
uint64_t ops = total_op.load(std::memory_order_relaxed) / b.elapse_sec();
|
||||
const auto elapsed_us = b.elapse_usec();
|
||||
uint64_t ops = 0;
|
||||
const auto total = total_op.load(std::memory_order_relaxed);
|
||||
if (elapsed_us > 0) {
|
||||
ops = static_cast<uint64_t>(static_cast<double>(total) * 1000000.0 / elapsed_us);
|
||||
}
|
||||
|
||||
if (args.mode == "insert") {
|
||||
if (args.random) {
|
||||
args.mode = "random_insert";
|
||||
@ -311,9 +323,8 @@ int main(int argc, char *argv[]) {
|
||||
args.mode = "sequential_insert";
|
||||
}
|
||||
}
|
||||
fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, (uint64_t) ops,
|
||||
(uint64_t) b.elapse_ms());
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
fmt::println("{},{},{},{},{},{},{}", args.mode, args.threads, args.key_size, args.value_size, ratio, ops,
|
||||
static_cast<uint64_t>(elapsed_us));
|
||||
delete handle;
|
||||
delete db;
|
||||
std::filesystem::remove_all(args.path);
|
||||
|
||||
@ -47,8 +47,8 @@ function samples() {
|
||||
done
|
||||
}
|
||||
|
||||
echo mode,threads,key_size,value_size,insert_ratio,ops,elasped > "${script_dir}/mace.csv"
|
||||
samples "$1" 2>> "${script_dir}/mace.csv"
|
||||
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed_us > "${script_dir}/mace.csv"
|
||||
samples "$1" 1>> "${script_dir}/mace.csv"
|
||||
if [ -x "${script_dir}/bin/python" ]; then
|
||||
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py mace.csv)
|
||||
else
|
||||
|
||||
@ -3,6 +3,7 @@ import matplotlib.pyplot as plt
|
||||
from adjustText import adjust_text
|
||||
import sys
|
||||
|
||||
|
||||
def real_mode(m):
|
||||
if m == "mixed":
|
||||
return "Mixed (70% Get, 30% Insert)"
|
||||
@ -12,20 +13,29 @@ def real_mode(m):
|
||||
return "Sequential Scan"
|
||||
return m.capitalize()
|
||||
|
||||
|
||||
name = sys.argv[1]
|
||||
prefix = name.split(".")[0]
|
||||
# 读取数据
|
||||
|
||||
# read benchmark data
|
||||
# keep compatibility with older csv files that used elapsed/elasped
|
||||
# and normalize to elapsed_us
|
||||
|
||||
df = pd.read_csv(f"./{name}")
|
||||
if "elapsed_us" not in df.columns:
|
||||
if "elapsed" in df.columns:
|
||||
df = df.rename(columns={"elapsed": "elapsed_us"})
|
||||
elif "elasped" in df.columns:
|
||||
df = df.rename(columns={"elasped": "elapsed_us"})
|
||||
|
||||
# 按 mode 分组
|
||||
# group by mode
|
||||
modes = df["mode"].unique()
|
||||
|
||||
|
||||
for mode in modes:
|
||||
plt.figure(figsize=(16, 9))
|
||||
subset = df[df["mode"] == mode]
|
||||
|
||||
# 按 key_size/value_size 分组
|
||||
# group by key/value size
|
||||
key_value_combinations = subset.groupby(["key_size", "value_size"])
|
||||
|
||||
texts = []
|
||||
@ -34,19 +44,17 @@ for mode in modes:
|
||||
x = group["threads"]
|
||||
y = group["ops"]
|
||||
|
||||
# 绘制折线
|
||||
# draw line
|
||||
line, = plt.plot(x, y, marker="o", label=label)
|
||||
|
||||
# 添加文本标签
|
||||
# add labels
|
||||
for xi, yi, ops in zip(x, y, group["ops"]):
|
||||
texts.append(
|
||||
plt.text(xi, yi, f"{int(ops)}", color=line.get_color(), fontsize=12)
|
||||
)
|
||||
|
||||
# 自动调整文本位置
|
||||
adjust_text(texts, arrowprops=dict(arrowstyle="->", color='gray'))
|
||||
adjust_text(texts, arrowprops=dict(arrowstyle="->", color="gray"))
|
||||
|
||||
# 设置图表样式
|
||||
plt.title(f"{prefix.upper()}: {real_mode(mode)}", fontsize=16)
|
||||
plt.xlabel("Threads", fontsize=14)
|
||||
plt.ylabel("OPS", fontsize=14)
|
||||
|
||||
@ -47,7 +47,7 @@ function samples() {
|
||||
done
|
||||
}
|
||||
|
||||
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed > "${script_dir}/rocksdb.csv"
|
||||
echo mode,threads,key_size,value_size,insert_ratio,ops,elapsed_us > "${script_dir}/rocksdb.csv"
|
||||
samples "$1" 1>> "${script_dir}/rocksdb.csv"
|
||||
if [ -x "${script_dir}/bin/python" ]; then
|
||||
(cd "${script_dir}" && "${script_dir}/bin/python" plot.py rocksdb.csv)
|
||||
|
||||
87
src/main.rs
87
src/main.rs
@ -4,7 +4,6 @@ use logger::Logger;
|
||||
use mace::{Mace, Options};
|
||||
#[cfg(feature = "custom_alloc")]
|
||||
use myalloc::{MyAlloc, print_filtered_trace};
|
||||
use rand::prelude::*;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::sync::Arc;
|
||||
@ -85,17 +84,17 @@ fn main() {
|
||||
eprintln!("Error: Insert ratio must be between 0 and 100");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
let mut opt = Options::new(path);
|
||||
opt.sync_on_write = false;
|
||||
opt.over_provision = false;
|
||||
opt.inline_size = args.blob_size;
|
||||
opt.tmp_store = args.mode != "get" && args.mode != "scan";
|
||||
opt.cache_capacity = 3 << 30;
|
||||
opt.data_file_size = 64 << 20;
|
||||
opt.max_log_size = 1 << 30;
|
||||
opt.default_arenas = 128;
|
||||
let mut saved = opt.clone();
|
||||
saved.tmp_store = false;
|
||||
|
||||
saved.tmp_store = true;
|
||||
let mut db = Mace::new(opt.validate().unwrap()).unwrap();
|
||||
db.disable_gc();
|
||||
let mut bkt = db.new_bucket("default").unwrap();
|
||||
@ -103,42 +102,41 @@ fn main() {
|
||||
let value = Arc::new(vec![b'0'; args.value_size]);
|
||||
|
||||
if args.mode == "get" || args.mode == "scan" {
|
||||
let pre_tx = bkt.begin().unwrap();
|
||||
let mut fill_handles = vec![];
|
||||
for tid in 0..args.threads {
|
||||
let count = args.iterations / args.threads
|
||||
let bkt_clone = bkt.clone();
|
||||
let val_clone = value.clone();
|
||||
let key_count = args.iterations / args.threads
|
||||
+ if tid < args.iterations % args.threads {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
for i in 0..count {
|
||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||
key.resize(args.key_size, b'x');
|
||||
pre_tx.put(&key, &*value).unwrap();
|
||||
}
|
||||
let key_size = args.key_size;
|
||||
fill_handles.push(std::thread::spawn(move || {
|
||||
coreid::bind_core(tid);
|
||||
const BATCH_SIZE: usize = 10000;
|
||||
for i in (0..key_count).step_by(BATCH_SIZE) {
|
||||
let tx = bkt_clone.begin().unwrap();
|
||||
for j in 0..BATCH_SIZE {
|
||||
if i + j >= key_count {
|
||||
break;
|
||||
}
|
||||
let mut key = format!("key_{tid}_{}", i + j).into_bytes();
|
||||
key.resize(key_size, b'x');
|
||||
tx.put(&key, &*val_clone).unwrap();
|
||||
}
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
}));
|
||||
}
|
||||
for h in fill_handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
pre_tx.commit().unwrap();
|
||||
drop(bkt);
|
||||
drop(db);
|
||||
saved.tmp_store = true;
|
||||
db = Mace::new(saved.validate().unwrap()).unwrap();
|
||||
bkt = db.get_bucket("default").unwrap();
|
||||
|
||||
let mut rng = rand::rng();
|
||||
for _ in 0..args.iterations {
|
||||
let tid = rng.random_range(0..args.threads);
|
||||
let count = args.iterations / args.threads
|
||||
+ if tid < args.iterations % args.threads {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let idx = rng.random_range(0..count);
|
||||
let mut key = format!("key_{tid}_{idx}").into_bytes();
|
||||
key.resize(args.key_size, b'x');
|
||||
let view = bkt.view().unwrap();
|
||||
view.get(&key).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let mut key_counts = vec![args.iterations / args.threads; args.threads];
|
||||
@ -162,15 +160,22 @@ fn main() {
|
||||
let key_count = key_counts[tid];
|
||||
let key_size = args.key_size;
|
||||
let prefix = format!("key_{tid}_");
|
||||
let is_random = args.random;
|
||||
|
||||
std::thread::spawn(move || {
|
||||
coreid::bind_core(tid);
|
||||
let mut round = 0;
|
||||
let mut indices: Vec<usize> = (0..key_count).collect();
|
||||
if is_random {
|
||||
use rand::seq::SliceRandom;
|
||||
indices.shuffle(&mut rand::rng());
|
||||
}
|
||||
|
||||
ready_barrier.wait();
|
||||
start_barrier.wait();
|
||||
match mode.as_str() {
|
||||
"insert" => {
|
||||
for i in 0..key_count {
|
||||
for i in indices {
|
||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||
key.resize(key_size, b'x');
|
||||
round += 1;
|
||||
@ -180,7 +185,7 @@ fn main() {
|
||||
}
|
||||
}
|
||||
"get" => {
|
||||
for i in 0..key_count {
|
||||
for i in indices {
|
||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||
key.resize(key_size, b'x');
|
||||
round += 1;
|
||||
@ -190,7 +195,7 @@ fn main() {
|
||||
}
|
||||
}
|
||||
"mixed" => {
|
||||
for i in 0..key_count {
|
||||
for i in indices {
|
||||
let mut key = format!("key_{tid}_{i}").into_bytes();
|
||||
key.resize(key_size, b'x');
|
||||
let is_insert = rand::random_range(0..100) < insert_ratio;
|
||||
@ -231,9 +236,13 @@ fn main() {
|
||||
x.join().unwrap();
|
||||
}
|
||||
|
||||
let duration = start_time.elapsed();
|
||||
let elapsed_us = start_time.elapsed().as_micros() as u64;
|
||||
let total = total_ops.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let ops = (total as f64 / duration.as_secs_f64()) as usize;
|
||||
let ops = if elapsed_us == 0 {
|
||||
0
|
||||
} else {
|
||||
((total as u128 * 1_000_000u128) / elapsed_us as u128) as usize
|
||||
};
|
||||
|
||||
let ratio = if args.mode == "mixed" {
|
||||
args.insert_ratio
|
||||
@ -250,15 +259,9 @@ fn main() {
|
||||
mode = "sequential_insert".into();
|
||||
}
|
||||
}
|
||||
eprintln!(
|
||||
println!(
|
||||
"{},{},{},{},{},{},{}",
|
||||
mode,
|
||||
args.threads,
|
||||
args.key_size,
|
||||
args.value_size,
|
||||
ratio,
|
||||
ops,
|
||||
duration.as_millis()
|
||||
mode, args.threads, args.key_size, args.value_size, ratio, ops, elapsed_us
|
||||
);
|
||||
drop(db);
|
||||
#[cfg(feature = "custom_alloc")]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user