Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: Stop updating ManageConflictErrors for a remote RSync #1538

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions pkg/parse/root_sync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,52 +592,3 @@ func summarizeErrorsForCommit(sourceStatus v1beta1.SourceStatus, renderingStatus
}
return errorSources, errorSummary
}

// prependRootSyncRemediatorStatus adds the conflict error detected by the remediator to the front of the sync errors.
func prependRootSyncRemediatorStatus(ctx context.Context, c client.Client, syncName string, conflictErrs []status.ManagementConflictError, denominator int) status.Error {
if denominator <= 0 {
return status.InternalErrorf("denominator must be positive: %d", denominator)
}

var rs v1beta1.RootSync
if err := c.Get(ctx, rootsync.ObjectKey(syncName), &rs); err != nil {
return status.APIServerError(err, "failed to get RootSync: "+syncName)
}

// Skip updating if already reported by this reconciler or the remote reconciler (inverted)
var errs []v1beta1.ConfigSyncError
for _, conflictErr := range conflictErrs {
cse := conflictErr.ToCSE()
invertedCSE := conflictErr.Invert().ToCSE()
errorFound := false
for _, e := range rs.Status.Sync.Errors {
if e.Code == status.ManagementConflictErrorCode && (e.ErrorMessage == cse.ErrorMessage || e.ErrorMessage == invertedCSE.ErrorMessage) {
errorFound = true
break
}
}
if !errorFound {
errs = append(errs, cse)
}
}

// No new errors, so no update
if len(errs) == 0 {
return nil
}

// Add the remeditor conflict errors before other sync errors for more visibility.
errs = append(errs, rs.Status.Sync.Errors...)
setSyncStatusErrors(&rs.Status.Status, errs, denominator)
rs.Status.Sync.LastUpdate = metav1.Now()

if err := c.Status().Update(ctx, &rs, client.FieldOwner(configsync.FieldManager)); err != nil {
// If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err)
return prependRootSyncRemediatorStatus(ctx, c, syncName, conflictErrs, denominator*2)
}
return status.APIServerError(err, "failed to update RootSync sync status")
}
return nil
}
125 changes: 0 additions & 125 deletions pkg/parse/root_sync_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@
package parse

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/applier"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/core/k8sobjects"
"kpt.dev/configsync/pkg/metadata"
"kpt.dev/configsync/pkg/rootsync"
"kpt.dev/configsync/pkg/status"
syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

func TestSummarizeErrors(t *testing.T) {
Expand Down Expand Up @@ -460,118 +450,3 @@ func TestSummarizeErrors(t *testing.T) {
})
}
}

func TestPrependRootSyncRemediatorStatus(t *testing.T) {
const rootSyncName = "my-root-sync"
const thisManager = "this-manager"
const otherManager = "other-manager"
conflictingObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, otherManager))
conflictAB := status.ManagementConflictErrorWrap(conflictingObject, thisManager)
invertedObject := k8sobjects.NamespaceObject("foo-ns", core.Annotation(metadata.ResourceManagerKey, thisManager))
conflictBA := status.ManagementConflictErrorWrap(invertedObject, otherManager)
conflictABInverted := conflictAB.Invert()
// KptManagementConflictError is created with the desired object, not the current live object.
// So its manager annotation matches the reconciler doing the applying.
// This is the opposite of the objects passed to ManagementConflictErrorWrap.
kptConflictError := applier.KptManagementConflictError(invertedObject)
// Assert the value of each error message to make each value clear
const conflictABMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with the "other-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler.

metadata.name: foo-ns
group:
version: v1
kind: Namespace

