diff --git a/src/storage-client/src/source/persist_source.rs b/src/storage-client/src/source/persist_source.rs
index 88b608000f167..44041e04985c6 100644
--- a/src/storage-client/src/source/persist_source.rs
+++ b/src/storage-client/src/source/persist_source.rs
@@ -16,23 +16,27 @@ use std::time::Instant;
 
 use mz_persist_client::operators::shard_source::shard_source;
 use mz_persist_types::codec_impls::UnitSchema;
+use timely::communication::Push;
 use timely::dataflow::channels::pact::Pipeline;
-use timely::dataflow::operators::OkErr;
+use timely::dataflow::channels::Bundle;
+use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use timely::dataflow::operators::{Capability, OkErr};
 use timely::dataflow::{Scope, Stream};
 use timely::progress::Antichain;
+use timely::scheduling::Activator;
 use tokio::sync::Mutex;
 
 use mz_expr::MfpPlan;
 use mz_persist_client::cache::PersistClientCache;
 use mz_persist_client::fetch::FetchedPart;
-use mz_repr::{Diff, GlobalId, Row, Timestamp};
-use mz_timely_util::builder_async::{Event, OperatorBuilder as AsyncOperatorBuilder};
+use mz_repr::{DatumVec, Diff, GlobalId, Row, Timestamp};
 
 use crate::controller::CollectionMetadata;
 use crate::types::errors::DataflowError;
 use crate::types::sources::SourceData;
 pub use mz_persist_client::operators::shard_source::FlowControl;
+use mz_timely_util::buffer::ConsolidateBuffer;
 
 /// Creates a new source that reads from a persist shard, distributing the work
 /// of reading data to all timely workers.
@@ -146,10 +150,12 @@ where
     G: Scope<Timestamp = Timestamp>,
     YFn: Fn(Instant, usize) -> bool + 'static,
 {
-    let mut builder = AsyncOperatorBuilder::new(
+    let scope = fetched.scope();
+    let mut builder = OperatorBuilder::new(
         format!("persist_source::decode_and_mfp({})", name),
-        fetched.scope(),
+        scope.clone(),
     );
+    let operator_info = builder.operator_info();
 
     let mut fetched_input = builder.new_input(fetched, Pipeline);
     let (mut updates_output, updates_stream) = builder.new_output();
@@ -159,111 +165,134 @@ where
     let mut row_builder = Row::default();
 
     // Extract the MFP if it exists; leave behind an identity MFP in that case.
-    let mut map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());
+    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());
 
-    builder.build(move |mut initial_capabilities| async move {
-        initial_capabilities.clear();
+    builder.build(move |_caps| {
+        // Acquire an activator to reschedule the operator when it has unfinished work.
+        let activations = scope.activations();
+        let activator = Activator::new(&operator_info.address[..], activations);
+        // Maintain a queue of pending work.
+        let mut pending_work = std::collections::VecDeque::new();
+        let mut buffer = Default::default();
 
-        let mut buffer = Vec::new();
+        move |_frontier| {
+            fetched_input.for_each(|time, data| {
+                data.swap(&mut buffer);
+                let capability = time.retain();
+                for fetched_part in buffer.drain(..) {
+                    pending_work.push_back(PendingWork {
+                        capability: capability.clone(),
+                        fetched_part,
+                    })
+                }
+            });
 
-        while let Some(event) = fetched_input.next().await {
-            let cap = match event {
-                Event::Data(cap, data) => {
-                    data.swap(&mut buffer);
-                    cap
+            let mut work = 0;
+            let start_time = Instant::now();
+            let mut output = updates_output.activate();
+            let mut handle = ConsolidateBuffer::new(&mut output, 0);
+            while !pending_work.is_empty() && !yield_fn(start_time, work) {
+                let done = pending_work.front_mut().unwrap().do_work(
+                    &mut work,
+                    start_time,
+                    &yield_fn,
+                    &until,
+                    map_filter_project.as_ref(),
+                    &mut datum_vec,
+                    &mut row_builder,
+                    &mut handle,
+                );
+                if done {
+                    pending_work.pop_front();
                 }
-                Event::Progress(_) => continue,
-            };
+            }
+            if !pending_work.is_empty() {
+                activator.activate();
+            }
+        }
+    });
 
-            // SUBTLE: This operator yields back to timely whenever an await returns a
-            // Pending result from the overall async/await state machine `poll`. Since
-            // this is fetching from remote storage, it will yield and thus we can reset
-            // our yield counters here.
-            let mut decode_start = Instant::now();
+    updates_stream
+}
 
-            for fetched_part in buffer.drain(..) {
-                // Apply as much logic to `updates` as we can, before we emit anything.
-                let (updates_size_hint_min, updates_size_hint_max) = fetched_part.size_hint();
-                let mut updates =
-                    Vec::with_capacity(updates_size_hint_max.unwrap_or(updates_size_hint_min));
-                for ((key, val), time, diff) in fetched_part {
-                    if !until.less_equal(&time) {
-                        match (key, val) {
-                            (Ok(SourceData(Ok(row))), Ok(())) => {
-                                if let Some(mfp) = &mut map_filter_project {
-                                    let arena = mz_repr::RowArena::new();
-                                    let mut datums_local = datum_vec.borrow_with(&row);
-                                    for result in mfp.evaluate(
-                                        &mut datums_local,
-                                        &arena,
-                                        time,
-                                        diff,
-                                        |time| !until.less_equal(time),
-                                        &mut row_builder,
-                                    ) {
-                                        match result {
-                                            Ok((row, time, diff)) => {
-                                                // Additional `until` filtering due to temporal filters.
-                                                if !until.less_equal(&time) {
-                                                    updates.push((Ok(row), time, diff));
-                                                }
-                                            }
-                                            Err((err, time, diff)) => {
-                                                // Additional `until` filtering due to temporal filters.
-                                                if !until.less_equal(&time) {
-                                                    updates.push((Err(err), time, diff));
-                                                }
-                                            }
-                                        }
+/// Pending work to read from fetched parts.
+struct PendingWork {
+    /// The capability at which the work's output should be produced.
+    capability: Capability<Timestamp>,
+    /// Pending fetched part.
+    fetched_part: FetchedPart<SourceData, (), Timestamp, Diff>,
+}
+
+impl PendingWork {
+    /// Perform work, reading from the fetched part, decoding, and sending outputs, while
+    /// consulting `yield_fn` to decide whether more fuel is available.
+    fn do_work<P, YFn>(
+        &mut self,
+        work: &mut usize,
+        start_time: Instant,
+        yield_fn: &YFn,
+        until: &Antichain<Timestamp>,
+        map_filter_project: Option<&MfpPlan>,
+        datum_vec: &mut DatumVec,
+        row_builder: &mut Row,
+        output: &mut ConsolidateBuffer<Timestamp, Result<Row, DataflowError>, Diff, P>,
+    ) -> bool
+    where
+        P: Push<Bundle<Timestamp, (Result<Row, DataflowError>, Timestamp, Diff)>>,
+        YFn: Fn(Instant, usize) -> bool,
+    {
+        while let Some(((key, val), time, diff)) = self.fetched_part.next() {
+            if until.less_equal(&time) {
+                continue;
+            }
+            match (key, val) {
+                (Ok(SourceData(Ok(row))), Ok(())) => {
+                    if let Some(mfp) = map_filter_project {
+                        let arena = mz_repr::RowArena::new();
+                        let mut datums_local = datum_vec.borrow_with(&row);
+                        for result in mfp.evaluate(
+                            &mut datums_local,
+                            &arena,
+                            time,
+                            diff,
+                            |time| !until.less_equal(time),
+                            row_builder,
+                        ) {
+                            match result {
+                                Ok((row, time, diff)) => {
+                                    // Additional `until` filtering due to temporal filters.
+                                    if !until.less_equal(&time) {
+                                        output.give_at(&self.capability, (Ok(row), time, diff));
+                                        *work += 1;
+                                    }
+                                }
+                                Err((err, time, diff)) => {
+                                    // Additional `until` filtering due to temporal filters.
+                                    if !until.less_equal(&time) {
+                                        output.give_at(&self.capability, (Err(err), time, diff));
+                                        *work += 1;
                                     }
-                                } else {
-                                    updates.push((Ok(row), time, diff));
                                 }
                             }
-                            (Ok(SourceData(Err(err))), Ok(())) => {
-                                updates.push((Err(err), time, diff));
-                            }
-                            // TODO(petrosagg): error handling
-                            (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
-                                panic!("decoding failed")
-                            }
-                        }
-                    }
-                    if yield_fn(decode_start, updates.len()) {
-                        // A large part of the point of yielding is to let later operators
-                        // reduce down the data, so emit what we have. Note that this means
-                        // we don't get to consolidate everything, but that's part of the
-                        // tradeoff in tuning yield_fn.
-                        differential_dataflow::consolidation::consolidate_updates(&mut updates);
-
-                        {
-                            // Do very fine-grained output activation/session
-                            // creation to ensure that we don't hold activated
-                            // outputs or sessions across await points, which
-                            // would prevent messages from being flushed from
-                            // the shared timely output buffer.
-                            let mut updates_output = updates_output.activate();
-                            updates_output.session(&cap).give_vec(&mut updates);
-                        }
-
-                        // Force a yield to give back the timely thread, reactivating on our
-                        // way out.
-                        tokio::task::yield_now().await;
-                        decode_start = Instant::now();
+                    } else {
+                        output.give_at(&self.capability, (Ok(row), time, diff));
+                        *work += 1;
                     }
                 }
-                differential_dataflow::consolidation::consolidate_updates(&mut updates);
-
-                // Do very fine-grained output activation/session creation
-                // to ensure that we don't hold activated outputs or
-                // sessions across await points, which would prevent
-                // messages from being flushed from the shared timely output
-                // buffer.
-                let mut updates_output = updates_output.activate();
-                updates_output.session(&cap).give_vec(&mut updates);
+                (Ok(SourceData(Err(err))), Ok(())) => {
+                    output.give_at(&self.capability, (Err(err), time, diff));
+                    *work += 1;
+                }
+                // TODO(petrosagg): error handling
+                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
+                    panic!("decoding failed")
+                }
+            }
+            if yield_fn(start_time, *work) {
+                return false;
+            }
         }
-    });
-
-    updates_stream
+        true
+    }
 }
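The core of this change swaps the `builder_async` operator for a plain timely operator that schedules itself: fetched parts are queued as `PendingWork`, each activation burns fuel until `yield_fn` says to stop, and an `Activator` reschedules the operator whenever work remains. Below is a minimal, self-contained sketch of that pattern against the timely API used in this patch; the operator name `fueled_doubler`, the fixed `WORK_LIMIT` budget, and the doubling logic are illustrative stand-ins, not part of this patch.

```rust
use std::collections::VecDeque;

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Inspect, ToStream};
use timely::scheduling::{Activator, Scheduler};

fn main() {
    timely::example(|scope| {
        let input = (0..10u64).to_stream(scope);

        let mut builder = OperatorBuilder::new("fueled_doubler".into(), scope.clone());
        let operator_info = builder.operator_info();
        // Same recipe as the diff: an activator addressed at this operator.
        let activator = Activator::new(&operator_info.address[..], scope.activations());

        let mut input_handle = builder.new_input(&input, Pipeline);
        let (mut output_handle, output) = builder.new_output();

        builder.build(move |_caps| {
            // Queue of (capability, datum) pairs that still need processing.
            let mut pending = VecDeque::new();
            let mut buffer = Vec::new();

            move |_frontiers| {
                // Enqueue everything that arrived, retaining each capability so
                // output can be produced for it on a later activation.
                input_handle.for_each(|cap, data| {
                    data.swap(&mut buffer);
                    let cap = cap.retain();
                    for datum in buffer.drain(..) {
                        pending.push_back((cap.clone(), datum));
                    }
                });

                // Burn a bounded amount of fuel per activation; the real code
                // delegates this decision to `yield_fn`.
                const WORK_LIMIT: usize = 3;
                let mut output = output_handle.activate();
                for _ in 0..WORK_LIMIT {
                    match pending.pop_front() {
                        Some((cap, datum)) => output.session(&cap).give(datum * 2),
                        None => break,
                    }
                }

                // Unfinished work: ask timely to schedule this operator again.
                if !pending.is_empty() {
                    activator.activate();
                }
            }
        });

        output.inspect(|x| println!("doubled: {x}"));
    });
}
```

Note that the activator only reschedules; all state lives in the logic closure, so once `pending` drains, the retained capabilities drop and downstream frontiers can advance.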
diff --git a/src/timely-util/src/buffer.rs b/src/timely-util/src/buffer.rs
index 2c64b2b051ad4..bfbef3d062c0d 100644
--- a/src/timely-util/src/buffer.rs
+++ b/src/timely-util/src/buffer.rs
@@ -15,7 +15,6 @@
 
 use differential_dataflow::consolidation::consolidate_updates;
 use differential_dataflow::difference::Semigroup;
-use differential_dataflow::lattice::Lattice;
 use differential_dataflow::Data;
 use timely::communication::Push;
 use timely::dataflow::channels::Bundle;
@@ -42,7 +41,7 @@ use timely::progress::Timestamp;
 pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
 where
     P: Push<Bundle<T, (D, T, R)>> + 'a,
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     D: 'a,
 {
     // a buffer for records, to send at self.cap
@@ -56,7 +55,7 @@ where
 
 impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
 where
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     P: Push<Bundle<T, (D, T, R)>> + 'a,
 {
     /// Create a new [ConsolidateBuffer], wrapping the provided session.
@@ -94,6 +93,25 @@ where
             // Retain capability for the specified output port.
             self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
         }
+        self.give_internal(data);
+    }
+
+    /// Give an element to the buffer, using a pre-fabricated capability. Note that the
+    /// capability must be valid for the associated output.
+    pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
+        // Retain a capability for the current time, which will be used on flush.
+        if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
+            // Flush on capability change.
+            self.flush();
+            // Retain capability.
+            self.cap = Some(cap.clone());
+        }
+        self.give_internal(data);
+    }
+
+    /// Give an element and possibly flush the buffer. Note that this needs to have access
+    /// to a capability, which the public functions ensure.
+    fn give_internal(&mut self, data: (D, T, R)) {
         self.buffer.push(data);
 
         // Limit, if possible, the lifetime of the allocations for data
@@ -138,7 +156,7 @@
 impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
 where
     P: Push<Bundle<T, (D, T, R)>> + 'a,
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     D: 'a,
 {
     fn drop(&mut self) {
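The new `ConsolidateBuffer::give_at` is what lets `PendingWork::do_work` emit under a stashed capability: the buffer retains the caller's capability, consolidates updates buffered under it, and flushes whenever the capability's time changes and again on drop. A sketch of how a caller drives it; the operator `consolidate_pass_through` is hypothetical, not part of this patch, and the timestamp is pinned to `u64` only to keep the trait bounds concrete.

```rust
use mz_timely_util::buffer::ConsolidateBuffer;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};

/// Forward `(data, time, diff)` updates unchanged, but consolidated: updates
/// buffered under one capability have their diffs summed before being sent.
fn consolidate_pass_through<G: Scope<Timestamp = u64>>(
    input: &Stream<G, (String, u64, i64)>,
) -> Stream<G, (String, u64, i64)> {
    let mut builder = OperatorBuilder::new("consolidate_pass_through".into(), input.scope());
    let mut input_handle = builder.new_input(input, Pipeline);
    let (mut output_handle, output) = builder.new_output();

    builder.build(move |_caps| {
        let mut buffer = Vec::new();
        move |_frontiers| {
            input_handle.for_each(|cap, data| {
                data.swap(&mut buffer);
                let cap = cap.retain();
                let mut output = output_handle.activate();
                // Port 0 is this operator's only output.
                let mut consolidate = ConsolidateBuffer::new(&mut output, 0);
                for update in buffer.drain(..) {
                    // Buffered at `cap`; flushed (consolidated) when the
                    // capability's time changes, and again on drop.
                    consolidate.give_at(&cap, update);
                }
                // Dropping `consolidate` flushes whatever remains.
            });
        }
    });

    output
}
```

Unlike `give`, which derives a capability for the output port from the incoming `CapabilityRef`, `give_at` reuses a capability the caller already retained, which is exactly what `do_work` needs since it emits long after the input event that carried the capability.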