feat(backend/sdk): enable dsl.Collected for parameters & artifacts #11725

Merged
merged 2 commits into kubeflow:master on May 5, 2025

Conversation

zazulam
Contributor

@zazulam zazulam commented Mar 5, 2025

Description of your changes:
Building on #11627, these changes resolve #10050 by enabling dsl.Collected for both parameters and artifacts in pipelines. For artifacts, the executor must first be updated in the SDK and released before the tests can be enabled. Parameters should work out of the box with backend changes alone.

This PR introduces a few new methods that make it easier to detect and resolve collected inputs, and it cleans up some of the logic shared between resolveUpstreamParameters and resolveUpstreamArtifacts.

A few things to keep in mind:
A parallelFor creates x+1 DAGs, where x is the number of iterations. I refer to the extra DAG as the parallelFor head DAG, since it contains only the iteration DAGs.

With these changes, the keys of the task map returned from getDAGTasks now contain the parent DAG ID associated with each task, which helps keep the keys unique. In addition, when a parallelFor iteration task or DAG is detected, the iteration index is added to the task name. This further prevents collisions and lets tasks within the same iteration be matched to each other for input/output resolution.

When resolving inputs, if the current task is a DAG, the driver now first checks whether it is a parallelFor by looking for an iteration_count custom property, since only parallelFor heads have that property. If it is found, the iteration DAG names are added to a queue that CollectInputs uses to start the resolution search.
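
To make the keying idea concrete, here is a minimal Python sketch (the backend itself is written in Go). The helper names `indexed_task_name` and `task_key`, the `_idx_` and `_dag_` separators, and the sample DAG IDs are all hypothetical. The description above only states that the parent DAG ID and, for parallelFor iterations, the iteration index become part of the task name.

```python
from typing import Optional


def indexed_task_name(task_name: str, iteration_index: Optional[int]) -> str:
    """Append the iteration index when the task belongs to a parallelFor iteration.

    The "_idx_" separator is a made-up format used only for illustration.
    """
    if iteration_index is None:
        return task_name
    return f"{task_name}_idx_{iteration_index}"


def task_key(task_name: str, parent_dag_id: int,
             iteration_index: Optional[int] = None) -> str:
    """Build a map key that stays unique across DAGs and loop iterations.

    The "_dag_" separator is likewise hypothetical.
    """
    return f"{indexed_task_name(task_name, iteration_index)}_dag_{parent_dag_id}"


# A parallelFor with 3 iterations yields 3 iteration DAGs plus 1 head DAG.
# Keyed by bare task name, every "create-file" task would collide.
iteration_dag_ids = [101, 102, 103]  # hypothetical IDs
tasks = {
    task_key("create-file", dag_id, idx): {"iteration": idx}
    for idx, dag_id in enumerate(iteration_dag_ids)
}
print(sorted(tasks))
```

With bare task names the three entries would overwrite each other; with the combined key, all three iterations remain addressable.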

New methods:

  • CollectInputs - Performs a BFS over the tasks passed in to reach the final producer task iterations, and collects the values into the appropriate structure for either artifacts or parameters.
  • CollectContainerOutput - Helper function for processing the output of a container execution; currently used only in CollectInputs.
    * GetProducerTask - Checks a task for a potential new producer task and output parameter/artifact key, and returns the updated values if found.
  • InferIndexedTaskName - Updates the producer task name with the appropriate index if the current DAG context is a parallelFor iteration DAG, i.e. when resolving task inputs within the loop.
  • getParallelForIterationCount - Helper that returns the number of iterations; also used to determine whether a DAG is a parallelFor head DAG.
  • GetParallelForTaskName - Updates the task name with the supplied iteration value.
  • GetTaskNameWithDagID - Updates the task name with its parent's DAG ID.
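
The breadth-first search described for CollectInputs can be sketched in Python as follows. This is not the Go implementation from this PR: the `Task` dataclass, its fields, and the task names are hypothetical stand-ins chosen to show how a queue seeded with a parallelFor head DAG fans out to the iteration DAGs and then down to the producing container tasks.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Task:
    """Hypothetical stand-in for a task execution; not the backend's real type."""
    name: str
    # Container tasks hold concrete output values.
    outputs: Dict[str, Any] = field(default_factory=dict)
    # DAG tasks map an output key to (producer subtask name, producer output key).
    producers: Dict[str, Tuple[str, str]] = field(default_factory=dict)
    # Only parallelFor head DAGs list their iteration DAGs.
    iterations: List[str] = field(default_factory=list)


def collect_inputs(tasks: Dict[str, Task], start: str, output_key: str) -> List[Any]:
    """Walk from `start` down to the producing container tasks, breadth first."""
    collected: List[Any] = []
    queue = deque([(start, output_key)])
    while queue:
        name, key = queue.popleft()
        task = tasks[name]
        if task.iterations:
            # parallelFor head: fan out to every iteration DAG.
            queue.extend((iteration, key) for iteration in task.iterations)
        elif key in task.producers:
            # Plain DAG: follow the output to the subtask that produces it.
            queue.append(task.producers[key])
        else:
            # Container task: this is a final producer, so collect its value.
            collected.append(task.outputs[key])
    return collected


tasks = {"loop-head": Task("loop-head", iterations=["iter-0", "iter-1", "iter-2"])}
for i, value in enumerate(["s1", "s2", "s3"]):
    tasks[f"iter-{i}"] = Task(f"iter-{i}", producers={"result": (f"create-file-{i}", "file")})
    tasks[f"create-file-{i}"] = Task(f"create-file-{i}", outputs={"file": value})

print(collect_inputs(tasks, "loop-head", "result"))
```

Because the queue is first-in, first-out, values from iterations at the same depth are collected in iteration order in this sketch.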

@google-oss-prow google-oss-prow bot requested review from Ark-kun, DharmitD and mprahl March 5, 2025 01:41
@zazulam zazulam force-pushed the collected branch 2 times, most recently from c6c8142 to 4146148 Compare March 5, 2025 01:49
@zazulam
Contributor Author

zazulam commented Mar 5, 2025

To provide some more context, this is the error that occurs for artifacts when a user attempts to use either List[Artifact]:

main     executor = component_executor.Executor(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 52, in __init__
main     self.assign_input_and_output_artifacts()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 73, in assign_input_and_output_artifacts
main     self.input_artifacts[name] = [
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 74, in <listcomp>
main     self.make_artifact(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 118, in make_artifact
main     return create_artifact_instance(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 378, in create_artifact_instance
main     ) if hasattr(artifact_cls, '_from_executor_fields') else artifact_cls(
main   File "/usr/local/lib/python3.9/typing.py", line 685, in __call__
main     raise TypeError(f"Type {self._name} cannot be instantiated; "
main TypeError: Type List cannot be instantiated; use list() instead

or list[Artifact]

main     return _run_code(code, main_globals, None,
main   File "/usr/local/lib/python3.9/runpy.py", line 87, in _run_code
main     exec(code, run_globals)
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor_main.py", line 109, in <module>
main     executor_main()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor_main.py", line 98, in executor_main
main     executor = component_executor.Executor(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 52, in __init__
main     self.assign_input_and_output_artifacts()
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 73, in assign_input_and_output_artifacts
main     self.input_artifacts[name] = [
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 74, in <listcomp>
main     self.make_artifact(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 118, in make_artifact
main     return create_artifact_instance(
main   File "/usr/local/lib/python3.9/site-packages/kfp/dsl/executor.py", line 378, in create_artifact_instance
main     ) if hasattr(artifact_cls, '_from_executor_fields') else artifact_cls(
main TypeError: list() takes no keyword arguments

So we use get_args and get_origin to determine the actual artifact class used within the collection.
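
As a rough illustration of that approach, the sketch below unwraps a list annotation with `typing.get_origin` and `typing.get_args` before instantiating anything. The `Artifact` class here is a minimal stand-in rather than the real `kfp.dsl.Artifact`, and `inner_artifact_cls` is a hypothetical helper name; the actual executor change in this PR may be structured differently.

```python
from typing import Any, List, Optional, get_args, get_origin


class Artifact:
    """Minimal stand-in for kfp.dsl.Artifact, for illustration only."""

    def __init__(self, name: str = "", uri: str = "",
                 metadata: Optional[dict] = None):
        self.name = name
        self.uri = uri
        self.metadata = metadata or {}


def inner_artifact_cls(annotation: Any) -> Any:
    """Return the element class of a list annotation, or the annotation itself."""
    # get_origin returns `list` for both typing.List[X] and list[X].
    if get_origin(annotation) is list:
        args = get_args(annotation)
        if args:
            return args[0]
    return annotation


# Instantiate the element class, not the List/list annotation itself.
runtime_artifacts = [
    {"name": "440", "uri": "minio://bucket/a"},  # hypothetical values
    {"name": "441", "uri": "minio://bucket/b"},
]
artifact_cls = inner_artifact_cls(List[Artifact])
files = [artifact_cls(name=a["name"], uri=a["uri"]) for a in runtime_artifacts]
print([f.name for f in files])
```

Calling the annotation directly is what produces the two tracebacks above: neither `typing.List` nor `list` accepts the keyword arguments meant for an artifact class.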

To test this, I built a custom image with the SDK installed, including the executor changes. This is what it looks like:
image

@zazulam zazulam force-pushed the collected branch 2 times, most recently from b82df21 to e9cf281 Compare March 23, 2025 15:53
@zazulam zazulam requested a review from chensun March 23, 2025 15:54
@zazulam
Contributor Author

zazulam commented Mar 23, 2025

/retest

@zazulam
Contributor Author

zazulam commented Mar 23, 2025

After rebasing CI started to fail.

@zazulam
Contributor Author

zazulam commented Mar 24, 2025

/retest

@zazulam zazulam force-pushed the collected branch 2 times, most recently from 4898385 to 45abf43 Compare March 24, 2025 18:39
@zazulam
Contributor Author

zazulam commented Mar 24, 2025

Tests are finally passing after rebasing again 🙏

@zazulam zazulam changed the title from "feat(backend): enable dsl.Collected for parameters & artifacts" to "feat(backend/sdk): enable dsl.Collected for parameters & artifacts" Mar 24, 2025
@zazulam
Contributor Author

zazulam commented Mar 24, 2025

Here are visuals of the sample test supplied with the updates, along with the logs of the main container from the launcher pod of the read-files component. The components use a custom image I built that installs the SDK with the changes from this PR, plus some additional print statements to verify the types used when parsing the artifacts from the artifact list.

image
image

Logs:

time="2025-03-24T20:12:24.247Z" level=info msg="capturing logs" argo=true
I0324 20:12:24.265991      21 main.go:58] Setting log level to: '1'
I0324 20:12:24.267121      21 cache.go:117] Connecting to cache endpoint 10.96.185.120:8887
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: 
https://pip.pypa.io/warnings/venv
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Looking for component `read_files` in --component_module_path `/tmp/tmp.SbP6qw5nZb/ephemeral_component.py`
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Loading KFP component "read_files" from /tmp/tmp.SbP6qw5nZb/ephemeral_component.py (directory "/tmp/tmp.SbP6qw5nZb" and module name "ephemeral_component")
[KFP Executor 2025-03-24 20:12:24,614 INFO]: Got executor_input:
{
    "inputs": {
        "artifacts": {
            "files": {
                "artifacts": [
                    {
                        "name": "440",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    },
                    {
                        "name": "441",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    },
                    {
                        "name": "442",
                        "type": {
                            "instanceSchema": ""
                        },
                        "uri": "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file",
                        "metadata": {
                            "display_name": "file",
                            "store_session_info": "{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.96.103.164:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"
                        }
                    }
                ]
            }
        }
    },
    "outputs": {
        "parameters": {
            "Output": {
                "outputFile": "/tmp/kfp/outputs/Output"
            }
        },
        "outputFile": "/tmp/kfp_outputs/output_metadata.json"
    }
}
inner_annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '440', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '441', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
param name:  files
annotation:  <class 'kfp.dsl.types.artifact_types.Artifact'>
runtime_artifact:  {'name': '442', 'type': {'instanceSchema': ''}, 'uri': 'minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file', 'metadata': {'display_name': 'file', 'store_session_info': '{"Provider":"minio","Params":{"accessKeyKey":"accesskey","disableSSL":"true","endpoint":"10.96.103.164:9000","fromEnv":"false","region":"minio","secretKeyKey":"secretkey","secretName":"mlpipeline-minio-artifact"}}'}}
artifact_cls to create artifact instance:  <class 'kfp.dsl.types.artifact_types.Artifact'>
type_annotations.is_list_of_artifacts(v): typing.List[kfp.dsl.types.artifact_types.Artifact]
Reading artifact 440 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file
s1
Reading artifact 441 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/437be769-9cc6-4388-84fc-2a8a42538198/file
s2
Reading artifact 442 file: /minio/mlpipeline/v2/artifacts/collected-artifact-pipeline/28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/a5bf1932-3407-412b-a96b-68c177a382c1/file
s3
[KFP Executor 2025-03-24 20:12:24,615 INFO]: Wrote executor output file to /tmp/kfp_outputs/output_metadata.json.
I0324 20:12:24.620573      21 launcher_v2.go:806] ExecutorOutput: {
  "parameterValues": {
    "Output": "files read"
  }
}
I0324 20:12:24.633672      21 launcher_v2.go:188] publish success.
I0324 20:12:24.657377      21 client.go:724] Attempting to update DAG state
time="2025-03-24T20:12:25.248Z" level=info msg="sub-process exited" argo=true error="<nil>"
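
In the logs above, each artifact arrives with a `minio://` URI but is read from a path under `/minio/`. A minimal sketch of that mapping, assuming it is a plain prefix swap; `uri_to_local_path` is a hypothetical name, and the SDK's own conversion may cover additional storage schemes.

```python
def uri_to_local_path(uri: str) -> str:
    """Map a minio:// URI to the local mount path seen in the logs.

    Assumption: the conversion is a simple prefix replacement.
    URIs with other schemes are returned unchanged in this sketch.
    """
    prefix = "minio://"
    if uri.startswith(prefix):
        return "/minio/" + uri[len(prefix):]
    return uri


# URI of artifact 440, copied from the executor input above.
uri = (
    "minio://mlpipeline/v2/artifacts/collected-artifact-pipeline/"
    "28c3aaa7-226d-4300-b973-6fe55ad6f76e/create-file/"
    "ddfbc49b-8d72-42f3-a757-7fdd00abef8a/file"
)
print(uri_to_local_path(uri))
```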

@zazulam
Contributor Author

zazulam commented Mar 24, 2025

I think there should be some consensus on how the UI should depict the fan-in of artifacts since there is a dag UI element associated with them, but we can save that for another PR 😃.

@zazulam zazulam force-pushed the collected branch 3 times, most recently from 42c03a7 to 065dede Compare March 31, 2025 23:18
@zazulam zazulam force-pushed the collected branch 4 times, most recently from 18ae546 to 635321d Compare April 7, 2025 19:20
@mprahl
Contributor

mprahl commented Apr 8, 2025

/rerun-workflow "KFP e2e tests"

Collaborator

@droctothorpe droctothorpe left a comment

LGTM! Thanks for addressing the barrage of feedback, @zazulam. This PR is a legit exception to this principle:
image

Contributor

@mprahl mprahl left a comment

/lgtm
/approve

Awesome work!

zazulam added 2 commits May 5, 2025 17:47
Signed-off-by: zazulam <[email protected]>

To enable users to use loops similar to subdags, the initial collecting
implementation went only 1 layer deep of loops/subdags. This
implementation serves to handle multifaceted approaches of pipelines that
users can generate.
@HumairAK
Collaborator

HumairAK commented May 5, 2025

/lgtm
/approve

amazing work all around folks!!

@zazulam ++
@droctothorpe ++
@mprahl ++

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: HumairAK, mprahl

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit ed828b5 into kubeflow:master May 5, 2025
64 checks passed
@github-project-automation github-project-automation bot moved this from In Review to Done in KFP Project Tracker May 5, 2025
alyssacgoins added a commit to alyssacgoins/data-science-pipelines that referenced this pull request May 7, 2025
Signed-off-by: agoins <[email protected]>

Unit test to validate 2-template creation

Signed-off-by: agoins <[email protected]>

Integration test case to validate set_retry all args

Signed-off-by: agoins <[email protected]>

Integration test to validate failure with no retry

Signed-off-by: agoins <[email protected]>

pipeline failure w/out retry - raise exception

Signed-off-by: agoins <[email protected]>

Update integration tests

Signed-off-by: agoins <[email protected]>

Update integration test

Signed-off-by: agoins <[email protected]>

retryStrategy helper method return empty slice not nil

Signed-off-by: agoins <[email protected]>

refactor setting parameters

Signed-off-by: agoins <[email protected]>

Update test master file

Signed-off-by: agoins <[email protected]>

Remove tests not using currently

Signed-off-by: agoins <[email protected]>

Refactor input parameters

Signed-off-by: agoins <[email protected]>

Revert unnecessary syntax changes.

Signed-off-by: agoins <[email protected]>

Remove unnecessary syntax changes

Signed-off-by: agoins <[email protected]>

Refactor getTaskRetryParameters

Signed-off-by: agoins <[email protected]>

getTaskRetryParameters()

Signed-off-by: agoins <[email protected]>

Remove unnecessary string formatting

Signed-off-by: agoins <[email protected]>

Add pointers in helper method getTaskRetryParameters()

Signed-off-by: agoins <[email protected]>

Update test case with default behavior

Signed-off-by: agoins <[email protected]>

rename templateParameterInstructions

Signed-off-by: agoins <[email protected]>

Add comments

Signed-off-by: agoins <[email protected]>

unit test for 2-template template case

Signed-off-by: agoins <[email protected]>

Update integration tests for setRetry

Signed-off-by: agoins <[email protected]>

reformat backoff duration

Signed-off-by: agoins <[email protected]>

add comments to templateParameterInstructions

Signed-off-by: agoins <[email protected]>

Split getTaskRetryParameters to handle set value separately

Signed-off-by: agoins <[email protected]>

Address PR comments

Signed-off-by: agoins <[email protected]>

Update samples/v2/sample_test.py

Co-authored-by: Matt Prahl <[email protected]>
Signed-off-by: Alyssa Goins <[email protected]>

Update sample.py pipeline retry tests

Signed-off-by: agoins <[email protected]>

Update backend retry integration test expected behavior

Signed-off-by: agoins <[email protected]>

add name field to pipeline annotations

Signed-off-by: agoins <[email protected]>

Refactor getTaskRetryParameters() logic.

Signed-off-by: agoins <[email protected]>

Update samples test formatting.

Signed-off-by: agoins <[email protected]>

fix pipeline annotation

Signed-off-by: agoins <[email protected]>

reformat sample_test retry tests

Signed-off-by: agoins <[email protected]>

refactor sample test retry files

Signed-off-by: agoins <[email protected]>

refactor sample test retry files

Signed-off-by: agoins <[email protected]>

update input from int to string

Signed-off-by: agoins <[email protected]>

Update sample test retry units.

Signed-off-by: agoins <[email protected]>

Appropriate retry duration units

Signed-off-by: agoins <[email protected]>

Update PR comments.

Signed-off-by: agoins <[email protected]>

feat(backend/sdk): enable dsl.Collected for parameters & artifacts (kubeflow#11725)

* feat(backend/sdk): enable dsl.Collected for params & artifacts

Signed-off-by: zazulam <[email protected]>

* feat(backend): collect through loops & dags

Signed-off-by: zazulam <[email protected]>

To enable users to use loops similar to subdags, the initial collecting
implementation went only 1 layer deep of loops/subdags. This
implementation serves to handle multifaceted approaches of pipelines that
users can generate.

---------

Signed-off-by: zazulam <[email protected]>

fix(sdk): fix pip install for dev (kubeflow#11891)

Signed-off-by: Daniel Dowler <[email protected]>

chore: Adding Adopters file for CNCF graduation (kubeflow#11894)

Signed-off-by: Francisco Javier Arceo <[email protected]>
Development

Successfully merging this pull request may close these issues.

[sdk] Unable to aggregate results over ParallelFor in Kubeflow V2 using V1 workarounds such as .after()
6 participants