-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(backend/sdk): enable dsl.Collected for parameters & artifacts #11725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
c6c8142
to
4146148
Compare
b82df21
to
e9cf281
Compare
/retest |
After rebasing CI started to fail. |
/retest |
4898385
to
45abf43
Compare
Tests are finally passing after rebasing again 🙏 |
Here's visuals of the sample test supplied with the updates, along with the logs of the main container from the launcher pod of the read-files component, the components here are using a custom image I built that installs the sdk with the changes from this PR, along with some additional print statements to verify the types used when parsing the artifacts from the artifact list. Logs:
|
I think there should be some consensus on how the UI should depict the fan-in of artifacts since there is a dag UI element associated with them, but we can save that for another PR 😃. |
42c03a7
to
065dede
Compare
18ae546
to
635321d
Compare
/rerun-workflow "KFP e2e tests" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for addressing the barrage of feedback, @zazulam. This PR is a legit exception to this principle:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
Awesome work!
Signed-off-by: zazulam <[email protected]>
Signed-off-by: zazulam <[email protected]> To enable users to use loops similar to subdags, the initial collecting implementation went only 1 layer deep of loops/subdags. This implementation serves to handle multifacted approaches of pipelines that users can generate.
/lgtm amazing work all around folks!! @zazulam ++ |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: HumairAK, mprahl The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Signed-off-by: agoins <[email protected]> Unit test to validate 2-template creation Signed-off-by: agoins <[email protected]> Integration test case to validate set_retry all args Signed-off-by: agoins <[email protected]> Integration test to validate failure with no retry Signed-off-by: agoins <[email protected]> pipeline failure w/out retry - raise exception Signed-off-by: agoins <[email protected]> Update integration tests Signed-off-by: agoins <[email protected]> Update integration test Signed-off-by: agoins <[email protected]> retryStrategy helper method return empty slice not nil Signed-off-by: agoins <[email protected]> refactor setting parameters Signed-off-by: agoins <[email protected]> Update test master file Signed-off-by: agoins <[email protected]> Remove tests not using currently Signed-off-by: agoins <[email protected]> Refactor input parameters Signed-off-by: agoins <[email protected]> Revert unnecessary syntax changes. Signed-off-by: agoins <[email protected]> Remove unnecessary syntax changes Signed-off-by: agoins <[email protected]> Refactor getTaskRetryParameters Signed-off-by: agoins <[email protected]> getTaskRetryParameters() Signed-off-by: agoins <[email protected]> Remove unnecessary string formatting Signed-off-by: agoins <[email protected]> Add pointers in helper method getTaskRetryParameters() Signed-off-by: agoins <[email protected]> Update test case with default behavior Signed-off-by: agoins <[email protected]> rename templateParameterInstructions Signed-off-by: agoins <[email protected]> Add comments Signed-off-by: agoins <[email protected]> unit test for 2-template template case Signed-off-by: agoins <[email protected]> Update integration tests for setRetry Signed-off-by: agoins <[email protected]> reformat backoff duration Signed-off-by: agoins <[email protected]> add comments to templateParameterInstructions Signed-off-by: agoins <[email protected]> Split getTaskRetryParameters to handle set value separately Signed-off-by: agoins <[email protected]> Address PR comments Signed-off-by: agoins <[email protected]> Update samples/v2/sample_test.py Co-authored-by: Matt Prahl <[email protected]> Signed-off-by: Alyssa Goins <[email protected]> Update sample.py pipeline retry tests Signed-off-by: agoins <[email protected]> Update backend retry integration test expected behavior Signed-off-by: agoins <[email protected]> add name field to pipeline annotations Signed-off-by: agoins <[email protected]> Refactor getTaskRetryParameters() logic. Signed-off-by: agoins <[email protected]> Update samples test formatting. Signed-off-by: agoins <[email protected]> fix pipeline annotation Signed-off-by: agoins <[email protected]> reformat sample_test retry tests Signed-off-by: agoins <[email protected]> refactor sample test retry files Signed-off-by: agoins <[email protected]> refactor sample test retry files Signed-off-by: agoins <[email protected]> update input from int to string Signed-off-by: agoins <[email protected]> Update sample test retry units. Signed-off-by: agoins <[email protected]> Appropriate retry duration units Signed-off-by: agoins <[email protected]> Update PR comments. Signed-off-by: agoins <[email protected]> feat(backend/sdk): enable dsl.Collected for parameters & artifacts (kubeflow#11725) * feat(backend/sdk): enable dsl.Collected for params & artifacts Signed-off-by: zazulam <[email protected]> * feat(backend): collect through loops & dags Signed-off-by: zazulam <[email protected]> To enable users to use loops similar to subdags, the initial collecting implementation went only 1 layer deep of loops/subdags. This implementation serves to handle multifacted approaches of pipelines that users can generate. --------- Signed-off-by: zazulam <[email protected]> fix(sdk): fix pip install for dev (kubeflow#11891) Signed-off-by: Daniel Dowler <[email protected]> chore: Adding Adopters file for CNCF graduation (kubeflow#11894) Signed-off-by: Francisco Javier Arceo <[email protected]>
Description of your changes:
Jumping off from #11627, these changes resolve #10050 in regards to the using
dsl.Collected
for both parameters and artifacts in pipelines. Currently for artifacts, the executor needs to be updated in the sdk and have a release prior to tests being enabled. As for parameters that should work out of the box with solely backend changes.This PR introduces a few new methods to help ease with detecting and resolving collected inputs as well as cleaning up some of the shared logic between
resolveUpstreamParameters
&resolveUpstreamArtifacts
.A few things to keep in mind:
A parallelFor creates
x+1
dags, wherex
is the number of iterations. The extra dag is something I like to refer to as the parallelFor Head dag, being that it only contains the iteration dags within it.With these changes, the keys within the task map returned from
getDAGTasks
will now contain the parent dag id associated with the task. This is to help maintain uniqueness, also when detecting a parallelFor iteration task/dag, the index will be added to the task name to further prevent any potential collisions and map the appropriate tasks within the same iteration for input/output resolution.When resolving the inputs, if the current Task is a DAG the driver now first checks if the it is a parallelFor by inspecting if there exists an
iteration_count
custom property, as only parallelFor heads have that property. If found, the iteration dag names will be added to a queue that theCollectInputs
will use to start the resolution search.New methods:
CollectInputs
- Performs a BFS on the tasks passed to reach the final producer task iterations and collects the values into the appropriate structure for either Artifacts or ParametersCollectContainerOutput
- Helper function for processing the output of a containerExecution, currently only used in theCollectInputs
.*
GetProducerTask
- Performs the check on a task for the potential new producer task and output parameter/artifact key and returns the updated value if found.InferIndexedTaskName
- Used to update the producer task with the appropriate index if the current dag context is a parallelFor iteration dag i.e. resolving task inputs within the loop.getParallelForIterationCount
- Helper to get the number of iterations / determine if a parallelFor head dag.GetParallelForTaskName
- Used to update the task name with the iteration value supplied.GetTaskNameWithDagID
- Used to update the task name with the it's parent's dag id.Checklist: