Skip to content

Commit

Permalink
Ensure that the in-cluster is always scheduled on replica-0
Browse files Browse the repository at this point in the history
  • Loading branch information
dee-kryvenko committed Feb 18, 2025
1 parent 226285f commit 5628c68
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 3 deletions.
2 changes: 2 additions & 0 deletions api/autoscaler/common/load_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (s LoadIndexesDesc) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// Less returns true if the value of the LoadIndex at index i is greater than the value of the LoadIndex at index j.
// Note that this is reversed because this is a descending sort implementation.
func (s LoadIndexesDesc) Less(i, j int) bool {
return s[i].Value.AsApproximateFloat64() > s[j].Value.AsApproximateFloat64()
}
29 changes: 26 additions & 3 deletions partitioners/longestprocessingtime/longestprocessingtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func (r *PartitionerImpl) Partition(ctx context.Context,
return replicas, nil
}

sort.Sort(common.LoadIndexesDesc(shards))
bucketSize := shards[0].Value.AsApproximateFloat64()
sortedShards, bucketSize := r.Sort(shards)

replicaCount := int32(0)
for _, shard := range shards {
for _, shard := range sortedShards {
// Find the replica with the least current load.
minLoad := float64(0)
selectedReplicaIndex := -1
Expand Down Expand Up @@ -89,3 +88,27 @@ func (r *PartitionerImpl) Partition(ctx context.Context,

return replicas, nil
}

func (r *PartitionerImpl) Sort(shards []common.LoadIndex) ([]common.LoadIndex, float64) {
if len(shards) == 0 {
return shards, 0
}

sortedShards := common.LoadIndexesDesc(shards)
sort.Sort(sortedShards)
biggestLoad := float64(sortedShards[0].Value.AsApproximateFloat64())

inClusterCondition := func(loadIndex common.LoadIndex) bool {
return loadIndex.Shard.Name == "in-cluster"
}
inClusterIndex := sort.Search(len(sortedShards), func(i int) bool {
return inClusterCondition(sortedShards[i])
})
if inClusterIndex < len(sortedShards) && inClusterCondition(sortedShards[inClusterIndex]) {
inClusterShard := sortedShards[inClusterIndex]
copy(sortedShards[inClusterIndex:], sortedShards[inClusterIndex+1:])
sortedShards = sortedShards[:len(sortedShards)-1]
return append(common.LoadIndexesDesc{inClusterShard}, sortedShards...), biggestLoad
}
return sortedShards, biggestLoad
}
100 changes: 100 additions & 0 deletions partitioners/longestprocessingtime/longestprocessingtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package longestprocessingtime

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/plumber-cd/argocd-autoscaler/api/autoscaler/common"
"k8s.io/apimachinery/pkg/api/resource"
)

func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecs(t, "LongestProcessingTimePartitioner Suite")
}

var _ = Describe("LongestProcessingTimePartitioner", func() {
Context("when unsorted list of shards", func() {
It("should sort", func() {
shards := []common.LoadIndex{
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")},
{Shard: common.Shard{Name: "shard4"}, Value: resource.MustParse("6")},
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
}
partitioner := PartitionerImpl{}
sortedShards, bucketSize := partitioner.Sort(shards)
Expect(bucketSize).To(Equal(float64(8)))
Expect(len(sortedShards)).To(Equal(len(shards)))

Check failure on line 32 in partitioners/longestprocessingtime/longestprocessingtime_test.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

ginkgo-linter: wrong length assertion. Consider using `Expect(sortedShards).To(HaveLen(len(shards)))` instead (ginkgolinter)

Check failure on line 32 in partitioners/longestprocessingtime/longestprocessingtime_test.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

ginkgo-linter: wrong length assertion. Consider using `Expect(sortedShards).To(HaveLen(len(shards)))` instead (ginkgolinter)
Expect(sortedShards[0].Shard.Name).To(Equal("shard3"))
Expect(sortedShards[1].Shard.Name).To(Equal("shard5"))
Expect(sortedShards[2].Shard.Name).To(Equal("shard4"))
Expect(sortedShards[3].Shard.Name).To(Equal("shard1"))
Expect(sortedShards[4].Shard.Name).To(Equal("shard2"))
})

It("should put in-cluster first", func() {
shards := []common.LoadIndex{
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")},
{Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")},
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
}
partitioner := PartitionerImpl{}
sortedShards, bucketSize := partitioner.Sort(shards)
Expect(bucketSize).To(Equal(float64(8)))
Expect(len(sortedShards)).To(Equal(len(shards)))

Check failure on line 51 in partitioners/longestprocessingtime/longestprocessingtime_test.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

ginkgo-linter: wrong length assertion. Consider using `Expect(sortedShards).To(HaveLen(len(shards)))` instead (ginkgolinter)

Check failure on line 51 in partitioners/longestprocessingtime/longestprocessingtime_test.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

ginkgo-linter: wrong length assertion. Consider using `Expect(sortedShards).To(HaveLen(len(shards)))` instead (ginkgolinter)
Expect(sortedShards[0].Shard.Name).To(Equal("in-cluster"))
Expect(sortedShards[1].Shard.Name).To(Equal("shard3"))
Expect(sortedShards[2].Shard.Name).To(Equal("shard5"))
Expect(sortedShards[3].Shard.Name).To(Equal("shard1"))
Expect(sortedShards[4].Shard.Name).To(Equal("shard2"))
})
})

Context("when partitioning", func() {
It("should partition", func() {
shards := []common.LoadIndex{
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("10")},
{Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")},
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
}
partitioner := PartitionerImpl{}
replicas, err := partitioner.Partition(context.TODO(), shards)
Expect(err).ToNot(HaveOccurred())
Expect(replicas).To(HaveLen(4))

Expect(replicas[0].ID).To(Equal(int32(0)))
Expect(replicas[0].LoadIndexes).To(HaveLen(1))
Expect(replicas[0].LoadIndexes[0].Shard.Name).To(Equal("in-cluster"))
Expect(replicas[0].TotalLoad.AsApproximateFloat64()).To(Equal(float64(6)))
Expect(replicas[0].TotalLoadDisplayValue).To(Equal("6"))

Expect(replicas[1].ID).To(Equal(int32(1)))
Expect(replicas[1].LoadIndexes).To(HaveLen(1))
Expect(replicas[1].LoadIndexes[0].Shard.Name).To(Equal("shard3"))
Expect(replicas[1].TotalLoad.AsApproximateFloat64()).To(Equal(float64(10)))
Expect(replicas[1].TotalLoadDisplayValue).To(Equal("10"))

Expect(replicas[2].ID).To(Equal(int32(2)))
Expect(replicas[2].LoadIndexes).To(HaveLen(1))
Expect(replicas[2].LoadIndexes[0].Shard.Name).To(Equal("shard5"))
Expect(replicas[2].TotalLoad.AsApproximateFloat64()).To(Equal(float64(7)))
Expect(replicas[2].TotalLoadDisplayValue).To(Equal("7"))

Expect(replicas[3].ID).To(Equal(int32(3)))
Expect(replicas[3].LoadIndexes).To(HaveLen(2))
Expect(replicas[3].LoadIndexes[0].Shard.Name).To(Equal("shard1"))
Expect(replicas[3].LoadIndexes[1].Shard.Name).To(Equal("shard2"))
Expect(replicas[3].TotalLoad.AsApproximateFloat64()).To(Equal(float64(8)))
Expect(replicas[3].TotalLoadDisplayValue).To(Equal("8"))
})
})
})

0 comments on commit 5628c68

Please sign in to comment.