update options and remove unused code

This commit is contained in:
abbycin 2026-06-03 16:43:53 +08:00
parent eb7316b0ca
commit 8ef2f8d2f5
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
10 changed files with 14 additions and 653 deletions

View File

@ -9,20 +9,9 @@ clap = { version = "4.5.48", features = ["derive"] }
rand = "0.9.2"
log = "0.4.22"
coreid = { path = "coreid" }
logger = { path = "logger" }
myalloc = { path = "heap_trace" }
[features]
default = []
custom_alloc = []
[profile.release]
lto = true
opt-level = 3
debug = true
[profile.release-with-symbol]
inherits = "release"
# debug = true
debug = "line-tables-only"
panic = "abort"

View File

@ -1,2 +0,0 @@
/target
Cargo.lock

View File

@ -1,7 +0,0 @@
[package]
name = "myalloc"
version = "0.1.0"
edition = "2024"
[dependencies]
backtrace = "0.3.76"

View File

@ -1,233 +0,0 @@
use std::{
alloc::{GlobalAlloc, System},
cell::Cell,
collections::{HashMap, hash_map::Entry},
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
ptr,
sync::{LazyLock, Mutex, atomic::AtomicBool},
};
/// Zero-sized allocator type; its `GlobalAlloc` implementation in this file forwards to
/// `System` and stores an 8-byte call-site hash in front of every allocation.
pub struct MyAlloc;
/// Records one allocation (`is_alloc == true`) or deallocation event of `size` bytes against
/// the call site that triggered it.
///
/// The call site is the concatenation of every resolved `file:line` backtrace frame whose file
/// path contains `"mace"`, one frame per line. The matching `Status` entry in `G_MAP` is
/// created on first use and its counters are bumped.
///
/// Returns the call-site key, or `None` when no frame matched (nothing is recorded then).
fn trace(size: usize, is_alloc: bool) -> Option<String> {
    let mut site = String::new();
    backtrace::trace(|frame| {
        backtrace::resolve_frame(frame, |symbol| {
            let (Some(path), Some(line)) = (symbol.filename(), symbol.lineno()) else {
                return;
            };
            let Some(name) = path.to_str() else {
                return;
            };
            // Very short names are ignored too (the resolved name is sometimes empty).
            if name.contains("mace") && name.len() > 10 {
                site.push_str(&format!("{}:{}\n", name, line));
            }
        });
        // Keep walking until the whole stack has been visited.
        true
    });

    if site.is_empty() {
        return None;
    }

    {
        let mut stats = G_MAP.lock().unwrap();
        let status = stats.entry(site.clone()).or_insert(Status {
            nr_alloc: 0,
            alloc_size: 0,
            nr_free: 0,
            free_size: 0,
        });
        if is_alloc {
            status.nr_alloc += 1;
            status.alloc_size += size;
        } else {
            status.nr_free += 1;
            status.free_size += size;
        }
    }
    Some(site)
}
/// Allocation counters accumulated for one call site.
#[derive(Debug)]
pub struct Status {
    /// Number of allocation events recorded.
    nr_alloc: usize,
    /// Sum of the sizes passed with those allocation events.
    alloc_size: usize,
    /// Number of deallocation events recorded.
    nr_free: usize,
    /// Sum of the sizes passed with those deallocation events.
    free_size: usize,
}

