From 9e4dafd9fbb3cb9dd8fbcf11b5b70beffbb7c6bd Mon Sep 17 00:00:00 2001 From: Elijah Rippeth Date: Wed, 19 Jan 2022 09:38:47 -0500 Subject: [PATCH 1/5] add double caching for yelp full to speed up extracted reading. --- torchtext/datasets/yelpreviewfull.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/torchtext/datasets/yelpreviewfull.py b/torchtext/datasets/yelpreviewfull.py index 270b627845..e377717f9b 100644 --- a/torchtext/datasets/yelpreviewfull.py +++ b/torchtext/datasets/yelpreviewfull.py @@ -47,8 +47,17 @@ def YelpReviewFull(root: str, split: Union[Tuple[str], str]): cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) cache_dp = FileOpener(cache_dp, mode="b") - extracted_files = cache_dp.read_from_tar() - - filter_extracted_files = extracted_files.filter(lambda x: _EXTRACTED_FILES[split] in x[0]) - - return filter_extracted_files.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:]))) + def extracted_filepath_fn(_): + file_path = os.path.join(root, _EXTRACTED_FILES[split]) + dir_path = os.path.dirname(file_path) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + return file_path + + cache_dp = cache_dp.on_disk_cache( + filepath_fn=extracted_filepath_fn + ) + cache_dp = cache_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) + cache_dp = cache_dp.end_caching(mode="wb", same_filepath_fn=True) + cache_dp = FileOpener(cache_dp, mode="b") + return cache_dp.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:]))) From ac7e739dbb13ef632c4f4a6b3d431e448d114c5e Mon Sep 17 00:00:00 2001 From: Elijah Rippeth Date: Wed, 19 Jan 2022 14:42:32 -0500 Subject: [PATCH 2/5] simplify filepath_fn --- torchtext/datasets/yelpreviewfull.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/torchtext/datasets/yelpreviewfull.py b/torchtext/datasets/yelpreviewfull.py index e377717f9b..db6ab2daf6 100644 --- a/torchtext/datasets/yelpreviewfull.py +++ b/torchtext/datasets/yelpreviewfull.py @@ -33,7 +33,7 @@ @_add_docstring_header(num_lines=NUM_LINES, num_classes=5) @_create_dataset_directory(dataset_name=DATASET_NAME) -@_wrap_split_argument(('train', 'test')) +@_wrap_split_argument(("train", "test")) def YelpReviewFull(root: str, split: Union[Tuple[str], str]): if not is_module_available("torchdata"): raise ModuleNotFoundError("Package `torchdata` not found. Please install following instructions at `https://github.com/pytorch/data`") @@ -47,16 +47,7 @@ def YelpReviewFull(root: str, split: Union[Tuple[str], str]): cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) cache_dp = FileOpener(cache_dp, mode="b") - def extracted_filepath_fn(_): - file_path = os.path.join(root, _EXTRACTED_FILES[split]) - dir_path = os.path.dirname(file_path) - if not os.path.exists(dir_path): - os.makedirs(dir_path) - return file_path - - cache_dp = cache_dp.on_disk_cache( - filepath_fn=extracted_filepath_fn - ) + cache_dp = cache_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) cache_dp = cache_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) cache_dp = cache_dp.end_caching(mode="wb", same_filepath_fn=True) cache_dp = FileOpener(cache_dp, mode="b") From ad4bb57b474206462a22502b33617b604e2d3e80 Mon Sep 17 00:00:00 2001 From: Elijah Rippeth Date: Wed, 19 Jan 2022 14:43:29 -0500 Subject: [PATCH 3/5] rename dps for consistency. --- torchtext/datasets/yelpreviewfull.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/torchtext/datasets/yelpreviewfull.py b/torchtext/datasets/yelpreviewfull.py index db6ab2daf6..46b880897a 100644 --- a/torchtext/datasets/yelpreviewfull.py +++ b/torchtext/datasets/yelpreviewfull.py @@ -40,15 +40,15 @@ def YelpReviewFull(root: str, split: Union[Tuple[str], str]): url_dp = IterableWrapper([URL]) - cache_dp = url_dp.on_disk_cache( + cache_compressed_dp = url_dp.on_disk_cache( filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) - cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileOpener(cache_dp, mode="b") - - cache_dp = cache_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) - cache_dp = cache_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) - cache_dp = cache_dp.end_caching(mode="wb", same_filepath_fn=True) - cache_dp = FileOpener(cache_dp, mode="b") - return cache_dp.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:]))) + cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) + cache_compressed_dp = FileOpener(cache_compressed_dp, mode="b") + + cache_decompressed_dp = cache_compressed_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) + cache_decompressed_dp = cache_decompressed_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) + cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) + data_dp = FileOpener(cache_decompressed_dp, mode="b") + return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:]))) From fc4fa878aea25fb68f90496f00e3f9ea1cdd50d8 Mon Sep 17 00:00:00 2001 From: Elijah Rippeth Date: Fri, 21 Jan 2022 04:55:17 -0500 Subject: [PATCH 4/5] add FileOpener within caching block for more consistency. --- torchtext/datasets/yelpreviewfull.py | 1 + 1 file changed, 1 insertion(+) diff --git a/torchtext/datasets/yelpreviewfull.py b/torchtext/datasets/yelpreviewfull.py index 46b880897a..ee159c6790 100644 --- a/torchtext/datasets/yelpreviewfull.py +++ b/torchtext/datasets/yelpreviewfull.py @@ -48,6 +48,7 @@ def YelpReviewFull(root: str, split: Union[Tuple[str], str]): cache_compressed_dp = FileOpener(cache_compressed_dp, mode="b") cache_decompressed_dp = cache_compressed_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) + cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b") cache_decompressed_dp = cache_decompressed_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) data_dp = FileOpener(cache_decompressed_dp, mode="b") From 21bc3f67c8c24fa9d7d69fec4ff39d7985c9dc40 Mon Sep 17 00:00:00 2001 From: Elijah Rippeth Date: Fri, 21 Jan 2022 10:08:05 -0500 Subject: [PATCH 5/5] remove extraneous FileOpener since it happens in the cache block. --- torchtext/datasets/yelpreviewfull.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/torchtext/datasets/yelpreviewfull.py b/torchtext/datasets/yelpreviewfull.py index ee159c6790..5258cd732c 100644 --- a/torchtext/datasets/yelpreviewfull.py +++ b/torchtext/datasets/yelpreviewfull.py @@ -44,12 +44,13 @@ def YelpReviewFull(root: str, split: Union[Tuple[str], str]): filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) - cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) - cache_compressed_dp = FileOpener(cache_compressed_dp, mode="b") + cache_compressed_dp = GDriveReader(cache_compressed_dp) + cache_compressed_dp = cache_compressed_dp.end_caching(mode="wb", same_filepath_fn=True) cache_decompressed_dp = cache_compressed_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b") cache_decompressed_dp = cache_decompressed_dp.read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) + data_dp = FileOpener(cache_decompressed_dp, mode="b") return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:])))