From 96f1a44fb702062b47ada78b4bec35fed3951301 Mon Sep 17 00:00:00 2001 From: Rulin Huang Date: Thu, 23 May 2024 23:10:19 -0400 Subject: [PATCH] Parallelize SetState in LaunchParallelMemTableWriters We found that for writers s in STATE_LOCKED_WAITING, the notify-one function needs to be called, and the cost of calling this function is very high especially when there are many writers that need to be awakened. So, we Parallelize this progress. To wake up each writer to write its own memtable, the leader writer first wakes up the (n^0.5-1) caller writers, and then those callers and the leader will wake up n/x separately to write to the memtable. This reduces the number for the leader's to SetState n-1 writers to 2*(n^0.5) writers in turn. vcpu=160, benchmark=db_bench The score is normalized: | case name | optimized/base | |-------------------|----------------| | fillrandom | 182% | | fillseq | 184% | | fillsync | 136% | | overwrite | 179% | | randomreplacekeys | 180% | | randomtransaction | 161% | | updaterandom | 163% | | xorupdaterandom | 165% | --- db/db_impl/db_impl_write.cc | 7 ++- db/write_thread.cc | 56 +++++++++++++++++-- db/write_thread.h | 15 +++++ ...ify-one_i_LaunchParallelMemTableWriters.md | 1 + 4 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 unreleased_history/performance_improvements/parallel_notify-one_i_LaunchParallelMemTableWriters.md diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 121bb55e912..7ab95683d6a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -329,6 +329,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { + write_thread_.SetMemWritersEachStride(&w); + } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group @@ -826,7 +829,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, // so we need to set its status to pass ASSERT_STATUS_CHECKED memtable_write_group.status.PermitUncheckedError(); } - + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) { + write_thread_.SetMemWritersEachStride(&w); + } if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time); diff --git a/db/write_thread.cc b/db/write_thread.cc index 39f13c31875..fc74ee3bd3d 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -428,6 +428,7 @@ void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w); AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_CALLER | STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &jbg_ctx); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); @@ -656,12 +657,57 @@ void WriteThread::ExitAsMemTableWriter(Writer* /*self*/, SetState(leader, STATE_COMPLETED); } +void WriteThread::SetMemWritersEachStride(Writer* w) { + WriteGroup* write_group = w->write_group; + Writer* last_writer = write_group->last_writer; + + // The stride is the same for each writer in write_group, so w will + // call the writers with the same number in write_group mod total size + size_t stride = static_cast(std::sqrt(write_group->size)); + size_t count = 0; + while (w) { + if (count++ % stride == 0) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + } + w = (w == last_writer) ? nullptr : w->link_newer; + } +} + void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { assert(write_group != nullptr); - write_group->running.store(write_group->size); - for (auto w : *write_group) { - SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + size_t group_size = write_group->size; + write_group->running.store(group_size); + + // The minimum number to allow the group use parallel caller mode. + // The number must no lower than 3; + const size_t MinParallelSize = 20; + + // The group_size is too small, and there is no need to have + // the parallel partial callers. + if (group_size < MinParallelSize) { + for (auto w : *write_group) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + } + return; } + + // The stride is equal to std::sqrt(group_size) which can minimize + // the total number of leader SetSate. + // Set the leader itself STATE_PARALLEL_MEMTABLE_WRITER, and set + // (stride-1) writers to be STATE_PARALLEL_MEMTABLE_CALLER. + size_t stride = static_cast(std::sqrt(group_size)); + auto w = write_group->leader; + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); + + for (size_t i = 1; i < stride; i++) { + w = w->link_newer; + SetState(w, STATE_PARALLEL_MEMTABLE_CALLER); + } + + // After setting all STATE_PARALLEL_MEMTABLE_CALLER, the leader also + // does the job as STATE_PARALLEL_MEMTABLE_CALLER. + w = w->link_newer; + SetMemWritersEachStride(w); } static WriteThread::AdaptationContext cpmtw_ctx( @@ -788,8 +834,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, } AwaitState(leader, - STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER | - STATE_COMPLETED, + STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_CALLER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &eabgl_ctx); } else { Writer* head = newest_writer_.load(std::memory_order_acquire); diff --git a/db/write_thread.h b/db/write_thread.h index dc64601f9f4..dee42c80a8e 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -71,6 +71,12 @@ class WriteThread { // A state indicating that the thread may be waiting using StateMutex() // and StateCondVar() STATE_LOCKED_WAITING = 32, + + // The state used to inform a waiting writer that it has become a + // caller to call some other waiting writers to write to memtable + // by calling SetMemWritersEachStride. After doing + // this, it will also write to memtable. + STATE_PARALLEL_MEMTABLE_CALLER = 64, }; struct Writer; @@ -323,10 +329,19 @@ class WriteThread { // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of // the non-leader members of this write batch group. Sets Writer::sequence // before waking them up. + // If the size of write_group n is not small, the leader will call n^0.5 + // members to be PARALLEL_MEMTABLE_CALLER in the write_group to help to set + // other's status parallel. This ensures that the cost to call SetState + // sequentially does not exceed 2(n^0.5). // // WriteGroup* write_group: Extra state used to coordinate the parallel add void LaunchParallelMemTableWriters(WriteGroup* write_group); + // One of the every stride=N number writer in the WriteGroup are set to the + // MemTableWriters, where N is equal to square of the total number of this + // write_group, and all of these MemTableWriters will write to memtable. + void SetMemWritersEachStride(Writer* w); + // Reports the completion of w's batch to the parallel group leader, and // waits for the rest of the parallel batch to complete. Returns true // if this thread is the last to complete, and hence should advance diff --git a/unreleased_history/performance_improvements/parallel_notify-one_i_LaunchParallelMemTableWriters.md b/unreleased_history/performance_improvements/parallel_notify-one_i_LaunchParallelMemTableWriters.md new file mode 100644 index 00000000000..8fbfca124bc --- /dev/null +++ b/unreleased_history/performance_improvements/parallel_notify-one_i_LaunchParallelMemTableWriters.md @@ -0,0 +1 @@ +Improved write throughput to memtable when there's a large number of concurrent writers and allow_concurrent_memtable_write=true(#12545) \ No newline at end of file