Skip to content

Commit

Permalink
Re-visit LPT evaluation as it was violating etcd size limits (#6)
Browse files Browse the repository at this point in the history
* Re-visit LPT evaluation as it was violating etcd size limits
  • Loading branch information
dee-kryvenko authored Feb 13, 2025
1 parent cf74510 commit cc7b511
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@ type MostWantedTwoPhaseHysteresisEvaluationSpec struct {
// Older historical records are always pruned.
// +kubebuilder:validation:Required
StabilizationPeriod metav1.Duration `json:"stabilizationPeriod,omitempty"`

// MinimumSampleSize is the minimum number of samples to consider before first evaluating.
// +kubebuilder:validation:Required
MinimumSampleSize int32 `json:"minimumSampleSize,omitempty"`
}

type MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord struct {
// Timestamp is the time at which the record was created.
// Timestamp is the time at which the record was last seen.
// +kubebuilder:validation:Required
Timestamp metav1.Time `json:"timestamp,omitempty"`

// Replicas is the partition as it was seen at this moment in time.
// +kubebuilder:validation:Required
Replicas common.ReplicaList `json:"replicas,omitempty"`

// SeenTimes is the counter of how many times have this record been seen.
// +kubebuilder:validation:Required
SeenTimes int32 `json:"seenTimes,omitempty"`
}

// MostWantedTwoPhaseHysteresisEvaluationStatus defines the observed state of MostWantedTwoPhaseHysteresisEvaluation.
Expand All @@ -57,7 +57,6 @@ type MostWantedTwoPhaseHysteresisEvaluationStatus struct {
History []MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord `json:"history,omitempty"`

// LastEvaluationTimestamp is the time at which the last evaluation was performed.

LastEvaluationTimestamp *metav1.Time `json:"lastEvaluationTimestamp,omitempty"`

// Projection shows what the partitioning choice would have been if evaluation was performed during last poll.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ spec:
description: MostWantedTwoPhaseHysteresisEvaluationSpec defines the desired
state of MostWantedTwoPhaseHysteresisEvaluation.
properties:
minimumSampleSize:
description: MinimumSampleSize is the minimum number of samples to
consider before first evaluating.
format: int32
type: integer
partitionProviderRef:
description: PartitionProviderRef is the reference to the partition
provider.
Expand Down Expand Up @@ -89,7 +84,6 @@ spec:
Older historical records are always pruned.
type: string
required:
- minimumSampleSize
- partitionProviderRef
- pollingPeriod
- stabilizationPeriod
Expand Down Expand Up @@ -257,16 +251,25 @@ spec:
- totalLoadDisplayValue
type: object
type: array
seenTimes:
description: SeenTimes is the counter of how many times have
this record been seen.
format: int32
type: integer
timestamp:
description: Timestamp is the time at which the record was created.
description: Timestamp is the time at which the record was last
seen.
format: date-time
type: string
required:
- replicas
- seenTimes
- timestamp
type: object
type: array
lastEvaluationTimestamp:
description: LastEvaluationTimestamp is the time at which the last
evaluation was performed.
format: date-time
type: string
projection:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ spec:
name: default
pollingPeriod: 1h
stabilizationPeriod: 24h
minimumSampleSize: 5
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ spec:
name: sample
pollingPeriod: 5s
stabilizationPeriod: 1m
minimumSampleSize: 3
Original file line number Diff line number Diff line change
Expand Up @@ -204,56 +204,58 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
replicas := partitionProvider.GetPartitionProviderStatus().Replicas
log.V(2).Info("Currently reported replicas by partition provider", "count", len(replicas))

seenBefore := false
for i, record := range evaluation.Status.History {
if record.Replicas.SerializeToString() == replicas.SerializeToString() {
seenBefore = true
evaluation.Status.History[i].SeenTimes++
evaluation.Status.History[i].Timestamp = metav1.Now()
break
}
}
if !seenBefore {
evaluation.Status.History = append(evaluation.Status.History,
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
Timestamp: metav1.Now(),
Replicas: replicas,
SeenTimes: 1,
},
)
}

// Maintain the history
evaluation.Status.History = append(evaluation.Status.History,
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
Timestamp: metav1.Now(),
Replicas: replicas,
})
cleanHistory := []autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
for _, record := range evaluation.Status.History {
if record.Timestamp.Add(evaluation.Spec.StabilizationPeriod.Duration).After(time.Now()) {
cleanHistory = append(cleanHistory, record)
}
}
log.V(1).Info("Trim the history", "from", len(evaluation.Status.History), "to", len(cleanHistory))
evaluation.Status.History = cleanHistory
if len(evaluation.Status.History) < int(evaluation.Spec.MinimumSampleSize) {
err := fmt.Errorf("Minimum sample size not reached")
log.Info(err.Error())
meta.SetStatusCondition(&evaluation.Status.Conditions, metav1.Condition{
Type: StatusTypeReady,
Status: metav1.ConditionFalse,
Reason: "MinimumSampleSizeNotReached",
Message: err.Error(),
})
if err := r.Status().Update(ctx, evaluation); err != nil {
log.V(1).Info("Failed to update resource status", "err", err)
return ctrl.Result{}, err
}
// We need to re-queue it for the next poll, but this is NOT an error
return ctrl.Result{RequeueAfter: evaluation.Spec.PollingPeriod.Duration}, nil
historyWasTrimmed := len(cleanHistory) < len(evaluation.Status.History)
if historyWasTrimmed {
log.V(1).Info("Trim the history", "from", len(evaluation.Status.History), "to", len(cleanHistory))
evaluation.Status.History = cleanHistory
}
log.V(2).Info("Minimum sample size reached, proceeding to evaluate",
"minimumSampleSize", evaluation.Spec.MinimumSampleSize,
"currentSampleSize", len(evaluation.Status.History))

historyRecords := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
historyRecordsLastSeen := map[string]metav1.Time{}
historyRecorsSeenTimes := map[string]int{}
historyRecorsSeenTimes := map[string]int32{}
for _, record := range evaluation.Status.History {
serializedRecord := record.Replicas.SerializeToString()
log.V(2).Info("Noticing record", "record", serializedRecord)
if _, ok := historyRecordsLastSeen[serializedRecord]; !ok ||
record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
if _, ok := historyRecordsLastSeen[serializedRecord]; !ok {
historyRecords[serializedRecord] = record
historyRecordsLastSeen[serializedRecord] = record.Timestamp
historyRecorsSeenTimes[serializedRecord] = 0
historyRecorsSeenTimes[serializedRecord] = record.SeenTimes
} else if record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
historyRecordsLastSeen[serializedRecord] = record.Timestamp
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
} else {
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
}
historyRecorsSeenTimes[serializedRecord]++
}

