Skip to content

Commit

Permalink
test: tests and code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
matteogastaldello committed Oct 31, 2024
1 parent 5ca412b commit 18d4a8d
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 108 deletions.
63 changes: 44 additions & 19 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/krateoplatformops/unstructured-runtime/pkg/controller/event"
"github.com/krateoplatformops/unstructured-runtime/pkg/controller/objectref"
eventrec "github.com/krateoplatformops/unstructured-runtime/pkg/event"
"github.com/krateoplatformops/unstructured-runtime/pkg/listwatcher"
Expand All @@ -31,6 +32,30 @@ const (
reasonReconciliationFailed eventrec.Reason = "ReconciliationFailed"
)

// An ExternalClient manages the lifecycle of an external resource.
// None of the calls here should be blocking. All of the calls should be
// idempotent. For example, Create call should not return AlreadyExists error
// if it's called again with the same parameters or Delete call should not
// return error if there is an ongoing deletion or resource does not exist.
type ExternalClient interface {
Observe(ctx context.Context, mg *unstructured.Unstructured) (bool, error)
Create(ctx context.Context, mg *unstructured.Unstructured) error
Update(ctx context.Context, mg *unstructured.Unstructured) error
Delete(ctx context.Context, mg *unstructured.Unstructured) error
}

// An ExternalObservation is the result of an observation of an external resource.
type ExternalObservation struct {
// ResourceExists must be true if a corresponding external resource exists
// for the managed resource.
ResourceExists bool

// ResourceUpToDate should be true if the corresponding external resource
// appears to be up-to-date - i.e. updating the external resource to match
// the desired state of the managed resource would be a no-op.
ResourceUpToDate bool
}

