Skip to content

Commit

Permalink
Rebuild functional options
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Nov 24, 2023
1 parent 604cfa1 commit 2f328e1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 37 deletions.
10 changes: 3 additions & 7 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ func main() {
).Run(ctx)
})

//upsertPodChannelSpreader := sync.NewChannelSpreader[database.Entity](forwardUpsertPodsChannel)
//deletePodChannelSpreader := sync.NewChannelSpreader[any](forwardDeletePodsChannel)

//upsertPodChannel := upsertPodChannelSpreader.NewChannel()
//deletePodChannel := deletePodChannelSpreader.NewChannel()

forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert)
forwardDeletePodsToLogChannel := make(chan contracts.KDelete)

Expand All @@ -114,9 +108,11 @@ func main() {
schema.NewPod,
informers.Core().V1().Pods().Informer(),
logs.GetChildLogger("Pods"),
).Run(
ctx,
sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel),
sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel),
).Run(ctx)
)
})

logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs"))
Expand Down
70 changes: 40 additions & 30 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,22 @@ import (
kcache "k8s.io/client-go/tools/cache"
)

type Option func(s *sync)

func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) Option {
return func(s *sync) {
s.forwardUpsertToLogChannel = channel
}
}

func WithForwardDeleteToLog(channel chan<- contracts.KDelete) Option {
return func(s *sync) {
s.forwardDeleteToLogChannel = channel
}
}

type Sync interface {
Run(context.Context) error
Run(context.Context, ...ExecOption) error
}

type sync struct {
db *database.DB
factory func() contracts.Resource
informer kcache.SharedInformer
logger *logging.Logger
forwardUpsertToLogChannel chan<- contracts.KUpsert
forwardDeleteToLogChannel chan<- contracts.KDelete
db *database.DB
factory func() contracts.Resource
informer kcache.SharedInformer
logger *logging.Logger
}

func NewSync(
db *database.DB,
factory func() contracts.Resource,
informer kcache.SharedInformer,
logger *logging.Logger,
options ...Option,
) Sync {
s := &sync{
db: db,
Expand All @@ -55,14 +38,39 @@ func NewSync(
factory: factory,
}

for _, option := range options {
option(s)
return s
}

func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) ExecOption {
return func(storage *OptionStorage) {
storage.forwardUpsertToLogChannel = channel
}
}

func WithForwardDeleteToLog(channel chan<- contracts.KDelete) ExecOption {
return func(storage *OptionStorage) {
storage.forwardDeleteToLogChannel = channel
}
}

return s
type ExecOption func(storage *OptionStorage)

type OptionStorage struct {
forwardUpsertToLogChannel chan<- contracts.KUpsert
forwardDeleteToLogChannel chan<- contracts.KDelete
}

func (s *sync) Run(ctx context.Context) error {
func NewOptionStorage(execOptions ...ExecOption) *OptionStorage {
optionStorage := &OptionStorage{}

for _, option := range execOptions {
option(optionStorage)
}

return optionStorage
}

func (s *sync) Run(ctx context.Context, execOptions ...ExecOption) error {
s.logger.Info("Starting sync")

s.logger.Debug("Warming up")
Expand Down Expand Up @@ -90,6 +98,8 @@ func (s *sync) Run(ctx context.Context) error {

s.factory().GetResourceVersion()

optionStorage := NewOptionStorage(execOptions...)

// init upsert channel spreader
multiplexUpsertChannel := make(chan contracts.KUpsert)
defer close(multiplexUpsertChannel)
Expand All @@ -98,8 +108,8 @@ func (s *sync) Run(ctx context.Context) error {

upsertChannel := multiplexUpsert.NewChannel()

if s.forwardUpsertToLogChannel != nil {
multiplexUpsert.AddChannel(s.forwardUpsertToLogChannel)
if optionStorage.forwardUpsertToLogChannel != nil {
multiplexUpsert.AddChannel(optionStorage.forwardUpsertToLogChannel)
}

// run upsert channel spreader
Expand Down Expand Up @@ -172,8 +182,8 @@ func (s *sync) Run(ctx context.Context) error {

deleteChannel := multiplexDelete.NewChannel()

if s.forwardDeleteToLogChannel != nil {
multiplexDelete.AddChannel(s.forwardDeleteToLogChannel)
if optionStorage.forwardDeleteToLogChannel != nil {
multiplexDelete.AddChannel(optionStorage.forwardDeleteToLogChannel)
}

// run delete channel spreader
Expand Down

0 comments on commit 2f328e1

Please sign in to comment.