topSeenRecord := autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
maxSeenCount := 0
maxSeenCount := int32(0)
for serializedRecord, seenTimes := range historyRecorsSeenTimes {
log.V(2).Info("Evaluating records", "record", serializedRecord, "seenTimes", seenTimes)
if seenTimes > maxSeenCount {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package autoscaler

import (
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -83,9 +82,8 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
Name: "N/A",
},
},
PollingPeriod: metav1.Duration{Duration: time.Minute},
StabilizationPeriod: metav1.Duration{Duration: time.Hour},
MinimumSampleSize: 5,
PollingPeriod: metav1.Duration{Duration: time.Second},
StabilizationPeriod: metav1.Duration{Duration: 5 * time.Minute},
},
},
).Create()
Expand Down Expand Up @@ -207,45 +205,8 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
).
BranchFailureToUpdateStatusCheck(collector.Collect).
WithCheck(
"bail out on minimum sample size not reached",
"succeed",
func(run *ScenarioRun[*autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluation]) {
samplePartition := NewObjectContainer(
run,
&autoscalerv1alpha1.LongestProcessingTimePartition{
ObjectMeta: metav1.ObjectMeta{
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
Namespace: run.Namespace().ObjectKey().Name,
},
},
)
for i := 0; i < int(run.Container().Object().Spec.MinimumSampleSize-1); i++ {
Expect(run.ReconcileError()).ToNot(HaveOccurred())
Expect(run.ReconcileResult().RequeueAfter).
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
Expect(run.ReconcileResult().Requeue).To(BeFalse())

By("Checking conditions")
readyCondition := meta.FindStatusCondition(
run.Container().Get().Object().Status.Conditions,
StatusTypeReady,
)
Expect(readyCondition).NotTo(BeNil())
Expect(readyCondition.Status).To(Equal(metav1.ConditionFalse))
Expect(readyCondition.Reason).To(Equal("MinimumSampleSizeNotReached"))
Expect(readyCondition.Message).To(ContainSubstring("Minimum sample size not reached"))

By("Checking history records")
samplePartition.Get()
history := run.Container().Object().Status.History
Expect(history).To(HaveLen(i + 1))
record := history[i]
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))

By("Reconciling resource sample #" + fmt.Sprintf("%d", i+2))
time.Sleep(5 * time.Second)
run.Reconcile()
}

