Skip to content

Commit fcb0e57

Browse files
committed
fix: Fix status-writer in the Updater target
1 parent 1e0bb53 commit fcb0e57

File tree

4 files changed

+175
-40
lines changed

4 files changed

+175
-40
lines changed

pkg/pipeline/default_engine.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (eng *defaultEngine) EvaluateStage(s *Stage, delta cache.Delta) ([]cache.De
273273

274274
vs := []unstruct{}
275275
for i, elem := range list {
276-
a := deepCopy(u)
276+
a := object.DeepCopyAny(u)
277277
v, ok := a.(unstruct)
278278
if !ok {
279279
return nil, errors.New("could not deepcopy object content")
@@ -730,27 +730,6 @@ func containsDelta(ds []cache.Delta, delta cache.Delta) bool {
730730
})
731731
}
732732

733-
func deepCopy(value any) any {
734-
switch v := value.(type) {
735-
case bool, int64, float64, string:
736-
return v
737-
case []any:
738-
newList := make([]any, len(v))
739-
for i, item := range v {
740-
newList[i] = deepCopy(item)
741-
}
742-
return newList
743-
case map[string]any:
744-
newMap := make(map[string]any)
745-
for k, item := range v {
746-
newMap[k] = deepCopy(item)
747-
}
748-
return newMap
749-
default:
750-
return v
751-
}
752-
}
753-
754733
func packDeltas(targetView string, deltaType cache.DeltaType, vs []unstruct) []cache.Delta {
755734
ret := []cache.Delta{}
756735
for _, v := range vs {

pkg/reconciler/reconciler_test.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ var _ = Describe("Reconciler", func() {
341341
// object.SetContent(view2, map[string]any{"a": int64(1), "b": int64(2)})
342342
object.SetContent(res, map[string]any{"b": int64(2)})
343343
Expect(object.DeepEqual(res, event.Object.(object.Object))).To(BeTrue())
344+
// Expect(res).To(Equal(event.Object.(object.Object)))
344345

345346
// Push a delete to the target
346347
err = target.Write(ctx, cache.Delta{Type: cache.Deleted, Object: view2})
@@ -361,7 +362,7 @@ var _ = Describe("Reconciler", func() {
361362
Expect(err).NotTo(HaveOccurred())
362363
Expect(mgr).NotTo(BeNil())
363364

364-
// Register target
365+
// Register an updater target
365366
group, version := "", "v1"
366367
target := NewTarget(mgr, opv1a1.Target{
367368
Resource: opv1a1.Resource{
@@ -391,10 +392,6 @@ var _ = Describe("Reconciler", func() {
391392
Version: "v1",
392393
Kind: "Pod",
393394
}))
394-
// make a Get so that we get a full object (tracker returns only a runtime.Object)
395-
396-
// getFromClient := &corev1.Pod{}
397-
// Expect(c.Get(ctx, client.ObjectKeyFromObject(pod), getFromClient)).NotTo(HaveOccurred())
398395
getFromClient, err := object.ConvertRuntimeObjectToClientObject(getFromTracker)
399396
Expect(err).NotTo(HaveOccurred())
400397
Expect(getFromClient.GetObjectKind().GroupVersionKind()).To(Equal(schema.GroupVersionKind{
@@ -413,8 +410,8 @@ var _ = Describe("Reconciler", func() {
413410
// Push update to the target
414411
newPod := object.DeepCopy(pod2)
415412
unstructured.RemoveNestedField(newPod.UnstructuredContent(), "spec", "containers")
416-
417413
unstructured.SetNestedField(newPod.UnstructuredContent(), "Always", "spec", "restartPolicy")
414+
418415
err = target.Write(ctx, cache.Delta{Type: cache.Updated, Object: newPod})
419416
Expect(err).NotTo(HaveOccurred())
420417

@@ -426,6 +423,7 @@ var _ = Describe("Reconciler", func() {
426423
Version: "v1",
427424
Kind: "Pod",
428425
}))
426+
429427
getFromClient, err = object.ConvertRuntimeObjectToClientObject(getFromTracker)
430428
Expect(err).NotTo(HaveOccurred())
431429
Expect(getFromClient.GetObjectKind().GroupVersionKind()).To(Equal(schema.GroupVersionKind{
@@ -439,6 +437,34 @@ var _ = Describe("Reconciler", func() {
439437
Expect(p.Spec.Containers).To(BeEmpty())
440438
Expect(p.Spec.RestartPolicy).To(Equal(corev1.RestartPolicy("Always")))
441439

440+
// Set the status
441+
unstructured.SetNestedField(newPod.UnstructuredContent(), map[string]any{
442+
"message": "testmessage",
443+
"reason": "testreason",
444+
}, "status")
445+
446+
err = target.Write(ctx, cache.Delta{Type: cache.Updated, Object: newPod})
447+
Expect(err).NotTo(HaveOccurred())
448+
449+
getFromTracker, err = tracker.Get(gvr, "testns", "testpod")
450+
Expect(err).NotTo(HaveOccurred())
451+
getFromClient, err = object.ConvertRuntimeObjectToClientObject(getFromTracker)
452+
Expect(err).NotTo(HaveOccurred())
453+
Expect(getFromClient.GetObjectKind().GroupVersionKind()).To(Equal(schema.GroupVersionKind{
454+
Group: "",
455+
Version: "v1",
456+
Kind: "Pod",
457+
}))
458+
p = getFromClient.(*corev1.Pod)
459+
Expect(p.GetName()).To(Equal("testpod"))
460+
Expect(p.GetNamespace()).To(Equal("testns"))
461+
Expect(p.Spec.Containers).To(BeEmpty())
462+
Expect(p.Spec.RestartPolicy).To(Equal(corev1.RestartPolicy("Always")))
463+
Expect(p.Status).To(Equal(corev1.PodStatus{
464+
Message: "testmessage",
465+
Reason: "testreason",
466+
}))
467+
442468
// Delete from the target
443469
err = target.Write(ctx, cache.Delta{Type: cache.Deleted, Object: newPod})
444470
Expect(err).NotTo(HaveOccurred())

pkg/reconciler/resource.go

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"k8s.io/apimachinery/pkg/types"
1313
"k8s.io/apimachinery/pkg/util/json"
1414
"sigs.k8s.io/controller-runtime/pkg/client"
15-
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1615
runtimeManager "sigs.k8s.io/controller-runtime/pkg/manager"
1716
runtimePredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
1817
runtimeSource "sigs.k8s.io/controller-runtime/pkg/source"
@@ -232,7 +231,7 @@ func (t *target) update(ctx context.Context, delta cache.Delta) error {
232231

233232
//nolint:nolintlint
234233
switch delta.Type { //nolint:exhaustive
235-
case cache.Added, cache.Upserted:
234+
case cache.Added, cache.Upserted, cache.Updated, cache.Replaced:
236235
t.log.V(2).Info("add/upsert", "event-type", delta.Type, "object", client.ObjectKeyFromObject(delta.Object))
237236

238237
gvk, err := t.Resource.GetGVK()
@@ -244,24 +243,53 @@ func (t *target) update(ctx context.Context, delta cache.Delta) error {
244243
obj.SetName(delta.Object.GetName())
245244
obj.SetNamespace(delta.Object.GetNamespace())
246245

247-
if res, err := controllerutil.CreateOrUpdate(context.TODO(), c, obj, func() error {
248-
obj.SetUnstructuredContent(delta.Object.UnstructuredContent())
246+
// WARNING: the Update target cannot be used to delete labels and annotations, use
247+
// the Patcher target for that (this is because we don't want the user to remove
248+
// important labels/annotations accidentally and taking care of each in the
249+
// pipeline may be too difficult)
250+
//
251+
// Use our own CreateOrUpdate that will also update the status
252+
res, err := CreateOrUpdate(context.TODO(), c, obj, func() error {
253+
// remove stuff that's no longer there
254+
for k := range obj.UnstructuredContent() {
255+
if k == "metadata" {
256+
continue
257+
}
258+
if _, ok, _ := unstructured.NestedFieldNoCopy(delta.Object.UnstructuredContent(), k); !ok {
259+
unstructured.RemoveNestedField(obj.UnstructuredContent(), k)
260+
}
261+
}
262+
263+
// then update the content with new keys: metadata and status will be handled separately
264+
for k, v := range delta.Object.UnstructuredContent() {
265+
if k == "metadata" {
266+
continue
267+
}
268+
269+
if err := unstructured.SetNestedField(obj.UnstructuredContent(), v, k); err != nil {
270+
t.log.Error(err, "failed to update object field during update",
271+
"object", client.ObjectKeyFromObject(obj).String(), "key", k)
272+
continue
273+
}
274+
}
275+
276+
mergeMetadata(obj, delta.Object)
277+
278+
// restore metadata
249279
obj.SetGroupVersionKind(gvk)
250280
obj.SetName(delta.Object.GetName())
251281
obj.SetNamespace(delta.Object.GetNamespace())
282+
252283
return nil
253-
}); err != nil {
254-
return fmt.Errorf("create/update resource %s/%s failed with operation code %s: %w",
255-
delta.Object.GetNamespace(), delta.Object.GetName(), res, err)
284+
})
285+
286+
if err != nil {
287+
return fmt.Errorf("create/update resource %s failed with operation code %s: %w",
288+
client.ObjectKeyFromObject(delta.Object).String(), res, err)
256289
}
257290

258291
return nil
259292

260-
case cache.Updated, cache.Replaced:
261-
t.log.V(2).Info("update", "event-type", delta.Type, "object", client.ObjectKeyFromObject(delta.Object))
262-
263-
return c.Update(ctx, delta.Object)
264-
265293
case cache.Deleted:
266294
t.log.V(2).Info("delete", "event-type", delta.Type, "object", client.ObjectKeyFromObject(delta.Object))
267295

@@ -353,3 +381,29 @@ func removeNested(m map[string]any) map[string]any {
353381
}
354382
return result
355383
}
384+
385+
func mergeMetadata(obj, new object.Object) {
386+
labels := obj.GetLabels()
387+
newLabels := new.GetLabels()
388+
if newLabels != nil {
389+
if labels == nil {
390+
labels = map[string]string{}
391+
}
392+
for k, v := range newLabels {
393+
labels[k] = v
394+
}
395+
obj.SetLabels(labels)
396+
}
397+
398+
annotations := obj.GetAnnotations()
399+
newAnnotations := new.GetAnnotations()
400+
if newAnnotations != nil {
401+
if annotations == nil {
402+
annotations = map[string]string{}
403+
}
404+
for k, v := range newAnnotations {
405+
annotations[k] = v
406+
}
407+
obj.SetAnnotations(annotations)
408+
}
409+
}

pkg/reconciler/utils.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package reconciler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
apierrors "k8s.io/apimachinery/pkg/api/errors"
8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
11+
12+
"github.com/hsnlab/dcontroller/pkg/object"
13+
)
14+
15+
// CreateOrUpdate creates or updates the given object in the Kubernetes cluster. The object's
16+
// desired state must be reconciled with the existing state inside the passed in callback MutateFn.
17+
//
18+
// The MutateFn is called regardless of creating or updating an object.
19+
//
20+
// It returns the executed operation and an error.
21+
//
22+
// Note: this version differs from default controllerutil.CreateOrUpdate in two subtle ways
23+
// - it uses the unstructured API via object.Object
24+
// - changes made by MutateFn to the status subresource will be handled, changes to any other
25+
// sub-resource will be discarded
26+
// - errors produced by the `Create` branch (after a failed `Get`) will be ignored: this is to
27+
// make the function usable with spl// it clients in tests where updates to dot appear in Get
28+
func CreateOrUpdate(ctx context.Context, c client.Client, obj object.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) {
29+
key := client.ObjectKeyFromObject(obj)
30+
if err := c.Get(ctx, key, obj); err != nil {
31+
if !apierrors.IsNotFound(err) {
32+
return controllerutil.OperationResultNone, err
33+
}
34+
if err := mutate(f, key, obj); err != nil {
35+
return controllerutil.OperationResultNone, err
36+
}
37+
if err := c.Create(ctx, obj); err != nil {
38+
// this is not an error: default to the update branch
39+
goto update
40+
}
41+
return controllerutil.OperationResultCreated, nil
42+
}
43+
44+
update:
45+
if err := mutate(f, key, obj); err != nil {
46+
return controllerutil.OperationResultNone, err
47+
}
48+
49+
// Update may rewrite out status
50+
newStatus, hasStatus, _ := unstructured.NestedMap(obj.UnstructuredContent(), "status")
51+
52+
if err := c.Update(ctx, obj); err != nil {
53+
return controllerutil.OperationResultNone, err
54+
}
55+
56+
// take care of the status just now
57+
if hasStatus {
58+
if err := unstructured.SetNestedMap(obj.UnstructuredContent(), newStatus, "status"); err == nil {
59+
if err := c.Status().Update(ctx, obj); err != nil {
60+
return controllerutil.OperationResultNone, err
61+
}
62+
}
63+
}
64+
65+
return controllerutil.OperationResultUpdated, nil
66+
}
67+
68+
func mutate(f controllerutil.MutateFn, key client.ObjectKey, obj client.Object) error {
69+
if err := f(); err != nil {
70+
return err
71+
}
72+
if newKey := client.ObjectKeyFromObject(obj); key != newKey {
73+
return fmt.Errorf("MutateFn cannot mutate object name and/or object namespace")
74+
}
75+
return nil
76+
}

0 commit comments

Comments
 (0)