/// `Display` prints exactly the derived `Debug` representation.
impl Display for Status {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
// Set by the `print_*_trace` functions; while set, `alloc`/`dealloc`/`realloc` skip `trace`.
static G_STOP: AtomicBool = AtomicBool::new(false);
// Per-call-site counters, keyed by the newline-separated `file:line` string built in `trace`.
static G_MAP: LazyLock<Mutex<HashMap<String, Status>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
// Hash of a call-site key -> the key itself; entries are inserted by `write_hash` and removed
// by `read_hash`.
static G_TRACE: LazyLock<Mutex<HashMap<u64, String>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
// Width in bytes of the `u64` hash stored in front of every user allocation.
const META_LEN: usize = 8;
thread_local! {
// Per-thread flag that is raised while the allocator's own bookkeeping runs, so that
// allocations made by the bookkeeping itself are not traced again.
static G_SELF: Cell<bool> = const { Cell::new(false) };
}
/// Returns the number of bytes to request from the system allocator for `layout`:
/// the payload size plus a header of `max(META_LEN, layout.align())` bytes.
///
/// # Panics
/// Panics if the sum overflows `usize`.
const fn real_size(layout: &std::alloc::Layout) -> usize {
    let header = if layout.align() < META_LEN {
        META_LEN
    } else {
        layout.align()
    };
    header.checked_add(layout.size()).unwrap()
}
/// Builds the layout actually handed to the system allocator: the size from `real_size`
/// and an alignment of at least `align_of::<u64>()`.
///
/// # Panics
/// Panics if the size computation overflows or the resulting layout is invalid.
fn new_layout(layout: std::alloc::Layout) -> std::alloc::Layout {
    let total = real_size(&layout);
    let align = std::cmp::max(layout.align(), align_of::<u64>());
    std::alloc::Layout::from_size_align(total, align).unwrap()
}
/// Stamps the header of a freshly obtained raw block and returns the pointer handed to the user.
///
/// * `x` - start of the raw block returned by the system allocator (may be null).
/// * `align` - alignment of the padded layout; the user pointer is `max(META_LEN, align)`
///   bytes past `x`.
/// * `s` - call-site key produced by `trace`, if this allocation was traced.
///
/// When `s` is present and the reentrancy flag is clear, the key's hash is written into the
/// header and `hash -> key` is registered in `G_TRACE`. In every other case the header is set
/// to `u64::MAX`, which `read_hash` treats as "untracked". The header is therefore always
/// initialised; previously it was left untouched when the reentrancy flag was set, so a later
/// `read_hash` read uninitialised memory.
///
/// A null `x` is returned unchanged so allocation failure propagates to the caller instead of
/// being offset and written through.
fn write_hash(x: *mut u8, align: usize, s: Option<String>) -> *mut u8 {
    if x.is_null() {
        return x;
    }

    let mut tag = u64::MAX;
    if let Some(site) = s {
        if !G_SELF.with(|flag| flag.get()) {
            // Raise the flag: inserting into the map may allocate and re-enter the allocator.
            G_SELF.with(|flag| flag.set(true));
            let mut hasher = DefaultHasher::new();
            site.hash(&mut hasher);
            tag = hasher.finish();
            G_TRACE.lock().unwrap().insert(tag, site);
            G_SELF.with(|flag| flag.set(false));
        }
    }

    // SAFETY: `x` is non-null and the callers in this file pass the start of a block whose
    // size is `max(META_LEN, align)` plus the payload size, so both the 8-byte header at `x`
    // and the offset user pointer lie inside that block.
    unsafe {
        x.cast::<u64>().write_unaligned(tag);
        x.add(META_LEN.max(align))
    }
}
/// Maps a user pointer back to the start of its raw block and unregisters its call-site hash.
///
/// * `x` - pointer previously returned by `write_hash`.
/// * `align` - alignment of the padded layout (same value that was given to `write_hash`).
///
/// The header in front of `x` is read; unless it is the `u64::MAX` "untracked" marker or the
/// reentrancy flag is set, the hash is removed from `G_TRACE`. Returns the raw block start.
///
/// NOTE(review): `G_TRACE` is keyed by call-site hash, so the first free of any allocation
/// from a site removes the entry even if other allocations from that site are still live.
fn read_hash(x: *mut u8, align: usize) -> *mut u8 {
    // SAFETY: relies on `x` having been produced by `write_hash` with the same `align`, so the
    // header lies `max(META_LEN, align)` bytes before it inside the same block.
    let base = unsafe { x.sub(META_LEN.max(align)) };
    // SAFETY: see above; the header is read unaligned exactly as it is written.
    let tag = unsafe { base.cast::<u64>().read_unaligned() };

    if tag != u64::MAX && !G_SELF.with(|flag| flag.get()) {
        G_SELF.with(|flag| flag.set(true));
        G_TRACE.lock().unwrap().remove(&tag);
        G_SELF.with(|flag| flag.set(false));
    }
    base
}
/// Tracing allocator: every request is forwarded to `System` with a padded layout (see
/// `new_layout`) whose first bytes hold the call-site hash written by `write_hash`.
///
/// Fixes relative to the previous version:
/// * `alloc` returns null when the system allocator fails instead of writing a header
///   through a null pointer.
/// * `realloc` sizes the new block as `header + new_size` with the same header width
///   (`max(META_LEN, align)`) that `alloc`/`dealloc` use. It used `META_LEN + new_size`, which
///   is too small for alignments above 8 and made the later `dealloc` layout disagree with the
///   block's real size.
/// * `realloc` reports failure (null) when that size computation overflows.
///
/// NOTE(review): `realloc` still records `layout.size()` (the old size) as an allocation and
/// does not unregister the old call-site hash; confirm whether that accounting is intended.
unsafe impl GlobalAlloc for MyAlloc {
    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
        let site = record_event(layout.size(), true);
        let padded = new_layout(layout);
        // SAFETY: `padded` never has size zero because it includes the header bytes.
        let raw = unsafe { System.alloc(padded) };
        if raw.is_null() {
            return raw;
        }
        write_hash(raw, padded.align(), site)
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
        // The returned key is not needed when freeing.
        let _ = record_event(layout.size(), false);
        let padded = new_layout(layout);
        let raw = read_hash(ptr, padded.align());
        // SAFETY: `raw` is the block start recovered from a pointer this allocator returned,
        // and `padded` is recomputed from the same caller-provided layout.
        unsafe { System.dealloc(raw, padded) };
    }

