Skip to content

Commit 10ca01c

Browse files
fix: handle detached not terminated instances (#18)
Fix the issue where a detached but not terminated instance could lead to incorrect scaling decisions.
1 parent 41323ce commit 10ca01c

File tree

3 files changed

+121
-6
lines changed

3 files changed

+121
-6
lines changed

internal/auto_scaler_test.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"bytes"
55
"context"
66
"testing"
7+
"time"
78

89
"github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
10+
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
911
"github.com/stretchr/testify/mock"
1012
"github.com/stretchr/testify/require"
1113
"golang.org/x/exp/slog"
@@ -68,7 +70,7 @@ func TestAutoScalerScalingUp(t *testing.T) {
6870
MaxSize: ptr(int32(3)),
6971
DesiredCapacity: ptr(int32(2)),
7072
Instances: []types.Instance{
71-
{},
73+
{InstanceId: ptr("instance")},
7274
},
7375
}, nil)
7476
ctrl.On("ScaleUpASG", mock.Anything, int32(2)).Return(nil)
@@ -107,8 +109,8 @@ func TestAutoScalerScalingDown(t *testing.T) {
107109
MaxSize: ptr(int32(3)),
108110
DesiredCapacity: ptr(int32(2)),
109111
Instances: []types.Instance{
110-
{},
111-
{},
112+
{InstanceId: ptr("instance")},
113+
{InstanceId: ptr("instance2")},
112114
},
113115
}, nil)
114116
ctrl.On("DrainWorker", mock.Anything, "1").Return(true, nil)
@@ -117,6 +119,54 @@ func TestAutoScalerScalingDown(t *testing.T) {
117119
require.NoError(t, err)
118120
}
119121

122+
func TestAutoScalerDetachedNotTerminatedInstances(t *testing.T) {
123+
var buf bytes.Buffer
124+
h := slog.NewTextHandler(&buf, nil)
125+
126+
cfg := internal.RuntimeConfig{}
127+
128+
ctrl := new(MockController)
129+
defer ctrl.AssertExpectations(t)
130+
131+
scaler := internal.NewAutoScaler(ctrl, slog.New(h))
132+
133+
ctrl.On("GetWorkerPool", mock.Anything).Return(&internal.WorkerPool{
134+
Workers: []internal.Worker{
135+
{
136+
ID: "1",
137+
Metadata: `{"asg_id": "group", "instance_id": "instance"}`,
138+
},
139+
{
140+
ID: "2",
141+
Drained: true,
142+
Metadata: `{"asg_id": "group", "instance_id": "detached"}`,
143+
},
144+
},
145+
PendingRuns: 2,
146+
}, nil)
147+
ctrl.On("GetAutoscalingGroup", mock.Anything).Return(&types.AutoScalingGroup{
148+
AutoScalingGroupName: ptr("group"),
149+
MinSize: ptr(int32(1)),
150+
MaxSize: ptr(int32(3)),
151+
DesiredCapacity: ptr(int32(2)),
152+
Instances: []types.Instance{
153+
{InstanceId: ptr("instance")},
154+
},
155+
}, nil)
156+
ctrl.On("KillInstance", mock.Anything, "detached").Return(nil)
157+
output := []ec2types.Instance{{
158+
InstanceId: ptr("detached"),
159+
LaunchTime: nullable(time.Now().Add(-time.Hour)),
160+
}}
161+
ctrl.On(
162+
"DescribeInstances",
163+
mock.Anything,
164+
[]string{"detached"},
165+
).Return(output, nil)
166+
err := scaler.Scale(context.Background(), cfg)
167+
require.NoError(t, err)
168+
}
169+
120170
// ptr returns a pointer to a copy of v. It lets tests fill pointer-typed
// struct fields from literals and other non-addressable values.
func ptr[T any](v T) *T {
	return &v
}

internal/state.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,37 @@ func (s *State) IdleWorkers() []Worker {
8585

8686
// StrayInstances returns a list of instance IDs that don't have a corresponding
8787
// worker in the worker pool.
88-
func (s *State) StrayInstances() (out []string) {
88+
func (s *State) StrayInstances() []string {
89+
var res []string
8990
for instanceID := range s.inServiceInstanceIDs {
9091
if _, ok := s.workersByInstanceID[instanceID]; !ok {
91-
out = append(out, string(instanceID))
92+
res = append(res, string(instanceID))
9293
}
9394
}
9495

95-
return
96+
res = append(res, s.detachedNotTerminatedInstances()...)
97+
98+
return res
99+
}
100+
101+
func (s *State) detachedNotTerminatedInstances() []string {
102+
instanceIDs := make(map[InstanceID]struct{})
103+
for _, instance := range s.ASG.Instances {
104+
instanceIDs[InstanceID(*instance.InstanceId)] = struct{}{}
105+
}
106+
107+
var res []string
108+
for instanceID, worker := range s.workersByInstanceID {
109+
if !worker.Drained {
110+
continue
111+
}
112+
if _, ok := instanceIDs[instanceID]; ok {
113+
continue
114+
}
115+
116+
res = append(res, string(instanceID))
117+
}
118+
return res
96119
}
97120

98121
func (s *State) Decide(maxCreate, maxKill int) Decision {

internal/state_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,52 @@ import (
77
"github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
88
"github.com/franela/goblin"
99
. "github.com/onsi/gomega"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1012

1113
"github.com/spacelift-io/awsautoscalr/internal"
1214
)
1315

16+
func TestState_StrayInstances(t *testing.T) {
17+
const asgName = "asg-name"
18+
const instanceID = "instance-id"
19+
const failedToTerminateInstanceID = "instance-id2"
20+
asg := &types.AutoScalingGroup{
21+
AutoScalingGroupName: nullable(asgName),
22+
MinSize: nullable(int32(1)),
23+
MaxSize: nullable(int32(5)),
24+
DesiredCapacity: nullable(int32(3)),
25+
Instances: []types.Instance{
26+
{
27+
InstanceId: nullable(instanceID),
28+
},
29+
},
30+
}
31+
workerPool := &internal.WorkerPool{
32+
Workers: []internal.Worker{
33+
{
34+
Metadata: mustJSON(map[string]any{
35+
"asg_id": asgName,
36+
"instance_id": instanceID,
37+
}),
38+
},
39+
{
40+
Drained: true,
41+
Metadata: mustJSON(map[string]any{
42+
"asg_id": asgName,
43+
"instance_id": failedToTerminateInstanceID,
44+
}),
45+
},
46+
},
47+
}
48+
49+
state, err := internal.NewState(workerPool, asg)
50+
require.NoError(t, err)
51+
52+
strayInstances := state.StrayInstances()
53+
assert.Equal(t, []string{failedToTerminateInstanceID}, strayInstances)
54+
}
55+
1456
func TestState(t *testing.T) {
1557
g := goblin.Goblin(t)
1658
RegisterFailHandler(func(m string, _ ...int) { g.Fail(m) })

0 commit comments

Comments
 (0)