Skip to content

Commit

Permalink
Sync clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoxhaa committed Nov 29, 2024
1 parent c9d3599 commit bd82e38
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 25 deletions.
61 changes: 36 additions & 25 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ import (
"time"
)

type clusterContextKeyType string

const clusterContextKey clusterContextKeyType = "clusterContextKey"

const expectedSchemaVersion = "0.2.0"

func main() {
Expand Down Expand Up @@ -123,15 +119,30 @@ func main() {

g, ctx := errgroup.WithContext(context.Background())

kubeconfigPath := kclientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename()
kubeconfig, err := kclientcmd.LoadFromFile(kubeconfigPath)
if err != nil {
klog.Fatalf("Failed to load kubeconfig: %v", err)
}

name := kubeconfig.CurrentContext
namespaceName := "kube-system"
ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceName, v1.GetOptions{})
ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespaceName, v1.GetOptions{})
if err != nil {
klog.Fatalf("Failed to retrieve namespace '%s': %v. Ensure the cluster is accessible and the namespace exists.", namespaceName, err)
klog.Fatalf("Failed to retrieve namespace '%s' for cluster '%s': %v", namespaceName, name, err)
}

cluster := &schemav1.Cluster{
Uuid: schemav1.EnsureUUID(ns.UID),
Name: name,
}

clusterUuid := schemav1.EnsureUUID(ns.UID)
stmt, _ := db.BuildUpsertStmt(cluster)
if _, err := db.NamedExecContext(ctx, stmt, cluster); err != nil {
klog.Error(errors.Wrap(err, "can't update cluster"))
}

ctx = context.WithValue(ctx, clusterContextKey, clusterUuid)
ctx = cluster.NewClusterContext(ctx)

if hasSchema {
var version string
Expand Down Expand Up @@ -304,7 +315,7 @@ func main() {
}

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, cluster.Uuid)

return s.Run(ctx)
})
Expand All @@ -313,7 +324,7 @@ func main() {

wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, cluster.Uuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -340,7 +351,7 @@ func main() {
)

f := schemav1.NewPodFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, cluster.Uuid)

wg.Done()

Expand All @@ -354,7 +365,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid)
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, cluster.Uuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -373,7 +384,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid)
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, cluster.Uuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -392,7 +403,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid)
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, cluster.Uuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -411,7 +422,7 @@ func main() {
wg.Add(1)
g.Go(func() error {
s := syncv1.NewSync(
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid)
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, cluster.Uuid)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
Expand All @@ -428,60 +439,60 @@ func main() {
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid)
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, cluster.Uuid)
return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid)
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, cluster.Uuid)

return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid)
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid)
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid)
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, cluster.Uuid)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid)
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, cluster.Uuid)

return s.Run(ctx)
})
Expand Down
32 changes: 32 additions & 0 deletions pkg/schema/v1/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package v1

import (
"context"
"github.com/icinga/icinga-go-library/types"
)

type contextKey string // Private type to prevent collisions with other context keys

// clusterContextKey is the key for Cluster values in contexts.
var clusterContextKey = contextKey("cluster")

type Cluster struct {
Uuid types.UUID
Name string
}

// NewClusterContext returns a new Context that carries this Cluster as value.
func (c *Cluster) NewClusterContext(parent context.Context) context.Context {
return context.WithValue(parent, clusterContextKey, c)
}

// ClusterFromContext returns the Cluster value stored in ctx, if any:
//
// e, ok := ClusterFromContext(ctx)
// if !ok {
// // Error handling.
// }
func ClusterFromContext(ctx context.Context) (*Cluster, bool) {
c, ok := ctx.Value(clusterContextKey).(*Cluster)
return c, ok
}

0 comments on commit bd82e38

Please sign in to comment.