Skip to content

Commit add6a32

Browse files
committed
Save rs member ids in annotation/deployment state
1 parent ac55cad commit add6a32

8 files changed

+108
-17
lines changed

controllers/om/mockedomclient.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,22 @@ func (oc *MockedOmConnection) ConfigureProject(project *Project) {
141141
oc.context.OrgID = project.OrgID
142142
}
143143

144+
func (oc *MockedOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
145+
oc.addToHistory(reflect.ValueOf(oc.GetReplicaSetMemberIds))
146+
dep, err := oc.ReadDeployment()
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
finalProcessIds := make(map[string]map[string]int)
152+
153+
for _, replicaSet := range dep.GetReplicaSets() {
154+
finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
155+
}
156+
157+
return finalProcessIds, nil
158+
}
159+
144160
var _ Connection = &MockedOmConnection{}
145161

146162
// NewEmptyMockedOmConnection is the standard function for creating mocked connections that is usually used for testing

controllers/om/omclient.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ type Connection interface {
6464
GetPreferredHostnames(agentApiKey string) ([]PreferredHostname, error)
6565
AddPreferredHostname(agentApiKey string, value string, isRegexp bool) error
6666

67+
// GetReplicaSetMemberIds returns a map with the replicaset name as the key.
68+
// The value is another map where the key is the replicaset member name and the value is its member id.
69+
GetReplicaSetMemberIds() (map[string]map[string]int, error)
70+
6771
backup.GroupConfigReader
6872
backup.GroupConfigUpdater
6973

@@ -273,6 +277,21 @@ func (oc *HTTPOmConnection) GetAgentAuthMode() (string, error) {
273277
return ac.Auth.AutoAuthMechanism, nil
274278
}
275279

280+
func (oc *HTTPOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) {
281+
dep, err := oc.ReadDeployment()
282+
if err != nil {
283+
return nil, err
284+
}
285+
286+
finalProcessIds := make(map[string]map[string]int)
287+
288+
for _, replicaSet := range dep.GetReplicaSets() {
289+
finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds()
290+
}
291+
292+
return finalProcessIds, nil
293+
}
294+
276295
var _ Connection = &HTTPOmConnection{}
277296

278297
// NewOpsManagerConnection stores OpsManger api endpoint and authentication credentials.

controllers/om/replicaset.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ func (r ReplicaSet) String() string {
146146
return fmt.Sprintf("\"%s\" (members: %v)", r.Name(), r.Members())
147147
}
148148

149+
func (r ReplicaSet) MemberIds() map[string]int {
150+
memberIds := make(map[string]int)
151+
for _, rsMember := range r.Members() {
152+
memberIds[rsMember.Name()] = rsMember.Id()
153+
}
154+
return memberIds
155+
}
156+
149157
// ***************************************** Private methods ***********************************************************
150158

151159
func initDefaultRs(set ReplicaSet, name string, protocolVersion string) {

controllers/operator/mongodbmultireplicaset_controller.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
198198
return r.updateStatus(ctx, &mrs, status, log)
199199
}
200200

201+
// Save replicasets member ids in annotation
202+
finalMemberIds, err := conn.GetReplicaSetMemberIds()
203+
if err != nil {
204+
return r.updateStatus(ctx, &mrs, workflow.Failed(err), log)
205+
}
206+
201207
mrs.Status.FeatureCompatibilityVersion = mrs.CalculateFeatureCompatibilityVersion()
202-
if err := r.saveLastAchievedSpec(ctx, mrs); err != nil {
208+
if err := r.saveLastAchievedSpec(ctx, mrs, finalMemberIds); err != nil {
203209
return r.updateStatus(ctx, &mrs, workflow.Failed(xerrors.Errorf("Failed to set annotation: %w", err)), log)
204210
}
205211

@@ -624,7 +630,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti
624630
}
625631

626632
// saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved.
627-
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster) error {
633+
func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error {
628634
clusterSpecs, err := mrs.GetClusterSpecItems()
629635
if err != nil {
630636
return err
@@ -654,6 +660,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte
654660
annotationsToAdd[mdbmultiv1.LastClusterNumMapping] = string(clusterNumBytes)
655661
}
656662

663+
rsMemberIdsBytes, err := json.Marshal(rsMemberIds)
664+
if err != nil {
665+
return err
666+
}
667+
if string(rsMemberIdsBytes) != "null" {
668+
annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes)
669+
}
670+
657671
return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client)
658672
}
659673

@@ -696,6 +710,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
696710
}
697711

698712
processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment)
713+
// If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation
714+
if len(processIds) == 0 {
715+
processIds = getReplicaSetProcessIdsFromAnnotation(mrs)
716+
}
699717
log.Debugf("Existing process Ids: %+v", processIds)
700718

