Adding data pipelines for RoBERTa pre-processing #1637
Conversation
Codecov Report

@@           Coverage Diff           @@
##             main    #1637   +/-   ##
=======================================
  Coverage   85.36%   85.36%
=======================================
  Files          58       58
  Lines        2500     2500
=======================================
  Hits         2134     2134
  Misses        366      366
=======================================

Continue to review full report at Codecov.
        self.add_eos = T.AddToken(token=2, begin=False)

    def forward(self, input: ta.DataFrame) -> ta.DataFrame:
        input["tokens"] = input["text"].map(self.tokenizer, dtype=dt.List(dt.string))
nit: I think we need to use self.tokenizer.forward for now.
hmm, seems like it is working without explicitly calling forward?
@parmeet wow, sorry. If it's working, then don't worry about it. (I didn't realize just providing the Module would work for map.)
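For context, a minimal sketch of why this works, assuming the tokenizer is a standard torch.nn.Module: nn.Module defines __call__, which dispatches to forward, so map can invoke the module like any other Python callable. ToyTokenizer here is a hypothetical stand-in for the PR's tokenizer:

import torch.nn as nn

class ToyTokenizer(nn.Module):
    # hypothetical stand-in for the real tokenizer module
    def forward(self, text: str):
        return text.split()

tok = ToyTokenizer()
# nn.Module.__call__ dispatches to forward, so these are equivalent:
assert tok("hello world") == tok.forward("hello world") == ["hello", "world"]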
    def forward(self, input: ta.DataFrame) -> ta.DataFrame:
        input["tokens"] = input["text"].map(self.tokenizer, dtype=dt.List(dt.string))
        input["tokens"] = input["tokens"].map(partial(F.truncate, max_seq_len=254))
nit: is this trying to keep the first 254 elements in tokens? Try input["tokens"].list.slice(stop=254).
Yupp, good call. BTW, do we have append/insert operations available? That way we could use those instead of explicitly calling transforms to add token ids at the beginning/end, as done below using self.add_bos.
Didn't find it in https://facebookincubator.github.io/velox/functions/array.html, but array concat is definitely a function in Presto: https://prestodb.io/docs/current/functions/array.html. Will ask the Velox developers about this.
Just want to check the semantics of add_bos -- basically we want to add one element at the beginning or end of the array, right?
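For reference, a minimal sketch of both points under discussion, assuming torcharrow's list accessor and torchtext's AddToken transform (token=0 as the BOS id is an assumption; the diff above only shows the EOS token 2):

import torcharrow as ta
import torcharrow.dtypes as dt
import torchtext.transforms as T

# truncation via the native list API, as suggested above
col = ta.column([[5, 6, 7, 8], [9, 10]], dtype=dt.List(dt.int64))
truncated = col.list.slice(stop=3)  # keeps at most the first 3 elements per row

# semantics of add_bos / add_eos: prepend or append a single token id
add_bos = T.AddToken(token=0, begin=True)
add_eos = T.AddToken(token=2, begin=False)
print(add_bos([101, 205]))  # [0, 101, 205]
print(add_eos([101, 205]))  # [101, 205, 2]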
Nayef211 left a comment:
Just added a couple of questions and comments. Overall the pipelines LGTM.
    def forward(self, input: ta.DataFrame) -> ta.DataFrame:
        input["tokens"] = input["text"].map(self.tokenizer.forward, dtype=dt.List(dt.string))
        input["tokens"] = input["tokens"].list.slice(stop=254)
        input["tokens"] = input["tokens"].map(self.vocab, dtype=dt.List(dt.int32))
Out of curiosity, do we only need to provide a dtype arg when an operation changes the underlying type of the data within a DataFrame column?
I think this is my understanding as well after reading the tutorial here. Wondering what else could potentially be needed though?
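For illustration, a minimal sketch of the convention being described, assuming torcharrow's Column.map API (the dtype argument names the result type when the operation changes the column's type and it cannot be inferred):

import torcharrow as ta
import torcharrow.dtypes as dt

col = ta.column(["a b", "c d e"])
# the result type changes from string to List[string], so dtype is passed explicitly
tokens = col.map(lambda s: s.split(), dtype=dt.List(dt.string))
# the result type stays string, so dtype can be omitted
upper = col.map(str.upper)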
train_dp = SST2(split="train")

# convert to DataFrame of size batches
# TODO: Figure out how to create DataFrame of larger size and create batches consequently
I'm trying to understand the benefit of doing this in the future. Assuming we can convert a larger chunk of the rows in the datapipe into a dataframe (based on system capabilities), why would we operate on a smaller batch rather than operating on the entire dataframe?
So this is purely my lack of knowledge in terms of the APIs. It was just out of convenience that I created DataFrames of size equal to the batch size, so that to_tensor can yield the batched tensor (since to_tensor converts the whole DataFrame into a tensor). Ideally we would like to create larger DataFrames, apply batch transformations, and then create batched tensors of size batch-size. Perhaps we can use the batch API. Let me explore this a bit. @wenleix wondering if you have any suggestions here?
I guess my question here was why would this followup be necessary rather than maintaining the status quo of what you implemented here? Why would we want to create a larger DataFrame rather than always keeping DataFrame size equal to batch size?
Hmm, good question. I guess we should do some benchmarking to figure this out. I don't know the internal dynamics, but what if creating one large DataFrame of size n×batch_size takes less time than n times the time taken to create a single DataFrame of size batch_size? It might also be more efficient to run the transform on a larger batch instead of making n calls. But it could turn out to be the same, in which case this is already an elegant use of the API :).
So if the batch size is small (e.g. 16 rows), the dispatch and other Python framework overhead in eager mode might be non-negligible. That being said, we haven't benchmarked or built an understanding of "what's small".
In general we should have a wrapper (e.g. the StreamingDataFrame @VitalyFedyunin had prototyped) that separates the "in-memory buffer" size (say 4096) from the "batch size" (say 16 or 128), so we don't need to make users worry about this.
> In general we should have a wrapper (e.g. the StreamingDataFrame @VitalyFedyunin had prototyped) that separates the "in-memory buffer" size (say 4096) from the "batch size" (say 16 or 128)
This sounds interesting!
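For illustration, a minimal sketch of the buffer/batch separation being described (the helper below is hypothetical; only the buffer_size/batch_size split comes from the comment above):

from typing import Iterable, Iterator, List

def buffered_batches(rows: Iterable, buffer_size: int = 4096,
                     batch_size: int = 16) -> Iterator[List]:
    # accumulate rows into a large in-memory buffer, then slice the
    # buffer into model-facing batches, decoupling the two sizes
    buffer: List = []
    for row in rows:
        buffer.append(row)
        if len(buffer) == buffer_size:
            # hypothetical: build one large DataFrame here, run the batch
            # transforms once, then yield batch_size-sized slices
            for i in range(0, buffer_size, batch_size):
                yield buffer[i:i + batch_size]
            buffer = []
    for i in range(0, len(buffer), batch_size):
        yield buffer[i:i + batch_size]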
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--batch-size", default=4, type=int)
    parser.add_argument("--train-steps", default=-1, type=int)
Why is the default value of train_steps -1 instead of 1? Wouldn't this cause a bug when doing the if i == train_steps: check?
Hmm, not really. By default it iterates over the whole dataset, because i starts from 0, right?
Ohh gotcha. I didn't realize that we wanted to have iteration over the entire dataset as the default behavior. My bad!
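For clarity, a minimal sketch of the semantics discussed above (the loop body is hypothetical; only the i == train_steps check comes from the thread). With the default train_steps = -1, the 0-based index i never equals -1, so the loop runs over the entire dataset:

train_steps = -1  # default: never matches a 0-based index, so iterate fully
for i, batch in enumerate(train_dataloader):  # hypothetical dataloader
    if i == train_steps:
        break
    ...  # training step goes here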
from torchtext.datasets import SST2


class RobertaTransform(Module):
This may not be necessary since this is an example, but I noticed the __init__ method for RobertaTransform is exactly the same for both datapipes, and I was wondering whether it would be possible to consolidate the transforms by creating a base class. The child classes would then extend the base class and implement the forward method (which is what expects the different inputs, i.e. a DataFrame vs. a Dict object).
Totally fine if you think this is out of scope for a simple example, but I was thinking it might make clearer what the main differences are between the two RobertaTransform implementations.
> the __init__ method for RobertaTransform is exactly the same for both datapipes and I was wondering whether it would be possible to consolidate the transforms by creating a base class
Yes, that's true but only for now. As we move tokenization and vocabulary to work natively with TorchArrow (instead of operating as UDFs), the APIs and initialization would not be the same.
Gotcha, I missed that part. Thanks for providing that clarification!
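For illustration, a minimal sketch of the consolidation suggested above (all class names are hypothetical; the shared __init__ pieces mirror the AddToken setup from the diff, and only forward differs per pipeline):

from torch.nn import Module
import torchtext.transforms as T

class RobertaTransformBase(Module):
    # shared setup; subclasses differ only in forward
    def __init__(self):
        super().__init__()
        self.add_bos = T.AddToken(token=0, begin=True)   # BOS id 0 is an assumption
        self.add_eos = T.AddToken(token=2, begin=False)  # EOS id 2 from the diff

    def forward(self, input):
        raise NotImplementedError

class RobertaDataFrameTransform(RobertaTransformBase):
    def forward(self, input):  # operates on a ta.DataFrame
        ...

class RobertaDictTransform(RobertaTransformBase):
    def forward(self, input):  # operates on a Dict
        ...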
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--batch-size", default=4, type=int)
    parser.add_argument("--train-steps", default=-1, type=int)
Same question about this being -1
Nayef211 left a comment:
LGTM!