    unsafe fn alloc_zeroed(&self, layout: std::alloc::Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`; the caller upholds it.
        let p = unsafe { self.alloc(layout) };
        if !p.is_null() {
            // SAFETY: `p` points at `layout.size()` writable payload bytes.
            unsafe { ptr::write_bytes(p, 0, layout.size()) };
        }
        p
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: std::alloc::Layout, new_size: usize) -> *mut u8 {
        let site = record_event(layout.size(), true);
        let old_layout = new_layout(layout);
        let header = META_LEN.max(old_layout.align());
        let Some(new_total) = header.checked_add(new_size) else {
            // Signal failure; the original block stays valid as the contract requires.
            return std::ptr::null_mut();
        };
        // SAFETY: `ptr` was returned by this allocator for `layout`, so the raw block starts
        // `header` bytes before it and was allocated with `old_layout`.
        unsafe {
            let raw = ptr.sub(header);
            let new_raw = System.realloc(raw, old_layout, new_total);
            if new_raw.is_null() {
                return new_raw;
            }
            write_hash(new_raw, old_layout.align(), site)
        }
    }
}

/// Runs `trace` with the per-thread reentrancy flag raised. Returns `None` without tracing
/// when the flag is already raised (the allocator is inside its own bookkeeping) or when
/// `G_STOP` has been set.
fn record_event(size: usize, is_alloc: bool) -> Option<String> {
    if G_SELF.with(|flag| flag.get()) || G_STOP.load(std::sync::atomic::Ordering::Acquire) {
        return None;
    }
    G_SELF.with(|flag| flag.set(true));
    let site = trace(size, is_alloc);
    G_SELF.with(|flag| flag.set(false));
    site
}
/// Stops tracing, then calls `f(call_site, status)` for every call site that still has an
/// entry in `G_TRACE` and whose counters exist in `G_MAP`.
///
/// NOTE(review): both global locks are held while `f` runs.
pub fn print_filtered_trace<F>(f: F)
where
    F: Fn(&str, &Status),
{
    G_STOP.store(true, std::sync::atomic::Ordering::Release);
    let stats = G_MAP.lock().unwrap();
    let live = G_TRACE.lock().unwrap();
    live.values()
        .filter_map(|site| stats.get(site).map(|status| (site, status)))
        .for_each(|(site, status)| f(site, status));
}
/// Stops tracing, then calls `f(call_site, status)` for every call site recorded in `G_MAP`.
pub fn print_all_trace<F>(f: F)
where
    F: Fn(&str, &Status),
{
    G_STOP.store(true, std::sync::atomic::Ordering::Release);
    let stats = G_MAP.lock().unwrap();
    for (site, status) in stats.iter() {
        f(site, status);
    }
}

2
logger/.gitignore vendored
View File

@ -1,2 +0,0 @@
/target
Cargo.lock

View File

@ -1,12 +0,0 @@
[package]
name = "logger"
version = "0.1.1"
edition = "2024"
authors = ["abbytsing@gmail.com"]
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.161"
[dependencies]
chrono = "0.4.38"
log = "0.4.22"

View File

@ -1,298 +0,0 @@
use log::{LevelFilter, Metadata, Record};
use std::cell::OnceCell;
use std::io::Write;
use std::ops::Deref;
use std::path::Path;
use std::ptr::addr_of_mut;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Mutex, MutexGuard};
thread_local! {
// Thread id cached per thread; filled lazily by `get_tid`.
static G_TID: OnceCell<i32> = OnceCell::new();
}
// Counter used by the non-Linux `get_tid` to hand out thread ids, starting at 1.
#[cfg(not(target_os = "linux"))]
static G_ID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(1);
// The single global logger instance returned by `Logger::get`; starts with no sinks.
static mut G_LOGGER: Logger = Logger {
mtx_shard: [const { Mutex::new(()) }; 8],
sink: Vec::new(),
abort_on_error: AtomicBool::new(false),
};
// True once `Logger::init` has registered the logger with the `log` crate.
static G_INIIED: Mutex<bool> = Mutex::new(false);
// Names reported by the built-in sinks; used to detect duplicates and to remove a sink.
const G_CONSOLE: &'static str = "console";
const G_FILE: &'static str = "file";
/// Returns the calling thread's id, cached in `G_TID` after the first call.
/// On Linux the id comes from `gettid`.
#[cfg(target_os = "linux")]
fn get_tid() -> i32 {
    G_TID.with(|cell| {
        let tid = cell.get_or_init(|| unsafe { libc::gettid() });
        *tid
    })
}

/// Returns the calling thread's id, cached in `G_TID` after the first call.
/// On non-Linux targets the id is the next value of the `G_ID` counter.
#[cfg(not(target_os = "linux"))]
fn get_tid() -> i32 {
    G_TID.with(|cell| {
        let tid = cell.get_or_init(|| G_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
        *tid
    })
}
/// A simple synchronous logger that implements `log::Log`.
pub struct Logger {
// Locks taken around sink access: `lock` picks one by thread id, while `exist` and
// `remove_impl` always take index 0.
mtx_shard: [Mutex<()>; 8],
// Registered output sinks, written to in registration order.
sink: Vec<SinkHandle>,
// When set, logging a record at `Error` level also writes a backtrace and aborts the process.
abort_on_error: AtomicBool,
}
/// Owning handle to a heap-allocated sink, stored as a raw pointer so that a mutable
/// reference can be produced from a shared handle (see `as_mut`).
struct SinkHandle {
    raw: *mut dyn Sink,
}

// NOTE(review): these impls assert that sharing the raw pointer across threads is sound;
// that depends on callers serialising access to the sink — confirm against the locking used.
unsafe impl Send for SinkHandle {}
unsafe impl Sync for SinkHandle {}

impl SinkHandle {
    /// Moves `x` to the heap and takes ownership of it.
    fn new<T>(x: T) -> Self
    where
        T: Sink + 'static,
    {
        let boxed: Box<dyn Sink> = Box::new(x);
        Self {
            raw: Box::into_raw(boxed),
        }
    }

    /// Returns a mutable reference to the sink from a shared handle.
    fn as_mut(&self) -> &mut dyn Sink {
        // `raw` comes from `Box::into_raw` in `new` and stays valid until `drop`.
        unsafe { &mut *self.raw }
    }
}

impl Deref for SinkHandle {
    type Target = dyn Sink;

    fn deref(&self) -> &Self::Target {
        // `raw` comes from `Box::into_raw` in `new` and stays valid until `drop`.
        unsafe { &*self.raw }
    }
}

impl Drop for SinkHandle {
    /// Rebuilds the box created in `new` so the sink is dropped and its memory released.
    fn drop(&mut self) {
        drop(unsafe { Box::from_raw(self.raw) });
    }
}

/// Destination for formatted log lines.
trait Sink: Send + Sync {
    /// Writes one already formatted message.
    fn sink(&mut self, str: &String);
    /// Flushes any buffered output.
    fn flush(&mut self);
    /// Name identifying the kind of sink.
    fn name(&self) -> &'static str;
}
impl log::Log for Logger {
fn enabled(&self, _metadata: &Metadata) -> bool {
return true;
}
fn log(&self, record: &Record) {
let s = format!(
"{} {} [{}] {}:{} {}\n",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S.%6f"),
get_tid(),
record.level().as_str(),
record.file().unwrap(),
record.line().unwrap(),
record.args()
);
let _lk = self.lock();
for p in &self.sink {
p.as_mut().sink(&s);
}
if record.level() == log::LevelFilter::Error && self.should_abort() {
let bt = std::backtrace::Backtrace::force_capture();
let buf = format!("{}", bt);
for p in &self.sink {
p.as_mut().sink(&buf);
}
std::process::abort();
}
}
fn flush(&self) {
let _lk = self.lock();
for p in &self.sink {
p.as_mut().flush();
}
}
}
/// Sink that writes to standard output; it carries no state.
struct Console {}
impl Console {
/// Creates the console sink.
fn new() -> Self {
Self {}
}
}
/// Sink backed by a single file.
///
/// NOTE: file rolling is not supported at present.
struct File {
    w: std::fs::File,
}

impl File {
    /// Opens `path` for writing, creating it if needed. With `trunc` the existing content is
    /// discarded; otherwise new data is appended.
    ///
    /// # Errors
    /// Returns the I/O error reported when the file cannot be opened.
    fn new(path: impl AsRef<Path>, trunc: bool) -> Result<Self, std::io::Error> {
        let mut options = std::fs::OpenOptions::new();
        options
            .write(true)
            .create(true)
            .truncate(trunc)
            .append(!trunc);
        options.open(path).map(|w| Self { w })
    }
}
/// Console sink: messages go to standard output.
impl Sink for Console {
    /// Writes the whole message. `write_all` is used because a plain `write` may accept only
    /// part of the buffer and silently drop the rest.
    ///
    /// # Panics
    /// Panics if standard output cannot be written.
    fn sink(&mut self, str: &String) {
        std::io::stdout().write_all(str.as_bytes()).unwrap();
    }

    /// # Panics
    /// Panics if standard output cannot be flushed.
    fn flush(&mut self) {
        std::io::stdout().flush().unwrap();
    }

    fn name(&self) -> &'static str {
        G_CONSOLE
    }
}
/// File sink: messages go to the file opened in `File::new`.
impl Sink for File {
    /// Writes the whole message. `write_all` is used because a plain `write` may accept only
    /// part of the buffer and silently drop the rest.
    ///
    /// # Panics
    /// Panics if the file cannot be written.
    fn sink(&mut self, str: &String) {
        self.w.write_all(str.as_bytes()).unwrap();
    }

    /// # Panics
    /// Panics if the file cannot be flushed.
    fn flush(&mut self) {
        self.w.flush().unwrap();
    }

    fn name(&self) -> &'static str {
        G_FILE
    }
}
/// Construction and configuration of the global logger.
///
/// NOTE(review): `exist` and `remove_impl` take lock 0 only, while logging takes the lock
/// chosen by thread id — confirm that sinks are not added or removed while other threads log.
impl Logger {
    /// Registers the global logger with the `log` crate on the first call and returns it.
    ///
    /// The "already initialised" flag is tested and set under a single lock acquisition, so
    /// two racing callers can no longer both reach `log::set_logger` (the second call used to
    /// panic on the `unwrap`).
    ///
    /// # Panics
    /// Panics if another logger has already been registered with the `log` crate.
    pub fn init() -> &'static mut Self {
        let first_call = {
            let mut inited = G_INIIED.lock().unwrap();
            !std::mem::replace(&mut *inited, true)
        };
        if first_call {
            log::set_logger(Self::get()).unwrap();
            log::set_max_level(LevelFilter::Trace);
        }
        Self::get()
    }

    /// Returns the global logger instance.
    pub fn get() -> &'static mut Self {
        // NOTE(review): this hands out `&'static mut` to a `static mut`; callers must not use
        // overlapping mutable references concurrently.
        unsafe { &mut *addr_of_mut!(G_LOGGER) }
    }

