class SparkPipelineTransform(AbstractPipelineTransform):
    """
    Pipeline transform that runs a set of base transforms sequentially,
    handing the data between the participating transforms in memory.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Set up pipeline execution for the configured list of transforms.
        :param config: configuration parameters - list of transforms in the pipeline.
                       Note that transforms will be executed in the order they are defined.
        """
        # Capture the partition index before the base class wires up the
        # participating transforms (it may consult _get_transform_params).
        self.partition = config.get("partition_index", 0)
        super().__init__(config)

    def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]:
        """
        Build the parameter dictionary for one participating transform.
        :param runtime: runtime of the participating transform
        :return: transform params
        """
        return runtime.get_transform_config(
            partition=self.partition,
            data_access_factory=self.data_access_factory,
            statistics=self.statistics,
        )

    def _compute_execution_statistics(self, stats: dict[str, Any]) -> None:
        """
        Fold flush-time statistics into the overall execution statistics and
        let every participant runtime contribute its own stats.
        :param stats: current statistics from flush
        :return: None
        """
        self.statistics.add_stats(stats)
        for _, participant_runtime in self.participants:
            participant_runtime.compute_execution_stats(stats=self.statistics)
# Local runner: executes the NOOP pipeline transform on Spark with simulated
# command-line arguments against the bundled test data.

import os
import sys

from data_processing_spark.runtime.spark import SparkTransformLauncher
from data_processing.utils import ParamsUtils
from noop_pipeline_transform_spark import NOOPPypelineSparkTransformConfiguration


# create parameters: read from ../test-data/input, write to ../output
# (both resolved relative to this file's location)
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # execution info
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
    # noop params
    "noop_sleep_sec": 1,
}
if __name__ == "__main__":
    # Set the simulated command line args
    sys.argv = ParamsUtils.dict_to_req(d=params)
    # create launcher
    launcher = SparkTransformLauncher(runtime_config=NOOPPypelineSparkTransformConfiguration())
    # Launch Spark to process the input
    # (original comment said "ray actor(s)" — copy/paste from the Ray runner)
    launcher.launch()
from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration
from data_processing.transform import PipelineTransformConfiguration
from data_processing_spark.transform.spark import SparkPipelineTransform
from data_processing.utils import get_logger
from noop_transform_spark import NOOPSparkTransformConfiguration

logger = get_logger(__name__)


class NOOPPypelineSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
    """
    SparkTransformRuntimeConfiguration for a NOOP pipeline, as required by the
    SparkTransformLauncher. The pipeline has a single participant — the NOOP
    Spark transform — executed via SparkPipelineTransform.

    NOTE(review): "Pypeline" in the class name is a typo for "Pipeline"; it is
    kept unchanged here because other modules import this name.
    """

    def __init__(self):
        """
        Initialization: build a pipeline configuration whose only participant
        is the NOOP Spark transform, run through SparkPipelineTransform.
        """
        super().__init__(
            transform_config=PipelineTransformConfiguration(
                # transforms are executed in list order; here there is just one
                config={"transforms": [NOOPSparkTransformConfiguration()]},
                transform_class=SparkPipelineTransform,
            )
        )


if __name__ == "__main__":
    launcher = SparkTransformLauncher(NOOPPypelineSparkTransformConfiguration())
    # Fixed copy/paste in the log message: this launches the noop pipeline,
    # not a "resize/noop" transform.
    logger.info("Launching noop pipeline transform")
    launcher.launch()
class TestSparkNOOPTransform(AbstractTransformLauncherTest):
    """
    Supplies the test data for the tests defined in the superclass.
    The name of this class MUST begin with the word Test so that pytest
    recognizes it as a test class.
    """

    def get_test_transform_fixtures(self) -> list[tuple]:
        # Resolve ../test-data relative to this file's directory.
        test_data_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
        launcher = SparkTransformLauncher(NOOPPypelineSparkTransformConfiguration())
        return [
            (launcher, {"noop_sleep_sec": 1}, test_data_dir + "/input", test_data_dir + "/expected"),
        ]