From 5628c683c749b6cd3c79d6d3434f41554f45d7c8 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Tue, 18 Feb 2025 13:20:26 -0700 Subject: [PATCH] Ensure that the in-cluster is always scheduled on replica-0 --- api/autoscaler/common/load_index.go | 2 + .../longestprocessingtime.go | 29 ++++- .../longestprocessingtime_test.go | 100 ++++++++++++++++++ 3 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 partitioners/longestprocessingtime/longestprocessingtime_test.go diff --git a/api/autoscaler/common/load_index.go b/api/autoscaler/common/load_index.go index 0664ae8..3da9ff9 100644 --- a/api/autoscaler/common/load_index.go +++ b/api/autoscaler/common/load_index.go @@ -43,6 +43,8 @@ func (s LoadIndexesDesc) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +// Less returns true if the value of the LoadIndex at index i is greater than the value of the LoadIndex at index j. +// Note that this is reversed because this is a descending sort implementation. func (s LoadIndexesDesc) Less(i, j int) bool { return s[i].Value.AsApproximateFloat64() > s[j].Value.AsApproximateFloat64() } diff --git a/partitioners/longestprocessingtime/longestprocessingtime.go b/partitioners/longestprocessingtime/longestprocessingtime.go index 58073d8..e0a79cf 100644 --- a/partitioners/longestprocessingtime/longestprocessingtime.go +++ b/partitioners/longestprocessingtime/longestprocessingtime.go @@ -42,11 +42,10 @@ func (r *PartitionerImpl) Partition(ctx context.Context, return replicas, nil } - sort.Sort(common.LoadIndexesDesc(shards)) - bucketSize := shards[0].Value.AsApproximateFloat64() + sortedShards, bucketSize := r.Sort(shards) replicaCount := int32(0) - for _, shard := range shards { + for _, shard := range sortedShards { // Find the replica with the least current load. minLoad := float64(0) selectedReplicaIndex := -1 @@ -89,3 +88,27 @@ func (r *PartitionerImpl) Partition(ctx context.Context, return replicas, nil } + +func (r *PartitionerImpl) Sort(shards []common.LoadIndex) ([]common.LoadIndex, float64) { + if len(shards) == 0 { + return shards, 0 + } + + sortedShards := common.LoadIndexesDesc(shards) + sort.Sort(sortedShards) + biggestLoad := float64(sortedShards[0].Value.AsApproximateFloat64()) + + inClusterCondition := func(loadIndex common.LoadIndex) bool { + return loadIndex.Shard.Name == "in-cluster" + } + inClusterIndex := sort.Search(len(sortedShards), func(i int) bool { + return inClusterCondition(sortedShards[i]) + }) + if inClusterIndex < len(sortedShards) && inClusterCondition(sortedShards[inClusterIndex]) { + inClusterShard := sortedShards[inClusterIndex] + copy(sortedShards[inClusterIndex:], sortedShards[inClusterIndex+1:]) + sortedShards = sortedShards[:len(sortedShards)-1] + return append(common.LoadIndexesDesc{inClusterShard}, sortedShards...), biggestLoad + } + return sortedShards, biggestLoad +} diff --git a/partitioners/longestprocessingtime/longestprocessingtime_test.go b/partitioners/longestprocessingtime/longestprocessingtime_test.go new file mode 100644 index 0000000..5858ef6 --- /dev/null +++ b/partitioners/longestprocessingtime/longestprocessingtime_test.go @@ -0,0 +1,100 @@ +package longestprocessingtime + +import ( + "context" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/plumber-cd/argocd-autoscaler/api/autoscaler/common" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestControllers(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "LongestProcessingTimePartitioner Suite") +} + +var _ = Describe("LongestProcessingTimePartitioner", func() { + Context("when unsorted list of shards", func() { + It("should sort", func() { + shards := []common.LoadIndex{ + {Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")}, + {Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")}, + {Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")}, + {Shard: common.Shard{Name: "shard4"}, Value: resource.MustParse("6")}, + {Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")}, + } + partitioner := PartitionerImpl{} + sortedShards, bucketSize := partitioner.Sort(shards) + Expect(bucketSize).To(Equal(float64(8))) + Expect(len(sortedShards)).To(Equal(len(shards))) + Expect(sortedShards[0].Shard.Name).To(Equal("shard3")) + Expect(sortedShards[1].Shard.Name).To(Equal("shard5")) + Expect(sortedShards[2].Shard.Name).To(Equal("shard4")) + Expect(sortedShards[3].Shard.Name).To(Equal("shard1")) + Expect(sortedShards[4].Shard.Name).To(Equal("shard2")) + }) + + It("should put in-cluster first", func() { + shards := []common.LoadIndex{ + {Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")}, + {Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")}, + {Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")}, + {Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")}, + {Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")}, + } + partitioner := PartitionerImpl{} + sortedShards, bucketSize := partitioner.Sort(shards) + Expect(bucketSize).To(Equal(float64(8))) + Expect(len(sortedShards)).To(Equal(len(shards))) + Expect(sortedShards[0].Shard.Name).To(Equal("in-cluster")) + Expect(sortedShards[1].Shard.Name).To(Equal("shard3")) + Expect(sortedShards[2].Shard.Name).To(Equal("shard5")) + Expect(sortedShards[3].Shard.Name).To(Equal("shard1")) + Expect(sortedShards[4].Shard.Name).To(Equal("shard2")) + }) + }) + + Context("when partitioning", func() { + It("should partition", func() { + shards := []common.LoadIndex{ + {Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")}, + {Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")}, + {Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("10")}, + {Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")}, + {Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")}, + } + partitioner := PartitionerImpl{} + replicas, err := partitioner.Partition(context.TODO(), shards) + Expect(err).ToNot(HaveOccurred()) + Expect(replicas).To(HaveLen(4)) + + Expect(replicas[0].ID).To(Equal(int32(0))) + Expect(replicas[0].LoadIndexes).To(HaveLen(1)) + Expect(replicas[0].LoadIndexes[0].Shard.Name).To(Equal("in-cluster")) + Expect(replicas[0].TotalLoad.AsApproximateFloat64()).To(Equal(float64(6))) + Expect(replicas[0].TotalLoadDisplayValue).To(Equal("6")) + + Expect(replicas[1].ID).To(Equal(int32(1))) + Expect(replicas[1].LoadIndexes).To(HaveLen(1)) + Expect(replicas[1].LoadIndexes[0].Shard.Name).To(Equal("shard3")) + Expect(replicas[1].TotalLoad.AsApproximateFloat64()).To(Equal(float64(10))) + Expect(replicas[1].TotalLoadDisplayValue).To(Equal("10")) + + Expect(replicas[2].ID).To(Equal(int32(2))) + Expect(replicas[2].LoadIndexes).To(HaveLen(1)) + Expect(replicas[2].LoadIndexes[0].Shard.Name).To(Equal("shard5")) + Expect(replicas[2].TotalLoad.AsApproximateFloat64()).To(Equal(float64(7))) + Expect(replicas[2].TotalLoadDisplayValue).To(Equal("7")) + + Expect(replicas[3].ID).To(Equal(int32(3))) + Expect(replicas[3].LoadIndexes).To(HaveLen(2)) + Expect(replicas[3].LoadIndexes[0].Shard.Name).To(Equal("shard1")) + Expect(replicas[3].LoadIndexes[1].Shard.Name).To(Equal("shard2")) + Expect(replicas[3].TotalLoad.AsApproximateFloat64()).To(Equal(float64(8))) + Expect(replicas[3].TotalLoadDisplayValue).To(Equal("8")) + }) + }) +})