    /// Returns the global logger when a sink named `sink` is registered, `None` otherwise.
    fn exist(&self, sink: &'static str) -> Option<&mut Self> {
        let _lk = self.mtx_shard[0].lock().unwrap();
        if self.sink.iter().any(|handle| handle.name() == sink) {
            Some(Self::get())
        } else {
            None
        }
    }

    fn should_abort(&self) -> bool {
        self.abort_on_error.load(Relaxed)
    }

    /// Takes the lock selected by the calling thread's id.
    fn lock(&'_ self) -> MutexGuard<'_, ()> {
        // The mask works as a modulo because the number of locks is a power of two.
        let idx = get_tid() as usize & (self.mtx_shard.len() - 1);
        self.mtx_shard[idx].lock().unwrap()
    }

    /// Enables or disables aborting the process after an `Error` record has been logged.
    pub fn abort_on_error(&mut self, flag: bool) -> &mut Self {
        self.abort_on_error.store(flag, Relaxed);
        self
    }

    /// Adds the console sink unless one is already registered.
    pub fn add_console(&mut self) -> &mut Self {
        if self.exist(G_CONSOLE).is_none() {
            self.sink.push(SinkHandle::new(Console::new()));
        }
        self
    }

    /// Adds a file sink writing to `path` unless one is already registered.
    ///
    /// Returns `None` (after printing the reason to stderr) when the file cannot be opened.
    /// The path is printed with `Path::display`, so a non-UTF-8 path no longer panics while
    /// the error is being reported.
    pub fn add_file(&mut self, path: impl AsRef<Path>, trunc: bool) -> Option<&mut Self> {
        if self.exist(G_FILE).is_some() {
            return Some(self);
        }
        match File::new(&path, trunc) {
            Ok(file) => {
                self.sink.push(SinkHandle::new(file));
                Some(self)
            }
            Err(e) => {
                eprintln!("can't open {}, error {}", path.as_ref().display(), e);
                None
            }
        }
    }

    /// Removes the first sink whose name equals `name`, if any.
    fn remove_impl(&mut self, name: &'static str) {
        let _lk = self.mtx_shard[0].lock().unwrap();
        if let Some(idx) = self.sink.iter().position(|handle| handle.name() == name) {
            self.sink.remove(idx);
        }
    }

    /// Removes the file sink, if registered.
    pub fn remove_file(&mut self) {
        self.remove_impl(G_FILE);
    }

    /// Removes the console sink, if registered.
    pub fn remove_console(&mut self) {
        self.remove_impl(G_CONSOLE);
    }
}
/// Flushes every registered sink when the logger is dropped.
impl Drop for Logger {
    fn drop(&mut self) {
        let _lk = self.lock();
        self.sink.iter().for_each(|handle| handle.as_mut().flush());
    }
}
#[cfg(test)]
mod test {
    use crate::Logger;