Expect(run.ReconcileError()).ToNot(HaveOccurred())
Expect(run.ReconcileResult().RequeueAfter).
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
Expand All @@ -261,57 +222,28 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
Expect(readyCondition.Reason).To(Equal(StatusTypeReady))

By("Checking history records")
samplePartition := NewObjectContainer(
run,
&autoscalerv1alpha1.LongestProcessingTimePartition{
ObjectMeta: metav1.ObjectMeta{
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
Namespace: run.Namespace().ObjectKey().Name,
},
},
)
samplePartition.Get()
history := run.Container().Object().Status.History
Expect(history).To(HaveLen(int(run.Container().Object().Spec.MinimumSampleSize)))
record := history[int(run.Container().Object().Spec.MinimumSampleSize)-1]
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
Expect(history).To(HaveLen(1))

By("Checking evaluation results")
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).To(BeTemporally("~", time.Now(), 2*time.Second))
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).
To(BeTemporally("~", time.Now(), time.Second))

By("Simulate stabilization period expiring")
run.Container().Object().Status.LastEvaluationTimestamp = &metav1.Time{
Time: time.Now().Add(-run.Container().Object().Spec.StabilizationPeriod.Duration).
Add(-time.Second),
}
for i := range run.Container().Object().Status.History {
run.Container().Object().Status.History[i].Timestamp = *run.Container().Object().Status.LastEvaluationTimestamp
}
run.Container().StatusUpdate()

for i := 0; i < int(run.Container().Object().Spec.MinimumSampleSize-1); i++ {
By("Reconciling resource sample #" + fmt.Sprintf("%d", i+1))
time.Sleep(5 * time.Second)
run.Reconcile()

Expect(run.ReconcileError()).ToNot(HaveOccurred())
Expect(run.ReconcileResult().RequeueAfter).
To(Equal(run.Container().Object().Spec.PollingPeriod.Duration))
Expect(run.ReconcileResult().Requeue).To(BeFalse())

By("Checking conditions")
readyCondition := meta.FindStatusCondition(
run.Container().Get().Object().Status.Conditions,
StatusTypeReady,
)
Expect(readyCondition).NotTo(BeNil())
Expect(readyCondition.Status).To(Equal(metav1.ConditionFalse))
Expect(readyCondition.Reason).To(Equal("MinimumSampleSizeNotReached"))
Expect(readyCondition.Message).To(ContainSubstring("Minimum sample size not reached"))

By("Checking history records")
samplePartition.Get()
history := run.Container().Object().Status.History
Expect(history).To(HaveLen(i + 1))
record := history[i]
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
}

By("Reconciling resource sample #" + fmt.Sprintf("%d", run.Container().Object().Spec.MinimumSampleSize))
time.Sleep(5 * time.Second)
By("Checking seconds reconciliation")
firstReconcileTime := run.Container().Object().Status.LastEvaluationTimestamp.Time
time.Sleep(6 * time.Second)
run.Reconcile()

Expect(run.ReconcileError()).ToNot(HaveOccurred())
Expand All @@ -329,27 +261,28 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {
Expect(readyCondition.Reason).To(Equal(StatusTypeReady))

By("Checking history records")
samplePartition = NewObjectContainer(
run,
&autoscalerv1alpha1.LongestProcessingTimePartition{
ObjectMeta: metav1.ObjectMeta{
Name: run.Container().Get().Object().Spec.PartitionProviderRef.Name,
Namespace: run.Namespace().ObjectKey().Name,
},
},
)
samplePartition.Get()
history = run.Container().Object().Status.History
Expect(history).To(HaveLen(int(run.Container().Object().Spec.MinimumSampleSize)))
record = history[int(run.Container().Object().Spec.MinimumSampleSize)-1]
Expect(record.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
Expect(history).To(HaveLen(1))
Expect(history[0].SeenTimes).To(Equal(int32(2)))

By("Checking evaluation results")
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).To(BeTemporally("~", time.Now(), 2*time.Second))
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.Replicas).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.Projection).To(Equal(samplePartition.Object().Status.Replicas))
Expect(run.Container().Object().Status.LastEvaluationTimestamp.Time).
To(Equal(firstReconcileTime))
},
).
Commit(collector.Collect).
Hydrate(
"with one min sample requirement",
func(run *ScenarioRun[*autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluation]) {
run.Container().Object().Spec.MinimumSampleSize = 1
run.Container().Update()
},
).
BranchFailureToUpdateStatusCheck(collector.Collect)
Commit(collector.Collect)

BeforeEach(func() {
scenarioRun = collector.NewRun(ctx, k8sClient)
Expand Down

0 comments on commit cc7b511

Please sign in to comment.