
Conversation

@yongtang
Member

This PR tries to expose a tfio.IOTensor which could be applied to any IO-related data that is indexable (__getitem__ and __len__).

The idea is to bind __getitem__ and __len__ to kernel ops at run time, so that it is not necessary to read everything into memory.

The first file format is the WAV file. With tfio.IOTensor, dtype and shape are exposed along with __getitem__ and __len__.

Further, a rate property has been exposed specifically for the Audio/WAV file, which gives the sample rate.

This tfio.IOTensor only works in eager mode.

In addition, this PR also converts WavDataset to use IOTensor (instead of a direct C++ implementation).

Finally, this PR also exposes tfio.Dataset, which carries PR #416.

Note: as was discussed, rebatch has been dropped. Instead, a PR to the core tensorflow repo will be opened.

Signed-off-by: Yong Tang [email protected]

@yongtang
Member Author

yongtang commented Aug 15, 2019

@terrytangyuan @BryanCutler I have updated the PR to include both indexable and iterable IOTensor now.

An example of an indexable IOTensor is WAV, which can be accessed through __getitem__:

wav = tfio.IOTensor.from_wav(filename)
sample_3_to_5 = wav[3:5]

An example of an iterable IOTensor is Kafka, which can be iterated through __iter__:

kafka = tfio.IOTensor.from_kafka("test", eof=True)
for entry in kafka:
    print(entry)

Would appreciate feedback, and if possible, would like to see a similar pattern applied to other formats as well. (Or, any other suggestions to reduce code duplication in C++ for Dataset are welcome.)

@yongtang
Member Author

/cc @zhjunqin in case you are interested, as this PR touches the Kafka part.

The newly proposed tfio.IOTensor.from_kafka allows users to iterate through items in eager mode:

kafka = tfio.IOTensor.from_kafka("test", eof=True)
for entry in kafka:
    print(entry)

The difference from KafkaDataset's C++ implementation is that the new implementation in IOTensor no longer needs to manually construct a subgraph in C++ (which increases code reuse).

@yongtang yongtang changed the title from "Expose tfio.IOTensor class and from_audio and tfio.Dataset" to "Expose tfio.IOTensor class and from_audio and tfio.IOTensor.to_dataset()" Aug 15, 2019
@yongtang
Member Author

@BryanCutler @terrytangyuan Instead of exposing a tfio.Dataset, I have dropped that exposure and added a tfio.IOTensor.to_dataset(), which I think makes much more sense and avoids the confusion.

@BryanCutler
Member

From a quick glance, this looks really cool @yongtang ! I'll try to take a closer look this weekend.

@yongtang
Member Author

@jiachengxu While I was looking into Apache Arrow, I noticed that Arrow has JSON support and column reads, which could easily be used for ndjson files (or files of concatenated JSON elements). I added a from_json as part of the IOTensor.

The JSONIOTensor supports nested element structures, so shape and dtype will return a nested structure if there are multiple columns, and a single shape and data type if there is only one column. Please take a look if you don't mind.

@yongtang
Member Author

@BryanCutler @terrytangyuan Added some documentation. Examples are mostly in place, though they only apply to Kafka, JSON, and WAV. The documentation is copied here:

An IOTensor is a tensor with data backed by IO operations. For example, an AudioIOTensor is a tensor with data from an audio file, and a KafkaIOTensor is a tensor with data from reading the messages of a Kafka stream server.

There are two types of IOTensor: a normal IOTensor, which itself is indexable, and a degenerate IOIterableTensor, which only supports accessing the tensor iteratively.

Since IOTensor is indexable, it supports the __getitem__() and __len__() methods in Python. In other words, it is a subclass of collections.abc.Sequence.

Example:

>>> import tensorflow_io as tfio
>>>
>>> samples = tfio.IOTensor.from_audio("sample.wav")
>>> print(samples[1000:1005])
... tf.Tensor(
... [[-3]
...  [-7]
...  [-6]
...  [-6]
...  [-5]], shape=(5, 1), dtype=int16)

An IOIterableTensor is really a subclass of collections.abc.Iterable. It provides an __iter__() method that can be used (indirectly through iter()) to access data in an iterative fashion.

Example:

>>> import tensorflow_io as tfio
>>>
>>> kafka = tfio.IOTensor.from_kafka("test", eof=True)
>>> for message in kafka:
>>>   print(message)
... tf.Tensor(['D0'], shape=(1,), dtype=string)
... tf.Tensor(['D1'], shape=(1,), dtype=string)
... tf.Tensor(['D2'], shape=(1,), dtype=string)
... tf.Tensor(['D3'], shape=(1,), dtype=string)
... tf.Tensor(['D4'], shape=(1,), dtype=string)

Indexable vs. Iterable

While many IO formats are naturally considered iterable only, in most situations they could still be accessed by indexing through certain workarounds. For example, a Kafka stream is not directly indexable, yet the stream could be saved in memory or on disk to allow indexing. Another example is the packet capture (PCAP) file in the networking area. The packets inside a PCAP file are concatenated sequentially. Since each packet could have a variable length, the only way to access each packet is to read one packet at a time. If the PCAP file is huge (e.g., hundreds of GBs or even TBs), it may not be realistic (or necessary) to save the index of every packet in memory. We could consider the PCAP format as iterable only.

As we can see, the available memory size could be a factor in deciding whether a format is indexable or not. However, this factor could also be blurred in distributed computing. One common case is a file format that is splittable, where a file could be split into multiple chunks (without reading the whole file) with no data overlapping between those chunks. For example, a text file could be reliably split into multiple chunks with the line feed (LF) as the boundary. Processing of the chunks could then be distributed across a group of compute nodes to speed things up (by reading small chunks into memory). From that standpoint, we could still consider splittable formats as indexable.

For that reason, our focus is IOTensor with convenient indexing and slicing through the __getitem__() method.
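
To make the "splittable implies indexable" point concrete, here is a minimal pure-Python sketch (not part of this PR, and independent of the kernel-op based implementation) of turning a line-oriented text file, which is naturally iterable, into an indexable sequence by recording only the line offsets up front:

```
# Minimal sketch, assuming a plain text file with LF line boundaries.
# Only the byte offsets are kept in memory; data is read on demand.
# Integer indices only, for brevity.
import collections.abc


class IndexableTextFile(collections.abc.Sequence):
    def __init__(self, filename):
        self._filename = filename
        self._offsets = [0]
        with open(filename, "rb") as f:
            for line in f:
                self._offsets.append(self._offsets[-1] + len(line))

    def __len__(self):
        return len(self._offsets) - 1

    def __getitem__(self, index):
        if index < 0:
            index += len(self)
        if not 0 <= index < len(self):
            raise IndexError(index)
        start, stop = self._offsets[index], self._offsets[index + 1]
        with open(self._filename, "rb") as f:
            f.seek(start)
            return f.read(stop - start)
```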

Lazy Read

One useful feature of IOTensor is lazy read. Data inside a file is not read into memory until needed. This could be convenient where only a small segment of the data is needed. For example, a WAV file could be as big as GBs, but in many cases only several seconds of samples are used for training or inference purposes.

While CPU memory is cheap nowadays, GPU memory is still considered an expensive resource. It is also imperative to fit data into GPU memory for speed. From that perspective, lazy read could be very helpful.
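
As a short usage sketch of the API documented in this section (the filename and the index range are placeholders), only the requested segment is materialized:

```
import tensorflow_io as tfio

samples = tfio.IOTensor.from_audio("sample.wav")   # no sample data read yet
print(samples.rate)                                 # metadata only, e.g. 44100
clip = samples[441000:661500]                       # only seconds 10-15 at 44.1 kHz are read
```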

Association of Metadata

While a file format could consist of mostly numeric data, in many situations the metadata is important as well. For example, in an audio file format the sample rate is a number that is necessary for almost everything. Associating the sample rate with the samples in the int16 Tensor is more helpful, especially in eager mode.

Example:

>>> import tensorflow_io as tfio
>>>
>>> samples = tfio.IOTensor.from_audio("sample.wav")
>>> print(samples.rate)
... 44100

Nested Element Structure

The concept of IOTensor is not limited to a Tensor of a single data type. It supports nested element structures which could consist of many components and complex structures. The exposed APIs such as shape or dtype will display the shape and data type of an individual Tensor, or a nested structure of shapes and data types for the components of a composite Tensor.

Example:

>>> import tensorflow_io as tfio
>>>
>>> samples = tfio.IOTensor.from_audio("sample.wav")
>>> print(samples.shape)
... (22050, 2)
>>> print(samples.dtype)
... <dtype: 'int32'>
>>>
>>> features = tfio.IOTensor.from_json("feature.json")
>>> print(features.shape)
... (TensorShape([Dimension(2)]), TensorShape([Dimension(2)]))
>>> print(features.dtype)
... (tf.float64, tf.int64)

Access Columns of Tabular Data Formats

Many file formats such as Parquet or JSON are considered tabular because they consist of columns in a table. With IOTensor it is possible to access individual columns through __call__().

Example:

>>> import tensorflow_io as tfio
>>>
>>> features = tfio.IOTensor.from_json("feature.json")
>>> print(features.shape)
... (TensorShape([Dimension(2)]), TensorShape([Dimension(2)]))
>>> print(features.dtype)
... (tf.float64, tf.int64)
>>>
>>> print(features("floatfeature").shape)
... (2,)
>>> print(features("floatfeature").dtype)
... <dtype: 'float64'>

Conversion from and to Tensor and Dataset

When needed, an IOTensor can be converted into a Tensor (through to_tensor()) or a tf.data.Dataset (through to_dataset()), to support operations that are only available through Tensor or tf.data.Dataset.

Example:

>>> import tensorflow as tf
>>> import tensorflow_io as tfio
>>>
>>> features = tfio.IOTensor.from_json("feature.json")
>>>
>>> features_tensor = features.to_tensor()
>>> print(features_tensor())
... (<tf.Tensor: id=21, shape=(2,), dtype=float64, numpy=array([1.1, 2.1])>, <tf.Tensor: id=22, shape=(2,), dtype=int64, numpy=array([2, 3])>)
>>>
>>> features_dataset = features.to_dataset()
>>> print(features_dataset)
... <_IOTensorDataset shapes: ((), ()), types: (tf.float64, tf.int64)>
>>>
>>> dataset = tf.data.Dataset.zip((features_dataset, labels_dataset))

@yongtang
Member Author

There are some further enhancements we could make, I think:

  1. Wrap IOTensor as a CompositeTensor so that it potentially could be passed to TF directly.

I have played with CompositeTensor; the documentation is not very detailed, but I think I managed to get the following to work:

import tensorflow as tf
from tensorflow.python.framework import composite_tensor


class IOTensorSpec(tf.TypeSpec):

  def __init__(self, specs):
    self._specs = specs

  @property
  def value_type(self):
    return self

  def _to_components(self, value):
    return value.components

  def _from_components(self, components):
    return IOCompositeTensor(components)

  @property
  def _component_specs(self):
    return self._specs

  def _serialize(self):
    return self._specs

class IOCompositeTensor(composite_tensor.CompositeTensor):
  def __init__(self, components):
    self._components = components
    self._specs = tf.nest.map_structure(lambda e: tf.TensorSpec(e.shape, e.dtype), components)

  @property
  def _type_spec(self):
    return IOTensorSpec(self._specs)

  @property
  def components(self):
    return self._components

  @property
  def specs(self):
    return self._specs

tensor = IOCompositeTensor((tf.constant([1, 2, 3]), tf.constant([4, 5, 6])))

print(tensor)

components = tf.nest.flatten(tensor, expand_composites=True)
print(components)

reversed_components = (components[1], components[0])
reversed_tensor = tf.nest.pack_sequence_as(tensor.specs, reversed_components, expand_composites=True)

print(tf.nest.flatten(reversed_tensor, expand_composites=True))

  2. At the moment the slicing and indexing are pretty much hand-crafted (with manipulation of start, stop, step). I would like to convert to a wrapper class (or the Python slice class), and ideally allow concatenating multiple IOTensors sequentially. This will help when we want to distribute the Tensor to different worker nodes.

  3. Also, in the PR I manually expose properties (kind of metadata, like the sample rate of WAV). I would prefer to make it a decorator so that any property marked with @metadata could be automatically captured and printed out in __repr__ (a rough sketch of this idea follows).
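
A rough, hypothetical sketch of that decorator idea (none of these names exist in the PR; it only illustrates the mechanism):

```
# Hypothetical sketch: properties marked with @metadata are collected
# automatically and surfaced in __repr__.
def metadata(func):
    func.__is_metadata__ = True
    return property(func)


class AudioIOTensor:
    def __init__(self, rate):
        self._rate = rate

    @metadata
    def rate(self):
        return self._rate

    def __repr__(self):
        meta = {
            name: getattr(self, name)
            for name, attr in type(self).__dict__.items()
            if isinstance(attr, property)
            and getattr(attr.fget, "__is_metadata__", False)
        }
        return "<AudioIOTensor %s>" % meta


print(AudioIOTensor(44100))   # <AudioIOTensor {'rate': 44100}>
```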

Member

@BryanCutler BryanCutler left a comment


Nice work @yongtang , this looks great!

One concern I have (sorry if you have discussed this before): for something like KafkaIOTensor that is meant for iteration, what is the benefit of having this primitive op vs. using a tf.data.Dataset, and is there a performance difference? It might be confusing to the end user why they need the extra step to convert to a Dataset.

As an alternative to the Python class definitions, I was thinking that with some minor changes like below you could avoid having private constructors and the user seeing the from_kafka method when they have an instance of an AudioIOTensor. Basically, just have static methods in IOTensor that return the concrete classes and move the rest to a super class, but it's up to you.

class _IOBaseTensor():
  ...

class _IOTensorIndexable(_IOBaseTensor):
  ...

class AudioIOTensor(_IOTensorIndexable):
  ...

class IOTensor(object):
  @staticmethod
  def from_audio(filename, **kwargs):
    ...
  

def __getitem__(self, key):
  """Returns the specified piece of this IOTensor."""
  if isinstance(key, slice):
    start = key.start
Member


These could be simplified a bit to start = key.start or 0, but that might not be as readable, so up to you.
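
For reference, a small standalone sketch of the normalization being discussed (not the PR's exact code); Python's built-in slice.indices() does the same job, including negative indices:

```
# Sketch of filling in slice defaults; `or` works here because None and 0
# normalize to the same start, and a None step normalizes to 1.
def normalize_slice(key, length):
    start = key.start or 0
    stop = key.stop if key.stop is not None else length
    step = key.step or 1
    return start, stop, step


print(normalize_slice(slice(None, 5), 10))   # (0, 5, 1)
print(slice(None, 5).indices(10))            # (0, 5, 1) -- built-in equivalent
```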

Returns:
A `tf.data.Dataset` with value obtained from this `IOTensor`.
"""
class _IOTensorDataset(tf.compat.v2.data.Dataset):
Member


Could you move this to the top level? It's a little confusing as an inner class. Also, would it make sense to define it in a way that would work for _IOIterableTensorDataset too, and move the logic to create the tf.data.Dataset into to_dataset()? For example:

class _IOTensorDataset(tf.compat.v2.data.Dataset):
  def __init__(self, dataset, resource, function):
    self._dataset = dataset
    self._resource = resource
    self._function = function

  ...

then to_dataset() would build a dataset as done in __init__ and return a _IOTensorDataset

Member Author


Sure, I will update the structure.

@yongtang
Member Author

Thanks @BryanCutler! Those are great suggestions about the Python classes. I am also thinking about splitting into several files so that it is easier to read.

One thing about dataset is that it has been mostly focused on data pipelines with the best performance, especially for image files, which are largely homogeneous (except for some image preprocessing). There are a couple of limitations. One is that dataset is row-based (vs. column-based like Pandas). One example is that, while datasets could be zipped to create a multi-column dataset, you cannot zip datasets with different batch sizes. This is largely because dataset implementations are C++-heavy and it is hard to implement a column-based one in C++ (it is still possible, though an iterator instead of an array makes it complicated). Another is that dataset is more or less element-centric and lacks metadata. While you could attach some metadata to a customized class, once you run a dataset operation (e.g., zip or concatenate) the metadata is lost in the returned dataset. For image processing, metadata may not be that important, but for other data sources things could be different.

The idea is to have a higher-level wrapper class, to attach the metadata for easy feature engineering and more column-oriented functionality.

I think we could provide a dataset class for iterators like Kafka, though one thing I definitely want to reduce is code duplication. I am really not a big fan of having half of the C++ code (DatasetOp/Dataset/Iterator) just to build a skeleton Op, and the other half to do the actual work.

In this PR, I tried to reduce code duplication. For any future C++ contribution, as long as Init and Next are implemented, everything else could be wired up automatically. I am hoping that changes like this could reduce the C++ code to less than 40% of this repo. This repo is still classified as C++ by GitHub, which is definitely not something we want to see. I would like to eventually see GitHub label this repo as Python.

One good thing about the realignment of the C++ implementation is that, as long as we implement some basic building blocks, we could easily add a wrapper to have dataset and other implementations. I think supporting dataset for row-based, performance-oriented operations, and another higher-level class with additional functionality (column-based, with metadata), would be good. And we could support both with the same C++ code base.

@yongtang
Member Author

@BryanCutler I like the class layout you posted. The staticmethod probably could be replaced with classmethod. I noticed that staticmethod is less flexible than classmethod when I tried to enhance data.Dataset. The issue is that a staticmethod does not carry the class information of the subclass, so a subclass cannot usefully override it. Other than that, I think those are great suggestions.
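
A small illustration of the point (hypothetical names, purely to show why having cls matters for subclassing):

```
# With a staticmethod the factory is hard-wired to the base class; with a
# classmethod it constructs whichever subclass it is called on.
class IOTensor:
    def __init__(self, data):
        self.data = data

    @staticmethod
    def from_list_static(data):
        return IOTensor(data)      # always the base class

    @classmethod
    def from_list(cls, data):
        return cls(data)           # the calling (sub)class


class AudioIOTensor(IOTensor):
    pass


print(type(AudioIOTensor.from_list_static([1, 2, 3])).__name__)  # IOTensor
print(type(AudioIOTensor.from_list([1, 2, 3])).__name__)         # AudioIOTensor
```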

@yongtang
Member Author

One more thing about enhancement:

  1. Null values were never handled in the past, but with the new class I think it is finally possible to use a tuple (val, nil) to allow null values. The nil part could be optional and could be indices only (to save storage). I think a column-oriented approach will help here (see the sketch below).
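
A hedged sketch of the (val, nil) idea (illustrative names only, not an API in this PR): the column is carried as a values tensor plus an optional tensor of null positions, so storage is only paid where nulls actually occur.

```
import tensorflow as tf

values = tf.constant([1.0, 0.0, 3.0, 0.0, 5.0])  # column values
nil = tf.constant([1, 3])                        # positions that are null

# Example use: drop the null entries before computing a column mean.
is_null = tf.reduce_any(
    tf.equal(tf.range(tf.size(values)), tf.expand_dims(nil, 1)), axis=0)
valid = tf.boolean_mask(values, tf.logical_not(is_null))
print(tf.reduce_mean(valid))                     # 3.0
```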

@yongtang
Member Author

@BryanCutler Thinking about it further, I will remove the KafkaIterableIOTensor, so iteration-only operations will be done through KafkaDataset.

I do plan to introduce an indexable KafkaIOTensor that reuses the code in KafkaDataset but lazily stores the data in memory, so that:

  • KafkaDataset will be used for iterations
  • KafkaIOTensor will be used for indexing and slicing (built on top of the KafkaDataset code base).

As was mentioned in #420 (comment), even though Kafka naturally fits the iterable category, it could still be made indexable on top of that.

One particular use case is the ML-with-Kafka example on the Gitter channel by Kai (from Confluent). The ML algorithm needs to normalize the data with a summation, but it is really inconvenient to use a dataset to achieve this simple operation, as summation is not a natural operation on a dataset.
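
For illustration, a hypothetical sketch of what that use case could look like once the indexable KafkaIOTensor described above exists (it assumes the topic "test" carries numeric string payloads; the slicing API is the planned one, not something in this PR):

```
import tensorflow as tf
import tensorflow_io as tfio

# Hypothetical: indexable KafkaIOTensor as planned above (not yet in this PR).
kafka = tfio.IOTensor.from_kafka("test", eof=True)
values = tf.strings.to_number(kafka[0:len(kafka)])   # whole topic as one tensor

# Normalizing by the sum becomes a one-liner instead of a dataset reduce.
normalized = values / tf.reduce_sum(values)
```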

With the above plan, I think your concern could be addressed and users will have less confusion.

One additional note about performance: the implementation used in this PR takes the RT_RESOURCE approach. It is pretty much the same as Dataset's internal implementation, so I would assume there is no performance difference.

@BryanCutler
Member

The staticmethod probably could be replaced with classmethod. I noticed that staticmethod is less flexible than classmethod when I tried to enhance data.Dataset. The issue is that a staticmethod does not carry the class information of the subclass, so a subclass cannot usefully override it. Other than that, I think those are great suggestions.

Yeah, that sounds fine. I don't think it's a problem to use classmethod because that is commonly used as an alternate constructor and it's clear what it's for.

@BryanCutler
Member

I do plan to introduce an indexable KafkaIOTensor that reuses the code in KafkaDataset but lazily stores the data in memory, so that
KafkaDataset will be used for iterations
KafkaIOTensor will be used for indexing and slicing (built on top of the KafkaDataset code base).

That sounds good to me. My concern was that providing two ways to iterate over the same data might be confusing to the user, and for iteration they might want to use the tf.data-based one anyway.

@yongtang
Member Author

Closing this one as it is carried in #437.

@yongtang yongtang closed this Aug 22, 2019
@yongtang yongtang deleted the wav branch January 20, 2020 03:59