@antiguru antiguru commented Aug 3, 2021

Currently, the Stream abstraction has an opinion of the content type of
message bundles, which is that all datums are contained in vectors. With
this change, Streams are generic over the container type, allowing user
code to supply other container types if required.

This change exposes the lower-level building blocks to provide generic
operators over the container type, but many operators still require the
data to be contained by vectors.

Signed-off-by: Moritz Hoffmann [email protected]

antiguru commented Aug 10, 2021

Summary

Timely currently assumes that batches of data are a combination of time, meta information, and a vector of datums. Requiring the data batch to be encoded as a vector limits flexibility in certain situations. Some clients may want to store data in a column-first batch format, or customize the default allocation policy through a custom type. Here, we analyze the requirements Timely imposes on a possible batch type and work towards generalizing the API so that it is open to the type of batch data structure.

Goals

The goals of this effort are twofold:

  1. Document the requirements Timely has for a batch data structure.
  2. Propose an implementation that generalizes the batch data structure for lower-level APIs in Timely.

Both goals should be met while maintaining compatibility with existing client code.

Non-goals

Providing a backing structure other than the current default Vec implementation is not a goal.

Description

In Timely, data is passed between operators in the form of a Message, which includes a timestamp, a sending worker, a sequence number and a vector of datums. Operators normally do not interact with messages but use the input or session API to create messages at specific times from individual datums or iterators. Internally, Timely creates messages by accumulating a maximum amount of data and pushing messages of similar size through the dataflow. This limits the API requirements for batches of data.

In the following, we define a Container type that exposes the interfaces required for Timely to interact with batches of data. A container needs to fulfill the following contracts:

  • Input and session APIs need to construct containers from individual datums or iterators of datums in the high-level API. A lower-level API could decide to accept containers as they are.
  • The exchange operator needs to dissect containers and construct new containers for each destination worker.
  • Zero-copy/networked communication setups require containers to serialize.
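To make the exchange requirement above concrete, here is a minimal sketch of dissecting one batch into one new batch per destination worker. It uses plain `Vec` batches for simplicity; the function name `partition` and the `route` closure are assumptions made for illustration, not Timely's API.

```rust
/// Hypothetical helper: splits `batch` into `peers` new batches,
/// sending each datum to the worker selected by `route`.
fn partition<T, F>(batch: Vec<T>, peers: usize, route: F) -> Vec<Vec<T>>
where
    F: Fn(&T) -> u64,
{
    // One output batch per destination worker.
    let mut outputs: Vec<Vec<T>> = (0..peers).map(|_| Vec::new()).collect();
    for datum in batch {
        // Consuming the input batch moves each datum into its target batch.
        let target = (route(&datum) % peers as u64) as usize;
        outputs[target].push(datum);
    }
    outputs
}
```

A generic version would need the container to offer the same two capabilities used here: iterating over (or draining) its contents, and building fresh containers datum by datum.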

A container should represent possibly immutable data. For this reason, each container should have an associated builder. The builder accepts updates, which are then stashed into a finalized container for passing along dataflow edges.
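One way the container/builder split could look is sketched below. The trait names, method names, and the `VecBuilder` type are assumptions for illustration and are not necessarily the interfaces in this PR.

```rust
/// Hypothetical: a finalized, possibly immutable batch of data.
trait Container: Sized {
    type Item;
    /// The builder that produces this container.
    type Builder: ContainerBuilder<Container = Self>;
    fn len(&self) -> usize;
}

/// Hypothetical: accepts updates and finalizes them into a container.
trait ContainerBuilder: Default {
    type Container: Container;
    fn push(&mut self, item: <Self::Container as Container>::Item);
    fn build(self) -> Self::Container;
}

/// Builder for the default vector-based container.
struct VecBuilder<T> {
    buffer: Vec<T>,
}

impl<T> Default for VecBuilder<T> {
    fn default() -> Self {
        VecBuilder { buffer: Vec::new() }
    }
}

impl<T> Container for Vec<T> {
    type Item = T;
    type Builder = VecBuilder<T>;
    fn len(&self) -> usize {
        Vec::len(self)
    }
}

impl<T> ContainerBuilder for VecBuilder<T> {
    type Container = Vec<T>;
    fn push(&mut self, item: T) {
        self.buffer.push(item);
    }
    fn build(self) -> Vec<T> {
        self.buffer
    }
}

/// Builds any container from an iterator of datums, as the input and
/// session APIs would need to do.
fn collect_into<C, I>(items: I) -> C
where
    C: Container,
    I: IntoIterator<Item = C::Item>,
{
    let mut builder: C::Builder = Default::default();
    for item in items {
        builder.push(item);
    }
    builder.build()
}
```

With this shape, `let batch: Vec<u32> = collect_into(0..4);` builds a vector-based container, and a columnar container would only need its own builder.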

The container API should enable Timely to re-use allocations where possible.
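As an illustration of what re-using allocations could mean for the vector-based container, the sketch below refills a previously used buffer instead of allocating a new one. The function `refill` is hypothetical and only shows the idea.

```rust
/// Hypothetical helper: fills a batch with `items`, re-using the
/// allocation of a recycled batch when one is available.
fn refill<T>(recycled: Option<Vec<T>>, items: impl IntoIterator<Item = T>) -> Vec<T> {
    // Fall back to a fresh, empty vector if nothing was recycled.
    let mut buffer = recycled.unwrap_or_default();
    // `clear` drops the old contents but keeps the capacity.
    buffer.clear();
    buffer.extend(items);
    buffer
}
```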

Alternatives

The current API using vectors of data can be interpreted as a vector holding a single data element, which itself contains the batch of data. This would add a pointer dereference to access the data, but would not require changes to Timely's API. The downside of this approach is that containers cannot express custom allocation strategies.
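A small sketch of this alternative, assuming a hypothetical column-first batch type `ColumnBatch` (not part of Timely): the message remains a `Vec`, but it carries exactly one element that holds the whole batch.

```rust
/// Hypothetical column-first batch: one vector per column.
struct ColumnBatch {
    keys: Vec<u64>,
    values: Vec<String>,
}

/// Wraps a batch so that it fits the existing vector-based API.
fn wrap(batch: ColumnBatch) -> Vec<ColumnBatch> {
    vec![batch]
}

/// Counts rows; reaching the columns costs the extra dereference
/// through the outer vector.
fn total_rows(message: &[ColumnBatch]) -> usize {
    message.iter().map(|batch| batch.keys.len()).sum()
}
```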

@antiguru antiguru force-pushed the container_stream branch 3 times, most recently from 9e3bee5 to cfc99e6 Compare August 10, 2021 12:36
@antiguru

I changed the implementation to limit changes on public interfaces. Currently, the InputHandle and OutputHandle are generic over their container type and require changed annotations. We could avoid this breaking change by providing an aliased type defaulting to the vector-based container.
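The aliasing idea could look roughly like the following. `CoreInputHandle` is a simplified, hypothetical stand-in, not Timely's actual handle type; the point is only that the old name keeps working for vector-based code.

```rust
/// Hypothetical stand-in for a handle that is generic over its container type.
struct CoreInputHandle<C> {
    pending: Vec<C>,
}

impl<C> CoreInputHandle<C> {
    fn new() -> Self {
        CoreInputHandle { pending: Vec::new() }
    }

    /// Accepts a whole container as-is.
    fn give_container(&mut self, container: C) {
        self.pending.push(container);
    }

    fn pending_batches(&self) -> usize {
        self.pending.len()
    }
}

/// Alias that defaults to the vector-based container, so existing
/// annotations of the form `InputHandle<D>` need no change.
type InputHandle<D> = CoreInputHandle<Vec<D>>;

/// Existing client code, written against the old name.
fn existing_client_code(handle: &mut InputHandle<u64>) {
    handle.give_container(vec![1, 2, 3]);
}
```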

@antiguru

Differential now compiles without any changes. Materialize encodes some of the types and needs a few changes.

Enables the drain function on containers implementing DrainContainer.
To avoid lifetime complications, the trait needs to be implemented with
a known lifetime, for example using

   impl<C: Container> ... where for<'a> &'a mut C: DrainContainer
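
A self-contained sketch of this pattern follows. The shape of `DrainContainer` shown here is an assumption for illustration and may differ from the trait in this change; it is implemented on `&'a mut Vec<T>` so the drain iterator can borrow with a known lifetime.

```rust
/// Hypothetical shape of the trait: implemented on a mutable reference,
/// so the returned iterator can name the reference's lifetime.
trait DrainContainer {
    type Item;
    type Drain: Iterator<Item = Self::Item>;
    fn drain(self) -> Self::Drain;
}

impl<'a, T> DrainContainer for &'a mut Vec<T> {
    type Item = T;
    type Drain = std::vec::Drain<'a, T>;
    fn drain(self) -> Self::Drain {
        // Drain the full range, leaving the vector empty but allocated.
        Vec::drain(self, ..)
    }
}

/// Generic code states the requirement with a higher-ranked bound.
fn drain_count<C>(container: &mut C) -> usize
where
    for<'a> &'a mut C: DrainContainer,
{
    DrainContainer::drain(&mut *container).count()
}
```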

Container builder: Separate container from building a container

CoreStream: Alias core stream over vectors as `Stream` to limit API changes

Signed-off-by: Moritz Hoffmann <[email protected]>
...for pushers, input, unordered input, a few more.

Signed-off-by: Moritz Hoffmann <[email protected]>
Unfortunately, timestamps cannot yet be recycled because the association
of product time's allocation cannot be expressed.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru

Superseded by #426.

@antiguru antiguru closed this Dec 16, 2021