Skip to content

Commit

Permalink
feat(tekton-kfptask): Update kfptask to support pod metadata (#1449)
Browse files Browse the repository at this point in the history
* update kfptask to support pod metadata

* fix type
  • Loading branch information
Tomcli authored Jan 17, 2024
1 parent d5fc9fd commit 550a827
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
2 changes: 1 addition & 1 deletion tekton-catalog/tekton-kfptask/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require (
github.com/google/uuid v1.3.1
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240111221413-aac4408237df
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/stretchr/testify v1.8.4
github.com/tektoncd/pipeline v0.53.2
Expand Down
4 changes: 2 additions & 2 deletions tekton-catalog/tekton-kfptask/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions tekton-catalog/tekton-kfptask/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ func ParseParams(run *tektonv1beta1.CustomRun) (*driverOptions, *apis.FieldError
return opts, nil
}

func GetKubernetesExecutorConfig(options *driverOptions) *kubernetesplatform.KubernetesExecutorConfig {
return options.options.KubernetesExecutorConfig
}

func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
Expand Down
56 changes: 51 additions & 5 deletions tekton-catalog/tekton-kfptask/pkg/reconciler/kfptask/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
kfptaskClient "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/clientset/versioned"
kfptaskListers "github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/client/listers/kfptask/v1alpha1"
"github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/common"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand Down Expand Up @@ -135,13 +136,13 @@ var annotationToDrop = map[string]string{
}

// transite to next state based on current state
func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string) error {
func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) error {
kts.logger.Infof("kts state is %s", kts.state)
switch kts.state {
case StateInit:
// create the corresponding TaskRun CRD and start the task
// compose TaskRun
tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch)
tr, err := kts.constructTaskRun(executionID, executorInput, podSpecPatch, executorConfig)
if err != nil {
kts.logger.Infof("Failed to construct a TaskRun:%v", err)
kts.run.Status.MarkCustomRunFailed(kfptaskv1alpha1.KfpTaskRunReasonInternalError.String(), "Failed to construct a TaskRun: %v", err)
Expand Down Expand Up @@ -196,7 +197,47 @@ func (kts *kfptaskFS) next(executionID string, executorInput string, podSpecPatc
return nil
}

func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string) (*tektonv1.TaskRun, error) {
// Extends the PodMetadata to include Kubernetes-specific executor config.
// Although the current podMetadata object is always empty, this function
// doesn't overwrite the existing podMetadata because for security reasons
// the existing podMetadata should have higher privilege than the user definition.
func extendPodMetadata(
podMetadata *metav1.ObjectMeta,
kubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig,
) {
// Get pod metadata information
if kubernetesExecutorConfig.GetPodMetadata() != nil {
if kubernetesExecutorConfig.GetPodMetadata().GetLabels() != nil {
if podMetadata.Labels == nil {
podMetadata.Labels = kubernetesExecutorConfig.GetPodMetadata().GetLabels()
} else {
podMetadata.Labels = extendMetadataMap(podMetadata.Labels, kubernetesExecutorConfig.GetPodMetadata().GetLabels())
}
}
if kubernetesExecutorConfig.GetPodMetadata().GetAnnotations() != nil {
if podMetadata.Annotations == nil {
podMetadata.Annotations = kubernetesExecutorConfig.GetPodMetadata().GetAnnotations()
} else {
podMetadata.Annotations = extendMetadataMap(podMetadata.Annotations, kubernetesExecutorConfig.GetPodMetadata().GetAnnotations())
}
}
}
}

// Extends metadata map values, highPriorityMap should overwrites lowPriorityMap values
// The original Map inputs should have higher priority since its defined by admin
// TODO: Use maps.Copy after moving to go 1.21+
func extendMetadataMap(
highPriorityMap map[string]string,
lowPriorityMap map[string]string,
) map[string]string {
for k, v := range highPriorityMap {
lowPriorityMap[k] = v
}
return lowPriorityMap
}

func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string, podSpecPatch string, executorConfig *kubernetesplatform.KubernetesExecutorConfig) (*tektonv1.TaskRun, error) {
ktSpec, err := kts.reconciler.getKfpTaskSpec(kts.ctx, kts.run)
if err != nil {
return nil, err
Expand Down Expand Up @@ -235,6 +276,10 @@ func (kts *kfptaskFS) constructTaskRun(executionID string, executorInput string,
},
}

if executorConfig != nil {
extendPodMetadata(&tr.ObjectMeta, executorConfig)
}

if podSpecPatch != "" {
podSpec, err := parseTaskSpecPatch(podSpecPatch)
if err != nil {
Expand Down Expand Up @@ -312,7 +357,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
return nil
}
if ktstate.isRunning() {
return ktstate.next("", "", "")
return ktstate.next("", "", "", nil)
}
options, err := common.ParseParams(run)
if err != nil {
Expand All @@ -321,6 +366,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
"Run can't be run because it has an invalid param - %v", err)
return nil
}
executorConfig := common.GetKubernetesExecutorConfig(options)

runResults, runTask, executionID, executorInput, podSpecPatch, driverErr := common.ExecDriver(ctx, options)
if driverErr != nil {
Expand All @@ -341,7 +387,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, run *tektonv1beta1.Custo
return nil
}

return ktstate.next(executionID, executorInput, podSpecPatch)
return ktstate.next(executionID, executorInput, podSpecPatch, executorConfig)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, run *tektonv1beta1.CustomRun) reconciler.Event {
Expand Down

0 comments on commit 550a827

Please sign in to comment.