
Commit

add configurable packetsize, reuse relay buffers
iSchluff committed Nov 6, 2023
1 parent d1c4d45 commit 6bbb6e6
Showing 15 changed files with 206 additions and 60 deletions.
20 changes: 14 additions & 6 deletions api/server.go
@@ -38,21 +38,25 @@ func (s *Server) Listen(ctx context.Context) error {
MaxHeaderBytes: 1 << 14,
}

s.done.Add(1)
s.done.Add(2)
// http listener
go func() {
defer s.done.Done()
err := serv.ListenAndServe()
if err != nil {
if err != nil && err != http.ErrServerClosed {
log.Println(err)
}
}()
s.done.Add(1)

// shutdown goroutine
go func() {
defer s.done.Done()
<-ctx.Done()
ctx2, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
serv.Shutdown(ctx2)
if err := serv.Shutdown(ctx2); err != nil {
log.Println(err)
}
}()

return nil
@@ -66,11 +70,15 @@ func (s *Server) Wait() {
func (s *Server) HandleStreams(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
stats := s.srtServer.GetStatistics()
json.NewEncoder(w).Encode(stats)
if err := json.NewEncoder(w).Encode(stats); err != nil {
log.Println(err)
}
}

func (s *Server) HandleSockets(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
stats := s.srtServer.GetSocketStatistics()
json.NewEncoder(w).Encode(stats)
if err := json.NewEncoder(w).Encode(stats); err != nil {
log.Println(err)
}
}
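
For context, a hypothetical helper (not part of this commit) sketching the lifecycle these changes assume: cancelling the context passed to Listen triggers serv.Shutdown in the new shutdown goroutine, and Wait now has to account for two goroutines, hence done.Add(2). runUntilCancelled is an illustrative name, not a function from this repository.

package api

import "context"

// runUntilCancelled is a hypothetical sketch: it starts the API server and
// blocks until the caller cancels ctx.
func runUntilCancelled(ctx context.Context, s *Server) error {
	if err := s.Listen(ctx); err != nil {
		return err
	}
	<-ctx.Done() // cancellation triggers serv.Shutdown via the shutdown goroutine
	s.Wait()     // returns once both goroutines counted by done.Add(2) are done
	return nil
}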
1 change: 0 additions & 1 deletion auth/http_test.go
@@ -37,7 +37,6 @@ func Test_httpAuth_Authenticate(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

auth := NewHTTPAuth(HTTPAuthConfig{
URL: srv.URL + tt.url,
})
18 changes: 14 additions & 4 deletions config/config.go
@@ -24,9 +24,18 @@ type AppConfig struct {
PublicAddress string
Latency uint
ListenTimeout uint
Buffersize uint
SyncClients bool
LossMaxTTL uint

// total buffer size in bytes, determines maximum delay of a client
Buffersize uint

// Whether to sync clients to GOP start
SyncClients bool

// The value up to which the Reorder Tolerance may grow, 0 by default
LossMaxTTL uint

// max size of packets in bytes, default is 1316
PacketSize uint
}

type AuthConfig struct {
@@ -78,8 +87,9 @@ func Parse(paths []string) (*Config, error) {
Latency: 200,
ListenTimeout: 3000,
LossMaxTTL: 0,
Buffersize: 384000,
Buffersize: 384000, // 1s @ 3Mbits/s
SyncClients: false,
PacketSize: 1316, // max is 1456
},
Auth: AuthConfig{
Type: "static",
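
Note (not part of the diff): with these defaults, each subscriber queue created in relay/channel.go holds Buffersize / PacketSize = 384000 / 1316 ≈ 291 packets, roughly one second of a 3 Mbit/s stream. Raising Buffersize or lowering PacketSize increases the maximum delay a lagging client can accumulate before it is dropped.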
4 changes: 4 additions & 0 deletions config/config_test.go
@@ -17,6 +17,10 @@ func TestConfig(t *testing.T) {
assert.Equal(t, conf.App.Latency, uint(1337))
assert.Equal(t, conf.App.Buffersize, uint(123000))
assert.Equal(t, conf.App.SyncClients, true)
assert.Equal(t, conf.App.PacketSize, uint(1456))
assert.Equal(t, conf.App.LossMaxTTL, uint(50))
assert.Equal(t, conf.App.ListenTimeout, uint(5555))
assert.Equal(t, conf.App.PublicAddress, "dontlookmeup:5432")

assert.Equal(t, conf.API.Enabled, false)
assert.Equal(t, conf.API.Address, ":1234")
4 changes: 4 additions & 0 deletions config/testfiles/config_test.toml
@@ -3,6 +3,10 @@ addresses = ["127.0.0.1:5432"]
latency = 1337
buffersize = 123000
syncClients = true
packetSize = 1456
lossMaxTTL= 50
listenTimeout= 5555
publicAddress = "dontlookmeup:5432"

[api]
enabled = false
1 change: 0 additions & 1 deletion format/demuxer.go
@@ -38,7 +38,6 @@ func DetermineTransport(data []byte) TransportType {
// FindInit determines a synchronization point in the stream
// Finally it returns the required stream packets up to that point
func (d *Demuxer) FindInit(data []byte) ([][]byte, error) {

if d.transport == Unknown {
d.transport = DetermineTransport(data)
}
3 changes: 2 additions & 1 deletion main.go
@@ -69,7 +69,8 @@ func main() {
Auth: auth,
},
Relay: relay.RelayConfig{
Buffersize: conf.App.Buffersize, // 1s @ 3Mbits/
BufferSize: conf.App.Buffersize,
PacketSize: conf.App.PacketSize,
},
}

1 change: 0 additions & 1 deletion mpegts/packet.go
@@ -159,7 +159,6 @@ func (pkt *Packet) ToBytes(data []byte) error {
return ErrDataTooLong
}
copy(data[offset:offset+payloadLength], pkt.payload)
offset += payloadLength

return nil
}
19 changes: 12 additions & 7 deletions mpegts/parser_test.go
@@ -2,7 +2,6 @@ package mpegts

import (
"encoding/json"
"io/ioutil"
"log"
"os"
"os/exec"
@@ -80,18 +79,24 @@ func checkParser(t *testing.T, p *Parser, data []byte, name string, numFrames in
t.Errorf("%s - Init should not be nil", name)
}

file, err := ioutil.TempFile("", "srttest")
file, err := os.CreateTemp("", "srttest")
if err != nil {
t.Fatal(err)
}
defer os.Remove(file.Name())

for i := range pkts {
buf := pkts[i]
file.Write(buf)
if _, err := file.Write(buf); err != nil {
t.Error(err)
}
}
if _, err := file.Write(data[offset:]); err != nil {
t.Fatal(err)
}
if err := file.Sync(); err != nil {
t.Fatal(err)
}
file.Write(data[offset:])
file.Sync()

// compare ffprobe results with original
got, err := ffprobe(file.Name())
@@ -103,7 +108,7 @@

func TestParser_ParseH264_basic(t *testing.T) {
// Parse 1s complete MPEG-TS with NAL at start
data, err := ioutil.ReadFile("h264.ts")
data, err := os.ReadFile("h264.ts")
if err != nil {
t.Fatalf("failed to open test file")
}
@@ -113,7 +118,7 @@ func TestParser_ParseH264_basic(t *testing.T) {

func TestParser_ParseH264_complex(t *testing.T) {
// Parse 1s complete MPEG-TS with NAL at start
data, err := ioutil.ReadFile("h264_long.ts")
data, err := os.ReadFile("h264_long.ts")
if err != nil {
t.Fatalf("failed to open test file")
}
11 changes: 5 additions & 6 deletions relay/channel.go
@@ -12,7 +12,7 @@ type UnsubscribeFunc func()
type Channel struct {
mutex sync.Mutex
subs Subs
buffersize uint
maxPackets uint

// statistics
clients atomic.Value
@@ -47,10 +47,10 @@ func (subs Subs) Remove(sub chan []byte) Subs {
return subs[:len(subs)-1] // Truncate slice.
}

func NewChannel(buffersize uint) *Channel {
func NewChannel(maxPackets uint) *Channel {
ch := &Channel{
subs: make([]chan []byte, 0, 10),
buffersize: buffersize,
maxPackets: maxPackets,
created: time.Now(),
}
ch.clients.Store(0)
@@ -61,8 +61,7 @@ func NewChannel(buffersize uint) *Channel {
func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
channelbuffer := ch.buffersize / 1316
sub := make(chan []byte, channelbuffer)
sub := make(chan []byte, ch.maxPackets)
ch.subs = append(ch.subs, sub)
ch.clients.Store(len(ch.subs))

@@ -95,7 +94,7 @@ func (ch *Channel) Pub(b []byte) {
// Remember overflowed chans for drop
default:
toRemove = append(toRemove, ch.subs[i])
log.Println("dropping client", i)
log.Println("dropping overflowing client", i)
}
}
for _, sub := range toRemove {
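
A minimal usage sketch (not part of this commit) of the Channel API as it appears above: the constructor argument is now a packet count rather than a byte count, each subscriber gets a buffered chan of that depth, and a subscriber whose queue is full gets dropped on the next Pub. The example name and values are illustrative.

package relay

import "fmt"

// ExampleChannel is a hypothetical example, not a test from this repository.
func ExampleChannel() {
	ch := NewChannel(2)    // per-subscriber queue depth of 2 packets
	sub, unsub := ch.Sub() // buffered chan []byte with capacity 2

	ch.Pub([]byte{0x47, 0x00}) // queued for every current subscriber
	ch.Pub([]byte{0x47, 0x01}) // a third Pub before a read would drop this subscriber

	fmt.Println(len(<-sub), len(<-sub))
	unsub()
	// Output: 2 2
}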
7 changes: 4 additions & 3 deletions relay/relay.go
@@ -13,7 +13,8 @@ var (
)

type RelayConfig struct {
Buffersize uint
BufferSize uint
PacketSize uint
}

type Relay interface {
@@ -53,7 +54,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) {
return nil, ErrStreamAlreadyExists
}

channel := NewChannel(s.config.Buffersize)
channel := NewChannel(s.config.BufferSize / s.config.PacketSize)
s.channels[name] = channel

ch := make(chan []byte)
@@ -67,7 +68,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) {
if !ok {
// Need a lock on the map first to stop new subscribers
s.mutex.Lock()
log.Println("Removing stream", name)
log.Println("Unpublished stream", name)
delete(s.channels, name)
channel.Close()
s.mutex.Unlock()
8 changes: 4 additions & 4 deletions relay/relay_test.go
@@ -7,7 +7,7 @@ import (
)

func TestRelayImpl_SubscribeAndUnsubscribe(t *testing.T) {
config := RelayConfig{Buffersize: 1316 * 50}
config := RelayConfig{BufferSize: 50, PacketSize: 1}
relay := NewRelay(&config)
data := []byte{1, 2, 3, 4}

@@ -46,7 +46,7 @@
}

func TestRelayImpl_PublisherClose(t *testing.T) {
config := RelayConfig{Buffersize: 0}
config := RelayConfig{BufferSize: 1, PacketSize: 1}
relay := NewRelay(&config)

ch, _ := relay.Publish("test")
@@ -70,7 +70,7 @@ func TestRelayImpl_PublisherClose(t *testing.T) {
}

func TestRelayImpl_DoublePublish(t *testing.T) {
config := RelayConfig{Buffersize: 0}
config := RelayConfig{BufferSize: 1, PacketSize: 1}
relay := NewRelay(&config)
relay.Publish("foo")
_, err := relay.Publish("foo")
@@ -81,7 +81,7 @@ func TestRelayImpl_DoublePublish(t *testing.T) {
}

func TestRelayImpl_SubscribeNonExisting(t *testing.T) {
config := RelayConfig{Buffersize: 0}
config := RelayConfig{BufferSize: 1, PacketSize: 1}
relay := NewRelay(&config)

_, _, err := relay.Subscribe("foobar")
12 changes: 12 additions & 0 deletions srt/pool.go
@@ -0,0 +1,12 @@
package srt

import "sync"

func newBufferPool(packetSize uint) *sync.Pool {
return &sync.Pool{
New: func() interface{} {
buf := make([]byte, packetSize)
return &buf
},
}
}
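
The callers of newBufferPool are outside this excerpt, so the following is only a sketch of how a read loop in package srt could reuse packet buffers through the pool; readLoop, src, and handle are illustrative stand-ins, not identifiers from this repository.

package srt

import "io"

// readLoop is a hypothetical sketch of buffer reuse with the pool above.
func readLoop(src io.Reader, handle func([]byte), packetSize uint) error {
	pool := newBufferPool(packetSize)
	for {
		buf := pool.Get().(*[]byte) // reused []byte of length packetSize
		n, err := src.Read(*buf)
		if err != nil {
			pool.Put(buf)
			return err
		}
		handle((*buf)[:n]) // the handler must finish (or copy) before the buffer is reused
		pool.Put(buf)      // return the buffer for the next read
	}
}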