no prepared keys

This commit is contained in:
abbycin 2026-02-24 22:24:39 +08:00
parent fe6b1de1f6
commit f71be68c38
Signed by: abby
GPG Key ID: B636E0F0307EF8EB
2 changed files with 78 additions and 80 deletions

View File

@ -133,19 +133,19 @@ int main(int argc, char *argv[]) {
rocksdb::ColumnFamilyOptions cfo{};
cfo.enable_blob_files = true;
cfo.min_blob_size = args.blob_size;
// rocksdb::BlockBasedTableOptions top{};
// top.use_delta_encoding = false;
// cfo.table_factory.reset(rocksdb::NewBlockBasedTableFactory(top));
cfo.disable_auto_compactions = true;
cfo.max_compaction_bytes = (1ULL << 60);
cfo.level0_stop_writes_trigger = 100000;
cfo.level0_slowdown_writes_trigger = 100000;
cfo.level0_file_num_compaction_trigger = 100000;
cfo.write_buffer_size = 64 << 20;
cfo.max_write_buffer_number = 64;
// use 3GB block cache
auto cache = rocksdb::NewLRUCache(3 << 30);
rocksdb::BlockBasedTableOptions table_options{};
table_options.block_cache = cache;
cfo.table_factory.reset(NewBlockBasedTableFactory(table_options));
// the following three options makes it not trigger GC in test
cfo.level0_file_num_compaction_trigger = 10000;
cfo.write_buffer_size = 64 << 20;
cfo.max_write_buffer_number = 16;
std::vector<rocksdb::ColumnFamilyDescriptor> cfd{};
cfd.push_back(rocksdb::ColumnFamilyDescriptor("default", cfo));
@ -160,7 +160,6 @@ int main(int argc, char *argv[]) {
wopt.no_slowdown = true;
// wopt.disableWAL = true;
std::vector<std::thread> wg;
std::vector<std::vector<std::string>> keys{};
std::atomic<uint64_t> total_op{0};
rocksdb::OptimisticTransactionDB *db;
auto b = nm::Instant::now();
@ -174,32 +173,16 @@ int main(int argc, char *argv[]) {
std::mt19937 gen(rd());
std::string val(args.value_size, 'x');
std::vector<size_t> key_counts(args.threads, args.iterations / args.threads);
for (size_t i = 0; i < args.iterations % args.threads; ++i) {
key_counts[i] += 1;
}
keys.reserve(args.threads);
for (size_t tid = 0; tid < args.threads; ++tid) {
std::vector<std::string> key{};
key.reserve(key_counts[tid]);
for (size_t i = 0; i < key_counts[tid]; ++i) {
auto tmp = std::format("key_{}_{}", tid, i);
tmp.resize(args.key_size, 'x');
key.emplace_back(std::move(tmp));
}
if (args.mode == "get" || args.random) {
std::shuffle(key.begin(), key.end(), gen);
}
keys.emplace_back(std::move(key));
}
auto *handle = handles[0];
if (args.mode == "get" || args.mode == "scan") {
auto *kv = db->BeginTransaction(wopt);
for (size_t tid = 0; tid < args.threads; ++tid) {
auto *tk = &keys[tid];
for (auto &key: *tk) {
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
for (size_t i = 0; i < count; ++i) {
auto key = std::format("key_{}_{}", tid, i);
key.resize(args.key_size, 'x');
kv->Put(handle, key, val);
}
}
@ -214,16 +197,15 @@ int main(int argc, char *argv[]) {
handle = handles[0];
// simulate common use cases
std::uniform_int_distribution<size_t> tid_dist(0, args.threads - 1);
for (size_t i = 0; i < args.iterations; ++i) {
auto tid = tid_dist(gen);
if (keys[tid].empty()) {
continue;
}
std::uniform_int_distribution<size_t> key_dist(0, keys[tid].size() - 1);
const auto &k = keys[tid][key_dist(gen)];
auto s = db->Get(rocksdb::ReadOptions(), k, &val);
size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
std::uniform_int_distribution<size_t> key_dist(0, count - 1);
auto idx = key_dist(gen);
auto key = std::format("key_{}_{}", tid, idx);
key.resize(args.key_size, 'x');
auto s = db->Get(rocksdb::ReadOptions(), key, &val);
if (!s.ok()) {
std::terminate();
}
@ -243,18 +225,21 @@ int main(int argc, char *argv[]) {
if (!upper_bound.empty()) {
ropt.iterate_upper_bound = &upper_bound_slice;
}
auto *tk = &keys[tid];
ropt.prefix_same_as_start = true;
ropt.snapshot = snapshot;
size_t round = 0;
std::mt19937 mixed_gen(static_cast<uint32_t>(base_seed) ^ static_cast<uint32_t>(0x9e3779b9U * (tid + 1)));
std::uniform_int_distribution<int> mixed_dist(0, 99);
size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0);
ready_barrier.arrive_and_wait();
start_barrier.arrive_and_wait();
if (args.mode == "insert") {
for (auto &key: *tk) {
for (size_t i = 0; i < key_count; ++i) {
auto key = std::format("key_{}_{}", tid, i);
key.resize(args.key_size, 'x');
round += 1;
auto *kv = db->BeginTransaction(wopt);
kv->Put(handle, key, val);
@ -263,7 +248,9 @@ int main(int argc, char *argv[]) {
}
} else if (args.mode == "get") {
for (auto &key: *tk) {
for (size_t i = 0; i < key_count; ++i) {
auto key = std::format("key_{}_{}", tid, i);
key.resize(args.key_size, 'x');
round += 1;
auto *kv = db->BeginTransaction(wopt);
kv->Get(ropt, handle, key, &rval);
@ -271,20 +258,21 @@ int main(int argc, char *argv[]) {
delete kv;
}
} else if (args.mode == "mixed") {
for (auto &key: *tk) {
for (size_t i = 0; i < key_count; ++i) {
auto key = std::format("key_{}_{}", tid, i);
key.resize(args.key_size, 'x');
round += 1;
auto is_insert = mixed_dist(mixed_gen) < static_cast<int>(args.insert_ratio);
auto *kv = db->BeginTransaction(wopt);
if (is_insert) {
kv->Put(handle, key, val);
} else {
kv->Get(ropt, handle, key, &rval); // not found
kv->Get(ropt, handle, key, &rval);
}
kv->Commit();
delete kv;
}
} else if (args.mode == "scan") {
// ropt.pin_data = true;
auto *iter = db->NewIterator(ropt);
iter->Seek(prefix);
size_t n = 0;

View File

@ -52,7 +52,7 @@ fn main() {
Logger::init().add_file("/tmp/x.log", true);
log::set_max_level(log::LevelFilter::Info);
}
let mut args = Args::parse();
let args = Args::parse();
let path = Path::new(&args.path);
@ -86,64 +86,66 @@ fn main() {
exit(1);
}
let mut keys: Vec<Vec<Vec<u8>>> = Vec::with_capacity(args.threads);
let mut opt = Options::new(path);
opt.sync_on_write = false;
opt.over_provision = true; // large value will use lots of memeory
opt.over_provision = false;
opt.inline_size = args.blob_size;
opt.tmp_store = args.mode != "get" && args.mode != "scan";
opt.cache_capacity = 3 << 30;
opt.data_file_size = 64 << 20;
opt.max_log_size = 1 << 30;
let mut saved = opt.clone();
saved.tmp_store = false;
let mut db = Mace::new(opt.validate().unwrap()).unwrap();
db.disable_gc();
let mut bkt = db.new_bucket("default").unwrap();
let mut rng = rand::rng();
let value = Arc::new(vec![b'0'; args.value_size]);
let mut key_counts = vec![args.iterations / args.threads; args.threads];
for cnt in key_counts.iter_mut().take(args.iterations % args.threads) {
*cnt += 1;
}
for tid in 0..args.threads {
let mut tk = Vec::with_capacity(key_counts[tid]);
for i in 0..key_counts[tid] {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(args.key_size, b'x');
tk.push(key);
}
if args.random || args.mode == "get" {
tk.shuffle(&mut rng);
}
keys.push(tk);
}
if args.mode == "get" || args.mode == "scan" {
let pre_tx = bkt.begin().unwrap();
(0..args.threads).for_each(|tid| {
for k in &keys[tid] {
pre_tx.put(k, &*value).unwrap();
for tid in 0..args.threads {
let count = args.iterations / args.threads
+ if tid < args.iterations % args.threads {
1
} else {
0
};
for i in 0..count {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(args.key_size, b'x');
pre_tx.put(&key, &*value).unwrap();
}
});
}
pre_tx.commit().unwrap();
drop(bkt);
drop(db);
// re-open db
saved.tmp_store = true;
db = Mace::new(saved.validate().unwrap()).unwrap();
bkt = db.get_bucket("default").unwrap();
// simulate common use cases
let mut rng = rand::rng();
for _ in 0..args.iterations {
let tid = rng.random_range(0..args.threads);
let Some(k) = keys[tid].choose(&mut rng) else {
continue;
};
let count = args.iterations / args.threads
+ if tid < args.iterations % args.threads {
1
} else {
0
};
let idx = rng.random_range(0..count);
let mut key = format!("key_{tid}_{idx}").into_bytes();
key.resize(args.key_size, b'x');
let view = bkt.view().unwrap();
view.get(k).unwrap();
view.get(&key).unwrap();
}
}
let mut key_counts = vec![args.iterations / args.threads; args.threads];
for cnt in key_counts.iter_mut().take(args.iterations % args.threads) {
*cnt += 1;
}
let ready_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1));
let start_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1));
let total_ops = Arc::new(std::sync::atomic::AtomicUsize::new(0));
@ -151,13 +153,14 @@ fn main() {
let h: Vec<JoinHandle<()>> = (0..args.threads)
.map(|tid| {
let db = bkt.clone();
let tk: &Vec<Vec<u8>> = unsafe { std::mem::transmute(&keys[tid]) };
let total_ops = total_ops.clone();
let ready_barrier = Arc::clone(&ready_barrier);
let start_barrier = Arc::clone(&start_barrier);
let mode = args.mode.clone();
let insert_ratio = args.insert_ratio;
let val = value.clone();
let key_count = key_counts[tid];
let key_size = args.key_size;
let prefix = format!("key_{tid}_");
std::thread::spawn(move || {
@ -167,7 +170,9 @@ fn main() {
start_barrier.wait();
match mode.as_str() {
"insert" => {
for key in tk {
for i in 0..key_count {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(key_size, b'x');
round += 1;
let tx = db.begin().unwrap();
tx.put(key.as_slice(), val.as_slice()).unwrap();
@ -175,7 +180,9 @@ fn main() {
}
}
"get" => {
for key in tk {
for i in 0..key_count {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(key_size, b'x');
round += 1;
let tx = db.view().unwrap();
let x = tx.get(key).unwrap();
@ -183,7 +190,9 @@ fn main() {
}
}
"mixed" => {
for key in tk {
for i in 0..key_count {
let mut key = format!("key_{tid}_{i}").into_bytes();
key.resize(key_size, b'x');
let is_insert = rand::random_range(0..100) < insert_ratio;
round += 1;
@ -193,7 +202,7 @@ fn main() {
tx.commit().unwrap();
} else {
let tx = db.view().unwrap();
let x = tx.get(key); // not found
let x = tx.get(key);
let _ = std::hint::black_box(x);
}
}
@ -233,16 +242,17 @@ fn main() {
} else {
0
};
if args.mode == "insert" {
let mut mode = args.mode.clone();
if mode == "insert" {
if args.random {
args.mode = "random_insert".into();
mode = "random_insert".into();
} else {
args.mode = "sequential_insert".into();
mode = "sequential_insert".into();
}
}
eprintln!(
"{},{},{},{},{},{},{}",
args.mode,
mode,
args.threads,
args.key_size,
args.value_size,