    /// After `Logger::init`, the logger registered with the `log` crate must be the very
    /// instance that `init` returned.
    #[test]
    fn test_console() {
        let ours = Logger::init();
        let registered = log::logger() as *const dyn log::Log;
        let expected = &*ours as *const dyn log::Log;
        assert!(std::ptr::addr_eq(registered, expected));
    }
}

View File

@ -30,7 +30,6 @@
#include <unistd.h>
#include "CLI/CLI.hpp"
#include "instant.h"
template<class T>
static void black_box(const T &t) {
@ -131,7 +130,6 @@ struct ThreadStats {
};
struct ResultRow {
uint64_t ts_epoch_ms;
std::string workload_id;
std::string mode;
DurabilityMode durability_mode;
@ -376,19 +374,18 @@ static std::string csv_escape(const std::string &v) {
}
static const char *result_header() {
return "schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
return "engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_"
"keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,"
"measure_secs,total_op,ok_op,err_op,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us";
}
static std::string result_row_csv(const ResultRow &r) {
return fmt::format("v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
r.ts_epoch_ms, "rocksdb", csv_escape(r.workload_id), csv_escape(r.mode),
durability_str(r.durability_mode), r.threads, r.key_size, r.value_size, r.prefill_keys,
r.shared_keyspace, distribution_str(r.distribution), r.zipf_theta, r.read_pct, r.update_pct,
r.scan_pct, r.scan_len, read_path_str(r.read_path), r.warmup_secs, r.measure_secs, r.total_op,
r.ok_op, r.err_op, static_cast<uint64_t>(r.ops), r.quantiles.p50_us, r.quantiles.p95_us,
r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us);
return fmt::format("{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", "rocksdb",
csv_escape(r.workload_id), csv_escape(r.mode), durability_str(r.durability_mode), r.threads,
r.key_size, r.value_size, r.prefill_keys, r.shared_keyspace, distribution_str(r.distribution),
r.zipf_theta, r.read_pct, r.update_pct, r.scan_pct, r.scan_len, read_path_str(r.read_path),
r.warmup_secs, r.measure_secs, r.total_op, r.ok_op, r.err_op, static_cast<uint64_t>(r.ops),
r.quantiles.p50_us, r.quantiles.p95_us, r.quantiles.p99_us, r.quantiles.p999_us, r.elapsed_us);
}
static bool append_result_row(const std::string &path, const ResultRow &row) {
@ -720,7 +717,7 @@ int main(int argc, char *argv[]) {
cfo.write_buffer_size = 64 << 20;
cfo.max_write_buffer_number = 16;
auto cache = rocksdb::NewLRUCache(4 << 30);
// Use a 64-bit literal: `5 << 30` is evaluated in `int`, and a 32-bit `int` cannot hold
// 5 GiB, so the requested capacity would not be the intended value.
auto cache = rocksdb::NewLRUCache(5ULL << 30);
rocksdb::BlockBasedTableOptions table_options{};
table_options.block_cache = cache;
cfo.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -908,7 +905,6 @@ int main(int argc, char *argv[]) {
uint64_t ok_op = total_op >= err_op ? (total_op - err_op) : 0;
auto row = ResultRow{
.ts_epoch_ms = now_epoch_ms(),
.workload_id = workload_spec.id,
.mode = workload_spec.mode_label,
.durability_mode = durability.value(),

View File

@ -1,53 +0,0 @@
#!/usr/bin/python3
"""Summarise an allocation-trace log.

Usage: pass the log file as the only argument. The log is read until the first line shorter
than 10 characters. Lines containing 'INFO' are skipped. Lines without 'Status' are collected
as call-site frames (cut to start at 'mace' when that text is present). A line containing
'Status' closes the current call site and supplies its counters.

Writes 'alloc.txt' (call sites sorted by alloc_size, largest first) and 'free.txt' (sorted by
free_size, largest first) and prints the totals.
"""
import sys


def main():
    # An explicit check instead of `assert`, which is stripped when Python runs with -O.
    if len(sys.argv) != 2:
        sys.exit(f'usage: {sys.argv[0]} <trace log>')

    frames = []
    allocs = []
    # `with` closes the log file; it used to be left open.
    with open(sys.argv[1]) as f:
        for line in f:
            if len(line) < 10:
                break
            if 'INFO' in line:
                continue
            pos = line.find('Status')
            if pos < 0:
                start = line.find('mace')
                # Keep the whole line when 'mace' is absent; slicing from -1 kept only the
                # last character.
                frames.append(line[start:] if start > 0 else line)
            else:
                # Text after 'Status' looks like "{ name: value, ... }".
                cleaned = line[pos + 6:].strip().strip('{}')
                pairs = [pair.split(': ') for pair in cleaned.split(',')]
                stats = [(k.strip(), int(v)) for k, v in pairs]
                allocs.append((stats, ''.join(frames)))
                frames.clear()

    # sort by alloc_size (second counter)
    allocs.sort(key=lambda x: x[0][1][1], reverse=True)
    with open('alloc.txt', 'w') as o:
        for x in allocs:
            o.write(f'{x[0]}\n{x[1]}\n')

    # sort by free_size (fourth counter)
    allocs.sort(key=lambda x: x[0][3][1], reverse=True)
    with open('free.txt', 'w') as o:
        for x in allocs:
            o.write(f'{x[0]}\n{x[1]}\n')

    alloc_size = sum(x[0][1][1] for x in allocs)
    free_size = sum(x[0][3][1] for x in allocs)
    print(f"total_alloc {alloc_size} total_free {free_size}")


if __name__ == '__main__':
    main()

View File

@ -1,9 +1,5 @@
use clap::{ArgAction, Parser};
#[cfg(target_os = "linux")]
use logger::Logger;
use mace::{Mace, Options};
#[cfg(feature = "custom_alloc")]
use myalloc::{MyAlloc, print_filtered_trace};
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
@ -16,10 +12,6 @@ use std::sync::{Arc, Barrier, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "custom_alloc")]
#[global_allocator]
static GLOBAL: MyAlloc = MyAlloc;
const LAT_BUCKETS: usize = 64;
const PREFIX_GROUPS: usize = 1024;
const PREFILL_BATCH: usize = 1024;
@ -187,7 +179,6 @@ struct Quantiles {
#[derive(Clone, Debug)]
struct ResultRow {
ts_epoch_ms: u128,
engine: &'static str,
workload_id: String,
mode: String,
@ -432,13 +423,12 @@ fn csv_escape(raw: &str) -> String {
}
fn result_header() -> &'static str {
"schema_version,ts_epoch_ms,engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_op,ok_op,err_op,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us"
"engine,workload_id,mode,durability_mode,threads,key_size,value_size,prefill_keys,shared_keyspace,distribution,zipf_theta,read_pct,update_pct,scan_pct,scan_len,read_path,warmup_secs,measure_secs,total_op,ok_op,err_op,ops,p50_us,p95_us,p99_us,p999_us,elapsed_us"
}
fn result_row_csv(row: &ResultRow) -> String {
format!(
"v2,{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{:.3},{},{},{},{},{},{}",
row.ts_epoch_ms,
"{},{},{},{},{},{},{},{},{},{},{},{:.4},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
row.engine,
csv_escape(&row.workload_id),
csv_escape(&row.mode),
@ -460,7 +450,7 @@ fn result_row_csv(row: &ResultRow) -> String {
row.total_op,
row.ok_op,
row.err_op,
row.ops,
row.ops as u64,
row.quantiles.p50_us,
row.quantiles.p95_us,
row.quantiles.p99_us,
@ -546,12 +536,6 @@ fn pick_op_kind(rng: &mut StdRng, spec: &WorkloadSpec) -> OpKind {
}
fn main() {
#[cfg(target_os = "linux")]
{
Logger::init().add_file("kv_bench.log", true);
log::set_max_level(log::LevelFilter::Error);
}
let args = Args::parse();
let path = Path::new(&args.path);
let shared_keyspace = args.shared_keyspace && !args.no_shared_keyspace;
@ -650,8 +634,10 @@ fn main() {
opt.concurrent_write = 8;
opt.inline_size = args.blob_size;
opt.checkpoint_size = 128 << 20;
opt.cache_capacity = 3 << 30;
opt.cache_capacity = 4 << 30;
opt.lru_capacity = 1 << 30;
opt.blob_handle_cache_capacity = 256;
opt.data_handle_cache_capacity = 256;
opt.pool_capacity = 16 * (64 << 20);
opt.enable_backpressure = true;
opt.gc_timeout = 5 * 1000;
@ -899,7 +885,6 @@ fn main() {
};
let row = ResultRow {
ts_epoch_ms: now_epoch_ms(),
engine: "mace",
workload_id: workload.id.clone(),
mode: workload.mode_label.clone(),
@ -946,8 +931,6 @@ fn main() {
);
drop(db);
#[cfg(feature = "custom_alloc")]
print_filtered_trace(|x, y| log::info!("{}{}", x, y));
}
#[allow(clippy::too_many_arguments)]