From d68c394befc83bb405b244a558a1adeeeda6f69b Mon Sep 17 00:00:00 2001 From: Parmeet Singh Bhatia Date: Mon, 17 Jan 2022 13:38:51 -0500 Subject: [PATCH 1/4] fixing amazonreviewpolarity example --- examples/text/amazonreviewpolarity.py | 28 ++++++++------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/examples/text/amazonreviewpolarity.py b/examples/text/amazonreviewpolarity.py index bb810c167..d2abe5ad0 100644 --- a/examples/text/amazonreviewpolarity.py +++ b/examples/text/amazonreviewpolarity.py @@ -37,25 +37,13 @@ def AmazonReviewPolarity(root, split): """Demonstrating caching, extraction and sanity check pipelines.""" url_dp = IterableWrapper([URL]) - # cache data on-disk with sanity check - cache_dp = url_dp.on_disk_cache( + cache_compressed_dp = url_dp.on_disk_cache( filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) - cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True) - - cache_dp = FileOpener(cache_dp, mode="b") - - # stack TAR extractor on top of loader DP - extracted_files = cache_dp.read_from_tar() - - # filter files as necessary - filter_extracted_files = extracted_files.filter(lambda x: split in x[0]) - - # stack sanity checker on top of extracted files - check_filter_extracted_files = filter_extracted_files.check_hash( - {os.path.normpath(os.path.join(root, _EXTRACTED_FILES[split])): _EXTRACTED_FILES_MD5[split]}, - "md5", - ) - - # stack CSV reader and do some mapping - return check_filter_extracted_files.parse_csv().map(fn=lambda t: (int(t[0]), t[1])) + cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) + cache_decompressed_dp = cache_compressed_dp.on_disk_cache( + filepath_fn=lambda x: os.path.join(root, os.path.dirname(_EXTRACTED_FILES[split]), os.path.basename(x))) + cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").read_from_tar() + cache_compressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) + data_dp = FileOpener(cache_decompressed_dp.filter(lambda x: _EXTRACTED_FILES[split] in x[0]).map(lambda x: x[0]), mode='b') + return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), ' '.join(t[1:]))) From a8439422492d6914f60fc8fd395232f36561c182 Mon Sep 17 00:00:00 2001 From: Parmeet Singh Bhatia Date: Tue, 18 Jan 2022 18:12:23 -0500 Subject: [PATCH 2/4] fix caching issue --- examples/text/amazonreviewpolarity.py | 33 ++++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/examples/text/amazonreviewpolarity.py b/examples/text/amazonreviewpolarity.py index d2abe5ad0..eccc3dbd9 100644 --- a/examples/text/amazonreviewpolarity.py +++ b/examples/text/amazonreviewpolarity.py @@ -18,13 +18,8 @@ _PATH = "amazon_review_polarity_csv.tar.gz" _EXTRACTED_FILES = { - "train": os.path.join(_PATH, "amazon_review_polarity_csv", "train.csv"), - "test": os.path.join(_PATH, "amazon_review_polarity_csv", "test.csv"), -} - -_EXTRACTED_FILES_MD5 = { - "train": "520937107c39a2d1d1f66cd410e9ed9e", - "test": "f4c8bded2ecbde5f996b675db6228f16", + "train": os.path.join("amazon_review_polarity_csv", "train.csv"), + "test": os.path.join("amazon_review_polarity_csv", "test.csv"), } DATASET_NAME = "AmazonReviewPolarity" @@ -41,9 +36,25 @@ def AmazonReviewPolarity(root, split): filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) + + def extracted_filepath_fn(x): + file_path = os.path.join(root, _EXTRACTED_FILES[split]) + dir_path = os.path.dirname(file_path) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + return file_path + cache_decompressed_dp = cache_compressed_dp.on_disk_cache( - filepath_fn=lambda x: os.path.join(root, os.path.dirname(_EXTRACTED_FILES[split]), os.path.basename(x))) - cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").read_from_tar() - cache_compressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) - data_dp = FileOpener(cache_decompressed_dp.filter(lambda x: _EXTRACTED_FILES[split] in x[0]).map(lambda x: x[0]), mode='b') + filepath_fn=extracted_filepath_fn) + cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").\ + read_from_tar().\ + filter(lambda x: _EXTRACTED_FILES[split] in x[0]).\ + map(lambda x: (x[0].replace('_PATH' + '/', ''), x[1])) + cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) + data_dp = FileOpener(cache_decompressed_dp, mode='b') + + # data_dp = FileOpener(cache_compressed_dp, mode='b') + # data_dp = data_dp.read_from_tar() + # data_dp = data_dp.filter(lambda x: _EXTRACTED_FILES[split] in x[0]) + return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), ' '.join(t[1:]))) From 3f758cc5c49e2746ef318cdf91bca823f5cf5869 Mon Sep 17 00:00:00 2001 From: Parmeet Singh Bhatia Date: Wed, 19 Jan 2022 13:12:22 -0500 Subject: [PATCH 3/4] removing explicit directory creation --- examples/text/amazonreviewpolarity.py | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/examples/text/amazonreviewpolarity.py b/examples/text/amazonreviewpolarity.py index eccc3dbd9..aac102fb8 100644 --- a/examples/text/amazonreviewpolarity.py +++ b/examples/text/amazonreviewpolarity.py @@ -36,25 +36,10 @@ def AmazonReviewPolarity(root, split): filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) - - def extracted_filepath_fn(x): - file_path = os.path.join(root, _EXTRACTED_FILES[split]) - dir_path = os.path.dirname(file_path) - if not os.path.exists(dir_path): - os.makedirs(dir_path) - return file_path - - cache_decompressed_dp = cache_compressed_dp.on_disk_cache( - filepath_fn=extracted_filepath_fn) - cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").\ - read_from_tar().\ - filter(lambda x: _EXTRACTED_FILES[split] in x[0]).\ - map(lambda x: (x[0].replace('_PATH' + '/', ''), x[1])) + + cache_decompressed_dp = cache_compressed_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) + cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) + data_dp = FileOpener(cache_decompressed_dp, mode='b') - - # data_dp = FileOpener(cache_compressed_dp, mode='b') - # data_dp = data_dp.read_from_tar() - # data_dp = data_dp.filter(lambda x: _EXTRACTED_FILES[split] in x[0]) - return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), ' '.join(t[1:]))) From 1b4d7e717963a4bd7ed1970efe0d1490366741e0 Mon Sep 17 00:00:00 2001 From: Parmeet Singh Bhatia Date: Wed, 19 Jan 2022 13:42:27 -0500 Subject: [PATCH 4/4] fix linter --- examples/text/amazonreviewpolarity.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/text/amazonreviewpolarity.py b/examples/text/amazonreviewpolarity.py index aac102fb8..8571da555 100644 --- a/examples/text/amazonreviewpolarity.py +++ b/examples/text/amazonreviewpolarity.py @@ -36,10 +36,14 @@ def AmazonReviewPolarity(root, split): filepath_fn=lambda x: os.path.join(root, _PATH), hash_dict={os.path.join(root, _PATH): MD5}, hash_type="md5" ) cache_compressed_dp = GDriveReader(cache_compressed_dp).end_caching(mode="wb", same_filepath_fn=True) - - cache_decompressed_dp = cache_compressed_dp.on_disk_cache(filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])) - cache_decompressed_dp = FileOpener(cache_decompressed_dp, mode="b").read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) + + cache_decompressed_dp = cache_compressed_dp.on_disk_cache( + filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split]) + ) + cache_decompressed_dp = ( + FileOpener(cache_decompressed_dp, mode="b").read_from_tar().filter(lambda x: _EXTRACTED_FILES[split] in x[0]) + ) cache_decompressed_dp = cache_decompressed_dp.end_caching(mode="wb", same_filepath_fn=True) - - data_dp = FileOpener(cache_decompressed_dp, mode='b') - return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), ' '.join(t[1:]))) + + data_dp = FileOpener(cache_decompressed_dp, mode="b") + return data_dp.parse_csv().map(fn=lambda t: (int(t[0]), " ".join(t[1:])))