[BE] Unify Map style DataPipes to support input_col and output_col syntax #562
base: gh/VitalyFedyunin/10/base
Conversation
…ntax [ghstack-poisoned]
…ntax ghstack-source-id: ac06eb69a0dd654ae6ef6b9a0086cb5625ac487b Pull Request resolved: #562
Should we also add one for flatmap?
class MapTemplateIterDataPipe(Mapper):
    def __init__(self, source_datapipe, input_col=None, output_col=None) -> None:
- Let's add a docstring for the purpose of this class.
- Add the class MapTemplate to the RST documentation file as well.
I will update the DataPipe contributors doc and add docstrings.
I'm doing it for flatmap now, and will also attempt to convert several inner DataPipes to this approach.
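To make the conversion concrete, here is a minimal self-contained sketch of the template pattern under discussion. This is a hypothetical illustration, not the real torchdata classes: the base class handles iteration and input_col/output_col bookkeeping, and a converted DataPipe only implements `_map`. The class names and the output_col == -1 append rule are assumptions.

```python
# Hypothetical sketch (not the actual torchdata code) of the map-template
# pattern: base class does column bookkeeping, subclasses define _map only.
from typing import Any, Iterator, Optional


class MapTemplateSketch:
    def __init__(self, source, input_col: Optional[int] = None,
                 output_col: Optional[int] = None) -> None:
        self.source = source
        self.input_col = input_col
        self.output_col = output_col

    def _map(self, *args: Any) -> Any:
        raise NotImplementedError

    def __iter__(self) -> Iterator[Any]:
        for item in self.source:
            if self.input_col is None:
                # No column selection: map the whole item.
                yield self._map(item)
                continue
            res = self._map(item[self.input_col])
            out = list(item)
            if self.output_col is None:
                out[self.input_col] = res   # replace the selected column
            elif self.output_col == -1:
                out.append(res)             # append as a new column
            else:
                out[self.output_col] = res
            yield tuple(out)


class UpperCaseSketch(MapTemplateSketch):
    # A converted DataPipe would only need this per-item transformation.
    def _map(self, s: str) -> str:
        return s.upper()
```

With this shape, a conversion like the one planned for flatmap reduces to moving the per-item body out of `__iter__` and into `_map`.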
I think it's better to move this class to PyTorch Core for Mapper and Filter.
…tput_col syntax" [ghstack-poisoned]
…ntax ghstack-source-id: bc08ee551c3b1b72158dadd5701d1d1b15b496e5 Pull Request resolved: #562
UPD: Temporarily making a copy of
To be honest, I prefer to keep as much as possible inside this repo. This will make future deprecation easier.
…tput_col syntax" [ghstack-poisoned]
…ntax ghstack-source-id: 9e62d23ba6008a32f5b71e32da93896571d598d0 Pull Request resolved: #562
…tput_col syntax" [ghstack-poisoned]
…ntax ghstack-source-id: 3227ba7b13a3477fb294708fb0bb551399d70747 Pull Request resolved: #562
…tput_col syntax" [ghstack-poisoned]
…ntax ghstack-source-id: e7320cd563b122d952dbf0dfacb4fe6d39804b51 Pull Request resolved: #562
    return {"file_name": data[0], "stream": data[1]}

dict_dp = datapipe_nonempty.map(as_dict)
parsed_dp = dict_dp.parse_json_files(input_col="stream", output_col="json")
This is the most important part of the PR. IMO it will simplify some of the domains code.
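The dict case above reads the stream from the "stream" key and writes the parsed result under a new "json" key. A minimal standalone sketch of those dict semantics (a hypothetical helper for illustration, not the real Mapper implementation):

```python
import json
from io import StringIO
from typing import Any, Callable, Dict


def apply_with_cols(fn: Callable[[Any], Any], row: Dict[str, Any],
                    input_col: str, output_col: str) -> Dict[str, Any]:
    # Hypothetical helper mirroring the dict semantics shown above:
    # read the value under input_col, write fn's result under output_col.
    out = dict(row)
    out[output_col] = fn(row[input_col])
    return out


row = {"file_name": "1.json", "stream": StringIO('{"a": 1}')}
parsed = apply_with_cols(lambda s: json.load(s), row, "stream", "json")
# parsed keeps "file_name" and "stream" and gains a "json" key
```

This is why the syntax can simplify domain code: the plumbing that copies the row and routes one field through the function lives in one place instead of in every lambda.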
@functional_datapipe("flatmap_proto")
class FlatMapperProtoIterDataPipe(IterDataPipe[T_co]):
Do you want to directly change the FlatMapper here?
data/torchdata/datapipes/iter/transform/callable.py, lines 93 to 94 in 14b2291:
@functional_datapipe("flatmap")
class FlatMapperIterDataPipe(IterDataPipe[T_co]):
Yes, I do; I was making sure I covered all use cases before replacement.
        self.kwargs = kwargs

-    def __iter__(self) -> Iterator[Tuple[str, Dict]]:
-        for file_name, stream in self.source_datapipe:
+    def _map(self, stream: IO):
Do you think we should change the type to StreamWrapper here?
We are fine with any stream as input.
    def _map(self, *args, **kwargs) -> T_co:
        raise NotImplementedError
BTW, we might want to use abc.abstractmethod here to guarantee that the method is implemented at construction time.
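A small sketch of that abc.abstractmethod suggestion: with an abstract `_map`, a subclass that forgets to implement it fails when it is instantiated rather than later, when the method is first called. The class names here are illustrative, not the actual ones.

```python
import abc
from typing import Any


class AbstractMapTemplate(abc.ABC):
    @abc.abstractmethod
    def _map(self, *args: Any, **kwargs: Any) -> Any:
        """Subclasses must override this."""


class Complete(AbstractMapTemplate):
    def _map(self, x: Any) -> Any:
        return x


class Incomplete(AbstractMapTemplate):
    pass  # forgot to implement _map


Complete()._map(1)  # fine
try:
    Incomplete()    # fails here, at construction time
    failed_at_construction = False
except TypeError:
    failed_at_construction = True
```

Compared to raise NotImplementedError in the body, the error surfaces as soon as the pipeline is built instead of midway through iteration.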
    def _flatmap(self, *args, **kwargs) -> Iterable[T_co]:
        raise NotImplementedError
Ditto
…tput_col syntax" [ghstack-poisoned]
Direct use of abstractmethod leads to:
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
        _check_unpickable_fn(fn)
        self.fn = fn  # type: ignore[assignment]

        self.input_col = input_col
We can also validate input_col with fn using torch.utils.data.datapipes.utils.common.validate_input_col:
https://github.com/pytorch/pytorch/blob/cfb9d0d23314fd28be118b6ca280ded55364e71c/torch/utils/data/datapipes/utils/common.py#L24
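Roughly, that validation checks that fn's signature is compatible with what input_col will supply. A simplified standalone sketch of the idea (not the actual PyTorch implementation; `check_input_col` is a made-up name):

```python
import inspect
from typing import Any, Callable, Sequence, Union


def check_input_col(fn: Callable[..., Any],
                    input_col: Union[int, str, Sequence[Any], None]) -> None:
    # Simplified sketch of the kind of check validate_input_col performs
    # (the real one lives in torch.utils.data.datapipes.utils.common).
    # A list/tuple input_col supplies one positional arg per column;
    # anything else (including None) supplies a single argument.
    sig = inspect.signature(fn)
    params = list(sig.parameters.values())
    if any(p.kind == p.VAR_POSITIONAL for p in params):
        return  # *args accepts any number of arguments
    positional = [p for p in params
                  if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)]
    required = sum(1 for p in positional if p.default is p.empty)
    supplied = len(input_col) if isinstance(input_col, (list, tuple)) else 1
    if required > supplied or supplied > len(positional):
        raise ValueError(
            f"{getattr(fn, '__name__', fn)} takes {required}..{len(positional)} "
            f"positional arguments but input_col supplies {supplied}"
        )
```

Running this at DataPipe construction turns a mismatched fn/input_col pair into an immediate error instead of a failure deep inside iteration.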
nit: This PR is fine (with nit comments), but there is something unsatisfying about how input_col and output_col work in the case where I want to select an element from a tuple/list.
Note that we do not have to address the below within this PR.
A common return pattern that we have is (Path, Stream) from archives/streams/etc. Let's say I opened a file and now I have a DataPipe of tuples of (Path, Stream), and I want to throw away the path and parse the stream. I will have to do something like this:
datapipe = FileOpener(json_files, mode="b")
datapipe = datapipe.parse_json_files()
datapipe = datapipe.map(lambda x: x[1]) # Ideally this should not be needed, and handled by `input_col`, `output_col`
I feel like ideally the input_col and output_col arguments should allow for selection (unpack when the elements are a list of length 1 or a tuple of length 1).
We likely also want to allow the case where the input is not a list/tuple:
datapipe = IterableWrapper([stream1, stream2])
datapipe = datapipe.parse_json_files() # I think we should support this, but it currently doesn't work
What do you two think?
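The selection rule being proposed can be sketched in a few lines (hypothetical behavior, not what torchdata currently does; `select_input` is a made-up name): input_col picks an element out of list/tuple rows, and a row that is not a list/tuple is passed through unchanged, so a DataPipe of bare streams would also work.

```python
from typing import Any, Optional


def select_input(item: Any, input_col: Optional[int]) -> Any:
    # Hypothetical selection rule sketched from the discussion above.
    if input_col is None:
        return item
    if isinstance(item, (list, tuple)):
        return item[input_col]
    # Bare element (e.g. a stream with no surrounding tuple): use as-is,
    # so IterableWrapper([stream1, stream2]).parse_json_files() could work.
    return item
```

Under this rule, the extra .map(lambda x: x[1]) step in the example above would no longer be needed.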
cc: @ejguan
class MapTemplateIterDataPipe(Mapper[T_co]):
    def __init__(self, source_datapipe, input_col=None, output_col=None) -> None:
        fn = getattr(self, "_map")
        assert fn is not None
Suggested change:
-        assert fn is not None
+        assert fn is not None, "`MapTemplate` expects subclass to have attribute `_map` defined, but it is `None`."
    [('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
    """
Adding a usage example for the input_col argument. Suggested change:
    [('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
    >>> datapipe_extra_col = datapipe1.map(lambda x: (x[0], "extra", x[1]))
    >>> # Use `input_col` if the stream is not in position 1
    >>> json_dp2 = datapipe_extra_col.parse_json_files(input_col=2)
    >>> list(json_dp2)
    [
        ("1.json", "extra", ["foo", {"bar": ["baz", None, 1.0, 2]}]),
        ("2.json", "extra", {"__complex__": True, "real": 1, "imag": 2}),
    ]
    """
 @functional_datapipe("parse_json_files")
-class JsonParserIterDataPipe(IterDataPipe[Tuple[str, Dict]]):
+class JsonParserIterDataPipe(MapTemplate):
     r"""
     Reads from JSON data streams and yields a tuple of file name and JSON data (functional name: ``parse_json_files``).
Need to update the docstring to add the new arguments.
@@ -162,6 +165,115 @@ def __len__(self) -> int:
        raise TypeError(f"{type(self).__name__}'s length relies on the output of its function.")

# TODO(VitalyFedyunin): Replacing FlatMapperIterDataPipe will require a BC-breaking change of input_col behaviour
Is this because when input_col is specified, the new version will try to return a list/tuple if the original input is a list/tuple?
I think we have discussed earlier the expected behavior of the combination of
Do you mean you want it to still work with stream input rather than a tuple of
One potential solution is to have
Yes, it may require some adjustment to how
Sorry, I accidentally edited your comment while quoting it.
I think it's doable after this PR. We can let
Hi @VitalyFedyunin! Thank you for your pull request. We require contributors to sign our Contributor License Agreement, and yours needs attention. You currently have a record in our system, but the CLA is no longer valid and will need to be resubmitted. Process: In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations; afterwards, the pull request will be tagged. If you have received this in error or have any questions, please contact us at [email protected]. Thanks!
Stack from ghstack (oldest at bottom):
Differential Revision: D40146676