initial pass at a pipelining transform #424

Draft · wants to merge 10 commits into base: dev
@@ -100,8 +100,9 @@ def get_table(self, path: str) -> tuple[pa.table, int]:
"""
Get pyArrow table for a given path
:param path - file path
:return: pyArrow table or None, if the table read failed and number of operation retries.
Retries are performed on operation failures and are typically due to the resource overload.
:return: Tuple containing
pyarrow.Table: PyArrow table if read successfully, None otherwise.
int: the number of retries. Retries are performed on operation failures, typically due to resource overload.
"""
pass
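The docstring above describes a (table-or-None, retry count) return contract. A minimal standalone sketch of that contract follows; `read_with_retries` and `flaky_read` are hypothetical names for illustration, not part of the PR:

```python
def read_with_retries(read_fn, max_retries: int = 3):
    """Call read_fn until it succeeds or retries are exhausted.

    Returns (result_or_None, number_of_retries), mirroring the
    (table, retry count) contract described in the docstring above.
    """
    retries = 0
    while True:
        try:
            return read_fn(), retries
        except IOError:
            if retries >= max_retries:
                return None, retries
            retries += 1


# A reader that fails twice (e.g. resource overload) before succeeding.
attempts = {"n": 0}

def flaky_read():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise IOError("resource overloaded")
    return "table-data"

result, retries = read_with_retries(flaky_read)
# result == "table-data" after 2 retries
```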

@@ -260,8 +260,9 @@ def get_table(self, path: str) -> tuple[pa.table, int]:
Args:
path (str): Path to the file containing the table.

Returns:
Returns: Tuple containing
pyarrow.Table: PyArrow table if read successfully, None otherwise.
int: the number of retries.
"""

try:
@@ -0,0 +1,202 @@
import pathlib
from typing import Any, Union

import pyarrow as pa
from data_processing.transform import AbstractBinaryTransform, AbstractTableTransform


name = "pipeline"
cli_prefix = f"{name}_"
transform_key = "transforms"


class PipelinedTransform(AbstractBinaryTransform):
"""
Enables the sequencing of transforms.
Configuration is done by providing a list of configured AbstractBinaryTransform instances under the "transforms"
key in the dictionary provided to the initializer.
Features/considerations include:
* Transforms must be sequenced such that the output of a given transform, identified by the extension produced,
is compatible with the next transform in the sequence.
* Intermediate input file names are only informative and do not actually exist on disk. The file extensions
used are those produced by the output of the previous transform. The base names are constructed
from the generating transform's class name, but should not be relied on.
* If a transform produces multiple outputs (which must share the same extension), each output is applied through
the subsequent transforms in the pipeline.
Restrictions include:
* Metadata produced is merged across all transforms for any given call to the transform/flush_binary() methods.
"""

def __init__(self, config: dict[str, Any]):
"""
Create the pipeline using a list of initialized transforms.
Collaborator: initialized?

Args:
config: dictionary holding the following keys
transforms : a list of AbstractBinaryTransform instances. All transforms must expect and produce
the same data type (within the binary array) represented by the file extensions passed into and
returned by the transform/flush_binary() methods.
"""
super().__init__(config)
self.input_extension = None
self.transforms = config.get(transform_key, None)
if self.transforms is None:
raise ValueError(f"Missing configuration key {transform_key} specifying the list of transforms to run")
Collaborator: Also make sure the list is not empty and actually contains transforms, not some garbage.

for transform in self.transforms:
if not isinstance(transform, AbstractBinaryTransform):
raise ValueError(f"{transform} is not an instance of AbstractBinaryTransform")

def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Applies the list of transforms, provided to the initializer, to the input data.
If a transform produces multiple byte arrays, each will be applied through the downstream transforms.
Args:
file_name: name of the input file; its extension identifies the data format.
byte_array: the file contents to transform.
Returns:
A list of (bytes, extension) tuples and a dictionary of merged metadata.
"""
r_bytes, r_metadata = self._apply_transforms_to_datum(self.transforms, (file_name, byte_array))
return r_bytes, r_metadata

def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
Collaborator: why is this table? It should be binary. It should check the extension and then decide whether this is a table.

Member Author: I'm trying to avoid having someone that wants to transform a table first having to convert to bytes to call transform_binary(). I think, as you said, we may need to restrict this to transform_binary(), or allow transform() but check to be sure it is a Table. Or, have transform_binary() do the type checking. I think it would be useful, though, to avoid unnecessary conversion back and forth of Table to bytes and bytes to Table for each transform.

"""
Applies the list of transforms, provided to the initializer, to the input table.
If a transform produces multiple tables, each will be applied through the downstream transforms.
Args:
table: the pyarrow Table to transform.
file_name: optional name of the file the table was read from.
Returns:
A list of pyarrow Tables and a dictionary of merged metadata.
"""
r_bytes, r_metadata = self._apply_transforms_to_datum(self.transforms, (file_name, table))
tables = self._get_table_list(r_bytes)
return tables, r_metadata

def _get_table_list(self, transformed: list[tuple[pa.Table, str]]):
tables = []
for t in transformed:
tables.append(t[0])
return tables

def _apply_transforms_to_data(
self, transforms: list[AbstractBinaryTransform], data: list[tuple[str, bytearray]]
) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
r_bytes = []
r_metadata = {}
for datum in data:
processed, metadata = self._apply_transforms_to_datum(transforms, datum)
r_bytes.extend(processed)
r_metadata = r_metadata | metadata

return r_bytes, r_metadata

def _apply_transforms_to_datum(
self,
transforms: Union[list[AbstractBinaryTransform], list[AbstractTableTransform]],
datum: Union[tuple[str, bytearray], tuple[str, pa.Table]],
) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Apply the list of transforms to the given datum tuple of filename and bytes (or Table).
Args:
transforms: the transforms to apply, in order.
datum: a (file_name, data) tuple.
Returns: same as transform_binary().

"""
r_metadata = {}
pending_to_process = [(datum[0], datum[1])]
for transform in transforms:
transform_name = type(transform).__name__
to_process = pending_to_process
pending_to_process = []
for tp in to_process: # Over all outputs from the last transform (or the initial input)
fname = tp[0]
to_transform = tp[1]
transformation_tuples, metadata = self._call_transform(transform, to_transform, fname)
# Capture the list of outputs from this transform as inputs to the next (or as the return values).
for transformation in transformation_tuples:
transformed, extension = transformation
fname = transform_name + "-output" + extension
next_datum = (fname, transformed)
pending_to_process.append(next_datum)
# TODO: this is not quite right and might overwrite previous values.
# Would be better if we could somehow support lists.
r_metadata = r_metadata | metadata

# Strip the pseudo-base filename from the pending_to_process tuples, leaving only the extension, as required.
r_bytes = []
for tp in pending_to_process:
fname = tp[0]
byte_array = tp[1]
extension = pathlib.Path(fname).suffix
tp = (byte_array, extension)
r_bytes.append(tp)

return r_bytes, r_metadata

def _call_transform(
self,
transform: Union[AbstractTableTransform, AbstractBinaryTransform],
datum: Union[pa.Table, bytearray],
file_name: str,
):
Collaborator: This seems to be wrong. Transform is always binary; a table is just one of the cases of binary, so there is no need for Unions. Take a look at https://github.com/IBM/data-prep-kit/blob/dev/data-processing-lib/python/src/data_processing/transform/table_transform.py. It takes binary and returns binary; Table is just an intermediate format.

Member Author: Understood. I was/am trying to find a way to avoid requiring the user to call transform_binary() if they are starting with a Table. And if the list of transforms is all TableTransforms, we should try to avoid two byte/Table conversions on each transform.

is_table_transform = isinstance(datum, pa.Table)
if is_table_transform:
tables, metadata = transform.transform(datum, file_name)
transformation_tuples = []
for table in tables:
transformation_tuples.append((table, ".parquet"))
else:
transformation_tuples, metadata = transform.transform_binary(file_name, datum)
return transformation_tuples, metadata

def _call_flush(self, transform: Union[AbstractTableTransform, AbstractBinaryTransform]):
Collaborator: Here this union stuff again.

is_table_transform = isinstance(transform, AbstractTableTransform)
if is_table_transform:
tables, metadata = transform.flush()
transformation_tuples = []
for table in tables:
transformation_tuples.append((table, ".parquet"))
else:
transformation_tuples, metadata = transform.flush_binary()
Collaborator: This should be the only method called. Why is it more complex than it needs to be?

Member Author: Again, trying to avoid byte/Table conversion, but not sure it's ready yet.

return transformation_tuples, metadata

def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
return self._flush_transforms()

def flush(self) -> tuple[list[pa.Table], dict[str, Any]]:
transformed, metadata = self._flush_transforms()
# but only return the table list and not the extensions as is done for flush_binary()
tables = self._get_table_list(transformed)
return tables, metadata

def _flush_transforms(self) -> tuple[list[Union[tuple[bytes, str], tuple[pa.Table, str]]], dict[str, Any]]:
"""
Call flush on all transforms in the pipeline, passing flushed results to downstream
transforms as appropriate, and aggregate the results.
Returns:
A list of (data, extension) tuples and a dictionary of merged metadata.
"""

r_bytes = []
r_metadata = {}
index = 0
for transform in self.transforms: # flush each transform
index += 1
transformation_tuples, metadata = self._call_flush(transform)
r_metadata = r_metadata | metadata
if len(transformation_tuples) > 0: # Something was flushed from this transform.
downstream_transforms = self.transforms[index:]
if len(downstream_transforms) > 0:
# Apply the flushed results to the downstream transforms.
transformation_tuples, metadata = self._apply_transforms_to_data(
downstream_transforms, transformation_tuples
)
r_bytes.extend(transformation_tuples)
# TODO: this is not quite right and might overwrite previous values.
# Would be better if we could somehow support lists.
r_metadata = r_metadata | metadata
else:
# We flushed the last transform so just append its results.
r_bytes.extend(transformation_tuples)

return r_bytes, r_metadata
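The fan-out behavior of _apply_transforms_to_datum (every output of one transform becomes an input to the next, with pseudo file names carrying only the extension forward) can be sketched standalone. `Upper`, `Splitter`, and `run_pipeline` below are hypothetical stand-ins for illustration, not the PR's classes:

```python
import pathlib
from typing import Any


class Upper:
    """Toy binary transform: upper-cases the bytes, keeps the extension."""

    def transform_binary(self, file_name: str, byte_array: bytes):
        ext = pathlib.Path(file_name).suffix
        return [(byte_array.upper(), ext)], {"upper_calls": 1}


class Splitter:
    """Toy transform emitting two outputs, exercising the fan-out path."""

    def transform_binary(self, file_name: str, byte_array: bytes):
        ext = pathlib.Path(file_name).suffix
        half = len(byte_array) // 2
        return [(byte_array[:half], ext), (byte_array[half:], ext)], {}


def run_pipeline(transforms, file_name: str, byte_array: bytes):
    """Feed every output of each transform into the next, in the spirit of
    _apply_transforms_to_datum in the PR."""
    pending = [(file_name, byte_array)]
    metadata: dict[str, Any] = {}
    for t in transforms:
        name = type(t).__name__
        outputs = []
        for fname, data in pending:
            results, md = t.transform_binary(fname, data)
            metadata |= md  # NOTE: later keys overwrite earlier ones
            for transformed, ext in results:
                # Pseudo file name carries only the extension forward.
                outputs.append((name + "-output" + ext, transformed))
        pending = outputs
    # Strip pseudo names, leaving (bytes, extension) tuples.
    return [(d, pathlib.Path(f).suffix) for f, d in pending], metadata


out, meta = run_pipeline([Upper(), Splitter()], "foo.txt", b"abcdef")
# out == [(b"ABC", ".txt"), (b"DEF", ".txt")]
```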
@@ -0,0 +1,104 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import pathlib
from typing import Any, Tuple

import pyarrow as pa
from data_processing.test_support.transform import AbstractBinaryTransformTest
from data_processing.test_support.transform.noop_transform import NOOPTransform
from data_processing.transform import AbstractBinaryTransform
from data_processing.transform.pipelined_transform import (
PipelinedTransform,
transform_key,
)
from data_processing.utils import TransformUtils


table = pa.Table.from_pydict({"name": pa.array(["Tom", "Dick", "Harry"]), "age": pa.array([0, 1, 2])})
expected_table = table # We only use NOOP

# Because the test is calling transform/flush_binary(), we get the additional metadata *_doc_count.
expected_metadata_list = [
{"nfiles": 1, "nrows": 3, "result_doc_count": 3, "source_doc_count": 3}, # transform() result
{} # flush() result
# {"result_doc_count": 0}, # flush_binary() result
]


class DoublerTransform(AbstractBinaryTransform):
def __init__(self):
self.extension = None
self.buffer = []

def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
self.extension = pathlib.Path(file_name).suffix
the_tuple = (byte_array, self.extension)
self.buffer.append(the_tuple)
return [the_tuple], {}

def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
r = self.buffer
self.buffer = None
return r, {}


class TestPipelinedBinaryTransform(AbstractBinaryTransformTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[Tuple]:
# Defines correct evaluation of pipeline for the expected number of tables produced.
# It does NOT test the transformation of the transform contained in the pipeline other
# than to make sure the byte arrays are not changed due to using a NOOPTransform in the pipeline.
# .parquet is used as the extension because the transforms being used are AbstractTableTransforms
# which use/expect parquet files.
fixtures = []
noop0 = NOOPTransform({"sleep": 0})
noop1 = NOOPTransform({"sleep": 0})
Collaborator: This is complete cheating.

Member Author: It's a start. That said, we're not testing the application of the underlying transforms as much as the structure of the output. But yes, it would be nice to have a better test, though that would require having transforms other than NOOP in the test_support packages.

Member Author: Ah, you are referring to the configuration part. We have always been saying that transforms can be configured outside of the CLI/runtime mechanics. I'm doing that here. However, it is true that running a pipeline transform in a runtime may require more work - this is more for the python-only non-runtime users, at least initially.

Collaborator: I repeat my sentiment. It's circumventing the "normal" execution.

config = {transform_key: [noop0]}
binary_table = TransformUtils.convert_arrow_to_binary(table)
binary_expected_table = TransformUtils.convert_arrow_to_binary(expected_table)

# Simple test to makes sure a single transform works
fixtures.append(
(
PipelinedTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet")],
expected_metadata_list,
)
)

# Put two transforms together
config = {transform_key: [noop0, noop1]}
fixtures.append(
(
PipelinedTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet")],
expected_metadata_list,
)
)

# Add a transform to the pipeline that a) produces multiple tables and b) uses flush() to do it.
config = {transform_key: [noop0, DoublerTransform()]}
fixtures.append(
(
PipelinedTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet"), (binary_expected_table, ".parquet")],
expected_metadata_list,
)
)
return fixtures
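The flush semantics exercised by DoublerTransform (buffered data emitted at flush time must still pass through the downstream transforms) can be sketched standalone. `Buffering`, `Passthrough`, and `flush_pipeline` are hypothetical stand-ins for the PR's _flush_transforms logic, not part of the library:

```python
class Buffering:
    """Toy transform that buffers inputs and only emits on flush,
    like DoublerTransform above."""

    def __init__(self):
        self.buffer = []

    def transform_binary(self, file_name: str, byte_array: bytes):
        self.buffer.append((byte_array, ".txt"))
        return [], {}

    def flush_binary(self):
        out, self.buffer = self.buffer, []
        return out, {}


class Passthrough:
    """Toy transform that passes its input through unchanged."""

    def transform_binary(self, file_name: str, byte_array: bytes):
        return [(byte_array, ".txt")], {}

    def flush_binary(self):
        return [], {}


def flush_pipeline(transforms):
    """Flush each transform and push its flushed data through the
    remaining downstream transforms, as _flush_transforms does."""
    results = []
    for i, t in enumerate(transforms):
        flushed, _ = t.flush_binary()
        for data, ext in flushed:
            pending = [(data, ext)]
            for downstream in transforms[i + 1:]:
                nxt = []
                for d, e in pending:
                    out, _ = downstream.transform_binary("flushed" + e, d)
                    nxt.extend(out)
                pending = nxt
            results.extend(pending)
    return results


pipeline = [Buffering(), Passthrough()]
pipeline[0].transform_binary("a.txt", b"x")
pipeline[0].transform_binary("b.txt", b"y")
flushed = flush_pipeline(pipeline)
# flushed == [(b"x", ".txt"), (b"y", ".txt")]
```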