Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pipelines parametrization #180

Merged
merged 13 commits into from
Feb 3, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ didn't alter the remote pipeline execution, and only escaped the local Python pr
with the proper remote pipeline execution handling, and possibly per-task timeout enabled by [the new kfp feature](https://github.com/kubeflow/pipelines/pull/10481).
- Assign pipelines to Vertex AI experiments
- Migrated `pydantic` library to v2
- Added pipeline parametrization
- Migrated to `actions/upload-artifact@v4` in the Github Actions

## [0.11.1] - 2024-07-01
Expand Down
17 changes: 14 additions & 3 deletions kedro_vertexai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,21 @@ def ui(ctx) -> None:
"-o",
"--output",
type=str,
default="pipeline.json",
help="Pipeline JSON definition file.",
default="pipeline.yaml",
help="Pipeline YAML definition file.",
)
@click.option(
"--params",
type=str,
default="",
help="""
Pipeline parameters to be specified at run time.
In a format <param name≥:<param type>, for example test_param:int.
Should be separated by comma.
""",
)
@click.pass_context
def compile(ctx, image, pipeline, output) -> None:
def compile(ctx, image, pipeline, output, params) -> None:
"""Translates Kedro pipeline into JSON file with VertexAI pipeline definition"""
context_helper = ctx.obj["context_helper"]
config = context_helper.config.run_config
Expand All @@ -187,6 +197,7 @@ def compile(ctx, image, pipeline, output) -> None:
pipeline=pipeline,
image=image if image else config.image,
output=output,
params=params,
)


Expand Down
14 changes: 6 additions & 8 deletions kedro_vertexai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import os
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from google.cloud import aiplatform as aip
from google.cloud.aiplatform import PipelineJob
Expand Down Expand Up @@ -92,21 +92,19 @@ def _generate_run_name(self, config: PluginConfig): # noqa
dt.datetime.utcnow().strftime("%Y%m%d%H%M%S")
)

def compile(
self,
pipeline,
image,
output,
):
def compile(self, pipeline, image, output, params: List[str] = []):
"""
Creates json file in given local output path
:param pipeline:
:param image:
:param output:
:param params: Pipeline parameters to be specified at run time.
:return:
"""
token = os.getenv("MLFLOW_TRACKING_TOKEN", "")
pipeline_func = self.generator.generate_pipeline(pipeline, image, token)
pipeline_func = self.generator.generate_pipeline(
pipeline, image, token, params=params
)
Compiler().compile(
pipeline_func=pipeline_func,
package_path=output,
Expand Down
54 changes: 45 additions & 9 deletions kedro_vertexai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import json
import logging
import os
from typing import Dict, Union
from typing import Dict, List, Union # noqa

from kedro.framework.context import KedroContext
from kfp import dsl
from kfp.dsl import PipelineTask
from makefun import with_signature

from kedro_vertexai.config import (
KedroVertexAIRunnerConfig,
Expand Down Expand Up @@ -53,15 +54,25 @@ def get_pipeline_name(self):
"""
return self.project_name.lower().replace(" ", "-").replace("_", "-")

def generate_pipeline(self, pipeline, image, token):
def _generate_params_signature(self, params: str) -> str:
params = params.split(",") if len(params) > 0 else []

params_signature = ", ".join(
[f"{param.split(':')[0]}: {param.split(':')[1]}" for param in params]
)
return params_signature

def generate_pipeline(self, pipeline, image, token, params: str = ""):
"""
This method return @dsl.pipeline annotated function that contains
dynamically generated pipelines.
:param pipeline: kedro pipeline
:param image: full docker image name
:param token: mlflow authentication token
:param params: Pipeline parameters to be specified at run time.
:return: kfp pipeline function
"""
params_signature = self._generate_params_signature(params)

def set_dependencies(
node_name, dependencies, kfp_tasks: Dict[str, PipelineTask]
Expand All @@ -75,13 +86,16 @@ def set_dependencies(
name=self.get_pipeline_name(),
description=self.run_config.description,
)
def convert_kedro_pipeline_to_kfp() -> None:
@with_signature(f"pipeline({params_signature}) -> None")
def convert_kedro_pipeline_to_kfp(*args, **kwargs) -> None:
from kedro.framework.project import pipelines

node_dependencies = pipelines[pipeline].node_dependencies
grouping = self.grouping.group(node_dependencies)

kfp_tasks = self._build_kfp_tasks(grouping, image, pipeline, token)
kfp_tasks = self._build_kfp_tasks(
grouping, image, pipeline, token, kwargs, params_signature
)
for group_name, dependencies in grouping.dependencies.items():
set_dependencies(group_name, dependencies, kfp_tasks)

Expand Down Expand Up @@ -118,12 +132,24 @@ def mlflow_start_run(mlflow_run_id: dsl.OutputPath(str)):

return mlflow_start_run()

def _add_mlflow_param_to_signature(self, params_signature: str) -> str:
mlflow_signature = "mlflow_run_id: Union[str, None] = None"

params_signature = (
f"{params_signature}, {mlflow_signature}"
if len(params_signature) > 0
else mlflow_signature
)
return params_signature

def _build_kfp_tasks(
self,
node_grouping: Grouping,
image,
pipeline,
tracking_token=None,
params: List[str] = [],
params_signature: str = "",
) -> Dict[str, PipelineTask]:
"""Build kfp container graph from Kedro node dependencies."""
kfp_tasks = {}
Expand All @@ -136,13 +162,16 @@ def _build_kfp_tasks(
image, should_add_params
)

params_signature = self._add_mlflow_param_to_signature(params_signature)

for group_name, nodes_group in node_grouping.nodes_mapping.items():
name = clean_name(group_name)
tags = {tag for tagging in nodes_group for tag in tagging.tags}

component_params = (
mlflow_params = (
kfp_tasks["mlflow-start-run"].outputs if mlflow_enabled else {}
)
component_params = {**mlflow_params, **params}

runner_config = KedroVertexAIRunnerConfig(storage_root=self.run_config.root)

Expand Down Expand Up @@ -172,16 +201,23 @@ def _build_kfp_tasks(
).strip()

@dsl.container_component
def component(mlflow_run_id: Union[str, None] = None):
@with_signature(f"{name.replace('-', '_')}({params_signature})")
def component(*args, **kwargs):
dynamic_parameters = ",".join(
[f"{k}={kwargs[k]}" for k in params.keys()]
)

return dsl.ContainerSpec(
image=image,
command=["/bin/bash", "-c"],
args=[node_command],
args=[
node_command,
" --params", # TODO what if there is no dynamic params?
f" {dynamic_parameters}",
],
)

task = component(**component_params)
task.component_spec.name = name
task.set_display_name(name)
self._configure_resources(name, tags, task)
kfp_tasks[name] = task

Expand Down
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ cachetools = ">=3.0,<6.0"
google-cloud-aiplatform = {extras = ["metadata"], version = "^1.59.0"}
cloudpickle = "^3.0.0"
mlflow = "^2.14.3"
makefun = "^1.15.6"

[tool.poetry.extras]
mlflow = ["kedro-mlflow"]
Expand Down
4 changes: 1 addition & 3 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ def test_compile(self):

assert result.exit_code == 0
context_helper.vertexai_client.compile.assert_called_with(
image="img",
output="output",
pipeline="pipe",
image="img", output="output", pipeline="pipe", params=""
)

def test_store_params_empty(self):
Expand Down
Loading
Loading