type ListWatcherConfiguration struct {
LabelSelector *string
FieldSelector *string
Expand All @@ -52,7 +77,7 @@ type Controller struct {
dynamicClient dynamic.Interface
discoveryClient discovery.DiscoveryInterface
gvr schema.GroupVersionResource
queue workqueue.TypedRateLimitingInterface[Event]
queue workqueue.TypedRateLimitingInterface[event.Event]
items *sync.Map
informer cache.Controller
recorder eventrec.Recorder
Expand All @@ -62,13 +87,13 @@ type Controller struct {

func New(sid *shortid.Shortid, opts Options) *Controller {
rateLimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[Event](3*time.Second, 180*time.Second),
workqueue.NewTypedItemExponentialFailureRateLimiter[event.Event](3*time.Second, 180*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[Event]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&workqueue.TypedBucketRateLimiter[event.Event]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

queue := workqueue.NewTypedRateLimitingQueue(rateLimiter)
workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[Event]{})
workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[event.Event]{})
items := &sync.Map{}

lw, err := listwatcher.Create(listwatcher.CreateOption{
Expand Down Expand Up @@ -103,17 +128,17 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
return
}

item := Event{
item := event.Event{
Id: id,
EventType: Observe,
EventType: event.Observe,
ObjectRef: objectref.ObjectRef{
APIVersion: el.GetAPIVersion(),
Kind: el.GetKind(),
Name: el.GetName(),
Namespace: el.GetNamespace(),
},
}
dig := digestForEvent(item)
dig := event.DigestForEvent(item)

if _, loaded := items.LoadOrStore(dig, struct{}{}); !loaded {
queue.Add(item)
Expand Down Expand Up @@ -141,9 +166,9 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
if !newUns.GetDeletionTimestamp().IsZero() {
opts.Logger.Debug(fmt.Sprintf("UpdateFunc: object %s/%s is being deleted", newUns.GetNamespace(), newUns.GetName()))

item := Event{
item := event.Event{
Id: id,
EventType: Delete,
EventType: event.Delete,
ObjectRef: objectref.ObjectRef{
APIVersion: newUns.GetAPIVersion(),
Kind: newUns.GetKind(),
Expand All @@ -152,7 +177,7 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
},
}

dig := digestForEvent(item)
dig := event.DigestForEvent(item)

if _, loaded := items.LoadOrStore(dig, struct{}{}); !loaded {
queue.Add(item)
Expand All @@ -174,25 +199,25 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
diff := cmp.Diff(newSpec, oldSpec)
opts.Logger.Debug(fmt.Sprintf("UpdateFunc: comparing current spec with desired spec: %s", diff))
if len(diff) > 0 {
item := Event{
item := event.Event{
Id: id,
EventType: Update,
EventType: event.Update,
ObjectRef: objectref.ObjectRef{
APIVersion: newUns.GetAPIVersion(),
Kind: newUns.GetKind(),
Name: newUns.GetName(),
Namespace: newUns.GetNamespace(),
}}

dig := digestForEvent(item)
dig := event.DigestForEvent(item)

if _, loaded := items.LoadOrStore(dig, struct{}{}); !loaded {
queue.Add(item)
}
} else {
item := Event{
item := event.Event{
Id: id,
EventType: Observe,
EventType: event.Observe,
ObjectRef: objectref.ObjectRef{
APIVersion: newUns.GetAPIVersion(),
Kind: newUns.GetKind(),
Expand All @@ -201,7 +226,7 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
},
}

dig := digestForEvent(item)
dig := event.DigestForEvent(item)

if _, loaded := items.Load(dig); !loaded {
time.AfterFunc(opts.ResyncInterval, func() {
Expand Down Expand Up @@ -236,17 +261,17 @@ func New(sid *shortid.Shortid, opts Options) *Controller {
return
}

item := Event{
item := event.Event{
Id: id,
EventType: Delete,
EventType: event.Delete,
ObjectRef: objectref.ObjectRef{
APIVersion: el.GetAPIVersion(),
Kind: el.GetKind(),
Name: el.GetName(),
Namespace: el.GetNamespace(),
},
}
dig := digestForEvent(item)
dig := event.DigestForEvent(item)

if _, loaded := items.LoadOrStore(dig, struct{}{}); !loaded {
queue.Add(item)
Expand Down
29 changes: 0 additions & 29 deletions pkg/controller/digest.go

This file was deleted.

47 changes: 0 additions & 47 deletions pkg/controller/event.go

This file was deleted.

39 changes: 39 additions & 0 deletions pkg/controller/event/digest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package event

import (
"fmt"
"strconv"

"github.com/krateoplatformops/unstructured-runtime/pkg/controller/objectref"
"github.com/twmb/murmur3"
)

type cleanEvent struct {
eventType string
objectRef objectref.ObjectRef
}

// digestForEvent generates a hash digest for a given event.
// It uses the Murmur3 hashing algorithm to create a 64-bit hash value.
// The event is first cleaned and then converted to a byte slice before hashing.
// The resulting hash is returned as a hexadecimal string.
//
// Parameters:
// - ev: The event for which the digest is to be generated.
//
// Returns:
// - A hexadecimal string representing the hash digest of the event.
func DigestForEvent(ev Event) string {
hasher := murmur3.New64()

cleanEvent := cleanEvent{
eventType: string(ev.EventType),
objectRef: ev.ObjectRef,
}

bin_buf := []byte(fmt.Sprintf("%v", cleanEvent))

hasher.Write(bin_buf)

return strconv.FormatUint(hasher.Sum64(), 16)
}
48 changes: 48 additions & 0 deletions pkg/controller/event/digest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package event

import (
"testing"

"github.com/krateoplatformops/unstructured-runtime/pkg/controller/objectref"
"github.com/stretchr/testify/assert"
)

func TestDigestForEvent(t *testing.T) {
tests := []struct {
name string
event Event
expected string
}{
{
name: "Test event 1",
event: Event{
EventType: "create",
ObjectRef: objectref.ObjectRef{
Kind: "Pod",
Namespace: "default",
Name: "mypod",
},
},
expected: "2555f9c98d0663c1", // Replace with the actual expected hash
},
{
name: "Test event 2",
event: Event{
EventType: "delete",
ObjectRef: objectref.ObjectRef{
Kind: "Service",
Namespace: "default",
Name: "myservice",
},
},
expected: "c515572a42a933fd", // Replace with the actual expected hash
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := DigestForEvent(tt.event)
assert.Equal(t, tt.expected, actual)
})
}
}
20 changes: 20 additions & 0 deletions pkg/controller/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package event

import (
"github.com/krateoplatformops/unstructured-runtime/pkg/controller/objectref"
)

type EventType string

const (
Observe EventType = "Observe"
Create EventType = "Create"
Update EventType = "Update"
Delete EventType = "Delete"
)

type Event struct {
Id string
EventType EventType
ObjectRef objectref.ObjectRef
}
Loading

0 comments on commit 18d4a8d

Please sign in to comment.