
Migrate to kfp 2 #170

Merged
merged 30 commits into from
Jul 29, 2024
32df203
Vertex AI pipelines client: list pipelines.
sfczekalski Jul 12, 2024
d7ba232
Very initial version of run once.
sfczekalski Jul 15, 2024
6550422
Pass parameters to pipeline job.
sfczekalski Jul 15, 2024
c9b1dff
Add type annotation to task passed to _configure_resources.
sfczekalski Jul 15, 2024
a159230
Remove image pull policy parameter as it's not supported.
sfczekalski Jul 15, 2024
76b6c6b
Mlflow component.
sfczekalski Jul 16, 2024
89499ce
Add mlflow component and pass run id to downstream components.
sfczekalski Jul 16, 2024
1bf52fb
Rename ops to tasks.
sfczekalski Jul 17, 2024
e5eb306
Wait for completion.
sfczekalski Jul 17, 2024
300ee39
Remove io.py file as it's not used anyway.
sfczekalski Jul 17, 2024
7783275
Pre-commit fixes.
sfczekalski Jul 17, 2024
100af95
Test if resources are correctly added to the pipeline spec.
sfczekalski Jul 19, 2024
4a63027
Test that resources section is not added to spec if it wasn't specified.
sfczekalski Jul 19, 2024
ce99cdb
Test grouping nodes.
sfczekalski Jul 22, 2024
d049d3d
Test adds mlflow task.
sfczekalski Jul 22, 2024
8aadde5
Test runner and runner config in args.
sfczekalski Jul 22, 2024
1ba7238
Finish other tests.
sfczekalski Jul 22, 2024
7e704e8
Add unittest.skip to test_should_remove_old_schedule for now.
sfczekalski Jul 22, 2024
2200a87
List pipelines test.
sfczekalski Jul 23, 2024
d084e76
Test compile.
sfczekalski Jul 23, 2024
a9401e2
Patch mlflow is enabled when setting mlflow tags.
sfczekalski Jul 23, 2024
a67b2bf
Update test_run_once_with_wait test.
sfczekalski Jul 23, 2024
1c9baf9
Update changelog.
sfczekalski Jul 23, 2024
fa74bc0
Remove image_pull_policy from the config and add the description of t…
sfczekalski Jul 24, 2024
fad9c51
Remove 'layer' parameter from datasets as it's deprecated.
sfczekalski Jul 24, 2024
9c4792c
Changelog: add description of the --timeout-seconds flag removal.
sfczekalski Jul 24, 2024
521ca3d
Soften kfp version requirement, modify one import so that it works us…
sfczekalski Jul 24, 2024
e050ad1
Remove commented-out line.
sfczekalski Jul 24, 2024
a56d47a
Remove passing image_pull_policy to compile.
sfczekalski Jul 24, 2024
a0923fa
Update python version in cicd.
sfczekalski Jul 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/prepare-release.yml
@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
python-version: [3.10]
env:
PYTHON_PACKAGE: kedro_vertexai
steps:
4 changes: 2 additions & 2 deletions .github/workflows/test_and_publish.yml
@@ -105,7 +105,7 @@ jobs:
strategy:
matrix:
e2e_case: ["standard", "grouping"]
python-version: ['3.8'] # todo update python
python-version: ['3.10']
env:
PROJECT_NAME: kedro-vertexai
IMAGE_REGISTRY: gcr.io/gid-ml-ops-sandbox
@@ -191,7 +191,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8']
python-version: ['3.10']
env:
PYTHON_PACKAGE: kedro_vertexai
steps:
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,13 @@
# Changelog

## [Unreleased]
## [Unreleased] 2024-07-23

- Migrated to kfp 2
- Removed `image_pull_policy` parameter from configuration, as it only applies to the Kubernetes backend and not to Vertex AI,
and is only available in the `kfp-kubernetes` extension package
- Removed `--timeout-seconds` parameter from the `run-once` command for now, as in the old version of the plugin exceeding the specified time
didn't alter the remote pipeline execution, and only exited the local Python process. The timeout functionality will be added later on,
with proper remote pipeline execution handling, and possibly a per-task timeout enabled by [the new kfp feature](https://github.com/kubeflow/pipelines/pull/10481).
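The "Migrated to kfp 2" bullet maps to a handful of concrete API moves visible in the diffs of this PR. A cheat-sheet assembled from those diffs only (it is not an exhaustive kfp 1 → 2 migration guide):

```python
# API moves made in this PR, old -> new, collected from the diffs below.
KFP2_MIGRATION = {
    # compiler moved out of the kfp.v2 namespace
    "from kfp.v2 import compiler": "from kfp import compiler",
    # the deprecated Google client is replaced by the Vertex AI SDK
    "from kfp.v2.google.client import AIPlatformClient": "from google.cloud import aiplatform as aip",
    # listing and launching runs move to the aiplatform SDK
    "api_client.list_jobs()": "aip.PipelineJob.list()",
    "api_client.create_run_from_job_spec(...)": "aip.PipelineJob(...).submit(...)",
    # hand-rolled polling is replaced by the SDK's blocking call
    "wait_for_completion() polling thread": "PipelineJob.wait()",
}
```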

## [0.11.1] - 2024-07-01

4 changes: 0 additions & 4 deletions docs/source/02_installation/02_configuration.md
@@ -10,10 +10,6 @@ run_config:
# Name of the image to run as the pipeline steps
image: eu.gcr.io/my-gcp-mlops-project/example_model:${oc.env:KEDRO_CONFIG_COMMIT_ID}

# Pull policy to be used for the steps. Use Always if you push the images
# on the same tag, or Never if you use only local images
image_pull_policy: IfNotPresent

# Location of Vertex AI GCS root
root: bucket_name/gcs_suffix

3 changes: 0 additions & 3 deletions docs/source/03_getting_started/01_quickstart.md
@@ -108,17 +108,14 @@ Adjusted `catalog.yml` should look like this (note: remove the rest of the entri
companies:
type: pandas.CSVDataSet
filepath: data/01_raw/companies.csv
layer: raw

reviews:
type: pandas.CSVDataSet
filepath: data/01_raw/reviews.csv
layer: raw

shuttles:
type: pandas.ExcelDataSet
filepath: data/01_raw/shuttles.xlsx
layer: raw
```

All intermediate and output data will be stored in the location with the following pattern:
32 changes: 3 additions & 29 deletions kedro_vertexai/cli.py
@@ -8,9 +8,8 @@

from .client import VertexAIPipelinesClient
from .config import PluginConfig, RunConfig
from .constants import KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME, VERTEXAI_RUN_ID_TAG
from .constants import VERTEXAI_RUN_ID_TAG
from .context_helper import ContextHelper
from .data_models import PipelineResult
from .utils import (
docker_build,
docker_push,
@@ -98,14 +97,6 @@ def list_pipelines(ctx):
help="Parameters override in form of `key=value`",
)
@click.option("--wait-for-completion", type=bool, is_flag=True, default=False)
@click.option(
"--timeout-seconds",
type=int,
default=1800,
help="If --wait-for-completion is used, "
"this option sets timeout after which the plugin will return non-zero exit code "
"if the pipeline does not finish in time",
)
@click.pass_context
def run_once(
ctx: Context,
@@ -115,7 +106,6 @@ def run_once(
pipeline: str,
params: list,
wait_for_completion: bool,
timeout_seconds: int,
):
"""Deploy pipeline as a single run within given experiment
Config can be specified in kubeflow.yml as well."""
@@ -143,29 +133,14 @@
Consider using '--auto-build' parameter."
)

run = client.run_once(
job = client.run_once(
pipeline=pipeline,
image=image,
image_pull_policy=config.image_pull_policy,
parameters=format_params(params),
)

click.echo(
f"Intermediate data datasets will be stored in {os.linesep}"
f"gs://{config.root.strip('/')}/{KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME}/{run['displayName']}/*.bin"
)

if wait_for_completion:
result: PipelineResult = client.wait_for_completion(
timeout_seconds
) # blocking call
if result.is_success:
logger.info("Pipeline finished successfully!")
exit_code = 0
else:
logger.error(f"Pipeline finished with status: {result.state}")
exit_code = 1
ctx.exit(exit_code)
job.wait()


@vertexai_group.command()
@@ -210,7 +185,6 @@ def compile(ctx, image, pipeline, output) -> None:

context_helper.vertexai_client.compile(
pipeline=pipeline,
image_pull_policy=config.image_pull_policy,
image=image if image else config.image,
output=output,
)
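The `key=value` overrides that `run_once` hands to `client.run_once` pass through `format_params`, imported from `.utils` but not shown in this diff. A hypothetical sketch of what such a helper does, assuming values stay plain strings — the plugin's real implementation may validate or coerce types differently:

```python
def format_params(params: list) -> dict:
    """Parse `key=value` CLI overrides into a parameter dict.

    Hypothetical sketch of the plugin's `.utils.format_params`; the real
    helper may coerce value types or validate keys differently.
    """
    parameters = {}
    for param in params:
        key, sep, value = param.partition("=")
        if not sep:
            raise ValueError(f"Parameter {param!r} is not in `key=value` form")
        parameters[key] = value
    return parameters
```

For example, `format_params(["lr=0.01", "epochs=5"])` yields `{"lr": "0.01", "epochs": "5"}`, which is then passed as `parameter_values` to the pipeline job.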
106 changes: 19 additions & 87 deletions kedro_vertexai/client.py
@@ -6,20 +6,17 @@
import json
import logging
import os
import threading
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from time import sleep

from google.cloud import aiplatform as aip
from google.cloud.aiplatform import PipelineJob
from google.cloud.scheduler_v1.services.cloud_scheduler import (
CloudSchedulerClient,
)
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from kfp import compiler
from tabulate import tabulate

from .config import PluginConfig
from .data_models import PipelineResult, PipelineStatus
from .generator import PipelineGenerator


@@ -32,9 +29,7 @@ class VertexAIPipelinesClient:

def __init__(self, config: PluginConfig, project_name, context):

self.api_client = AIPlatformClient(
project_id=config.project_id, region=config.region
)
aip.init(project=config.project_id, location=config.region)
self.cloud_scheduler_client = CloudSchedulerClient()
self.location = f"projects/{config.project_id}/locations/{config.region}"
self.run_config = config.run_config
@@ -46,58 +41,50 @@ def list_pipelines(self):
List all the jobs (current and historical) on Vertex AI Pipelines
:return:
"""
list_jobs_response = self.api_client.list_jobs()
self.log.debug(list_jobs_response)

jobs_key = "pipelineJobs"
headers = ["Name", "ID"]
data = (
map(
lambda x: [x.get("displayName"), x["name"]],
list_jobs_response[jobs_key],
)
if jobs_key in list_jobs_response
else []
)

list_jobs_response = aip.PipelineJob.list()
data = [(x.display_name, x.name) for x in list_jobs_response]

return tabulate(data, headers=headers)
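The shape change in `list_pipelines` is easy to miss: the kfp 1 client returned a JSON-style dict keyed by `pipelineJobs`, while `aip.PipelineJob.list()` returns objects with attributes. A side-by-side sketch with stand-in data (no GCP calls; the sample names are invented for illustration):

```python
from types import SimpleNamespace


def rows_from_v1(response: dict) -> list:
    """Old shape: AIPlatformClient.list_jobs() returned a dict of dicts."""
    return [[x.get("displayName"), x["name"]] for x in response.get("pipelineJobs", [])]


def rows_from_v2(jobs: list) -> list:
    """New shape: aip.PipelineJob.list() returns objects with attributes."""
    return [(j.display_name, j.name) for j in jobs]


# Stand-in payloads mimicking the two API shapes.
v1 = {"pipelineJobs": [{"displayName": "training-run", "name": "projects/p/pipelineJobs/1"}]}
v2 = [SimpleNamespace(display_name="training-run", name="projects/p/pipelineJobs/1")]
```

Both produce the same `(Name, ID)` rows for `tabulate`, and the v2 version also drops the empty-response special case, since an empty list comprehension is already `[]`.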

def run_once(
self,
pipeline,
image,
image_pull_policy="IfNotPresent",
parameters=None,
):
) -> PipelineJob:
"""
Runs the pipeline in Vertex AI Pipelines
:param pipeline:
:param image:
:param image_pull_policy:
:param parameters:
:return:
"""
with NamedTemporaryFile(
mode="rt", prefix="kedro-vertexai", suffix=".json"
mode="rt", prefix="kedro-vertexai", suffix=".yaml"
) as spec_output:
self.compile(
pipeline,
image,
output=spec_output.name,
image_pull_policy=image_pull_policy,
)

run = self.api_client.create_run_from_job_spec(
service_account=self.run_config.service_account,
job_spec_path=spec_output.name,
job = aip.PipelineJob(
display_name=self.run_name,
template_path=spec_output.name,
job_id=self.run_name,
pipeline_root=f"gs://{self.run_config.root}",
parameter_values=parameters or {},
enable_caching=False,
)

job.submit(
service_account=self.run_config.service_account,
network=self.run_config.network.vpc,
)
self.log.debug("Run created %s", str(run))

return run
return job

def _generate_run_name(self, config: PluginConfig): # noqa
return config.run_config.experiment_name.rstrip("-") + "-{}".format(
@@ -109,20 +96,16 @@ def compile(
pipeline,
image,
output,
image_pull_policy="IfNotPresent",
):
"""
Creates json file in given local output path
:param pipeline:
:param image:
:param output:
:param image_pull_policy:
:return:
"""
token = os.getenv("MLFLOW_TRACKING_TOKEN", "")
pipeline_func = self.generator.generate_pipeline(
pipeline, image, image_pull_policy, token
)
pipeline_func = self.generator.generate_pipeline(pipeline, image, token)
compiler.Compiler().compile(
pipeline_func=pipeline_func,
package_path=output,
@@ -170,7 +153,6 @@ def schedule(
pipeline,
self.run_config.image,
output=spec_output.name,
image_pull_policy=image_pull_policy,
)
self.api_client.create_schedule_from_job_spec(
job_spec_path=spec_output.name,
Expand All @@ -182,53 +164,3 @@ def schedule(
)

self.log.info("Pipeline scheduled to %s", cron_expression)

def wait_for_completion(
self,
max_timeout_seconds,
interval_seconds=30.0,
max_api_fails=5,
) -> PipelineResult:
termination_statuses = (
PipelineStatus.PIPELINE_STATE_FAILED,
PipelineStatus.PIPELINE_STATE_SUCCEEDED,
PipelineStatus.PIPELINE_STATE_CANCELLED,
)

status_queue = Queue(1)

def monitor(q: Queue):
fails = 0
while fails < max_api_fails:
try:
job = self.api_client.get_job(self.run_name)
state = job["state"]
if state in termination_statuses:
q.put(
PipelineResult(
is_success=state
== PipelineStatus.PIPELINE_STATE_SUCCEEDED,
state=state,
job_data=job,
)
)
break
else:
self.log.info(f"Pipeline state: {state}")
except: # noqa: E722
fails += 1
self.log.error(
"Exception occurred while checking the pipeline status",
exc_info=True,
)
finally:
sleep(interval_seconds)
else:
q.put(PipelineResult(is_success=False, state="Internal exception"))

thread = threading.Thread(target=monitor, daemon=True, args=(status_queue,))
thread.start()
try:
return status_queue.get(timeout=max_timeout_seconds)
except Empty:
return PipelineResult(False, f"Max timeout {max_timeout_seconds}s reached")
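The deleted `wait_for_completion` above is a poll-in-a-background-thread pattern that `PipelineJob.wait()` now provides for free. Stripped of the API-failure counting and the job payload, the pattern reduces to this generic sketch (not the plugin's code):

```python
import threading
import time
from queue import Empty, Queue


def wait_for_state(get_state, terminal_states, timeout_seconds, interval_seconds=0.01):
    """Poll get_state() from a daemon thread until it returns a terminal state.

    Returns the terminal state, or None on timeout -- a simplified analogue of
    the removed wait_for_completion (no fail counting, no job payload).
    """
    queue: Queue = Queue(1)

    def monitor():
        while True:
            state = get_state()
            if state in terminal_states:
                queue.put(state)
                return
            time.sleep(interval_seconds)

    threading.Thread(target=monitor, daemon=True).start()
    try:
        return queue.get(timeout=timeout_seconds)
    except Empty:
        return None  # the old code mapped this to a failed PipelineResult
```

With the aiplatform SDK, all of this collapses to `job.wait()`, which blocks until the run reaches a terminal state — which is why this file, `data_models.py`, and the `--timeout-seconds` flag could all be removed in one PR.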
5 changes: 0 additions & 5 deletions kedro_vertexai/config.py
@@ -15,10 +15,6 @@
# Name of the image to run as the pipeline steps
image: {image}

# Pull policy to be used for the steps. Use Always if you push the images
# on the same tag, or Never if you use only local images
image_pull_policy: IfNotPresent

# Location of Vertex AI GCS root
root: bucket_name/gcs_suffix

@@ -199,7 +195,6 @@ class MLFlowVertexAIConfig(BaseModel):

class RunConfig(BaseModel):
image: str
image_pull_policy: Optional[str] = "IfNotPresent"
root: Optional[str]
description: Optional[str]
experiment_name: str
22 changes: 0 additions & 22 deletions kedro_vertexai/data_models.py

This file was deleted.
