Skip to content

Commit

Permalink
Support dependency/redundancy group state checksum on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Feb 26, 2025
1 parent 708e2bf commit 2736a9a
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
7 changes: 7 additions & 0 deletions pkg/contracts/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ func SafeInit(v any) {
initer.Init()
}
}

// Equaler is implemented by any entity that can be compared with another entity of the same type.
// The Equal method should return true if the receiver is equal to the other entity.
type Equaler interface {
// Equal returns whether the receiver is equal to the other entity.
Equal(other any) bool
}
20 changes: 13 additions & 7 deletions pkg/icingadb/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database
desired := EntitiesById{} // only read from desiredCh (so far)

var update EntitiesById
if delta.Subject.WithChecksum() {
if _, ok := delta.Subject.Entity().(contracts.Equaler); ok || delta.Subject.WithChecksum() {
update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
}

Expand All @@ -70,7 +70,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database
id := actualValue.ID().String()
if desiredValue, ok := desired[id]; ok {
delete(desired, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
if update != nil && !entitiesEqual(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
Expand All @@ -88,7 +88,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database
id := desiredValue.ID().String()
if actualValue, ok := actual[id]; ok {
delete(actual, id)
if update != nil && !checksumsMatch(actualValue, desiredValue) {
if update != nil && !entitiesEqual(actualValue, desiredValue) {
update[id] = desiredValue
}
} else {
Expand Down Expand Up @@ -117,8 +117,14 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan database
zap.Int("delete", len(delta.Delete)))
}

// checksumsMatch returns whether the checksums of two entities are the same.
// Both entities must implement contracts.Checksumer.
func checksumsMatch(a, b database.Entity) bool {
return cmp.Equal(a.(contracts.Checksumer).Checksum(), b.(contracts.Checksumer).Checksum())
// entitiesEqual returns whether the two entities are equal either based on their checksum or by comparing them.
//
// Both entities must either implement contracts.Checksumer or contracts.Equaler for this to work. If neither
// interface is implemented nor if both entities don't implement the same interface, this function will panic.
func entitiesEqual(a, b database.Entity) bool {
if _, ok := a.(contracts.Checksumer); ok {
return cmp.Equal(a.(contracts.Checksumer).Checksum(), b.(contracts.Checksumer).Checksum())
}

return a.(contracts.Equaler).Equal(b)
}
15 changes: 12 additions & 3 deletions pkg/icingadb/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU())
// Let errors from SetChecksums cancel our group.
com.ErrgroupReceive(g, errs)

var entities <-chan database.Entity
// Apply the checksums only if the sync subject supports it, i.e, it implements contracts.Checksumer.
// This is necessary because not only entities that implement contracts.Checksumer can be updated, but
// also entities that implement contracts.Equaler interface.
if delta.Subject.WithChecksum() {
entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU())
// Let errors from SetChecksums cancel our group.
com.ErrgroupReceive(g, errs)
} else {
entities = entitiesWithoutChecksum
}

g.Go(func() error {
// Using upsert here on purpose as this is the fastest way to do bulk updates.
Expand Down
27 changes: 25 additions & 2 deletions pkg/icingadb/v1/dependency.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1

import (
"bytes"
"github.com/google/go-cmp/cmp"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/types"
)
Expand All @@ -12,7 +14,7 @@ type Redundancygroup struct {
}

// TableName implements [database.TableNamer].
func (r Redundancygroup) TableName() string {
func (r *Redundancygroup) TableName() string {
return "redundancy_group"
}

Expand All @@ -26,10 +28,22 @@ type RedundancygroupState struct {
}

// TableName implements [database.TableNamer].
func (r RedundancygroupState) TableName() string {
func (r *RedundancygroupState) TableName() string {
return "redundancy_group_state"
}

// Equal implements the [contracts.Equaler] interface.
func (r *RedundancygroupState) Equal(other any) bool {
if other, ok := other.(*RedundancygroupState); ok {
return bytes.Equal(r.RedundancyGroupId, other.RedundancyGroupId) &&
cmp.Equal(r.Failed, other.Failed) &&
cmp.Equal(r.IsReachable, other.IsReachable) &&
r.LastStateChange.Time().Equal(other.LastStateChange.Time())
}

return false
}

type DependencyNode struct {
EntityWithoutChecksum `json:",inline"`
EnvironmentMeta `json:",inline"`
Expand All @@ -44,6 +58,15 @@ type DependencyEdgeState struct {
Failed types.Bool `json:"failed"`
}

// Equal implements the [contracts.Equaler] interface.
func (es *DependencyEdgeState) Equal(other any) bool {
if other, ok := other.(*DependencyEdgeState); ok {
return bytes.Equal(es.Id, other.Id) && cmp.Equal(es.Failed, other.Failed)
}

return false
}

type DependencyEdge struct {
EntityWithoutChecksum `json:",inline"`
EnvironmentMeta `json:",inline"`
Expand Down

0 comments on commit 2736a9a

Please sign in to comment.