701719
certificateFileName := ""
@@ -791,6 +809,16 @@ func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om
791809
return processIds
792810
}
793811

812+
func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) map[string]int {
813+
processIds := make(map[string]map[string]int)
814+
if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok {
815+
if err := json.Unmarshal([]byte(processIdsStr), &processIds); err != nil {
816+
return map[string]int{}
817+
}
818+
}
819+
return processIds[mrs.Name]
820+
}
821+
794822
func getSRVService(mrs *mdbmultiv1.MongoDBMultiCluster) corev1.Service {
795823
additionalConfig := mrs.Spec.GetAdditionalMongodConfig()
796824
port := additionalConfig.GetPortOrDefault()

controllers/operator/mongodbshardedcluster_controller.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,10 @@ func newShardedClusterReconciler(ctx context.Context, kubeClient client.Client,
9898
}
9999

100100
type ShardedClusterDeploymentState struct {
101-
CommonDeploymentState `json:",inline"`
102-
LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"`
103-
Status *mdbv1.MongoDbStatus `json:"status"`
101+
CommonDeploymentState `json:",inline"`
102+
LastAchievedRsMemberIds map[string]map[string]int `json:"lastAchievedRsMemberIds"`
103+
LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"`
104+
Status *mdbv1.MongoDbStatus `json:"status"`
104105
}
105106

106107
// updateStatusFromResourceStatus updates the status in the deployment state with values from the resource status with additional ensurance that no data is accidentally lost.
@@ -978,6 +979,13 @@ func (r *ShardedClusterReconcileHelper) Reconcile(ctx context.Context, log *zap.
978979
return r.updateStatus(ctx, sc, workflow.Failed(err), log)
979980
}
980981

982+
// Save replicasets member ids in deployment state
983+
finalProcessIds, err := conn.GetReplicaSetMemberIds()
984+
if err != nil {
985+
return r.updateStatus(ctx, sc, workflow.Failed(err), log)
986+
}
987+
r.deploymentState.LastAchievedRsMemberIds = finalProcessIds
988+
981989
// Save last achieved spec in state
982990
r.deploymentState.LastAchievedSpec = &sc.Spec
983991
log.Infof("Finished reconciliation for Sharded Cluster! %s", completionMessage(conn.BaseURL(), conn.GroupID()))
@@ -1907,7 +1915,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
19071915
}
19081916

19091917
configSrvProcesses, configSrvMemberOptions := r.createDesiredConfigSrvProcessesAndMemberOptions(configSrvMemberCertPath)
1910-
configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment)
1918+
configRs, _ := buildReplicaSetFromProcesses(sc.ConfigRsName(), configSrvProcesses, sc, configSrvMemberOptions, existingDeployment, r.deploymentState.LastAchievedRsMemberIds[sc.ConfigRsName()])
19111919

19121920
// Shards
19131921
shards := make([]om.ReplicaSetWithProcesses, sc.Spec.ShardCount)
@@ -1918,7 +1926,7 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
19181926
shardInternalClusterPaths = append(shardInternalClusterPaths, fmt.Sprintf("%s/%s", util.InternalClusterAuthMountPath, shardOptions.InternalClusterHash))
19191927
shardMemberCertPath := fmt.Sprintf("%s/%s", util.TLSCertMountPath, shardOptions.CertificateHash)
19201928
desiredShardProcesses, desiredShardMemberOptions := r.createDesiredShardProcessesAndMemberOptions(shardIdx, shardMemberCertPath)
1921-
shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment)
1929+
shards[shardIdx], _ = buildReplicaSetFromProcesses(r.sc.ShardRsName(shardIdx), desiredShardProcesses, sc, desiredShardMemberOptions, existingDeployment, r.deploymentState.LastAchievedRsMemberIds[r.sc.ShardRsName(shardIdx)])
19221930
}
19231931

