[BREAKING] Only store hash of distributions over time #12

Merged: 3 commits, Feb 18, 2025
10 changes: 10 additions & 0 deletions DESIGN.md
@@ -329,6 +329,16 @@ This will guarantee that we will use an up to date configuration once every pred
It may not be the most efficient according to the latest data, but it reflects the most recent history well,
without risking falling too far behind.

+There is an edge case that needs to be explained in a little more detail.
+The `MostWantedTwoPhaseHysteresisEvaluation` CRD only stores hashes of the recent history for repeated evaluation.
+It cannot store full distribution plans, as those would quickly exceed etcd object size limits.
+Therefore, there can be a scenario where the most-wanted but old record is about to be erased,
+while the newly projected winner is not the current partitioner output.
+In that case the current projection is effectively unknown (only its hash, last-seen time, and total seen count are known).
+When that happens, the evaluator enters a not-ready state.
+Eventually, either another partitioning plan will surpass the currently unknown projection, or
+the partitioner will emit another sample with the current projection's details, which the evaluator can then remember.

## Scaler

Using input from `.spec.partitionProviderRef` - Scaler can ensure that intention becomes reality.
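To make the edge case above concrete, here is a self-contained sketch of the decision the evaluator faces (assumptions: `sha256Hex` stands in for the project's `Sha256` helper, whose implementation is not part of this diff, and the serialized plan strings are made up):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sha256Hex stands in for the Sha256 helper used in the controller diff
// below; assumed here to be a hex-encoded SHA-256 digest of its input.
func sha256Hex(s string) string {
	digest := sha256.Sum256([]byte(s))
	return hex.EncodeToString(digest[:])
}

func main() {
	// Made-up stand-ins for ReplicaList.SerializeToString() outputs.
	currentPartitionerOutput := "replica-0:[shard-a,shard-b];replica-1:[shard-c]"
	storedProjection := "replica-0:[shard-a];replica-1:[shard-b,shard-c]"

	// The election winner survives in history only as a hash.
	topSeenHash := sha256Hex("replica-0:[shard-a,shard-b,shard-c]")

	switch topSeenHash {
	case sha256Hex(currentPartitionerOutput):
		fmt.Println("winner matches the live partitioner output: store it as the projection")
	case sha256Hex(storedProjection):
		fmt.Println("winner matches the remembered projection: keep it")
	default:
		// Neither source can supply the full plan behind the hash.
		fmt.Println("winner known only by hash: evaluator goes not-ready and waits")
	}
}
```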
@@ -36,9 +36,9 @@ type MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord struct {
// +kubebuilder:validation:Required
Timestamp metav1.Time `json:"timestamp,omitempty"`

-// Replicas is the partition as it was seen at this moment in time.
+// ReplicasHash is the hash of the serialized replicas string.
// +kubebuilder:validation:Required
-Replicas common.ReplicaList `json:"replicas,omitempty"`
+ReplicasHash string `json:"replicasHash,omitempty"`

// SeenTimes is the counter of how many times this record has been seen.
// +kubebuilder:validation:Required
7 changes: 0 additions & 7 deletions api/autoscaler/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -150,104 +150,10 @@ spec:
    changes over time.
  items:
    properties:
-      replicas:
-        description: Replicas is the partition as it was seen at this
-          moment in time.
-        items:
-          description: Replica is a representation of the replica for
-            sharding
-          properties:
-            id:
-              description: ID of the replica, starting from 0 and onward.
-              format: int32
-              type: integer
-            loadIndexes:
-              description: LoadIndexes shards assigned to this replica
-                wrapped into their load index.
-              items:
-                description: LoadIndex is a representation of a shard
-                  with calculated load index to it.
-                properties:
-                  displayValue:
-                    description: |-
-                      DisplayValue is the string representation of the without precision guarantee.
-                      This is meaningless and exists purely for convenience of someone who is looking at the kubectl get output.
-                    type: string
-                  shard:
-                    description: Shard is the shard that this load index
-                      is calculated for.
-                    properties:
-                      id:
-                        description: |-
-                          ID of this shard. It may or may not be unique, depending on the discoverer.
-                          For a secret with type=cluster label, this would be the name of the secret.
-                        type: string
-                      name:
-                        description: |-
-                          Name of this shard.
-                          This must be the same as the name of this destination cluster as seen by Application Controller.
-                        type: string
-                      namespace:
-                        description: |-
-                          Namespace of this shard.
-                          For a secret with type=cluster label, this would be the namespace of the secret.
-                          If shard is managed externally - it is expected to be set to some value.
-                          Same as the Application Controller is in - would be a logical choice.
-                        type: string
-                      server:
-                        description: |-
-                          Server of this shard.
-                          This must be the same as the server URL of this destination cluster as seen by Application Controller.
-                        type: string
-                      uid:
-                        description: |-
-                          UID unique identifier of this shard.
-                          There is multiple seemingly duplicative fields here, but this is the only one that is unique.
-                          For example, when the shard is represented as a secret with type=cluster label,
-                          the UID of the secret is a UID of the shard.
-                          Meaning that it would change if the secret is re-created.
-                          That's what is meant by "unique" in this context.
-                          When the discovery was external - this may be arbitrary string unique to that shard.
-                        type: string
-                    required:
-                    - id
-                    - name
-                    - namespace
-                    - server
-                    - uid
-                    type: object
-                  value:
-                    anyOf:
-                    - type: integer
-                    - type: string
-                    description: Value is a value of this load index.
-                    pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                    x-kubernetes-int-or-string: true
-                required:
-                - displayValue
-                - shard
-                - value
-                type: object
-              type: array
-            totalLoad:
-              anyOf:
-              - type: integer
-              - type: string
-              description: TotalLoad is the sum of all load indexes
-                assigned to this replica.
-              pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-              x-kubernetes-int-or-string: true
-            totalLoadDisplayValue:
-              description: |-
-                DisplayValue is the string representation of the total load without precision guarantee.
-                This is meaningless and exists purely for convenience of someone who is looking at the kubectl get output.
-              type: string
-          required:
-          - loadIndexes
-          - totalLoad
-          - totalLoadDisplayValue
-          type: object
-        type: array
+      replicasHash:
+        description: ReplicasHash is the hash of the serialized replicas
+          string.
+        type: string
      seenTimes:
        description: SeenTimes is the counter of how many times this
          record has been seen.
@@ -259,7 +165,7 @@ spec:
        format: date-time
        type: string
    required:
-    - replicas
+    - replicasHash
    - seenTimes
    - timestamp
    type: object
Expand Down
@@ -133,6 +133,8 @@ func init() {

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
+//
+//nolint:gocyclo
func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.V(2).Info("Received reconcile request")
@@ -202,12 +204,12 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
return ctrl.Result{}, nil
}

-replicas := partitionProvider.GetPartitionProviderStatus().Replicas
-log.V(2).Info("Currently reported replicas by partition provider", "count", len(replicas))
+currentReplicas := partitionProvider.GetPartitionProviderStatus().Replicas
+log.V(2).Info("Currently reported replicas by partition provider", "count", len(currentReplicas))

seenBefore := false
for i, record := range evaluation.Status.History {
-if record.Replicas.SerializeToString() == replicas.SerializeToString() {
+if record.ReplicasHash == Sha256(currentReplicas.SerializeToString()) {
seenBefore = true
evaluation.Status.History[i].SeenTimes++
evaluation.Status.History[i].Timestamp = metav1.Now()
@@ -217,9 +219,9 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
if !seenBefore {
evaluation.Status.History = append(evaluation.Status.History,
autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{
-Timestamp: metav1.Now(),
-Replicas:  replicas,
-SeenTimes: 1,
+Timestamp:    metav1.Now(),
+ReplicasHash: Sha256(currentReplicas.SerializeToString()),
+SeenTimes:    1,
},
)
}
@@ -237,45 +239,65 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
evaluation.Status.History = cleanHistory
}

-historyRecords := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
-historyRecordsLastSeen := map[string]metav1.Time{}
-historyRecorsSeenTimes := map[string]int32{}
+historyRecordsByHash := map[string]autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
+historyRecordsByHashLastSeen := map[string]metav1.Time{}
+historyRecordsByHashSeenTimes := map[string]int32{}
for _, record := range evaluation.Status.History {
-serializedRecord := record.Replicas.SerializeToString()
-log.V(2).Info("Noticing record", "record", serializedRecord)
-if _, ok := historyRecordsLastSeen[serializedRecord]; !ok {
-historyRecords[serializedRecord] = record
-historyRecordsLastSeen[serializedRecord] = record.Timestamp
-historyRecorsSeenTimes[serializedRecord] = record.SeenTimes
-} else if record.Timestamp.After(historyRecordsLastSeen[serializedRecord].Time) {
-historyRecordsLastSeen[serializedRecord] = record.Timestamp
-historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
+hash := record.ReplicasHash
+log.V(2).Info("Noticing record", "record", hash)
+if _, ok := historyRecordsByHashLastSeen[hash]; !ok {
+historyRecordsByHash[hash] = record
+historyRecordsByHashLastSeen[hash] = record.Timestamp
+historyRecordsByHashSeenTimes[hash] = record.SeenTimes
+} else if record.Timestamp.After(historyRecordsByHashLastSeen[hash].Time) {
+historyRecordsByHashLastSeen[hash] = record.Timestamp
+historyRecordsByHashSeenTimes[hash] += record.SeenTimes
} else {
-historyRecorsSeenTimes[serializedRecord] += record.SeenTimes
+historyRecordsByHashSeenTimes[hash] += record.SeenTimes
}
}

topSeenRecord := autoscalerv1alpha1.MostWantedTwoPhaseHysteresisEvaluationStatusHistoricalRecord{}
maxSeenCount := int32(0)
-for serializedRecord, seenTimes := range historyRecorsSeenTimes {
-log.V(2).Info("Evaluating records", "record", serializedRecord, "seenTimes", seenTimes)
+for hash, seenTimes := range historyRecordsByHashSeenTimes {
+log.V(2).Info("Evaluating records", "record", hash, "seenTimes", seenTimes)
if seenTimes > maxSeenCount {
maxSeenCount = seenTimes
-topSeenRecord = historyRecords[serializedRecord]
+topSeenRecord = historyRecordsByHash[hash]
} else if seenTimes == maxSeenCount &&
-historyRecords[serializedRecord].Timestamp.After(topSeenRecord.Timestamp.Time) {
-log.V(2).Info("Tie breaker", "left", topSeenRecord, "right", serializedRecord)
-topSeenRecord = historyRecords[serializedRecord]
+historyRecordsByHash[hash].Timestamp.After(topSeenRecord.Timestamp.Time) {
+log.V(2).Info("Tie breaker", "left", topSeenRecord.ReplicasHash, "right", hash)
+topSeenRecord = historyRecordsByHash[hash]
}
}
log.V(2).Info("Top seen record", "record", topSeenRecord.Replicas.SerializeToString())
log.V(2).Info("Top seen record", "record", topSeenRecord.ReplicasHash)
if topSeenRecord.ReplicasHash == Sha256(currentReplicas.SerializeToString()) {
log.V(1).Info("A new election projection updated", "record", topSeenRecord.ReplicasHash)
evaluation.Status.Projection = currentReplicas
}
if topSeenRecord.ReplicasHash != Sha256(evaluation.Status.Projection.SerializeToString()) {
err := fmt.Errorf("top seen record is neither current projection nor current partition")
log.Error(err, "Failed to evaluate")
meta.SetStatusCondition(&evaluation.Status.Conditions, metav1.Condition{
Type: StatusTypeReady,
Status: metav1.ConditionFalse,
Reason: "AwaitingNewProjection",
Message: err.Error(),
})
if err := r.Status().Update(ctx, evaluation); err != nil {
log.V(1).Info("Failed to update resource status", "err", err)
return ctrl.Result{}, err
}
// In this case re-queuing will change nothing
return ctrl.Result{}, nil
}
mostWantedTwoPhaseHysteresisEvaluationProjectedShardsGauge.DeletePartialMatch(prometheus.Labels{
"evaluation_ref": req.NamespacedName.String(),
})
mostWantedTwoPhaseHysteresisEvaluationProjectedReplicasTotalLoadGauge.DeletePartialMatch(prometheus.Labels{
"evaluation_ref": req.NamespacedName.String(),
})
-for _, replica := range topSeenRecord.Replicas {
+for _, replica := range evaluation.Status.Projection {
for _, li := range replica.LoadIndexes {
mostWantedTwoPhaseHysteresisEvaluationProjectedShardsGauge.WithLabelValues(
req.NamespacedName.String(),
@@ -293,10 +315,9 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
).Set(replica.TotalLoad.AsApproximateFloat64())
}

-evaluation.Status.Projection = topSeenRecord.Replicas
if evaluation.Status.LastEvaluationTimestamp == nil ||
time.Since(evaluation.Status.LastEvaluationTimestamp.Time) >= evaluation.Spec.StabilizationPeriod.Duration {
-evaluation.Status.Replicas = topSeenRecord.Replicas
+evaluation.Status.Replicas = evaluation.Status.Projection
evaluation.Status.LastEvaluationTimestamp = ptr.To(metav1.Now())
log.Info("New partitioning has won",
"replicas", len(evaluation.Status.Replicas), "lastEvaluationTimestamp", evaluation.Status.LastEvaluationTimestamp)
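For illustration, the election over the deduplicated history behaves like this self-contained sketch (the `rec` type and all values are made up; it mirrors the most-seen-wins loop with the recency tie-breaker above):

```go
package main

import (
	"fmt"
	"time"
)

// rec is a simplified stand-in for the historical record after this change:
// only a hash, a last-seen timestamp, and a seen counter survive in status.
type rec struct {
	hash     string
	lastSeen time.Time
	seen     int32
}

func main() {
	now := time.Now()
	history := []rec{
		{hash: "aaa", lastSeen: now.Add(-3 * time.Minute), seen: 4},
		{hash: "bbb", lastSeen: now.Add(-1 * time.Minute), seen: 4}, // ties with "aaa" on count
		{hash: "ccc", lastSeen: now.Add(-2 * time.Minute), seen: 2},
	}

	// Mirrors the controller's election: the highest seen count wins,
	// and a tie goes to the record that was seen more recently.
	var top rec
	for _, r := range history {
		if r.seen > top.seen || (r.seen == top.seen && r.lastSeen.After(top.lastSeen)) {
			top = r
		}
	}
	fmt.Println("winner:", top.hash) // "bbb": same count as "aaa" but fresher
}
```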