[BREAKING] Only store hash of distributions over time (#12)
The Evaluation resource has been storing the full list of replicas in each
history record. With a large number of replicas this still blows the object
size out of proportion. This change removes the distribution from history
records on evaluations. The idea is that we store only the last known
projected winning distribution and compare it to the current one, and if the
current one becomes the winner - we store it instead. At no point should we
need to know anything about a non-winning distribution other than its hash,
how many times we've seen it, and when we last saw it.
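
For illustration only (not part of this commit's diff), the shape of a history record after this change is roughly the following sketch in Go, with simplified standard-library types standing in for the CRD types:

```go
package sketch

import "time"

// historicalRecord mirrors the compacted history record introduced by this
// commit: the full replica distribution is replaced by its hash, plus the
// bookkeeping needed for the two-phase hysteresis evaluation.
type historicalRecord struct {
	Timestamp    time.Time // when this distribution was last seen
	ReplicasHash string    // hash of the serialized replica distribution
	SeenTimes    int32     // how many times this distribution has been seen
}
```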

There is an edge case: a distribution that was heavily favored a while ago
may no longer be wanted, yet its total seen count may still be higher than
that of any current distribution. When that large chunk of history falls out
of bounds and gets erased - we might no longer have a current projection. In
that case the evaluator will go into a not-ready state for a while, until the
new projected winning distribution becomes clear.

This change updates the CRDs in-place, removing fields, which is acceptable
because the API is still in beta.
dee-kryvenko authored Feb 18, 2025
1 parent 05125ce commit b0a7e00
Showing 7 changed files with 198 additions and 137 deletions.
10 changes: 10 additions & 0 deletions DESIGN.md
@@ -329,6 +329,16 @@ This will guarantee that we will use an up to date configuration once every pred
It may not be the most efficient according to the latest data, but it reflects the most recent history well,
without risking falling too far behind.

There is an edge case that needs to be explained in a little more detail.
The `MostWantedTwoPhaseHysteresisEvaluation` CRD only stores hashes of the recent history for repeated evaluation.
It cannot store full distribution plans, as that would quickly exceed etcd object size limits.
Therefore, there could be a scenario where the most-wanted but old record is about to be erased,
while the new projected winner is not the current partitioner output.
Thus - the current projection is effectively unknown (only its hash, last seen time, and total seen count are known).
When that happens - the evaluator will enter a not-ready state.
Eventually, either another partitioning plan will surpass this currently unknown projection, or -
the partitioner will give us another sample with the current projection's details, which the evaluator will be able to remember.
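
That decision can be condensed into a short sketch; the names below (`topSeenHash`, `currentHash`, `projectionHash`) are illustrative, not the controller's actual identifiers:

```go
package sketch

// decideProjection summarizes the evaluator's choice when it only knows the
// hashes of historical records and the last stored projection.
//   topSeenHash    - hash with the highest accumulated seen count
//   currentHash    - hash of the partitioner's current output
//   projectionHash - hash of the last stored projected winner
func decideProjection(topSeenHash, currentHash, projectionHash string) (storeCurrent, ready bool) {
	switch topSeenHash {
	case currentHash:
		// The winner is what the partitioner just produced - remember it.
		return true, true
	case projectionHash:
		// The winner is already the stored projection - keep using it.
		return false, true
	default:
		// The winner is known only by its hash - not ready until the
		// partitioner re-emits it or another plan overtakes it.
		return false, false
	}
}
```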

## Scaler

Using input from `.spec.partitionProviderRef` - Scaler can ensure that intention becomes reality.
@@ -36,9 +36,9 @@ type MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord struct {
// +kubebuilder:validation:Required
Timestamp metav1.Time `json:"timestamp,omitempty"`

// Replicas is the partition as it was seen at this moment in time.
// ReplicasHash is the hash of serialized replicas string.
// +kubebuilder:validation:Required
Replicas common.ReplicaList `json:"replicas,omitempty"`
ReplicasHash string `json:"replicasHash,omitempty"`

// SeenTimes is the counter of how many times this record has been seen.
// +kubebuilder:validation:Required
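
The controller changes further down call a `Sha256` helper whose definition is not part of this diff; a plausible implementation (an assumption, not necessarily the repository's actual helper) looks like this:

```go
package sketch

import (
	"crypto/sha256"
	"encoding/hex"
)

// Sha256 returns the hex-encoded SHA-256 digest of the input string, turning
// a serialized replica distribution into a short fixed-size key that is cheap
// to store in the evaluation history.
func Sha256(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}
```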
7 changes: 0 additions & 7 deletions api/autoscaler/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -150,104 +150,10 @@ spec:
changes over time.
items:
properties:
replicas:
description: Replicas is the partition as it was seen at this
moment in time.
items:
description: Replica is a representation of the replica for
sharding
properties:
id:
description: ID of the replica, starting from 0 and onward.
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica
wrapped into their load index.
items:
description: LoadIndex is a representation of a shard
with calculated load index to it.
properties:
displayValue:
description: |-
DisplayValue is the string representation of the value without precision guarantee.
This is meaningless and exists purely for convenience of someone who is looking at the kubectl get output.
type: string
shard:
description: Shard is the shard that this load index
is calculated for.
properties:
id:
description: |-
ID of this shard. It may or may not be unique, depending on the discoverer.
For a secret with type=cluster label, this would be the name of the secret.
type: string
name:
description: |-
Name of this shard.
This must be the same as the name of this destination cluster as seen by Application Controller.
type: string
namespace:
description: |-
Namespace of this shard.
For a secret with type=cluster label, this would be the namespace of the secret.
If shard is managed externally - it is expected to be set to some value.
Same as the Application Controller is in - would be a logical choice.
type: string
server:
description: |-
Server of this shard.
This must be the same as the server URL of this destination cluster as seen by Application Controller.
type: string
uid:
description: |-
UID unique identifier of this shard.
There are multiple seemingly duplicative fields here, but this is the only one that is unique.
For example, when the shard is represented as a secret with type=cluster label,
the UID of the secret is a UID of the shard.
Meaning that it would change if the secret is re-created.
That's what is meant by "unique" in this context.
When the discovery was external - this may be an arbitrary string unique to that shard.
type: string
required:
- id
- name
- namespace
- server
- uid
type: object
value:
anyOf:
- type: integer
- type: string
description: Value is a value of this load index.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
required:
- displayValue
- shard
- value
type: object
type: array
totalLoad:
anyOf:
- type: integer
- type: string
description: TotalLoad is the sum of all load indexes
assigned to this replica.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
totalLoadDisplayValue:
description: |-
DisplayValue is the string representation of the total load without precision guarantee.
This is meaningless and exists purely for convenience of someone who is looking at the kubectl get output.
type: string
required:
- loadIndexes
- totalLoad
- totalLoadDisplayValue
type: object
type: array
replicasHash:
description: ReplicasHash is the hash of serialized replicas
string.
type: string
seenTimes:
description: SeenTimes is the counter of how many times this
record has been seen.
@@ -259,7 +165,7 @@ spec:
format: date-time
type: string
required:
- replicas
- replicasHash
- seenTimes
- timestamp
type: object
@@ -133,6 +133,8 @@ func init() {

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
//
//nolint:gocyclo
func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.V(2).Info("Received reconcile request")
@@ -202,12 +204,12 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
return ctrl.Result{}, nil
}

replicas := partitionProvider.GetPartitionProviderStatus().Replicas
log.V(2).Info("Currently reported replicas by partition provider", "count", len(replicas))
currentReplicas := partitionProvider.GetPartitionProviderStatus().Replicas
log.V(2).Info("Currently reported replicas by partition provider", "count", len(currentReplicas))

seenBefore := false
for i, record := range evaluation.Status.History {
if record.Replicas.SerializeToString() == replicas.SerializeToString() {
if record.ReplicasHash == Sha256(currentReplicas.SerializeToString()) {
seenBefore = true
evaluation.Status.History[i].SeenTimes++
evaluation.Status.History[i].Timestamp = metav1.Now()
@@ -217,9 +219,9 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
if !seenBefore {
evaluation.Status.History = append(evaluation.Status.History,
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
Timestamp: metav1.Now(),
Replicas: replicas,
SeenTimes: 1,
Timestamp: metav1.Now(),
ReplicasHash: Sha256(currentReplicas.SerializeToString()),
SeenTimes: 1,
},
)
}
@@ -237,45 +239,65 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
evaluation.Status.History = cleanHistory
}

historyRecords := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
historyRecordsLastSeen := map[string]metav1.Time{}
historyRecorsSeenTimes := map[string]int32{}
historyRecordsByHash := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
historyRecordsByHashLastSeen := map[string]metav1.Time{}
historyRecorsByHashSeenTimes := map[string]int32{}
for _, record := range evaluation.Status.History {
serializedRecord := record.Replicas.SerializeToString()
log.V(2).Info("Noticing record", "record", serializedRecord)
if _, ok := historyRecordsLastSeen[serializedRecord]; !ok {
historyRecords[serializedRecord] = record
historyRecordsLastSeen[serializedRecord] = record.Timestamp
historyRecorsSeenTimes[serializedRecord] = record.SeenTimes
} else if record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
historyRecordsLastSeen[serializedRecord] = record.Timestamp
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
hash := record.ReplicasHash
log.V(2).Info("Noticing record", "record", hash)
if _, ok := historyRecordsByHashLastSeen[hash]; !ok {
historyRecordsByHash[hash] = record
historyRecordsByHashLastSeen[hash] = record.Timestamp
historyRecorsByHashSeenTimes[hash] = record.SeenTimes
} else if record.Timestamp.After(historyRecordsByHashLastSeen[hash].Time) {
historyRecordsByHashLastSeen[hash] = record.Timestamp
historyRecorsByHashSeenTimes[hash] += record.SeenTimes
} else {
historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
historyRecorsByHashSeenTimes[hash] += record.SeenTimes
}
}

topSeenRecord := autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
maxSeenCount := int32(0)
for serializedRecord, seenTimes := range historyRecorsSeenTimes {
log.V(2).Info("Evaluating records", "record", serializedRecord, "seenTimes", seenTimes)
for hash, seenTimes := range historyRecorsByHashSeenTimes {
log.V(2).Info("Evaluating records", "record", hash, "seenTimes", seenTimes)
if seenTimes > maxSeenCount {
maxSeenCount = seenTimes
topSeenRecord = historyRecords[serializedRecord]
topSeenRecord = historyRecordsByHash[hash]
} else if seenTimes == maxSeenCount &&
historyRecords[serializedRecord].Timestamp.After(topSeenRecord.Timestamp.Time) {
log.V(2).Info("Tie breaker", "left", topSeenRecord, "right", serializedRecord)
topSeenRecord = historyRecords[serializedRecord]
historyRecordsByHash[hash].Timestamp.After(topSeenRecord.Timestamp.Time) {
log.V(2).Info("Tie breaker", "left", topSeenRecord.ReplicasHash, "right", hash)
topSeenRecord = historyRecordsByHash[hash]
}
}
log.V(2).Info("Top seen record", "record", topSeenRecord.Replicas.SerializeToString())
log.V(2).Info("Top seen record", "record", topSeenRecord.ReplicasHash)
if topSeenRecord.ReplicasHash == Sha256(currentReplicas.SerializeToString()) {
log.V(1).Info("A new election projection updated", "record", topSeenRecord.ReplicasHash)
evaluation.Status.Projection = currentReplicas
}
if topSeenRecord.ReplicasHash != Sha256(evaluation.Status.Projection.SerializeToString()) {
err := fmt.Errorf("top seen record is neither current projection nor current partition")
log.Error(err, "Failed to evaluate")
meta.SetStatusCondition(&evaluation.Status.Conditions, metav1.Condition{
Type: StatusTypeReady,
Status: metav1.ConditionFalse,
Reason: "AwaitingNewProjection",
Message: err.Error(),
})
if err := r.Status().Update(ctx, evaluation); err != nil {
log.V(1).Info("Failed to update resource status", "err", err)
return ctrl.Result{}, err
}
// In this case re-queuing will change nothing
return ctrl.Result{}, nil
}
mostWantedTwoPhaseHysteresisEvaluationProjectedShardsGauge.DeletePartialMatch(prometheus.Labels{
"evaluation_ref": req.NamespacedName.String(),
})
mostWantedTwoPhaseHysteresisEvaluationProjectedReplicasTotalLoadGauge.DeletePartialMatch(prometheus.Labels{
"evaluation_ref": req.NamespacedName.String(),
})
for _, replica := range topSeenRecord.Replicas {
for _, replica := range evaluation.Status.Projection {
for _, li := range replica.LoadIndexes {
mostWantedTwoPhaseHysteresisEvaluationProjectedShardsGauge.WithLabelValues(
req.NamespacedName.String(),
@@ -293,10 +315,9 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
).Set(replica.TotalLoad.AsApproximateFloat64())
}

evaluation.Status.Projection = topSeenRecord.Replicas
if evaluation.Status.LastEvaluationTimestamp == nil ||
time.Since(evaluation.Status.LastEvaluationTimestamp.Time) >= evaluation.Spec.StabilizationPeriod.Duration {
evaluation.Status.Replicas = topSeenRecord.Replicas
evaluation.Status.Replicas = evaluation.Status.Projection
evaluation.Status.LastEvaluationTimestamp = ptr.To(metav1.Now())
log.Info("New partitioning has won",
"replicas", len(evaluation.Status.Replicas), "lastEvaluationTimestamp", evaluation.Status.LastEvaluationTimestamp)
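
For reference, the winner selection that the controller now performs over hashed records can be exercised in isolation. A standalone sketch under simplified assumptions - a plain `record` type instead of the CRD types, and the same most-seen-wins rule with a most-recently-seen tie-breaker:

```go
package main

import (
	"fmt"
	"time"
)

type record struct {
	Hash      string
	Timestamp time.Time
	SeenTimes int32
}

// pickWinner accumulates seen counts per hash and returns the hash seen most
// often; ties are broken in favor of the most recently seen record.
func pickWinner(history []record) string {
	seen := map[string]int32{}
	lastSeen := map[string]time.Time{}
	for _, r := range history {
		seen[r.Hash] += r.SeenTimes
		if r.Timestamp.After(lastSeen[r.Hash]) {
			lastSeen[r.Hash] = r.Timestamp
		}
	}
	var winner string
	var maxSeen int32
	for hash, count := range seen {
		if count > maxSeen || (count == maxSeen && lastSeen[hash].After(lastSeen[winner])) {
			winner = hash
			maxSeen = count
		}
	}
	return winner
}

func main() {
	now := time.Now()
	history := []record{
		{Hash: "aaa", Timestamp: now.Add(-2 * time.Minute), SeenTimes: 3},
		{Hash: "bbb", Timestamp: now, SeenTimes: 3},
	}
	fmt.Println(pickWinner(history)) // "bbb" wins the tie as the most recent record
}
```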