From 5a9abf571b0220ec7c015a921ed83a30a2225595 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Wed, 22 Sep 2021 21:49:39 -0700 Subject: [PATCH 01/16] copy the wait predicate into a test before waking the condition, to reduce spurious wakes --- .../src/audio/AudioMixerSlavePool.cpp | 17 +++++++++-------- .../src/avatars/AvatarMixerSlavePool.cpp | 17 +++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index e8a2909acbf..93adabd5d9f 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -53,15 +53,16 @@ void AudioMixerSlaveThread::wait() { } void AudioMixerSlaveThread::notify(bool stopping) { - { - Lock lock(_pool._mutex); - assert(_pool._numFinished < _pool._numThreads); - ++_pool._numFinished; - if (stopping) { - ++_pool._numStopped; - } + Lock lock(_pool._mutex); + assert(_pool._numFinished < _pool._numThreads); + ++_pool._numFinished; + if (stopping) { + ++_pool._numStopped; + } + + if(_pool._numFinished == _pool._numThreads) { + _pool._poolCondition.notify_one(); } - _pool._poolCondition.notify_one(); } bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 027e68e88b0..8279e033e95 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -48,15 +48,16 @@ void AvatarMixerSlaveThread::wait() { } void AvatarMixerSlaveThread::notify(bool stopping) { - { - Lock lock(_pool._mutex); - assert(_pool._numFinished < _pool._numThreads); - ++_pool._numFinished; - if (stopping) { - ++_pool._numStopped; - } + Lock lock(_pool._mutex); + assert(_pool._numFinished < _pool._numThreads); + ++_pool._numFinished; + if (stopping) { + ++_pool._numStopped; + } + + if(_pool._numFinished == _pool._numThreads) { + _pool._poolCondition.notify_one(); } - _pool._poolCondition.notify_one(); } bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) { From e83985789128fda2572f48e2e249e16d56219cb2 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Wed, 22 Sep 2021 22:01:19 -0700 Subject: [PATCH 02/16] breaking the pool _mutex into two smaller _slaveMutex and _poolMutex mutexes to reduce lock contention. --- .../src/audio/AudioMixerSlavePool.cpp | 40 +++++++++++++------ .../src/audio/AudioMixerSlavePool.h | 11 ++--- .../src/avatars/AvatarMixerSlavePool.cpp | 40 +++++++++++++------ .../src/avatars/AvatarMixerSlavePool.h | 11 ++--- 4 files changed, 66 insertions(+), 36 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 93adabd5d9f..825ce9bf6ea 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -38,8 +38,8 @@ void AudioMixerSlaveThread::run() { void AudioMixerSlaveThread::wait() { { - Lock lock(_pool._mutex); - _pool._slaveCondition.wait(lock, [&] { + Lock slaveLock(_pool._slaveMutex); + _pool._slaveCondition.wait(slaveLock, [&] { assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); @@ -53,7 +53,7 @@ void AudioMixerSlaveThread::wait() { } void AudioMixerSlaveThread::notify(bool stopping) { - Lock lock(_pool._mutex); + Lock poolLock(_pool._poolMutex); assert(_pool._numFinished < _pool._numThreads); ++_pool._numFinished; if (stopping) { @@ -94,19 +94,27 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { }); { - Lock lock(_mutex); + Lock poolLock(_poolMutex); // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + { + Lock slaveLock(_slaveMutex); + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); + } // wait - _poolCondition.wait(lock, [&] { + _poolCondition.wait(poolLock, [&] { assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); - assert(_numStarted == _numThreads); +#ifndef NDEBUG + { + Lock slaveLock(_slaveMutex); + assert(_numStarted == _numThreads); + } +#endif } assert(_queue.empty()); @@ -156,7 +164,7 @@ void AudioMixerSlavePool::resize(int numThreads) { qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - Lock lock(_mutex); + Lock poolLock(_poolMutex); if (numThreads > _numThreads) { // start new slaves @@ -179,9 +187,12 @@ void AudioMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = _numStopped; - _slaveCondition.notify_all(); - _poolCondition.wait(lock, [&] { + { + Lock slaveLock(_slaveMutex); + _numStarted = _numFinished = _numStopped; + _slaveCondition.notify_all(); + } + _poolCondition.wait(poolLock, [&] { assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); @@ -200,6 +211,9 @@ void AudioMixerSlavePool::resize(int numThreads) { _slaves.erase(extraBegin, _slaves.end()); } - _numThreads = _numStarted = _numFinished = numThreads; + { + Lock slaveLock(_slaveMutex); + _numThreads = _numStarted = _numFinished = numThreads; + } assert(_numThreads == (int)_slaves.size()); } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 3807db05410..9425f4616bc 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -90,15 +90,16 @@ class AudioMixerSlavePool { friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node); // synchronization state - Mutex _mutex; - ConditionVariable _slaveCondition; + Mutex _poolMutex; ConditionVariable _poolCondition; + Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + ConditionVariable _slaveCondition; void (AudioMixerSlave::*_function)(const SharedNodePointer& node); std::function _configure; int _numThreads { 0 }; - int _numStarted { 0 }; // guarded by _mutex - int _numFinished { 0 }; // guarded by _mutex - int _numStopped { 0 }; // guarded by _mutex + int _numStarted { 0 }; // guarded by _slaveMutex + int _numFinished { 0 }; // guarded by _poolMutex + int _numStopped { 0 }; // guarded by _poolMutex // frame state Queue _queue; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 8279e033e95..16269ee7a5e 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -34,8 +34,8 @@ void AvatarMixerSlaveThread::run() { void AvatarMixerSlaveThread::wait() { { - Lock lock(_pool._mutex); - _pool._slaveCondition.wait(lock, [&] { + Lock slaveLock(_pool._slaveMutex); + _pool._slaveCondition.wait(slaveLock, [&] { assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); @@ -48,7 +48,7 @@ void AvatarMixerSlaveThread::wait() { } void AvatarMixerSlaveThread::notify(bool stopping) { - Lock lock(_pool._mutex); + Lock poolLock(_pool._poolMutex); assert(_pool._numFinished < _pool._numThreads); ++_pool._numFinished; if (stopping) { @@ -93,19 +93,27 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { }); { - Lock lock(_mutex); + Lock poolLock(_poolMutex); // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + { + Lock slaveLock(_slaveMutex); + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); + } // wait - _poolCondition.wait(lock, [&] { + _poolCondition.wait(poolLock, [&] { assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); - assert(_numStarted == _numThreads); +#ifndef NDEBUG + { + Lock slaveLock(_slaveMutex); + assert(_numStarted == _numThreads); + } +#endif } assert(_queue.empty()); @@ -156,7 +164,7 @@ void AvatarMixerSlavePool::resize(int numThreads) { qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - Lock lock(_mutex); + Lock poolLock(_poolMutex); if (numThreads > _numThreads) { // start new slaves @@ -178,9 +186,12 @@ void AvatarMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = _numStopped; - _slaveCondition.notify_all(); - _poolCondition.wait(lock, [&] { + { + Lock slaveLock(_slaveMutex); + _numStarted = _numFinished = _numStopped; + _slaveCondition.notify_all(); + } + _poolCondition.wait(poolLock, [&] { assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); @@ -199,6 +210,9 @@ void AvatarMixerSlavePool::resize(int numThreads) { _slaves.erase(extraBegin, _slaves.end()); } - _numThreads = _numStarted = _numFinished = numThreads; + { + Lock slaveLock(_slaveMutex); + _numThreads = _numStarted = _numFinished = numThreads; + } assert(_numThreads == (int)_slaves.size()); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index 915c6d8dc44..8fa91ddb8c9 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -95,9 +95,10 @@ class AvatarMixerSlavePool { friend bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node); // synchronization state - Mutex _mutex; - ConditionVariable _slaveCondition; + Mutex _poolMutex; ConditionVariable _poolCondition; + Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + ConditionVariable _slaveCondition; void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); std::function _configure; @@ -105,9 +106,9 @@ class AvatarMixerSlavePool { float _priorityReservedFraction { 0.4f }; int _numThreads { 0 }; - int _numStarted { 0 }; // guarded by _mutex - int _numFinished { 0 }; // guarded by _mutex - int _numStopped { 0 }; // guarded by _mutex + int _numStarted { 0 }; // guarded by _slaveMutex + int _numFinished { 0 }; // guarded by _poolMutex + int _numStopped { 0 }; // guarded by _poolMutex // frame state Queue _queue; From 0c35b439e841659cc8454136f236c7b470e86fa6 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Thu, 23 Sep 2021 10:40:29 -0700 Subject: [PATCH 03/16] pull out data shared between threads / remove "friend" declarations --- .../src/audio/AudioMixerSlavePool.cpp | 2 +- .../src/audio/AudioMixerSlavePool.h | 54 +++++++++--------- .../src/avatars/AvatarMixerSlavePool.cpp | 2 +- .../src/avatars/AvatarMixerSlavePool.h | 55 ++++++++++--------- 4 files changed, 58 insertions(+), 55 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 825ce9bf6ea..2558e35fd82 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -180,7 +180,7 @@ void AudioMixerSlavePool::resize(int numThreads) { // mark slaves to stop... auto slave = extraBegin; while (slave != _slaves.end()) { - (*slave)->_stop = true; + (*slave)->stop(); ++slave; } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 9425f4616bc..c0066adab91 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -22,7 +22,30 @@ #include "AudioMixerSlave.h" -class AudioMixerSlavePool; +// Private slave pool data that is shared and accessible with the slave threads. This describes +// what information is needs to be thread-safe +struct AudioMixerSlavePoolData { + using Queue = tbb::concurrent_queue; + using Mutex = std::mutex; + using ConditionVariable = std::condition_variable; + + // synchronization state + Mutex _poolMutex; + ConditionVariable _poolCondition; + Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + ConditionVariable _slaveCondition; + + void (AudioMixerSlave::*_function)(const SharedNodePointer& node); + std::function _configure; + + int _numThreads{ 0 }; + int _numStarted{ 0 }; // guarded by _slaveMutex + int _numFinished{ 0 }; // guarded by _poolMutex + int _numStopped{ 0 }; // guarded by _poolMutex + + // frame state + Queue _queue; +}; class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { Q_OBJECT @@ -31,30 +54,26 @@ class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { using Lock = std::unique_lock; public: - AudioMixerSlaveThread(AudioMixerSlavePool& pool, AudioMixerSlave::SharedData& sharedData) + AudioMixerSlaveThread(AudioMixerSlavePoolData& pool, AudioMixerSlave::SharedData& sharedData) : AudioMixerSlave(sharedData), _pool(pool) {} void run() override final; + inline void stop() { _stop = true; } private: - friend class AudioMixerSlavePool; - void wait(); void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AudioMixerSlavePool& _pool; + AudioMixerSlavePoolData& _pool; void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; // Slave pool for audio mixers // AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. -class AudioMixerSlavePool { - using Queue = tbb::concurrent_queue; - using Mutex = std::mutex; +class AudioMixerSlavePool : private AudioMixerSlavePoolData { using Lock = std::unique_lock; - using ConditionVariable = std::condition_variable; public: using ConstIter = NodeList::const_iterator; @@ -85,24 +104,7 @@ class AudioMixerSlavePool { std::vector> _slaves; - friend void AudioMixerSlaveThread::wait(); - friend void AudioMixerSlaveThread::notify(bool stopping); - friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node); - - // synchronization state - Mutex _poolMutex; - ConditionVariable _poolCondition; - Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! - ConditionVariable _slaveCondition; - void (AudioMixerSlave::*_function)(const SharedNodePointer& node); - std::function _configure; - int _numThreads { 0 }; - int _numStarted { 0 }; // guarded by _slaveMutex - int _numFinished { 0 }; // guarded by _poolMutex - int _numStopped { 0 }; // guarded by _poolMutex - // frame state - Queue _queue; ConstIter _begin; ConstIter _end; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 16269ee7a5e..290228ce3fe 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -179,7 +179,7 @@ void AvatarMixerSlavePool::resize(int numThreads) { // mark slaves to stop... auto slave = extraBegin; while (slave != _slaves.end()) { - (*slave)->_stop = true; + (*slave)->stop(); ++slave; } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index 8fa91ddb8c9..d2e1a8e2510 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -25,7 +25,30 @@ #include "AvatarMixerSlave.h" -class AvatarMixerSlavePool; +// Private slave pool data that is shared and accessible with the slave threads. This describes +// what information is needs to be thread-safe +struct AvatarMixerSlavePoolData { + using Queue = tbb::concurrent_queue; + using Mutex = std::mutex; + using ConditionVariable = std::condition_variable; + + // synchronization state + Mutex _poolMutex; + ConditionVariable _poolCondition; + Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + ConditionVariable _slaveCondition; + + void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); + std::function _configure; + + int _numThreads{ 0 }; + int _numStarted{ 0 }; // guarded by _slaveMutex + int _numFinished{ 0 }; // guarded by _poolMutex + int _numStopped{ 0 }; // guarded by _poolMutex + + // frame state + Queue _queue; +}; class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { Q_OBJECT @@ -34,30 +57,26 @@ class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { using Lock = std::unique_lock; public: - AvatarMixerSlaveThread(AvatarMixerSlavePool& pool, SlaveSharedData* slaveSharedData) : + AvatarMixerSlaveThread(AvatarMixerSlavePoolData& pool, SlaveSharedData* slaveSharedData) : AvatarMixerSlave(slaveSharedData), _pool(pool) {}; void run() override final; + inline void stop() { _stop = true; } private: - friend class AvatarMixerSlavePool; - void wait(); void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AvatarMixerSlavePool& _pool; + AvatarMixerSlavePoolData& _pool; void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; // Slave pool for avatar mixers // AvatarMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. -class AvatarMixerSlavePool { - using Queue = tbb::concurrent_queue; - using Mutex = std::mutex; +class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { using Lock = std::unique_lock; - using ConditionVariable = std::condition_variable; public: using ConstIter = NodeList::const_iterator; @@ -90,28 +109,10 @@ class AvatarMixerSlavePool { std::vector> _slaves; - friend void AvatarMixerSlaveThread::wait(); - friend void AvatarMixerSlaveThread::notify(bool stopping); - friend bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node); - - // synchronization state - Mutex _poolMutex; - ConditionVariable _poolCondition; - Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! - ConditionVariable _slaveCondition; - void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); - std::function _configure; - // Set from Domain Settings: float _priorityReservedFraction { 0.4f }; - int _numThreads { 0 }; - - int _numStarted { 0 }; // guarded by _slaveMutex - int _numFinished { 0 }; // guarded by _poolMutex - int _numStopped { 0 }; // guarded by _poolMutex // frame state - Queue _queue; ConstIter _begin; ConstIter _end; From e60db7875ca463fcc204b278fb2557a2aabc25cf Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Thu, 23 Sep 2021 20:51:47 -0700 Subject: [PATCH 04/16] add shared_mutex in an attempt to reduce the number of mutexes used in managing slaves and shared data --- .../src/audio/AudioMixerSlavePool.cpp | 100 +++++++---- .../src/audio/AudioMixerSlavePool.h | 29 ++- .../src/avatars/AvatarMixerSlavePool.cpp | 107 +++++++----- .../src/avatars/AvatarMixerSlavePool.h | 29 ++- libraries/shared/src/SharedMutex.cpp | 90 ++++++++++ libraries/shared/src/SharedMutex.h | 165 ++++++++++++++++++ 6 files changed, 429 insertions(+), 91 deletions(-) create mode 100644 libraries/shared/src/SharedMutex.cpp create mode 100644 libraries/shared/src/SharedMutex.h diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 2558e35fd82..0a84f019c45 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -18,6 +18,32 @@ #include +template +class lock_anti_guard { +public: + using mutex_type = _Mutex; + + explicit lock_anti_guard(std::unique_lock<_Mutex>& lock) : + _MyMutex(*lock.mutex()), _Owns(lock.owns_lock()) { // construct and unlock + if (_Owns) { + _MyMutex.unlock(); + } + } + + ~lock_anti_guard() noexcept { + if (_Owns) { + _MyMutex.lock(); + } + } + + lock_anti_guard(const lock_anti_guard&) = delete; + lock_anti_guard& operator=(const lock_anti_guard&) = delete; + +private: + _Mutex& _MyMutex; + bool _Owns; +}; + void AudioMixerSlaveThread::run() { while (true) { wait(); @@ -40,11 +66,13 @@ void AudioMixerSlaveThread::wait() { { Lock slaveLock(_pool._slaveMutex); _pool._slaveCondition.wait(slaveLock, [&] { + shared_lock activeLock(_pool._slavesActive); assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); - ++_pool._numStarted; } + shared_lock activeLock(_pool._slavesActive); + ++_pool._numStarted; if (_pool._configure) { _pool._configure(*this); @@ -53,14 +81,14 @@ void AudioMixerSlaveThread::wait() { } void AudioMixerSlaveThread::notify(bool stopping) { - Lock poolLock(_pool._poolMutex); + shared_lock activeLock(_pool._slavesActive); assert(_pool._numFinished < _pool._numThreads); - ++_pool._numFinished; + int numFinished = ++_pool._numFinished; if (stopping) { ++_pool._numStopped; } - if(_pool._numFinished == _pool._numThreads) { + if (numFinished == _pool._numThreads) { _pool._poolCondition.notify_one(); } } @@ -70,16 +98,20 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { } void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { - _function = &AudioMixerSlave::processPackets; - _configure = [](AudioMixerSlave& slave) {}; + { + std::lock_guard activeLock(_slavesActive); + _function = &AudioMixerSlave::processPackets; + _configure = [](AudioMixerSlave& slave) {}; + } run(begin, end); } void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) { - _function = &AudioMixerSlave::mix; - _configure = [=](AudioMixerSlave& slave) { - slave.configureMix(_begin, _end, frame, numToRetain); - }; + { + std::lock_guard activeLock(_slavesActive); + _function = &AudioMixerSlave::mix; + _configure = [=](AudioMixerSlave& slave) { slave.configureMix(_begin, _end, frame, numToRetain); }; + } run(begin, end); } @@ -94,27 +126,24 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { }); { - Lock poolLock(_poolMutex); + std::unique_lock activeLock(_slavesActive); // run - { - Lock slaveLock(_slaveMutex); - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); - } + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); // wait - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; - }); - -#ifndef NDEBUG { - Lock slaveLock(_slaveMutex); - assert(_numStarted == _numThreads); + lock_anti_guard releaseActiveLock(activeLock); + Lock poolLock(_poolMutex); + _poolCondition.wait(poolLock, [&] { + shared_lock activeLock(_slavesActive); + assert(_numFinished <= _numThreads); + return _numFinished == _numThreads; + }); } -#endif + + assert(_numStarted == _numThreads); } assert(_queue.empty()); @@ -160,11 +189,10 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { } void AudioMixerSlavePool::resize(int numThreads) { - assert(_numThreads == (int)_slaves.size()); - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - Lock poolLock(_poolMutex); + std::unique_lock activeLock(_slavesActive); + assert(_numThreads == (int)_slaves.size()); if (numThreads > _numThreads) { // start new slaves @@ -187,12 +215,13 @@ void AudioMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - { - Lock slaveLock(_slaveMutex); - _numStarted = _numFinished = _numStopped; - _slaveCondition.notify_all(); - } + _numStarted = _numFinished = _numStopped.load(); + _slaveCondition.notify_all(); + + lock_anti_guard releaseActiveLock(activeLock); + Lock poolLock(_poolMutex); _poolCondition.wait(poolLock, [&] { + shared_lock activeLock(_slavesActive); assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); @@ -211,9 +240,6 @@ void AudioMixerSlavePool::resize(int numThreads) { _slaves.erase(extraBegin, _slaves.end()); } - { - Lock slaveLock(_slaveMutex); - _numThreads = _numStarted = _numFinished = numThreads; - } + _numThreads = _numStarted = _numFinished = numThreads; assert(_numThreads == (int)_slaves.size()); } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index c0066adab91..efa71fed7aa 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "AudioMixerSlave.h" @@ -28,20 +29,33 @@ struct AudioMixerSlavePoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; + using RWMutex = shared_mutex; // synchronization state - Mutex _poolMutex; + shared_mutex _slavesActive; // shared_lock by slaves while running, lock by pool when configuring + Mutex _poolMutex; // only used for _poolCondition at the moment ConditionVariable _poolCondition; - Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + Mutex _slaveMutex; // only used for _slaveCondition at the moment ConditionVariable _slaveCondition; - void (AudioMixerSlave::*_function)(const SharedNodePointer& node); - std::function _configure; + void (AudioMixerSlave::*_function)(const SharedNodePointer& node); // guarded by _slavesActive: r/o when shared, r/w when locked + std::function _configure; // guarded by _slavesActive: r/o when shared, r/w when locked + // Number of currently-running slave threads + // guarded by _slavesActive: r/o when shared, r/w when locked int _numThreads{ 0 }; - int _numStarted{ 0 }; // guarded by _slaveMutex - int _numFinished{ 0 }; // guarded by _poolMutex - int _numStopped{ 0 }; // guarded by _poolMutex + + // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numStarted{ 0 }; + + // Number of slave threads finished with the current request (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numFinished{ 0 }; + + // Number of slave threads shutting down when asked to (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numStopped{ 0 }; // frame state Queue _queue; @@ -52,6 +66,7 @@ class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; using Lock = std::unique_lock; + using RWMutex = shared_mutex; public: AudioMixerSlaveThread(AudioMixerSlavePoolData& pool, AudioMixerSlave::SharedData& sharedData) diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 290228ce3fe..3c1556b1f5e 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -14,6 +14,32 @@ #include #include +template +class lock_anti_guard { +public: + using mutex_type = _Mutex; + + explicit lock_anti_guard(std::unique_lock<_Mutex>& lock) : + _MyMutex(*lock.mutex()), _Owns(lock.owns_lock()) { // construct and unlock + if (_Owns) { + _MyMutex.unlock(); + } + } + + ~lock_anti_guard() noexcept { + if (_Owns) { + _MyMutex.lock(); + } + } + + lock_anti_guard(const lock_anti_guard&) = delete; + lock_anti_guard& operator=(const lock_anti_guard&) = delete; + +private: + _Mutex& _MyMutex; + bool _Owns; +}; + void AvatarMixerSlaveThread::run() { while (true) { wait(); @@ -36,11 +62,14 @@ void AvatarMixerSlaveThread::wait() { { Lock slaveLock(_pool._slaveMutex); _pool._slaveCondition.wait(slaveLock, [&] { + shared_lock activeLock(_pool._slavesActive); assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); - ++_pool._numStarted; } + shared_lock activeLock(_pool._slavesActive); + ++_pool._numStarted; + if (_pool._configure) { _pool._configure(*this); } @@ -48,14 +77,14 @@ void AvatarMixerSlaveThread::wait() { } void AvatarMixerSlaveThread::notify(bool stopping) { - Lock poolLock(_pool._poolMutex); + shared_lock activeLock(_pool._slavesActive); assert(_pool._numFinished < _pool._numThreads); - ++_pool._numFinished; + int numFinished = ++_pool._numFinished; if (stopping) { ++_pool._numStopped; } - if(_pool._numFinished == _pool._numThreads) { + if (numFinished == _pool._numThreads) { _pool._poolCondition.notify_one(); } } @@ -65,21 +94,25 @@ bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) { } void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { - _function = &AvatarMixerSlave::processIncomingPackets; - _configure = [=](AvatarMixerSlave& slave) { - slave.configure(begin, end); - }; + { + std::lock_guard activeLock(_slavesActive); + _function = &AvatarMixerSlave::processIncomingPackets; + _configure = [=](AvatarMixerSlave& slave) { slave.configure(begin, end); }; + } run(begin, end); } void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio) { - _function = &AvatarMixerSlave::broadcastAvatarData; - _configure = [=](AvatarMixerSlave& slave) { - slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, - _priorityReservedFraction); - }; + { + std::lock_guard activeLock(_slavesActive); + _function = &AvatarMixerSlave::broadcastAvatarData; + _configure = [=](AvatarMixerSlave& slave) { + slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, + _priorityReservedFraction); + }; + } run(begin, end); } @@ -93,27 +126,24 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { }); { - Lock poolLock(_poolMutex); + std::unique_lock activeLock(_slavesActive); // run - { - Lock slaveLock(_slaveMutex); - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); - } + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); // wait - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; - }); - -#ifndef NDEBUG { - Lock slaveLock(_slaveMutex); - assert(_numStarted == _numThreads); + lock_anti_guard releaseActiveLock(activeLock); + Lock poolLock(_poolMutex); + _poolCondition.wait(poolLock, [&] { + shared_lock activeLock(_slavesActive); + assert(_numFinished <= _numThreads); + return _numFinished == _numThreads; + }); } -#endif + + assert(_numStarted == _numThreads); } assert(_queue.empty()); @@ -160,11 +190,10 @@ void AvatarMixerSlavePool::setNumThreads(int numThreads) { } void AvatarMixerSlavePool::resize(int numThreads) { - assert(_numThreads == (int)_slaves.size()); - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - Lock poolLock(_poolMutex); + std::unique_lock activeLock(_slavesActive); + assert(_numThreads == (int)_slaves.size()); if (numThreads > _numThreads) { // start new slaves @@ -186,12 +215,13 @@ void AvatarMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - { - Lock slaveLock(_slaveMutex); - _numStarted = _numFinished = _numStopped; - _slaveCondition.notify_all(); - } + _numStarted = _numFinished = _numStopped.load(); + _slaveCondition.notify_all(); + + lock_anti_guard releaseActiveLock(activeLock); + Lock poolLock(_poolMutex); _poolCondition.wait(poolLock, [&] { + shared_lock activeLock(_slavesActive); assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); @@ -210,9 +240,6 @@ void AvatarMixerSlavePool::resize(int numThreads) { _slaves.erase(extraBegin, _slaves.end()); } - { - Lock slaveLock(_slaveMutex); - _numThreads = _numStarted = _numFinished = numThreads; - } + _numThreads = _numStarted = _numFinished = numThreads; assert(_numThreads == (int)_slaves.size()); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index d2e1a8e2510..fe305d0a43b 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "AvatarMixerSlave.h" @@ -31,20 +32,33 @@ struct AvatarMixerSlavePoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; + using RWMutex = shared_mutex; // synchronization state - Mutex _poolMutex; + shared_mutex _slavesActive; // shared_lock by slaves while running, lock by pool when configuring + Mutex _poolMutex; // only used for _poolCondition at the moment ConditionVariable _poolCondition; - Mutex _slaveMutex; // subservient to _poolMutex, do not lock _poolMutex while holding _slaveMutex! + Mutex _slaveMutex; // only used for _slaveCondition at the moment ConditionVariable _slaveCondition; - void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); - std::function _configure; + void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); // guarded by _slavesActive: r/o when shared, r/w when locked + std::function _configure; // guarded by _slavesActive: r/o when shared, r/w when locked + // Number of currently-running slave threads + // guarded by _slavesActive: r/o when shared, r/w when locked int _numThreads{ 0 }; - int _numStarted{ 0 }; // guarded by _slaveMutex - int _numFinished{ 0 }; // guarded by _poolMutex - int _numStopped{ 0 }; // guarded by _poolMutex + + // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numStarted{ 0 }; + + // Number of slave threads finished with the current request (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numFinished{ 0 }; + + // Number of slave threads shutting down when asked to (0 <= _numStarted <= _numThreads) + // guarded by _slavesActive: incremented when shared, r/w when locked + std::atomic _numStopped{ 0 }; // frame state Queue _queue; @@ -54,6 +68,7 @@ class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { Q_OBJECT using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; + using RWMutex = shared_mutex; using Lock = std::unique_lock; public: diff --git a/libraries/shared/src/SharedMutex.cpp b/libraries/shared/src/SharedMutex.cpp new file mode 100644 index 00000000000..d7a16368981 --- /dev/null +++ b/libraries/shared/src/SharedMutex.cpp @@ -0,0 +1,90 @@ +// +// SharedMutex.cpp +// shared/src +// +// Created by Heather Anderson on 9/23/2021. +// Copyright 2021 Vircadia contributors. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "SharedMutex.h" + +#if __cplusplus < 201402L /* C++14 */ + +// Reference implementation taken from http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2007/n2406.html#shared_mutex_imp + +// Exclusive ownership + +void shared_mutex::lock() { + //std::this_thread::disable_interruption _; + std::unique_lock lk(mut_); + while (state_ & write_entered_) { + gate1_.wait(lk); + } + state_ |= write_entered_; + while (state_ & n_readers_) { + gate2_.wait(lk); + } +} + +bool shared_mutex::try_lock() { + std::unique_lock lk(mut_, std::try_to_lock); + if (lk.owns_lock() && state_ == 0) { + state_ = write_entered_; + return true; + } + return false; +} + +void shared_mutex::unlock() { + { + std::lock_guard _(mut_); + state_ = 0; + } + gate1_.notify_all(); +} + +// Shared ownership + +void shared_mutex::lock_shared() { + //std::this_thread::disable_interruption _; + std::unique_lock lk(mut_); + while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_) { + gate1_.wait(lk); + } + unsigned num_readers = (state_ & n_readers_) + 1; + state_ &= ~n_readers_; + state_ |= num_readers; +} + +bool shared_mutex::try_lock_shared() { + std::unique_lock lk(mut_, std::try_to_lock); + unsigned num_readers = state_ & n_readers_; + if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_) { + ++num_readers; + state_ &= ~n_readers_; + state_ |= num_readers; + return true; + } + return false; +} + +void shared_mutex::unlock_shared() { + std::lock_guard _(mut_); + unsigned num_readers = (state_ & n_readers_) - 1; + state_ &= ~n_readers_; + state_ |= num_readers; + if (state_ & write_entered_) { + if (num_readers == 0) { + gate2_.notify_one(); + } + } else { + if (num_readers == n_readers_ - 1) { + gate1_.notify_one(); + } + } +} + +#endif /* __cplusplus >= 201703L */ diff --git a/libraries/shared/src/SharedMutex.h b/libraries/shared/src/SharedMutex.h new file mode 100644 index 00000000000..e1156665d50 --- /dev/null +++ b/libraries/shared/src/SharedMutex.h @@ -0,0 +1,165 @@ +// +// SharedMutex.h +// shared/src +// +// Created by Heather Anderson on 9/23/2021. +// Copyright 2021 Vircadia contributors. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_SharedMutex_h +#define hifi_SharedMutex_h + +#include + +#if __cplusplus >= 201402L /* C++14 */ + +using shared_mutex = std::shared_mutex; +template using shared_lock = std::shared_lock<_Mutex>; + +#else + +// Reference implementation taken from http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2007/n2406.html#shared_mutex_imp + +class shared_mutex { + using mutex = std::mutex; + using cond_var = std::condition_variable; + + mutex mut_; + cond_var gate1_; + cond_var gate2_; + unsigned state_; + + static const unsigned write_entered_ = 1U << (sizeof(unsigned) * CHAR_BIT - 1); + static const unsigned n_readers_ = ~write_entered_; + +public: + inline shared_mutex() : state_(0) {} + + // Exclusive ownership + + void lock(); + bool try_lock(); + void unlock(); + + // Shared ownership + + void lock_shared(); + bool try_lock_shared(); + void unlock_shared(); +}; + +// CLASS TEMPLATE shared_lock (copied from unique_lock) +template +class shared_lock { // whizzy class with destructor that unlocks mutex +public: + using mutex_type = _Mutex; + + // CONSTRUCT, ASSIGN, AND DESTROY + shared_lock() noexcept : _Pmtx(nullptr), _Owns(false) {} + + explicit shared_lock(_Mutex& _Mtx) : _Pmtx(&_Mtx), _Owns(false) { // construct and lock + _Pmtx->lock_shared(); + _Owns = true; + } + + shared_lock(_Mutex& _Mtx, std::adopt_lock_t) : _Pmtx(&_Mtx), _Owns(true) {} // construct and assume already locked + + shared_lock(_Mutex& _Mtx, std::defer_lock_t) noexcept : _Pmtx(&_Mtx), _Owns(false) {} // construct but don't lock + + shared_lock(_Mutex& _Mtx, std::try_to_lock_t) : + _Pmtx(&_Mtx), _Owns(_Pmtx->try_lock_shared()) {} // construct and try to lock + + shared_lock(shared_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) { + _Other._Pmtx = nullptr; + _Other._Owns = false; + } + + shared_lock& operator=(shared_lock&& _Other) { + if (this != &_Other) { + if (_Owns) { + _Pmtx->unlock_shared(); + } + + _Pmtx = _Other._Pmtx; + _Owns = _Other._Owns; + _Other._Pmtx = nullptr; + _Other._Owns = false; + } + return *this; + } + + ~shared_lock() noexcept { + if (_Owns) { + _Pmtx->unlock_shared(); + } + } + + shared_lock(const shared_lock&) = delete; + shared_lock& operator=(const shared_lock&) = delete; + + void lock() { // lock the mutex + Validate(); + _Pmtx->lock_shared(); + _Owns = true; + } + + bool try_lock() { + Validate(); + _Owns = _Pmtx->try_lock_shared(); + return _Owns; + } + + void unlock() { + if (!_Pmtx || !_Owns) { + throw std::system_error(std::errc::operation_not_permitted); + } + + _Pmtx->unlock_shared(); + _Owns = false; + } + + void swap(shared_lock& _Other) noexcept { + std::swap(_Pmtx, _Other._Pmtx); + std::swap(_Owns, _Other._Owns); + } + + _Mutex* release() noexcept { + _Mutex* _Res = _Pmtx; + _Pmtx = nullptr; + _Owns = false; + return _Res; + } + + bool owns_lock() const noexcept { return _Owns; } + + explicit operator bool() const noexcept { return _Owns; } + + _Mutex* mutex() const noexcept { return _Pmtx; } + +private: + _Mutex* _Pmtx; + bool _Owns; + + void Validate() const { // check if the mutex can be locked + if (!_Pmtx) { + throw std::system_error(std::errc::operation_not_permitted); + } + + if (_Owns) { + throw std::system_error(std::errc::resource_deadlock_would_occur); + } + } +}; + +// FUNCTION TEMPLATE swap FOR shared_lock +template +void swap(shared_lock<_Mutex>& _Left, shared_lock<_Mutex>& _Right) noexcept { + _Left.swap(_Right); +} + +#endif /* __cplusplus >= 201703L */ + +#endif /* hifi_SharedMutex_h */ \ No newline at end of file From 536b1ed14f89d1ac9e9201be0e943b983c172593 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Thu, 23 Sep 2021 21:09:16 -0700 Subject: [PATCH 05/16] stripping out the shared_mutex for a contract-based threading model (which hopefully stays up just from handshake agreements?) --- .../src/audio/AudioMixerSlavePool.cpp | 87 +++------ .../src/audio/AudioMixerSlavePool.h | 33 ++-- .../src/avatars/AvatarMixerSlavePool.cpp | 93 +++------- .../src/avatars/AvatarMixerSlavePool.h | 29 +-- libraries/shared/src/SharedMutex.cpp | 90 ---------- libraries/shared/src/SharedMutex.h | 165 ------------------ 6 files changed, 81 insertions(+), 416 deletions(-) delete mode 100644 libraries/shared/src/SharedMutex.cpp delete mode 100644 libraries/shared/src/SharedMutex.h diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 0a84f019c45..7960f75ed16 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -18,32 +18,6 @@ #include -template -class lock_anti_guard { -public: - using mutex_type = _Mutex; - - explicit lock_anti_guard(std::unique_lock<_Mutex>& lock) : - _MyMutex(*lock.mutex()), _Owns(lock.owns_lock()) { // construct and unlock - if (_Owns) { - _MyMutex.unlock(); - } - } - - ~lock_anti_guard() noexcept { - if (_Owns) { - _MyMutex.lock(); - } - } - - lock_anti_guard(const lock_anti_guard&) = delete; - lock_anti_guard& operator=(const lock_anti_guard&) = delete; - -private: - _Mutex& _MyMutex; - bool _Owns; -}; - void AudioMixerSlaveThread::run() { while (true) { wait(); @@ -66,12 +40,10 @@ void AudioMixerSlaveThread::wait() { { Lock slaveLock(_pool._slaveMutex); _pool._slaveCondition.wait(slaveLock, [&] { - shared_lock activeLock(_pool._slavesActive); assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); } - shared_lock activeLock(_pool._slavesActive); ++_pool._numStarted; if (_pool._configure) { @@ -81,11 +53,11 @@ void AudioMixerSlaveThread::wait() { } void AudioMixerSlaveThread::notify(bool stopping) { - shared_lock activeLock(_pool._slavesActive); - assert(_pool._numFinished < _pool._numThreads); + assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted); int numFinished = ++_pool._numFinished; if (stopping) { ++_pool._numStopped; + assert(_pool._numStopped <= _pool._numFinished); } if (numFinished == _pool._numThreads) { @@ -98,20 +70,16 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { } void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { - { - std::lock_guard activeLock(_slavesActive); - _function = &AudioMixerSlave::processPackets; - _configure = [](AudioMixerSlave& slave) {}; - } + _function = &AudioMixerSlave::processPackets; + _configure = [](AudioMixerSlave& slave) {}; run(begin, end); } void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) { - { - std::lock_guard activeLock(_slavesActive); - _function = &AudioMixerSlave::mix; - _configure = [=](AudioMixerSlave& slave) { slave.configureMix(_begin, _end, frame, numToRetain); }; - } + _function = &AudioMixerSlave::mix; + _configure = [=](AudioMixerSlave& slave) { + slave.configureMix(_begin, _end, frame, numToRetain); + }; run(begin, end); } @@ -125,26 +93,19 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { _queue.push(node); }); - { - std::unique_lock activeLock(_slavesActive); - - // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + // run + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); - // wait - { - lock_anti_guard releaseActiveLock(activeLock); - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - shared_lock activeLock(_slavesActive); - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; - }); - } - - assert(_numStarted == _numThreads); + // wait + { + Lock poolLock(_poolMutex); + _poolCondition.wait(poolLock, [&] { + assert(_numFinished <= _numThreads); + return _numFinished == _numThreads; + }); } + assert(_numStarted == _numThreads); assert(_queue.empty()); } @@ -189,11 +150,10 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { } void AudioMixerSlavePool::resize(int numThreads) { - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - - std::unique_lock activeLock(_slavesActive); assert(_numThreads == (int)_slaves.size()); + qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); + if (numThreads > _numThreads) { // start new slaves for (int i = 0; i < numThreads - _numThreads; ++i) { @@ -215,16 +175,15 @@ void AudioMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = _numStopped.load(); + _numStarted = _numFinished = 0; _slaveCondition.notify_all(); - lock_anti_guard releaseActiveLock(activeLock); Lock poolLock(_poolMutex); _poolCondition.wait(poolLock, [&] { - shared_lock activeLock(_slavesActive); assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); + assert(_numStopped == (_numThreads - numThreads)); } // ...wait for threads to finish... diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index efa71fed7aa..fd6f24cb41e 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -19,7 +19,6 @@ #include #include #include -#include #include "AudioMixerSlave.h" @@ -29,32 +28,35 @@ struct AudioMixerSlavePoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; - using RWMutex = shared_mutex; // synchronization state - shared_mutex _slavesActive; // shared_lock by slaves while running, lock by pool when configuring - Mutex _poolMutex; // only used for _poolCondition at the moment - ConditionVariable _poolCondition; - Mutex _slaveMutex; // only used for _slaveCondition at the moment - ConditionVariable _slaveCondition; + Mutex _poolMutex; // only used for _poolCondition at the moment + ConditionVariable _poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) + Mutex _slaveMutex; // only used for _slaveCondition at the moment + ConditionVariable _slaveCondition; // woken when work needs to be done (_numStarted < _numThreads) - void (AudioMixerSlave::*_function)(const SharedNodePointer& node); // guarded by _slavesActive: r/o when shared, r/w when locked - std::function _configure; // guarded by _slavesActive: r/o when shared, r/w when locked + // The variables below this point are alternately owned by the pool or by the slaves collectively. + // When idle they are owned by the pool. + // Moving ownership to the slaves is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition + // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition + + void (AudioMixerSlave::*_function)(const SharedNodePointer& node); // r/o when owned by slaves, r/w when owned by pool + std::function _configure; // r/o when owned by slaves, r/w when owned by pool // Number of currently-running slave threads - // guarded by _slavesActive: r/o when shared, r/w when locked + // r/o when owned by slaves, r/w when owned by pool int _numThreads{ 0 }; // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // incremented when owned by slaves, r/w when owned by pool std::atomic _numStarted{ 0 }; - // Number of slave threads finished with the current request (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // Number of slave threads finished with the current request (0 <= _numFinished <= _numStarted) + // incremented when owned by slaves, r/w when owned by pool std::atomic _numFinished{ 0 }; - // Number of slave threads shutting down when asked to (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // Number of slave threads shutting down when asked to (0 <= _numStopped <= _numFinished) + // incremented when owned by slaves, r/w when owned by pool std::atomic _numStopped{ 0 }; // frame state @@ -66,7 +68,6 @@ class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; using Lock = std::unique_lock; - using RWMutex = shared_mutex; public: AudioMixerSlaveThread(AudioMixerSlavePoolData& pool, AudioMixerSlave::SharedData& sharedData) diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 3c1556b1f5e..d551464533f 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -14,32 +14,6 @@ #include #include -template -class lock_anti_guard { -public: - using mutex_type = _Mutex; - - explicit lock_anti_guard(std::unique_lock<_Mutex>& lock) : - _MyMutex(*lock.mutex()), _Owns(lock.owns_lock()) { // construct and unlock - if (_Owns) { - _MyMutex.unlock(); - } - } - - ~lock_anti_guard() noexcept { - if (_Owns) { - _MyMutex.lock(); - } - } - - lock_anti_guard(const lock_anti_guard&) = delete; - lock_anti_guard& operator=(const lock_anti_guard&) = delete; - -private: - _Mutex& _MyMutex; - bool _Owns; -}; - void AvatarMixerSlaveThread::run() { while (true) { wait(); @@ -62,12 +36,10 @@ void AvatarMixerSlaveThread::wait() { { Lock slaveLock(_pool._slaveMutex); _pool._slaveCondition.wait(slaveLock, [&] { - shared_lock activeLock(_pool._slavesActive); assert(_pool._numStarted <= _pool._numThreads); return _pool._numStarted != _pool._numThreads; }); } - shared_lock activeLock(_pool._slavesActive); ++_pool._numStarted; if (_pool._configure) { @@ -77,11 +49,11 @@ void AvatarMixerSlaveThread::wait() { } void AvatarMixerSlaveThread::notify(bool stopping) { - shared_lock activeLock(_pool._slavesActive); - assert(_pool._numFinished < _pool._numThreads); + assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted); int numFinished = ++_pool._numFinished; if (stopping) { ++_pool._numStopped; + assert(_pool._numStopped <= _pool._numFinished); } if (numFinished == _pool._numThreads) { @@ -94,25 +66,21 @@ bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) { } void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { - { - std::lock_guard activeLock(_slavesActive); - _function = &AvatarMixerSlave::processIncomingPackets; - _configure = [=](AvatarMixerSlave& slave) { slave.configure(begin, end); }; - } + _function = &AvatarMixerSlave::processIncomingPackets; + _configure = [=](AvatarMixerSlave& slave) { + slave.configure(begin, end); + }; run(begin, end); } void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio) { - { - std::lock_guard activeLock(_slavesActive); - _function = &AvatarMixerSlave::broadcastAvatarData; - _configure = [=](AvatarMixerSlave& slave) { - slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, - _priorityReservedFraction); - }; - } + _function = &AvatarMixerSlave::broadcastAvatarData; + _configure = [=](AvatarMixerSlave& slave) { + slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, + _priorityReservedFraction); + }; run(begin, end); } @@ -125,26 +93,19 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { _queue.push(node); }); - { - std::unique_lock activeLock(_slavesActive); - - // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + // run + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); - // wait - { - lock_anti_guard releaseActiveLock(activeLock); - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - shared_lock activeLock(_slavesActive); - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; - }); - } - - assert(_numStarted == _numThreads); + // wait + { + Lock poolLock(_poolMutex); + _poolCondition.wait(poolLock, [&] { + assert(_numFinished <= _numThreads); + return _numFinished == _numThreads; + }); } + assert(_numStarted == _numThreads); assert(_queue.empty()); } @@ -190,11 +151,10 @@ void AvatarMixerSlavePool::setNumThreads(int numThreads) { } void AvatarMixerSlavePool::resize(int numThreads) { - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); - - std::unique_lock activeLock(_slavesActive); assert(_numThreads == (int)_slaves.size()); + qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); + if (numThreads > _numThreads) { // start new slaves for (int i = 0; i < numThreads - _numThreads; ++i) { @@ -215,16 +175,15 @@ void AvatarMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _numStopped = 0; while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = _numStopped.load(); + _numStarted = _numFinished = 0; _slaveCondition.notify_all(); - lock_anti_guard releaseActiveLock(activeLock); Lock poolLock(_poolMutex); _poolCondition.wait(poolLock, [&] { - shared_lock activeLock(_slavesActive); assert(_numFinished <= _numThreads); return _numFinished == _numThreads; }); + assert(_numStopped == (_numThreads - numThreads)); } // ...wait for threads to finish... diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index fe305d0a43b..0d0f4eb4180 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -21,7 +21,6 @@ #include #include #include -#include #include "AvatarMixerSlave.h" @@ -32,32 +31,35 @@ struct AvatarMixerSlavePoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; - using RWMutex = shared_mutex; // synchronization state - shared_mutex _slavesActive; // shared_lock by slaves while running, lock by pool when configuring - Mutex _poolMutex; // only used for _poolCondition at the moment - ConditionVariable _poolCondition; - Mutex _slaveMutex; // only used for _slaveCondition at the moment - ConditionVariable _slaveCondition; + Mutex _poolMutex; // only used for _poolCondition at the moment + ConditionVariable _poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) + Mutex _slaveMutex; // only used for _slaveCondition at the moment + ConditionVariable _slaveCondition; // woken when work needs to be done (_numStarted < _numThreads) - void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); // guarded by _slavesActive: r/o when shared, r/w when locked - std::function _configure; // guarded by _slavesActive: r/o when shared, r/w when locked + // The variables below this point are alternately owned by the pool or by the slaves collectively. + // When idle they are owned by the pool. + // Moving ownership to the slaves is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition + // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition + + void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); // r/o when owned by slaves, r/w when owned by pool + std::function _configure; // r/o when owned by slaves, r/w when owned by pool // Number of currently-running slave threads - // guarded by _slavesActive: r/o when shared, r/w when locked + // r/o when owned by slaves, r/w when owned by pool int _numThreads{ 0 }; // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // incremented when owned by slaves, r/w when owned by pool std::atomic _numStarted{ 0 }; // Number of slave threads finished with the current request (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // incremented when owned by slaves, r/w when owned by pool std::atomic _numFinished{ 0 }; // Number of slave threads shutting down when asked to (0 <= _numStarted <= _numThreads) - // guarded by _slavesActive: incremented when shared, r/w when locked + // incremented when owned by slaves, r/w when owned by pool std::atomic _numStopped{ 0 }; // frame state @@ -68,7 +70,6 @@ class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { Q_OBJECT using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; - using RWMutex = shared_mutex; using Lock = std::unique_lock; public: diff --git a/libraries/shared/src/SharedMutex.cpp b/libraries/shared/src/SharedMutex.cpp deleted file mode 100644 index d7a16368981..00000000000 --- a/libraries/shared/src/SharedMutex.cpp +++ /dev/null @@ -1,90 +0,0 @@ -// -// SharedMutex.cpp -// shared/src -// -// Created by Heather Anderson on 9/23/2021. -// Copyright 2021 Vircadia contributors. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include "SharedMutex.h" - -#if __cplusplus < 201402L /* C++14 */ - -// Reference implementation taken from http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2007/n2406.html#shared_mutex_imp - -// Exclusive ownership - -void shared_mutex::lock() { - //std::this_thread::disable_interruption _; - std::unique_lock lk(mut_); - while (state_ & write_entered_) { - gate1_.wait(lk); - } - state_ |= write_entered_; - while (state_ & n_readers_) { - gate2_.wait(lk); - } -} - -bool shared_mutex::try_lock() { - std::unique_lock lk(mut_, std::try_to_lock); - if (lk.owns_lock() && state_ == 0) { - state_ = write_entered_; - return true; - } - return false; -} - -void shared_mutex::unlock() { - { - std::lock_guard _(mut_); - state_ = 0; - } - gate1_.notify_all(); -} - -// Shared ownership - -void shared_mutex::lock_shared() { - //std::this_thread::disable_interruption _; - std::unique_lock lk(mut_); - while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_) { - gate1_.wait(lk); - } - unsigned num_readers = (state_ & n_readers_) + 1; - state_ &= ~n_readers_; - state_ |= num_readers; -} - -bool shared_mutex::try_lock_shared() { - std::unique_lock lk(mut_, std::try_to_lock); - unsigned num_readers = state_ & n_readers_; - if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_) { - ++num_readers; - state_ &= ~n_readers_; - state_ |= num_readers; - return true; - } - return false; -} - -void shared_mutex::unlock_shared() { - std::lock_guard _(mut_); - unsigned num_readers = (state_ & n_readers_) - 1; - state_ &= ~n_readers_; - state_ |= num_readers; - if (state_ & write_entered_) { - if (num_readers == 0) { - gate2_.notify_one(); - } - } else { - if (num_readers == n_readers_ - 1) { - gate1_.notify_one(); - } - } -} - -#endif /* __cplusplus >= 201703L */ diff --git a/libraries/shared/src/SharedMutex.h b/libraries/shared/src/SharedMutex.h deleted file mode 100644 index e1156665d50..00000000000 --- a/libraries/shared/src/SharedMutex.h +++ /dev/null @@ -1,165 +0,0 @@ -// -// SharedMutex.h -// shared/src -// -// Created by Heather Anderson on 9/23/2021. -// Copyright 2021 Vircadia contributors. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#ifndef hifi_SharedMutex_h -#define hifi_SharedMutex_h - -#include - -#if __cplusplus >= 201402L /* C++14 */ - -using shared_mutex = std::shared_mutex; -template using shared_lock = std::shared_lock<_Mutex>; - -#else - -// Reference implementation taken from http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2007/n2406.html#shared_mutex_imp - -class shared_mutex { - using mutex = std::mutex; - using cond_var = std::condition_variable; - - mutex mut_; - cond_var gate1_; - cond_var gate2_; - unsigned state_; - - static const unsigned write_entered_ = 1U << (sizeof(unsigned) * CHAR_BIT - 1); - static const unsigned n_readers_ = ~write_entered_; - -public: - inline shared_mutex() : state_(0) {} - - // Exclusive ownership - - void lock(); - bool try_lock(); - void unlock(); - - // Shared ownership - - void lock_shared(); - bool try_lock_shared(); - void unlock_shared(); -}; - -// CLASS TEMPLATE shared_lock (copied from unique_lock) -template -class shared_lock { // whizzy class with destructor that unlocks mutex -public: - using mutex_type = _Mutex; - - // CONSTRUCT, ASSIGN, AND DESTROY - shared_lock() noexcept : _Pmtx(nullptr), _Owns(false) {} - - explicit shared_lock(_Mutex& _Mtx) : _Pmtx(&_Mtx), _Owns(false) { // construct and lock - _Pmtx->lock_shared(); - _Owns = true; - } - - shared_lock(_Mutex& _Mtx, std::adopt_lock_t) : _Pmtx(&_Mtx), _Owns(true) {} // construct and assume already locked - - shared_lock(_Mutex& _Mtx, std::defer_lock_t) noexcept : _Pmtx(&_Mtx), _Owns(false) {} // construct but don't lock - - shared_lock(_Mutex& _Mtx, std::try_to_lock_t) : - _Pmtx(&_Mtx), _Owns(_Pmtx->try_lock_shared()) {} // construct and try to lock - - shared_lock(shared_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) { - _Other._Pmtx = nullptr; - _Other._Owns = false; - } - - shared_lock& operator=(shared_lock&& _Other) { - if (this != &_Other) { - if (_Owns) { - _Pmtx->unlock_shared(); - } - - _Pmtx = _Other._Pmtx; - _Owns = _Other._Owns; - _Other._Pmtx = nullptr; - _Other._Owns = false; - } - return *this; - } - - ~shared_lock() noexcept { - if (_Owns) { - _Pmtx->unlock_shared(); - } - } - - shared_lock(const shared_lock&) = delete; - shared_lock& operator=(const shared_lock&) = delete; - - void lock() { // lock the mutex - Validate(); - _Pmtx->lock_shared(); - _Owns = true; - } - - bool try_lock() { - Validate(); - _Owns = _Pmtx->try_lock_shared(); - return _Owns; - } - - void unlock() { - if (!_Pmtx || !_Owns) { - throw std::system_error(std::errc::operation_not_permitted); - } - - _Pmtx->unlock_shared(); - _Owns = false; - } - - void swap(shared_lock& _Other) noexcept { - std::swap(_Pmtx, _Other._Pmtx); - std::swap(_Owns, _Other._Owns); - } - - _Mutex* release() noexcept { - _Mutex* _Res = _Pmtx; - _Pmtx = nullptr; - _Owns = false; - return _Res; - } - - bool owns_lock() const noexcept { return _Owns; } - - explicit operator bool() const noexcept { return _Owns; } - - _Mutex* mutex() const noexcept { return _Pmtx; } - -private: - _Mutex* _Pmtx; - bool _Owns; - - void Validate() const { // check if the mutex can be locked - if (!_Pmtx) { - throw std::system_error(std::errc::operation_not_permitted); - } - - if (_Owns) { - throw std::system_error(std::errc::resource_deadlock_would_occur); - } - } -}; - -// FUNCTION TEMPLATE swap FOR shared_lock -template -void swap(shared_lock<_Mutex>& _Left, shared_lock<_Mutex>& _Right) noexcept { - _Left.swap(_Right); -} - -#endif /* __cplusplus >= 201703L */ - -#endif /* hifi_SharedMutex_h */ \ No newline at end of file From aa2c30741af5bbdf18101b27f497f4597e67ebe0 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Fri, 24 Sep 2021 00:33:44 -0700 Subject: [PATCH 06/16] suggested changes: - changed "slave" to "worker" when dealing with classes in this file - moved the "data" struct to be a private member instead of a base class --- .../src/audio/AudioMixerSlavePool.cpp | 134 +++++++++--------- .../src/audio/AudioMixerSlavePool.h | 68 ++++----- .../src/avatars/AvatarMixerSlavePool.cpp | 133 +++++++++-------- .../src/avatars/AvatarMixerSlavePool.h | 66 ++++----- 4 files changed, 202 insertions(+), 199 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 7960f75ed16..4db71f0fe84 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -18,7 +18,7 @@ #include -void AudioMixerSlaveThread::run() { +void AudioMixerWorkerThread::run() { while (true) { wait(); @@ -36,48 +36,48 @@ void AudioMixerSlaveThread::run() { } } -void AudioMixerSlaveThread::wait() { +void AudioMixerWorkerThread::wait() { { - Lock slaveLock(_pool._slaveMutex); - _pool._slaveCondition.wait(slaveLock, [&] { - assert(_pool._numStarted <= _pool._numThreads); - return _pool._numStarted != _pool._numThreads; + Lock workerLock(_pool.workerMutex); + _pool.workerCondition.wait(workerLock, [&] { + assert(_pool.numStarted <= _pool.numThreads); + return _pool.numStarted != _pool.numThreads; }); } - ++_pool._numStarted; + ++_pool.numStarted; - if (_pool._configure) { - _pool._configure(*this); + if (_pool.configure) { + _pool.configure(*this); } - _function = _pool._function; + _function = _pool.function; } -void AudioMixerSlaveThread::notify(bool stopping) { - assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted); - int numFinished = ++_pool._numFinished; +void AudioMixerWorkerThread::notify(bool stopping) { + assert(_pool.numFinished < _pool.numThreads && _pool.numFinished <= _pool.numStarted); + int numFinished = ++_pool.numFinished; if (stopping) { - ++_pool._numStopped; - assert(_pool._numStopped <= _pool._numFinished); + ++_pool.numStopped; + assert(_pool.numStopped <= _pool.numFinished); } - if (numFinished == _pool._numThreads) { - _pool._poolCondition.notify_one(); + if (numFinished == _pool.numThreads) { + _pool.poolCondition.notify_one(); } } -bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { - return _pool._queue.try_pop(node); +bool AudioMixerWorkerThread::try_pop(SharedNodePointer& node) { + return _pool.queue.try_pop(node); } void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { - _function = &AudioMixerSlave::processPackets; - _configure = [](AudioMixerSlave& slave) {}; + _data.function = &AudioMixerSlave::processPackets; + _data.configure = [](AudioMixerSlave& slave) {}; run(begin, end); } void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) { - _function = &AudioMixerSlave::mix; - _configure = [=](AudioMixerSlave& slave) { + _data.function = &AudioMixerSlave::mix; + _data.configure = [=](AudioMixerSlave& slave) { slave.configureMix(_begin, _end, frame, numToRetain); }; @@ -90,37 +90,37 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { // fill the queue std::for_each(_begin, _end, [&](const SharedNodePointer& node) { - _queue.push(node); + _data.queue.push(node); }); // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); // wait { - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; + Lock poolLock(_data.poolMutex); + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; }); } - assert(_numStarted == _numThreads); + assert(_data.numStarted == _data.numThreads); - assert(_queue.empty()); + assert(_data.queue.empty()); } void AudioMixerSlavePool::each(std::function functor) { - for (auto& slave : _slaves) { - functor(*slave.get()); + for (auto& worker : _workers) { + functor(*worker.get()); } } #ifdef DEBUG_EVENT_QUEUE void AudioMixerSlavePool::queueStats(QJsonObject& stats) { unsigned i = 0; - for (auto& slave : _slaves) { - int queueSize = ::hifi::qt::getEventQueueSize(slave.get()); + for (auto& worker : _workers) { + int queueSize = ::hifi::qt::getEventQueueSize(worker.get()); QString queueName = QString("audio_thread_event_queue_%1").arg(i); stats[queueName] = queueSize; @@ -150,55 +150,55 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { } void AudioMixerSlavePool::resize(int numThreads) { - assert(_numThreads == (int)_slaves.size()); + assert(_data.numThreads == (int)_workers.size()); - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); + qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _data.numThreads); - if (numThreads > _numThreads) { + if (numThreads > _data.numThreads) { // start new slaves - for (int i = 0; i < numThreads - _numThreads; ++i) { - auto slave = new AudioMixerSlaveThread(*this, _workerSharedData); - QObject::connect(slave, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); - slave->start(); - _slaves.emplace_back(slave); + for (int i = 0; i < numThreads - _data.numThreads; ++i) { + auto worker = new AudioMixerWorkerThread(_data, _workerSharedData); + QObject::connect(worker, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); + worker->start(); + _workers.emplace_back(worker); } - } else if (numThreads < _numThreads) { - auto extraBegin = _slaves.begin() + numThreads; + } else if (numThreads < _data.numThreads) { + auto extraBegin = _workers.begin() + numThreads; // mark slaves to stop... - auto slave = extraBegin; - while (slave != _slaves.end()) { - (*slave)->stop(); - ++slave; + auto worker = extraBegin; + while (worker != _workers.end()) { + (*worker)->stop(); + ++worker; } // ...cycle them until they do stop... - _numStopped = 0; - while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); - - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; + _data.numStopped = 0; + while (_data.numStopped != (_data.numThreads - numThreads)) { + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); + + Lock poolLock(_data.poolMutex); + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; }); - assert(_numStopped == (_numThreads - numThreads)); + assert(_data.numStopped == (_data.numThreads - numThreads)); } // ...wait for threads to finish... - slave = extraBegin; - while (slave != _slaves.end()) { - QThread* thread = reinterpret_cast(slave->get()); + worker = extraBegin; + while (worker != _workers.end()) { + QThread* thread = reinterpret_cast(worker->get()); static const int MAX_THREAD_WAIT_TIME = 10; thread->wait(MAX_THREAD_WAIT_TIME); - ++slave; + ++worker; } // ...and erase them - _slaves.erase(extraBegin, _slaves.end()); + _workers.erase(extraBegin, _workers.end()); } - _numThreads = _numStarted = _numFinished = numThreads; - assert(_numThreads == (int)_slaves.size()); + _data.numThreads = _data.numStarted = _data.numFinished = numThreads; + assert(_data.numThreads == (int)_slaves.size()); } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index fd6f24cb41e..df437c32733 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -22,55 +22,55 @@ #include "AudioMixerSlave.h" -// Private slave pool data that is shared and accessible with the slave threads. This describes +// Private worker pool data that is shared and accessible with the worker threads. This describes // what information is needs to be thread-safe -struct AudioMixerSlavePoolData { +struct AudioMixerWorkerPoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; // synchronization state - Mutex _poolMutex; // only used for _poolCondition at the moment - ConditionVariable _poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) - Mutex _slaveMutex; // only used for _slaveCondition at the moment - ConditionVariable _slaveCondition; // woken when work needs to be done (_numStarted < _numThreads) + Mutex poolMutex; // only used for _poolCondition at the moment + ConditionVariable poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) + Mutex workerMutex; // only used for _workerCondition at the moment + ConditionVariable workerCondition; // woken when work needs to be done (_numStarted < _numThreads) - // The variables below this point are alternately owned by the pool or by the slaves collectively. + // The variables below this point are alternately owned by the pool or by the workers collectively. // When idle they are owned by the pool. - // Moving ownership to the slaves is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition + // Moving ownership to the workers is done by setting _numStarted = _numFinished = 0 and waking _workerCondition // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition - void (AudioMixerSlave::*_function)(const SharedNodePointer& node); // r/o when owned by slaves, r/w when owned by pool - std::function _configure; // r/o when owned by slaves, r/w when owned by pool + void (AudioMixerSlave::*function)(const SharedNodePointer& node); // r/o when owned by workers, r/w when owned by pool + std::function configure; // r/o when owned by workers, r/w when owned by pool - // Number of currently-running slave threads - // r/o when owned by slaves, r/w when owned by pool - int _numThreads{ 0 }; + // Number of currently-running worker threads + // r/o when owned by workersves, r/w when owned by pool + int numThreads{ 0 }; - // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numStarted{ 0 }; + // Number of worker threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) + // incremented when owned by workers, r/w when owned by pool + std::atomic numStarted{ 0 }; - // Number of slave threads finished with the current request (0 <= _numFinished <= _numStarted) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numFinished{ 0 }; + // Number of worker threads finished with the current request (0 <= _numFinished <= _numStarted) + // incremented when owned by workers, r/w when owned by pool + std::atomic numFinished{ 0 }; - // Number of slave threads shutting down when asked to (0 <= _numStopped <= _numFinished) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numStopped{ 0 }; + // Number of worker threads shutting down when asked to (0 <= _numStopped <= _numFinished) + // incremented when owned by workers, r/w when owned by pool + std::atomic numStopped{ 0 }; // frame state - Queue _queue; + Queue queue; }; -class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { +class AudioMixerWorkerThread : public QThread, public AudioMixerSlave { Q_OBJECT using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; using Lock = std::unique_lock; public: - AudioMixerSlaveThread(AudioMixerSlavePoolData& pool, AudioMixerSlave::SharedData& sharedData) + AudioMixerWorkerThread(AudioMixerWorkerPoolData& pool, AudioMixerSlave::SharedData& sharedData) : AudioMixerSlave(sharedData), _pool(pool) {} void run() override final; @@ -81,14 +81,15 @@ class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AudioMixerSlavePoolData& _pool; + AudioMixerWorkerPoolData& _pool; void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; -// Slave pool for audio mixers +// Worker pool for audio mixers // AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. -class AudioMixerSlavePool : private AudioMixerSlavePoolData { +class AudioMixerSlavePool { + using Mutex = std::mutex; using Lock = std::unique_lock; public: @@ -98,13 +99,13 @@ class AudioMixerSlavePool : private AudioMixerSlavePoolData { : _workerSharedData(sharedData) { setNumThreads(numThreads); } ~AudioMixerSlavePool() { resize(0); } - // process packets on slave threads + // process packets on worker threads void processPackets(ConstIter begin, ConstIter end); - // mix on slave threads + // mix on worker threads void mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain); - // iterate over all slaves + // iterate over all workers void each(std::function functor); #ifdef DEBUG_EVENT_QUEUE @@ -112,19 +113,20 @@ class AudioMixerSlavePool : private AudioMixerSlavePoolData { #endif void setNumThreads(int numThreads); - int numThreads() { return _numThreads; } + int numThreads() { return _data.numThreads; } private: void run(ConstIter begin, ConstIter end); void resize(int numThreads); - std::vector> _slaves; + std::vector> _workers; // frame state ConstIter _begin; ConstIter _end; AudioMixerSlave::SharedData& _workerSharedData; + AudioMixerWorkerPoolData _data; }; #endif // hifi_AudioMixerSlavePool_h diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index d551464533f..6138d7445bd 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -14,7 +14,7 @@ #include #include -void AvatarMixerSlaveThread::run() { +void AvatarMixerWorkerThread::run() { while (true) { wait(); @@ -32,42 +32,42 @@ void AvatarMixerSlaveThread::run() { } } -void AvatarMixerSlaveThread::wait() { +void AvatarMixerWorkerThread::wait() { { - Lock slaveLock(_pool._slaveMutex); - _pool._slaveCondition.wait(slaveLock, [&] { - assert(_pool._numStarted <= _pool._numThreads); - return _pool._numStarted != _pool._numThreads; + Lock workerLock(_pool.workerMutex); + _pool.workerCondition.wait(workerLock, [&] { + assert(_pool.numStarted <= _pool.numThreads); + return _pool.numStarted != _pool.numThreads; }); } - ++_pool._numStarted; + ++_pool.numStarted; - if (_pool._configure) { - _pool._configure(*this); + if (_pool.configure) { + _pool.configure(*this); } - _function = _pool._function; + _function = _pool.function; } -void AvatarMixerSlaveThread::notify(bool stopping) { - assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted); - int numFinished = ++_pool._numFinished; +void AvatarMixerWorkerThread::notify(bool stopping) { + assert(_pool.numFinished < _pool.numThreads && _pool.numFinished <= _pool.numStarted); + int numFinished = ++_pool.numFinished; if (stopping) { - ++_pool._numStopped; - assert(_pool._numStopped <= _pool._numFinished); + ++_pool.numStopped; + assert(_pool.numStopped <= _pool.numFinished); } - if (numFinished == _pool._numThreads) { - _pool._poolCondition.notify_one(); + if (numFinished == _pool.numThreads) { + _pool.poolCondition.notify_one(); } } -bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) { - return _pool._queue.try_pop(node); +bool AvatarMixerWorkerThread::try_pop(SharedNodePointer& node) { + return _pool.queue.try_pop(node); } void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { - _function = &AvatarMixerSlave::processIncomingPackets; - _configure = [=](AvatarMixerSlave& slave) { + _data.function = &AvatarMixerSlave::processIncomingPackets; + _data.configure = [=](AvatarMixerSlave& slave) { slave.configure(begin, end); }; run(begin, end); @@ -76,10 +76,9 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio) { - _function = &AvatarMixerSlave::broadcastAvatarData; - _configure = [=](AvatarMixerSlave& slave) { - slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, - _priorityReservedFraction); + _data.function = &AvatarMixerSlave::broadcastAvatarData; + _data.configure = [=](AvatarMixerSlave& slave) { + slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, _priorityReservedFraction); }; run(begin, end); } @@ -90,38 +89,38 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { // fill the queue std::for_each(_begin, _end, [&](const SharedNodePointer& node) { - _queue.push(node); + _data.queue.push(node); }); // run - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); // wait { - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; + Lock poolLock(_data.poolMutex); + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; }); } - assert(_numStarted == _numThreads); + assert(_data.numStarted == _data.numThreads); - assert(_queue.empty()); + assert(_data.queue.empty()); } void AvatarMixerSlavePool::each(std::function functor) { - for (auto& slave : _slaves) { - functor(*slave.get()); + for (auto& worker : _workers) { + functor(*worker.get()); } } #ifdef DEBUG_EVENT_QUEUE void AvatarMixerSlavePool::queueStats(QJsonObject& stats) { unsigned i = 0; - for (auto& slave : _slaves) { - int queueSize = ::hifi::qt::getEventQueueSize(slave.get()); + for (auto& worker : _workers) { + int queueSize = ::hifi::qt::getEventQueueSize(worker.get()); QString queueName = QString("avatar_thread_event_queue_%1").arg(i); stats[queueName] = queueSize; @@ -151,54 +150,54 @@ void AvatarMixerSlavePool::setNumThreads(int numThreads) { } void AvatarMixerSlavePool::resize(int numThreads) { - assert(_numThreads == (int)_slaves.size()); + assert(_data.numThreads == (int)_workers.size()); - qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); + qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _data.numThreads); - if (numThreads > _numThreads) { + if (numThreads > _data.numThreads) { // start new slaves - for (int i = 0; i < numThreads - _numThreads; ++i) { - auto slave = new AvatarMixerSlaveThread(*this, _slaveSharedData); - slave->start(); - _slaves.emplace_back(slave); + for (int i = 0; i < numThreads - _data.numThreads; ++i) { + auto worker = new AvatarMixerWorkerThread(_data, _slaveSharedData); + worker->start(); + _workers.emplace_back(worker); } - } else if (numThreads < _numThreads) { - auto extraBegin = _slaves.begin() + numThreads; + } else if (numThreads < _data.numThreads) { + auto extraBegin = _workers.begin() + numThreads; // mark slaves to stop... - auto slave = extraBegin; - while (slave != _slaves.end()) { - (*slave)->stop(); - ++slave; + auto worker = extraBegin; + while (worker != _workers.end()) { + (*worker)->stop(); + ++worker; } // ...cycle them until they do stop... - _numStopped = 0; - while (_numStopped != (_numThreads - numThreads)) { - _numStarted = _numFinished = 0; - _slaveCondition.notify_all(); + _data.numStopped = 0; + while (_data.numStopped != (_data.numThreads - numThreads)) { + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); - Lock poolLock(_poolMutex); - _poolCondition.wait(poolLock, [&] { - assert(_numFinished <= _numThreads); - return _numFinished == _numThreads; + Lock poolLock(_data.poolMutex); + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; }); - assert(_numStopped == (_numThreads - numThreads)); + assert(_data.numStopped == (_data.numThreads - numThreads)); } // ...wait for threads to finish... - slave = extraBegin; - while (slave != _slaves.end()) { - QThread* thread = reinterpret_cast(slave->get()); + worker = extraBegin; + while (worker != _workers.end()) { + QThread* thread = reinterpret_cast(worker->get()); static const int MAX_THREAD_WAIT_TIME = 10; thread->wait(MAX_THREAD_WAIT_TIME); - ++slave; + ++worker; } // ...and erase them - _slaves.erase(extraBegin, _slaves.end()); + _workers.erase(extraBegin, _workers.end()); } - _numThreads = _numStarted = _numFinished = numThreads; - assert(_numThreads == (int)_slaves.size()); + _data.numThreads = _data.numStarted = _data.numFinished = numThreads; + assert(_data.numThreads == (int)_workers.size()); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index 0d0f4eb4180..a0d4f080760 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -25,55 +25,55 @@ #include "AvatarMixerSlave.h" -// Private slave pool data that is shared and accessible with the slave threads. This describes +// Private worker pool data that is shared and accessible with the worker threads. This describes // what information is needs to be thread-safe -struct AvatarMixerSlavePoolData { +struct AvatarMixerWorkerPoolData { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; using ConditionVariable = std::condition_variable; // synchronization state - Mutex _poolMutex; // only used for _poolCondition at the moment - ConditionVariable _poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) - Mutex _slaveMutex; // only used for _slaveCondition at the moment - ConditionVariable _slaveCondition; // woken when work needs to be done (_numStarted < _numThreads) + Mutex poolMutex; // only used for _poolCondition at the moment + ConditionVariable poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads) + Mutex workerMutex; // only used for _workerCondition at the moment + ConditionVariable workerCondition; // woken when work needs to be done (_numStarted < _numThreads) - // The variables below this point are alternately owned by the pool or by the slaves collectively. + // The variables below this point are alternately owned by the pool or by the workers collectively. // When idle they are owned by the pool. - // Moving ownership to the slaves is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition + // Moving ownership to the workers is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition - void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); // r/o when owned by slaves, r/w when owned by pool - std::function _configure; // r/o when owned by slaves, r/w when owned by pool + void (AvatarMixerSlave::*function)(const SharedNodePointer& node); // r/o when owned by workers, r/w when owned by pool + std::function configure; // r/o when owned by workers, r/w when owned by pool - // Number of currently-running slave threads - // r/o when owned by slaves, r/w when owned by pool - int _numThreads{ 0 }; + // Number of currently-running worker threads + // r/o when owned by workers, r/w when owned by pool + int numThreads{ 0 }; - // Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numStarted{ 0 }; + // Number of worker threads "awake" and processing the current request (0 <= _numStarted <= _numThreads) + // incremented when owned by workers, r/w when owned by pool + std::atomic numStarted{ 0 }; - // Number of slave threads finished with the current request (0 <= _numStarted <= _numThreads) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numFinished{ 0 }; + // Number of worker threads finished with the current request (0 <= _numStarted <= _numThreads) + // incremented when owned by workers, r/w when owned by pool + std::atomic numFinished{ 0 }; - // Number of slave threads shutting down when asked to (0 <= _numStarted <= _numThreads) - // incremented when owned by slaves, r/w when owned by pool - std::atomic _numStopped{ 0 }; + // Number of worker threads shutting down when asked to (0 <= _numStarted <= _numThreads) + // incremented when owned by workers, r/w when owned by pool + std::atomic numStopped{ 0 }; // frame state - Queue _queue; + Queue queue; }; -class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { +class AvatarMixerWorkerThread : public QThread, public AvatarMixerSlave { Q_OBJECT using ConstIter = NodeList::const_iterator; using Mutex = std::mutex; using Lock = std::unique_lock; public: - AvatarMixerSlaveThread(AvatarMixerSlavePoolData& pool, SlaveSharedData* slaveSharedData) : + AvatarMixerWorkerThread(AvatarMixerWorkerPoolData& pool, SlaveSharedData* slaveSharedData) : AvatarMixerSlave(slaveSharedData), _pool(pool) {}; void run() override final; @@ -84,14 +84,15 @@ class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AvatarMixerSlavePoolData& _pool; + AvatarMixerWorkerPoolData& _pool; void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; -// Slave pool for avatar mixers +// Worker pool for avatar mixers // AvatarMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. -class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { +class AvatarMixerSlavePool { + using Mutex = std::mutex; using Lock = std::unique_lock; public: @@ -101,12 +102,12 @@ class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { _slaveSharedData(slaveSharedData) { setNumThreads(numThreads); } ~AvatarMixerSlavePool() { resize(0); } - // Jobs the slave pool can do... + // Jobs the worker pool can do... void processIncomingPackets(ConstIter begin, ConstIter end); void broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio); - // iterate over all slaves + // iterate over all workers void each(std::function functor); #ifdef DEBUG_EVENT_QUEUE @@ -114,7 +115,7 @@ class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { #endif void setNumThreads(int numThreads); - int numThreads() const { return _numThreads; } + int numThreads() const { return _data.numThreads; } void setPriorityReservedFraction(float fraction) { _priorityReservedFraction = fraction; } float getPriorityReservedFraction() const { return _priorityReservedFraction; } @@ -123,7 +124,7 @@ class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { void run(ConstIter begin, ConstIter end); void resize(int numThreads); - std::vector> _slaves; + std::vector> _workers; // Set from Domain Settings: float _priorityReservedFraction { 0.4f }; @@ -133,6 +134,7 @@ class AvatarMixerSlavePool : private AvatarMixerSlavePoolData { ConstIter _end; SlaveSharedData* _slaveSharedData; + AvatarMixerWorkerPoolData _data; }; #endif // hifi_AvatarMixerSlavePool_h From 569afb18a2ef8ed8d5d5a6105c07f32d34032cbe Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Fri, 24 Sep 2021 11:00:04 -0700 Subject: [PATCH 07/16] resolve potential race condition --- .../src/audio/AudioMixerSlavePool.cpp | 45 +++++++++++------ .../src/avatars/AvatarMixerSlavePool.cpp | 49 ++++++++++++------- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 4db71f0fe84..9ca03e4ba9e 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -39,10 +39,12 @@ void AudioMixerWorkerThread::run() { void AudioMixerWorkerThread::wait() { { Lock workerLock(_pool.workerMutex); - _pool.workerCondition.wait(workerLock, [&] { - assert(_pool.numStarted <= _pool.numThreads); - return _pool.numStarted != _pool.numThreads; - }); + if (_pool.numStarted == _pool.numThreads) { + _pool.workerCondition.wait(workerLock, [&] { + assert(_pool.numStarted <= _pool.numThreads); + return _pool.numStarted != _pool.numThreads; + }); + } } ++_pool.numStarted; @@ -61,6 +63,7 @@ void AudioMixerWorkerThread::notify(bool stopping) { } if (numFinished == _pool.numThreads) { + Lock poolLock(_pool.poolMutex); _pool.poolCondition.notify_one(); } } @@ -94,16 +97,21 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { }); // run - _data.numStarted = _data.numFinished = 0; - _data.workerCondition.notify_all(); + { + Lock workerLock(_data.workerMutex); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); + } // wait { Lock poolLock(_data.poolMutex); - _data.poolCondition.wait(poolLock, [&] { - assert(_data.numFinished <= _data.numThreads); - return _data.numFinished == _data.numThreads; - }); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } } assert(_data.numStarted == _data.numThreads); @@ -175,14 +183,19 @@ void AudioMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _data.numStopped = 0; while (_data.numStopped != (_data.numThreads - numThreads)) { - _data.numStarted = _data.numFinished = 0; - _data.workerCondition.notify_all(); + { + Lock workerLock(_data.workerMutex); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); + } Lock poolLock(_data.poolMutex); - _data.poolCondition.wait(poolLock, [&] { - assert(_data.numFinished <= _data.numThreads); - return _data.numFinished == _data.numThreads; - }); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } assert(_data.numStopped == (_data.numThreads - numThreads)); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 6138d7445bd..1e4e0efe602 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -35,10 +35,12 @@ void AvatarMixerWorkerThread::run() { void AvatarMixerWorkerThread::wait() { { Lock workerLock(_pool.workerMutex); - _pool.workerCondition.wait(workerLock, [&] { - assert(_pool.numStarted <= _pool.numThreads); - return _pool.numStarted != _pool.numThreads; - }); + if (_pool.numStarted == _pool.numThreads) { + _pool.workerCondition.wait(workerLock, [&] { + assert(_pool.numStarted <= _pool.numThreads); + return _pool.numStarted != _pool.numThreads; + }); + } } ++_pool.numStarted; @@ -57,6 +59,7 @@ void AvatarMixerWorkerThread::notify(bool stopping) { } if (numFinished == _pool.numThreads) { + Lock poolLock(_pool.poolMutex); _pool.poolCondition.notify_one(); } } @@ -93,16 +96,21 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { }); // run - _data.numStarted = _data.numFinished = 0; - _data.workerCondition.notify_all(); + { + Lock workerLock(_data.workerMutex); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); + } // wait { Lock poolLock(_data.poolMutex); - _data.poolCondition.wait(poolLock, [&] { - assert(_data.numFinished <= _data.numThreads); - return _data.numFinished == _data.numThreads; - }); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } } assert(_data.numStarted == _data.numThreads); @@ -174,14 +182,21 @@ void AvatarMixerSlavePool::resize(int numThreads) { // ...cycle them until they do stop... _data.numStopped = 0; while (_data.numStopped != (_data.numThreads - numThreads)) { - _data.numStarted = _data.numFinished = 0; - _data.workerCondition.notify_all(); + { + Lock workerLock(_data.workerMutex); + _data.numStarted = _data.numFinished = 0; + _data.workerCondition.notify_all(); + } - Lock poolLock(_data.poolMutex); - _data.poolCondition.wait(poolLock, [&] { - assert(_data.numFinished <= _data.numThreads); - return _data.numFinished == _data.numThreads; - }); + { + Lock poolLock(_data.poolMutex); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } + } assert(_data.numStopped == (_data.numThreads - numThreads)); } From bc5d9de4e6aec8d984423b1eabf4f9f8660b1b3a Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Fri, 24 Sep 2021 20:15:14 -0700 Subject: [PATCH 08/16] renamed _pool to _data, to bring it more in line with other users --- .../src/audio/AudioMixerSlavePool.cpp | 34 +++++++++---------- .../src/audio/AudioMixerSlavePool.h | 8 ++--- .../src/avatars/AvatarMixerSlavePool.cpp | 34 +++++++++---------- .../src/avatars/AvatarMixerSlavePool.h | 8 ++--- 4 files changed, 42 insertions(+), 42 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 9ca03e4ba9e..6c940c5622d 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -38,38 +38,38 @@ void AudioMixerWorkerThread::run() { void AudioMixerWorkerThread::wait() { { - Lock workerLock(_pool.workerMutex); - if (_pool.numStarted == _pool.numThreads) { - _pool.workerCondition.wait(workerLock, [&] { - assert(_pool.numStarted <= _pool.numThreads); - return _pool.numStarted != _pool.numThreads; + Lock workerLock(_data.workerMutex); + if (_data.numStarted == _data.numThreads) { + _data.workerCondition.wait(workerLock, [&] { + assert(_data.numStarted <= _data.numThreads); + return _data.numStarted != _data.numThreads; }); } } - ++_pool.numStarted; + ++_data.numStarted; - if (_pool.configure) { - _pool.configure(*this); + if (_data.configure) { + _data.configure(*this); } - _function = _pool.function; + _function = _data.function; } void AudioMixerWorkerThread::notify(bool stopping) { - assert(_pool.numFinished < _pool.numThreads && _pool.numFinished <= _pool.numStarted); - int numFinished = ++_pool.numFinished; + assert(_data.numFinished < _data.numThreads && _data.numFinished <= _data.numStarted); + int numFinished = ++_data.numFinished; if (stopping) { - ++_pool.numStopped; - assert(_pool.numStopped <= _pool.numFinished); + ++_data.numStopped; + assert(_data.numStopped <= _data.numFinished); } - if (numFinished == _pool.numThreads) { - Lock poolLock(_pool.poolMutex); - _pool.poolCondition.notify_one(); + if (numFinished == _data.numThreads) { + Lock poolLock(_data.poolMutex); + _data.poolCondition.notify_one(); } } bool AudioMixerWorkerThread::try_pop(SharedNodePointer& node) { - return _pool.queue.try_pop(node); + return _data.queue.try_pop(node); } void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index df437c32733..f0b4a448b43 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -17,7 +17,7 @@ #include #include -#include +#include // for DEBUG_EVENT_QUEUE #include #include "AudioMixerSlave.h" @@ -70,8 +70,8 @@ class AudioMixerWorkerThread : public QThread, public AudioMixerSlave { using Lock = std::unique_lock; public: - AudioMixerWorkerThread(AudioMixerWorkerPoolData& pool, AudioMixerSlave::SharedData& sharedData) - : AudioMixerSlave(sharedData), _pool(pool) {} + AudioMixerWorkerThread(AudioMixerWorkerPoolData& data, AudioMixerSlave::SharedData& sharedData) : + AudioMixerSlave(sharedData), _data(data) {} void run() override final; inline void stop() { _stop = true; } @@ -81,7 +81,7 @@ class AudioMixerWorkerThread : public QThread, public AudioMixerSlave { void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AudioMixerWorkerPoolData& _pool; + AudioMixerWorkerPoolData& _data; void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 1e4e0efe602..f8dd99af672 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -34,38 +34,38 @@ void AvatarMixerWorkerThread::run() { void AvatarMixerWorkerThread::wait() { { - Lock workerLock(_pool.workerMutex); - if (_pool.numStarted == _pool.numThreads) { - _pool.workerCondition.wait(workerLock, [&] { - assert(_pool.numStarted <= _pool.numThreads); - return _pool.numStarted != _pool.numThreads; + Lock workerLock(_data.workerMutex); + if (_data.numStarted == _data.numThreads) { + _data.workerCondition.wait(workerLock, [&] { + assert(_data.numStarted <= _data.numThreads); + return _data.numStarted != _data.numThreads; }); } } - ++_pool.numStarted; + ++_data.numStarted; - if (_pool.configure) { - _pool.configure(*this); + if (_data.configure) { + _data.configure(*this); } - _function = _pool.function; + _function = _data.function; } void AvatarMixerWorkerThread::notify(bool stopping) { - assert(_pool.numFinished < _pool.numThreads && _pool.numFinished <= _pool.numStarted); - int numFinished = ++_pool.numFinished; + assert(_data.numFinished < _data.numThreads && _data.numFinished <= _data.numStarted); + int numFinished = ++_data.numFinished; if (stopping) { - ++_pool.numStopped; - assert(_pool.numStopped <= _pool.numFinished); + ++_data.numStopped; + assert(_data.numStopped <= _data.numFinished); } - if (numFinished == _pool.numThreads) { - Lock poolLock(_pool.poolMutex); - _pool.poolCondition.notify_one(); + if (numFinished == _data.numThreads) { + Lock poolLock(_data.poolMutex); + _data.poolCondition.notify_one(); } } bool AvatarMixerWorkerThread::try_pop(SharedNodePointer& node) { - return _pool.queue.try_pop(node); + return _data.queue.try_pop(node); } void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index a0d4f080760..d15c2703020 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -20,7 +20,7 @@ #include #include -#include +#include // for DEBUG_EVENT_QUEUE #include "AvatarMixerSlave.h" @@ -73,8 +73,8 @@ class AvatarMixerWorkerThread : public QThread, public AvatarMixerSlave { using Lock = std::unique_lock; public: - AvatarMixerWorkerThread(AvatarMixerWorkerPoolData& pool, SlaveSharedData* slaveSharedData) : - AvatarMixerSlave(slaveSharedData), _pool(pool) {}; + AvatarMixerWorkerThread(AvatarMixerWorkerPoolData& data, SlaveSharedData* slaveSharedData) : + AvatarMixerSlave(slaveSharedData), _data(data){}; void run() override final; inline void stop() { _stop = true; } @@ -84,7 +84,7 @@ class AvatarMixerWorkerThread : public QThread, public AvatarMixerSlave { void notify(bool stopping); bool try_pop(SharedNodePointer& node); - AvatarMixerWorkerPoolData& _pool; + AvatarMixerWorkerPoolData& _data; void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; From 45d33127f60ebace362af1ca94fb2aab3ee5ade3 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Sat, 25 Sep 2021 10:15:20 -0700 Subject: [PATCH 09/16] remove last reference to _slaves --- assignment-client/src/audio/AudioMixerSlavePool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 6c940c5622d..29f8c20c2f1 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -213,5 +213,5 @@ void AudioMixerSlavePool::resize(int numThreads) { } _data.numThreads = _data.numStarted = _data.numFinished = numThreads; - assert(_data.numThreads == (int)_slaves.size()); + assert(_data.numThreads == (int)_workers.size()); } From 3a4e8a7fb76f212f44d070a4c2de9cde50efb347 Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Sat, 25 Sep 2021 17:11:58 -0700 Subject: [PATCH 10/16] better management of numThreads when launching new workers --- assignment-client/src/audio/AudioMixerSlavePool.cpp | 6 ++++-- assignment-client/src/avatars/AvatarMixerSlavePool.cpp | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 29f8c20c2f1..acc5f4d8de7 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -55,7 +55,8 @@ void AudioMixerWorkerThread::wait() { } void AudioMixerWorkerThread::notify(bool stopping) { - assert(_data.numFinished < _data.numThreads && _data.numFinished <= _data.numStarted); + assert(_data.numFinished < _data.numThreads); + assert(_data.numFinished <= _data.numStarted); int numFinished = ++_data.numFinished; if (stopping) { ++_data.numStopped; @@ -164,7 +165,8 @@ void AudioMixerSlavePool::resize(int numThreads) { if (numThreads > _data.numThreads) { // start new slaves - for (int i = 0; i < numThreads - _data.numThreads; ++i) { + while (numThreads > _data.numThreads) { + _data.numThreads++; auto worker = new AudioMixerWorkerThread(_data, _workerSharedData); QObject::connect(worker, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); worker->start(); diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index f8dd99af672..9bc06e67dc9 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -51,7 +51,8 @@ void AvatarMixerWorkerThread::wait() { } void AvatarMixerWorkerThread::notify(bool stopping) { - assert(_data.numFinished < _data.numThreads && _data.numFinished <= _data.numStarted); + assert(_data.numFinished < _data.numThreads); + assert(_data.numFinished <= _data.numStarted); int numFinished = ++_data.numFinished; if (stopping) { ++_data.numStopped; @@ -164,7 +165,8 @@ void AvatarMixerSlavePool::resize(int numThreads) { if (numThreads > _data.numThreads) { // start new slaves - for (int i = 0; i < numThreads - _data.numThreads; ++i) { + while (numThreads > _data.numThreads) { + _data.numThreads++; auto worker = new AvatarMixerWorkerThread(_data, _slaveSharedData); worker->start(); _workers.emplace_back(worker); From 146da44a4a43600a43756403c590fd94fb41e46b Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Sun, 26 Sep 2021 11:17:15 -0700 Subject: [PATCH 11/16] ensure that newly-created threads are properly configured before leaving "resize" --- assignment-client/src/audio/AudioMixerSlavePool.cpp | 11 +++++++++++ .../src/avatars/AvatarMixerSlavePool.cpp | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index acc5f4d8de7..fa76b2aa310 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -164,6 +164,8 @@ void AudioMixerSlavePool::resize(int numThreads) { qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _data.numThreads); if (numThreads > _data.numThreads) { + assert(_data.numFinished == _data.numThreads); + // start new slaves while (numThreads > _data.numThreads) { _data.numThreads++; @@ -172,6 +174,15 @@ void AudioMixerSlavePool::resize(int numThreads) { worker->start(); _workers.emplace_back(worker); } + + // wait for the new workers to wake up and enter the wait + Lock poolLock(_data.poolMutex); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } } else if (numThreads < _data.numThreads) { auto extraBegin = _workers.begin() + numThreads; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 9bc06e67dc9..a01c62e7668 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -164,6 +164,8 @@ void AvatarMixerSlavePool::resize(int numThreads) { qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _data.numThreads); if (numThreads > _data.numThreads) { + assert(_data.numFinished == _data.numThreads); + // start new slaves while (numThreads > _data.numThreads) { _data.numThreads++; @@ -171,6 +173,15 @@ void AvatarMixerSlavePool::resize(int numThreads) { worker->start(); _workers.emplace_back(worker); } + + // wait for the new workers to wake up and enter the wait + Lock poolLock(_data.poolMutex); + if (_data.numFinished < _data.numThreads) { + _data.poolCondition.wait(poolLock, [&] { + assert(_data.numFinished <= _data.numThreads); + return _data.numFinished == _data.numThreads; + }); + } } else if (numThreads < _data.numThreads) { auto extraBegin = _workers.begin() + numThreads; From 83f2b52695f4dbfbe0b0d53197469c3aced5bdea Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Sun, 26 Sep 2021 11:32:56 -0700 Subject: [PATCH 12/16] added some missed field initializations --- assignment-client/src/audio/AudioMixerSlavePool.cpp | 2 ++ assignment-client/src/audio/AudioMixerSlavePool.h | 4 ++-- assignment-client/src/avatars/AvatarMixerSlavePool.cpp | 2 ++ assignment-client/src/avatars/AvatarMixerSlavePool.h | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index fa76b2aa310..83547b8d50d 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -25,6 +25,7 @@ void AudioMixerWorkerThread::run() { // iterate over all available nodes SharedNodePointer node; while (try_pop(node)) { + assert(_function); (this->*_function)(node); } @@ -52,6 +53,7 @@ void AudioMixerWorkerThread::wait() { _data.configure(*this); } _function = _data.function; + assert(_function); } void AudioMixerWorkerThread::notify(bool stopping) { diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index f0b4a448b43..41778278736 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -40,8 +40,8 @@ struct AudioMixerWorkerPoolData { // Moving ownership to the workers is done by setting _numStarted = _numFinished = 0 and waking _workerCondition // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition - void (AudioMixerSlave::*function)(const SharedNodePointer& node); // r/o when owned by workers, r/w when owned by pool - std::function configure; // r/o when owned by workers, r/w when owned by pool + void (AudioMixerSlave::*function)(const SharedNodePointer& node){ nullptr }; // r/o when owned by workers, r/w when owned by pool + std::function configure{ nullptr }; // r/o when owned by workers, r/w when owned by pool // Number of currently-running worker threads // r/o when owned by workersves, r/w when owned by pool diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index a01c62e7668..427f5278b5f 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -21,6 +21,7 @@ void AvatarMixerWorkerThread::run() { // iterate over all available nodes SharedNodePointer node; while (try_pop(node)) { + assert(_function); (this->*_function)(node); } @@ -48,6 +49,7 @@ void AvatarMixerWorkerThread::wait() { _data.configure(*this); } _function = _data.function; + assert(_function); } void AvatarMixerWorkerThread::notify(bool stopping) { diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index d15c2703020..c7bab1e0fda 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -43,8 +43,8 @@ struct AvatarMixerWorkerPoolData { // Moving ownership to the workers is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition // Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition - void (AvatarMixerSlave::*function)(const SharedNodePointer& node); // r/o when owned by workers, r/w when owned by pool - std::function configure; // r/o when owned by workers, r/w when owned by pool + void (AvatarMixerSlave::*function)(const SharedNodePointer& node){ nullptr }; // r/o when owned by workers, r/w when owned by pool + std::function configure{ nullptr }; // r/o when owned by workers, r/w when owned by pool // Number of currently-running worker threads // r/o when owned by workers, r/w when owned by pool From df59a42c68f8405c1a591feabaf0e17af08fd20f Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Mon, 27 Sep 2021 12:52:05 -0700 Subject: [PATCH 13/16] more tweaks to improve worker thread startup --- .../src/audio/AudioMixerSlavePool.cpp | 29 ++++++++++++------- .../src/audio/AudioMixerSlavePool.h | 2 +- .../src/avatars/AvatarMixerSlavePool.cpp | 29 ++++++++++++------- .../src/avatars/AvatarMixerSlavePool.h | 2 +- 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 83547b8d50d..d2a6f49d98f 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -19,25 +19,32 @@ #include void AudioMixerWorkerThread::run() { - while (true) { - wait(); - - // iterate over all available nodes - SharedNodePointer node; - while (try_pop(node)) { - assert(_function); - (this->*_function)(node); - } + assert(_data.numStarted < _data.numThreads); + _data.numStarted++; + while (true) { bool stopping = _stop; notify(stopping); if (stopping) { return; } + + wait(); + assert(_function); + + if (_function) { + // iterate over all available nodes + SharedNodePointer node; + while (try_pop(node)) { + (this->*_function)(node); + } + } } } void AudioMixerWorkerThread::wait() { + assert(_data.numStarted <= _data.numThreads); + { Lock workerLock(_data.workerMutex); if (_data.numStarted == _data.numThreads) { @@ -47,7 +54,7 @@ void AudioMixerWorkerThread::wait() { }); } } - ++_data.numStarted; + _data.numStarted++; if (_data.configure) { _data.configure(*this); @@ -61,7 +68,7 @@ void AudioMixerWorkerThread::notify(bool stopping) { assert(_data.numFinished <= _data.numStarted); int numFinished = ++_data.numFinished; if (stopping) { - ++_data.numStopped; + _data.numStopped++; assert(_data.numStopped <= _data.numFinished); } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 41778278736..7dd5304aade 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -83,7 +83,7 @@ class AudioMixerWorkerThread : public QThread, public AudioMixerSlave { AudioMixerWorkerPoolData& _data; void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; - bool _stop { false }; + volatile bool _stop { false }; // using volatile here mostly for compiler hinting, recognize it has minimal meaning }; // Worker pool for audio mixers diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 427f5278b5f..9b08c103fb3 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -15,25 +15,32 @@ #include void AvatarMixerWorkerThread::run() { - while (true) { - wait(); - - // iterate over all available nodes - SharedNodePointer node; - while (try_pop(node)) { - assert(_function); - (this->*_function)(node); - } + assert(_data.numStarted < _data.numThreads); + _data.numStarted++; + while (true) { bool stopping = _stop; notify(stopping); if (stopping) { return; } + + wait(); + assert(_function); + + if (_function) { + // iterate over all available nodes + SharedNodePointer node; + while (try_pop(node)) { + (this->*_function)(node); + } + } } } void AvatarMixerWorkerThread::wait() { + assert(_data.numStarted <= _data.numThreads); + { Lock workerLock(_data.workerMutex); if (_data.numStarted == _data.numThreads) { @@ -43,7 +50,7 @@ void AvatarMixerWorkerThread::wait() { }); } } - ++_data.numStarted; + _data.numStarted++; if (_data.configure) { _data.configure(*this); @@ -57,7 +64,7 @@ void AvatarMixerWorkerThread::notify(bool stopping) { assert(_data.numFinished <= _data.numStarted); int numFinished = ++_data.numFinished; if (stopping) { - ++_data.numStopped; + _data.numStopped++; assert(_data.numStopped <= _data.numFinished); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index c7bab1e0fda..613699cba5c 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -86,7 +86,7 @@ class AvatarMixerWorkerThread : public QThread, public AvatarMixerSlave { AvatarMixerWorkerPoolData& _data; void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; - bool _stop { false }; + volatile bool _stop{ false }; // using volatile here mostly for compiler hinting, recognize it has minimal meaning }; // Worker pool for avatar mixers From 5389d3fd37c489779ec62a348d1e1e36be5d9e0c Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Tue, 28 Sep 2021 00:43:19 -0700 Subject: [PATCH 14/16] New threads now unconditionally enter a wait without checking if numStarted==numThreads (as numThreads is changing out-of-band in this special case) Moved numStarted++ inside the workerLock so as to avoid spurious worker thread wakes --- assignment-client/src/audio/AudioMixerSlavePool.cpp | 11 +++++++---- assignment-client/src/audio/AudioMixerSlavePool.h | 2 +- .../src/avatars/AvatarMixerSlavePool.cpp | 11 +++++++---- assignment-client/src/avatars/AvatarMixerSlavePool.h | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index d2a6f49d98f..2e179f791a0 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -22,6 +22,7 @@ void AudioMixerWorkerThread::run() { assert(_data.numStarted < _data.numThreads); _data.numStarted++; + bool starting = true; while (true) { bool stopping = _stop; notify(stopping); @@ -29,7 +30,8 @@ void AudioMixerWorkerThread::run() { return; } - wait(); + wait(starting); + starting = false; assert(_function); if (_function) { @@ -42,19 +44,19 @@ void AudioMixerWorkerThread::run() { } } -void AudioMixerWorkerThread::wait() { +void AudioMixerWorkerThread::wait(bool starting) { assert(_data.numStarted <= _data.numThreads); { Lock workerLock(_data.workerMutex); - if (_data.numStarted == _data.numThreads) { + if (starting || _data.numStarted == _data.numThreads) { _data.workerCondition.wait(workerLock, [&] { assert(_data.numStarted <= _data.numThreads); return _data.numStarted != _data.numThreads; }); } + _data.numStarted++; } - _data.numStarted++; if (_data.configure) { _data.configure(*this); @@ -236,4 +238,5 @@ void AudioMixerSlavePool::resize(int numThreads) { _data.numThreads = _data.numStarted = _data.numFinished = numThreads; assert(_data.numThreads == (int)_workers.size()); + qDebug("%s: completed", __FUNCTION__); } diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 7dd5304aade..d0de2734b5b 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -77,7 +77,7 @@ class AudioMixerWorkerThread : public QThread, public AudioMixerSlave { inline void stop() { _stop = true; } private: - void wait(); + void wait(bool starting); void notify(bool stopping); bool try_pop(SharedNodePointer& node); diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 9b08c103fb3..aec678c62f2 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -18,6 +18,7 @@ void AvatarMixerWorkerThread::run() { assert(_data.numStarted < _data.numThreads); _data.numStarted++; + bool starting = true; while (true) { bool stopping = _stop; notify(stopping); @@ -25,7 +26,8 @@ void AvatarMixerWorkerThread::run() { return; } - wait(); + wait(starting); + starting = false; assert(_function); if (_function) { @@ -38,19 +40,19 @@ void AvatarMixerWorkerThread::run() { } } -void AvatarMixerWorkerThread::wait() { +void AvatarMixerWorkerThread::wait(bool starting) { assert(_data.numStarted <= _data.numThreads); { Lock workerLock(_data.workerMutex); - if (_data.numStarted == _data.numThreads) { + if (starting || _data.numStarted == _data.numThreads) { _data.workerCondition.wait(workerLock, [&] { assert(_data.numStarted <= _data.numThreads); return _data.numStarted != _data.numThreads; }); } + _data.numStarted++; } - _data.numStarted++; if (_data.configure) { _data.configure(*this); @@ -237,4 +239,5 @@ void AvatarMixerSlavePool::resize(int numThreads) { _data.numThreads = _data.numStarted = _data.numFinished = numThreads; assert(_data.numThreads == (int)_workers.size()); + qDebug("%s: completed", __FUNCTION__); } diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index 613699cba5c..5e4d436a081 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -80,7 +80,7 @@ class AvatarMixerWorkerThread : public QThread, public AvatarMixerSlave { inline void stop() { _stop = true; } private: - void wait(); + void wait(bool starting); void notify(bool stopping); bool try_pop(SharedNodePointer& node); From ecba866a0d9af571ba93ca52349dd18c08849f6d Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Tue, 28 Sep 2021 09:54:49 -0700 Subject: [PATCH 15/16] condition.wait is still letting threads through before it's time, starting to wonder if the passed condition is checked before entering the wait --- .../src/audio/AudioMixerSlavePool.cpp | 15 +++++++++------ .../src/avatars/AvatarMixerSlavePool.cpp | 13 ++++++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 2e179f791a0..7effb112e10 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -178,12 +178,15 @@ void AudioMixerSlavePool::resize(int numThreads) { assert(_data.numFinished == _data.numThreads); // start new slaves - while (numThreads > _data.numThreads) { - _data.numThreads++; - auto worker = new AudioMixerWorkerThread(_data, _workerSharedData); - QObject::connect(worker, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); - worker->start(); - _workers.emplace_back(worker); + { + Lock workerLock(_data.workerMutex); + while (numThreads > _data.numThreads) { + _data.numThreads++; + auto worker = new AudioMixerWorkerThread(_data, _workerSharedData); + QObject::connect(worker, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); + worker->start(); + _workers.emplace_back(worker); + } } // wait for the new workers to wake up and enter the wait diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index aec678c62f2..d006441d826 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -178,11 +178,14 @@ void AvatarMixerSlavePool::resize(int numThreads) { assert(_data.numFinished == _data.numThreads); // start new slaves - while (numThreads > _data.numThreads) { - _data.numThreads++; - auto worker = new AvatarMixerWorkerThread(_data, _slaveSharedData); - worker->start(); - _workers.emplace_back(worker); + { + Lock workerLock(_data.workerMutex); + while (numThreads > _data.numThreads) { + _data.numThreads++; + auto worker = new AvatarMixerWorkerThread(_data, _slaveSharedData); + worker->start(); + _workers.emplace_back(worker); + } } // wait for the new workers to wake up and enter the wait From f7e61b8ae20821d027e1b2f0e093137b6f28e6cc Mon Sep 17 00:00:00 2001 From: Heather Anderson Date: Sun, 7 Nov 2021 19:33:33 -0800 Subject: [PATCH 16/16] results of linux debugging, trying to get the wait conditions working as expected and verify the invariants --- .../src/audio/AudioMixerSlavePool.cpp | 11 ++++------- .../src/avatars/AvatarMixerSlavePool.cpp | 16 ++++++---------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 7effb112e10..966ab2897d7 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -32,7 +32,6 @@ void AudioMixerWorkerThread::run() { wait(starting); starting = false; - assert(_function); if (_function) { // iterate over all available nodes @@ -50,10 +49,9 @@ void AudioMixerWorkerThread::wait(bool starting) { { Lock workerLock(_data.workerMutex); if (starting || _data.numStarted == _data.numThreads) { - _data.workerCondition.wait(workerLock, [&] { - assert(_data.numStarted <= _data.numThreads); - return _data.numStarted != _data.numThreads; - }); + do { // this is equivalent to the two-parameter "wait" call, except we're doing the test afterwards + _data.workerCondition.wait(workerLock); + } while (_data.numStarted == _data.numThreads); } _data.numStarted++; } @@ -62,7 +60,6 @@ void AudioMixerWorkerThread::wait(bool starting) { _data.configure(*this); } _function = _data.function; - assert(_function); } void AudioMixerWorkerThread::notify(bool stopping) { @@ -147,7 +144,7 @@ void AudioMixerSlavePool::queueStats(QJsonObject& stats) { i++; } } -#endif // DEBUG_EVENT_QUEUE +#endif // DEBUG_EVENT_QUEUE void AudioMixerSlavePool::setNumThreads(int numThreads) { // clamp to allowed size diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index d006441d826..a1413b77a60 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -28,7 +28,6 @@ void AvatarMixerWorkerThread::run() { wait(starting); starting = false; - assert(_function); if (_function) { // iterate over all available nodes @@ -46,10 +45,9 @@ void AvatarMixerWorkerThread::wait(bool starting) { { Lock workerLock(_data.workerMutex); if (starting || _data.numStarted == _data.numThreads) { - _data.workerCondition.wait(workerLock, [&] { - assert(_data.numStarted <= _data.numThreads); - return _data.numStarted != _data.numThreads; - }); + do { // this is equivalent to the two-parameter "wait" call, except we're doing the test afterwards + _data.workerCondition.wait(workerLock); + } while (_data.numStarted == _data.numThreads); } _data.numStarted++; } @@ -58,7 +56,6 @@ void AvatarMixerWorkerThread::wait(bool starting) { _data.configure(*this); } _function = _data.function; - assert(_function); } void AvatarMixerWorkerThread::notify(bool stopping) { @@ -88,7 +85,7 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end run(begin, end); } -void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, +void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio) { _data.function = &AvatarMixerSlave::broadcastAvatarData; @@ -129,7 +126,6 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { assert(_data.queue.empty()); } - void AvatarMixerSlavePool::each(std::function functor) { for (auto& worker : _workers) { functor(*worker.get()); @@ -147,7 +143,7 @@ void AvatarMixerSlavePool::queueStats(QJsonObject& stats) { i++; } } -#endif // DEBUG_EVENT_QUEUE +#endif // DEBUG_EVENT_QUEUE void AvatarMixerSlavePool::setNumThreads(int numThreads) { // clamp to allowed size @@ -214,7 +210,7 @@ void AvatarMixerSlavePool::resize(int numThreads) { _data.numStarted = _data.numFinished = 0; _data.workerCondition.notify_all(); } - + { Lock poolLock(_data.poolMutex); if (_data.numFinished < _data.numThreads) {