Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat rtp processor #3063 #3064

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions dtlstransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,36 +530,42 @@
func (t *DTLSTransport) streamsForSSRC(
ssrc SSRC,
streamInfo interceptor.StreamInfo,
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, interceptor.RTPProcessor, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {

Check failure on line 533 in dtlstransport.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 129 characters long, which exceeds the maximum of 120 characters. (lll)
srtpSession, err := t.getSRTPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 541 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L541

Added line #L541 was not covered by tests
}

rtpInterceptor := t.api.interceptor.BindRemoteStream(
rtpProcessor := t.api.interceptor.BindRemoteStream(
&streamInfo,
interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
interceptor.RTPProcessorFunc(
func(s int, in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {

Check failure on line 547 in dtlstransport.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'in' seems to be unused, consider removing or renaming it as _ (revive)
return s, a, nil
},
),
)

rtpReader := interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
},
)

srtcpSession, err := t.getSRTCPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 563 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L563

Added line #L563 was not covered by tests
}

rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 568 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L568

Added line #L568 was not covered by tests
}

rtcpInterceptor := t.api.interceptor.BindRTCPReader(interceptor.RTCPReaderFunc(
Expand All @@ -570,5 +576,5 @@
}),
)

return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil
return rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ require (
golang.org/x/sys v0.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 h1:5p3Tm/VZUdN8aqLJp1noK/fAqggXJBHSsWXQJbksmw0=
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb h1:qu70eQhcmCvNkrzYeVTDXS1RGmt14Qu5vo+sQH+q16w=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
26 changes: 22 additions & 4 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@
},
)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 55 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 128 characters long, which exceeds the maximum of 120 characters. (lll)
if a == nil {
a = interceptor.Attributes{}
}

a.Set("attribute", "value")

return reader.Read(b, a)
return reader.Process(i, b, a)
})
},
}, nil
Expand Down Expand Up @@ -146,7 +146,7 @@
UnbindLocalStreamFn: func(*interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindLocalStream, 1)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
atomic.AddUint32(&cntBindRemoteStream, 1)

return reader
Expand Down Expand Up @@ -413,6 +413,24 @@
close(done)
})

pcOfferConnected := make(chan struct{})
pcAnswerConnected := make(chan struct{})

pc1.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcOfferConnected)
}
})

pc2.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcAnswerConnected)
}
})

<-pcOfferConnected
<-pcAnswerConnected

go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(20 * time.Millisecond)
Expand Down
9 changes: 5 additions & 4 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,7 @@
params.Codecs[0].RTPCodecCapability,
params.HeaderExtensions,
)
readStream, interceptor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)
readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)

Check failure on line 1735 in peerconnection.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 128 characters long, which exceeds the maximum of 120 characters. (lll)
if err != nil {
return err
}
Expand All @@ -1746,7 +1746,7 @@
readCount--
}

i, _, err := interceptor.Read(b, nil)
i, _, err := rtpReader.Read(b, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1775,15 +1775,16 @@
receiver.mu.Lock()
defer receiver.mu.Unlock()

return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor)

Check failure on line 1778 in peerconnection.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 130 characters long, which exceeds the maximum of 120 characters. (lll)
}

track, err := receiver.receiveForRid(
rid,
params,
streamInfo,
readStream,
interceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
)
Expand Down
10 changes: 8 additions & 2 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@

go func() {
for {
time.Sleep(time.Millisecond * 100)
if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
time.Sleep(time.Millisecond * 100)
continue

Check failure on line 155 in peerconnection_media_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

continue with no blank line before (nlreturn)
}

if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
fmt.Println(routineErr)
}
Expand All @@ -169,6 +170,12 @@
}()

go func() {
for {
if pcOffer.ICEConnectionState() == ICEConnectionStateConnected {
break
}
time.Sleep(time.Millisecond * 100)
}
parameters := sender.GetParameters()

for {
Expand All @@ -190,7 +197,6 @@
}
}
}()

go func() {
if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
close(awaitRTCPSenderRecv)
Expand Down
50 changes: 36 additions & 14 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@

streamInfo, repairStreamInfo *interceptor.StreamInfo

rtpReadStream *srtp.ReadStreamSRTP
rtpInterceptor interceptor.RTPReader
rtpReadStream *srtp.ReadStreamSRTP
rtpReader interceptor.RTPReader
rtpProcessor interceptor.RTPProcessor

rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairReader interceptor.RTPReader
repairProcessor interceptor.RTPProcessor
repairStreamChannel chan rtxPacketWithAttributes

repairRtcpReadStream *srtp.ReadStreamSRTCP
Expand Down Expand Up @@ -228,13 +230,13 @@
var err error

//nolint:lll // # TODO refactor
if streams.rtpReadStream, streams.rtpInterceptor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
if streams.rtpReadStream, streams.rtpReader, streams.rtpProcessor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
return err
}

if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions)
rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtxSsrc,
*streamInfo,
)
Expand All @@ -247,7 +249,8 @@
"",
streamInfo,
rtpReadStream,
rtpInterceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
); err != nil {
Expand Down Expand Up @@ -412,7 +415,11 @@
}

if t := r.streamsForTrack(reader); t != nil {
return t.rtpInterceptor.Read(b, a)
i, attr, err := t.rtpReader.Read(b, a)
if err != nil {
return 0, nil, err
}
return t.rtpProcessor.Process(i, b, attr)

Check failure on line 422 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}

return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
Expand All @@ -425,7 +432,8 @@
params RTPParameters,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) (*TrackRemote, error) {
Expand All @@ -443,7 +451,8 @@

r.tracks[i].streamInfo = streamInfo
r.tracks[i].rtpReadStream = rtpReadStream
r.tracks[i].rtpInterceptor = rtpInterceptor
r.tracks[i].rtpReader = rtpReader
r.tracks[i].rtpProcessor = rtpProcessor
r.tracks[i].rtcpReadStream = rtcpReadStream
r.tracks[i].rtcpInterceptor = rtcpInterceptor

Expand All @@ -457,12 +466,13 @@
// receiveForRtx starts a routine that processes the repair stream.
//
//nolint:cyclop
func (r *RTPReceiver) receiveForRtx(

Check failure on line 469 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

cognitive complexity 33 of func `(*RTPReceiver).receiveForRtx` is high (> 30) (gocognit)
ssrc SSRC,
rsid string,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) error {
Expand All @@ -488,15 +498,21 @@

track.repairStreamInfo = streamInfo
track.repairReadStream = rtpReadStream
track.repairInterceptor = rtpInterceptor
track.repairReader = rtpReader
track.repairProcessor = rtpProcessor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes, 50)

go func() {
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
i, attributes, err := track.repairReader.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return

Check failure on line 513 in rtpreceiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}
i, attributes, err = track.repairProcessor.Process(i, b, attributes)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck

Expand Down Expand Up @@ -590,7 +606,7 @@
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil.
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
func (r *RTPReceiver) readRTX(b []byte, reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil
}
Expand All @@ -604,7 +620,13 @@
if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived
{
n := copy(b, rtxPacketReceived.pkt)
_, _, err := t.rtpProcessor.Process(n, b, nil)
if err == nil {
return &rtxPacketReceived
}
}
default:
}
}
Expand Down
2 changes: 1 addition & 1 deletion track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}

// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacketReceived := receiver.readRTX(t); rtxPacketReceived != nil {
if rtxPacketReceived := receiver.readRTX(b, t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()
Expand Down
Loading