Skip to content

Commit

Permalink
refactor: remove use of k8s pods (#2553)
Browse files Browse the repository at this point in the history
## Description

Removes use of k8s pod functions.

## Related Issue

Relates to #2507

## Checklist before merging

- [x] Test, docs, adr added or updated as needed
- [x] [Contributor Guide
Steps](https://github.com/defenseunicorns/zarf/blob/main/.github/CONTRIBUTING.md#developer-workflow)
followed
  • Loading branch information
phillebaba committed Jun 11, 2024
1 parent ed2f876 commit fa57f65
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 333 deletions.
48 changes: 30 additions & 18 deletions src/internal/packager/helm/zarf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/defenseunicorns/zarf/src/internal/packager/template"
"github.com/defenseunicorns/zarf/src/pkg/cluster"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/transform"
"github.com/defenseunicorns/zarf/src/pkg/utils"
Expand Down Expand Up @@ -79,18 +80,21 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
}

// Get the current agent image from one of its pods.
pods := h.cluster.WaitForPodsAndContainers(
ctx,
k8s.PodLookup{
Namespace: cluster.ZarfNamespaceName,
Selector: "app=agent-hook",
},
nil,
)
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
if err != nil {
return err
}
listOpts := metav1.ListOptions{
LabelSelector: selector.String(),
}
podList, err := h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).List(ctx, listOpts)
if err != nil {
return err
}

var currentAgentImage transform.Image
if len(pods) > 0 && len(pods[0].Spec.Containers) > 0 {
currentAgentImage, err = transform.ParseImageRef(pods[0].Spec.Containers[0].Image)
if len(podList.Items) > 0 && len(podList.Items[0].Spec.Containers) > 0 {
currentAgentImage, err = transform.ParseImageRef(podList.Items[0].Spec.Containers[0].Image)
if err != nil {
return fmt.Errorf("unable to parse current agent image reference: %w", err)
}
Expand Down Expand Up @@ -142,13 +146,21 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
defer spinner.Stop()

// Force pods to be recreated to get the updated secret.
err = h.cluster.DeletePods(
ctx,
k8s.PodLookup{
Namespace: cluster.ZarfNamespaceName,
Selector: "app=agent-hook",
},
)
// TODO: Explain why no grace period is given.
deleteGracePeriod := int64(0)
deletePolicy := metav1.DeletePropagationForeground
deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &deleteGracePeriod,
PropagationPolicy: &deletePolicy,
}
selector, err = metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
if err != nil {
return err
}
listOpts = metav1.ListOptions{
LabelSelector: selector.String(),
}
err = h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).DeleteCollection(ctx, deleteOpts, listOpts)
if err != nil {
return fmt.Errorf("error recycling pods for the Zarf Agent: %w", err)
}
Expand Down
111 changes: 104 additions & 7 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/defenseunicorns/zarf/src/config"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/layout"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/utils"
"github.com/defenseunicorns/zarf/src/pkg/utils/exec"
"github.com/defenseunicorns/zarf/src/types"
corev1 "k8s.io/api/core/v1"
)

// HandleDataInjection waits for the target pod(s) to come up and inject the data into them
// todo: this currently requires kubectl but we should have enough k8s work to make this native now.
func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
defer wg.Done()

injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
message.WarnErrf(err, "Unable to create the data injection completion marker")
Expand Down Expand Up @@ -68,14 +72,14 @@ iterator:
}
}

target := k8s.PodLookup{
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods := c.WaitForPodsAndContainers(ctx, target, podFilterByInitContainer)
pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
Expand Down Expand Up @@ -132,15 +136,15 @@ iterator:
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := k8s.PodLookup{
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = c.WaitForPodsAndContainers(ctx, podOnlyTarget, podFilterByInitContainer)
_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
Expand All @@ -149,3 +153,96 @@ iterator:
return
}
}

// podLookup is a struct for specifying a pod to target for data injection or lookups.
type podLookup struct {
Namespace string
Selector string
Container string
}

// podFilter is a function that returns true if the pod should be targeted for data injection or lookups.
type podFilter func(pod corev1.Pod) bool

// WaitForPodsAndContainers attempts to find pods matching the given selector and optional inclusion filter
// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-waitCtx.Done():
message.Debug("Pod lookup failed: %v", ctx.Err())
return nil
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
message.Debug("Unable to find matching pods: %w", err)
return nil
}

message.Debug("Found %d pods for target %#v", len(podList.Items), target)

var readyPods = []corev1.Pod{}

// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})

for _, pod := range podList.Items {
message.Debug("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
message.Debug("Testing pod %q for container %q", pod.Name, target.Container)

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
}

// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debug("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
readyPods = append(readyPods, pod)
break
}
}
}
if len(readyPods) > 0 {
return readyPods
}
timer.Reset(3 * time.Second)
}
}
}
Loading

0 comments on commit fa57f65

Please sign in to comment.