Move positive offset logic from load indexer to normalizer where it belongs (#7)
dee-kryvenko authored Feb 13, 2025
1 parent cc7b511 commit 07c47b5
Showing 17 changed files with 71 additions and 60 deletions.
19 changes: 11 additions & 8 deletions DESIGN.md
Original file line number Diff line number Diff line change
@@ -205,6 +205,16 @@ It uses robust statistics (median and interquartile range, often denoted IQR) ra
- Less sensitive to extreme outliers than other methods (like min-max).
- Keeps each metric’s range more comparable.

4. **Positive Offset:**
   - Output from this normalizer is a set of values centered around 0, with negative values
     representing metrics below the median.
     Some implementations of the load index do not work well with negative values.
   - An additional option makes sure that there are no negative values by applying an offset to the right:
     `offset = -(min_value) + ε * (max_value - min_value)`
     A good value for `ε` based on my practical tests seems to be `0.01`.
     A value of `0` will just shift the smallest value to `0`, which is also an option;
     I wanted any non-zero metric to have a non-zero weight.
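The offset calculation above can be sketched in Go. This is a minimal illustration of the formula only; `positiveOffset` is a hypothetical helper, not the project's API:

```go
package main

import "fmt"

// positiveOffset computes the right shift described above:
// offset = -(min_value) + e*(max_value - min_value).
// Hypothetical helper for illustration only.
func positiveOffset(values []float64, e float64) float64 {
	minV, maxV := values[0], values[0]
	for _, v := range values {
		if v < minV {
			minV = v
		}
		if v > maxV {
			maxV = v
		}
	}
	return -minV + e*(maxV-minV)
}

func main() {
	// Robust-scaled values centered around 0.
	values := []float64{-1.5, -0.25, 0.0, 0.75, 2.0}
	offset := positiveOffset(values, 0.01)
	for i, v := range values {
		values[i] = v + offset // smallest value becomes e*(max-min); all values positive
	}
	fmt.Println(values)
}
```

With `ε = 0`, the smallest value lands exactly on `0`; any `ε > 0` keeps every metric strictly positive.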

## Load Indexer

Load Indexer can either use the Poller directly or the Normalizer to source the metrics from `.spec.metricValuesProviderRef`.
@@ -245,12 +255,6 @@ We use positive variation as that would be easier to represent and use for chart
`x_1(S), x_2(S), ..., x_k(S)`.
- Each metric `i` has an importance weight `w_i`.
- Choose a `p` value (commonly 2 or 3) that controls how much large values stand out.
- Input metrics cannot be negative, as that might result in a negative load index.
  That is fine as per the algorithm, but does not make sense for how we are using it.
  A shard cannot have negative load and subtract from the replica.
  Our implementation will make sure that there are no negative values by using an offset to the right:
  `offset = -(min_value) + ε * (max_value - min_value)`
  A good value for `ε` based on my experiments on Robust Scaling normalizer output would be `0.01`.

2. **Formula (p-Norm):**
`LoadIndex(S) = ( Σ[i=1..k] [ w_i * ( x_i(S) )^p ] )^(1/p)`
@@ -259,8 +263,7 @@ We use positive variation as that would be easier to represent and use for chart
- If `p = 1`, this reduces to a simple weighted sum.
- If `p = 2`, it behaves like a weighted Euclidean norm (moderate emphasis on larger values).
- Larger `p` values increase the influence of the largest metric but still combine the rest.
- Tip: with Robust Scaling normalization, the values will already be centered around 0, and offset formula above
takes care of the negative values. You may want to use `p = 1` in that case,
- Tip: with Robust Scaling normalization and positive offset, you may want to use `p = 1`;
otherwise the index for the largest replica will get so big that the other replicas are packed together too tightly.
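The p-norm combination above can be sketched in Go. This is an illustrative `loadIndex` helper following the document's formula, not the repository's implementation (which uses Kubernetes `resource.Quantity` types):

```go
package main

import (
	"fmt"
	"math"
)

// loadIndex computes the weighted p-norm from the formula above:
// LoadIndex(S) = ( Σ[i=1..k] w_i * x_i(S)^p )^(1/p).
// Illustrative sketch only.
func loadIndex(values, weights []float64, p float64) float64 {
	sum := 0.0
	for i, x := range values {
		sum += weights[i] * math.Pow(x, p)
	}
	return math.Pow(sum, 1/p)
}

func main() {
	x := []float64{3.0, 4.0} // metric values for one shard
	w := []float64{1.0, 1.0} // equal importance weights
	fmt.Println(loadIndex(x, w, 1)) // p=1: plain weighted sum
	fmt.Println(loadIndex(x, w, 2)) // p=2: weighted Euclidean norm
}
```

With `p = 1` the two metrics simply add up; with `p = 2` the larger metric dominates more, which is why the tip above suggests `p = 1` when the positive offset already compresses the range.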

## Partitioner
5 changes: 5 additions & 0 deletions api/autoscaler/v1alpha1/robustscalingnormalizer_types.go
@@ -18,12 +18,17 @@ package v1alpha1

import (
"github.com/plumber-cd/argocd-autoscaler/api/autoscaler/common"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// RobustScalingNormalizerSpec defines the desired state of RobustScalingNormalizer.
type RobustScalingNormalizerSpec struct {
common.NormalizerSpec `json:",inline"`

// PositiveOffsetE is an epsilon for the offset to eliminate negative values.
// If not set (the default), no offset is applied and the normalized result will contain negative values.
PositiveOffsetE *resource.Quantity `json:"positiveOffsetE,omitempty"`
}

// RobustScalingNormalizerStatus defines the observed state of RobustScalingNormalizer.
4 changes: 0 additions & 4 deletions api/autoscaler/v1alpha1/weightedpnormloadindex_types.go
@@ -42,10 +42,6 @@ type WeightedPNormLoadIndexSpec struct {
// +kubebuilder:validation:Required
P int32 `json:"p,omitempty"`

// OffsetE is an epsilon for the offset to eliminate negative values.
// +kubebuilder:validation:Required
OffsetE resource.Quantity `json:"offsetE,omitempty"`

// Weights is the list of metrics and their weights to use in this load index.
// +kubebuilder:validation:Required
Weights []WeightedPNormLoadIndexWeight `json:"weights,omitempty"`
6 changes: 5 additions & 1 deletion api/autoscaler/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

Original file line number Diff line number Diff line change
@@ -71,6 +71,15 @@ spec:
- name
type: object
x-kubernetes-map-type: atomic
positiveOffsetE:
anyOf:
- type: integer
- type: string
description: |-
PositiveOffsetE is an epsilon for the offset to eliminate negative values.
If not set (the default), no offset is applied and the normalized result will contain negative values.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
required:
- metricValuesProviderRef
type: object
Original file line number Diff line number Diff line change
@@ -70,14 +70,6 @@ spec:
- name
type: object
x-kubernetes-map-type: atomic
offsetE:
anyOf:
- type: integer
- type: string
description: OffsetE is an epsilon for the offset to eliminate negative
values.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
p:
description: |-
P is the value of p in the p-norm
@@ -107,7 +99,6 @@ spec:
type: array
required:
- metricValuesProviderRef
- offsetE
- p
- weights
type: object
Original file line number Diff line number Diff line change
@@ -9,3 +9,4 @@ spec:
apiGroup: autoscaler.argoproj.io
kind: PrometheusPoll
name: default
positiveOffsetE: "0.01"
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ spec:
kind: RobustScalingNormalizer
name: default
p: 1
offsetE: "0.01"
weights:
- id: argocd_app_info
weight: "0.05"
Original file line number Diff line number Diff line change
@@ -9,3 +9,4 @@ spec:
apiGroup: autoscaler.argoproj.io
kind: PrometheusPoll
name: sample
positiveOffsetE: "0.01"
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ spec:
kind: RobustScalingNormalizer
name: sample
p: 1
offsetE: "0.01"
weights:
- id: up
weight: "1"
Original file line number Diff line number Diff line change
@@ -144,8 +144,7 @@ var _ = Describe("LongestProcessingTimePartition Controller", func() {
Name: "N/A",
},
},
P: 1,
OffsetE: resource.MustParse("0.01"),
P: 1,
Weights: []autoscalerv1alpha1.WeightedPNormLoadIndexWeight{
{
ID: "fake-metric",
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -158,8 +159,12 @@ func (r *RobustScalingNormalizerReconciler) Reconcile(ctx context.Context, req c
return ctrl.Result{}, nil
}

var e *float64
if normalizer.Spec.PositiveOffsetE != nil {
e = ptr.To(normalizer.Spec.PositiveOffsetE.AsApproximateFloat64())
}
values := metricValuesProvider.GetMetricValuesProviderStatus().Values
normalizedValues, err := r.Normalizer.Normalize(ctx, values)
normalizedValues, err := r.Normalizer.Normalize(ctx, e, values)
if err != nil {
log.Error(err, "Error during normalization")
meta.SetStatusCondition(&normalizer.Status.Conditions, metav1.Condition{
Original file line number Diff line number Diff line change
@@ -38,14 +38,14 @@ import (
)

type fakeNormalizer struct {
fn func(ctx context.Context, allMetrics []common.MetricValue) ([]common.MetricValue, error)
fn func(context.Context, *float64, []common.MetricValue) ([]common.MetricValue, error)
normalized bool
}

func (f *fakeNormalizer) Normalize(ctx context.Context, allMetrics []common.MetricValue) ([]common.MetricValue, error) {
func (f *fakeNormalizer) Normalize(ctx context.Context, e *float64, allMetrics []common.MetricValue) ([]common.MetricValue, error) {
f.normalized = true
if f.fn != nil {
return f.fn(ctx, allMetrics)
return f.fn(ctx, e, allMetrics)
}
return nil, errors.New("fake normalizer not implemented")
}
@@ -230,6 +230,7 @@ var _ = Describe("PrometheusPoll Controller", func() {
func(run *ScenarioRun[*autoscalerv1alpha1.RobustScalingNormalizer]) {
normalizer.fn = func(
ctx context.Context,
e *float64,
values []common.MetricValue,
) ([]common.MetricValue, error) {
normalizedValues := []common.MetricValue{}
@@ -308,6 +309,7 @@ var _ = Describe("PrometheusPoll Controller", func() {
func(run *ScenarioRun[*autoscalerv1alpha1.RobustScalingNormalizer]) {
normalizer.fn = func(
ctx context.Context,
e *float64,
values []common.MetricValue,
) ([]common.MetricValue, error) {
return nil, errors.New("fake error")
Original file line number Diff line number Diff line change
@@ -182,7 +182,6 @@ func (r *WeightedPNormLoadIndexReconciler) Reconcile(ctx context.Context, req ct
values := metricValuesProvider.GetMetricValuesProviderStatus().Values
loadIndexes, err := r.LoadIndexer.Calculate(
loadIndex.Spec.P,
loadIndex.Spec.OffsetE.AsApproximateFloat64(),
weightsByID,
values,
)
Original file line number Diff line number Diff line change
@@ -37,14 +37,14 @@ import (
)

type fakeLoadIndexer struct {
fn func(int32, float64, map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight, []common.MetricValue) ([]common.LoadIndex, error)
fn func(int32, map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight, []common.MetricValue) ([]common.LoadIndex, error)
indexed bool
}

func (f *fakeLoadIndexer) Calculate(p int32, e float64, weights map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight, values []common.MetricValue) ([]common.LoadIndex, error) {
func (f *fakeLoadIndexer) Calculate(p int32, weights map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight, values []common.MetricValue) ([]common.LoadIndex, error) {
f.indexed = true
if f.fn != nil {
return f.fn(p, e, weights, values)
return f.fn(p, weights, values)
}
return nil, errors.New("fake error in fakeLoadIndexer")
}
@@ -265,7 +265,6 @@ var _ = Describe("WeightedPNormLoadIndex Controller", func() {
func(run *ScenarioRun[*autoscalerv1alpha1.WeightedPNormLoadIndex]) {
indexer.fn = func(
p int32,
e float64,
weights map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight,
values []common.MetricValue,
) ([]common.LoadIndex, error) {
@@ -335,7 +334,6 @@ var _ = Describe("WeightedPNormLoadIndex Controller", func() {
func(run *ScenarioRun[*autoscalerv1alpha1.WeightedPNormLoadIndex]) {
indexer.fn = func(
p int32,
e float64,
weights map[string]autoscalerv1alpha1.WeightedPNormLoadIndexWeight,
values []common.MetricValue,
) ([]common.LoadIndex, error) {
24 changes: 1 addition & 23 deletions loadindexers/weightedpnorm/weightedpnorm.go
@@ -27,15 +27,14 @@ import (
)

type LoadIndexer interface {
Calculate(int32, float64,
Calculate(int32,
map[string]autoscaler.WeightedPNormLoadIndexWeight, []common.MetricValue) ([]common.LoadIndex, error)
}

type LoadIndexerImpl struct{}

func (li *LoadIndexerImpl) Calculate(
p int32,
e float64,
weights map[string]autoscaler.WeightedPNormLoadIndexWeight,
values []common.MetricValue,
) ([]common.LoadIndex, error) {
@@ -66,27 +65,6 @@ func (li *LoadIndexerImpl) Calculate(
}
}

minValue := math.MaxFloat64
maxValue := math.MaxFloat64 * -1
for _, values := range metricsByShard {
for _, m := range values {
if m.Value.AsApproximateFloat64() < minValue {
minValue = m.Value.AsApproximateFloat64()
}
if m.Value.AsApproximateFloat64() > maxValue {
maxValue = m.Value.AsApproximateFloat64()
}
}
}
offset := -(minValue) + e*(maxValue-minValue)
for shardUID, values := range metricsByShard {
for i, m := range values {
scaled := m.Value.AsApproximateFloat64() + offset
metricValueAsString := strconv.FormatFloat(scaled, 'f', -1, 64)
metricsByShard[shardUID][i].Value = resource.MustParse(metricValueAsString)
}
}

loadIndexes := []common.LoadIndex{}
for shardUID, values := range metricsByShard {
value := li.loadIndex(
24 changes: 23 additions & 1 deletion normalizers/robustscaling/robustscaling.go
@@ -16,6 +16,7 @@ package robustscaling

import (
"context"
"math"
"sort"
"strconv"

@@ -25,12 +26,13 @@ import (
)

type Normalizer interface {
Normalize(ctx context.Context, allMetrics []common.MetricValue) ([]common.MetricValue, error)
Normalize(context.Context, *float64, []common.MetricValue) ([]common.MetricValue, error)
}

type NormalizerImpl struct{}

func (r *NormalizerImpl) Normalize(ctx context.Context,
e *float64,
allMetrics []common.MetricValue) ([]common.MetricValue, error) {

log := log.FromContext(ctx)
@@ -99,6 +101,26 @@ func (r *NormalizerImpl) Normalize(ctx context.Context,
}
}

if e != nil {
minValue := math.MaxFloat64
maxValue := math.MaxFloat64 * -1
for _, metric := range normalizedMetrics {
if metric.Value.AsApproximateFloat64() < minValue {
minValue = metric.Value.AsApproximateFloat64()
}
if metric.Value.AsApproximateFloat64() > maxValue {
maxValue = metric.Value.AsApproximateFloat64()
}
}
_e := *e
offset := -(minValue) + _e*(maxValue-minValue)
for i, metric := range normalizedMetrics {
scaled := metric.Value.AsApproximateFloat64() + offset
metricValueAsString := strconv.FormatFloat(scaled, 'f', -1, 64)
normalizedMetrics[i].Value = resource.MustParse(metricValueAsString)
}
}

return normalizedMetrics, nil
}

Expand Down
