-
Notifications
You must be signed in to change notification settings - Fork 169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial pass at a pipelining transform #424
base: dev
Are you sure you want to change the base?
Changes from 5 commits
c548db3
3818a77
0ea45c9
e899a24
aa512b6
441999d
9369482
d97a076
8fc7637
68a1f7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
import pathlib | ||
from typing import Any, Union | ||
|
||
import pyarrow as pa | ||
from data_processing.transform import AbstractBinaryTransform, AbstractTableTransform | ||
|
||
|
||
# Shorthands shared by the transform and its runtime configuration.
name = "pipeline"
# Prefix for CLI arguments belonging to this transform.
cli_prefix = name + "_"
# Configuration key under which the list of transforms to run is provided.
transform_key = "transforms"
class PipelinedTransform(AbstractBinaryTransform):
    """
    Enables the sequencing of transforms.

    Configuration is done by providing a list of configured AbstractBinaryTransform instances under the
    "transforms" key in the dictionary provided to the initializer.
    Features/considerations include:
      * Transforms must be sequenced such that the output of a given transform, identified by the extension
        produced, must be compatible with the next transform in the sequence.
      * Intermediate input file names are only informative and do not actually exist on disk. The file
        extensions used are those produced by the output of the previous transform. The base names are
        constructed from the name of the generating transform class name, but should not be relied on.
      * If a transform produces multiple outputs (must be with the same extension) each output is applied
        through the subsequent transforms in the pipeline.
    Restrictions include:
      * metadata produced is merged across all transforms, for any given call to transform/flush_binary()
        methods.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Create the pipeline using a list of initialized transforms.
        Args:
            config: dictionary holding the following keys
                transforms : a non-empty list of AbstractBinaryTransform instances. All transforms must
                    expect and produce the same data type (within the binary array) represented by the
                    file extensions passed into and returned by the transform/flush_binary() methods.
        Raises:
            ValueError: if the transforms key is missing, is not a non-empty list, or contains anything
                other than AbstractBinaryTransform instances.
        """
        super().__init__(config)
        self.input_extension = None
        self.transforms = config.get(transform_key, None)
        if self.transforms is None:
            raise ValueError(f"Missing configuration key {transform_key} specifying the list of transforms to run")
        # Guard against a non-list value or an empty pipeline, which is almost certainly a user error.
        if not isinstance(self.transforms, list) or len(self.transforms) == 0:
            raise ValueError(f"Configuration key {transform_key} must be a non-empty list of transforms")
        for transform in self.transforms:
            if not isinstance(transform, AbstractBinaryTransform):
                raise ValueError(f"{transform} is not an instance of AbstractBinaryTransform")

    def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """
        Applies the list of transforms, provided to the initializer, to the input data.
        If a transform produces multiple byte arrays, each will be applied through the downstream transforms.
        Args:
            file_name: name associated with the bytes; only its extension is significant.
            byte_array: contents to run through the pipeline.
        Returns:
            list of (bytes, extension) tuples from the final transform and the metadata merged across
            all transforms.
        """
        r_bytes, r_metadata = self._apply_transforms_to_datum(self.transforms, (file_name, byte_array))
        return r_bytes, r_metadata

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        """
        Applies the list of transforms, provided to the initializer, to the input Table.
        Accepting a Table here avoids forcing callers that already hold a Table to convert to bytes
        before calling transform_binary().
        Args:
            table: pyarrow Table to run through the pipeline.
            file_name: optional name associated with the table.
        Returns:
            list of transformed Tables and the metadata merged across all transforms.
        """
        r_bytes, r_metadata = self._apply_transforms_to_datum(self.transforms, (file_name, table))
        tables = self._get_table_list(r_bytes)
        return tables, r_metadata

    def _get_table_list(self, transformed: list[tuple[pa.Table, str]]) -> list[pa.Table]:
        """Strip the extensions from (table, extension) tuples, returning only the Tables."""
        return [t[0] for t in transformed]

    def _apply_transforms_to_data(
        self, transforms: list[AbstractBinaryTransform], data: list[tuple[str, bytearray]]
    ) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """
        Apply the transforms to each (filename, data) tuple in the given list, aggregating the
        outputs and merging the metadata across all inputs.
        """
        r_bytes = []
        r_metadata = {}
        for datum in data:
            # Bug fix: previously passed the whole `data` list here instead of the current `datum`.
            processed, metadata = self._apply_transforms_to_datum(transforms, datum)
            r_bytes.extend(processed)
            r_metadata = r_metadata | metadata

        return r_bytes, r_metadata

    def _apply_transforms_to_datum(
        self,
        transforms: Union[list[AbstractBinaryTransform], list[AbstractTableTransform]],
        datum: Union[tuple[str, bytearray], tuple[str, pa.Table]],
    ) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """
        Apply the list of transforms to the given datum tuple of filename and byte[] (or Table).
        Args:
            transforms: the transforms to apply, in order.
            datum: (filename, data) tuple fed to the first transform.
        Returns: same as transform_binary().
        """
        r_metadata = {}
        pending_to_process = [(datum[0], datum[1])]
        for transform in transforms:
            transform_name = type(transform).__name__
            to_process = pending_to_process
            pending_to_process = []
            for tp in to_process:  # Over all outputs from the last transform (or the initial input)
                fname = tp[0]
                to_transform = tp[1]
                transformation_tuples, metadata = self._call_transform(transform, to_transform, fname)
                # Capture the list of outputs from this transform as inputs to the next (or as the return values).
                for transformation in transformation_tuples:
                    transformed, extension = transformation
                    # Pseudo-filename; only the extension is meaningful to the next transform.
                    fname = transform_name + "-output" + extension
                    pending_to_process.append((fname, transformed))
                # TODO: this is not quite right and might overwrite previous values.
                # Would be better if we could somehow support lists.
                r_metadata = r_metadata | metadata

        # Strip the pseudo-base filename from the pending_to_process tuples, leaving only the extension, as required.
        r_bytes = []
        for fname, byte_array in pending_to_process:
            extension = pathlib.Path(fname).suffix
            r_bytes.append((byte_array, extension))

        return r_bytes, r_metadata

    def _call_transform(
        self,
        transform: Union[AbstractTableTransform, AbstractBinaryTransform],
        datum: Union[pa.Table, bytearray],
        file_name: str,
    ):
        """
        Dispatch to transform() or transform_binary() depending on whether the datum is a Table.
        Calling transform() directly on Tables avoids a bytes<->Table conversion per transform.
        Returns a list of (data, extension) tuples and the transform's metadata.
        """
        if isinstance(datum, pa.Table):
            tables, metadata = transform.transform(datum, file_name)
            # Tables are implicitly parquet-formatted data.
            transformation_tuples = [(table, ".parquet") for table in tables]
        else:
            transformation_tuples, metadata = transform.transform_binary(file_name, datum)
        return transformation_tuples, metadata

    def _call_flush(self, transform: Union[AbstractTableTransform, AbstractBinaryTransform]):
        """
        Dispatch to flush() or flush_binary() depending on the type of transform,
        returning (data, extension) tuples and the flushed metadata.
        """
        if isinstance(transform, AbstractTableTransform):
            tables, metadata = transform.flush()
            transformation_tuples = [(table, ".parquet") for table in tables]
        else:
            transformation_tuples, metadata = transform.flush_binary()
        return transformation_tuples, metadata

    def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """Flush all transforms in the pipeline, returning (bytes, extension) tuples and merged metadata."""
        return self._flush_transforms()

    def flush(self) -> tuple[list[pa.Table], dict[str, Any]]:
        """Flush all transforms in the pipeline, returning only the Tables and the merged metadata."""
        transformed, metadata = self._flush_transforms()
        # but only return the table list and not the extensions as is done for flush_binary()
        tables = self._get_table_list(transformed)
        # Bug fix: previously the results were computed but never returned (implicit None).
        return tables, metadata

    def _flush_transforms(self) -> tuple[list[Union[tuple[bytes, str], tuple[pa.Table, str]]], dict[str, Any]]:
        """
        Call flush on all transforms in the pipeline passing flushed results to downstream
        transforms, as appropriate. Aggregates results.
        Returns:
            list of (data, extension) tuples and the metadata merged across all flushes.
        """
        r_bytes = []
        r_metadata = {}
        for index, transform in enumerate(self.transforms, start=1):  # flush each transform
            transformation_tuples, metadata = self._call_flush(transform)
            r_metadata = r_metadata | metadata
            if len(transformation_tuples) > 0:  # Something was flushed from this transform.
                downstream_transforms = self.transforms[index:]
                if len(downstream_transforms) > 0:
                    # Bug fix: _call_flush() yields (data, extension) tuples, but
                    # _apply_transforms_to_data() expects (filename, data) tuples - convert first,
                    # using a pseudo-filename that carries only the extension.
                    flushed_data = [("flushed" + extension, payload) for payload, extension in transformation_tuples]
                    # Apply the flushed results to the downstream transforms.
                    transformation_tuples, metadata = self._apply_transforms_to_data(
                        downstream_transforms, flushed_data
                    )
                    r_bytes.extend(transformation_tuples)
                    # TODO: this is not quite right and might overwrite previous values.
                    # Would be better if we could somehow support lists.
                    r_metadata = r_metadata | metadata
                else:
                    # We flushed the last transform so just append its results.
                    r_bytes.extend(transformation_tuples)

        return r_bytes, r_metadata
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# (C) Copyright IBM Corp. 2024. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
################################################################################ | ||
import pathlib | ||
from typing import Any, Tuple | ||
|
||
import pyarrow as pa | ||
from data_processing.test_support.transform import AbstractBinaryTransformTest | ||
from data_processing.test_support.transform.noop_transform import NOOPTransform | ||
from data_processing.transform import AbstractBinaryTransform | ||
from data_processing.transform.pipelined_transform import ( | ||
PipelinedTransform, | ||
transform_key, | ||
) | ||
from data_processing.utils import TransformUtils | ||
|
||
|
||
# Sample input shared by all fixtures; the NOOP transforms leave it unchanged.
table = pa.Table.from_pydict({"name": pa.array(["Tom", "Dick", "Harry"]), "age": pa.array([0, 1, 2])})
expected_table = table  # We only use NOOP

