diff --git a/rocksdb/main.cpp b/rocksdb/main.cpp index b683aa9..2b97e3e 100644 --- a/rocksdb/main.cpp +++ b/rocksdb/main.cpp @@ -133,19 +133,19 @@ int main(int argc, char *argv[]) { rocksdb::ColumnFamilyOptions cfo{}; cfo.enable_blob_files = true; cfo.min_blob_size = args.blob_size; - // rocksdb::BlockBasedTableOptions top{}; - // top.use_delta_encoding = false; - // cfo.table_factory.reset(rocksdb::NewBlockBasedTableFactory(top)); + cfo.disable_auto_compactions = true; + cfo.max_compaction_bytes = (1ULL << 60); + cfo.level0_stop_writes_trigger = 100000; + cfo.level0_slowdown_writes_trigger = 100000; + cfo.level0_file_num_compaction_trigger = 100000; + cfo.write_buffer_size = 64 << 20; + cfo.max_write_buffer_number = 64; // use 3GB block cache auto cache = rocksdb::NewLRUCache(3 << 30); rocksdb::BlockBasedTableOptions table_options{}; table_options.block_cache = cache; cfo.table_factory.reset(NewBlockBasedTableFactory(table_options)); - // the following three options makes it not trigger GC in test - cfo.level0_file_num_compaction_trigger = 10000; - cfo.write_buffer_size = 64 << 20; - cfo.max_write_buffer_number = 16; std::vector cfd{}; cfd.push_back(rocksdb::ColumnFamilyDescriptor("default", cfo)); @@ -160,7 +160,6 @@ int main(int argc, char *argv[]) { wopt.no_slowdown = true; // wopt.disableWAL = true; std::vector wg; - std::vector> keys{}; std::atomic total_op{0}; rocksdb::OptimisticTransactionDB *db; auto b = nm::Instant::now(); @@ -174,32 +173,16 @@ int main(int argc, char *argv[]) { std::mt19937 gen(rd()); std::string val(args.value_size, 'x'); - std::vector key_counts(args.threads, args.iterations / args.threads); - for (size_t i = 0; i < args.iterations % args.threads; ++i) { - key_counts[i] += 1; - } - keys.reserve(args.threads); - for (size_t tid = 0; tid < args.threads; ++tid) { - std::vector key{}; - key.reserve(key_counts[tid]); - for (size_t i = 0; i < key_counts[tid]; ++i) { - auto tmp = std::format("key_{}_{}", tid, i); - tmp.resize(args.key_size, 'x'); - key.emplace_back(std::move(tmp)); - } - if (args.mode == "get" || args.random) { - std::shuffle(key.begin(), key.end(), gen); - } - keys.emplace_back(std::move(key)); - } auto *handle = handles[0]; if (args.mode == "get" || args.mode == "scan") { auto *kv = db->BeginTransaction(wopt); for (size_t tid = 0; tid < args.threads; ++tid) { - auto *tk = &keys[tid]; - for (auto &key: *tk) { + size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); + for (size_t i = 0; i < count; ++i) { + auto key = std::format("key_{}_{}", tid, i); + key.resize(args.key_size, 'x'); kv->Put(handle, key, val); } } @@ -214,16 +197,15 @@ int main(int argc, char *argv[]) { handle = handles[0]; - // simulate common use cases std::uniform_int_distribution tid_dist(0, args.threads - 1); for (size_t i = 0; i < args.iterations; ++i) { auto tid = tid_dist(gen); - if (keys[tid].empty()) { - continue; - } - std::uniform_int_distribution key_dist(0, keys[tid].size() - 1); - const auto &k = keys[tid][key_dist(gen)]; - auto s = db->Get(rocksdb::ReadOptions(), k, &val); + size_t count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); + std::uniform_int_distribution key_dist(0, count - 1); + auto idx = key_dist(gen); + auto key = std::format("key_{}_{}", tid, idx); + key.resize(args.key_size, 'x'); + auto s = db->Get(rocksdb::ReadOptions(), key, &val); if (!s.ok()) { std::terminate(); } @@ -243,18 +225,21 @@ int main(int argc, char *argv[]) { if (!upper_bound.empty()) { ropt.iterate_upper_bound = &upper_bound_slice; } - auto *tk = &keys[tid]; ropt.prefix_same_as_start = true; ropt.snapshot = snapshot; size_t round = 0; std::mt19937 mixed_gen(static_cast(base_seed) ^ static_cast(0x9e3779b9U * (tid + 1))); std::uniform_int_distribution mixed_dist(0, 99); + size_t key_count = args.iterations / args.threads + (tid < args.iterations % args.threads ? 1 : 0); + ready_barrier.arrive_and_wait(); start_barrier.arrive_and_wait(); if (args.mode == "insert") { - for (auto &key: *tk) { + for (size_t i = 0; i < key_count; ++i) { + auto key = std::format("key_{}_{}", tid, i); + key.resize(args.key_size, 'x'); round += 1; auto *kv = db->BeginTransaction(wopt); kv->Put(handle, key, val); @@ -263,7 +248,9 @@ int main(int argc, char *argv[]) { } } else if (args.mode == "get") { - for (auto &key: *tk) { + for (size_t i = 0; i < key_count; ++i) { + auto key = std::format("key_{}_{}", tid, i); + key.resize(args.key_size, 'x'); round += 1; auto *kv = db->BeginTransaction(wopt); kv->Get(ropt, handle, key, &rval); @@ -271,20 +258,21 @@ int main(int argc, char *argv[]) { delete kv; } } else if (args.mode == "mixed") { - for (auto &key: *tk) { + for (size_t i = 0; i < key_count; ++i) { + auto key = std::format("key_{}_{}", tid, i); + key.resize(args.key_size, 'x'); round += 1; auto is_insert = mixed_dist(mixed_gen) < static_cast(args.insert_ratio); auto *kv = db->BeginTransaction(wopt); if (is_insert) { kv->Put(handle, key, val); } else { - kv->Get(ropt, handle, key, &rval); // not found + kv->Get(ropt, handle, key, &rval); } kv->Commit(); delete kv; } } else if (args.mode == "scan") { - // ropt.pin_data = true; auto *iter = db->NewIterator(ropt); iter->Seek(prefix); size_t n = 0; diff --git a/src/main.rs b/src/main.rs index 8c66ed5..ef124cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,7 +52,7 @@ fn main() { Logger::init().add_file("/tmp/x.log", true); log::set_max_level(log::LevelFilter::Info); } - let mut args = Args::parse(); + let args = Args::parse(); let path = Path::new(&args.path); @@ -86,64 +86,66 @@ fn main() { exit(1); } - let mut keys: Vec>> = Vec::with_capacity(args.threads); let mut opt = Options::new(path); opt.sync_on_write = false; - opt.over_provision = true; // large value will use lots of memeory + opt.over_provision = false; opt.inline_size = args.blob_size; opt.tmp_store = args.mode != "get" && args.mode != "scan"; opt.cache_capacity = 3 << 30; + opt.data_file_size = 64 << 20; + opt.max_log_size = 1 << 30; let mut saved = opt.clone(); saved.tmp_store = false; let mut db = Mace::new(opt.validate().unwrap()).unwrap(); db.disable_gc(); let mut bkt = db.new_bucket("default").unwrap(); - let mut rng = rand::rng(); let value = Arc::new(vec![b'0'; args.value_size]); - let mut key_counts = vec![args.iterations / args.threads; args.threads]; - for cnt in key_counts.iter_mut().take(args.iterations % args.threads) { - *cnt += 1; - } - for tid in 0..args.threads { - let mut tk = Vec::with_capacity(key_counts[tid]); - for i in 0..key_counts[tid] { - let mut key = format!("key_{tid}_{i}").into_bytes(); - key.resize(args.key_size, b'x'); - tk.push(key); - } - if args.random || args.mode == "get" { - tk.shuffle(&mut rng); - } - keys.push(tk); - } if args.mode == "get" || args.mode == "scan" { let pre_tx = bkt.begin().unwrap(); - (0..args.threads).for_each(|tid| { - for k in &keys[tid] { - pre_tx.put(k, &*value).unwrap(); + for tid in 0..args.threads { + let count = args.iterations / args.threads + + if tid < args.iterations % args.threads { + 1 + } else { + 0 + }; + for i in 0..count { + let mut key = format!("key_{tid}_{i}").into_bytes(); + key.resize(args.key_size, b'x'); + pre_tx.put(&key, &*value).unwrap(); } - }); + } pre_tx.commit().unwrap(); drop(bkt); drop(db); - // re-open db saved.tmp_store = true; db = Mace::new(saved.validate().unwrap()).unwrap(); bkt = db.get_bucket("default").unwrap(); - // simulate common use cases + let mut rng = rand::rng(); for _ in 0..args.iterations { let tid = rng.random_range(0..args.threads); - let Some(k) = keys[tid].choose(&mut rng) else { - continue; - }; + let count = args.iterations / args.threads + + if tid < args.iterations % args.threads { + 1 + } else { + 0 + }; + let idx = rng.random_range(0..count); + let mut key = format!("key_{tid}_{idx}").into_bytes(); + key.resize(args.key_size, b'x'); let view = bkt.view().unwrap(); - view.get(k).unwrap(); + view.get(&key).unwrap(); } } + let mut key_counts = vec![args.iterations / args.threads; args.threads]; + for cnt in key_counts.iter_mut().take(args.iterations % args.threads) { + *cnt += 1; + } + let ready_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); let start_barrier = Arc::new(std::sync::Barrier::new(args.threads + 1)); let total_ops = Arc::new(std::sync::atomic::AtomicUsize::new(0)); @@ -151,13 +153,14 @@ fn main() { let h: Vec> = (0..args.threads) .map(|tid| { let db = bkt.clone(); - let tk: &Vec> = unsafe { std::mem::transmute(&keys[tid]) }; let total_ops = total_ops.clone(); let ready_barrier = Arc::clone(&ready_barrier); let start_barrier = Arc::clone(&start_barrier); let mode = args.mode.clone(); let insert_ratio = args.insert_ratio; let val = value.clone(); + let key_count = key_counts[tid]; + let key_size = args.key_size; let prefix = format!("key_{tid}_"); std::thread::spawn(move || { @@ -167,7 +170,9 @@ fn main() { start_barrier.wait(); match mode.as_str() { "insert" => { - for key in tk { + for i in 0..key_count { + let mut key = format!("key_{tid}_{i}").into_bytes(); + key.resize(key_size, b'x'); round += 1; let tx = db.begin().unwrap(); tx.put(key.as_slice(), val.as_slice()).unwrap(); @@ -175,7 +180,9 @@ fn main() { } } "get" => { - for key in tk { + for i in 0..key_count { + let mut key = format!("key_{tid}_{i}").into_bytes(); + key.resize(key_size, b'x'); round += 1; let tx = db.view().unwrap(); let x = tx.get(key).unwrap(); @@ -183,7 +190,9 @@ fn main() { } } "mixed" => { - for key in tk { + for i in 0..key_count { + let mut key = format!("key_{tid}_{i}").into_bytes(); + key.resize(key_size, b'x'); let is_insert = rand::random_range(0..100) < insert_ratio; round += 1; @@ -193,7 +202,7 @@ fn main() { tx.commit().unwrap(); } else { let tx = db.view().unwrap(); - let x = tx.get(key); // not found + let x = tx.get(key); let _ = std::hint::black_box(x); } } @@ -233,16 +242,17 @@ fn main() { } else { 0 }; - if args.mode == "insert" { + let mut mode = args.mode.clone(); + if mode == "insert" { if args.random { - args.mode = "random_insert".into(); + mode = "random_insert".into(); } else { - args.mode = "sequential_insert".into(); + mode = "sequential_insert".into(); } } eprintln!( "{},{},{},{},{},{},{}", - args.mode, + mode, args.threads, args.key_size, args.value_size,