Add support for dynamic pipelines to the Vertex orchestrator #4246
Conversation
| parent = f"projects/{project_id}/locations/{self.config.region}" | ||
| logger.info( | ||
| "Submitting custom job='%s', path='%s' to Vertex AI Training.", | ||
| job_request["display_name"], |
Check failure (Code scanning / CodeQL): Clear-text logging of sensitive information
Copilot Autofix
To fix this problem, we should ensure that any data logged in the string field `job_request["display_name"]` does not accidentally expose secrets or sensitive data. The safest and simplest way is to mask, redact, or avoid logging sensitive/tainted values, including pipeline names and fields which may derive from untrusted sources. Specifically, in the log line on line 709, we should either mask the display name (show only a fixed value like `[REDACTED]`, or display only a subset, such as the first N characters), or simply omit it altogether if it is not a strictly necessary log detail. If a truncated or hashed version is useful for tracking, that could be logged instead.
- Edit file src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py, line 709.
- In the log message "Submitting custom job='%s', path='%s' to Vertex AI Training.", replace `job_request["display_name"]` with a redacted, masked, or truncated version. For example, display only the first 8 characters plus an ellipsis, or use `"[REDACTED]"` as the value.
- No additional imports are needed for basic string slicing.
- The rest of the logic in this context does not need to be changed.
```diff
@@ -705,8 +705,7 @@
     )
     parent = f"projects/{project_id}/locations/{self.config.region}"
     logger.info(
-        "Submitting custom job='%s', path='%s' to Vertex AI Training.",
-        job_request["display_name"],
+        "Submitting custom job='[REDACTED]', path='%s' to Vertex AI Training.",
         parent,
     )
     job = client.create_custom_job(parent=parent, custom_job=job_request)
```
15ddc44 to b922f0d (Compare)
@claude full-review

/claude full-review
Claude finished @htahir1's task

## Full PR Review Complete ✅

### Overview

This PR adds support for dynamic pipelines to the Vertex orchestrator by implementing two new methods.

Changes:
### Summary

Strengths:
Issues Found:
### Detailed Findings

#### 🔴 Critical Issues

**1. Security: Clear-text logging of job request (vertex_orchestrator.py:708)**

Location:

```python
logger.info(
    "Submitting custom job='%s', path='%s' to Vertex AI Training.",
    job_request["display_name"],
    parent,
)
```

Issue: GitHub Advanced Security flagged this for potentially logging sensitive data.
Recommendation: This exact pattern appears 3 times in the code (lines 708, 772, and also in vertex_step_operator.py:226). The logging is actually safe - it only logs the job's display name and the parent path.

Same issue in vertex_step_operator.py:213:

```python
logger.debug("Vertex AI Job=%s", job_request)  # ← This logs the ENTIRE job request including env vars
```

This should be removed, or the sensitive fields should be redacted before logging.
Same issue in vertex_step_operator.py:213: logger.debug("Vertex AI Job=%s", job_request) # ← This logs the ENTIRE job request including env varsThis should be removed or the sensitive fields should be redacted before logging. 🟡 Code Quality Issues2. Missing return documentation (utils.py:58-72)Location: def get_job_service_client(
credentials_source: GoogleCredentialsMixin,
client_options: Optional[Dict[str, Any]] = None,
) -> aiplatform.gapic.JobServiceClient:
"""Gets a job service client.
Args:
credentials_source: The component that provides the credentials to
access the job.
client_options: The client options to use for the job service client.
"""Issue: Missing Recommendation: def get_job_service_client(
credentials_source: GoogleCredentialsMixin,
client_options: Optional[Dict[str, Any]] = None,
) -> aiplatform.gapic.JobServiceClient:
"""Gets a job service client.
Args:
credentials_source: The component that provides the credentials to
access the job.
client_options: The client options to use for the job service client.
Returns:
A configured JobServiceClient instance.
"""3. Duplicate function definition (utils.py:42-55)Location: The Current state:
Recommendation: Remove the duplicate from 4. Docstring typo (vertex_orchestrator.py:728)Location: def run_isolated_step(
self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
"""Runs an isolated step on Kubernetes. # ← Wrong platformIssue: The docstring says "Kubernetes" but this is the Vertex orchestrator, not Kubernetes. Recommendation: """Runs an isolated step on Vertex AI."""🟡 Testing & Documentation5. Missing test coverageThe PR description states:
Issue: No mention of tests for the new dynamic pipeline functionality. The new methods are untested.

Recommendation: Add tests covering:
**6. Incomplete PR description**

Issue: The PR template is not filled out. The description just says "_I implemented/fixed _ to achieve ." without actual details.

Recommendation: Fill out the PR description with:
#### 🟢 Minor Observations

- Code style compliance: ✅ Code follows ZenML style guidelines
- Refactoring quality: ✅ Good extraction of common code
- Logic correctness: ✅ Proper handling of credentials and authentication

### Recommendations Summary

Must Fix (before merge):

Should Fix:

Nice to Have:
### Overall Assessment

This is a solid refactoring that adds important functionality while improving code reuse. The main concern is the security issue with logging the full job request in debug mode, which should be addressed before merge. The code quality is generally good and follows ZenML standards, with minor exceptions noted above.

Recommendation: Request changes to address the security logging issue, then approve after fixes.
b49498e to 3a0cf14 (Compare)
Documentation Link Check Results

❌ Absolute links check failed
3a0cf14 to 3cc5620 (Compare)
3cc5620 to 3b9d1f0 (Compare)
```python
# Monitors the long-running operation by polling the job state
# periodically, and retries the polling when a transient connectivity
# issue is encountered.
#
# Long-running operation monitoring:
# The possible states of "get job" response can be found at
# https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#State
# where SUCCEEDED/FAILED/CANCELED are considered to be final states.
# The following logic will keep polling the state of the job until
# the job enters a final state.
#
# During the polling, if a connection error was encountered, the GET
# request will be retried by recreating the Python API client to
# refresh the lifecycle of the connection being used. See
# https://github.com/googleapis/google-api-python-client/issues/218
# for a detailed description of the problem. If the error persists for
# _CONNECTION_ERROR_RETRY_LIMIT consecutive attempts, the function
# will raise ConnectionError.
```
Can you please keep comments like these? I know you're not a fan of code comments that are redundant, but this is the opposite of that.
I would say that 95% of the words are boilerplate and not useful at all. This is the most standard and basic way to monitor something, which is to poll a state and sleep. And it has retries if a request fails. There is no magic to it. I'll keep the two links though in a comment if you think they're useful.
The first link doesn't even work anymore, and you can literally see the values by clicking on the Enum in your code editor where you're reading the code.
src/zenml/integrations/gcp/utils.py (Outdated)
```python
client = get_job_service_client(
    credentials_source=credentials_source, client_options=client_options
)
```
Here and everywhere else you need to fetch credentials and configure clients, you have to make a few changes to account for the fact that you now need more clients and credentials may need to be valid for far longer:

- keep a cache of these clients and reuse them across steps instead of creating new ones every time you need one
- if they are based on a service connector, make sure to check the service connector expiry every time you use the client, and re-create the client if the credentials expired

If you think it helps, you can check the sagemaker orchestrator for a similar pattern.
I think all of them are one-time use apart from the monitoring, where I implemented the retry functionality if the connector expired. You're right though, I think we can avoid one recreation of the client, I will update the implementation
```python
_, project_id = self._get_authentication()
client = self.get_job_service_client()
```
This ends up calling `_get_authentication` twice, which has the unfortunate effect that it uses the service connector twice to generate credentials. If you need the project ID, you should perhaps store it as a local variable and set it when you call `get_job_service_client`.
It's now caching the credentials and the client in all the places. I think it's quite ugly, and IMO it should be the responsibility of the service connector. But I don't think I'll be touching that as part of this PR.
src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py (Outdated; thread resolved)
```python
pipeline_status = ExecutionStatus.FAILED

if run.snapshot and run.snapshot.is_dynamic:
    status = aiplatform.CustomJob.get(run_id).state
```
Isn't there a way to use the client object returned by `get_job_service_client` at least here, for dynamic pipelines, instead of relying on this global aiplatform state? I really don't trust the aiplatform global state to do the right thing here, especially in situations where multiple orchestrator instances are created in the server.
I actually found a way to use the aiplatform API without doing the global initialization, as both `PipelineJob` and `CustomJob` allow passing region, project and credentials in their initialization. I'm not sure why we even used the global initializer before. That should be fine as well, right?
I am not sure. Caching the credentials sounds okay, but it should be the client that is actually cached instead of passing the credentials to `aiplatform.CustomJob.get`, because the client also keeps a list of open connections or a connection pool that you want to reuse across client calls. I think calls like `aiplatform.CustomJob.get` end up creating new clients or connections under the hood, which makes them inefficient if you make multiple calls to it.
Yup, if you follow the breadcrumbs, you'll find that `aiplatform.CustomJob.get` eventually creates a temporary one-shot client (see below).

(Screenshot: the aiplatform source where the temporary one-shot client is created.)
this makes calls like aiplatform.CustomJob.get very inefficient if called multiple times in a short period of time, but I don't think that's your case. It looks like you're only calling it once per process, which should be okay.
the only case in which this stops being okay is if the Vertex orchestrator is used in a Deployment to run multiple pipelines in different threads, but that is something of a sci-fi scenario for now at least :)
src/zenml/integrations/gcp/step_operators/vertex_step_operator.py (Outdated; thread resolved)
c768d17 to 0e4c865 (Compare)
0e4c865 to 1e36533 (Compare)
Describe changes
This PR adds support for dynamic pipelines to the Vertex orchestrator.
Pre-requisites
Please ensure you have done the following:
- ...`develop` and the open PR is targeting `develop`. If your branch wasn't based on develop, read the Contribution guide on rebasing branch to develop.

Types of changes