Commit 3a0cf14

Add support for dynamic pipelines to the Vertex orchestrator
1 parent d8bd68a commit 3a0cf14

5 files changed: +412 additions, -182 deletions


docs/book/how-to/steps-pipelines/dynamic_pipelines.md

Lines changed: 3 additions & 18 deletions
@@ -5,7 +5,7 @@ description: Write dynamic pipelines
 # Dynamic Pipelines (Experimental)
 
 {% hint style="warning" %}
-**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local` and `kubernetes` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
+**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local`, `kubernetes`, `sagemaker` and `vertex` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
 {% endhint %}
 
 {% hint style="info" %}
@@ -265,26 +265,11 @@ When running multiple steps concurrently using `step.submit()`, a failure in one
 Dynamic pipelines are currently only supported by:
 - `local` orchestrator
 - `kubernetes` orchestrator
+- `sagemaker` orchestrator
+- `vertex` orchestrator
 
 Other orchestrators will raise an error if you try to run a dynamic pipeline with them.
 
-### Remote Execution Requirement
-
-When running dynamic pipelines remotely (e.g., with the `kubernetes` orchestrator), you **must** include `depends_on` for at least one step in your pipeline definition. This is currently required due to a bug in remote execution.
-
-{% hint style="warning" %}
-**Required for Remote Execution**: Without `depends_on`, remote execution will fail. This requirement does not apply when running locally with the `local` orchestrator.
-{% endhint %}
-
-For example:
-
-```python
-@pipeline(dynamic=True, depends_on=[some_step])
-def dynamic_pipeline():
-    some_step()
-    # ... rest of your pipeline
-```
-
 ### Artifact Loading
 
 When you call `.load()` on an artifact in a dynamic pipeline, it synchronously loads the data. For large artifacts or when you want to maintain parallelism, consider passing the step outputs (future or artifact) directly to downstream steps instead of loading them.
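The recommendation in that last paragraph is easier to see in code. Below is a minimal sketch, assuming the `@step`/`@pipeline(dynamic=True)` decorators and the `submit()`/`.load()` calls described on that docs page; the step names, the data, and the exact type returned by `submit()` are illustrative, not taken from this commit.

```python
from zenml import pipeline, step


@step
def produce_dataset() -> list:
    return list(range(1_000_000))


@step
def train_model(dataset: list) -> float:
    return float(len(dataset))


@pipeline(dynamic=True)
def training_pipeline() -> None:
    # Submit the producer step; execution of the pipeline function continues
    # without blocking on the step's result.
    dataset_future = produce_dataset.submit()

    # Preferred: pass the future/artifact straight to the downstream step so
    # the orchestrator resolves it and parallelism is preserved.
    train_model(dataset_future)

    # Avoid for large artifacts: .load() would block here and materialize the
    # whole dataset inside the pipeline function's own process.
    # dataset = dataset_future.load()
```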

src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py

Lines changed: 2 additions & 2 deletions
@@ -1050,10 +1050,10 @@ def _wait_for_completion() -> None:
                 metadata=metadata,
             )
 
-    def launch_dynamic_step(
+    def run_isolated_step(
        self, step_run_info: "StepRunInfo", environment: Dict[str, str]
     ) -> None:
-        """Launch a dynamic step.
+        """Runs an isolated step on Sagemaker.
 
         Args:
             step_run_info: The step run information.
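The rename aligns the Sagemaker orchestrator with the hook the Vertex orchestrator implements below: a single method that runs one step of a dynamic pipeline as its own isolated job. A minimal sketch of that shared shape follows; the class is a stand-in for an orchestrator flavor, not ZenML's actual base class, and the body is illustrative only.

```python
from typing import Dict

from zenml.config.step_run_info import StepRunInfo


class SomeOrchestrator:
    """Illustrative stand-in for an orchestrator flavor."""

    def run_isolated_step(
        self, step_run_info: StepRunInfo, environment: Dict[str, str]
    ) -> None:
        """Run one step of a dynamic pipeline as an isolated backend job.

        A concrete implementation builds a job request from the step's Docker
        image, entrypoint command, and resource settings, submits it to the
        backend, and blocks until the job finishes (compare the Vertex
        implementation added in this commit).
        """
        raise NotImplementedError
```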

src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py

Lines changed: 168 additions & 9 deletions
@@ -63,17 +63,20 @@
 from kfp.compiler import Compiler
 from kfp.dsl.base_component import BaseComponent
 
+from zenml import __version__
 from zenml.config.resource_settings import ResourceSettings
 from zenml.constants import (
     METADATA_ORCHESTRATOR_LOGS_URL,
     METADATA_ORCHESTRATOR_RUN_ID,
     METADATA_ORCHESTRATOR_URL,
+    ORCHESTRATOR_DOCKER_IMAGE_KEY,
 )
 from zenml.entrypoints import StepEntrypointConfiguration
 from zenml.enums import ExecutionStatus, StackComponentType
 from zenml.integrations.gcp import GCP_ARTIFACT_STORE_FLAVOR
 from zenml.integrations.gcp.constants import (
     GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
+    VERTEX_ENDPOINT_SUFFIX,
 )
 from zenml.integrations.gcp.flavors.vertex_orchestrator_flavor import (
     VertexOrchestratorConfig,
@@ -82,6 +85,10 @@
 from zenml.integrations.gcp.google_credentials_mixin import (
     GoogleCredentialsMixin,
 )
+from zenml.integrations.gcp.utils import (
+    build_job_request,
+    monitor_job,
+)
 from zenml.integrations.gcp.vertex_custom_job_parameters import (
     VertexCustomJobParameters,
 )
@@ -90,11 +97,18 @@
 from zenml.metadata.metadata_types import MetadataType, Uri
 from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
 from zenml.orchestrators.utils import get_orchestrator_run_name
+from zenml.pipelines.dynamic.entrypoint_configuration import (
+    DynamicPipelineEntrypointConfiguration,
+)
 from zenml.stack.stack_validator import StackValidator
+from zenml.step_operators.step_operator_entrypoint_configuration import (
+    StepOperatorEntrypointConfiguration,
+)
 from zenml.utils.io_utils import get_global_config_directory
 
 if TYPE_CHECKING:
     from zenml.config.base_settings import BaseSettings
+    from zenml.config.step_run_info import StepRunInfo
     from zenml.models import (
         PipelineRunResponse,
         PipelineSnapshotResponse,
@@ -621,6 +635,151 @@ def dynamic_pipeline() -> None:
             schedule=snapshot.schedule,
         )
 
+    def submit_dynamic_pipeline(
+        self,
+        snapshot: "PipelineSnapshotResponse",
+        stack: "Stack",
+        environment: Dict[str, str],
+        placeholder_run: Optional["PipelineRunResponse"] = None,
+    ) -> Optional[SubmissionResult]:
+        """Submits a dynamic pipeline to the orchestrator.
+
+        Args:
+            snapshot: The pipeline snapshot to submit.
+            stack: The stack the pipeline will run on.
+            environment: Environment variables to set in the orchestration
+                environment.
+            placeholder_run: An optional placeholder run.
+
+        Raises:
+            RuntimeError: If the snapshot contains a schedule.
+
+        Returns:
+            Optional submission result.
+        """
+        if snapshot.schedule:
+            raise RuntimeError(
+                "Scheduling dynamic pipelines is not supported for the "
+                "Vertex orchestrator yet."
+            )
+
+        settings = cast(
+            VertexOrchestratorSettings, self.get_settings(snapshot)
+        )
+
+        command = (
+            DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
+        )
+        args = DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
+            snapshot_id=snapshot.id,
+            run_id=placeholder_run.id if placeholder_run else None,
+        )
+
+        image = self.get_image(snapshot=snapshot)
+        labels = settings.labels.copy()
+        labels["source"] = f"zenml-{__version__.replace('.', '_')}"
+
+        job_request = build_job_request(
+            display_name=get_orchestrator_run_name(
+                pipeline_name=snapshot.pipeline_configuration.name
+            ),
+            image=image,
+            entrypoint_command=command + args,
+            custom_job_settings=settings.custom_job_parameters
+            or VertexCustomJobParameters(),
+            resource_settings=snapshot.pipeline_configuration.resource_settings,
+            environment=environment,
+            labels=labels,
+            encryption_spec_key_name=self.config.encryption_spec_key_name,
+            service_account=self.config.workload_service_account,
+            network=self.config.network,
+        )
+
+        credentials, project_id = self._get_authentication()
+        client_options = {
+            "api_endpoint": self.config.location + VERTEX_ENDPOINT_SUFFIX
+        }
+        client = aiplatform.gapic.JobServiceClient(
+            credentials=credentials, client_options=client_options
+        )
+        parent = f"projects/{project_id}/locations/{self.config.location}"
+        logger.info(
+            "Submitting custom job='%s', path='%s' to Vertex AI Training.",
+            job_request["display_name"],
+            parent,
+        )
+        job = client.create_custom_job(parent=parent, custom_job=job_request)
+
+        wait_for_completion = None
+        if settings.synchronous:
+            wait_for_completion = lambda: monitor_job(
+                job_id=job.name,
+                credentials_source=self,
+                client_options=client_options,
+            )
+
+        return SubmissionResult(
+            wait_for_completion=wait_for_completion,
+        )
+
+    def run_isolated_step(
+        self, step_run_info: "StepRunInfo", environment: Dict[str, str]
+    ) -> None:
+        """Runs an isolated step on Vertex.
+
+        Args:
+            step_run_info: The step run information.
+            environment: The environment variables to set.
+        """
+        settings = cast(
+            VertexOrchestratorSettings, self.get_settings(step_run_info)
+        )
+
+        image = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY)
+        command = StepOperatorEntrypointConfiguration.get_entrypoint_command()
+        args = StepOperatorEntrypointConfiguration.get_entrypoint_arguments(
+            step_name=step_run_info.pipeline_step_name,
+            snapshot_id=(step_run_info.snapshot.id),
+            step_run_id=str(step_run_info.step_run_id),
+        )
+
+        labels = settings.labels.copy()
+        labels["source"] = f"zenml-{__version__.replace('.', '_')}"
+
+        job_request = build_job_request(
+            display_name=f"{step_run_info.run_name}-{step_run_info.pipeline_step_name}",
+            image=image,
+            entrypoint_command=command + args,
+            custom_job_settings=settings.custom_job_parameters
+            or VertexCustomJobParameters(),
+            resource_settings=step_run_info.config.resource_settings,
+            environment=environment,
+            labels=labels,
+            encryption_spec_key_name=self.config.encryption_spec_key_name,
+            service_account=self.config.workload_service_account,
+            network=self.config.network,
+        )
+
+        credentials, project_id = self._get_authentication()
+        client_options = {
+            "api_endpoint": self.config.location + VERTEX_ENDPOINT_SUFFIX
+        }
+        client = aiplatform.gapic.JobServiceClient(
+            credentials=credentials, client_options=client_options
+        )
+        parent = f"projects/{project_id}/locations/{self.config.location}"
+        logger.info(
+            "Submitting custom job='%s', path='%s' to Vertex AI Training.",
+            job_request["display_name"],
+            parent,
+        )
+        job = client.create_custom_job(parent=parent, custom_job=job_request)
+        monitor_job(
+            job_id=job.name,
+            credentials_source=self,
+            client_options=client_options,
+        )
+
     def _upload_and_run_pipeline(
         self,
         pipeline_name: str,
@@ -786,19 +945,19 @@ def get_orchestrator_run_id(self) -> str:
         """Returns the active orchestrator run id.
 
         Raises:
-            RuntimeError: If the environment variable specifying the run id
-                is not set.
+            RuntimeError: If the orchestrator run id cannot be read from the
+                environment.
 
         Returns:
             The orchestrator run id.
         """
-        try:
-            return os.environ[ENV_ZENML_VERTEX_RUN_ID]
-        except KeyError:
-            raise RuntimeError(
-                "Unable to read run id from environment variable "
-                f"{ENV_ZENML_VERTEX_RUN_ID}."
-            )
+        for env in [ENV_ZENML_VERTEX_RUN_ID, "CLOUD_ML_JOB_ID"]:
+            if env in os.environ:
+                return os.environ[env]
+
+        raise RuntimeError(
+            "Unable to get orchestrator run id from environment."
+        )
 
     def get_pipeline_run_metadata(
         self, run_id: UUID
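To tie the new code path together, here is a hedged usage sketch: a dynamic pipeline configured with the `VertexOrchestratorSettings` fields that `submit_dynamic_pipeline` reads (`synchronous`, which decides whether a `monitor_job` wait callback is returned; `labels`, which are attached to the custom job; and `custom_job_parameters`, which feed `build_job_request`). The settings key `"orchestrator.vertex"`, the step, and the pipeline name are assumptions for illustration, not part of this commit.

```python
from zenml import pipeline, step
from zenml.integrations.gcp.flavors.vertex_orchestrator_flavor import (
    VertexOrchestratorSettings,
)
from zenml.integrations.gcp.vertex_custom_job_parameters import (
    VertexCustomJobParameters,
)

# Settings consumed by submit_dynamic_pipeline in this commit.
vertex_settings = VertexOrchestratorSettings(
    synchronous=True,  # return a wait_for_completion callback (monitor_job)
    labels={"team": "ml"},  # merged with the generated "source" label
    custom_job_parameters=VertexCustomJobParameters(),
)


@step
def say_hello() -> str:
    return "hello from Vertex"


@pipeline(dynamic=True, settings={"orchestrator.vertex": vertex_settings})
def hello_dynamic_pipeline() -> None:
    say_hello()


if __name__ == "__main__":
    # With a Vertex orchestrator in the active stack, this submits the whole
    # pipeline function as a single Vertex AI custom job.
    hello_dynamic_pipeline()
```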