test: Fix pod preemption in kubelet config tests (#2531)
jonathan-innis authored Sep 19, 2022
1 parent fc6b899 commit 1e00020
Showing 6 changed files with 141 additions and 40 deletions.
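Summary of the change: the kubelet-configuration tests previously created bare pods with no resource requests, which left them open to preemption and unpredictable packing; they now create a Deployment with explicit CPU requests and assert placement with a new ExpectUniqueNodeNames helper. The test environment additionally dumps pod events after each spec (suites can opt out with the new NoEvents label) to make scheduling failures easier to debug.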
79 changes: 77 additions & 2 deletions test/pkg/environment/expectations.go
@@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/logging"
@@ -64,16 +65,19 @@
)

const (
NoWatch = "NoWatch"
NoWatch = "NoWatch"
NoEvents = "NoEvents"
)

// if set, logs additional information that may be useful in debugging an E2E test failure
var debugE2E = true
var testStartTime time.Time
var stop chan struct{}

// nolint:gocyclo
func (env *Environment) BeforeEach() {
stop = make(chan struct{})
testStartTime = time.Now()

if debugE2E {
fmt.Println("------- START BEFORE -------")
@@ -136,6 +140,29 @@ func (env *Environment) getPodInformation(p *v1.Pod) string {
pod.IsProvisionable(p), p.Status.Phase, p.Spec.NodeName, containerInfo.String())
}

// Partially copied from
// https://github.com/kubernetes/kubernetes/blob/04ee339c7a4d36b4037ce3635993e2a9e395ebf3/staging/src/k8s.io/kubectl/pkg/describe/describe.go#L4232
func getEventInformation(k types.NamespacedName, el *v1.EventList) string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("------- %s EVENTS -------\n", k))
if len(el.Items) == 0 {
return sb.String()
}
for _, e := range el.Items {
source := e.Source.Component
if source == "" {
source = e.ReportingController
}
sb.WriteString(fmt.Sprintf("type=%s reason=%s from=%s message=%s\n",
e.Type,
e.Reason,
source,
strings.TrimSpace(e.Message)),
)
}
return sb.String()
}

// startPodMonitor monitors all pods that are provisioned in a namespace outside kube-system
// and karpenter namespaces during a test
func (env *Environment) startPodMonitor(stop <-chan struct{}) {
@@ -211,9 +238,47 @@ func (env *Environment) AfterEach() {
env.eventuallyExpectScaleDown()
env.expectNoCrashes()
close(stop) // close the pod/node monitor watch channel
if debugE2E && !lo.Contains(CurrentSpecReport().Labels(), NoEvents) {
env.dumpPodEvents(testStartTime)
}
env.printControllerLogs(&v1.PodLogOptions{Container: "controller"})
}

func (env *Environment) dumpPodEvents(testStartTime time.Time) {
el := &v1.EventList{}
ExpectWithOffset(1, env.Client.List(env, el)).To(Succeed())

eventMap := map[types.NamespacedName]*v1.EventList{}

filteredEvents := lo.Filter(el.Items, func(e v1.Event, _ int) bool {
if !e.EventTime.IsZero() {
if e.EventTime.BeforeTime(&metav1.Time{Time: testStartTime}) {
return false
}
} else if e.FirstTimestamp.Before(&metav1.Time{Time: testStartTime}) {
return false
}
if e.InvolvedObject.Kind != "Pod" {
return false
}
if e.InvolvedObject.Namespace == "kube-system" || e.InvolvedObject.Namespace == "karpenter" {
return false
}
return true
})
for i := range filteredEvents {
elem := filteredEvents[i]
objectKey := types.NamespacedName{Namespace: elem.InvolvedObject.Namespace, Name: elem.InvolvedObject.Name}
if _, ok := eventMap[objectKey]; !ok {
eventMap[objectKey] = &v1.EventList{}
}
eventMap[objectKey].Items = append(eventMap[objectKey].Items, elem)
}
for k, v := range eventMap {
fmt.Print(getEventInformation(k, v))
}
}

func (env *Environment) ExpectCreated(objects ...client.Object) {
for _, object := range objects {
object.SetLabels(lo.Assign(object.GetLabels(), map[string]string{TestLabelName: env.ClusterName}))
@@ -271,10 +336,19 @@ func (env *Environment) EventuallyExpectKarpenterWithEnvVar(envVar v1.EnvVar) {

func (env *Environment) EventuallyExpectHealthyPodCount(selector labels.Selector, numPods int) {
Eventually(func(g Gomega) {
g.Expect(env.Monitor.RunningPods(selector)).To(Equal(numPods))
g.Expect(env.Monitor.RunningPodsCount(selector)).To(Equal(numPods))
}).Should(Succeed())
}

func (env *Environment) ExpectUniqueNodeNames(selector labels.Selector, uniqueNames int) {
pods := env.Monitor.RunningPods(selector)
nodeNames := sets.NewString()
for _, pod := range pods {
nodeNames.Insert(pod.Spec.NodeName)
}
ExpectWithOffset(1, len(nodeNames)).To(BeNumerically("==", uniqueNames))
}

func (env *Environment) eventuallyExpectScaleDown() {
Eventually(func(g Gomega) {
// expect the current node count to be what it was when the test started
@@ -368,6 +442,7 @@ var (
)

func (env *Environment) printControllerLogs(options *v1.PodLogOptions) {
fmt.Println("------- START CONTROLLER LOGS -------")
if options.SinceTime == nil {
options.SinceTime = lastLogged.DeepCopy()
lastLogged = metav1.Now()
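For reference, the event dump added above prints one block per pod in the format produced by getEventInformation; a hypothetical example (pod name, node name, and messages invented for illustration):

------- default/inflate-7b9c6d8f4-abcde EVENTS -------
type=Warning reason=FailedScheduling from=default-scheduler message=0/3 nodes are available: 3 Insufficient cpu.
type=Normal reason=Scheduled from=default-scheduler message=Successfully assigned default/inflate-7b9c6d8f4-abcde to ip-192-168-1-1.us-west-2.compute.internal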
13 changes: 9 additions & 4 deletions test/pkg/environment/monitor.go
@@ -129,17 +129,22 @@ func (m *Monitor) GetCreatedNodes() []v1.Node {
}

// RunningPods returns the running pods matching the given selector
func (m *Monitor) RunningPods(selector labels.Selector) int {
count := 0
func (m *Monitor) RunningPods(selector labels.Selector) []*v1.Pod {
var pods []*v1.Pod
for _, pod := range m.poll().pods.Items {
pod := pod
if pod.Status.Phase != v1.PodRunning {
continue
}
if selector.Matches(labels.Set(pod.Labels)) {
count++
pods = append(pods, &pod)
}
}
return count
return pods
}

func (m *Monitor) RunningPodsCount(selector labels.Selector) int {
return len(m.RunningPods(selector))
}

func (m *Monitor) poll() state {
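The signature change above is deliberate: RunningPods now returns the matching pod objects so callers can inspect placement (for example spec.nodeName) as well as count them, while RunningPodsCount preserves the old count-only behavior for existing assertions. A minimal caller sketch (not part of this commit) that derives the set of nodes the selected pods landed on:

pods := env.Monitor.RunningPods(selector)
nodeNames := sets.NewString() // k8s.io/apimachinery/pkg/util/sets
for _, pod := range pods {
nodeNames.Insert(pod.Spec.NodeName)
}
// len(nodeNames) is the number of distinct nodes hosting the selected pods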
2 changes: 1 addition & 1 deletion test/suites/consolidation/suite_test.go
@@ -57,7 +57,7 @@ var _ = AfterEach(func() {
})

var _ = Describe("Consolidation", func() {
It("should consolidate nodes (delete)", Label(environment.NoWatch), func() {
It("should consolidate nodes (delete)", Label(environment.NoWatch), Label(environment.NoEvents), func() {
provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
SubnetSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
83 changes: 52 additions & 31 deletions test/suites/integration/kubelet_config_test.go
@@ -21,7 +21,9 @@ import (
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/ptr"

"github.com/aws/karpenter/pkg/apis/awsnodetemplate/v1alpha1"
@@ -56,19 +58,24 @@ var _ = Describe("KubeletConfiguration Overrides", func() {
MaxPods: ptr.Int32(1 + int32(dsCount)),
}

pods := []*v1.Pod{test.Pod(), test.Pod(), test.Pod()}
env.ExpectCreated(provisioner, provider)
for _, pod := range pods {
env.ExpectCreated(pod)
}
env.EventuallyExpectHealthy(pods...)
env.ExpectCreatedNodeCount("==", 3)
numPods := 3
dep := test.Deployment(test.DeploymentOptions{
Replicas: int32(numPods),
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "large-app"},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
})
selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
env.ExpectCreated(provisioner, provider, dep)

nodeNames := sets.NewString()
for _, pod := range pods {
nodeNames.Insert(pod.Spec.NodeName)
}
Expect(len(nodeNames)).To(BeNumerically("==", 3))
env.EventuallyExpectHealthyPodCount(selector, numPods)
env.ExpectCreatedNodeCount("==", 3)
env.ExpectUniqueNodeNames(selector, 3)
})
It("should schedule pods onto separate nodes when podsPerCore is set", func() {
provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
@@ -92,6 +99,19 @@ var _ = Describe("KubeletConfiguration Overrides", func() {
},
},
})
numPods := 4
dep := test.Deployment(test.DeploymentOptions{
Replicas: int32(numPods),
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "large-app"},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
})
selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)

// Get the DS pod count and use it to calculate the DS pod overhead
// We calculate podsPerCore to split the test pods and the DS pods between two nodes:
@@ -106,19 +126,10 @@ var _ = Describe("KubeletConfiguration Overrides", func() {
PodsPerCore: ptr.Int32(int32(math.Ceil(float64(2+dsCount) / 2))),
}

pods := []*v1.Pod{test.Pod(), test.Pod(), test.Pod(), test.Pod()}
env.ExpectCreated(provisioner, provider)
for _, pod := range pods {
env.ExpectCreated(pod)
}
env.EventuallyExpectHealthy(pods...)
env.ExpectCreated(provisioner, provider, dep)
env.EventuallyExpectHealthyPodCount(selector, numPods)
env.ExpectCreatedNodeCount("==", 2)

nodeNames := sets.NewString()
for _, pod := range pods {
nodeNames.Insert(pod.Spec.NodeName)
}
Expect(len(nodeNames)).To(BeNumerically("==", 2))
env.ExpectUniqueNodeNames(selector, 2)
})
It("should ignore podsPerCore value when Bottlerocket is used", func() {
provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
@@ -141,14 +152,24 @@ var _ = Describe("KubeletConfiguration Overrides", func() {
},
},
})
numPods := 6
dep := test.Deployment(test.DeploymentOptions{
Replicas: int32(numPods),
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "large-app"},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
},
},
})
selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)

pods := []*v1.Pod{test.Pod(), test.Pod(), test.Pod(), test.Pod(), test.Pod(), test.Pod()}
env.ExpectCreated(provisioner, provider)
for _, pod := range pods {
env.ExpectCreated(pod)
}
env.EventuallyExpectHealthy(pods...)
env.ExpectCreated(provisioner, provider, dep)
env.EventuallyExpectHealthyPodCount(selector, numPods)
env.ExpectCreatedNodeCount("==", 1)
env.ExpectUniqueNodeNames(selector, 1)
})
})

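A worked example of the kubelet arithmetic in this file, assuming purely for illustration that dsCount = 2 DaemonSet pods per node: the maxPods test sets maxPods = 1 + dsCount = 3, so each node fits exactly one of the three test pods alongside the DaemonSets and three nodes are needed; the podsPerCore test sets podsPerCore = ceil((2 + dsCount) / 2) = 2, which on the 2-vCPU instance the calculation appears to assume allows four pods per node (two test pods plus the DaemonSet pods), splitting the four replicas across two nodes. The Bottlerocket test keeps a podsPerCore override but expects all six replicas on a single node, since Bottlerocket ignores that setting.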
2 changes: 1 addition & 1 deletion test/suites/integration/scheduling_test.go
@@ -95,7 +95,7 @@ var _ = Describe("Scheduling", func() {
env.EventuallyExpectHealthy(pod)
env.ExpectCreatedNodeCount("==", 1)
})
It("should provision a node for a deployment", Label(environment.NoWatch), func() {
It("should provision a node for a deployment", Label(environment.NoWatch), Label(environment.NoEvents), func() {
provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
SubnetSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
2 changes: 1 addition & 1 deletion test/suites/utilization/suite_test.go
@@ -45,7 +45,7 @@ func TestUtilization(t *testing.T) {
var _ = BeforeEach(func() { env.BeforeEach() })
var _ = AfterEach(func() { env.AfterEach() })

var _ = Describe("Utilization", Label(environment.NoWatch), func() {
var _ = Describe("Utilization", Label(environment.NoWatch), Label(environment.NoEvents), func() {
It("should provision one pod per node", func() {
provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},