Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi channel #6

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/edgeware/v2l-example-scheduler

go 1.17

require github.com/edgeware/sbgo v0.0.0-20220517144721-cf8cceab4ebd
56 changes: 43 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,44 @@ import (
"log"
"os"
"os/signal"
"strconv"
"time"

"github.com/edgeware/v2l-example-scheduler/v2l"
)

const (
assetDir = "assets"
server = "http://localhost:8090"
channelName = "ch1"
gopDurMS = 2000 // Note, all content must have this same GoP duration
nrGopsPerSegment = 2 // Decides how long output segments will be in average
slidingWindowNrGops = 40 // Should be at least a minute before removing stuff
futureScheduleNrGops = 15 // Threshold for when to add new entries in schedule
contentTemplatePath = "content_template.json" // Template describing input and output
updatePeriodS = 2 // How often the schedule should be checked for updates in seconds
assetDir = "assets"
server = "http://localhost:8090"
channelName = "ch1"
gopDurMS = 2000 // Note, all content must have this same GoP duration
nrGopsPerSegment = 2 // Decides how long output segments will be in average
slidingWindowNrGopsDefault = 40 // Should be at least a minute before removing stuff
futureScheduleNrGops = 15 // Threshold for when to add new entries in schedule
contentTemplatePath = "content_template.json" // Template describing input and output
updatePeriodS = 2 // How often the schedule should be checked for updates in seconds
)

func main() {
err := v2l.DeleteChannel(server, channelName) // Delete any old channel and schedule
nrChannels := 1
var slidingWindowNrGops int64 = slidingWindowNrGopsDefault
var err error
if len(os.Args) > 1 {
nrChannels, err = strconv.Atoi(os.Args[1])
if err != nil {
printUsage()
}
}

if len(os.Args) > 2 {
slidingWindowS, err := strconv.Atoi(os.Args[2])
if err != nil {
printUsage()
}
slidingWindowNrGops = int64(slidingWindowS) * 1000 / gopDurMS
}

err = v2l.DeleteChannels(nrChannels, server) // Delete any old channel and schedule
if err != nil {
log.Fatal(err)
}
Expand All @@ -43,26 +62,37 @@ func main() {
}

// Create channel with a few assets and get state back
channel, err := v2l.CreateChannel(server, channelName, contentTemplatePath, gopDurMS, nrGopsPerSegment,
channels, err := v2l.CreateChannels(nrChannels, server, contentTemplatePath, gopDurMS, nrGopsPerSegment,
slidingWindowNrGops, futureScheduleNrGops, assetPaths)
if err != nil {
log.Fatal(err)
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
ticker := time.NewTicker(updatePeriodS * time.Second)

ticker := time.NewTicker(updatePeriodS * time.Second / time.Duration(nrChannels))
chIndex := 0
TickerLoop:
for {
select {
case <-signalCh:
log.Printf("Stopping loop\n")
break TickerLoop
case t := <-ticker.C:
err = v2l.UpdateSchedule(server, channel, assetPaths, t)
err = channels[chIndex].UpdateSchedule(server, assetPaths, t)
if err != nil {
log.Fatal(err)
}
}
chIndex++
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest that you do the modulo operation right away, so that one does not suspect an error. index should be less than nrOfChannels. From a performance point of view, it is also better to write it as

chIndex++
if chIndex == nrOfChannels {
chIndex -= nrOfChannels
}

since divisions and mod are very expensive operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or rather this

if ... {
    chIndex = 0
}

if chIndex == nrChannels {
chIndex = 0
}
}
}

func printUsage() {
println("Usage: ", os.Args[0], "[<number-of-channels>] [<sliding-window-S>] defaults:1 channel, 80 sec ")
os.Exit(1)
}
87 changes: 49 additions & 38 deletions v2l/assetpaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package v2l
import (
"bytes"
"encoding/json"
"fmt"
"io/fs"
"math/rand"
"os"
"path/filepath"
"strings"

"github.com/edgeware/sbgo/pkg/esf"
)

// DeleteAllAssetPaths - delete all asset paths from server
Expand Down Expand Up @@ -54,56 +58,63 @@ func DiscoverAssetPaths(dir string) ([]AssetPath, error) {
}
assetPath := filepath.Dir(absAssetPath) // The dir containing content_info.json
assetName := filepath.Base(assetPath)
aps = append(aps, AssetPath{assetName, assetPath})
assetLen, err := getAssetLen(absAssetPath)
if err != nil {
return err
}
aps = append(aps, AssetPath{assetName, assetPath, assetLen})
}
return nil
})
return aps, err
}

// randomEntry - return a random entry given kind and assetPaths. Set offset, length, sctedID
func randomEntry(assetPaths []AssetPath, kind string, offset, length, scteID int64) Entry {
var programs []string
var ads []string
var fillers []string
var slates []string
func randomEntry(assetPaths []AssetPath, kind string, scteID int64) Entry {
var selectedAssets []AssetPath
var subPath = "/" + kind + "s/"
for _, ap := range assetPaths {
if strings.Contains(ap.Path, "/filler") {
fillers = append(fillers, ap.ID)
continue
}
if strings.Contains(ap.Path, "/slates/") {
fillers = append(fillers, ap.ID)
continue
}
if strings.Contains(ap.Path, "/ads/") {
ads = append(ads, ap.ID)
continue
if strings.Contains(ap.Path, subPath) {
selectedAssets = append(selectedAssets, ap)
}
programs = append(programs, ap.ID)
}
var assetID string
switch kind {
case "filler":
idx := rand.Intn(len(fillers))
assetID = fillers[idx]
case "slates":
idx := rand.Intn(len(slates))
assetID = slates[idx]
case "program":
idx := rand.Intn(len(programs))
assetID = programs[idx]
case "ad":
idx := rand.Intn(len(ads))
assetID = ads[idx]
default:
panic("Unknown kind of asset")

if len(selectedAssets) == 0 {
panic("No such asset kind: " + kind)
}

asset := selectedAssets[rand.Intn(len(selectedAssets))]

return Entry{
Name: assetID,
AssetID: assetID,
Offset: offset,
Len: length,
Name: asset.ID,
AssetID: asset.ID,
Offset: 0,
Len: asset.len,
SCTEEventID: scteID,
}
}

// getAssetLen -- get length in number of GoPs
func getAssetLen(assetPath string) (int64, error) {
bytes, err := os.ReadFile(assetPath)
if err != nil {
return 0, err
}

ci, err := esf.ParseContentInfo(bytes)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: get duration from a dat file instead,
only the "demo-contentinfo" files contain a duration

if err != nil {
return 0, err
}

cd := ci.ContentDurationMS
if cd == 0 {
return 0, fmt.Errorf("\"content_duration_ms\" not found")
}

gd := ci.GOPDurationMS
if gd == 0 {
return 0, fmt.Errorf("\"constant_gop_duration_ms\" not found")
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And JSON here too


return cd / gd, nil
}
Loading