Skip to content

Conversation

@antiguru
Copy link
Member

@antiguru antiguru commented Jan 14, 2023

This is an experiment to see what changes if we consolidate output from persist differently than to wait for, say 1M rows and then consolidate, but rather use much smaller buffers. It's reusing the ConsolidateBuffer type, which wraps an output handle and compacts in-place.

The implementation isn't nice, partly due to async, and because it's late :) The main issue is that the session should not survive the await point, but this point is in a loop where it's used. There's probably a better way than using ManuallyDrop!

Motivation

This tries to wire up an established pattern from log processing to use in persist.

Tips to the reviewer

The first commit is only code movement. The second is the interesting one!

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration, there is a
    companion cloud PR to account for those changes that is tagged with
    the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:

@antiguru antiguru requested review from danhhz and pH14 January 14, 2023 02:35
Copy link
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsolidateBuffer is cool! I particularly like the "repeats until the consolidation recovered less than half of the buffer's capacity" heuristic

yield_fn: YFn,
) -> (
Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
Stream<G, (Row, Timestamp, Diff)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this have to change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should split this change out. The reasoning here is that Row is 32 bytes, but DataflowError is 80 bytes. Multiplexing them adds another 8 bytes to each element, which means we're wasting 56 bytes most of the time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 wfm!

YFn: Fn(Instant, usize) -> bool + 'static,
{
let (stream, token) = persist_source_core(
let (updates, errs, token) = persist_source_core(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

am I missing something or are persist_source and persist_source_core the same after this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it seems they are! I can change it as part of this PR, or not, whatever you prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're keeping the change to persist_source_core's return type, I lean toward yes, clean them up while we're in here making changes

updates_output.session(&cap).give_vec(&mut updates);
if yield_fn(decode_start, updates) {
updates = 0;
unsafe {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try pretty hard to avoid unsafe whenever possible and would feel much more comfortable with merging this if we first did whatever refactor was necessary to avoid it. Maybe we could use Option instead and turn the unsafety into .expect panics?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing a shortcoming in Timely in TimelyDataflow/timely-dataflow#496 to avoid the unsafe code here. I agree this is bad!

@antiguru
Copy link
Member Author

I updated the PR to avoid any unsafe code, but it's depending on a change in Timely (TimelyDataflow/timely-dataflow#496).

Copy link
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

YFn: Fn(Instant, usize) -> bool + 'static,
{
let (stream, token) = persist_source_core(
let (updates, errs, token) = persist_source_core(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're keeping the change to persist_source_core's return type, I lean toward yes, clean them up while we're in here making changes

yield_fn: YFn,
) -> (
Stream<G, (Result<Row, DataflowError>, Timestamp, Diff)>,
Stream<G, (Row, Timestamp, Diff)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 wfm!

This was largely redundant with persist_source.

Signed-off-by: Moritz Hoffmann <[email protected]>
@danhhz
Copy link
Contributor

danhhz commented Jan 20, 2023

Dunno if you're waiting on me to take another look or for the timely change to get merged, but feel free to merge this when you're happy with it

@antiguru
Copy link
Member Author

Closed in favor of #17252, which converts the operator into a fueled one.

@antiguru antiguru closed this Jan 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants