Conversation

@schustmi (Contributor) commented Nov 25, 2025

Describe changes

This PR adds support for dynamic pipelines to the Vertex orchestrator.

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop, read the contribution guide on rebasing a branch onto develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions github-actions bot added internal To filter out internal PRs and issues enhancement New feature or request labels Nov 25, 2025
parent = f"projects/{project_id}/locations/{self.config.region}"
logger.info(
    "Submitting custom job='%s', path='%s' to Vertex AI Training.",
    job_request["display_name"],

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information

This expression logs [sensitive data (secret)](1) as clear text.

Copilot Autofix (AI, 7 days ago)

To fix this problem, we should ensure that any data logged in the string field job_request["display_name"] does not accidentally expose secrets or sensitive data. The safest and simplest way is to mask, redact, or avoid logging sensitive/tainted values, including pipeline names and fields which may derive from untrusted sources. Specifically, in the log line on line 709, we should either mask the display name (show only a fixed value like [REDACTED], or display only a subset, such as the first N characters), or simply omit it altogether if it is not a strictly necessary log detail. If a truncated or hashed version is useful for tracking, that could be logged instead.

  • Edit file src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py, line 709.
  • In the log message "Submitting custom job='%s', path='%s' to Vertex AI Training.", replace job_request["display_name"] with a redacted, masked, or truncated version. For example, display only the first 8 characters plus ellipsis, or use "[REDACTED]" as the value.
  • No additional imports are needed for basic string slicing.
  • The rest of the logic in this context does not need to be changed.
Suggested changeset 1
src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py b/src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
--- a/src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
+++ b/src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
@@ -705,8 +705,7 @@
         )
         parent = f"projects/{project_id}/locations/{self.config.region}"
         logger.info(
-            "Submitting custom job='%s', path='%s' to Vertex AI Training.",
-            job_request["display_name"],
+            "Submitting custom job='[REDACTED]', path='%s' to Vertex AI Training.",
             parent,
         )
         job = client.create_custom_job(parent=parent, custom_job=job_request)
EOF
Unable to commit as this autofix suggestion is now outdated
@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch 2 times, most recently from 15ddc44 to b922f0d Compare November 25, 2025 08:26
@htahir1 (Contributor) commented Nov 25, 2025

@claude full-review

@htahir1 (Contributor) commented Nov 25, 2025

/claude full-review

@claude (bot) commented Nov 25, 2025

Claude finished @htahir1's task.


Full PR Review Complete ✅

Overview

This PR adds support for dynamic pipelines to the Vertex orchestrator by implementing two new methods (submit_dynamic_pipeline and run_isolated_step) and refactoring common job management code into a shared utilities module.

Changes:

  • vertex_orchestrator.py: +159 lines (added dynamic pipeline support)
  • vertex_step_operator.py: -134 lines (refactored to use shared utilities)
  • utils.py: +209 lines (new shared utilities module)

Summary

Strengths:

  • Good code reuse through the new utilities module
  • Consistent patterns between orchestrator and step operator
  • Proper error handling and retry logic in monitor_job

Issues Found:

  1. 🔴 CRITICAL - Security: Clear-text logging of sensitive information (GitHub security alert)
  2. 🟡 Code Quality: Missing docstring return statement in get_job_service_client
  3. 🟡 Code Quality: Duplicate validate_accelerator_type function
  4. 🟡 Testing: No test coverage mentioned for new dynamic pipeline functionality
  5. 🟡 Documentation: PR description template not filled out
  6. 🟢 Minor: Docstring typo in run_isolated_step

Detailed Findings

🔴 Critical Issues

1. Security: Clear-text logging of job request (vertex_orchestrator.py:708)

Location: src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py:708

logger.info(
    "Submitting custom job='%s', path='%s' to Vertex AI Training.",
    job_request["display_name"],
    parent,
)

Issue: GitHub Advanced Security flagged this for potentially logging sensitive data. The job_request dictionary may contain:

  • Environment variables (line 691) which could include secrets
  • Service account credentials
  • Network configuration
  • Encryption keys

Recommendation: This exact pattern appears 3 times in the code (lines 708, 772, and also in vertex_step_operator.py:226). The logging is actually safe - it only logs display_name and parent, not the full job_request. However, consider:

  1. Adding a comment explaining why this is safe
  2. Using debug-level logging for more detailed information
  3. The nearby logger.debug("Vertex AI Job=%s", job_request) in vertex_step_operator.py:213 IS a security issue and should be removed or redacted

Same issue in vertex_step_operator.py:213:

logger.debug("Vertex AI Job=%s", job_request)  # ← This logs the ENTIRE job request including env vars

This should be removed or the sensitive fields should be redacted before logging.
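A minimal sketch of what such redaction could look like. The `_SENSITIVE_KEYS` set and the `redact_job_request` helper are assumptions for illustration, not existing ZenML code; the key names would need to match the fields the real job request actually carries.

```python
import logging

logger = logging.getLogger(__name__)

# Keys whose values should never reach the logs. This set is an assumption;
# extend it to cover every sensitive field in the real job request.
_SENSITIVE_KEYS = {"env", "environment_variables", "service_account", "encryption_spec"}


def redact_job_request(job_request: dict) -> dict:
    """Return a shallow copy of the job request with sensitive fields masked."""
    return {
        key: "[REDACTED]" if key in _SENSITIVE_KEYS else value
        for key, value in job_request.items()
    }


# Instead of `logger.debug("Vertex AI Job=%s", job_request)`, log the
# redacted copy so display names remain visible but secrets do not:
# logger.debug("Vertex AI Job=%s", redact_job_request(job_request))
```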


🟡 Code Quality Issues

2. Missing return documentation (utils.py:58-72)

Location: src/zenml/integrations/gcp/utils.py:58-72

def get_job_service_client(
    credentials_source: GoogleCredentialsMixin,
    client_options: Optional[Dict[str, Any]] = None,
) -> aiplatform.gapic.JobServiceClient:
    """Gets a job service client.

    Args:
        credentials_source: The component that provides the credentials to
            access the job.
        client_options: The client options to use for the job service client.
    """

Issue: Missing Returns: section in docstring per ZenML's Google Python style standard.

Recommendation:

def get_job_service_client(
    credentials_source: GoogleCredentialsMixin,
    client_options: Optional[Dict[str, Any]] = None,
) -> aiplatform.gapic.JobServiceClient:
    """Gets a job service client.

    Args:
        credentials_source: The component that provides the credentials to
            access the job.
        client_options: The client options to use for the job service client.
    Returns:
        A configured JobServiceClient instance.
    """

3. Duplicate function definition (utils.py:42-55)

Location: src/zenml/integrations/gcp/utils.py:42-55

The validate_accelerator_type function is duplicated from vertex_step_operator.py. While this is acceptable after refactoring, the old definition should be removed from vertex_step_operator.py.

Current state:

  • Still defined in vertex_step_operator.py:53-66
  • Now also in utils.py:42-55
  • The one in vertex_step_operator.py is no longer used

Recommendation: Remove the duplicate from vertex_step_operator.py and import from utils instead.

4. Docstring typo (vertex_orchestrator.py:728)

Location: src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py:728

def run_isolated_step(
    self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
    """Runs an isolated step on Kubernetes.  # ← Wrong platform

Issue: The docstring says "Kubernetes" but this is the Vertex orchestrator, not Kubernetes.

Recommendation:

"""Runs an isolated step on Vertex AI."""

🟡 Testing & Documentation

5. Missing test coverage

The PR description states:

  • I have added tests to cover my changes.

Issue: No mention of tests for the new dynamic pipeline functionality. The new methods (submit_dynamic_pipeline and run_isolated_step) represent significant new functionality that should have test coverage.

Recommendation: Add tests covering:

  • submit_dynamic_pipeline with valid configurations
  • submit_dynamic_pipeline with schedule (should raise RuntimeError)
  • run_isolated_step execution
  • Error handling in both methods
  • Integration with the new utils module functions
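To make the shape of such tests concrete, here is a self-contained sketch. `FakeVertexOrchestrator`, the `schedule` parameter, and the error message are stand-ins, not the real ZenML API; actual tests would exercise `VertexOrchestrator` with mocked GCP clients.

```python
class FakeVertexOrchestrator:
    """Minimal stand-in mirroring the expected submission behavior."""

    def submit_dynamic_pipeline(self, deployment_name, schedule=None):
        if schedule is not None:
            # Dynamic pipelines cannot be scheduled on Vertex AI.
            raise RuntimeError("Dynamic pipelines do not support schedules.")
        return {"display_name": deployment_name}


def test_submit_with_schedule_raises():
    orch = FakeVertexOrchestrator()
    try:
        orch.submit_dynamic_pipeline("my-pipeline", schedule="*/5 * * * *")
    except RuntimeError:
        pass
    else:
        raise AssertionError("expected RuntimeError for scheduled dynamic pipeline")


def test_submit_without_schedule():
    orch = FakeVertexOrchestrator()
    job = orch.submit_dynamic_pipeline("my-pipeline")
    assert job["display_name"] == "my-pipeline"


test_submit_with_schedule_raises()
test_submit_without_schedule()
```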

6. Incomplete PR description

Issue: The PR template is not filled out. The description just says "_I implemented/fixed _ to achieve ." without actual details.

Recommendation: Fill out the PR description with:

  • What: Added support for dynamic pipelines to Vertex orchestrator
  • Why: Enable dynamic pipeline execution on Vertex AI
  • How: Implemented submit_dynamic_pipeline and run_isolated_step methods, refactored common code to utilities
  • Breaking changes: None
  • Testing approach: [needs to be added]

🟢 Minor Observations

Code style compliance

✅ Code follows ZenML style guidelines
✅ Type hints are present and correct
✅ Docstrings follow Google Python style (except noted issues)
✅ Error handling is appropriate with proper retry logic

Refactoring quality

✅ Good extraction of common code (build_job_request, monitor_job)
✅ Consistent parameter naming and structure
✅ Proper separation of concerns

Logic correctness

✅ Proper handling of credentials and authentication
✅ Correct use of client options and endpoints
✅ Appropriate conditional logic for synchronous execution


Recommendations Summary

Must Fix (before merge):

  1. Remove or redact logger.debug("Vertex AI Job=%s", job_request) in vertex_step_operator.py:213
  2. Add comment clarifying safety of logging at lines 708, 772
  3. Remove duplicate validate_accelerator_type from vertex_step_operator.py

Should Fix:

  1. Add return documentation to get_job_service_client
  2. Fix docstring typo: "Kubernetes" → "Vertex AI" in run_isolated_step
  3. Add test coverage for new dynamic pipeline methods
  4. Fill out PR description template

Nice to Have:

  1. Add integration tests if not already covered by CI
  2. Consider adding examples or documentation for dynamic pipeline usage

Overall Assessment

This is a solid refactoring that adds important functionality while improving code reuse. The main concern is the security issue with logging the full job request in debug mode, which should be addressed before merge. The code quality is generally good and follows ZenML standards with minor exceptions noted above.

Recommendation: Request changes to address the security logging issue, then approve after fixes.


@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch 5 times, most recently from b49498e to 3a0cf14 Compare November 27, 2025 03:52
@github-actions
Copy link
Contributor

github-actions bot commented Nov 27, 2025

Documentation Link Check Results

Absolute links check failed
There are broken absolute links in the documentation. See workflow logs for details
Relative links check passed
Last checked: 2025-11-28 07:26:04 UTC

@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch from 3a0cf14 to 3cc5620 Compare November 27, 2025 06:17
@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch from 3cc5620 to 3b9d1f0 Compare November 27, 2025 06:35
@schustmi schustmi requested a review from stefannica November 27, 2025 06:37
Comment on lines -286 to -303
# Monitors the long-running operation by polling the job state
# periodically, and retries the polling when a transient connectivity
# issue is encountered.
#
# Long-running operation monitoring:
# The possible states of "get job" response can be found at
# https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#State
# where SUCCEEDED/FAILED/CANCELED are considered to be final states.
# The following logic will keep polling the state of the job until
# the job enters a final state.
#
# During the polling, if a connection error was encountered, the GET
# request will be retried by recreating the Python API client to
# refresh the lifecycle of the connection being used. See
# https://github.com/googleapis/google-api-python-client/issues/218
# for a detailed description of the problem. If the error persists for
# _CONNECTION_ERROR_RETRY_LIMIT consecutive attempts, the function
# will raise ConnectionError.
Contributor:

Can you please keep comments like these? I know you're not a fan of code comments that are redundant, but this is the opposite of that.

Contributor (Author):

I would say that 95% of the words are boilerplate and not useful at all. This is the most standard and basic way to monitor something, which is to poll a state and sleep. And it has retries if a request fails. There is no magic to it. I'll keep the two links though in a comment if you think they're useful.

Contributor (Author):

The first link doesn't even work anymore, and you can literally see the values by clicking on the Enum in your code editor where you're reading the code.
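For reference, the poll-and-retry pattern the removed comment documented can be sketched as follows. The state names, retry limit, and function names here are assumptions drawn from the comment text, not the actual orchestrator code:

```python
import time

# SUCCEEDED/FAILED/CANCELLED as final states and a retry limit of 3 are
# assumptions based on the removed comment; the real values may differ.
FINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}
_CONNECTION_ERROR_RETRY_LIMIT = 3


def monitor_job(get_state, recreate_client, poll_interval=30):
    """Poll a job's state until it reaches a final state.

    On a ConnectionError, recreate the client to refresh the connection
    lifecycle and retry; give up after the retry limit is hit consecutively.
    """
    consecutive_errors = 0
    while True:
        try:
            state = get_state()
            consecutive_errors = 0
        except ConnectionError:
            consecutive_errors += 1
            if consecutive_errors >= _CONNECTION_ERROR_RETRY_LIMIT:
                raise
            recreate_client()
            continue
        if state in FINAL_STATES:
            return state
        time.sleep(poll_interval)
```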

Comment on lines 95 to 97
client = get_job_service_client(
credentials_source=credentials_source, client_options=client_options
)
Contributor:

Here and everywhere else you need to fetch credentials and configure clients, you have to make a few changes to account for the fact that now you need more clients and credentials may need to be valid for far longer:

  1. keep a cache of these clients and reuse them across steps instead of creating new ones every time you need one
  2. if they are based on service connector, make sure to check the service connector expiry every time you use the client and re-create the client if the credentials expired

If you think it helps, you can check the sagemaker orchestrator for a similar pattern.
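The caching-with-expiry pattern described in points 1 and 2 could be sketched roughly like this. `CachedClientFactory` and its callbacks are hypothetical names for illustration; a real implementation would wire in the service connector's expiry and the GCP client constructor:

```python
import time


class CachedClientFactory:
    """Cache a client and rebuild it when its credentials near expiry.

    build_client: callable returning a fresh client.
    get_expiry: callable returning the credentials' expiry as a Unix
        timestamp, or None if they never expire.
    leeway: seconds before expiry at which the client is rebuilt early.
    """

    def __init__(self, build_client, get_expiry, leeway=60.0):
        self._build = build_client
        self._get_expiry = get_expiry
        self._leeway = leeway
        self._client = None
        self._expires_at = None

    def get(self):
        now = time.time()
        expired = (
            self._expires_at is not None
            and now >= self._expires_at - self._leeway
        )
        if self._client is None or expired:
            # Rebuild the client; fetch the new credentials' expiry alongside.
            self._client = self._build()
            self._expires_at = self._get_expiry()
        return self._client
```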

Contributor (Author):

I think all of them are one-time use apart from the monitoring, where I implemented the retry functionality for an expired connector. You're right though; I think we can avoid one recreation of the client, so I will update the implementation.

@schustmi schustmi requested a review from stefannica November 27, 2025 11:41
Comment on lines 719 to 720
_, project_id = self._get_authentication()
client = self.get_job_service_client()
Contributor:

this ends up calling _get_authentication twice, which has the unfortunate effect that it uses the service connector twice to generate credentials. If you need the project ID, you should perhaps store it as a local variable and set it when you call get_job_service_client

Contributor (Author):

It's now caching the credentials and the client in all the places. I think it's quite ugly, and IMO it should be the responsibility of the service connector. But I don't think I'll be touching that as part of this PR.

pipeline_status = ExecutionStatus.FAILED

if run.snapshot and run.snapshot.is_dynamic:
status = aiplatform.CustomJob.get(run_id).state
Contributor:

Isn't there a way to use the client object returned by get_job_service_client at least here, for dynamic pipelines, instead of relying on this global aiplatform state? I really don't trust the aiplatform global state to do the right thing here, especially in situations where multiple orchestrator instances are created in the server.

Contributor (Author):

I actually found a way to use the aiplatform API without doing the global initialization, as both the PipelineJob as well as the CustomJob allow passing region, project and credentials in their initialization. I'm not sure why we even used the global initializer before. That should be fine as well, right?

Contributor:

I am not sure. Caching the credentials sounds okay, but it should be the client that is actually cached instead of passing the credentials to aiplatform.CustomJob.get, because the client also keeps a list of open connections or a connection pool that you want to reuse across client calls. I think calls like aiplatform.CustomJob.get end up creating new clients or connections under the hood, which makes them inefficient if you make multiple calls to it.

Contributor:

Yup, if you follow the bread crumbs, you'll find that aiplatform.CustomJob.get eventually creates a temporary one-shot client (see below).
[screenshot: aiplatform source where CustomJob.get constructs a temporary client]

this makes calls like aiplatform.CustomJob.get very inefficient if called multiple times in a short period of time, but I don't think that's your case. It looks like you're only calling it once per process, which should be okay.

the only case in which this stops being okay is if the Vertex orchestrator is used in a Deployment to run multiple pipelines in different threads, but that is something of a sci-fi scenario for now at least :)

@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch 2 times, most recently from c768d17 to 0e4c865 Compare November 28, 2025 06:16
@schustmi schustmi force-pushed the feature/dynamic-pipelines-vertex branch from 0e4c865 to 1e36533 Compare November 28, 2025 07:24
@schustmi schustmi requested a review from stefannica November 28, 2025 07:39
@schustmi schustmi added the run-slow-ci Tag that is used to trigger the slow-ci label Dec 1, 2025
@schustmi schustmi merged commit 30fe244 into develop Dec 1, 2025
102 of 109 checks passed
@schustmi schustmi deleted the feature/dynamic-pipelines-vertex branch December 1, 2025 05:09

Labels

enhancement New feature or request internal To filter out internal PRs and issues run-slow-ci Tag that is used to trigger the slow-ci
