Skip to content

Commit 3db7800

Browse files
authored
Merge branch 'finos:master' into master
2 parents 49b9465 + 83d6acd commit 3db7800

37 files changed

+466
-237
lines changed

CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,6 @@ set(FILES
442442
utils/IdGenerator.h
443443
utils/MersienneRandom.h
444444
utils/Optional.h
445-
utils/Pacer.cpp
446445
utils/Pacer.h
447446
utils/ScopedFileHandle.h
448447
utils/ScopedInvariantChecker.h
@@ -502,8 +501,6 @@ set(TEST_LIB_FILES
502501
test/integration/FFTanalysis.cpp
503502
test/sctp/SctpEndpoint.h
504503
test/sctp/SctpEndpoint.cpp
505-
test/transport/SrtpProtectJob.cpp
506-
test/transport/SrtpProtectJob.h
507504
test/transport/SrtpUnprotectJob.cpp
508505
test/transport/SrtpUnprotectJob.h
509506
test/transport/FakeNetwork.h

bridge/engine/DiscardReceivedVideoPacketJob.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,13 @@ void DiscardReceivedVideoPacketJob::run()
3434
return;
3535
}
3636

37-
const bool noPacketsProcessedYet =
38-
(_ssrcContext.packetsProcessed == 0 && _ssrcContext.lastReceivedExtendedSequenceNumber == 0 &&
39-
_ssrcContext.lastUnprotectedExtendedSequenceNumber == 0);
40-
41-
if (noPacketsProcessedYet && (_extendedSequenceNumber >> 16) == 0)
37+
if (!_ssrcContext.hasDecryptedPackets)
4238
{
43-
_sender->unprotect(*_packet); // make sure srtp sees one packet with ROC=0
44-
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
39+
if (_sender->unprotectFirstRtp(*_packet, _ssrcContext.rocOffset)) // make sure srtp sees one packet with ROC=0
40+
{
41+
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
42+
_ssrcContext.hasDecryptedPackets = true;
43+
}
4544
}
4645

4746
_ssrcContext.onRtpPacketReceived(_timestamp);

bridge/engine/EngineMixer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ EngineMixer::EngineMixer(const std::string& id,
5959
_lastReceiveTimeOnRegularTransports(_lastStartedIterationTimestamp),
6060
_lastReceiveTimeOnBarbellTransports(_lastStartedIterationTimestamp),
6161
_lastSendTimeOfUserMediaMapMessageOverBarbells(_lastStartedIterationTimestamp),
62+
_lastCounterCheck(0),
6263
_engineStreamDirector(std::make_unique<EngineStreamDirector>(_loggableId.getInstanceId(), config, lastN)),
6364
_activeMediaList(std::make_unique<ActiveMediaList>(_loggableId.getInstanceId(),
6465
audioSsrcs,

bridge/engine/ProcessMissingVideoPacketsJob.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ ProcessMissingVideoPacketsJob::ProcessMissingVideoPacketsJob(SsrcInboundContext&
2626
void ProcessMissingVideoPacketsJob::run()
2727
{
2828
auto timestamp = utils::Time::getAbsoluteTime();
29-
auto* videoMissingPacketsTracker = _ssrcContext.videoMissingPacketsTracker.get();
30-
if (!videoMissingPacketsTracker)
29+
if (!_ssrcContext.videoMissingPacketsTracker)
3130
{
3231
return;
3332
}

bridge/engine/RtpForwarderReceiveBaseJob.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,26 @@ RtpForwarderReceiveBaseJob::RtpForwarderReceiveBaseJob(memory::UniquePacket&& pa
2626

2727
bool RtpForwarderReceiveBaseJob::tryUnprotectRtpPacket(const char* logGroup)
2828
{
29+
if (!_ssrcContext.hasDecryptedPackets)
30+
{
31+
if (_sender->unprotectFirstRtp(*_packet, _ssrcContext.rocOffset))
32+
{
33+
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
34+
_ssrcContext.hasDecryptedPackets = true;
35+
return true;
36+
}
37+
else
38+
{
39+
return false;
40+
}
41+
}
42+
2943
if (transport::SrtpClient::shouldSetRolloverCounter(_ssrcContext.lastUnprotectedExtendedSequenceNumber,
3044
_extendedSequenceNumber))
3145
{
32-
const uint32_t oldRolloverCounter = _ssrcContext.lastUnprotectedExtendedSequenceNumber >> 16;
33-
const uint32_t newRolloverCounter = _extendedSequenceNumber >> 16;
46+
const uint32_t oldRolloverCounter =
47+
_ssrcContext.rocOffset + (_ssrcContext.lastUnprotectedExtendedSequenceNumber >> 16);
48+
const uint32_t newRolloverCounter = _ssrcContext.rocOffset + (_extendedSequenceNumber >> 16);
3449

3550
logger::info("Setting rollover counter for ssrc %u, extseqno %u->%u, seqno %u->%u, roc %u->%u, %s",
3651
logGroup,

bridge/engine/SsrcInboundContext.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ class SsrcInboundContext
4646
defaultLevelSsrc(defaultLevelSsrc),
4747
markNextPacket(true),
4848
lastReceivedExtendedSequenceNumber(0),
49-
packetsProcessed(0),
49+
hasDecryptedPackets(false),
5050
lastUnprotectedExtendedSequenceNumber(0),
51+
rocOffset(0),
5152
activeMedia(false),
5253
inactiveTransitionCount(0),
5354
isSsrcUsed(true),
@@ -109,8 +110,9 @@ class SsrcInboundContext
109110
// transport thread variables ===================================
110111
bool markNextPacket;
111112
uint32_t lastReceivedExtendedSequenceNumber;
112-
uint32_t packetsProcessed;
113+
bool hasDecryptedPackets;
113114
uint32_t lastUnprotectedExtendedSequenceNumber;
115+
uint32_t rocOffset; // srtp packets with roc=0 were lost
114116
std::shared_ptr<VideoMissingPacketsTracker> videoMissingPacketsTracker;
115117
std::unique_ptr<codec::OpusDecoder> opusDecoder; // used for missing audio level
116118
std::unique_ptr<utils::AvgRateTracker> opusPacketRate; // pkt/s

bridge/engine/VideoForwarderReceiveJob.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@ void VideoForwarderReceiveJob::run()
6767
? codec::H264Header::isKeyFrame(payload, payloadSize)
6868
: codec::Vp8Header::isKeyFrame(payload, codec::Vp8Header::getPayloadDescriptorSize(payload, payloadSize));
6969

70-
++_ssrcContext.packetsProcessed;
71-
72-
if (_ssrcContext.packetsProcessed == 1)
70+
if (!_ssrcContext.videoMissingPacketsTracker)
7371
{
7472
_ssrcContext.lastReceivedExtendedSequenceNumber = _extendedSequenceNumber - 1;
7573
_ssrcContext.videoMissingPacketsTracker = std::make_shared<VideoMissingPacketsTracker>();

concurrency/LockFreeList.cpp

Lines changed: 39 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,44 @@ namespace concurrency
44
{
55

66
std::atomic_uint32_t LockFreeList::_versionCounter(1);
7-
LockFreeList::LockFreeList() : _head(&_eol), _tail(&_eol), _count(0)
7+
LockFreeList::LockFreeList() : _count(0)
88
{
9-
_eol._next = &_eol;
9+
_head.store(VersionedPtr<ListItem>(&_eol, 0));
10+
_tail.store(VersionedPtr<ListItem>(&_eol, 0));
11+
_eol._next = VersionedPtr<ListItem>(&_eol, 0);
1012
_cacheLineSeparator[0] = 0xBA;
1113
}
1214

1315
// Possible to push a list of items too
1416
bool LockFreeList::push(ListItem* item)
1517
{
16-
ListItem* last = nullptr;
1718
const uint32_t version = _versionCounter.fetch_add(1);
1819
int count = 0;
1920

20-
for (auto p = item; p; p = p->_next.load())
21+
auto itemNode = VersionedPtr<ListItem>(item, version);
22+
VersionedPtr<ListItem> lastNode;
23+
for (auto p = itemNode; p; p = p->_next.load())
2124
{
2225
++count;
23-
const auto nextPtr = p->_next.load(std::memory_order::memory_order_relaxed);
24-
if (nextPtr == nullptr)
26+
auto nextPtr = p->_next.load(std::memory_order::memory_order_acquire);
27+
if (!nextPtr)
2528
{
26-
last = p;
27-
p->_next.store(makeVersionedPointer(&_eol, version));
29+
lastNode = VersionedPtr<ListItem>(p.get(), version);
30+
p->_next.store(VersionedPtr<ListItem>(&_eol, version), std::memory_order::memory_order_release);
2831
break;
2932
}
3033
else
3134
{
32-
p->_next.store(makeVersionedPointer(nextPtr, version));
35+
p->_next.store(VersionedPtr<ListItem>(nextPtr.get(), version), std::memory_order::memory_order_release);
3336
}
3437
}
3538

36-
auto itemNode = makeVersionedPointer(item, version);
37-
auto lastNode = makeVersionedPointer(last, version);
38-
3939
// move tail to our new tail, retry until we make it
4040
auto prevTailNode = _tail.load();
41-
auto pPrevTail = getPointer(prevTailNode);
42-
auto prevTailNext = pPrevTail->_next.load();
41+
auto prevTailNext = prevTailNode->_next.load();
4342
while (!_tail.compare_exchange_weak(prevTailNode, lastNode))
4443
{
45-
pPrevTail = getPointer(prevTailNode);
46-
prevTailNext = pPrevTail->_next;
44+
prevTailNext = prevTailNode->_next;
4745
}
4846

4947
// prevTail may be one of: &this->_eol, prev node that may be in list or popped already
@@ -52,8 +50,8 @@ bool LockFreeList::push(ListItem* item)
5250
// if it was popped and reinserted and next is still _eol, the version will differ, and we attach to head.
5351
// Otherwise we would detach ourselves and the rest. if we set next first, popper must notice and move head
5452

55-
if (pPrevTail != &_eol && getPointer(prevTailNext) == &_eol &&
56-
pPrevTail->_next.compare_exchange_strong(prevTailNext, itemNode))
53+
if (prevTailNode.get() != &_eol && prevTailNext.get() == &_eol &&
54+
prevTailNode->_next.compare_exchange_strong(prevTailNext, itemNode))
5755
{
5856
// if we won this, the popper will have to move head to us
5957
_count.fetch_add(count, std::memory_order::memory_order_relaxed);
@@ -63,7 +61,7 @@ bool LockFreeList::push(ListItem* item)
6361
{
6462
// prevTailNext is now indicating either nullptr, eol in another list, eol in this list but another version so
6563
// re-inserted at end popped empty, we must attach to head
66-
_head.store(itemNode);
64+
_head.store(itemNode, std::memory_order_release);
6765
_count.fetch_add(count, std::memory_order::memory_order_relaxed);
6866
return true;
6967
}
@@ -73,66 +71,59 @@ bool LockFreeList::push(ListItem* item)
7371
empty list, head points to _eol
7472
return false
7573
76-
Otherwise, we iterate down the list and try to lok an item. If a next pointer is not pointing to node of proper
77-
version we restart from head. Version always have to be checked after locking.
78-
79-
= Popping the tail item =
80-
if we locked tail item we can try to set tail back to head but it may already have been set to a new tail
81-
We can try to set the head past our item but it may have been moved to earlier item or later item. In case of
82-
earlier, we retry. If head has been moved back to eol we cannot decide unless head ptr is still versioned. eol
83-
item does not have to be versioned.
84-
85-
= Popping first item or mid item
86-
There is no difference. Once item is locked we try to move the pointer to next as long as the version of pointer
87-
is less than our item's version
74+
Otherwise, we load _head, next of head, and try to update _head to point to next. That will fail if _head changed and we
75+
retry.
76+
- Once popped, we cannot tell if we popped the _tail also. We do not know if others are pushing to our next.
77+
- We try to update _tail in case it pointed to our node, but it may have moved on and there is nothing we can do.
78+
It may have moved on before we checked it.
79+
- Now if we fail to update our next from the next pointer we had before pop, it means nodes have been added to our next
80+
because our node was the tail node at some point. But that also means we moved the _head to eol. All we can do is to
81+
have _head point to the new list added to our tail.
8882
8983
*/
9084
bool LockFreeList::pop(ListItem*& item)
9185
{
92-
ListItem* pCurrent = nullptr;
93-
ListItem* nextNode = nullptr;
94-
ListItem* currentNode = _head.load(std::memory_order::memory_order_consume);
86+
VersionedPtr<ListItem> nextNode;
87+
auto nodeToPop = _head.load(std::memory_order::memory_order_acquire);
9588
for (;;)
9689
{
97-
pCurrent = getPointer(currentNode);
98-
if (pCurrent == &_eol)
90+
if (nodeToPop.get() == &_eol)
9991
{
10092
return false;
10193
}
10294

103-
nextNode = pCurrent->_next.load();
104-
if (nextNode == nullptr) // cheap check instead of CAS
95+
nextNode = nodeToPop->_next.load();
96+
if (!nextNode) // cheap check if node is popped, instead of CAS
10597
{
106-
currentNode = _head.load();
98+
nodeToPop = _head.load();
10799
continue;
108100
}
109101

110-
if (_head.compare_exchange_weak(currentNode, nextNode))
102+
if (_head.compare_exchange_weak(nodeToPop, nextNode))
111103
{
112104
break;
113105
}
114106
}
115-
// we won the node for popping and the next pointer is from that time
116107

117108
{
118109
auto tail = _tail.load();
119-
if (tail == currentNode)
110+
if (tail == nodeToPop)
120111
{
121112
_tail.compare_exchange_strong(tail, nextNode);
122113
// else tail moved ahead already
123114
}
124115
}
125116

126-
// neither head nor tail can point to this node. We own it.
127-
// but a pusher may be trying to add to next
128-
if (!pCurrent->_next.compare_exchange_strong(nextNode, nullptr))
117+
// Neither head nor tail can point to this node now. We own it.
118+
// But a pusher may have added to our tail, before we changed _tail
119+
if (!nodeToPop->_next.compare_exchange_strong(nextNode, VersionedPtr<ListItem>()))
129120
{
130-
// pusher won. tail was added to our node. We have to fix head since we moved it to eol
121+
// A tail was added to our node. We have to fix _head since we moved it to eol
131122
_head.store(nextNode);
132-
pCurrent->_next.store(nullptr, std::memory_order::memory_order_relaxed);
123+
nodeToPop->_next.store(VersionedPtr<ListItem>(), std::memory_order::memory_order_release);
133124
}
134125

135-
item = pCurrent;
126+
item = nodeToPop.get();
136127
_count.fetch_sub(1, std::memory_order::memory_order_relaxed);
137128
return true;
138129
}

concurrency/LockFreeList.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,10 @@ namespace concurrency
1111
class ListItem
1212
{
1313
public:
14-
ListItem()
15-
{
16-
// _next must be initialied atomically. Cannot use constructor
17-
_next = nullptr;
18-
}
14+
ListItem() { _next = VersionedPtr<ListItem>(); }
1915

2016
private:
21-
std::atomic<ListItem*> _next;
17+
std::atomic<VersionedPtr<ListItem>> _next;
2218
friend class LockFreeList;
2319
};
2420

@@ -35,13 +31,13 @@ class LockFreeList
3531
bool push(ListItem* item);
3632
bool pop(ListItem*& item);
3733

38-
bool empty() const { return getPointer(_head.load()) == &_eol; }
34+
bool empty() const { return _head.load().get() == &_eol; }
3935
uint32_t size() const { return _count; }
4036

4137
private:
42-
std::atomic<ListItem*> _head;
38+
std::atomic<VersionedPtr<ListItem>> _head;
4339
uint64_t _cacheLineSeparator[7];
44-
std::atomic<ListItem*> _tail;
40+
std::atomic<VersionedPtr<ListItem>> _tail;
4541
ListItem _eol;
4642
std::atomic_uint32_t _count;
4743
static std::atomic_uint32_t _versionCounter;

0 commit comments

Comments
 (0)