[BE] Unify Map style DataPipes to support input_col and output_col syntax #562

Open
wants to merge 12 commits into
base: gh/VitalyFedyunin/10/base

VitalyFedyunin
Contributor

@VitalyFedyunin VitalyFedyunin commented Jun 29, 2022

VitalyFedyunin added a commit that referenced this pull request Jun 29, 2022
…ntax

ghstack-source-id: ac06eb69a0dd654ae6ef6b9a0086cb5625ac487b
Pull Request resolved: #562
@facebook-github-bot added the CLA Signed label Jun 29, 2022
Contributor

@NivekT NivekT left a comment

Should we also add one for flatmap?



class MapTemplateIterDataPipe(Mapper):
    def __init__(self, source_datapipe, input_col=None, output_col=None) -> None:
Contributor

  1. Let's add a docstring describing the purpose of this class.
  2. Add the class MapTemplate to the RST documentation file as well.

Contributor Author

I will update the DataPipe contributors doc and add docstrings.

@VitalyFedyunin
Contributor Author

Sure, I'm doing it for flatmap now, and I will also try to convert several inner DataPipes to this approach.

Contributor

@ejguan ejguan left a comment

I think it's better to move this class to PyTorch Core for Mapper and Filter.

@VitalyFedyunin VitalyFedyunin mentioned this pull request Jun 30, 2022
VitalyFedyunin added a commit that referenced this pull request Jun 30, 2022
…ntax

ghstack-source-id: bc08ee551c3b1b72158dadd5701d1d1b15b496e5
Pull Request resolved: #562
@VitalyFedyunin
Contributor Author

UPD: Temporarily made a copy of FlatMapper.
UPD: Rewrote JsonParser as example.

@VitalyFedyunin
Contributor Author

I think it's better to move this class to PyTorch Core for Mapper and Filter.

To be honest I prefer to keep as much as possible inside this repo. This will make future deprecation easier.

VitalyFedyunin added a commit that referenced this pull request Jul 6, 2022
…ntax

ghstack-source-id: 9e62d23ba6008a32f5b71e32da93896571d598d0
Pull Request resolved: #562
VitalyFedyunin added a commit that referenced this pull request Jul 6, 2022
…ntax

ghstack-source-id: 3227ba7b13a3477fb294708fb0bb551399d70747
Pull Request resolved: #562
VitalyFedyunin added a commit that referenced this pull request Jul 6, 2022
…ntax

ghstack-source-id: e7320cd563b122d952dbf0dfacb4fe6d39804b51
Pull Request resolved: #562
return {"file_name": data[0], "stream": data[1]}

dict_dp = datapipe_nonempty.map(as_dict)
parsed_dp = dict_dp.parse_json_files(input_col="stream", output_col="json")
Contributor Author

This is the most important part of the PR. IMO it will simplify some of the domains' code.
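
To make the column syntax concrete, here is a minimal pure-Python sketch of how `input_col="stream"` and `output_col="json"` could behave on dict rows. `apply_to_columns` is a hypothetical helper written for illustration, not torchdata's implementation; it assumes the result is stored under `output_col` while all other keys are kept.

```python
import io
import json


def apply_to_columns(row, fn, input_col, output_col=None):
    """Hypothetical helper: apply fn to row[input_col] and store the result.

    Assumption: with output_col=None the input column is overwritten in place;
    otherwise the result is written under output_col and the input is kept.
    """
    new_row = dict(row)  # copy so the caller's row is left untouched
    result = fn(new_row[input_col])
    new_row[input_col if output_col is None else output_col] = result
    return new_row


row = {"file_name": "1.json", "stream": io.StringIO('{"a": 1}')}
parsed = apply_to_columns(row, json.load, input_col="stream", output_col="json")
```

In this sketch the parsed row keeps `file_name` and `stream` and gains a `json` key, so downstream code can address fields by name instead of by tuple position.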

Comment on lines 28 to 29
@functional_datapipe("flatmap_proto")
class FlatMapperProtoIterDataPipe(IterDataPipe[T_co]):
Contributor

Do you want to directly change the FlatMapper here?

@functional_datapipe("flatmap")
class FlatMapperIterDataPipe(IterDataPipe[T_co]):

Contributor Author

Yes, I do. I was making sure I cover all use cases before replacing it.

self.kwargs = kwargs

def __iter__(self) -> Iterator[Tuple[str, Dict]]:
for file_name, stream in self.source_datapipe:
def _map(self, stream: IO):
Contributor

Do you think we should change the type to StreamWrapper here?

Contributor Author

We are fine with any stream as input.

Comment on lines 24 to 25
def _map(self, *args, **kwargs) -> T_co:
    raise NotImplementedError
Contributor

BTW, we might want to use abc.abstractmethod here to guarantee that subclasses implement the method, with the check happening at construction time.
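
For reference, a minimal sketch of what `abc.abstractmethod` would buy here: a subclass that forgets the hook fails when it is instantiated, whereas a `raise NotImplementedError` body only fails once the method is called. The class names below are hypothetical stand-ins, not the classes in this PR.

```python
from abc import ABC, abstractmethod


class MapTemplateSketch(ABC):
    """Hypothetical stand-in for a template base class."""

    @abstractmethod
    def _map(self, *args, **kwargs):
        """Subclasses must provide the per-item transformation."""


class Incomplete(MapTemplateSketch):
    pass  # forgot to define _map


class Doubler(MapTemplateSketch):
    def _map(self, x):
        return 2 * x


try:
    Incomplete()  # fails here, before any data is processed
    error = None
except TypeError as exc:
    error = exc

doubled = Doubler()._map(21)
```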

Comment on lines 111 to 112
def _flatmap(self, *args, **kwargs) -> Iterable[T_co]:
    raise NotImplementedError
Contributor

Ditto

@VitalyFedyunin
Contributor Author

Using abc.abstractmethod directly leads to this mypy error:

torchdata/datapipes/iter/util/jsonparser.py:40:5: error: Signature of "_map"
incompatible with supertype "MapTemplateIterDataPipe"  [override]
        def _map(self, stream: IO):

@VitalyFedyunin
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

_check_unpickable_fn(fn)
self.fn = fn # type: ignore[assignment]

self.input_col = input_col
Contributor

We can also validate input_col with fn using torch.utils.data.datapipes.utils.common.validate_input_col
https://github.com/pytorch/pytorch/blob/cfb9d0d23314fd28be118b6ca280ded55364e71c/torch/utils/data/datapipes/utils/common.py#L24
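
The idea behind such a check can be sketched in plain Python. The function below is a simplified, hypothetical stand-in (`check_input_col` is not the PyTorch helper linked above and may differ from it in detail); it only verifies that the number of selected columns can be bound to the function's parameters.

```python
import inspect


def check_input_col(fn, input_col):
    """Hypothetical, simplified check; not the PyTorch implementation.

    Raises ValueError when the number of selected columns cannot be bound
    to fn's positional parameters.
    """
    if input_col is None:
        return
    n_cols = len(input_col) if isinstance(input_col, (list, tuple)) else 1
    try:
        # Bind placeholder values; only the argument count matters here.
        inspect.signature(fn).bind(*range(n_cols))
    except TypeError as exc:
        raise ValueError(
            f"{getattr(fn, '__name__', fn)} cannot take {n_cols} positional argument(s)"
        ) from exc


def add(a, b):
    return a + b


check_input_col(add, input_col=(0, 1))  # passes: two columns, two parameters

try:
    check_input_col(add, input_col=0)  # one column, but add needs two arguments
    mismatch = None
except ValueError as exc:
    mismatch = exc
```

Running the check in `__init__` would surface a mismatch when the DataPipe is built rather than on the first iteration.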

Contributor

@NivekT NivekT left a comment

nit: This PR is fine (with nit comments), but there is something unsatisfying about how input_col and output_col work when I want to select an element from a tuple/list.

Note that we do not have to address the below within this PR.

A common return pattern that we have is (Path, Stream) from archives/streams/etc. Let's say I opened a file and now I have a DataPipe of tuples of (Path, Stream), and I want to throw away the path and parse the stream. I will have to do something like this:

datapipe = FileOpener(json_files, mode="b")
datapipe = datapipe.parse_json_files()
datapipe = datapipe.map(lambda x: x[1])  # Ideally this should not be needed, and handled by `input_col`, `output_col`

I feel like the input_col and output_col arguments should ideally allow for selection (unpacking when the element is a list or tuple of length 1).

We likely also want to allow the case where the input is not a list/tuple:

datapipe = IterableWrapper([stream1, stream2])
datapipe = datapipe.parse_json_files()  # I think we should support this, but it currently doesn't work

What do you two think?

cc: @ejguan

class MapTemplateIterDataPipe(Mapper[T_co]):
    def __init__(self, source_datapipe, input_col=None, output_col=None) -> None:
        fn = getattr(self, "_map")
        assert fn is not None
Contributor

Suggested change
assert fn is not None
assert fn is not None, "`MapTemplate` expects subclass to have attribute `_map` defined, but it is `None`."

Comment on lines 33 to 34
[('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
"""
Contributor

Adding usage example for input_col argument

Suggested change
[('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
"""
[('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
>>> datapipe_extra_col = datapipe1.map(lambda x: (x[0], "extra", x[1]))
>>> # Use `input_col` if the stream is not in position 1
>>> json_dp2 = datapipe_extra_col.parse_json_files(input_col=2)
>>> list(json_dp2)
[
("1.json", "extra", ["foo", {"bar": ["baz", None, 1.0, 2]}]),
("2.json", "extra", {"__complex__": True, "real": 1, "imag": 2}),
]
"""



@functional_datapipe("parse_json_files")
class JsonParserIterDataPipe(IterDataPipe[Tuple[str, Dict]]):
class JsonParserIterDataPipe(MapTemplate):
    r"""
    Reads from JSON data streams and yields a tuple of file name and JSON data (functional name: ``parse_json_files``).

Contributor

Need to update docstring to add new arguments

@@ -162,6 +165,115 @@ def __len__(self) -> int:
raise TypeError(f"{type(self).__name__}'s length relies on the output of its function.")


# TODO(VitalyFedyunin): Replacing FlatMapperIterDataPipe will require BC breaking change of input_col behaviour
Contributor

Is this because, when input_col is specified, the new version will try to return a list/tuple if the original input is a list/tuple?

@ejguan
Contributor

ejguan commented Oct 6, 2022

I think we discussed the expected behavior of combining input_col and output_col earlier. Currently, when input_col is specified but output_col is not, we do an in-place operation. To support selection, I can't find a good value for output_col to indicate that we want to select a single column.

datapipe = datapipe.parse_json_files()  # I think we should support this, but it currently doesn't work

Do you mean you want it to still work with a stream input rather than a tuple of (pathname, stream)?
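
The in-place behavior described above can be sketched as follows. This is a simplified illustration that assumes a single integer `input_col` and list/tuple rows; `apply_fn` is a hypothetical name, and the convention that `output_col=-1` appends a column is an assumption stated in the code, not something confirmed in this thread.

```python
def apply_fn(data, fn, input_col=None, output_col=None):
    """Simplified sketch of Mapper-style column handling for list/tuple rows."""
    if input_col is None and output_col is None:
        return fn(data)
    result = fn(data) if input_col is None else fn(data[input_col])
    row = list(data)  # tuples are immutable, so work on a list copy
    if output_col is None:
        row[input_col] = result  # in-place: overwrite the input column
    elif output_col == -1:
        row.append(result)  # assumption: -1 appends a new column
    else:
        row[output_col] = result
    return tuple(row) if isinstance(data, tuple) else row


row = ("1.json", "  raw  ")
in_place = apply_fn(row, str.strip, input_col=1)
appended = apply_fn(row, str.strip, input_col=1, output_col=-1)
```

In every branch the row keeps at least its original number of elements, which is why no existing `output_col` value expresses "return only the selected column".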

@NivekT
Contributor

NivekT commented Oct 6, 2022

I think we discussed the expected behavior of combining input_col and output_col earlier. Currently, when input_col is specified but output_col is not, we do an in-place operation. To support selection, I can't find a good value for output_col to indicate that we want to select a single column.

One potential solution is to have output_col=[] or output_col=tuple() as selection. There may be side effects that I currently cannot think of.

Do you mean you want it to still work with a stream input rather than a tuple of (pathname, stream)?

Yes, it may require some adjustment to how input_col and MapTemplate work to handle non-list/tuple inputs.

@NivekT
Contributor

NivekT commented Oct 6, 2022

Sorry I accidentally edited your comment while quoting it.

@ejguan
Contributor

ejguan commented Oct 6, 2022

Yes, it may require some adjustment to how input_col and MapTemplate work to handle non-list/tuple inputs.

I think it's doable after this PR. We can let JsonParser be a MapTemplate with input_col=1 by default to avoid breaking BC.
Users can specify input_col=None to make it work directly with streams.
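
A minimal sketch of that proposal, using a plain generator instead of a DataPipe. `parse_json` is a hypothetical function written for illustration; only the `input_col=1` default and the `input_col=None` escape hatch come from the comment above.

```python
import io
import json


def parse_json(rows, input_col=1):
    """Hypothetical generator mirroring the proposal, not torchdata's JsonParser.

    input_col=1 keeps the (path, stream) convention; input_col=None treats
    each item as the stream itself.
    """
    for row in rows:
        if input_col is None:
            yield json.load(row)
        else:
            out = list(row)
            out[input_col] = json.load(row[input_col])
            yield tuple(out)


pairs = [("1.json", io.StringIO('{"a": 1}'))]
streams = [io.StringIO("[1, 2]")]

from_pairs = list(parse_json(pairs))  # default input_col=1
from_streams = list(parse_json(streams, input_col=None))
```

With the default, existing (path, stream) pipelines keep their output shape, while bare streams are handled by opting out of column selection.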

4 participants