From bd82e38234870487f379b59f5ae5e5937c3d3924 Mon Sep 17 00:00:00 2001 From: Jonada Hoxha Date: Thu, 28 Nov 2024 12:51:56 +0100 Subject: [PATCH] Sync clusters --- cmd/icinga-kubernetes/main.go | 61 +++++++++++++++++++++-------------- pkg/schema/v1/cluster.go | 32 ++++++++++++++++++ 2 files changed, 68 insertions(+), 25 deletions(-) create mode 100644 pkg/schema/v1/cluster.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index df58642..74addf2 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -40,10 +40,6 @@ import ( "time" ) -type clusterContextKeyType string - -const clusterContextKey clusterContextKeyType = "clusterContextKey" - const expectedSchemaVersion = "0.2.0" func main() { @@ -123,15 +119,30 @@ func main() { g, ctx := errgroup.WithContext(context.Background()) + kubeconfigPath := kclientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename() + kubeconfig, err := kclientcmd.LoadFromFile(kubeconfigPath) + if err != nil { + klog.Fatalf("Failed to load kubeconfig: %v", err) + } + + name := kubeconfig.CurrentContext namespaceName := "kube-system" - ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceName, v1.GetOptions{}) + ns, err := clientset.CoreV1().Namespaces().Get(context.TODO(), namespaceName, v1.GetOptions{}) if err != nil { - klog.Fatalf("Failed to retrieve namespace '%s': %v. Ensure the cluster is accessible and the namespace exists.", namespaceName, err) + klog.Fatalf("Failed to retrieve namespace '%s' for cluster '%s': %v", namespaceName, name, err) + } + + cluster := &schemav1.Cluster{ + Uuid: schemav1.EnsureUUID(ns.UID), + Name: name, } - clusterUuid := schemav1.EnsureUUID(ns.UID) + stmt, _ := db.BuildUpsertStmt(cluster) + if _, err := db.NamedExecContext(ctx, stmt, cluster); err != nil { + klog.Error(errors.Wrap(err, "can't update cluster")) + } - ctx = context.WithValue(ctx, clusterContextKey, clusterUuid) + ctx = cluster.NewClusterContext(ctx) if hasSchema { var version string @@ -304,7 +315,7 @@ func main() { } g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, cluster.Uuid) return s.Run(ctx) }) @@ -313,7 +324,7 @@ func main() { wg.Add(1) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, cluster.Uuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -340,7 +351,7 @@ func main() { ) f := schemav1.NewPodFactory(clientset) - s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, cluster.Uuid) wg.Done() @@ -354,7 +365,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid) + db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, cluster.Uuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -373,7 +384,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid) + db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, cluster.Uuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -392,7 +403,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid) + db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, cluster.Uuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -411,7 +422,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid) + db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, cluster.Uuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -428,60 +439,60 @@ func main() { }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid) + s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid) + s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, cluster.Uuid) return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid) + s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid) + s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, cluster.Uuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid) + s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, cluster.Uuid) return s.Run(ctx) }) diff --git a/pkg/schema/v1/cluster.go b/pkg/schema/v1/cluster.go new file mode 100644 index 0000000..bc33036 --- /dev/null +++ b/pkg/schema/v1/cluster.go @@ -0,0 +1,32 @@ +package v1 + +import ( + "context" + "github.com/icinga/icinga-go-library/types" +) + +type contextKey string // Private type to prevent collisions with other context keys + +// clusterContextKey is the key for Cluster values in contexts. +var clusterContextKey = contextKey("cluster") + +type Cluster struct { + Uuid types.UUID + Name string +} + +// NewClusterContext returns a new Context that carries this Cluster as value. +func (c *Cluster) NewClusterContext(parent context.Context) context.Context { + return context.WithValue(parent, clusterContextKey, c) +} + +// ClusterFromContext returns the Cluster value stored in ctx, if any: +// +// e, ok := ClusterFromContext(ctx) +// if !ok { +// // Error handling. +// } +func ClusterFromContext(ctx context.Context) (*Cluster, bool) { + c, ok := ctx.Value(clusterContextKey).(*Cluster) + return c, ok +}