19241932
// updateOmAuthentication normally takes care of the certfile rotation code, but since sharded-cluster is special pertaining multiple clusterfiles, we code this part here for now.
@@ -2225,10 +2233,15 @@ func createMongodProcessForShardedCluster(mongoDBImage string, forceEnterprise b
22252233

22262234
// buildReplicaSetFromProcesses creates the 'ReplicaSetWithProcesses' with specified processes. This is of use only
22272235
// for sharded cluster (config server, shards)
2228-
func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment) (om.ReplicaSetWithProcesses, error) {
2236+
func buildReplicaSetFromProcesses(name string, members []om.Process, mdb *mdbv1.MongoDB, memberOptions []automationconfig.MemberOptions, deployment om.Deployment, savedProcessIds map[string]int) (om.ReplicaSetWithProcesses, error) {
22292237
replicaSet := om.NewReplicaSet(name, mdb.Spec.GetMongoDBVersion())
22302238

22312239
existingProcessIds := getReplicaSetProcessIdsFromReplicaSets(replicaSet.Name(), deployment)
2240+
// If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in deployment state
2241+
if len(existingProcessIds) == 0 {
2242+
existingProcessIds = savedProcessIds
2243+
}
2244+
22322245
var rsWithProcesses om.ReplicaSetWithProcesses
22332246
if mdb.Spec.IsMultiCluster() {
22342247
// we're passing nil as connectivity argument as in sharded clusters horizons don't make much sense as we don't expose externally individual shards

controllers/operator/mongodbshardedcluster_controller_multi_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,10 @@ func TestMigrateToNewDeploymentState(t *testing.T) {
10881088
err = kubeClient.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: sc.Namespace}, stateConfigMap)
10891089
require.NoError(t, err)
10901090

1091-
expectedDeploymentState := generateExpectedDeploymentState(t, sc)
1091+
memberIds, err := omConnectionFactory.GetConnection().GetReplicaSetMemberIds()
1092+
require.NoError(t, err)
1093+
1094+
expectedDeploymentState := generateExpectedDeploymentState(t, sc, memberIds)
10921095
require.Contains(t, stateConfigMap.Data, stateKey)
10931096
require.JSONEq(t, expectedDeploymentState, stateConfigMap.Data[stateKey])
10941097

@@ -3587,18 +3590,19 @@ func getMultiClusterFQDN(stsName string, namespace string, clusterIdx int, podId
35873590
return fmt.Sprintf("%s-svc.%s.svc.%s", getPodName(stsName, clusterIdx, podIdx), namespace, clusterDomain)
35883591
}
35893592

3590-
func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB) string {
3593+
func generateExpectedDeploymentState(t *testing.T, sc *mdbv1.MongoDB, memberIds map[string]map[string]int) string {
35913594
lastSpec, _ := sc.GetLastSpec()
35923595
expectedState := ShardedClusterDeploymentState{
35933596
CommonDeploymentState: CommonDeploymentState{
35943597
ClusterMapping: map[string]int{},
35953598
},
3596-
LastAchievedSpec: lastSpec,
3597-
Status: &sc.Status,
3599+
LastAchievedRsMemberIds: memberIds,
3600+
LastAchievedSpec: lastSpec,
3601+
Status: &sc.Status,
35983602
}
3599-
lastSpecBytes, err := json.Marshal(expectedState)
3603+
expectedStateBytes, err := json.Marshal(expectedState)
36003604
require.NoError(t, err)
3601-
return string(lastSpecBytes)
3605+
return string(expectedStateBytes)
36023606
}
36033607

36043608
func loadMongoDBResource(resourceYamlPath string) (*mdbv1.MongoDB, error) {

controllers/operator/mongodbshardedcluster_controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,7 +1644,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
16441644
construct.GetPodEnvOptions(),
16451645
)
16461646
shardSts := construct.DatabaseStatefulSet(*sh, shardOptions, zap.S())
1647-
shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
1647+
shards[i], _ = buildReplicaSetFromProcesses(shardSts.Name, createShardProcesses("fake-mongoDBImage", false, shardSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), make(map[string]int))
16481648
}
16491649

16501650
desiredMongosConfig := createMongosSpec(sh)
@@ -1665,7 +1665,7 @@ func createDeploymentFromShardedCluster(t *testing.T, updatable v1.CustomResourc
16651665
construct.GetPodEnvOptions(),
16661666
)
16671667
configSvrSts := construct.DatabaseStatefulSet(*sh, configServerOptions, zap.S())
1668-
configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment())
1668+
configRs, _ := buildReplicaSetFromProcesses(configSvrSts.Name, createConfigSrvProcesses("fake-mongoDBImage", false, configSvrSts, sh, ""), sh, sh.Spec.GetMemberOptions(), om.NewDeployment(), make(map[string]int))
16691669

16701670
d := om.NewDeployment()
16711671
_, err := d.MergeShardedCluster(om.DeploymentShardedClusterMergeOptions{

pkg/util/constants.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,10 @@ const (
280280
// TODO: remove this from here and move it to the certs package
281281
// This currently creates an import cycle
282282
InternalCertAnnotationKey = "internalCertHash"
283-
LastAchievedSpec = "mongodb.com/v1.lastSuccessfulConfiguration"
283+
284+
// Annotation keys used by the operator
285+
LastAchievedSpec = "mongodb.com/v1.lastSuccessfulConfiguration"
286+
LastAchievedRsMemberIds = "mongodb.com/v1.lastAchievedRsMemberIds"
284287

285288
// SecretVolumeName is the name of the volume resource.
286289
SecretVolumeName = "secret-certs"

0 commit comments

Comments
 (0)