For more information, see https://g.co/cloud/acm-errors#knv1060`
const conflictBAMessage = `KNV1060: The "other-manager" reconciler detected a management conflict with the "this-manager" reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler.

metadata.name: foo-ns
group:
version: v1
kind: Namespace

For more information, see https://g.co/cloud/acm-errors#knv1060`
const kptConflictMessage = `KNV1060: The "this-manager" reconciler detected a management conflict with another reconciler. Remove the object from one of the sources of truth so that the object is only managed by one reconciler.

metadata.name: foo-ns
group:
version: v1
kind: Namespace

For more information, see https://g.co/cloud/acm-errors#knv1060`
testutil.AssertEqual(t, conflictABMessage, conflictAB.ToCSE().ErrorMessage)
testutil.AssertEqual(t, conflictBAMessage, conflictBA.ToCSE().ErrorMessage)
testutil.AssertEqual(t, kptConflictMessage, kptConflictError.ToCSE().ErrorMessage)
testutil.AssertEqual(t, conflictBA, conflictABInverted)
testCases := map[string]struct {
thisSyncErrors []v1beta1.ConfigSyncError
expectedErrors []v1beta1.ConfigSyncError
}{
"empty errors": {
thisSyncErrors: []v1beta1.ConfigSyncError{},
expectedErrors: []v1beta1.ConfigSyncError{
conflictAB.ToCSE(),
},
},
"unchanged conflict error": {
thisSyncErrors: []v1beta1.ConfigSyncError{
conflictAB.ToCSE(),
},
expectedErrors: []v1beta1.ConfigSyncError{
conflictAB.ToCSE(),
},
},
"prepend conflict error": {
thisSyncErrors: []v1beta1.ConfigSyncError{
{ErrorMessage: "foo"},
},
expectedErrors: []v1beta1.ConfigSyncError{
conflictAB.ToCSE(),
{ErrorMessage: "foo"},
},
},
"dedupe AB and BA errors": {
thisSyncErrors: []v1beta1.ConfigSyncError{
conflictBA.ToCSE(),
},
expectedErrors: []v1beta1.ConfigSyncError{
conflictBA.ToCSE(),
},
},
"dedupe AB and AB inverted errors": {
thisSyncErrors: []v1beta1.ConfigSyncError{
conflictABInverted.ToCSE(),
},
expectedErrors: []v1beta1.ConfigSyncError{
conflictABInverted.ToCSE(),
},
},
// TODO: De-dupe ManagementConflictErrorWrap & KptManagementConflictError
// These are currently de-duped locally by the conflict handler,
// but not remotely by prependRootSyncRemediatorStatus.
"prepend KptManagementConflictError": {
thisSyncErrors: []v1beta1.ConfigSyncError{
kptConflictError.ToCSE(),
},
expectedErrors: []v1beta1.ConfigSyncError{
conflictAB.ToCSE(),
kptConflictError.ToCSE(),
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
rootSync := k8sobjects.RootSyncObjectV1Beta1(rootSyncName)
rootSync.Status.Sync.Errors = tc.thisSyncErrors
fakeClient := syncertest.NewClient(t, core.Scheme, rootSync)
ctx := context.Background()
err := prependRootSyncRemediatorStatus(ctx, fakeClient, rootSyncName,
[]status.ManagementConflictError{conflictAB}, defaultDenominator)
require.NoError(t, err)
var updatedRootSync v1beta1.RootSync
statuserr := fakeClient.Get(ctx, rootsync.ObjectKey(rootSyncName), &updatedRootSync)
require.NoError(t, statuserr)
testutil.AssertEqual(t, tc.expectedErrors, updatedRootSync.Status.Sync.Errors)
})
}
}
37 changes: 0 additions & 37 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/utils/clock"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/hydrate"
"kpt.dev/configsync/pkg/importer/analyzer/ast"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
Expand Down Expand Up @@ -544,11 +543,6 @@ func (r *reconciler) setSyncStatus(ctx context.Context, newSyncStatus *SyncStatu
state.status.SyncStatus = newSyncStatus
}

// Report conflict errors to the remote manager, if it's a RootSync.
opts := r.Options()
if err := reportRootSyncConflicts(ctx, opts.Client, opts.ManagementConflicts()); err != nil {
return fmt.Errorf("failed to report remote conflicts: %w", err)
}
return nil
}

Expand Down Expand Up @@ -633,37 +627,6 @@ func blockHydration(signalFile string) {
klog.Infof("removed signal file %s to block hydration", signalFile)
}

// reportRootSyncConflicts reports conflicts to the RootSync that manages the
// conflicting resources.
func reportRootSyncConflicts(ctx context.Context, k8sClient client.Client, conflictErrs []status.ManagementConflictError) error {
if len(conflictErrs) == 0 {
return nil
}
conflictingManagerErrors := map[string][]status.ManagementConflictError{}
for _, conflictError := range conflictErrs {
conflictingManager := conflictError.CurrentManager()
conflictingManagerErrors[conflictingManager] = append(conflictingManagerErrors[conflictingManager], conflictError)
}

for conflictingManager, conflictErrors := range conflictingManagerErrors {
scope, name := declared.ManagerScopeAndName(conflictingManager)
if scope == declared.RootScope {
// RootSync applier uses PolicyAdoptAll.
// So it may fight, if the webhook is disabled.
// Report the conflict to the other RootSync to make it easier to detect.
klog.Infof("Detected conflict with RootSync manager %q", conflictingManager)
if err := prependRootSyncRemediatorStatus(ctx, k8sClient, name, conflictErrors, defaultDenominator); err != nil {
return fmt.Errorf("failed to update RootSync %q to prepend remediator conflicts: %w", name, err)
}
} else {
// RepoSync applier uses PolicyAdoptIfNoInventory.
// So it won't fight, even if the webhook is disabled.
klog.Infof("Detected conflict with RepoSync manager %q", conflictingManager)
}
}
return nil
}

// UpdateSyncStatus updates the RSync status to reflect asynchronous status
// changes made by the remediator between Reconcile calls.
func (r *reconciler) UpdateSyncStatus(ctx context.Context) error {
Expand Down