Skip to content

Commit b6ae05e

Browse files
authored
feat: add a delay to scale in (#28)
1 parent 7de3059 commit b6ae05e

File tree

5 files changed

+70
-26
lines changed

5 files changed

+70
-26
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Two additional environment variables are optional, but very useful if you're run
2525

2626
- `AUTOSCALING_MAX_KILL` (defaults to 1) - the maximum number of instances the utility is allowed to terminate in a single run;
2727
- `AUTOSCALING_MAX_CREATE` (defaults to 1) - the maximum number of instances the utility is allowed to create in a single run;
28+
- `AUTOSCALING_SCALE_DOWN_DELAY` (defaults to 0) - the number of minutes a worker must be registered to Spacelift before its eligible to be scaled in.
2829

2930
## Important note on concurrency
3031

internal/auto_scaler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
4545
return fmt.Errorf("could not get autoscaling group: %w", err)
4646
}
4747

48-
state, err := NewState(workerPool, asg)
48+
state, err := NewState(workerPool, asg, cfg)
4949
if err != nil {
5050
return fmt.Errorf("could not create state: %w", err)
5151
}
@@ -109,10 +109,10 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
109109
// If we got this far, we're scaling down.
110110
logger.With("instances", decision.ScalingSize).Info("scaling down ASG")
111111

112-
idleWorkers := state.IdleWorkers()
112+
scalableWorkers := state.ScalableWorkers()
113113

