Skip to content

Commit

Permalink
Introduce "sync-by-content" flag, implemented for secrets and configm…
Browse files Browse the repository at this point in the history
…aps (#352)

* Add initial "sync-by-content" mode

- Optionally ignore the "replicated-from-version" annotation and always compare by contents
- Implemented for secrets and config maps
- Add test for secrets for the new feature
- Change secret tests to use fake.Clientset instead of running in whatever happened to be your active KUBECONFIG(!)

* Refine the resource version updates in the fake client

* Remove WIP fake client usage

* Update README and mention new --sync-by-content flag

---------

Co-authored-by: Martin Helmich <[email protected]>
  • Loading branch information
stippi2 and martin-helmich authored Nov 6, 2024
1 parent d750403 commit 018b323
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 146 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ data: {}
The replicator will then copy the `data` attribute of the referenced object into the annotated object and keep them in
sync.

By default, the replicator adds an annotation `replicator.v1.mittwald.de/replicated-from-version` to the target object.
This annotation contains the resource-version of the source object at the time of replication.

##### Sync by Content

When the target object is re-applied with an empty `data` attribute, the replicator will not automatically perform replication.
The reason is that the target already has the `replicated-from-version` annotation with a matching source resource-version.
For Secrets and ConfigMaps, there is the option to synchronize _based on the content_, ignoring the `replicated-from-version` annotation.

To activate this mode, start the replicator with the `--sync-by-content` flag.

#### Special case: TLS secrets

Secrets of type `kubernetes.io/tls` are treated in a special way and need to have a `data["tls.crt"]` and a
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type flags struct {
ReplicateRoles bool
ReplicateRoleBindings bool
ReplicateServiceAccounts bool
SyncByContent bool
}
44 changes: 24 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
module github.com/mittwald/kubernetes-replicator

go 1.22
go 1.22.0

toolchain go1.23.0

require (
github.com/hashicorp/go-multierror v1.1.1
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
k8s.io/api v0.29.4
k8s.io/apimachinery v0.29.4
k8s.io/client-go v0.29.4
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -33,23 +36,24 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
104 changes: 54 additions & 50 deletions go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {
flag.BoolVar(&f.ReplicateRoles, "replicate-roles", true, "Enable replication of roles")
flag.BoolVar(&f.ReplicateRoleBindings, "replicate-role-bindings", true, "Enable replication of role bindings")
flag.BoolVar(&f.ReplicateServiceAccounts, "replicate-service-accounts", true, "Enable replication of service accounts")
flag.BoolVar(&f.SyncByContent, "sync-by-content", false, "Always compare the contents of source and target resources and force them to be the same")
flag.Parse()

switch strings.ToUpper(strings.TrimSpace(f.LogLevel)) {
Expand Down Expand Up @@ -88,13 +89,13 @@ func main() {
client = kubernetes.NewForConfigOrDie(config)

if f.ReplicateSecrets {
secretRepl := secret.NewReplicator(client, f.ResyncPeriod, f.AllowAll)
secretRepl := secret.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent)
go secretRepl.Run()
enabledReplicators = append(enabledReplicators, secretRepl)
}

if f.ReplicateConfigMaps {
configMapRepl := configmap.NewReplicator(client, f.ResyncPeriod, f.AllowAll)
configMapRepl := configmap.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent)
go configMapRepl.Run()
enabledReplicators = append(enabledReplicators, configMapRepl)
}
Expand Down
42 changes: 34 additions & 8 deletions replicate/common/generic-replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package common
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"reflect"
"regexp"
"strconv"
"strings"
"time"

"k8s.io/apimachinery/pkg/labels"

"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -22,13 +23,14 @@ import (
)

type ReplicatorConfig struct {
Kind string
Client kubernetes.Interface
ResyncPeriod time.Duration
AllowAll bool
ListFunc cache.ListFunc
WatchFunc cache.WatchFunc
ObjType runtime.Object
Kind string
Client kubernetes.Interface
ResyncPeriod time.Duration
AllowAll bool
SyncByContent bool
ListFunc cache.ListFunc
WatchFunc cache.WatchFunc
ObjType runtime.Object
}

type UpdateFuncs struct {
Expand All @@ -44,6 +46,7 @@ type GenericReplicator struct {
Controller cache.Controller

DependencyMap map[string]map[string]interface{}
DependentMap map[string]string
UpdateFuncs UpdateFuncs

// ReplicateToList is a set that caches the names of all secrets that have a
Expand All @@ -60,6 +63,7 @@ func NewGenericReplicator(config ReplicatorConfig) *GenericReplicator {
repl := GenericReplicator{
ReplicatorConfig: config,
DependencyMap: make(map[string]map[string]interface{}),
DependentMap: make(map[string]string),
ReplicateToList: GenericMap[string, struct{}]{},
ReplicateToMatchingList: GenericMap[string, labels.Selector]{},
}
Expand Down Expand Up @@ -257,6 +261,24 @@ func (r *GenericReplicator) ResourceAdded(obj interface{}) {
logger.WithError(err).Error("failed to update cache")
}
}
source, ok := r.DependentMap[sourceKey]
if ok {
logger.Debugf("objectMeta %s has source %s", sourceKey, source)

sourceObject, exists, err := r.Store.GetByKey(source)
if err != nil {
logger.Debugf("could not get source %s %s: %s", r.Kind, source, err)
return
} else if !exists {
logger.Debugf("could not get source %s %s: does not exist", r.Kind, source)
return
}
targetMap := map[string]interface{}{MustGetKey(obj): ""}
if err := r.updateDependents(sourceObject, targetMap); err != nil {
logger.WithError(err).
Errorf("Failed to update cache for %s: %v", MustGetKey(objectMeta), err)
}
}

annotations := objectMeta.GetAnnotations()

Expand Down Expand Up @@ -323,6 +345,10 @@ func (r *GenericReplicator) resourceAddedReplicateFrom(sourceLocation string, ta

r.DependencyMap[sourceLocation][cacheKey] = nil

if _, ok := r.DependentMap[cacheKey]; !ok {
r.DependentMap[cacheKey] = sourceLocation
}

sourceObject, exists, err := r.Store.GetByKey(sourceLocation)
if err != nil {
return errors.Wrapf(err, "Could not get source %s: %v", sourceLocation, err)
Expand Down
47 changes: 38 additions & 9 deletions replicate/configmap/configmaps.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package configmap

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -25,14 +26,15 @@ type Replicator struct {
}

// NewReplicator creates a new config map replicator
func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool) common.Replicator {
func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool) common.Replicator {
repl := Replicator{
GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{
Kind: "ConfigMap",
ObjType: &v1.ConfigMap{},
AllowAll: allowAll,
ResyncPeriod: resyncPeriod,
Client: client,
Kind: "ConfigMap",
ObjType: &v1.ConfigMap{},
AllowAll: allowAll,
SyncByContent: syncByContent,
ResyncPeriod: resyncPeriod,
Client: client,
ListFunc: func(lo metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().ConfigMaps("").List(context.TODO(), lo)
},
Expand Down Expand Up @@ -65,7 +67,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
targetVersion, ok := target.Annotations[common.ReplicatedFromVersionAnnotation]
sourceVersion := source.ResourceVersion

if ok && targetVersion == sourceVersion {
if ok && targetVersion == sourceVersion && !r.SyncByContent {
logger.Debugf("target %s is already up-to-date", common.MustGetKey(target))
return nil
}
Expand All @@ -78,17 +80,38 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
prevKeys, hasPrevKeys := common.PreviouslyPresentKeys(&targetCopy.ObjectMeta)
replicatedKeys := make([]string, 0)

dataChanged := false
for key, value := range source.Data {
oldValue, ok := targetCopy.Data[key]
if ok {
if strings.Compare(value, oldValue) != 0 {
dataChanged = true
}
} else {
dataChanged = true
}
targetCopy.Data[key] = value

replicatedKeys = append(replicatedKeys, key)
delete(prevKeys, key)
}

if source.BinaryData != nil {
targetCopy.BinaryData = make(map[string][]byte)
if targetCopy.BinaryData == nil {
targetCopy.BinaryData = make(map[string][]byte)
}
for key, value := range source.BinaryData {
targetCopy.BinaryData[key] = value
newValue := make([]byte, len(value))
copy(newValue, value)
oldValue, ok := targetCopy.BinaryData[key]
if ok {
if bytes.Compare(newValue, oldValue) != 0 {
dataChanged = true
}
} else {
dataChanged = true
}
targetCopy.BinaryData[key] = newValue

replicatedKeys = append(replicatedKeys, key)
delete(prevKeys, key)
Expand All @@ -100,9 +123,15 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
logger.Debugf("removing previously present key %s: not present in source any more", k)
delete(targetCopy.Data, k)
delete(targetCopy.BinaryData, k)
dataChanged = true
}
}

if !dataChanged {
logger.Debugf("target values of %s are already up-to-date", common.MustGetKey(target))
return nil
}

sort.Strings(replicatedKeys)

logger.Infof("updating config map %s/%s", target.Namespace, target.Name)
Expand Down
31 changes: 24 additions & 7 deletions replicate/secret/secrets.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package secret

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -25,14 +26,15 @@ type Replicator struct {
}

// NewReplicator creates a new secret replicator
func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool) common.Replicator {
func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool) common.Replicator {
repl := Replicator{
GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{
Kind: "Secret",
ObjType: &v1.Secret{},
AllowAll: allowAll,
ResyncPeriod: resyncPeriod,
Client: client,
Kind: "Secret",
ObjType: &v1.Secret{},
AllowAll: allowAll,
SyncByContent: syncByContent,
ResyncPeriod: resyncPeriod,
Client: client,
ListFunc: func(lo metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Secrets("").List(context.TODO(), lo)
},
Expand Down Expand Up @@ -69,7 +71,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
targetVersion, ok := target.Annotations[common.ReplicatedFromVersionAnnotation]
sourceVersion := source.ResourceVersion

if ok && targetVersion == sourceVersion {
if ok && targetVersion == sourceVersion && !r.SyncByContent {
logger.Debugf("target %s is already up-to-date", common.MustGetKey(target))
return nil
}
Expand All @@ -82,9 +84,18 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
prevKeys, hasPrevKeys := common.PreviouslyPresentKeys(&targetCopy.ObjectMeta)
replicatedKeys := make([]string, 0)

dataChanged := false
for key, value := range source.Data {
newValue := make([]byte, len(value))
copy(newValue, value)
oldValue, ok := targetCopy.Data[key]
if ok {
if bytes.Compare(newValue, oldValue) != 0 {
dataChanged = true
}
} else {
dataChanged = true
}
targetCopy.Data[key] = newValue

replicatedKeys = append(replicatedKeys, key)
Expand All @@ -95,9 +106,15 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac
for k := range prevKeys {
logger.Debugf("removing previously present key %s: not present in source any more", k)
delete(targetCopy.Data, k)
dataChanged = true
}
}

if !dataChanged {
logger.Debugf("target values of %s are already up-to-date", common.MustGetKey(target))
return nil
}

sort.Strings(replicatedKeys)

logger.Infof("updating target %s", common.MustGetKey(target))
Expand Down
Loading

0 comments on commit 018b323

Please sign in to comment.