# Because the test is calling transform/flush_binary(), we get the additional metadata *_doc_count.
expected_metadata_list = [
    {"nfiles": 1, "nrows": 3, "result_doc_count": 3, "source_doc_count": 3},  # transform() result
    {},  # flush() result
    # {"result_doc_count": 0},  # flush_binary() result
]
|
||
|
||
class DoublerTransform(AbstractBinaryTransform):
    """
    Test helper that echoes each input while also buffering it, then re-emits the
    whole buffer on flush - so every input ultimately appears twice in the pipeline output.
    """

    def __init__(self):
        # Extension of the most recently seen input file.
        self.extension = None
        # All (bytes, extension) tuples seen so far; replayed by flush_binary().
        self.buffer = []

    def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """Pass the input through unchanged, remembering it for flush."""
        self.extension = pathlib.Path(file_name).suffix
        entry = (byte_array, self.extension)
        self.buffer.append(entry)
        return [entry], {}

    def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """Emit everything buffered so far; the transform is unusable afterwards."""
        flushed = self.buffer
        self.buffer = None
        return flushed, {}
|
||
|
||
class TestPipelinedBinaryTransform(AbstractBinaryTransformTest):
    """
    Extends the super-class to define the test data for the tests defined there.
    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
    """

    def get_test_transform_fixtures(self) -> list[Tuple]:
        # Defines correct evaluation of pipeline for the expected number of tables produced.
        # It does NOT test the transformation of the transform contained in the pipeline other
        # than to make sure the byte arrays are not changed due to using a NoopTransform in the pipeline.
        # .parquet is used as the extension because the transforms being used are AbstractTableTransforms
        # which use/expect parquet files.
        noop0 = NOOPTransform({"sleep": 0})
        noop1 = NOOPTransform({"sleep": 0})

        binary_table = TransformUtils.convert_arrow_to_binary(table)
        binary_expected_table = TransformUtils.convert_arrow_to_binary(expected_table)

        return [
            # Simple test to make sure a single transform works.
            (
                PipelinedTransform({transform_key: [noop0]}),
                [("foo.parquet", binary_table)],
                [(binary_expected_table, ".parquet")],
                expected_metadata_list,
            ),
            # Put two transforms together.
            (
                PipelinedTransform({transform_key: [noop0, noop1]}),
                [("foo.parquet", binary_table)],
                [(binary_expected_table, ".parquet")],
                expected_metadata_list,
            ),
            # Add a transform to the pipeline that a) produces multiple tables and b) uses flush() to do it.
            (
                PipelinedTransform({transform_key: [noop0, DoublerTransform()]}),
                [("foo.parquet", binary_table)],
                [(binary_expected_table, ".parquet"), (binary_expected_table, ".parquet")],
                expected_metadata_list,
            ),
        ]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialized?