WIP: Support Tensorflow distributed training in kfp workflow #3000

Open · wants to merge 4 commits into main
26 changes: 26 additions & 0 deletions elyra/kfp/bootstrapper.py
@@ -639,6 +639,18 @@ def parse_arguments(cls, args) -> dict:
help="Pipeline name",
required=True,
)
parser.add_argument(
"--op-name",
dest="op-name",
help="operation name",
required=True,
)
parser.add_argument(
"--rank",
dest="rank",
help="rank for distributed training",
required=False,
)
parsed_args = vars(parser.parse_args(args))

# set pipeline name as global
@@ -675,6 +687,20 @@ def main():
)
# Setup packages and gather arguments
input_params = OpUtil.parse_arguments(sys.argv[1:])
# Expose the runtime PipelineParam "rank" as an environment variable:
if input_params["rank"]:
op_name = input_params["op-name"]
# FIXME: KFP rewrites operation names at compile time; normalize underscores so the name still matches.
op_name = op_name.replace("_", "-")
os.environ["RANK"] = input_params["rank"]
nranks = os.getenv("NRANKS")
if not nranks:
raise ValueError("rank argument setted but no NRANKS env found!")
# NOTE: Import kfpdist only when needed so regular Elyra pipelines do not require it.
from kfpdist import set_dist_train_config

set_dist_train_config(int(input_params["rank"]), int(nranks), op_name, port=9888)

OpUtil.log_operation_info("starting operation")
t0 = time.time()
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
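For context, a minimal sketch of how a user's training script could consume the variables wired up here (RANK from the --rank argument above, NRANKS from pipeline_envs in processor_kfp.py), assuming kfpdist.set_dist_train_config handles TF_CONFIG/worker discovery:

import os

import tensorflow as tf

# RANK is set by the bootstrapper from --rank; NRANKS comes from pipeline_envs.
rank = int(os.environ["RANK"])
nranks = int(os.environ["NRANKS"])

# Cluster resolution (TF_CONFIG or equivalent) is assumed to be prepared by
# kfpdist.set_dist_train_config before user code runs.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
print(f"worker {rank} of {nranks}; replicas in sync: {strategy.num_replicas_in_sync}")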
54 changes: 53 additions & 1 deletion elyra/kfp/operator.py
@@ -20,7 +20,7 @@
from typing import List
from typing import Optional

from kfp.dsl import ContainerOp
from kfp.dsl import ContainerOp, PipelineParam
from kfp.dsl import RUN_ID_PLACEHOLDER
from kubernetes.client.models import V1EmptyDirVolumeSource
from kubernetes.client.models import V1EnvVar
from kubernetes.client.models import V1EnvVarSource
from kubernetes.client.models import V1ObjectFieldSelector
@@ -68,6 +68,7 @@ def __init__(
self,
pipeline_name: str,
experiment_name: str,
op_name: str,
notebook: str,
cos_endpoint: str,
cos_bucket: str,
@@ -85,12 +86,14 @@
mem_request: Optional[str] = None,
gpu_limit: Optional[str] = None,
workflow_engine: Optional[str] = "argo",
rank: Optional[PipelineParam] = None,
**kwargs,
):
"""Create a new instance of ContainerOp.
Args:
pipeline_name: pipeline that this op belongs to
experiment_name: the experiment where pipeline_name is executed
op_name: original operation name
notebook: name of the notebook that will be executed per this operation
cos_endpoint: object storage endpoint, e.g. weaikish1.fyre.ibm.com:30442
cos_bucket: bucket to retrieve archive from
@@ -115,6 +118,7 @@ def __init__(

self.pipeline_name = pipeline_name
self.pipeline_version = pipeline_version
self.op_name = op_name
self.pipeline_source = pipeline_source
self.experiment_name = experiment_name
self.notebook = notebook
@@ -134,6 +138,7 @@ def __init__(
self.cpu_request = cpu_request
self.mem_request = mem_request
self.gpu_limit = gpu_limit
self.rank = rank

argument_list = []

@@ -197,16 +202,21 @@ def __init__(
f"curl {common_curl_options} -L {self.python_pip_config_url} --output pip.conf && cd .. &&"
)

rank_argument = ""
if self.rank is not None:
rank_argument = f'--rank "{self.rank}" '
argument_list.append(
f"python3 -m pip install {self.python_user_lib_path_target} packaging && "
"python3 -m pip freeze > requirements-current.txt && "
"python3 bootstrapper.py "
f'--pipeline-name "{self.pipeline_name}" '
f'--op-name "{self.op_name}" '
f"--cos-endpoint {self.cos_endpoint} "
f"--cos-bucket {self.cos_bucket} "
f'--cos-directory "{self.cos_directory}" '
f'--cos-dependencies-archive "{self.cos_dependencies_archive}" '
f'--file "{self.notebook}" '
f"{rank_argument} "
)

if self.pipeline_inputs:
@@ -231,6 +241,48 @@ def __init__(
for key, value in self.pipeline_envs.items(): # Convert dict entries to format kfp needs
self.container.add_env_variable(V1EnvVar(name=key, value=value))

# Add general KFP metadata env vars via the Kubernetes downward API.
self.container.add_env_variable(
V1EnvVar(
name="WORKFLOW_ID",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(
api_version="v1", field_path="metadata.labels['workflows.argoproj.io/workflow']"
)
),
)
)
self.container.add_env_variable(
V1EnvVar(
name="KFP_NAMESPACE",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.namespace")
),
)
)
self.container.add_env_variable(
V1EnvVar(
name="KFP_POD_NAME",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.name")
),
)
)
self.container.add_env_variable(
V1EnvVar(
name="KFP_POD_UID",
value_from=V1EnvVarSource(field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.uid")),
)
)
self.container.add_env_variable(
V1EnvVar(
name="KFP_RUN_ID",
value_from=V1EnvVarSource(
field_ref=V1ObjectFieldSelector(api_version="v1", field_path="metadata.labels['pipeline/runid']")
),
)
)

# If crio volume size is found then assume kubeflow pipelines environment is using CRI-o as
# its container runtime
if self.emptydir_volume_size:
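A note on the rank argument above: interpolating a PipelineParam into an f-string emits a placeholder that KFP substitutes at compile/run time, which is why the bootstrapper receives a concrete integer. A minimal sketch with the KFP v1 SDK (the names are illustrative, not from this PR):

from kfp.dsl import PipelineParam

rank = PipelineParam(name="loop-item-param", op_name="for-loop-1")
print(f'--rank "{rank}"')
# prints: --rank "{{pipelineparam:op=for-loop-1;name=loop-item-param}}"
# KFP replaces the placeholder with the actual loop item when the op runs.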
124 changes: 85 additions & 39 deletions elyra/pipeline/kfp/processor_kfp.py
@@ -26,7 +26,7 @@
from kfp import Client as ArgoClient
from kfp import compiler as kfp_argo_compiler
from kfp import components as components
from kfp.dsl import PipelineConf
from kfp.dsl import ParallelFor, PipelineConf
from kfp.aws import use_aws_secret # noqa H306
from kubernetes import client as k8s_client
from kubernetes.client import V1EmptyDirVolumeSource
@@ -530,53 +530,99 @@ def _cc_pipeline(
# If operation is one of the "generic" set of NBs or scripts, construct custom ExecuteFileOp
if isinstance(operation, GenericOperation):
component = ComponentCache.get_generic_component_from_op(operation.classifier)

# Collect env variables
pipeline_envs = self._collect_envs(
operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
)

operation_artifact_archive = self._get_dependency_archive_name(operation)

self.log.debug(
f"Creating pipeline component archive '{operation_artifact_archive}' for operation '{operation}'"
)

container_op = ExecuteFileOp(
name=sanitized_operation_name,
pipeline_name=pipeline_name,
experiment_name=experiment_name,
notebook=operation.filename,
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=artifact_object_prefix,
cos_dependencies_archive=operation_artifact_archive,
pipeline_version=pipeline_version,
pipeline_source=pipeline.source,
pipeline_inputs=operation.inputs,
pipeline_outputs=operation.outputs,
pipeline_envs=pipeline_envs,
emptydir_volume_size=emptydir_volume_size,
cpu_request=operation.cpu,
mem_request=operation.memory,
gpu_limit=operation.gpu,
workflow_engine=engine,
image=operation.runtime_image,
file_outputs={
"mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa
"mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa
},
)
distributed_count = int(operation.distributed_training or 1)
# rank is a PipelineParam and can only be used as a ContainerOp argument,
# see https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.ParallelFor
if distributed_count > 1:
with ParallelFor(list(range(distributed_count))) as rank:
# set "nranks" as env, then set "rank" to env at runtime.
pipeline_envs["NRANKS"] = int(distributed_count)
container_op = ExecuteFileOp(
name=sanitized_operation_name,
op_name=operation.name,
pipeline_name=pipeline_name,
experiment_name=experiment_name,
notebook=operation.filename,
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=artifact_object_prefix,
cos_dependencies_archive=operation_artifact_archive,
pipeline_version=pipeline_version,
pipeline_source=pipeline.source,
pipeline_inputs=operation.inputs,
pipeline_outputs=operation.outputs,
pipeline_envs=pipeline_envs,
emptydir_volume_size=emptydir_volume_size,
cpu_request=operation.cpu,
mem_request=operation.memory,
gpu_limit=operation.gpu,
workflow_engine=engine,
rank=rank,
image=operation.runtime_image,
file_outputs={
"mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa
"mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa
},
)

if cos_secret and not export:
container_op.apply(use_aws_secret(cos_secret))

image_namespace = self._get_metadata_configuration(RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
for image_instance in image_namespace:
if image_instance.metadata[
"image_name"
] == operation.runtime_image and image_instance.metadata.get(
"pull_policy"
): # noqa
container_op.container.set_image_pull_policy(image_instance.metadata["pull_policy"])
else:
container_op = ExecuteFileOp(
name=sanitized_operation_name,
op_name=operation.name,
pipeline_name=pipeline_name,
experiment_name=experiment_name,
notebook=operation.filename,
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=artifact_object_prefix,
cos_dependencies_archive=operation_artifact_archive,
pipeline_version=pipeline_version,
pipeline_source=pipeline.source,
pipeline_inputs=operation.inputs,
pipeline_outputs=operation.outputs,
pipeline_envs=pipeline_envs,
emptydir_volume_size=emptydir_volume_size,
cpu_request=operation.cpu,
mem_request=operation.memory,
gpu_limit=operation.gpu,
workflow_engine=engine,
image=operation.runtime_image,
file_outputs={
"mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa
"mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa
},
)

if cos_secret and not export:
container_op.apply(use_aws_secret(cos_secret))

image_namespace = self._get_metadata_configuration(RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
for image_instance in image_namespace:
if image_instance.metadata["image_name"] == operation.runtime_image and image_instance.metadata.get(
"pull_policy"
):
container_op.container.set_image_pull_policy(image_instance.metadata["pull_policy"])

self.log_pipeline_info(
pipeline_name,
@@ -865,4 +911,4 @@ def __init__(self, run_id, run_url, object_storage_url, object_storage_path):
def to_json(self):
response = super().to_json()
response["run_id"] = self.run_id
return response
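For reference, ParallelFor fans out one copy of the ops in its body per loop item, and the loop variable is a PipelineParam rather than a Python value, which is why it can only be passed through as a ContainerOp argument. A standalone sketch of the pattern used above (illustrative names, KFP v1 SDK):

from kfp import dsl

@dsl.pipeline(name="parallelfor-demo")
def demo_pipeline():
    # One "worker" op is instantiated per item; `rank` is a PipelineParam,
    # so it cannot be inspected here, only forwarded as an argument.
    with dsl.ParallelFor([0, 1, 2]) as rank:
        dsl.ContainerOp(
            name="worker",
            image="python:3.9",
            command=["echo"],
            arguments=[rank],
        )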
6 changes: 6 additions & 0 deletions elyra/pipeline/pipeline.py
@@ -246,6 +246,7 @@ def __init__(
cpu: number of cpus requested to run the operation
memory: amount of memory requested to run the operation (in Gi)
gpu: number of gpus requested to run the operation
distributed_training: number of workers used to run the operation as a distributed training job
Entries for other (non-built-in) component types are a function of the respective component.

:param elyra_params: dictionary of parameter key:value pairs that are owned by Elyra
@@ -272,6 +273,7 @@ def __init__(
self._component_params["cpu"] = component_params.get("cpu")
self._component_params["gpu"] = component_params.get("gpu")
self._component_params["memory"] = component_params.get("memory")
self._component_params["distributed_training"] = component_params.get("distributed_training")

if not elyra_params:
elyra_params = {}
@@ -319,6 +321,10 @@ def memory(self) -> Optional[str]:
def gpu(self) -> Optional[str]:
return self._component_params.get("gpu")

@property
def distributed_training(self) -> Optional[str]:
return self._component_params.get("distributed_training")

def __eq__(self, other: GenericOperation) -> bool:
if isinstance(self, other.__class__):
return super().__eq__(other)
7 changes: 7 additions & 0 deletions elyra/templates/components/generic_properties_template.jinja2
@@ -75,6 +75,13 @@
"description": "Recursively include subdirectories when submitting a pipeline (This may increase submission time).",
"default": false
},
"distributed_training": {
"type": "integer",
"title": "Distributed Training",
"description": "Number of workers to run distributed training. Supported framework Tensorflow(Pytorch comming soon). Use env RANK, NRANKS in your program if needed.",
"default": 1,
"minimum": 1
},
"outputs_header": {
"type": "null",
"title": "Outputs",