From eff55d391cc23eeb60d5f4aaf3ea88849110b617 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Mon, 8 Nov 2021 10:26:09 +0530 Subject: [PATCH 01/28] WIP: Sintel Dataset --- .../prototype/datasets/_builtin/__init__.py | 1 + .../prototype/datasets/_builtin/sintel.py | 123 ++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 torchvision/prototype/datasets/_builtin/sintel.py diff --git a/torchvision/prototype/datasets/_builtin/__init__.py b/torchvision/prototype/datasets/_builtin/__init__.py index 62abc3119f6..589fecf0323 100644 --- a/torchvision/prototype/datasets/_builtin/__init__.py +++ b/torchvision/prototype/datasets/_builtin/__init__.py @@ -6,4 +6,5 @@ from .mnist import MNIST, FashionMNIST, KMNIST, EMNIST, QMNIST from .sbd import SBD from .semeion import SEMEION +from .sintel import SINTEL from .voc import VOC diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py new file mode 100644 index 00000000000..89d05ea8a43 --- /dev/null +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -0,0 +1,123 @@ +# Sintel Optical Flow Dataset +import io +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator +import pathlib + +import torch +from torchdata.datapipes.iter import ( + IterDataPipe, + Demultiplexer, + Mapper, + Shuffler, + Zipper, + ZipArchiveReader, +) +from torchvision.prototype.datasets.decoder import raw +from torchvision.prototype.datasets.utils import ( + Dataset, + DatasetConfig, + DatasetInfo, + HttpResource, + OnlineResource, + DatasetType, +) +from torchvision.prototype.datasets.utils._internal import ( + image_buffer_from_array, + Decompressor, + INFINITE_BUFFER_SIZE, +) + + +# WIP +class FlowDatasetReader(IterDataPipe[torch.Tensor]): + def __init__( + self, + images_datapipe: IterDataPipe[Tuple[Any, io.IOBase]], + labels_datapipe: IterDataPipe[Tuple[Any, io.IOBase]] + ) -> None: + self.images_datapipe = images_datapipe + self.labels_datapipe = labels_datapipe + + def __iter__(self) -> Iterator[torch.Tensor]: + count_images = 0 + for _, file in self.images_datapipe: + count_images += 1 + + count_labels = 0 + for _, file in self.labels_datapipe: + count_labels += 1 + + print(count_images, count_labels) + pass + + +class SINTEL(Dataset): + def _make_info(self) -> DatasetInfo: + return DatasetInfo( + "sintel", + type=DatasetType.IMAGE, + homepage="", + valid_options=dict( + split=("train", "test"), + ), + ) + + def resources(self, config: DatasetConfig) -> List[OnlineResource]: + # training_images_archive = HttpResource( + # "", + # sha256="", + # ) + # testing_images_archive = HttpResource( + # "", + # sha256="", + # ) + archive = HttpResource( + "http://sintel.cs.washington.edu/MPI-Sintel-complete.zip", + sha256="", + ) + # return [training_images_archive, testing_images_archive] + return [archive] + + def _collate_and_decode_sample( + self, + data: Tuple[str, ...], + *, + decoder: Optional[Callable[[io.IOBase], torch.Tensor]], + ) -> Dict[str, Any]: + print(data) + + def _classify_archive(self, data: Dict[str, Any]): + path = pathlib.Path(data[0]) + if ".png" in path.name: + return 0 + elif ".flo" in path.name: + return 1 + else: + return None + + def _make_datapipe( + self, + resource_dps: List[IterDataPipe], + *, + config: DatasetConfig, + decoder: Optional[Callable[[io.IOBase], torch.Tensor]] + ) -> IterDataPipe[Dict[str, Any]]: + dp = resource_dps[0] + dp = ZipArchiveReader(dp) + images_dp, flo_dp = Demultiplexer( + dp, + 2, + self._classify_archive, + 
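+            # Editor's note, not part of the original commit: _classify_archive
+            # sends ".png" entries to branch 0 and ".flo" entries to branch 1,
+            # and drop_none=True below discards everything classified as None.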
drop_none=True, + buffer_size=INFINITE_BUFFER_SIZE, + ) + + # images_dp = Decompressor(images_dp) + # flo_dp = Decompressor(flo_dp) + # images_dp, flo_dp = FlowDatasetReader(images_dp, flo_dp) + + # dp = Zipper(images_dp, flo_dp) + images_dp = Shuffler(images_dp, buffer_size=INFINITE_BUFFER_SIZE) + flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) + # images_dp = Mapper(images_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) + return [images_dp, flo_dp] From 61831dc9d3f849cf53ffa1a411f4fa8a9746b22c Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Mon, 8 Nov 2021 13:56:29 +0530 Subject: [PATCH 02/28] Failing to read streamwrapper object in Python --- .../prototype/datasets/_builtin/sintel.py | 100 +++++++++++++----- 1 file changed, 71 insertions(+), 29 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 89d05ea8a43..ccdafd95111 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -1,15 +1,20 @@ -# Sintel Optical Flow Dataset import io -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable import pathlib +from functools import partial +import re import torch +from PIL import Image +import numpy as np + from torchdata.datapipes.iter import ( IterDataPipe, Demultiplexer, Mapper, Shuffler, Zipper, + LineReader, ZipArchiveReader, ) from torchvision.prototype.datasets.decoder import raw @@ -26,9 +31,9 @@ Decompressor, INFINITE_BUFFER_SIZE, ) +from torchvision import transforms -# WIP class FlowDatasetReader(IterDataPipe[torch.Tensor]): def __init__( self, @@ -41,7 +46,7 @@ def __init__( def __iter__(self) -> Iterator[torch.Tensor]: count_images = 0 for _, file in self.images_datapipe: - count_images += 1 + yield self._read_image(file) count_labels = 0 for _, file in self.labels_datapipe: @@ -59,23 +64,16 @@ def _make_info(self) -> DatasetInfo: homepage="", valid_options=dict( split=("train", "test"), - ), + pass_=("clean", "final"), + ) ) def resources(self, config: DatasetConfig) -> List[OnlineResource]: - # training_images_archive = HttpResource( - # "", - # sha256="", - # ) - # testing_images_archive = HttpResource( - # "", - # sha256="", - # ) archive = HttpResource( "http://sintel.cs.washington.edu/MPI-Sintel-complete.zip", sha256="", ) - # return [training_images_archive, testing_images_archive] + # return [training_images_archive, testing_images_archive], self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) return [archive] def _collate_and_decode_sample( @@ -84,17 +82,50 @@ def _collate_and_decode_sample( *, decoder: Optional[Callable[[io.IOBase], torch.Tensor]], ) -> Dict[str, Any]: - print(data) + return data + + def _classify_train_test(self, data: Dict[str, Any], *, config: DatasetConfig): + path = pathlib.Path(data[0]) + path_str = str(path.absolute()) + if "/training/" in path_str: + return 0 + elif "/test/" in path_str: + return 1 + else: + return None - def _classify_archive(self, data: Dict[str, Any]): + def _classify_archive(self, data: Dict[str, Any], *, config: DatasetConfig): path = pathlib.Path(data[0]) - if ".png" in path.name: + path_str = str(path.absolute()) + if config.pass_ in path_str and ".png" in path_str: return 0 - elif ".flo" in path.name: + elif ".flo" in path_str: return 1 else: return None + def read_images(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> 
Iterable[torch.Tensor]: + count_images = 0 + for _, file in data: + img = Image.open(file) + to_tensor = transforms.ToTensor() + count_images += 1 + yield to_tensor(img) + + def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]], config) -> Iterable[np.ndarray]: + count_flo = 0 + for _, file in data: + with open(file.name, "rb") as f: + magic = np.fromfile(f, np.float32, count=1) + if 202021.25 != magic: + raise ValueError("Magic number incorrect. Invalid .flo file") + + w = int(np.fromfile(f, np.int32, count=1)) + h = int(np.fromfile(f, np.int32, count=1)) + _data = np.fromfile(f, np.float32, count=2 * w * h) + count_flo += 1 + yield _data.reshape(2, h, w) + def _make_datapipe( self, resource_dps: List[IterDataPipe], @@ -103,21 +134,32 @@ def _make_datapipe( decoder: Optional[Callable[[io.IOBase], torch.Tensor]] ) -> IterDataPipe[Dict[str, Any]]: dp = resource_dps[0] - dp = ZipArchiveReader(dp) - images_dp, flo_dp = Demultiplexer( - dp, + archive_dp = ZipArchiveReader(dp) + + train_dp, test_dp = Demultiplexer( + archive_dp, 2, - self._classify_archive, + partial(self._classify_train_test, config=config), drop_none=True, buffer_size=INFINITE_BUFFER_SIZE, ) - # images_dp = Decompressor(images_dp) - # flo_dp = Decompressor(flo_dp) - # images_dp, flo_dp = FlowDatasetReader(images_dp, flo_dp) + if config.split == "train": + curr_split = train_dp + else: + curr_split = test_dp + + pass_images_dp, flo_dp = Demultiplexer( + curr_split, + 2, + partial(self._classify_archive, config=config), + drop_none=True, + buffer_size=INFINITE_BUFFER_SIZE, + ) + pass_images_dp = self.read_images(pass_images_dp) + flo_dp = self.read_flo(flo_dp, config) - # dp = Zipper(images_dp, flo_dp) - images_dp = Shuffler(images_dp, buffer_size=INFINITE_BUFFER_SIZE) + images_dp = Shuffler(pass_images_dp, buffer_size=INFINITE_BUFFER_SIZE) flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) - # images_dp = Mapper(images_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) - return [images_dp, flo_dp] + zipped_dp = Zipper(images_dp, flo_dp) + return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) From 12b591583e17a5f7c9d321752c5e1b5a7fa47d90 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Tue, 9 Nov 2021 16:12:27 +0530 Subject: [PATCH 03/28] KeyZipper updates --- .../prototype/datasets/_builtin/sintel.py | 114 +++++++++++------- 1 file changed, 71 insertions(+), 43 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index ccdafd95111..5354e9e60db 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -1,5 +1,5 @@ import io -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar import pathlib from functools import partial import re @@ -10,10 +10,12 @@ from torchdata.datapipes.iter import ( IterDataPipe, + IterableWrapper, Demultiplexer, Mapper, Shuffler, Zipper, + KeyZipper, LineReader, ZipArchiveReader, ) @@ -33,27 +35,30 @@ ) from torchvision import transforms +T = TypeVar("T") -class FlowDatasetReader(IterDataPipe[torch.Tensor]): - def __init__( - self, - images_datapipe: IterDataPipe[Tuple[Any, io.IOBase]], - labels_datapipe: IterDataPipe[Tuple[Any, io.IOBase]] - ) -> None: - self.images_datapipe = images_datapipe - self.labels_datapipe = labels_datapipe +FILE_NAME_PATTERN = 
re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") + +try: + from itertools import pairwise +except ImportError: + from itertools import tee + + def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: + a, b = tee(iterable) + next(b, None) + return zip(a, b) - def __iter__(self) -> Iterator[torch.Tensor]: - count_images = 0 - for _, file in self.images_datapipe: - yield self._read_image(file) +class IntCategoryGrouper(IterDataPipe[Tuple[Tuple[str, T], Tuple[str, T]]]): + def __init__(self, datapipe: IterDataPipe[Tuple[str, T]]) -> None: + self.datapipe = datapipe - count_labels = 0 - for _, file in self.labels_datapipe: - count_labels += 1 + def __iter__(self): + for item1, item2 in pairwise(sorted(self.datapipe)): + if pathlib.Path(item1[0]).parent != pathlib.Path(item2[0]).parent: + continue - print(count_images, count_labels) - pass + yield item1, item2 class SINTEL(Dataset): @@ -82,7 +87,23 @@ def _collate_and_decode_sample( *, decoder: Optional[Callable[[io.IOBase], torch.Tensor]], ) -> Dict[str, Any]: - return data + # Read images and flo file here + # Use decoder for images if available + # Return dict + flo, images = data + img1, img2 = images + + path1, buffer1 = img1 + path2, buffer2 = img2 + + flow_arr = self.read_flo(flo) + + print(buffer1, buffer2) + return dict( + image1=decoder(buffer1) if decoder else buffer1, + image2=decoder(buffer2) if decoder else buffer2, + label=flow_arr, + ) def _classify_train_test(self, data: Dict[str, Any], *, config: DatasetConfig): path = pathlib.Path(data[0]) @@ -104,27 +125,29 @@ def _classify_archive(self, data: Dict[str, Any], *, config: DatasetConfig): else: return None - def read_images(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[torch.Tensor]: - count_images = 0 - for _, file in data: - img = Image.open(file) - to_tensor = transforms.ToTensor() - count_images += 1 - yield to_tensor(img) - - def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]], config) -> Iterable[np.ndarray]: + def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.ndarray]: count_flo = 0 for _, file in data: - with open(file.name, "rb") as f: - magic = np.fromfile(f, np.float32, count=1) - if 202021.25 != magic: - raise ValueError("Magic number incorrect. Invalid .flo file") + f = file.file_obj + magic = np.fromfile(f, np.float32, count=1) + if 202021.25 != magic: + raise ValueError("Magic number incorrect. 
Invalid .flo file") + + w = int(np.fromfile(f, np.int32, count=1)) + h = int(np.fromfile(f, np.int32, count=1)) + _data = np.fromfile(f, np.float32, count=2 * w * h) + count_flo += 1 + yield _data.reshape(2, h, w) + + def flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: + path = pathlib.Path(data[0]) + print('path: ', path.name) + category = path.parent.name + idx = int(FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] + return category, idx - w = int(np.fromfile(f, np.int32, count=1)) - h = int(np.fromfile(f, np.int32, count=1)) - _data = np.fromfile(f, np.float32, count=2 * w * h) - count_flo += 1 - yield _data.reshape(2, h, w) + def images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]: + return self.flows_key(data[0]) def _make_datapipe( self, @@ -132,10 +155,11 @@ def _make_datapipe( *, config: DatasetConfig, decoder: Optional[Callable[[io.IOBase], torch.Tensor]] - ) -> IterDataPipe[Dict[str, Any]]: + ) -> IterDataPipe[Tuple[Dict[str, Any], Dict[str, Any]]]: dp = resource_dps[0] archive_dp = ZipArchiveReader(dp) + # Use a Filter train_dp, test_dp = Demultiplexer( archive_dp, 2, @@ -156,10 +180,14 @@ def _make_datapipe( drop_none=True, buffer_size=INFINITE_BUFFER_SIZE, ) - pass_images_dp = self.read_images(pass_images_dp) - flo_dp = self.read_flo(flo_dp, config) - - images_dp = Shuffler(pass_images_dp, buffer_size=INFINITE_BUFFER_SIZE) flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) - zipped_dp = Zipper(images_dp, flo_dp) + + pass_images_dp = IntCategoryGrouper(pass_images_dp) + zipped_dp = KeyZipper( + flo_dp, + pass_images_dp, + key_fn=self.flows_key, + ref_key_fn=self.images_key, + ) + return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) From 1b690ac6a91fd41065b86089d0d93ee2299fa365 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Tue, 9 Nov 2021 19:27:51 +0530 Subject: [PATCH 04/28] seek of closed file error for now --- torchvision/prototype/datasets/_builtin/sintel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 5354e9e60db..962e886a49a 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -93,12 +93,13 @@ def _collate_and_decode_sample( flo, images = data img1, img2 = images + print(img1) path1, buffer1 = img1 path2, buffer2 = img2 flow_arr = self.read_flo(flo) + obj = Image.open(buffer1) - print(buffer1, buffer2) return dict( image1=decoder(buffer1) if decoder else buffer1, image2=decoder(buffer2) if decoder else buffer2, @@ -141,7 +142,6 @@ def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.nda def flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: path = pathlib.Path(data[0]) - print('path: ', path.name) category = path.parent.name idx = int(FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] return category, idx @@ -180,7 +180,7 @@ def _make_datapipe( drop_none=True, buffer_size=INFINITE_BUFFER_SIZE, ) - flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) + # flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) pass_images_dp = IntCategoryGrouper(pass_images_dp) zipped_dp = KeyZipper( From 6f371c795b337ecf029ac995ada33e5eb315dce3 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Tue, 9 Nov 2021 20:29:24 +0530 Subject: [PATCH 05/28] Working... 
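
[Editor's note, not part of the original commit message: the diff below drops the
two-way train/test Demultiplexer in favour of a single Filter keyed on the split
name. A minimal sketch of the idea, using made-up archive paths as input:

    from torchdata.datapipes.iter import Filter, IterableWrapper

    # toy stand-ins for the (path, file handle) tuples coming out of the zip archive
    dp = IterableWrapper(
        [
            ("MPI-Sintel-complete/training/clean/alley_1/frame_0001.png", None),
            ("MPI-Sintel-complete/test/clean/ambush_1/frame_0001.png", None),
        ]
    )

    def keep_train(data):
        # mirrors the check below: "train" is a substring of "training"
        return "train" in data[0]

    print(list(Filter(dp, keep_train)))  # only the training entry survives
]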
--- .../prototype/datasets/_builtin/sintel.py | 26 +++---------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 962e886a49a..5e54ebc0144 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -14,6 +14,7 @@ Demultiplexer, Mapper, Shuffler, + Filter, Zipper, KeyZipper, LineReader, @@ -78,7 +79,6 @@ def resources(self, config: DatasetConfig) -> List[OnlineResource]: "http://sintel.cs.washington.edu/MPI-Sintel-complete.zip", sha256="", ) - # return [training_images_archive, testing_images_archive], self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) return [archive] def _collate_and_decode_sample( @@ -108,13 +108,7 @@ def _collate_and_decode_sample( def _classify_train_test(self, data: Dict[str, Any], *, config: DatasetConfig): path = pathlib.Path(data[0]) - path_str = str(path.absolute()) - if "/training/" in path_str: - return 0 - elif "/test/" in path_str: - return 1 - else: - return None + return config.split in str(path.parent) def _classify_archive(self, data: Dict[str, Any], *, config: DatasetConfig): path = pathlib.Path(data[0]) @@ -159,19 +153,7 @@ def _make_datapipe( dp = resource_dps[0] archive_dp = ZipArchiveReader(dp) - # Use a Filter - train_dp, test_dp = Demultiplexer( - archive_dp, - 2, - partial(self._classify_train_test, config=config), - drop_none=True, - buffer_size=INFINITE_BUFFER_SIZE, - ) - - if config.split == "train": - curr_split = train_dp - else: - curr_split = test_dp + curr_split = Filter(archive_dp, partial(self._classify_train_test, config=config)) pass_images_dp, flo_dp = Demultiplexer( curr_split, @@ -180,7 +162,7 @@ def _make_datapipe( drop_none=True, buffer_size=INFINITE_BUFFER_SIZE, ) - # flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) + flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) pass_images_dp = IntCategoryGrouper(pass_images_dp) zipped_dp = KeyZipper( From 081c70f6509db0d0c2558afa76028ee98e4d4b3d Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Tue, 9 Nov 2021 21:21:06 +0530 Subject: [PATCH 06/28] Rearranging functions --- .../prototype/datasets/_builtin/sintel.py | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 5e54ebc0144..22e9877b95e 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -81,31 +81,6 @@ def resources(self, config: DatasetConfig) -> List[OnlineResource]: ) return [archive] - def _collate_and_decode_sample( - self, - data: Tuple[str, ...], - *, - decoder: Optional[Callable[[io.IOBase], torch.Tensor]], - ) -> Dict[str, Any]: - # Read images and flo file here - # Use decoder for images if available - # Return dict - flo, images = data - img1, img2 = images - - print(img1) - path1, buffer1 = img1 - path2, buffer2 = img2 - - flow_arr = self.read_flo(flo) - obj = Image.open(buffer1) - - return dict( - image1=decoder(buffer1) if decoder else buffer1, - image2=decoder(buffer2) if decoder else buffer2, - label=flow_arr, - ) - def _classify_train_test(self, data: Dict[str, Any], *, config: DatasetConfig): path = pathlib.Path(data[0]) return config.split in str(path.parent) @@ -120,7 +95,7 @@ def _classify_archive(self, data: Dict[str, Any], *, config: DatasetConfig): else: return None - 
def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.ndarray]: + def _read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.ndarray]: count_flo = 0 for _, file in data: f = file.file_obj @@ -134,14 +109,34 @@ def read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.nda count_flo += 1 yield _data.reshape(2, h, w) - def flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: + def _flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: path = pathlib.Path(data[0]) category = path.parent.name idx = int(FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] return category, idx - def images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]: - return self.flows_key(data[0]) + def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]: + return self._flows_key(data[0]) + + def _collate_and_decode_sample( + self, + data: Tuple[str, ...], + *, + decoder: Optional[Callable[[io.IOBase], torch.Tensor]], + ) -> Dict[str, Any]: + flo, images = data + img1, img2 = images + + path1, buffer1 = img1 + path2, buffer2 = img2 + + flow_arr = self._read_flo(flo) + + return dict( + image1=decoder(buffer1) if decoder else buffer1, + image2=decoder(buffer2) if decoder else buffer2, + label=flow_arr, + ) def _make_datapipe( self, @@ -168,8 +163,8 @@ def _make_datapipe( zipped_dp = KeyZipper( flo_dp, pass_images_dp, - key_fn=self.flows_key, - ref_key_fn=self.images_key, + key_fn=self._flows_key, + ref_key_fn=self._images_key, ) return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) From 6c106e7d2c1fe28c98086bd7db371ebc2a5c712e Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Wed, 10 Nov 2021 12:30:27 +0530 Subject: [PATCH 07/28] Fix mypy failures, minor edits --- .../prototype/datasets/_builtin/sintel.py | 103 +++++++++--------- 1 file changed, 50 insertions(+), 53 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 22e9877b95e..a92a463df07 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -1,26 +1,20 @@ import io -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar import pathlib -from functools import partial import re +from functools import partial +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar, Union -import torch -from PIL import Image import numpy as np - +import torch from torchdata.datapipes.iter import ( IterDataPipe, - IterableWrapper, Demultiplexer, Mapper, Shuffler, Filter, - Zipper, - KeyZipper, - LineReader, + IterKeyZipper, ZipArchiveReader, ) -from torchvision.prototype.datasets.decoder import raw from torchvision.prototype.datasets.utils import ( Dataset, DatasetConfig, @@ -29,19 +23,14 @@ OnlineResource, DatasetType, ) -from torchvision.prototype.datasets.utils._internal import ( - image_buffer_from_array, - Decompressor, - INFINITE_BUFFER_SIZE, -) -from torchvision import transforms +from torchvision.prototype.datasets.utils._internal import INFINITE_BUFFER_SIZE T = TypeVar("T") FILE_NAME_PATTERN = re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") try: - from itertools import pairwise + from itertools import pairwise # type: ignore except ImportError: from itertools import tee @@ -50,11 +39,12 @@ def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: next(b, None) 
return zip(a, b) + class IntCategoryGrouper(IterDataPipe[Tuple[Tuple[str, T], Tuple[str, T]]]): def __init__(self, datapipe: IterDataPipe[Tuple[str, T]]) -> None: self.datapipe = datapipe - def __iter__(self): + def __iter__(self) -> Iterator[Tuple[Tuple[str, Any], Tuple[str, Any]]]: for item1, item2 in pairwise(sorted(self.datapipe)): if pathlib.Path(item1[0]).parent != pathlib.Path(item2[0]).parent: continue @@ -71,7 +61,7 @@ def _make_info(self) -> DatasetInfo: valid_options=dict( split=("train", "test"), pass_=("clean", "final"), - ) + ), ) def resources(self, config: DatasetConfig) -> List[OnlineResource]: @@ -81,33 +71,28 @@ def resources(self, config: DatasetConfig) -> List[OnlineResource]: ) return [archive] - def _classify_train_test(self, data: Dict[str, Any], *, config: DatasetConfig): + def _classify_train_test(self, data: Tuple[str, Any], *, config: DatasetConfig) -> bool: path = pathlib.Path(data[0]) return config.split in str(path.parent) - def _classify_archive(self, data: Dict[str, Any], *, config: DatasetConfig): + def _classify_archive(self, data: Tuple[str, Any], *, config: DatasetConfig) -> Union[int, None]: path = pathlib.Path(data[0]) - path_str = str(path.absolute()) - if config.pass_ in path_str and ".png" in path_str: + if config.pass_ == path.parent.parent.name and path.suffix == ".png": return 0 - elif ".flo" in path_str: + elif path.suffix == ".flo": return 1 else: return None - def _read_flo(self, data: IterDataPipe[Tuple[Any, io.IOBase]]) -> Iterable[np.ndarray]: - count_flo = 0 - for _, file in data: - f = file.file_obj - magic = np.fromfile(f, np.float32, count=1) - if 202021.25 != magic: - raise ValueError("Magic number incorrect. Invalid .flo file") - - w = int(np.fromfile(f, np.int32, count=1)) - h = int(np.fromfile(f, np.int32, count=1)) - _data = np.fromfile(f, np.float32, count=2 * w * h) - count_flo += 1 - yield _data.reshape(2, h, w) + def _read_flo(self, file: IterDataPipe[Tuple[Any, io.IOBase]]) -> Any: + magic = file.read(4) + if magic != b"PIEH": + raise ValueError("Magic number incorrect. 
Invalid .flo file") + w = int.from_bytes(file.read(4), "little") + h = int.from_bytes(file.read(4), "little") + data = file.read(2 * w * h * 4) + data = np.frombuffer(data, dtype=np.float32) + return data.reshape(h, w, 2).transpose(2, 0, 1) def _flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: path = pathlib.Path(data[0]) @@ -120,22 +105,29 @@ def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[st def _collate_and_decode_sample( self, - data: Tuple[str, ...], + data: Tuple[Tuple[str, io.IOBase], Any], *, decoder: Optional[Callable[[io.IOBase], torch.Tensor]], + config: DatasetConfig, ) -> Dict[str, Any]: - flo, images = data - img1, img2 = images + if config.split == "train": + flo, images = data + img1, img2 = images + flow_arr = self._read_flo(flo[1]) + del images, flo + else: + # When split is `test` + img1, img2 = data + + del data path1, buffer1 = img1 path2, buffer2 = img2 - flow_arr = self._read_flo(flo) - return dict( image1=decoder(buffer1) if decoder else buffer1, image2=decoder(buffer2) if decoder else buffer2, - label=flow_arr, + flow=flow_arr if config.split == "train" else None, ) def _make_datapipe( @@ -143,7 +135,7 @@ def _make_datapipe( resource_dps: List[IterDataPipe], *, config: DatasetConfig, - decoder: Optional[Callable[[io.IOBase], torch.Tensor]] + decoder: Optional[Callable[[io.IOBase], torch.Tensor]], ) -> IterDataPipe[Tuple[Dict[str, Any], Dict[str, Any]]]: dp = resource_dps[0] archive_dp = ZipArchiveReader(dp) @@ -159,12 +151,17 @@ def _make_datapipe( ) flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) - pass_images_dp = IntCategoryGrouper(pass_images_dp) - zipped_dp = KeyZipper( - flo_dp, - pass_images_dp, - key_fn=self._flows_key, - ref_key_fn=self._images_key, - ) - - return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) + pass_images_dp: IterDataPipe[Tuple[str, Any], Tuple[stry, Any]] = IntCategoryGrouper(pass_images_dp) + if config.split == "train": + zipped_dp = IterKeyZipper( + flo_dp, + pass_images_dp, + key_fn=self._flows_key, + ref_key_fn=self._images_key, + ) + return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config)) + else: + # When split is `test`, flo_dp will be empty and thus should not be zipped with pass_images_dp + return Mapper( + pass_images_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config) + ) From 32cc661085201712bd0159dd4c4cacb2efdd1b5f Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Thu, 11 Nov 2021 08:42:39 +0530 Subject: [PATCH 08/28] Apply suggestions from code review Co-authored-by: Philip Meier --- .../prototype/datasets/_builtin/sintel.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index a92a463df07..d967780d3ba 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -40,7 +40,7 @@ def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: return zip(a, b) -class IntCategoryGrouper(IterDataPipe[Tuple[Tuple[str, T], Tuple[str, T]]]): +class InSceneGrouper(IterDataPipe[Tuple[Tuple[str, T], Tuple[str, T]]]): def __init__(self, datapipe: IterDataPipe[Tuple[str, T]]) -> None: self.datapipe = datapipe @@ -57,25 +57,25 @@ def _make_info(self) -> DatasetInfo: return DatasetInfo( "sintel", type=DatasetType.IMAGE, - homepage="", + 
homepage="http://sintel.is.tue.mpg.de/", valid_options=dict( split=("train", "test"), - pass_=("clean", "final"), + pass_name=("clean", "final"), ), ) def resources(self, config: DatasetConfig) -> List[OnlineResource]: archive = HttpResource( - "http://sintel.cs.washington.edu/MPI-Sintel-complete.zip", - sha256="", + "http://files.is.tue.mpg.de/sintel/MPI-Sintel-complete.zip", + sha256="bdc80abbe6ae13f96f6aa02e04d98a251c017c025408066a00204cd2c7104c5f", ) return [archive] - def _classify_train_test(self, data: Tuple[str, Any], *, config: DatasetConfig) -> bool: + def _filter_split(self, data: Tuple[str, Any], *, config: DatasetConfig) -> bool: path = pathlib.Path(data[0]) return config.split in str(path.parent) - def _classify_archive(self, data: Tuple[str, Any], *, config: DatasetConfig) -> Union[int, None]: + def _classify_archive(self, data: Tuple[str, Any], *, config: DatasetConfig) -> Optional[int]: path = pathlib.Path(data[0]) if config.pass_ == path.parent.parent.name and path.suffix == ".png": return 0 @@ -84,7 +84,7 @@ def _classify_archive(self, data: Tuple[str, Any], *, config: DatasetConfig) -> else: return None - def _read_flo(self, file: IterDataPipe[Tuple[Any, io.IOBase]]) -> Any: + def _read_flo(self, file: io.IOBase) -> torch.Tensor: magic = file.read(4) if magic != b"PIEH": raise ValueError("Magic number incorrect. Invalid .flo file") @@ -136,11 +136,11 @@ def _make_datapipe( *, config: DatasetConfig, decoder: Optional[Callable[[io.IOBase], torch.Tensor]], - ) -> IterDataPipe[Tuple[Dict[str, Any], Dict[str, Any]]]: + ) -> IterDataPipe[Dict[str, Any]]: dp = resource_dps[0] archive_dp = ZipArchiveReader(dp) - curr_split = Filter(archive_dp, partial(self._classify_train_test, config=config)) + curr_split = Filter(archive_dp, self._classify_train_test, fn_kwargs=dict(split=split)) pass_images_dp, flo_dp = Demultiplexer( curr_split, From 7f27e3f330c741aabf9de9a3480ac2fa2692bad9 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Thu, 11 Nov 2021 10:22:37 +0530 Subject: [PATCH 09/28] Address reviews... 
--- .../prototype/datasets/_builtin/sintel.py | 77 +++++++++++-------- 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index d967780d3ba..33de5fe8b18 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -2,7 +2,7 @@ import pathlib import re from functools import partial -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar import numpy as np import torch @@ -27,10 +27,8 @@ T = TypeVar("T") -FILE_NAME_PATTERN = re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") - try: - from itertools import pairwise # type: ignore + from itertools import pairwise # type: ignore[attr-defined] except ImportError: from itertools import tee @@ -53,6 +51,9 @@ def __iter__(self) -> Iterator[Tuple[Tuple[str, Any], Tuple[str, Any]]]: class SINTEL(Dataset): + + _FILE_NAME_PATTERN = re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") + def _make_info(self) -> DatasetInfo: return DatasetInfo( "sintel", @@ -71,15 +72,23 @@ def resources(self, config: DatasetConfig) -> List[OnlineResource]: ) return [archive] - def _filter_split(self, data: Tuple[str, Any], *, config: DatasetConfig) -> bool: + def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool: path = pathlib.Path(data[0]) - return config.split in str(path.parent) + # The dataset contains has the folder "training", while allowed options for `split` are + # "train" and "test", we don't check for equality here ("train" != "training") and instead + # check if split is in the folder name + return split in path.parent.parent.parent.name - def _classify_archive(self, data: Tuple[str, Any], *, config: DatasetConfig) -> Optional[int]: + def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool: path = pathlib.Path(data[0]) - if config.pass_ == path.parent.parent.name and path.suffix == ".png": + return pass_name in str(path.parent) and path.suffix == ".png" + + def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optional[int]: + path = pathlib.Path(data[0]) + suffix = path.suffix + if suffix == ".flo": return 0 - elif path.suffix == ".flo": + elif pass_name == path.parent.parent.name and suffix == ".png": return 1 else: return None @@ -91,13 +100,15 @@ def _read_flo(self, file: io.IOBase) -> torch.Tensor: w = int.from_bytes(file.read(4), "little") h = int.from_bytes(file.read(4), "little") data = file.read(2 * w * h * 4) - data = np.frombuffer(data, dtype=np.float32) - return data.reshape(h, w, 2).transpose(2, 0, 1) + data_arr = np.frombuffer(data, dtype=np.float32) + # Creating a copy of the underlying array, to avoid UserWarning: "The given NumPy array + # is not writeable, and PyTorch does not support non-writeable tensors." 
+ return torch.from_numpy(np.copy(data_arr.reshape(h, w, 2).transpose(2, 0, 1))) def _flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: path = pathlib.Path(data[0]) category = path.parent.name - idx = int(FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] + idx = int(self._FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] return category, idx def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]: @@ -110,24 +121,24 @@ def _collate_and_decode_sample( decoder: Optional[Callable[[io.IOBase], torch.Tensor]], config: DatasetConfig, ) -> Dict[str, Any]: + # flo, images = data + # img1, img2 = images + if config.split == "train": flo, images = data img1, img2 = images flow_arr = self._read_flo(flo[1]) - del images, flo else: # When split is `test` img1, img2 = data - del data - path1, buffer1 = img1 path2, buffer2 = img2 return dict( - image1=decoder(buffer1) if decoder else buffer1, - image2=decoder(buffer2) if decoder else buffer2, - flow=flow_arr if config.split == "train" else None, + image1=(path1, decoder(buffer1)) if decoder else (path1, buffer1), + image2=(path2, decoder(buffer2)) if decoder else (path2, buffer2), + flow=(flo[0], flow_arr) if config.split == "train" else ("", None), ) def _make_datapipe( @@ -140,19 +151,17 @@ def _make_datapipe( dp = resource_dps[0] archive_dp = ZipArchiveReader(dp) - curr_split = Filter(archive_dp, self._classify_train_test, fn_kwargs=dict(split=split)) - - pass_images_dp, flo_dp = Demultiplexer( - curr_split, - 2, - partial(self._classify_archive, config=config), - drop_none=True, - buffer_size=INFINITE_BUFFER_SIZE, - ) - flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) - - pass_images_dp: IterDataPipe[Tuple[str, Any], Tuple[stry, Any]] = IntCategoryGrouper(pass_images_dp) + curr_split = Filter(archive_dp, self._filter_split, fn_kwargs=dict(split=config.split)) if config.split == "train": + flo_dp, pass_images_dp = Demultiplexer( + curr_split, + 2, + partial(self._classify_archive, pass_name=config.pass_name), + drop_none=True, + buffer_size=INFINITE_BUFFER_SIZE, + ) + flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) + pass_images_dp: IterDataPipe[Tuple[str, Any], Tuple[stry, Any]] = InSceneGrouper(pass_images_dp) zipped_dp = IterKeyZipper( flo_dp, pass_images_dp, @@ -161,7 +170,13 @@ def _make_datapipe( ) return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config)) else: - # When split is `test`, flo_dp will be empty and thus should not be zipped with pass_images_dp + # When config.split is "test" + # There are no flow files for test split + # TODO: How to zip an empty IterDataPipe and pass to the Mapper? 
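# Editor's note, not part of the original commit: one workable answer, adopted
# later in this series, is to skip the zip for the test split and pair each
# image tuple with a fake flow entry instead, e.g.:
#     zipped_dp = Mapper(pass_images_dp, lambda data: ((None, None), data))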
+ # flo_dp = IterDataPipe() + pass_images_dp = Filter(curr_split, self._filter_images, fn_kwargs=dict(pass_name=config.pass_name)) + pass_images_dp = Shuffler(pass_images_dp, buffer_size=INFINITE_BUFFER_SIZE) + pass_images_dp = InSceneGrouper(pass_images_dp) return Mapper( pass_images_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config) ) From cdcb9140479dd5d459b842201c8641c9c5540e8a Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 12 Nov 2021 06:20:13 +0000 Subject: [PATCH 10/28] Update torchvision/prototype/datasets/_builtin/sintel.py Co-authored-by: Philip Meier --- torchvision/prototype/datasets/_builtin/sintel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 33de5fe8b18..27e717e71ea 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -77,7 +77,7 @@ def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool: # The dataset contains has the folder "training", while allowed options for `split` are # "train" and "test", we don't check for equality here ("train" != "training") and instead # check if split is in the folder name - return split in path.parent.parent.parent.name + return split in path.parents[2].name def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool: path = pathlib.Path(data[0]) From b58c14bf88781f4d3e1a5b3f98ffc4cb589fe3f4 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 12 Nov 2021 12:19:01 +0530 Subject: [PATCH 11/28] Add support for 'both' as pass_name --- .../prototype/datasets/_builtin/sintel.py | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 33de5fe8b18..380b3ef14ed 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -25,11 +25,10 @@ ) from torchvision.prototype.datasets.utils._internal import INFINITE_BUFFER_SIZE -T = TypeVar("T") - try: from itertools import pairwise # type: ignore[attr-defined] except ImportError: + T = TypeVar("T") from itertools import tee def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: @@ -53,6 +52,7 @@ def __iter__(self) -> Iterator[Tuple[Tuple[str, Any], Tuple[str, Any]]]: class SINTEL(Dataset): _FILE_NAME_PATTERN = re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") + _PASS_NAME_BOTH = re.compile(r"clean|final") def _make_info(self) -> DatasetInfo: return DatasetInfo( @@ -61,7 +61,7 @@ def _make_info(self) -> DatasetInfo: homepage="http://sintel.is.tue.mpg.de/", valid_options=dict( split=("train", "test"), - pass_name=("clean", "final"), + pass_name=("clean", "final", "both"), ), ) @@ -81,15 +81,19 @@ def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool: def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool: path = pathlib.Path(data[0]) - return pass_name in str(path.parent) and path.suffix == ".png" + regexp = self._PASS_NAME_BOTH if pass_name == "both" else re.compile(pass_name) + matched = regexp.search(str(path.parent)) + return (True if matched else False) and path.suffix == ".png" def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optional[int]: path = pathlib.Path(data[0]) suffix = path.suffix if suffix == ".flo": return 0 - elif pass_name == 
path.parent.parent.name and suffix == ".png": - return 1 + elif suffix == ".png": + regexp = self._PASS_NAME_BOTH if pass_name == "both" else re.compile(pass_name) + matched = regexp.search(path.parents[1].name) + return 1 if matched else None else: return None @@ -121,24 +125,20 @@ def _collate_and_decode_sample( decoder: Optional[Callable[[io.IOBase], torch.Tensor]], config: DatasetConfig, ) -> Dict[str, Any]: - # flo, images = data - # img1, img2 = images - - if config.split == "train": - flo, images = data - img1, img2 = images - flow_arr = self._read_flo(flo[1]) - else: - # When split is `test` - img1, img2 = data + flo, images = data + img1, img2 = images + flow_arr = self._read_flo(flo[1]) if flo[1] else None path1, buffer1 = img1 path2, buffer2 = img2 return dict( - image1=(path1, decoder(buffer1)) if decoder else (path1, buffer1), - image2=(path2, decoder(buffer2)) if decoder else (path2, buffer2), - flow=(flo[0], flow_arr) if config.split == "train" else ("", None), + image1=decoder(buffer1) if decoder else buffer1, + image1_path=path1, + image2=decoder(buffer2) if decoder else buffer2, + image2_path=path2, + flow=flow_arr, + flow_path=flo[0] or "", ) def _make_datapipe( @@ -168,15 +168,12 @@ def _make_datapipe( key_fn=self._flows_key, ref_key_fn=self._images_key, ) - return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config)) else: - # When config.split is "test" - # There are no flow files for test split - # TODO: How to zip an empty IterDataPipe and pass to the Mapper? - # flo_dp = IterDataPipe() pass_images_dp = Filter(curr_split, self._filter_images, fn_kwargs=dict(pass_name=config.pass_name)) pass_images_dp = Shuffler(pass_images_dp, buffer_size=INFINITE_BUFFER_SIZE) pass_images_dp = InSceneGrouper(pass_images_dp) - return Mapper( - pass_images_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config) - ) + # When config.split is "test" + # There are no flow files for test split, hence we create a fake flow data + zipped_dp = Mapper(pass_images_dp, lambda data: ((None, None), data)) + + return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config)) From 52ba6da2f9db0047cb3e7741f5550a4908a8dc8e Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 12 Nov 2021 12:21:27 +0530 Subject: [PATCH 12/28] Keep imports in the same block --- torchvision/prototype/datasets/_builtin/sintel.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 1c2d9a3b48c..177bdf9f29a 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -28,9 +28,10 @@ try: from itertools import pairwise # type: ignore[attr-defined] except ImportError: - T = TypeVar("T") from itertools import tee + T = TypeVar("T") + def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: a, b = tee(iterable) next(b, None) From e515fbb69c9eaa94ff0d8934beaaad02442a9391 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 12 Nov 2021 12:23:21 +0530 Subject: [PATCH 13/28] Convert re.search output to bool --- torchvision/prototype/datasets/_builtin/sintel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 177bdf9f29a..efa7f6508ed 100644 --- 
a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -83,8 +83,8 @@ def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool: def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool: path = pathlib.Path(data[0]) regexp = self._PASS_NAME_BOTH if pass_name == "both" else re.compile(pass_name) - matched = regexp.search(str(path.parent)) - return (True if matched else False) and path.suffix == ".png" + matched = bool(regexp.search(str(path.parent))) + return matched and path.suffix == ".png" def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optional[int]: path = pathlib.Path(data[0]) From ee3c78fa4fa4676fb089bcc7b8308a65c0d091a0 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Mon, 15 Nov 2021 10:53:04 +0530 Subject: [PATCH 14/28] Address reviews, cleanup, one more todo left... --- .../prototype/datasets/_builtin/sintel.py | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index efa7f6508ed..0160b76ff46 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -25,13 +25,13 @@ ) from torchvision.prototype.datasets.utils._internal import INFINITE_BUFFER_SIZE +T = TypeVar("T") + try: from itertools import pairwise # type: ignore[attr-defined] except ImportError: from itertools import tee - T = TypeVar("T") - def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: a, b = tee(iterable) next(b, None) @@ -53,7 +53,6 @@ def __iter__(self) -> Iterator[Tuple[Tuple[str, Any], Tuple[str, Any]]]: class SINTEL(Dataset): _FILE_NAME_PATTERN = re.compile(r"(frame|image)_(?P\d+)[.](flo|png)") - _PASS_NAME_BOTH = re.compile(r"clean|final") def _make_info(self) -> DatasetInfo: return DatasetInfo( @@ -82,8 +81,10 @@ def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool: def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool: path = pathlib.Path(data[0]) - regexp = self._PASS_NAME_BOTH if pass_name == "both" else re.compile(pass_name) - matched = bool(regexp.search(str(path.parent))) + if pass_name == "both": + matched = path.parents[1].name in ["clean", "final"] + else: + matched = path.parents[1].name == pass_name return matched and path.suffix == ".png" def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optional[int]: @@ -92,9 +93,7 @@ def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optiona if suffix == ".flo": return 0 elif suffix == ".png": - regexp = self._PASS_NAME_BOTH if pass_name == "both" else re.compile(pass_name) - matched = regexp.search(path.parents[1].name) - return 1 if matched else None + return 1 else: return None @@ -104,8 +103,10 @@ def _read_flo(self, file: io.IOBase) -> torch.Tensor: raise ValueError("Magic number incorrect. Invalid .flo file") w = int.from_bytes(file.read(4), "little") h = int.from_bytes(file.read(4), "little") + data = file.read(2 * w * h * 4) data_arr = np.frombuffer(data, dtype=np.float32) + # Creating a copy of the underlying array, to avoid UserWarning: "The given NumPy array # is not writeable, and PyTorch does not support non-writeable tensors." 
         return torch.from_numpy(np.copy(data_arr.reshape(h, w, 2).transpose(2, 0, 1)))
 
@@ -116,6 +117,9 @@ def _flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]:
         idx = int(self._FILE_NAME_PATTERN.match(path.name).group("idx"))  # type: ignore[union-attr]
         return category, idx
 
+    def _add_fake_flow_data(self, data: Tuple[str, Any]) -> Tuple[tuple, Tuple[str, Any]]:
+        return ((None, None), data)
+
     def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]:
         return self._flows_key(data[0])
 
@@ -139,7 +143,7 @@ def _collate_and_decode_sample(
             image2=decoder(buffer2) if decoder else buffer2,
             image2_path=path2,
             flow=flow_arr,
-            flow_path=flo[0] or "",
+            flow_path=flo[0],
         )
 
     def _make_datapipe(
@@ -153,9 +157,10 @@ def _make_datapipe(
         archive_dp = ZipArchiveReader(dp)
 
         curr_split = Filter(archive_dp, self._filter_split, fn_kwargs=dict(split=config.split))
+        filtered_curr_split = Filter(curr_split, self._filter_images, fn_kwargs=dict(pass_name=config.pass_name))
         if config.split == "train":
             flo_dp, pass_images_dp = Demultiplexer(
-                curr_split,
+                filtered_curr_split,
                 2,
                 partial(self._classify_archive, pass_name=config.pass_name),
                 drop_none=True,
@@ -170,11 +175,8 @@ def _make_datapipe(
                 ref_key_fn=self._images_key,
             )
         else:
-            pass_images_dp = Filter(curr_split, self._filter_images, fn_kwargs=dict(pass_name=config.pass_name))
-            pass_images_dp = Shuffler(pass_images_dp, buffer_size=INFINITE_BUFFER_SIZE)
+            pass_images_dp = Shuffler(filtered_curr_split, buffer_size=INFINITE_BUFFER_SIZE)
             pass_images_dp = InSceneGrouper(pass_images_dp)
-            # When config.split is "test"
-            # There are no flow files for test split, hence we create a fake flow data
-            zipped_dp = Mapper(pass_images_dp, lambda data: ((None, None), data))
+            zipped_dp = Mapper(pass_images_dp, self._add_fake_flow_data)
 
         return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config))

From 591633a57b711d9086a4a55837b898f5cddca90b Mon Sep 17 00:00:00 2001
From: Kushashwa Ravi Shrimali
Date: Mon, 15 Nov 2021 16:15:32 +0530
Subject: [PATCH 15/28] little endian format for data (flow file)

---
 torchvision/prototype/datasets/_builtin/sintel.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py
index 0160b76ff46..98c2216b88d 100644
--- a/torchvision/prototype/datasets/_builtin/sintel.py
+++ b/torchvision/prototype/datasets/_builtin/sintel.py
@@ -105,7 +105,7 @@ def _read_flo(self, file: io.IOBase) -> torch.Tensor:
         h = int.from_bytes(file.read(4), "little")
 
         data = file.read(2 * w * h * 4)
-        data_arr = np.frombuffer(data, dtype=np.float32)
+        data_arr = np.frombuffer(data, dtype="<f4")
 
         # Creating a copy of the underlying array, to avoid UserWarning: "The given NumPy array
         # is not writeable, and PyTorch does not support non-writeable tensors."

From: Kushashwa Ravi Shrimali
Date: Mon, 15 Nov 2021 16:22:13 +0530
Subject: [PATCH 16/28] As per review, use frombuffer consistently

---
 torchvision/prototype/datasets/_builtin/sintel.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py
index 98c2216b88d..0c6e2790b81 100644
--- a/torchvision/prototype/datasets/_builtin/sintel.py
+++ b/torchvision/prototype/datasets/_builtin/sintel.py
@@ -101,8 +101,8 @@ def _read_flo(self, file: io.IOBase) -> torch.Tensor:
         magic = file.read(4)
         if magic != b"PIEH":
             raise ValueError("Magic number incorrect. Invalid .flo file")
-        w = int.from_bytes(file.read(4), "little")
-        h = int.from_bytes(file.read(4), "little")
+        w = np.frombufer(file.read(4), dtype="<u4")
+        h = np.frombufer(file.read(4), dtype="<u4")
 
         data = file.read(2 * w * h * 4)
         data_arr = np.frombuffer(data, dtype="<f4")

From: Kushashwa Ravi Shrimali
Date: Tue, 16 Nov 2021 08:18:41 +0530
Subject: [PATCH 17/28] Only filter pass name, and not png, include flow filter there

---
 .../prototype/datasets/_builtin/sintel.py        | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py
index 0c6e2790b81..a6b9f9a54c9 100644
--- a/torchvision/prototype/datasets/_builtin/sintel.py
+++ b/torchvision/prototype/datasets/_builtin/sintel.py
@@ -79,13 +79,13 @@ def _filter_split(self, data: Tuple[str, Any], *, split: str) -> bool:
         # check if split is in the folder name
         return split in path.parents[2].name
 
-    def _filter_images(self, data: Tuple[str, Any], *, pass_name: str) -> bool:
+    def _filter_pass_name(self, data: Tuple[str, Any], *, pass_name: str) -> bool:
         path = pathlib.Path(data[0])
         if pass_name == "both":
-            matched = path.parents[1].name in ["clean", "final"]
+            matched = path.parents[1].name in ["clean", "final", "flow"]
         else:
-            matched = path.parents[1].name == pass_name
-        return matched and path.suffix == ".png"
+            matched = path.parents[1].name in [pass_name, "flow"]
+        return matched
 
     def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optional[int]:
@@ -157,7 +157,7 @@ def _make_datapipe(
         archive_dp = ZipArchiveReader(dp)
 
         curr_split = Filter(archive_dp, self._filter_split, fn_kwargs=dict(split=config.split))
-        filtered_curr_split = Filter(curr_split, self._filter_images, fn_kwargs=dict(pass_name=config.pass_name))
+        filtered_curr_split = Filter(curr_split, self._filter_pass_name, fn_kwargs=dict(pass_name=config.pass_name))
         if config.split == "train":
             flo_dp, pass_images_dp = Demultiplexer(

From: Kushashwa Ravi Shrimali
Date: Tue, 16 Nov 2021 08:23:00 +0530
Subject: [PATCH 18/28] Rename the func

---
 torchvision/prototype/datasets/_builtin/sintel.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py
index a6b9f9a54c9..a62328eb584 100644
--- a/torchvision/prototype/datasets/_builtin/sintel.py
+++ 
b/torchvision/prototype/datasets/_builtin/sintel.py @@ -137,6 +137,9 @@ def _collate_and_decode_sample( path1, buffer1 = img1 path2, buffer2 = img2 + # We return the scene dir name as the label + label = path1.split("/")[-2] + return dict( image1=decoder(buffer1) if decoder else buffer1, image1_path=path1, @@ -144,6 +147,7 @@ def _collate_and_decode_sample( image2_path=path2, flow=flow_arr, flow_path=flo[0], + label=label, ) def _make_datapipe( From 10bdc4bc39a16112a9b17a8504e221b5b59dae94 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Wed, 17 Nov 2021 08:50:15 +0530 Subject: [PATCH 20/28] Add test for sintel dataset --- test/builtin_dataset_mocks.py | 40 ++++++++++++++++++++++++- test/test_prototype_builtin_datasets.py | 1 + 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/test/builtin_dataset_mocks.py b/test/builtin_dataset_mocks.py index 173776ec799..078df8d79f8 100644 --- a/test/builtin_dataset_mocks.py +++ b/test/builtin_dataset_mocks.py @@ -10,7 +10,7 @@ import numpy as np import pytest import torch -from datasets_utils import create_image_folder, make_tar, make_zip +from datasets_utils import create_image_folder, make_tar, make_zip, make_fake_flo_file from torch.testing import make_tensor as _make_tensor from torchdata.datapipes.iter import IterDataPipe from torchvision.prototype import datasets @@ -504,3 +504,41 @@ def imagenet(info, root, config): make_tar(root, f"{devkit_root}.tar.gz", devkit_root, compression="gz") return num_samples + + +@dataset_mocks.register_mock_data_fn +def sintel(info, root, config): + FLOW_H, FLOW_W = 3, 4 + + num_images_per_scene = 3 if config["split"] == "train" else 4 + num_scenes = 2 + + for split_dir in ("training", "test"): + for pass_name in ("clean", "final"): + image_root = root / split_dir / pass_name + + for scene_id in range(num_scenes): + scene_dir = image_root / f"scene_{scene_id}" + create_image_folder( + image_root, + name=str(scene_dir), + file_name_fn=lambda image_idx: f"frame_000{image_idx}.png", + num_examples=num_images_per_scene, + ) + + flow_root = root / "training" / "flow" + for scene_id in range(num_scenes): + # flow_root.mkdir(exist_ok=True) + scene_dir = flow_root / f"scene_{scene_id}" + scene_dir.mkdir(exist_ok=True, parents=True) + for i in range(num_images_per_scene - 1): + file_name = str(scene_dir / f"frame_000{i}.flo") + make_fake_flo_file(h=FLOW_H, w=FLOW_W, file_name=file_name) + + # with e.g. num_images_per_scene = 3, for a single scene with have 3 images + # which are frame_0000, frame_0001 and frame_0002 + # They will be consecutively paired as (frame_0000, frame_0001), (frame_0001, frame_0002), + # that is 3 - 1 = 2 examples. 
Hence the formula below + num_passes = 2 if config["pass_name"] == "both" else 1 + num_examples = (num_images_per_scene - 1) * num_scenes * num_passes + return num_examples diff --git a/test/test_prototype_builtin_datasets.py b/test/test_prototype_builtin_datasets.py index 24120a73856..4bff063a839 100644 --- a/test/test_prototype_builtin_datasets.py +++ b/test/test_prototype_builtin_datasets.py @@ -23,6 +23,7 @@ "caltech256", "caltech101", "imagenet", + "sintel", ] for name in TMP: loader = functools.partial(builtin_dataset_mocks.load, name) From 54618c6c4c30c8af74bd848d290daf4a6974f5a8 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Wed, 17 Nov 2021 08:52:36 +0530 Subject: [PATCH 21/28] Remove comment --- test/builtin_dataset_mocks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/builtin_dataset_mocks.py b/test/builtin_dataset_mocks.py index 078df8d79f8..4684145fb95 100644 --- a/test/builtin_dataset_mocks.py +++ b/test/builtin_dataset_mocks.py @@ -528,7 +528,6 @@ def sintel(info, root, config): flow_root = root / "training" / "flow" for scene_id in range(num_scenes): - # flow_root.mkdir(exist_ok=True) scene_dir = flow_root / f"scene_{scene_id}" scene_dir.mkdir(exist_ok=True, parents=True) for i in range(num_images_per_scene - 1): From 6c04d5fddd69e255f29f040c20fc4b21059e7697 Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 19 Nov 2021 15:18:17 +0530 Subject: [PATCH 22/28] Temporary fix + test class fixes --- test/builtin_dataset_mocks.py | 11 +++++--- torchvision/prototype/datasets/datapipes.py | 29 +++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 torchvision/prototype/datasets/datapipes.py diff --git a/test/builtin_dataset_mocks.py b/test/builtin_dataset_mocks.py index 4684145fb95..d4b0b9692bb 100644 --- a/test/builtin_dataset_mocks.py +++ b/test/builtin_dataset_mocks.py @@ -101,10 +101,11 @@ def _get(self, dataset, config): def _decoder(self, dataset_type): def to_bytes(file): - try: - return file.read() - finally: - file.close() + # "ValueError: read from closed file" error, temporary fix for this PR + # try: + return file.read() + # finally: + # file.close() if dataset_type == datasets.utils.DatasetType.RAW: return datasets.decoder.raw @@ -540,4 +541,6 @@ def sintel(info, root, config): # that is 3 - 1 = 2 examples. 
Hence the formula below num_passes = 2 if config["pass_name"] == "both" else 1 num_examples = (num_images_per_scene - 1) * num_scenes * num_passes + + make_zip(root, "MPI-Sintel-complete.zip", "training", "test") return num_examples diff --git a/torchvision/prototype/datasets/datapipes.py b/torchvision/prototype/datasets/datapipes.py new file mode 100644 index 00000000000..5159e3f10aa --- /dev/null +++ b/torchvision/prototype/datasets/datapipes.py @@ -0,0 +1,29 @@ +from typing import Tuple, IO, Iterator, Union, cast + +import torch +from torchdata.datapipes.iter import IterDataPipe + +__all__ = ["FloReader"] + + +class FloReader(IterDataPipe[torch.Tensor]): + def __init__(self, datapipe: IterDataPipe[Tuple[str, IO]]) -> None: + self.datapipe = datapipe + + def _read_data(self, file: IO, *, dtype: torch.dtype, count: int) -> torch.Tensor: + num_bytes_per_value = (torch.finfo if dtype.is_floating_point else torch.iinfo)(dtype).bits // 8 + chunk_size = count * num_bytes_per_value + return torch.frombuffer(bytearray(file.read(chunk_size)), dtype=dtype) + + def _read_scalar(self, file: IO, *, dtype: torch.dtype) -> Union[int, float]: + return self._read_data(file, dtype=dtype, count=1).item() + + def __iter__(self) -> Iterator[torch.Tensor]: + for _, file in self.datapipe: + if self._read_scalar(file, dtype=torch.float32) != 202021.25: + raise ValueError("Magic number incorrect. Invalid .flo file") + + width = cast(int, self._read_scalar(file, dtype=torch.int32)) + height = cast(int, self._read_scalar(file, dtype=torch.int32)) + + yield self._read_data(file, dtype=torch.float32, count=2 * height * width).reshape((2, height, width)) From 84c4e8874236eb18bd0baabb76a64b4209aa648e Mon Sep 17 00:00:00 2001 From: Kushashwa Ravi Shrimali Date: Fri, 19 Nov 2021 15:23:31 +0530 Subject: [PATCH 23/28] Revert temp fix --- test/builtin_dataset_mocks.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test/builtin_dataset_mocks.py b/test/builtin_dataset_mocks.py index d4b0b9692bb..92448be4544 100644 --- a/test/builtin_dataset_mocks.py +++ b/test/builtin_dataset_mocks.py @@ -101,11 +101,10 @@ def _get(self, dataset, config): def _decoder(self, dataset_type): def to_bytes(file): - # "ValueError: read from closed file" error, temporary fix for this PR - # try: - return file.read() - # finally: - # file.close() + try: + return file.read() + finally: + file.close() if dataset_type == datasets.utils.DatasetType.RAW: return datasets.decoder.raw From c0b254cd4bce06eaba1939046dc353ccdc28b47a Mon Sep 17 00:00:00 2001 From: Philip Meier Date: Fri, 19 Nov 2021 11:29:26 +0100 Subject: [PATCH 24/28] use common read_flo instead of custom implementation --- .../prototype/datasets/_builtin/sintel.py | 54 ++++++------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 623027643dd..290da8a3d18 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -2,9 +2,8 @@ import pathlib import re from functools import partial -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar +from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar, BinaryIO -import numpy as np import torch from torchdata.datapipes.iter import ( IterDataPipe, @@ -23,7 +22,7 @@ OnlineResource, DatasetType, ) -from torchvision.prototype.datasets.utils._internal import 
INFINITE_BUFFER_SIZE +from torchvision.prototype.datasets.utils._internal import INFINITE_BUFFER_SIZE, read_flo T = TypeVar("T") @@ -97,20 +96,6 @@ def _classify_archive(self, data: Tuple[str, Any], *, pass_name: str) -> Optiona else: return None - def _read_flo(self, file: io.IOBase) -> torch.Tensor: - magic = file.read(4) - if magic != b"PIEH": - raise ValueError("Magic number incorrect. Invalid .flo file") - w = np.frombuffer(file.read(4), dtype=" Tuple[str, int]: path = pathlib.Path(data[0]) category = path.parent.name @@ -125,29 +110,24 @@ def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[st def _collate_and_decode_sample( self, - data: Tuple[Tuple[str, io.IOBase], Any], + data: Tuple[Tuple[Optional[str], Optional[BinaryIO]], Tuple[Tuple[str, BinaryIO], Tuple[str, BinaryIO]]], *, - decoder: Optional[Callable[[io.IOBase], torch.Tensor]], - config: DatasetConfig, + decoder: Optional[Callable[[BinaryIO], torch.Tensor]], ) -> Dict[str, Any]: - flo, images = data - img1, img2 = images - flow_arr = self._read_flo(flo[1]) if flo[1] else None - - path1, buffer1 = img1 - path2, buffer2 = img2 - - # We return the scene dir name as the label - label = path1.split("/")[-2] + flow_data, images_data = data + flow_path, flow_buffer = flow_data + image1_data, image2_data = images_data + image1_path, image1_buffer = image1_data + image2_path, image2_buffer = image2_data return dict( - image1=decoder(buffer1) if decoder else buffer1, - image1_path=path1, - image2=decoder(buffer2) if decoder else buffer2, - image2_path=path2, - flow=flow_arr, - flow_path=flo[0], - label=label, + image1=decoder(image1_buffer) if decoder else image1_buffer, + image1_path=image1_path, + image2=decoder(image2_buffer) if decoder else image2_buffer, + image2_path=image2_path, + flow=read_flo(flow_buffer) if flow_buffer else None, + flow_path=flow_path, + scene=pathlib.Path(image1_path).parent.name, ) def _make_datapipe( @@ -185,4 +165,4 @@ def _make_datapipe( pass_images_dp = InSceneGrouper(pass_images_dp) zipped_dp = Mapper(pass_images_dp, self._add_fake_flow_data) - return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder, config=config)) + return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) From e9fa65605b6d8c899a440b66be2a3c10932f6b2c Mon Sep 17 00:00:00 2001 From: Philip Meier Date: Fri, 19 Nov 2021 11:33:23 +0100 Subject: [PATCH 25/28] remove more obsolete code --- torchvision/prototype/datasets/datapipes.py | 29 --------------------- 1 file changed, 29 deletions(-) delete mode 100644 torchvision/prototype/datasets/datapipes.py diff --git a/torchvision/prototype/datasets/datapipes.py b/torchvision/prototype/datasets/datapipes.py deleted file mode 100644 index 5159e3f10aa..00000000000 --- a/torchvision/prototype/datasets/datapipes.py +++ /dev/null @@ -1,29 +0,0 @@ -from typing import Tuple, IO, Iterator, Union, cast - -import torch -from torchdata.datapipes.iter import IterDataPipe - -__all__ = ["FloReader"] - - -class FloReader(IterDataPipe[torch.Tensor]): - def __init__(self, datapipe: IterDataPipe[Tuple[str, IO]]) -> None: - self.datapipe = datapipe - - def _read_data(self, file: IO, *, dtype: torch.dtype, count: int) -> torch.Tensor: - num_bytes_per_value = (torch.finfo if dtype.is_floating_point else torch.iinfo)(dtype).bits // 8 - chunk_size = count * num_bytes_per_value - return torch.frombuffer(bytearray(file.read(chunk_size)), dtype=dtype) - - def _read_scalar(self, file: IO, *, dtype: torch.dtype) -> Union[int, 
float]: - return self._read_data(file, dtype=dtype, count=1).item() - - def __iter__(self) -> Iterator[torch.Tensor]: - for _, file in self.datapipe: - if self._read_scalar(file, dtype=torch.float32) != 202021.25: - raise ValueError("Magic number incorrect. Invalid .flo file") - - width = cast(int, self._read_scalar(file, dtype=torch.int32)) - height = cast(int, self._read_scalar(file, dtype=torch.int32)) - - yield self._read_data(file, dtype=torch.float32, count=2 * height * width).reshape((2, height, width)) From 3724869aa1349d33a9a88798eb5145087e85aa79 Mon Sep 17 00:00:00 2001 From: Philip Meier Date: Fri, 19 Nov 2021 11:35:11 +0100 Subject: [PATCH 26/28] [DEBUG] check if tests also run on Python 3.9 --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6a5a06f10ef..1df66155903 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -262,7 +262,7 @@ jobs: prototype_test: docker: - - image: circleci/python:3.7 + - image: circleci/python:3.9 steps: - run: name: Install torch From 69194e1d8bffbf34ce81574bad1eb5da126eee6f Mon Sep 17 00:00:00 2001 From: Philip Meier Date: Fri, 19 Nov 2021 11:50:17 +0100 Subject: [PATCH 27/28] Revert "[DEBUG] check if tests also run on Python 3.9" This reverts commit 3724869aa1349d33a9a88798eb5145087e85aa79. --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1df66155903..6a5a06f10ef 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -262,7 +262,7 @@ jobs: prototype_test: docker: - - image: circleci/python:3.9 + - image: circleci/python:3.7 steps: - run: name: Install torch From b4cce900cf45b28588fe0247d6614dc4f8d99f9d Mon Sep 17 00:00:00 2001 From: Philip Meier Date: Mon, 22 Nov 2021 10:56:49 +0100 Subject: [PATCH 28/28] store bytes to avoid reading twice from file handle --- .../prototype/datasets/_builtin/cifar.py | 2 +- .../prototype/datasets/_builtin/coco.py | 2 +- .../prototype/datasets/_builtin/imagenet.py | 4 +- .../prototype/datasets/_builtin/sbd.py | 2 +- .../prototype/datasets/_builtin/sintel.py | 36 +++-------- .../prototype/datasets/_builtin/voc.py | 2 +- .../prototype/datasets/utils/_internal.py | 60 +++++++++++++++---- 7 files changed, 63 insertions(+), 45 deletions(-) diff --git a/torchvision/prototype/datasets/_builtin/cifar.py b/torchvision/prototype/datasets/_builtin/cifar.py index 54a31edfa5c..d2a8a7bc67c 100644 --- a/torchvision/prototype/datasets/_builtin/cifar.py +++ b/torchvision/prototype/datasets/_builtin/cifar.py @@ -95,7 +95,7 @@ def _make_datapipe( def _generate_categories(self, root: pathlib.Path) -> List[str]: dp = self.resources(self.default_config)[0].to_datapipe(pathlib.Path(root) / self.name) dp = TarArchiveReader(dp) - dp = Filter(dp, path_comparator("name", self._META_FILE_NAME)) + dp = Filter(dp, path_comparator("name", value=self._META_FILE_NAME)) dp = Mapper(dp, self._unpickle) return cast(List[str], next(iter(dp))[self._CATEGORIES_KEY]) diff --git a/torchvision/prototype/datasets/_builtin/coco.py b/torchvision/prototype/datasets/_builtin/coco.py index 641d584dc43..749793f1741 100644 --- a/torchvision/prototype/datasets/_builtin/coco.py +++ b/torchvision/prototype/datasets/_builtin/coco.py @@ -115,7 +115,7 @@ def _make_datapipe( images_dp, meta_dp = resource_dps meta_dp = ZipArchiveReader(meta_dp) - meta_dp = Filter(meta_dp, path_comparator("name", f"instances_{config.split}{config.year}.json")) + meta_dp = Filter(meta_dp, 
path_comparator("name", value=f"instances_{config.split}{config.year}.json")) meta_dp = JsonParser(meta_dp) meta_dp = Mapper(meta_dp, getitem(1)) meta_dp = MappingIterator(meta_dp) diff --git a/torchvision/prototype/datasets/_builtin/imagenet.py b/torchvision/prototype/datasets/_builtin/imagenet.py index d25d55c216f..5639dd59656 100644 --- a/torchvision/prototype/datasets/_builtin/imagenet.py +++ b/torchvision/prototype/datasets/_builtin/imagenet.py @@ -147,7 +147,7 @@ def _make_datapipe( dp = Mapper(dp, self._collate_train_data) elif config.split == "val": devkit_dp = TarArchiveReader(devkit_dp) - devkit_dp = Filter(devkit_dp, path_comparator("name", "ILSVRC2012_validation_ground_truth.txt")) + devkit_dp = Filter(devkit_dp, path_comparator("name", value="ILSVRC2012_validation_ground_truth.txt")) devkit_dp = LineReader(devkit_dp, return_path=False) devkit_dp = Mapper(devkit_dp, int) devkit_dp = Enumerator(devkit_dp, 1) @@ -178,7 +178,7 @@ def _generate_categories(self, root: pathlib.Path) -> List[Tuple[str, ...]]: resources = self.resources(self.default_config) devkit_dp = resources[1].to_datapipe(root / self.name) devkit_dp = TarArchiveReader(devkit_dp) - devkit_dp = Filter(devkit_dp, path_comparator("name", "meta.mat")) + devkit_dp = Filter(devkit_dp, path_comparator("name", value="meta.mat")) meta = next(iter(devkit_dp))[1] synsets = read_mat(meta, squeeze_me=True)["synsets"] diff --git a/torchvision/prototype/datasets/_builtin/sbd.py b/torchvision/prototype/datasets/_builtin/sbd.py index 888a464f69b..046c9bc5d46 100644 --- a/torchvision/prototype/datasets/_builtin/sbd.py +++ b/torchvision/prototype/datasets/_builtin/sbd.py @@ -156,7 +156,7 @@ def _make_datapipe( def _generate_categories(self, root: pathlib.Path) -> Tuple[str, ...]: dp = self.resources(self.default_config)[0].to_datapipe(pathlib.Path(root) / self.name) dp = TarArchiveReader(dp) - dp = Filter(dp, path_comparator("name", "category_names.m")) + dp = Filter(dp, path_comparator("name", value="category_names.m")) dp = LineReader(dp) dp = Mapper(dp, bytes.decode, input_col=1) lines = tuple(zip(*iter(dp)))[1] diff --git a/torchvision/prototype/datasets/_builtin/sintel.py b/torchvision/prototype/datasets/_builtin/sintel.py index 290da8a3d18..15c517e6ab9 100644 --- a/torchvision/prototype/datasets/_builtin/sintel.py +++ b/torchvision/prototype/datasets/_builtin/sintel.py @@ -2,7 +2,7 @@ import pathlib import re from functools import partial -from typing import Any, Callable, Dict, List, Optional, Tuple, Iterator, Iterable, TypeVar, BinaryIO +from typing import Any, Callable, Dict, List, Optional, Tuple, BinaryIO import torch from torchdata.datapipes.iter import ( @@ -22,31 +22,7 @@ OnlineResource, DatasetType, ) -from torchvision.prototype.datasets.utils._internal import INFINITE_BUFFER_SIZE, read_flo - -T = TypeVar("T") - -try: - from itertools import pairwise # type: ignore[attr-defined] -except ImportError: - from itertools import tee - - def pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: - a, b = tee(iterable) - next(b, None) - return zip(a, b) - - -class InSceneGrouper(IterDataPipe[Tuple[Tuple[str, T], Tuple[str, T]]]): - def __init__(self, datapipe: IterDataPipe[Tuple[str, T]]) -> None: - self.datapipe = datapipe - - def __iter__(self) -> Iterator[Tuple[Tuple[str, Any], Tuple[str, Any]]]: - for item1, item2 in pairwise(sorted(self.datapipe)): - if pathlib.Path(item1[0]).parent != pathlib.Path(item2[0]).parent: - continue - - yield item1, item2 +from torchvision.prototype.datasets.utils._internal import 
INFINITE_BUFFER_SIZE, read_flo, InScenePairer, path_accessor class SINTEL(Dataset): @@ -102,7 +78,7 @@ def _flows_key(self, data: Tuple[str, Any]) -> Tuple[str, int]: idx = int(self._FILE_NAME_PATTERN.match(path.name).group("idx")) # type: ignore[union-attr] return category, idx - def _add_fake_flow_data(self, data: Tuple[str, Any]) -> Tuple[tuple, Tuple[str, Any]]: + def _add_fake_flow_data(self, data: Tuple[str, Any]) -> Tuple[Tuple[None, None], Tuple[str, Any]]: return ((None, None), data) def _images_key(self, data: Tuple[Tuple[str, Any], Tuple[str, Any]]) -> Tuple[str, int]: @@ -153,7 +129,9 @@ def _make_datapipe( buffer_size=INFINITE_BUFFER_SIZE, ) flo_dp = Shuffler(flo_dp, buffer_size=INFINITE_BUFFER_SIZE) - pass_images_dp: IterDataPipe[Tuple[str, Any], Tuple[str, Any]] = InSceneGrouper(pass_images_dp) + pass_images_dp: IterDataPipe[Tuple[str, Any], Tuple[str, Any]] = InScenePairer( + pass_images_dp, scene_fn=path_accessor("parent", "name") + ) zipped_dp = IterKeyZipper( flo_dp, pass_images_dp, @@ -162,7 +140,7 @@ def _make_datapipe( ) else: pass_images_dp = Shuffler(filtered_curr_split, buffer_size=INFINITE_BUFFER_SIZE) - pass_images_dp = InSceneGrouper(pass_images_dp) + pass_images_dp = InScenePairer(pass_images_dp, scene_fn=path_accessor("parent", "name")) zipped_dp = Mapper(pass_images_dp, self._add_fake_flow_data) return Mapper(zipped_dp, self._collate_and_decode_sample, fn_kwargs=dict(decoder=decoder)) diff --git a/torchvision/prototype/datasets/_builtin/voc.py b/torchvision/prototype/datasets/_builtin/voc.py index a4175765555..4995f9a9d99 100644 --- a/torchvision/prototype/datasets/_builtin/voc.py +++ b/torchvision/prototype/datasets/_builtin/voc.py @@ -129,7 +129,7 @@ def _make_datapipe( ) split_dp = Filter(split_dp, self._is_in_folder, fn_kwargs=dict(name=self._SPLIT_FOLDER[config.task])) - split_dp = Filter(split_dp, path_comparator("name", f"{config.split}.txt")) + split_dp = Filter(split_dp, path_comparator("name", value=f"{config.split}.txt")) split_dp = LineReader(split_dp, decode=True) split_dp = Shuffler(split_dp, buffer_size=INFINITE_BUFFER_SIZE) diff --git a/torchvision/prototype/datasets/utils/_internal.py b/torchvision/prototype/datasets/utils/_internal.py index 3db10183f68..c94de7f44c8 100644 --- a/torchvision/prototype/datasets/utils/_internal.py +++ b/torchvision/prototype/datasets/utils/_internal.py @@ -21,6 +21,7 @@ Optional, IO, Sized, + Iterable, ) from typing import cast @@ -33,6 +34,19 @@ from torchdata.datapipes.iter import IoPathFileLister, IoPathFileLoader from torchdata.datapipes.utils import StreamWrapper +K = TypeVar("K") +D = TypeVar("D") + +try: + from itertools import pairwise # type: ignore[attr-defined] +except ImportError: + from itertools import tee + + def pairwise(iterable: Iterable[D]) -> Iterable[Tuple[D, D]]: + a, b = tee(iterable) + next(b, None) + return zip(a, b) + __all__ = [ "INFINITE_BUFFER_SIZE", @@ -48,11 +62,9 @@ "Decompressor", "fromfile", "read_flo", + "InScenePairer", ] -K = TypeVar("K") -D = TypeVar("D") - # pseudo-infinite until a true infinite buffer is supported by all datapipes INFINITE_BUFFER_SIZE = 1_000_000_000 @@ -117,17 +129,22 @@ def getitem(*items: Any) -> Callable[[Any], Any]: return functools.partial(_getitem_closure, items=items) -def _path_attribute_accessor(path: pathlib.Path, *, name: str) -> D: - return cast(D, getattr(path, name)) +def _path_attribute_accessor(path: pathlib.Path, *, attrs: Sequence[str]) -> D: + obj: Any = path + for attr in attrs: + obj = getattr(obj, attr) + return cast(D, obj) def 
_path_accessor_closure(data: Tuple[str, Any], *, getter: Callable[[pathlib.Path], D]) -> D:
     return getter(pathlib.Path(data[0]))
 
 
-def path_accessor(getter: Union[str, Callable[[pathlib.Path], D]]) -> Callable[[Tuple[str, Any]], D]:
-    if isinstance(getter, str):
-        getter = functools.partial(_path_attribute_accessor, name=getter)
+def path_accessor(*attrs: Union[str, Callable[[pathlib.Path], D]]) -> Callable[[Tuple[str, Any]], D]:
+    if not callable(attrs[0]):
+        getter = cast(Callable[[pathlib.Path], D], functools.partial(_path_attribute_accessor, attrs=attrs))
+    else:
+        getter = attrs[0]
 
     return functools.partial(_path_accessor_closure, getter=getter)
 
@@ -136,8 +153,8 @@ def _path_comparator_closure(data: Tuple[str, Any], *, accessor: Callable[[Tupl
     return accessor(data) == value
 
 
-def path_comparator(getter: Union[str, Callable[[pathlib.Path], D]], value: D) -> Callable[[Tuple[str, Any]], bool]:
-    return functools.partial(_path_comparator_closure, accessor=path_accessor(getter), value=value)
+def path_comparator(*attrs: Union[str, Callable[[pathlib.Path], D]], value: D) -> Callable[[Tuple[str, Any]], bool]:
+    return functools.partial(_path_comparator_closure, accessor=path_accessor(*attrs), value=value)
 
 
 class CompressionType(enum.Enum):
@@ -321,3 +338,26 @@ def read_flo(file: BinaryIO) -> torch.Tensor:
     width, height = fromfile(file, dtype=torch.int32, byte_order="little", count=2)
     flow = fromfile(file, dtype=torch.float32, byte_order="little", count=height * width * 2)
     return flow.reshape((height, width, 2)).permute((2, 0, 1))
+
+
+class InScenePairer(IterDataPipe[Tuple[Tuple[str, BinaryIO], Tuple[str, BinaryIO]]]):
+    def __init__(
+        self, datapipe: IterDataPipe[Tuple[str, BinaryIO]], *, scene_fn: Callable[[Tuple[str, BinaryIO]], K]
+    ) -> None:
+        self.datapipe = datapipe
+        self.scene_fn = scene_fn
+
+    def __iter__(self) -> Iterator[Tuple[Tuple[str, BinaryIO], Tuple[str, BinaryIO]]]:
+        prev_bytes = b""
+        for (path1, stream1), (path2, stream2) in pairwise(sorted(self.datapipe)):
+            # scene_fn expects the (path, stream) tuple yielded by the datapipe
+            if self.scene_fn((path1, stream1)) != self.scene_fn((path2, stream2)):
+                prev_bytes = b""
+                continue
+
+            # reuse the bytes cached for this frame in the previous iteration
+            # instead of reading the shared stream a second time
+            buffer1 = io.BytesIO(prev_bytes) if prev_bytes else stream1
+            prev_bytes = stream2.read()
+            buffer2 = io.BytesIO(prev_bytes)
+
+            yield (path1, buffer1), (path2, buffer2)
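
The final patch reworks path_accessor so that attribute names chain, which is what lets the Sintel datapipe pass path_accessor("parent", "name") as scene_fn and map a (path, stream) tuple to its scene directory name. A condensed, dependency-free sketch of that behavior follows; the function below is an illustration of the helper in _internal.py, not the helper itself:

    import pathlib

    def path_accessor(*attrs):
        # chain attribute lookups on the pathlib.Path built from data[0],
        # mirroring _path_attribute_accessor / _path_accessor_closure above
        def accessor(data):
            obj = pathlib.Path(data[0])
            for attr in attrs:
                obj = getattr(obj, attr)
            return obj
        return accessor

    scene_fn = path_accessor("parent", "name")
    assert scene_fn(("training/clean/scene_0/frame_0000.png", None)) == "scene_0"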
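
InScenePairer and the earlier InSceneGrouper implement the same idea: sort the datapipe so frames of one scene are adjacent, pair every frame with its successor, and drop pairs that straddle a scene boundary, which yields the (num_images_per_scene - 1) pairs per scene that the mock's comment derives. A plain-Python sketch of just the pairing, without the byte caching; consecutive_in_scene_pairs is a made-up name for illustration:

    import pathlib
    from itertools import tee

    def consecutive_in_scene_pairs(paths):
        paths = sorted(paths)
        a, b = tee(paths)  # same pairwise fallback the patches define
        next(b, None)
        return [
            (p1, p2)
            for p1, p2 in zip(a, b)
            # keep a pair only if both frames share a scene directory
            if pathlib.Path(p1).parent == pathlib.Path(p2).parent
        ]

    pairs = consecutive_in_scene_pairs([
        "training/clean/scene_0/frame_0000.png",
        "training/clean/scene_0/frame_0001.png",
        "training/clean/scene_0/frame_0002.png",
        "training/clean/scene_1/frame_0000.png",
        "training/clean/scene_1/frame_0001.png",
    ])
    assert len(pairs) == 3  # (3 - 1) from scene_0 plus (2 - 1) from scene_1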
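
All three readers in the series (the numpy-based _read_flo, the FloReader datapipe, and the final read_flo in _internal.py) decode the same Middlebury .flo layout: a 4-byte magic b"PIEH" (the little-endian float32 202021.25), width and height as little-endian int32, then 2 * width * height little-endian float32 values with (u, v) interleaved per pixel. A standalone sketch of that layout, assuming a file on disk; read_flo_sketch is illustrative, not the shipped helper:

    import numpy as np
    import torch

    def read_flo_sketch(path):
        with open(path, "rb") as file:
            if file.read(4) != b"PIEH":  # magic check, as in the patches
                raise ValueError("Magic number incorrect. Invalid .flo file")
            width, height = np.frombuffer(file.read(8), dtype="<i4")
            data = np.frombuffer(file.read(8 * width * height), dtype="<f4")
        # same output convention as read_flo: shape (2, height, width)
        flow = torch.from_numpy(data.copy()).reshape(int(height), int(width), 2)
        return flow.permute(2, 0, 1)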