Skip to content

Commit

Permalink
Integrate clusterUuid into resource synchronization and metadata ha…
Browse files Browse the repository at this point in the history
…ndling

- Add `clusterUuid` to the `Meta` struct and update the `ObtainMeta` method to set it.
- Modify the `Resource` interface to require `clusterUuid` in the `Obtain` method.
  • Loading branch information
jhoxhaa committed Nov 22, 2024
1 parent d9e1013 commit e95e761
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 62 deletions.
34 changes: 17 additions & 17 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func main() {
}

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace)
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid)

return s.Run(ctx)
})
Expand All @@ -313,7 +313,7 @@ func main() {

wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -340,7 +340,7 @@ func main() {
)

f := schemav1.NewPodFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid)

wg.Done()

Expand All @@ -354,7 +354,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment)
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -373,7 +373,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet)
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -392,7 +392,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet)
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -411,7 +411,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet)
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -428,60 +428,60 @@ func main() {
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService)
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice)
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret)
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid)
return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap)
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent)
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid)

return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob)
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob)
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress)
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid)

return s.Run(ctx)
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/config_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func NewConfigMap() Resource {
return &ConfigMap{}
}

func (c *ConfigMap) Obtain(k8s kmetav1.Object) {
c.ObtainMeta(k8s)
func (c *ConfigMap) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
c.ObtainMeta(k8s, clusterUuid)

configMap := k8s.(*kcorev1.ConfigMap)

Expand Down
6 changes: 4 additions & 2 deletions pkg/schema/v1/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ var NameSpaceKubernetes = uuid.MustParse("3f249403-2bb0-428f-8e91-504d1fd7ddb6")

type Resource interface {
kmetav1.Object
Obtain(k8s kmetav1.Object)
Obtain(k8s kmetav1.Object, clusterUuid types.UUID)
}

type Meta struct {
Uuid types.UUID
ClusterUuid types.UUID
Uid ktypes.UID
Namespace string
Name string
ResourceVersion string
Created types.UnixMilli
}

func (m *Meta) ObtainMeta(k8s kmetav1.Object) {
func (m *Meta) ObtainMeta(k8s kmetav1.Object, clusterUuid types.UUID) {
m.Uuid = EnsureUUID(k8s.GetUID())
m.ClusterUuid = clusterUuid
m.Uid = k8s.GetUID()
m.Namespace = k8s.GetNamespace()
m.Name = k8s.GetName()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/cron_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func NewCronJob() Resource {
return &CronJob{}
}

func (c *CronJob) Obtain(k8s kmetav1.Object) {
c.ObtainMeta(k8s)
func (c *CronJob) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
c.ObtainMeta(k8s, clusterUuid)

cronJob := k8s.(*kbatchv1.CronJob)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/daemon_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func NewDaemonSet() Resource {
return &DaemonSet{}
}

func (d *DaemonSet) Obtain(k8s kmetav1.Object) {
d.ObtainMeta(k8s)
func (d *DaemonSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
d.ObtainMeta(k8s, clusterUuid)

daemonSet := k8s.(*kappsv1.DaemonSet)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func NewDeployment() Resource {
return &Deployment{}
}

func (d *Deployment) Obtain(k8s kmetav1.Object) {
d.ObtainMeta(k8s)
func (d *Deployment) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
d.ObtainMeta(k8s, clusterUuid)

deployment := k8s.(*kappsv1.Deployment)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewEndpointSlice() Resource {
return &EndpointSlice{}
}

func (e *EndpointSlice) Obtain(k8s kmetav1.Object) {
e.ObtainMeta(k8s)
func (e *EndpointSlice) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
e.ObtainMeta(k8s, clusterUuid)

endpointSlice := k8s.(*kdiscoveryv1.EndpointSlice)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewEvent() Resource {
return &Event{}
}

func (e *Event) Obtain(k8s kmetav1.Object) {
e.ObtainMeta(k8s)
func (e *Event) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
e.ObtainMeta(k8s, clusterUuid)

event := k8s.(*keventsv1.Event)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func NewIngress() Resource {
return &Ingress{}
}

func (i *Ingress) Obtain(k8s kmetav1.Object) {
i.ObtainMeta(k8s)
func (i *Ingress) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
i.ObtainMeta(k8s, clusterUuid)

ingress := k8s.(*networkingv1.Ingress)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewJob() Resource {
return &Job{}
}

func (j *Job) Obtain(k8s kmetav1.Object) {
j.ObtainMeta(k8s)
func (j *Job) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
j.ObtainMeta(k8s, clusterUuid)

job := k8s.(*kbatchv1.Job)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func NewNamespace() Resource {
return &Namespace{}
}

func (n *Namespace) Obtain(k8s kmetav1.Object) {
n.ObtainMeta(k8s)
func (n *Namespace) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
n.ObtainMeta(k8s, clusterUuid)

namespace := k8s.(*kcorev1.Namespace)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func NewNode() Resource {
return &Node{}
}

func (n *Node) Obtain(k8s kmetav1.Object) {
n.ObtainMeta(k8s)
func (n *Node) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
n.ObtainMeta(k8s, clusterUuid)

node := k8s.(*kcorev1.Node)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/persistent_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewPersistentVolume() Resource {
return &PersistentVolume{}
}

func (p *PersistentVolume) Obtain(k8s kmetav1.Object) {
p.ObtainMeta(k8s)
func (p *PersistentVolume) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
p.ObtainMeta(k8s, clusterUuid)

persistentVolume := k8s.(*kcorev1.PersistentVolume)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func (f *PodFactory) New() Resource {
return &Pod{factory: f}
}

func (p *Pod) Obtain(k8s kmetav1.Object) {
p.ObtainMeta(k8s)
func (p *Pod) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
p.ObtainMeta(k8s, clusterUuid)

pod := k8s.(*kcorev1.Pod)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func NewPvc() Resource {
return &Pvc{}
}

func (p *Pvc) Obtain(k8s kmetav1.Object) {
p.ObtainMeta(k8s)
func (p *Pvc) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
p.ObtainMeta(k8s, clusterUuid)

pvc := k8s.(*kcorev1.PersistentVolumeClaim)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/replica_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func NewReplicaSet() Resource {
return &ReplicaSet{}
}

func (r *ReplicaSet) Obtain(k8s kmetav1.Object) {
r.ObtainMeta(k8s)
func (r *ReplicaSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
r.ObtainMeta(k8s, clusterUuid)

replicaSet := k8s.(*kappsv1.ReplicaSet)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewSecret() Resource {
return &Secret{}
}

func (s *Secret) Obtain(k8s kmetav1.Object) {
s.ObtainMeta(k8s)
func (s *Secret) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
s.ObtainMeta(k8s, clusterUuid)

secret := k8s.(*kcorev1.Secret)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func NewService() Resource {
return &Service{}
}

func (s *Service) Obtain(k8s kmetav1.Object) {
s.ObtainMeta(k8s)
func (s *Service) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
s.ObtainMeta(k8s, clusterUuid)

service := k8s.(*kcorev1.Service)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/v1/stateful_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewStatefulSet() Resource {
return &StatefulSet{}
}

func (s *StatefulSet) Obtain(k8s kmetav1.Object) {
s.ObtainMeta(k8s)
func (s *StatefulSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) {
s.ObtainMeta(k8s, clusterUuid)

statefulSet := k8s.(*kappsv1.StatefulSet)

Expand Down
22 changes: 13 additions & 9 deletions pkg/sync/v1/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1
import (
"context"
"github.com/go-logr/logr"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/database"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
Expand All @@ -12,23 +13,26 @@ import (
)

type Sync struct {
db *database.Database
informer cache.SharedIndexInformer
log logr.Logger
factory func() schemav1.Resource
db *database.Database
informer cache.SharedIndexInformer
log logr.Logger
factory func() schemav1.Resource
clusterUuid types.UUID
}

func NewSync(
db *database.Database,
informer cache.SharedIndexInformer,
log logr.Logger,
factory func() schemav1.Resource,
clusterUuid types.UUID,
) *Sync {
return &Sync{
db: db,
informer: informer,
log: log,
factory: factory,
db: db,
informer: informer,
log: log,
factory: factory,
clusterUuid: clusterUuid,
}
}

Expand Down Expand Up @@ -80,7 +84,7 @@ func (s *Sync) warmup(ctx context.Context, c *Controller) error {
func (s *Sync) sync(ctx context.Context, c *Controller, features ...Feature) error {
sink := NewSink(func(i *Item) interface{} {
entity := s.factory()
entity.Obtain(*i.Item)
entity.Obtain(*i.Item, s.clusterUuid)

return entity
}, func(k interface{}) interface{} {
Expand Down

0 comments on commit e95e761

Please sign in to comment.