Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization coding pass over AudioMixerSlavePool and AvatarMixerSlavePool pt. 2. #1361

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5a9abf5
copy the wait predicate into a test before waking the condition, to r…
odysseus654 Sep 23, 2021
e839857
breaking the pool _mutex into two smaller _slaveMutex and _poolMutex …
odysseus654 Sep 23, 2021
0c35b43
pull out data shared between threads / remove "friend" declarations
odysseus654 Sep 23, 2021
e60db78
add shared_mutex in an attempt to reduce the number of mutexes used i…
odysseus654 Sep 24, 2021
536b1ed
stripping out the shared_mutex for a contract-based threading model (…
odysseus654 Sep 24, 2021
aa2c307
suggested changes:
odysseus654 Sep 24, 2021
569afb1
resolve potential race condition
odysseus654 Sep 24, 2021
bc5d9de
renamed _pool to _data, to bring it more in line with other users
odysseus654 Sep 25, 2021
45d3312
remove last reference to _slaves
odysseus654 Sep 25, 2021
3a4e8a7
better management of numThreads when launching new workers
odysseus654 Sep 26, 2021
146da44
ensure that newly-created threads are properly configured before leav…
odysseus654 Sep 26, 2021
83f2b52
added some missed field initializations
odysseus654 Sep 26, 2021
df59a42
more tweaks to improve worker thread startup
odysseus654 Sep 27, 2021
5389d3f
New threads now unconditionally enter a wait without checking if numS…
odysseus654 Sep 28, 2021
ecba866
condition.wait is still letting threads through before it's time, sta…
odysseus654 Sep 28, 2021
f7e61b8
results of linux debugging, trying to get the wait conditions working…
odysseus654 Nov 8, 2021
1f9347d
Merge remote-tracking branch 'remotes/vircadia/master' into pr/pool-c…
odysseus654 Dec 4, 2021
20df505
Merge remote-tracking branch 'remotes/vircadia/master' into pr/pool-c…
odysseus654 Jan 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions assignment-client/src/audio/AudioMixerSlavePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ void AudioMixerSlaveThread::run() {

void AudioMixerSlaveThread::wait() {
{
Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] {
Lock slaveLock(_pool._slaveMutex);
_pool._slaveCondition.wait(slaveLock, [&] {
assert(_pool._numStarted <= _pool._numThreads);
return _pool._numStarted != _pool._numThreads;
});
++_pool._numStarted;
}
++_pool._numStarted;

if (_pool._configure) {
_pool._configure(*this);
Expand All @@ -53,15 +53,16 @@ void AudioMixerSlaveThread::wait() {
}

void AudioMixerSlaveThread::notify(bool stopping) {
{
Lock lock(_pool._mutex);
assert(_pool._numFinished < _pool._numThreads);
++_pool._numFinished;
if (stopping) {
++_pool._numStopped;
}
assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted);
int numFinished = ++_pool._numFinished;
if (stopping) {
++_pool._numStopped;
assert(_pool._numStopped <= _pool._numFinished);
}

if (numFinished == _pool._numThreads) {
_pool._poolCondition.notify_one();
}
_pool._poolCondition.notify_one();
}

bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
Expand Down Expand Up @@ -92,21 +93,19 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
_queue.push(node);
});

{
Lock lock(_mutex);
// run
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();

// run
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();

// wait
_poolCondition.wait(lock, [&] {
// wait
{
Lock poolLock(_poolMutex);
_poolCondition.wait(poolLock, [&] {
assert(_numFinished <= _numThreads);
return _numFinished == _numThreads;
});

assert(_numStarted == _numThreads);
}
assert(_numStarted == _numThreads);

assert(_queue.empty());
}
Expand Down Expand Up @@ -155,8 +154,6 @@ void AudioMixerSlavePool::resize(int numThreads) {

qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);

Lock lock(_mutex);

if (numThreads > _numThreads) {
// start new slaves
for (int i = 0; i < numThreads - _numThreads; ++i) {
Expand All @@ -171,19 +168,22 @@ void AudioMixerSlavePool::resize(int numThreads) {
// mark slaves to stop...
auto slave = extraBegin;
while (slave != _slaves.end()) {
(*slave)->_stop = true;
(*slave)->stop();
++slave;
}

// ...cycle them until they do stop...
_numStopped = 0;
while (_numStopped != (_numThreads - numThreads)) {
_numStarted = _numFinished = _numStopped;
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();
_poolCondition.wait(lock, [&] {

Lock poolLock(_poolMutex);
_poolCondition.wait(poolLock, [&] {
assert(_numFinished <= _numThreads);
return _numFinished == _numThreads;
});
assert(_numStopped == (_numThreads - numThreads));
}

// ...wait for threads to finish...
Expand Down
69 changes: 44 additions & 25 deletions assignment-client/src/audio/AudioMixerSlavePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,46 @@

#include "AudioMixerSlave.h"

class AudioMixerSlavePool;
// Private slave pool data that is shared and accessible with the slave threads. This describes
// what information is needs to be thread-safe
struct AudioMixerSlavePoolData {
using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex;
using ConditionVariable = std::condition_variable;

// synchronization state
Mutex _poolMutex; // only used for _poolCondition at the moment
ConditionVariable _poolCondition; // woken when work has been completed (_numStarted = _numFinished = _numThreads)
Mutex _slaveMutex; // only used for _slaveCondition at the moment
ConditionVariable _slaveCondition; // woken when work needs to be done (_numStarted < _numThreads)

// The variables below this point are alternately owned by the pool or by the slaves collectively.
// When idle they are owned by the pool.
// Moving ownership to the slaves is done by setting _numStarted = _numFinished = 0 and waking _slaveCondition
// Moving ownership to the pool is done when _numFinished == _numThreads and is done by waking _poolCondition

void (AudioMixerSlave::*_function)(const SharedNodePointer& node); // r/o when owned by slaves, r/w when owned by pool
std::function<void(AudioMixerSlave&)> _configure; // r/o when owned by slaves, r/w when owned by pool

// Number of currently-running slave threads
// r/o when owned by slaves, r/w when owned by pool
int _numThreads{ 0 };

// Number of slave threads "awake" and processing the current request (0 <= _numStarted <= _numThreads)
// incremented when owned by slaves, r/w when owned by pool
std::atomic<int> _numStarted{ 0 };

// Number of slave threads finished with the current request (0 <= _numFinished <= _numStarted)
// incremented when owned by slaves, r/w when owned by pool
std::atomic<int> _numFinished{ 0 };

// Number of slave threads shutting down when asked to (0 <= _numStopped <= _numFinished)
// incremented when owned by slaves, r/w when owned by pool
std::atomic<int> _numStopped{ 0 };

// frame state
Queue _queue;
};

class AudioMixerSlaveThread : public QThread, public AudioMixerSlave {
Q_OBJECT
Expand All @@ -31,30 +70,26 @@ class AudioMixerSlaveThread : public QThread, public AudioMixerSlave {
using Lock = std::unique_lock<Mutex>;

public:
AudioMixerSlaveThread(AudioMixerSlavePool& pool, AudioMixerSlave::SharedData& sharedData)
AudioMixerSlaveThread(AudioMixerSlavePoolData& pool, AudioMixerSlave::SharedData& sharedData)
: AudioMixerSlave(sharedData), _pool(pool) {}

void run() override final;
inline void stop() { _stop = true; }

private:
friend class AudioMixerSlavePool;

void wait();
void notify(bool stopping);
bool try_pop(SharedNodePointer& node);

AudioMixerSlavePool& _pool;
AudioMixerSlavePoolData& _pool;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr };
bool _stop { false };
};

// Slave pool for audio mixers
// AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread.
class AudioMixerSlavePool {
using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex;
class AudioMixerSlavePool : private AudioMixerSlavePoolData {
using Lock = std::unique_lock<Mutex>;
using ConditionVariable = std::condition_variable;

public:
using ConstIter = NodeList::const_iterator;
Expand Down Expand Up @@ -85,23 +120,7 @@ class AudioMixerSlavePool {

std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;

friend void AudioMixerSlaveThread::wait();
friend void AudioMixerSlaveThread::notify(bool stopping);
friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node);

// synchronization state
Mutex _mutex;
ConditionVariable _slaveCondition;
ConditionVariable _poolCondition;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node);
std::function<void(AudioMixerSlave&)> _configure;
int _numThreads { 0 };
int _numStarted { 0 }; // guarded by _mutex
int _numFinished { 0 }; // guarded by _mutex
int _numStopped { 0 }; // guarded by _mutex

// frame state
Queue _queue;
ConstIter _begin;
ConstIter _end;

Expand Down
61 changes: 31 additions & 30 deletions assignment-client/src/avatars/AvatarMixerSlavePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,31 @@ void AvatarMixerSlaveThread::run() {

void AvatarMixerSlaveThread::wait() {
{
Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] {
Lock slaveLock(_pool._slaveMutex);
_pool._slaveCondition.wait(slaveLock, [&] {
assert(_pool._numStarted <= _pool._numThreads);
return _pool._numStarted != _pool._numThreads;
});
++_pool._numStarted;
}
++_pool._numStarted;

if (_pool._configure) {
_pool._configure(*this);
}
_function = _pool._function;
}

void AvatarMixerSlaveThread::notify(bool stopping) {
{
Lock lock(_pool._mutex);
assert(_pool._numFinished < _pool._numThreads);
++_pool._numFinished;
if (stopping) {
++_pool._numStopped;
}
assert(_pool._numFinished < _pool._numThreads && _pool._numFinished <= _pool._numStarted);
int numFinished = ++_pool._numFinished;
if (stopping) {
++_pool._numStopped;
assert(_pool._numStopped <= _pool._numFinished);
}

if (numFinished == _pool._numThreads) {
_pool._poolCondition.notify_one();
}
_pool._poolCondition.notify_one();
}

bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) {
Expand All @@ -65,7 +67,7 @@ bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) {

void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::processIncomingPackets;
_configure = [=](AvatarMixerSlave& slave) {
_configure = [=](AvatarMixerSlave& slave) {
slave.configure(begin, end);
};
run(begin, end);
Expand All @@ -75,10 +77,10 @@ void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio) {
_function = &AvatarMixerSlave::broadcastAvatarData;
_configure = [=](AvatarMixerSlave& slave) {
_configure = [=](AvatarMixerSlave& slave) {
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio,
_priorityReservedFraction);
};
_priorityReservedFraction);
HifiExperiments marked this conversation as resolved.
Show resolved Hide resolved
};
run(begin, end);
}

Expand All @@ -91,21 +93,19 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
_queue.push(node);
});

{
Lock lock(_mutex);

// run
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();
// run
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();

// wait
_poolCondition.wait(lock, [&] {
// wait
{
Lock poolLock(_poolMutex);
_poolCondition.wait(poolLock, [&] {
assert(_numFinished <= _numThreads);
return _numFinished == _numThreads;
});

assert(_numStarted == _numThreads);
}
assert(_numStarted == _numThreads);

assert(_queue.empty());
}
Expand Down Expand Up @@ -155,8 +155,6 @@ void AvatarMixerSlavePool::resize(int numThreads) {

qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);

Lock lock(_mutex);

if (numThreads > _numThreads) {
// start new slaves
for (int i = 0; i < numThreads - _numThreads; ++i) {
Expand All @@ -170,19 +168,22 @@ void AvatarMixerSlavePool::resize(int numThreads) {
// mark slaves to stop...
auto slave = extraBegin;
while (slave != _slaves.end()) {
(*slave)->_stop = true;
(*slave)->stop();
++slave;
}

// ...cycle them until they do stop...
_numStopped = 0;
while (_numStopped != (_numThreads - numThreads)) {
_numStarted = _numFinished = _numStopped;
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();
_poolCondition.wait(lock, [&] {

Lock poolLock(_poolMutex);
_poolCondition.wait(poolLock, [&] {
assert(_numFinished <= _numThreads);
return _numFinished == _numThreads;
});
assert(_numStopped == (_numThreads - numThreads));
}

// ...wait for threads to finish...
Expand Down
Loading