114114
for i := 0; i < decision.ScalingSize; i++ {
115-
worker := idleWorkers[i]
115+
worker := scalableWorkers[i]
116116

117117
_, instanceID, _ := worker.InstanceIdentity()
118118

internal/runtime_config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ type RuntimeConfig struct {
66
SpaceliftAPIEndpoint string `env:"SPACELIFT_API_KEY_ENDPOINT,notEmpty"`
77
SpaceliftWorkerPoolID string `env:"SPACELIFT_WORKER_POOL_ID,notEmpty"`
88

9-
AutoscalingGroupARN string `env:"AUTOSCALING_GROUP_ARN,notEmpty"`
10-
AutoscalingRegion string `env:"AUTOSCALING_REGION,notEmpty"`
11-
AutoscalingMaxKill int `env:"AUTOSCALING_MAX_KILL" envDefault:"1"`
12-
AutoscalingMaxCreate int `env:"AUTOSCALING_MAX_CREATE" envDefault:"1"`
9+
AutoscalingScaleDownDelay int `env:"AUTOSCALING_SCALE_DOWN_DELAY" envDefault:"0"`
10+
AutoscalingGroupARN string `env:"AUTOSCALING_GROUP_ARN,notEmpty"`
11+
AutoscalingRegion string `env:"AUTOSCALING_REGION,notEmpty"`
12+
AutoscalingMaxKill int `env:"AUTOSCALING_MAX_KILL" envDefault:"1"`
13+
AutoscalingMaxCreate int `env:"AUTOSCALING_MAX_CREATE" envDefault:"1"`
1314
}

internal/state.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package internal
22

33
import (
44
"fmt"
5-
65
"github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
6+
"time"
77
)
88

99
// State represents the state of the world, as far as the autoscaler is
@@ -15,9 +15,10 @@ type State struct {
1515

1616
inServiceInstanceIDs map[InstanceID]struct{}
1717
workersByInstanceID map[InstanceID]Worker
18+
cfg RuntimeConfig
1819
}
1920

20-
func NewState(workerPool *WorkerPool, asg *types.AutoScalingGroup) (*State, error) {
21+
func NewState(workerPool *WorkerPool, asg *types.AutoScalingGroup, cfg RuntimeConfig) (*State, error) {
2122
workersByInstanceID := make(map[InstanceID]Worker)
2223
inServiceInstanceIDs := make(map[InstanceID]struct{})
2324

@@ -65,18 +66,28 @@ func NewState(workerPool *WorkerPool, asg *types.AutoScalingGroup) (*State, erro
6566
ASG: asg,
6667
inServiceInstanceIDs: inServiceInstanceIDs,
6768
workersByInstanceID: workersByInstanceID,
69+
cfg: cfg,
6870
}, nil
6971
}
7072

71-
// IdleWorkers returns a list of workers that are not currently busy.
72-
func (s *State) IdleWorkers() []Worker {
73+
// ScalableWorkers returns a list of workers that are not currently busy.
74+
func (s *State) ScalableWorkers() []Worker {
7375
var out []Worker
74-
7576
for _, worker := range s.WorkerPool.Workers {
7677
if worker.Busy {
7778
continue
7879
}
7980

81+
// Even though the worker might be idle, we will give it some time to pick up more work
82+
// if the customer wants
83+
if s.cfg.AutoscalingScaleDownDelay != 0 {
84+
workerCreationTime := time.Unix(int64(worker.CreatedAt), 0)
85+
minimumAliveTime := workerCreationTime.Add(time.Duration(s.cfg.AutoscalingScaleDownDelay) * time.Minute)
86+
if !time.Now().After(minimumAliveTime) {
87+
continue
88+
}
89+
}
90+
8091
out = append(out, worker)
8192
}
8293

@@ -126,9 +137,9 @@ func (s *State) Decide(maxCreate, maxKill int) Decision {
126137
}
127138
}
128139

129-
idle := s.IdleWorkers()
140+
scalable := s.ScalableWorkers()
130141

131-
difference := int(s.WorkerPool.PendingRuns) - len(idle)
142+
difference := int(s.WorkerPool.PendingRuns) - len(scalable)
132143

133144
if difference > 0 {
134145
return s.determineScaleUp(difference, maxCreate)

internal/state_test.go

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package internal_test
22

33
import (
44
"encoding/json"
5-
"testing"
6-
75
"github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
86
"github.com/franela/goblin"
97
. "github.com/onsi/gomega"
108
"github.com/stretchr/testify/assert"
119
"github.com/stretchr/testify/require"
10+
"testing"
11+
"time"
1212

1313
"github.com/spacelift-io/awsautoscalr/internal"
1414
)
@@ -17,6 +17,7 @@ func TestState_StrayInstances(t *testing.T) {
1717
const asgName = "asg-name"
1818
const instanceID = "instance-id"
1919
const failedToTerminateInstanceID = "instance-id2"
20+
cfg := internal.RuntimeConfig{}
2021
asg := &types.AutoScalingGroup{
2122
AutoScalingGroupName: nullable(asgName),
2223
MinSize: nullable(int32(1)),
@@ -46,7 +47,7 @@ func TestState_StrayInstances(t *testing.T) {
4647
},
4748
}
4849

49-
state, err := internal.NewState(workerPool, asg)
50+
state, err := internal.NewState(workerPool, asg, cfg)
5051
require.NoError(t, err)
5152

5253
strayInstances := state.StrayInstances()
@@ -60,6 +61,7 @@ func TestState(t *testing.T) {
6061
g.Describe("State", func() {
6162
var asg *types.AutoScalingGroup
6263
var workerPool *internal.WorkerPool
64+
var cfg internal.RuntimeConfig
6365

6466
var sut *internal.State
6567

@@ -76,9 +78,10 @@ func TestState(t *testing.T) {
7678
DesiredCapacity: nullable(int32(3)),
7779
}
7880
workerPool = &internal.WorkerPool{}
81+
cfg = internal.RuntimeConfig{}
7982
})
8083

81-
g.JustBeforeEach(func() { sut, err = internal.NewState(workerPool, asg) })
84+
g.JustBeforeEach(func() { sut, err = internal.NewState(workerPool, asg, cfg) })
8285

8386
g.Describe("when the ASG is invalid", func() {
8487
g.Describe("when the name is not set", func() {
@@ -210,10 +213,10 @@ func TestState(t *testing.T) {
210213
})
211214
})
212215

213-
g.Describe("IdleWorkers", func() {
214-
var idleWorkers []internal.Worker
216+
g.Describe("ScaleableWorkers", func() {
217+
var scalableWorkers []internal.Worker
215218

216-
g.JustBeforeEach(func() { idleWorkers = sut.IdleWorkers() })
219+
g.JustBeforeEach(func() { scalableWorkers = sut.ScalableWorkers() })
217220

218221
g.BeforeEach(func() {
219222
workerPool.Workers = []internal.Worker{
@@ -223,8 +226,8 @@ func TestState(t *testing.T) {
223226
})
224227

225228
g.It("should return the idle workers", func() {
226-
Expect(idleWorkers).To(HaveLen(1))
227-
Expect(idleWorkers[0].ID).To(Equal("idle"))
229+
Expect(scalableWorkers).To(HaveLen(1))
230+
Expect(scalableWorkers[0].ID).To(Equal("idle"))
228231
})
229232
})
230233

@@ -243,10 +246,20 @@ func TestState(t *testing.T) {
243246
}
244247
workerPool = &internal.WorkerPool{}
245248

246-
sut = &internal.State{
247-
WorkerPool: workerPool,
248-
ASG: asg,
249+
asg = &types.AutoScalingGroup{
250+
AutoScalingGroupName: nullable("asg-name"),
251+
MinSize: nullable(int32(1)),
252+
MaxSize: nullable(int32(2)),
253+
DesiredCapacity: nullable(int32(2)),
254+
}
255+
workerPool = &internal.WorkerPool{}
256+
cfg = internal.RuntimeConfig{
257+
AutoscalingScaleDownDelay: 50,
249258
}
259+
260+
var err error
261+
sut, err = internal.NewState(workerPool, asg, cfg)
262+
Expect(err).NotTo(HaveOccurred())
250263
})
251264

252265
g.JustBeforeEach(func() {
@@ -401,6 +414,24 @@ func TestState(t *testing.T) {
401414
Expect(decision.Comments).To(Equal([]string{"removing idle workers"}))
402415
})
403416
})
417+
418+
g.Describe("when waiting some time for idle workers", func() {
419+
g.BeforeEach(func() {
420+
workerPool.Workers = []internal.Worker{
421+
{ID: "busy", Busy: true},
422+
{ID: "idle", Busy: false, CreatedAt: int32(time.Now().Unix())},
423+
}
424+
})
425+
426+
g.It("should not scale down because the worker is busy and not enough time has passed", func() {
427+
Expect(decision.ScalingDirection).To(Equal(internal.ScalingDirectionNone))
428+
Expect(decision.ScalingSize).To(BeZero())
429+
Expect(decision.Comments).To(Equal([]string{
430+
"autoscaling group exactly at the right size",
431+
}))
432+
})
433+
})
434+
404435
})
405436
})
406437
})

0 commit comments

Comments
 (0)