Commit 5f916be

Add topologySpreadConstraints configuration to pod spec.
1 parent c206eb3 commit 5f916be

9 files changed: +155 -14 lines


e2e/tests/test_e2e.py

Lines changed: 61 additions & 12 deletions
@@ -560,7 +560,7 @@ def compare_config():
 
         pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
         del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
-
+
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
 
@@ -577,7 +577,7 @@ def compare_config():
 
         self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
             "The replication slot cannot be updated", 10, 5)
-
+
         # make sure slot from Patroni didn't get deleted
         self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
             "The replication slot from Patroni gets deleted", 10, 5)
@@ -933,7 +933,7 @@ def test_ignored_annotations(self):
                 },
             }
         }
-
+
         old_sts_creation_timestamp = sts.metadata.creation_timestamp
         k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
         old_svc_creation_timestamp = svc.metadata.creation_timestamp
@@ -1370,7 +1370,7 @@ def test_persistent_volume_claim_retention_policy(self):
         }
         k8s.update_config(patch_scaled_policy_retain)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
         # decrease the number of instances
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
@@ -1647,7 +1647,6 @@ def test_node_readiness_label(self):
         # toggle pod anti affinity to move replica away from master node
         self.assert_distributed_pods(master_nodes)
 
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_overwrite_pooler_deployment(self):
         pooler_name = 'acid-minimal-cluster-pooler'
@@ -1796,7 +1795,7 @@ def test_password_rotation(self):
             },
         }
         k8s.api.core_v1.patch_namespaced_secret(
-            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
+            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
             namespace="default",
             body=secret_fake_rotation)
 
@@ -1812,7 +1811,7 @@ def test_password_rotation(self):
             "data": {
                 "enable_password_rotation": "true",
                 "password_rotation_interval": "30",
-                "password_rotation_user_retention": "30", # should be set to 60
+                "password_rotation_user_retention": "30", # should be set to 60
             },
         }
         k8s.update_config(enable_password_rotation)
@@ -1865,7 +1864,7 @@ def test_password_rotation(self):
             "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))
 
         # disable password rotation for all other users (foo_user)
-        # and pick smaller intervals to see if the third fake rotation user is dropped
+        # and pick smaller intervals to see if the third fake rotation user is dropped
         enable_password_rotation = {
             "data": {
                 "enable_password_rotation": "false",
@@ -2363,6 +2362,56 @@ def test_taint_based_eviction(self):
         # toggle pod anti affinity to move replica away from master node
         self.assert_distributed_pods(master_nodes)
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_topology_spread_constraints(self):
+        '''
+        Enable topologySpreadConstraints for pods
+        '''
+        k8s = self.k8s
+        cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"
+
+        # Verify we are in a good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes, [])
+        self.assertNotEqual(replica_nodes, [])
+
+        # Patch the zone label onto nodes for topologySpreadConstraints
+        patch_node_label = {
+            "metadata": {
+                "labels": {
+                    "topology.kubernetes.io/zone": "zalando"
+                }
+            }
+        }
+        k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
+        k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)
+
+        # Scale out postgresql pods
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 6}})
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet not scaled to 6")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")
+
+        worker_node_1 = 0
+        worker_node_2 = 0
+        pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
+        for pod in pods.items:
+            if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
+                worker_node_1 += 1
+            elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
+                worker_node_2 += 1
+
+        self.assertEqual(worker_node_1, worker_node_2)
+        self.assertEqual(worker_node_1, 3)
+        self.assertEqual(worker_node_2, 3)
+
+        # Scale postgresql pods back to the previous number of replicas
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 2}})
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_zz_cluster_deletion(self):
         '''
@@ -2438,7 +2487,7 @@ def test_zz_cluster_deletion(self):
             self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
             self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
             self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
-            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
+            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")
 
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
@@ -2480,7 +2529,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
 
         # if nodes are different we can quit here
         if master_nodes[0] not in replica_nodes:
-            return True
+            return True
 
         # enable pod anti affintiy in config map which should trigger movement of replica
         patch_enable_antiaffinity = {
@@ -2504,7 +2553,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
             }
             k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
+
             k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
             k8s.wait_for_running_pods(cluster_labels, 2)
 
@@ -2515,7 +2564,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
             # if nodes are different we can quit here
             for target_node in target_nodes:
                 if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
-                    print('Pods run on the same node')
+                    print('Pods run on the same node')
                     return False
 
         except timeout_decorator.TimeoutError:

manifests/postgresql.crd.yaml

Lines changed: 6 additions & 0 deletions
@@ -574,6 +574,12 @@ spec:
                         - PreferNoSchedule
                     tolerationSeconds:
                       type: integer
+              topologySpreadConstraints:
+                type: array
+                nullable: true
+                items:
+                  type: object
+                  x-kubernetes-preserve-unknown-fields: true
               useLoadBalancer:
                 type: boolean
                 description: deprecated

pkg/apis/acid.zalan.do/v1/crds.go

Lines changed: 10 additions & 0 deletions
@@ -895,6 +895,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
 							},
 						},
 					},
+					"topologySpreadConstraints": {
+						Type:     "array",
+						Nullable: true,
+						Items: &apiextv1.JSONSchemaPropsOrArray{
+							Schema: &apiextv1.JSONSchemaProps{
+								Type:                   "object",
+								XPreserveUnknownFields: util.True(),
+							},
+						},
+					},
 					"useLoadBalancer": {
 						Type:        "boolean",
 						Description: "deprecated",

pkg/apis/acid.zalan.do/v1/operator_configuration_type.go

Lines changed: 1 addition & 0 deletions
@@ -108,6 +108,7 @@ type KubernetesMetaConfiguration struct {
 	EnableReadinessProbe bool `json:"enable_readiness_probe,omitempty"`
 	EnableCrossNamespaceSecret bool `json:"enable_cross_namespace_secret,omitempty"`
 	EnableFinalizers *bool `json:"enable_finalizers,omitempty"`
+	EnablePostgresTopologySpreadConstraints bool `json:"enable_postgres_topology_spread_constraints,omitempty"`
 }
 
 // PostgresPodResourcesDefaults defines the spec of default resources

pkg/apis/acid.zalan.do/v1/postgresql_type.go

Lines changed: 2 additions & 0 deletions
@@ -93,6 +93,8 @@ type PostgresSpec struct {
 	// deprecated json tags
 	InitContainersOld []v1.Container `json:"init_containers,omitempty"`
 	PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"`
+
+	TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
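
The new TopologySpreadConstraints field exposes the standard Kubernetes pod-spec type (k8s.io/api/core/v1.TopologySpreadConstraint) directly on the Postgres manifest. The following is a minimal sketch of what a populated spec could look like; the kubernetes.io/hostname topology key, the ScheduleAnyway policy and the label values are illustrative choices, not defaults introduced by this commit.

// Illustrative only: shows the shape of the new PostgresSpec field.
// The topology key and whenUnsatisfiable value are example choices.
package main

import (
	"fmt"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	spec := acidv1.PostgresSpec{
		NumberOfInstances: 3,
		TopologySpreadConstraints: []v1.TopologySpreadConstraint{
			{
				MaxSkew:           1,
				TopologyKey:       "kubernetes.io/hostname", // example key, not a commit default
				WhenUnsatisfiable: v1.ScheduleAnyway,
				LabelSelector: &metav1.LabelSelector{
					MatchLabels: map[string]string{"cluster-name": "acid-minimal-cluster"},
				},
			},
		},
	}
	fmt.Printf("%d constraint(s) requested\n", len(spec.TopologySpreadConstraints))
}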

pkg/cluster/cluster.go

Lines changed: 5 additions & 0 deletions
@@ -494,6 +494,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
 	}
+	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
+		needsReplace = true
+		needsRollUpdate = true
+		reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
+	}
 	if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
 		needsReplace = true
 		needsRollUpdate = true
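
The drift check above hinges on reflect.DeepEqual over the full constraint slices, so ordering changes and nil-versus-empty differences are enough to schedule a statefulset replacement and rolling update. A small self-contained sketch of that comparison (the constraintsChanged helper is hypothetical, for illustration only):

// Standalone sketch of a DeepEqual-based drift check on constraint slices.
// Note that a nil slice and an empty slice are not DeepEqual, so normalising
// both sides (or guarding for that case) avoids spurious rolling updates.
package main

import (
	"fmt"
	"reflect"

	v1 "k8s.io/api/core/v1"
)

func constraintsChanged(current, desired []v1.TopologySpreadConstraint) bool {
	return !reflect.DeepEqual(current, desired)
}

func main() {
	zone := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
	}

	fmt.Println(constraintsChanged([]v1.TopologySpreadConstraint{zone}, []v1.TopologySpreadConstraint{zone})) // false
	fmt.Println(constraintsChanged(nil, []v1.TopologySpreadConstraint{}))                                     // true: nil vs empty
}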

pkg/cluster/k8sres.go

Lines changed: 26 additions & 2 deletions
@@ -599,6 +599,22 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
 	return podAntiAffinity
 }
 
+func generateTopologySpreadConstraints(labels labels.Set, additionalTopologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
+	defaultTopologySpreadConstraint := v1.TopologySpreadConstraint{
+		MaxSkew:           int32(1),
+		TopologyKey:       "topology.kubernetes.io/zone",
+		WhenUnsatisfiable: v1.DoNotSchedule,
+		LabelSelector: &metav1.LabelSelector{
+			MatchLabels: labels,
+		},
+	}
+	topologySpreadConstraints := []v1.TopologySpreadConstraint{defaultTopologySpreadConstraint}
+	if len(additionalTopologySpreadConstraints) > 0 {
+		topologySpreadConstraints = append(topologySpreadConstraints, additionalTopologySpreadConstraints...)
+	}
+	return topologySpreadConstraints
+}
+
 func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
 	// allow to override tolerations by postgresql manifest
 	if len(*tolerationsSpec) > 0 {
@@ -821,6 +837,7 @@ func (c *Cluster) generatePodTemplate(
 	additionalSecretMount string,
 	additionalSecretMountPath string,
 	additionalVolumes []acidv1.AdditionalVolume,
+	topologySpreadConstraints []v1.TopologySpreadConstraint,
 ) (*v1.PodTemplateSpec, error) {
 
 	terminateGracePeriodSeconds := terminateGracePeriod
@@ -873,6 +890,10 @@ func (c *Cluster) generatePodTemplate(
 		podSpec.PriorityClassName = priorityClassName
 	}
 
+	if len(topologySpreadConstraints) > 0 {
+		podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraints)
+	}
+
 	if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
 		addVarRunVolume(&podSpec)
 	}
@@ -1476,7 +1497,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 		c.OpConfig.PodAntiAffinityPreferredDuringScheduling,
 		c.OpConfig.AdditionalSecretMount,
 		c.OpConfig.AdditionalSecretMountPath,
-		additionalVolumes)
+		additionalVolumes,
+		spec.TopologySpreadConstraints)
 
 	if err != nil {
 		return nil, fmt.Errorf("could not generate pod template: %v", err)
@@ -2334,7 +2356,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
 		false,
 		c.OpConfig.AdditionalSecretMount,
 		c.OpConfig.AdditionalSecretMountPath,
-		[]acidv1.AdditionalVolume{}); err != nil {
+		[]acidv1.AdditionalVolume{},
+		[]v1.TopologySpreadConstraint{}); err != nil {
 		return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
 	}
 
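
Taken together with the unit test below, the apparent intent of generateTopologySpreadConstraints is to always emit a default zone-spread constraint scoped to the cluster labels and to append any constraints supplied in the manifest. A self-contained sketch of that merge under this assumption (the buildConstraints helper is hypothetical, not operator code):

// Illustrative sketch of the default-plus-additional merge described above;
// buildConstraints is a hypothetical helper, not part of the operator.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func buildConstraints(clusterLabels labels.Set, additional []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
	// Default: spread pods of this cluster evenly across zones.
	result := []v1.TopologySpreadConstraint{{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: clusterLabels},
	}}
	// Constraints from the manifest are appended on top of the default.
	return append(result, additional...)
}

func main() {
	cs := buildConstraints(
		labels.Set{"application": "spilo", "cluster-name": "acid-minimal-cluster"},
		[]v1.TopologySpreadConstraint{{
			MaxSkew:           1,
			TopologyKey:       "kubernetes.io/hostname", // example extra constraint
			WhenUnsatisfiable: v1.ScheduleAnyway,
		}},
	)
	fmt.Println(len(cs)) // 2: default zone constraint + one from the manifest
}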

pkg/cluster/k8sres_test.go

Lines changed: 43 additions & 0 deletions
@@ -3886,3 +3886,46 @@ func TestGenerateCapabilities(t *testing.T) {
 		}
 	}
 }
+
+func TestTopologySpreadConstraints(t *testing.T) {
+	clusterName := "acid-test-cluster"
+	namespace := "default"
+
+	pg := acidv1.Postgresql{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      clusterName,
+			Namespace: namespace,
+		},
+		Spec: acidv1.PostgresSpec{
+			NumberOfInstances: 1,
+			Resources: &acidv1.Resources{
+				ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+				ResourceLimits:   acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+			},
+			Volume: acidv1.Volume{
+				Size: "1G",
+			},
+		},
+	}
+
+	cluster := New(
+		Config{
+			OpConfig: config.Config{
+				PodManagementPolicy: "ordered_ready",
+			},
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
+	cluster.Name = clusterName
+	cluster.Namespace = namespace
+	cluster.labelsSet(true)
+
+	s, err := cluster.generateStatefulSet(&pg.Spec)
+	assert.NoError(t, err)
+	assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
+		MaxSkew:           int32(1),
+		TopologyKey:       "topology.kubernetes.io/zone",
+		WhenUnsatisfiable: v1.DoNotSchedule,
+		LabelSelector: &metav1.LabelSelector{
+			MatchLabels: cluster.labelsSet(true),
+		},
+	})
+}

pkg/util/config/config.go

Lines changed: 1 addition & 0 deletions
@@ -254,6 +254,7 @@ type Config struct {
 	EnableSecretsDeletion *bool `name:"enable_secrets_deletion" default:"true"`
 	EnablePersistentVolumeClaimDeletion *bool `name:"enable_persistent_volume_claim_deletion" default:"true"`
 	PersistentVolumeClaimRetentionPolicy map[string]string `name:"persistent_volume_claim_retention_policy" default:"when_deleted:retain,when_scaled:retain"`
+	EnablePostgresTopologySpreadConstraints bool `name:"enable_postgres_topology_spread_constraints" default:"false"`
 }
 
 // MustMarshal marshals the config or panics
