From 61137542e9bd94b4432a1b088d71f1f7ab38ab69 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 13 Jan 2023 20:12:59 -0500 Subject: [PATCH 1/4] Move ConsolidateBuffer to timely-util Signed-off-by: Moritz Hoffmann --- Cargo.lock | 1 - src/compute/src/logging/differential.rs | 3 +- src/compute/src/logging/mod.rs | 95 -------------------- src/compute/src/logging/reachability.rs | 3 +- src/compute/src/logging/timely.rs | 3 +- src/timely-util/Cargo.toml | 1 - src/timely-util/src/buffer.rs | 110 ++++++++++++++++++++++++ src/timely-util/src/lib.rs | 1 + src/timely-util/src/panic.rs | 2 +- 9 files changed, 118 insertions(+), 101 deletions(-) create mode 100644 src/timely-util/src/buffer.rs diff --git a/Cargo.lock b/Cargo.lock index 0aea0ba6cec38..ffee7233086b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4897,7 +4897,6 @@ dependencies = [ "proptest", "serde", "timely", - "timely_communication", "tokio", "workspace-hack", ] diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 58b99c7aed606..3a6e139f04e87 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -28,11 +28,12 @@ use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; use mz_repr::{Datum, DatumVec, Diff, Row, Timestamp}; use mz_timely_util::activator::RcActivator; +use mz_timely_util::buffer::ConsolidateBuffer; use mz_timely_util::replay::MzReplay; use crate::compute_state::ComputeState; use crate::logging::persist::persist_sink; -use crate::logging::{ConsolidateBuffer, DifferentialLog, LogVariant}; +use crate::logging::{DifferentialLog, LogVariant}; use crate::typedefs::{KeysValsHandle, RowSpine}; /// Constructs the logging dataflow for differential logs. diff --git a/src/compute/src/logging/mod.rs b/src/compute/src/logging/mod.rs index 5c8f06729abb6..5cc4383c169c4 100644 --- a/src/compute/src/logging/mod.rs +++ b/src/compute/src/logging/mod.rs @@ -17,17 +17,8 @@ pub mod timely; use std::time::Duration; -use ::timely::communication::Push; -use ::timely::dataflow::channels::Bundle; use ::timely::dataflow::operators::capture::{Event, EventPusher}; -use ::timely::dataflow::operators::generic::OutputHandle; -use ::timely::dataflow::operators::Capability; -use ::timely::dataflow::operators::CapabilityRef; use ::timely::progress::Timestamp as TimelyTimestamp; -use differential_dataflow::consolidation::consolidate_updates; -use differential_dataflow::difference::Semigroup; -use differential_dataflow::lattice::Lattice; -use differential_dataflow::ExchangeData; use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog}; use mz_repr::Timestamp; @@ -126,89 +117,3 @@ where .push(Event::Progress(vec![(self.time_ms, -1)])); } } - -/// A buffer that consolidates updates -/// -/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is -/// backed by a capacity-limited buffer, which means that compaction only occurs within the -/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's -/// capacity. -/// -/// A cap is retained whenever the current time changes to be able to flush on drop or when the time -/// changes again. -/// -/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are -/// consolidated to free up space. This process repeats until the consolidation recovered less than -/// half of the buffer's capacity, at which point the buffer will be shipped. 
-/// -/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if -/// time changes, or if the buffer capacity is reached. -pub struct ConsolidateBuffer<'a, T, D: ExchangeData, R: Semigroup, P> -where - P: Push> + 'a, - T: Clone + Lattice + Ord + TimelyTimestamp + 'a, - D: 'a, -{ - // a buffer for records, to send at self.cap - // Invariant: Buffer only contains data if cap is Some. - buffer: Vec<(D, T, R)>, - output_handle: OutputHandle<'a, T, (D, T, R), P>, - cap: Option>, - port: usize, -} - -impl<'a, T, D: ExchangeData, R: Semigroup, P> ConsolidateBuffer<'a, T, D, R, P> -where - T: Clone + Lattice + Ord + TimelyTimestamp + 'a, - P: Push> + 'a, -{ - /// Create a new [ConsolidateBuffer], wrapping the provided session. - /// - /// * `output_handle`: The output to send data to. - /// * 'port': The output port to retain capabilities for. - pub fn new(output_handle: OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self { - Self { - output_handle, - port, - cap: None, - buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()), - } - } - - /// Give an element to the buffer - pub fn give(&mut self, cap: &CapabilityRef, data: (D, T, R)) { - // Retain a cap for the current time, which will be used on flush. - if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) { - // Flush on capability change - self.flush(); - // Retain capability for the specified output port. - self.cap = Some(cap.delayed_for_output(cap.time(), self.port)); - } - self.buffer.push(data); - if self.buffer.len() == self.buffer.capacity() { - // Consolidate while the consolidation frees at least half the buffer - consolidate_updates(&mut self.buffer); - if self.buffer.len() > self.buffer.capacity() / 2 { - self.flush(); - } - } - } - - /// Flush the internal buffer to the underlying session - pub fn flush(&mut self) { - if let Some(cap) = &self.cap { - self.output_handle.session(cap).give_vec(&mut self.buffer); - } - } -} - -impl<'a, T, D: ExchangeData, R: Semigroup, P> Drop for ConsolidateBuffer<'a, T, D, R, P> -where - P: Push> + 'a, - T: Clone + Lattice + Ord + TimelyTimestamp + 'a, - D: 'a, -{ - fn drop(&mut self) { - self.flush(); - } -} diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 0553060d3d159..8e9aa317790ea 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -28,11 +28,12 @@ use mz_ore::cast::CastFrom; use mz_ore::iter::IteratorExt; use mz_repr::{Datum, Diff, Row, RowArena, Timestamp}; use mz_timely_util::activator::RcActivator; +use mz_timely_util::buffer::ConsolidateBuffer; use mz_timely_util::replay::MzReplay; use crate::compute_state::ComputeState; use crate::logging::persist::persist_sink; -use crate::logging::{ConsolidateBuffer, LogVariant, TimelyLog}; +use crate::logging::{LogVariant, TimelyLog}; use crate::typedefs::{KeysValsHandle, RowSpine}; /// Constructs the logging dataflow for reachability logs. 
diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 4615c667e7d93..55b362ddf1731 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -27,11 +27,12 @@ use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{datum_list_size, datum_size, Datum, DatumVec, Diff, Row, Timestamp}; use mz_timely_util::activator::RcActivator; +use mz_timely_util::buffer::ConsolidateBuffer; use mz_timely_util::replay::MzReplay; use crate::compute_state::ComputeState; use crate::logging::persist::persist_sink; -use crate::logging::{ConsolidateBuffer, LogVariant, TimelyLog}; +use crate::logging::{LogVariant, TimelyLog}; use crate::typedefs::{KeysValsHandle, RowSpine}; /// Constructs the logging dataflow for timely logs. diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index fb506d6bae80d..3ff92928cb859 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -11,7 +11,6 @@ differential-dataflow = { git = "https://github.com/TimelyDataflow/differential- futures-util = "0.3.25" proptest = { git = "https://github.com/MaterializeInc/proptest.git", default-features = false, features = ["std"]} timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] } -timely_communication = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] } serde = { version = "1.0.152", features = ["derive"] } mz-ore = { path = "../ore", features = ["tracing_"] } polonius-the-crab = "0.3.1" diff --git a/src/timely-util/src/buffer.rs b/src/timely-util/src/buffer.rs new file mode 100644 index 0000000000000..af1be72f5ff5e --- /dev/null +++ b/src/timely-util/src/buffer.rs @@ -0,0 +1,110 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use differential_dataflow::consolidation::consolidate_updates; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::ExchangeData; +use timely::communication::Push; +use timely::dataflow::channels::Bundle; +use timely::dataflow::operators::generic::OutputHandle; +use timely::dataflow::operators::{Capability, CapabilityRef}; +use timely::progress::Timestamp; + +/// A buffer that consolidates updates +/// +/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is +/// backed by a capacity-limited buffer, which means that compaction only occurs within the +/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's +/// capacity. +/// +/// A cap is retained whenever the current time changes to be able to flush on drop or when the time +/// changes again. +/// +/// The buffer is filled with updates until it reaches its capacity. 
At this point, the updates are +/// consolidated to free up space. This process repeats until the consolidation recovered less than +/// half of the buffer's capacity, at which point the buffer will be shipped. +/// +/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if +/// time changes, or if the buffer capacity is reached. +pub struct ConsolidateBuffer<'a, T, D: ExchangeData, R: Semigroup, P> +where + P: Push> + 'a, + T: Clone + Lattice + Ord + Timestamp + 'a, + D: 'a, +{ + // a buffer for records, to send at self.cap + // Invariant: Buffer only contains data if cap is Some. + buffer: Vec<(D, T, R)>, + output_handle: OutputHandle<'a, T, (D, T, R), P>, + cap: Option>, + port: usize, +} + +impl<'a, T, D: ExchangeData, R: Semigroup, P> ConsolidateBuffer<'a, T, D, R, P> +where + T: Clone + Lattice + Ord + Timestamp + 'a, + P: Push> + 'a, +{ + /// Create a new [ConsolidateBuffer], wrapping the provided session. + /// + /// * `output_handle`: The output to send data to. + /// * 'port': The output port to retain capabilities for. + pub fn new(output_handle: OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self { + Self { + output_handle, + port, + cap: None, + buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()), + } + } + + /// Give an element to the buffer + pub fn give(&mut self, cap: &CapabilityRef, data: (D, T, R)) { + // Retain a cap for the current time, which will be used on flush. + if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) { + // Flush on capability change + self.flush(); + // Retain capability for the specified output port. + self.cap = Some(cap.delayed_for_output(cap.time(), self.port)); + } + self.buffer.push(data); + if self.buffer.len() == self.buffer.capacity() { + // Consolidate while the consolidation frees at least half the buffer + consolidate_updates(&mut self.buffer); + if self.buffer.len() > self.buffer.capacity() / 2 { + self.flush(); + } + } + } + + /// Flush the internal buffer to the underlying session + pub fn flush(&mut self) { + if let Some(cap) = &self.cap { + self.output_handle.session(cap).give_vec(&mut self.buffer); + } + } +} + +impl<'a, T, D: ExchangeData, R: Semigroup, P> Drop for ConsolidateBuffer<'a, T, D, R, P> +where + P: Push> + 'a, + T: Clone + Lattice + Ord + Timestamp + 'a, + D: 'a, +{ + fn drop(&mut self) { + self.flush(); + } +} diff --git a/src/timely-util/src/lib.rs b/src/timely-util/src/lib.rs index dd0dadbbc97a2..24acd829006cc 100644 --- a/src/timely-util/src/lib.rs +++ b/src/timely-util/src/lib.rs @@ -84,6 +84,7 @@ pub mod activator; pub mod antichain; +pub mod buffer; pub mod builder_async; pub mod capture; pub mod event; diff --git a/src/timely-util/src/panic.rs b/src/timely-util/src/panic.rs index fb5dc4515f59f..77407a8cb8ae0 100644 --- a/src/timely-util/src/panic.rs +++ b/src/timely-util/src/panic.rs @@ -17,7 +17,7 @@ use std::panic; use mz_ore::halt; -/// Intercepts expected [`timely_communication`] panics and downgrades them to +/// Intercepts expected [`timely::communication`] panics and downgrades them to /// [`halt`]s. 
/// /// Because processes in a timely cluster are shared fate, once one process in From 6f9a61f1fce9a35058a90f3573f3bd739b9fa6ff Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 18 Jan 2023 16:17:55 -0500 Subject: [PATCH 2/4] ConsolidateBuffer: Add cease function Signed-off-by: Moritz Hoffmann --- Cargo.lock | 10 +++++----- Cargo.toml | 3 +++ deny.toml | 3 +++ src/timely-util/src/buffer.rs | 5 +++++ 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ffee7233086b8..161993e3d4c98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7172,7 +7172,7 @@ dependencies = [ [[package]] name = "timely" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#faf5eb6457e69ac2bfbbb8a0f62deda55aa2c927" +source = "git+https://github.com/antiguru/timely-dataflow?branch=output_handle_cease#de1f20ec735c6fa3ebf4a214e2108c5a74bc260d" dependencies = [ "abomonation", "abomonation_derive", @@ -7190,12 +7190,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#faf5eb6457e69ac2bfbbb8a0f62deda55aa2c927" +source = "git+https://github.com/antiguru/timely-dataflow?branch=output_handle_cease#de1f20ec735c6fa3ebf4a214e2108c5a74bc260d" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#faf5eb6457e69ac2bfbbb8a0f62deda55aa2c927" +source = "git+https://github.com/antiguru/timely-dataflow?branch=output_handle_cease#de1f20ec735c6fa3ebf4a214e2108c5a74bc260d" dependencies = [ "abomonation", "abomonation_derive", @@ -7211,7 +7211,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#faf5eb6457e69ac2bfbbb8a0f62deda55aa2c927" +source = "git+https://github.com/antiguru/timely-dataflow?branch=output_handle_cease#de1f20ec735c6fa3ebf4a214e2108c5a74bc260d" dependencies = [ "columnation", "serde", @@ -7220,7 +7220,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#faf5eb6457e69ac2bfbbb8a0f62deda55aa2c927" +source = "git+https://github.com/antiguru/timely-dataflow?branch=output_handle_cease#de1f20ec735c6fa3ebf4a214e2108c5a74bc260d" [[package]] name = "tiny-keccak" diff --git a/Cargo.toml b/Cargo.toml index af7e812cb5535..ad99734bc4f15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,3 +117,6 @@ postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres" } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } vte = { git = "https://github.com/alacritty/vte" } + +[patch."https://github.com/TimelyDataflow/timely-dataflow"] +timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "output_handle_cease" } diff --git a/deny.toml b/deny.toml index 5616226866a9d..5c20fee315f7f 100644 --- a/deny.toml +++ b/deny.toml @@ -209,4 +209,7 @@ allow-git = [ # Waiting on https://github.com/launchdarkly/rust-server-sdk/pull/20 to make # it into a release. 
"https://github.com/MaterializeInc/rust-server-sdk", + + # TODO Remove + "https://github.com/antiguru/timely-dataflow", ] diff --git a/src/timely-util/src/buffer.rs b/src/timely-util/src/buffer.rs index af1be72f5ff5e..0578ed9d8f76b 100644 --- a/src/timely-util/src/buffer.rs +++ b/src/timely-util/src/buffer.rs @@ -96,6 +96,11 @@ where self.output_handle.session(cap).give_vec(&mut self.buffer); } } + + pub fn cease(&mut self) { + self.flush(); + self.output_handle.cease(); + } } impl<'a, T, D: ExchangeData, R: Semigroup, P> Drop for ConsolidateBuffer<'a, T, D, R, P> From f925818bb7adb2c0ee6d2b941d77a456567162be Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 13 Jan 2023 21:31:07 -0500 Subject: [PATCH 3/4] Use ConsolidateBuffer for persist's output Signed-off-by: Moritz Hoffmann --- .../src/source/persist_source.rs | 74 +++++++------------ src/storage/src/render/sources.rs | 15 ++-- src/storage/src/render/upsert.rs | 15 +++- 3 files changed, 51 insertions(+), 53 deletions(-) diff --git a/src/storage-client/src/source/persist_source.rs b/src/storage-client/src/source/persist_source.rs index ff5155406ec17..973ca78b854bf 100644 --- a/src/storage-client/src/source/persist_source.rs +++ b/src/storage-client/src/source/persist_source.rs @@ -16,8 +16,7 @@ use std::time::Instant; use mz_persist_client::operators::shard_source::shard_source; use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::operators::OkErr; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::progress::Antichain; use tokio::sync::Mutex; @@ -32,6 +31,7 @@ use crate::types::errors::DataflowError; use crate::types::sources::SourceData; pub use mz_persist_client::operators::shard_source::FlowControl; +use mz_timely_util::buffer::ConsolidateBuffer; /// Creates a new source that reads from a persist shard, distributing the work /// of reading data to all timely workers. @@ -74,7 +74,7 @@ where G: Scope, YFn: Fn(Instant, usize) -> bool + 'static, { - let (stream, token) = persist_source_core( + let (updates, errs, token) = persist_source_core( scope, source_id, persist_clients, @@ -85,11 +85,7 @@ where flow_control, yield_fn, ); - let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d { - Ok(row) => Ok((row, t, r)), - Err(err) => Err((err, t, r)), - }); - (ok_stream, err_stream, token) + (updates, errs, token) } /// Creates a new source that reads from a persist shard, distributing the work @@ -110,7 +106,8 @@ pub fn persist_source_core( flow_control: Option>, yield_fn: YFn, ) -> ( - Stream, Timestamp, Diff)>, + Stream, + Stream, Rc, ) where @@ -128,8 +125,8 @@ where until.clone(), flow_control, ); - let rows = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn); - (rows, token) + let (updates, errs) = decode_and_mfp(&fetched, &name, until, map_filter_project, yield_fn); + (updates, errs, token) } pub fn decode_and_mfp( @@ -138,7 +135,10 @@ pub fn decode_and_mfp( until: Antichain, mut map_filter_project: Option<&mut MfpPlan>, yield_fn: YFn, -) -> Stream, Timestamp, Diff)> +) -> ( + StreamCore>, + StreamCore>, +) where G: Scope, YFn: Fn(Instant, usize) -> bool + 'static, @@ -150,6 +150,7 @@ where let mut fetched_input = builder.new_input(fetched, Pipeline); let (mut updates_output, updates_stream) = builder.new_output(); + let (mut err_output, err_stream) = builder.new_output(); // Re-used state for processing and building rows. let mut datum_vec = mz_repr::DatumVec::new(); @@ -178,11 +179,12 @@ where // our yield counters here. 
let mut decode_start = Instant::now(); + let mut updates_session = ConsolidateBuffer::new(updates_output.activate(), 0); + let mut err_session = ConsolidateBuffer::new(err_output.activate(), 1); + for fetched_part in buffer.drain(..) { // Apply as much logic to `updates` as we can, before we emit anything. - let (updates_size_hint_min, updates_size_hint_max) = fetched_part.size_hint(); - let mut updates = - Vec::with_capacity(updates_size_hint_max.unwrap_or(updates_size_hint_min)); + let mut updates = 0; for ((key, val), time, diff) in fetched_part { if !until.less_equal(&time) { match (key, val) { @@ -202,23 +204,25 @@ where Ok((row, time, diff)) => { // Additional `until` filtering due to temporal filters. if !until.less_equal(&time) { - updates.push((Ok(row), time, diff)); + updates_session.give(&cap, (row, time, diff)); + updates += 1; } } Err((err, time, diff)) => { // Additional `until` filtering due to temporal filters. if !until.less_equal(&time) { - updates.push((Err(err), time, diff)); + err_session.give(&cap, (err, time, diff)); } } } } } else { - updates.push((Ok(row), time, diff)); + updates_session.give(&cap, (row, time, diff)); + updates += 1; } } (Ok(SourceData(Err(err))), Ok(())) => { - updates.push((Err(err), time, diff)); + err_session.give(&cap, (err, time, diff)); } // TODO(petrosagg): error handling (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => { @@ -226,41 +230,19 @@ where } } } - if yield_fn(decode_start, updates.len()) { - // A large part of the point of yielding is to let later operators - // reduce down the data, so emit what we have. Note that this means - // we don't get to consolidate everything, but that's part of the - // tradeoff in tuning yield_fn. - differential_dataflow::consolidation::consolidate_updates(&mut updates); - - { - // Do very fine-grained output activation/session - // creation to ensure that we don't hold activated - // outputs or sessions across await points, which - // would prevent messages from being flushed from - // the shared timely output buffer. - let mut updates_output = updates_output.activate(); - updates_output.session(&cap).give_vec(&mut updates); - } - + if yield_fn(decode_start, updates) { + updates = 0; + updates_session.cease(); + err_session.cease(); // Force a yield to give back the timely thread, reactivating on our // way out. tokio::task::yield_now().await; decode_start = Instant::now(); } } - differential_dataflow::consolidation::consolidate_updates(&mut updates); - - // Do very fine-grained output activation/session creation - // to ensure that we don't hold activated outputs or - // sessions across await points, which would prevent - // messages from being flushed from the shared timely output - // buffer. 
- let mut updates_output = updates_output.activate(); - updates_output.session(&cap).give_vec(&mut updates); } } }); - updates_stream + (updates_stream, err_stream) } diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 06436d62dd37a..15455d1707026 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -389,9 +389,9 @@ where } Some(&t) => Some(t.saturating_sub(1)), }; - let (previous_stream, previous_token) = + let (previous_ok_stream, previous_err_stream, previous_token) = if let Some(previous_as_of) = previous_as_of { - let (stream, tok) = persist_source::persist_source_core( + let (ok_stream, err_stream, tok) = persist_source::persist_source_core( scope, id, persist_clients, @@ -410,15 +410,20 @@ where // Copy the logic in DeltaJoin/Get/Join to start. |_timer, count| count > 1_000_000, ); - (stream, Some(tok)) + (ok_stream, err_stream, Some(tok)) } else { - (std::iter::empty().to_stream(scope), None) + ( + std::iter::empty().to_stream(scope), + std::iter::empty().to_stream(scope), + None, + ) }; let (upsert_ok, upsert_err) = super::upsert::upsert( &transformed_results, resume_upper, upsert_envelope.clone(), - previous_stream, + previous_ok_stream, + previous_err_stream, previous_token, ); diff --git a/src/storage/src/render/upsert.rs b/src/storage/src/render/upsert.rs index 4cc4319c2ca63..d9e492c8f277b 100644 --- a/src/storage/src/render/upsert.rs +++ b/src/storage/src/render/upsert.rs @@ -17,7 +17,9 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::{AsCollection, Collection}; use serde::{Deserialize, Serialize}; use timely::dataflow::channels::pact::Exchange; -use timely::dataflow::operators::{Capability, CapabilityRef, Concat, OkErr, Operator}; +use timely::dataflow::operators::{ + Capability, CapabilityRef, Concat, Concatenate, Map, OkErr, Operator, +}; use timely::dataflow::{Scope, Stream}; use timely::order::PartialOrder; use timely::progress::frontier::AntichainRef; @@ -58,7 +60,8 @@ pub(crate) fn upsert( stream: &Stream, as_of_frontier: Antichain, upsert_envelope: UpsertEnvelope, - previous: Stream, Timestamp, Diff)>, + previous_ok: Stream, + previous_err: Stream, previous_token: Option>, ) -> ( Stream, @@ -122,6 +125,14 @@ where None }; + // TODO: This multiplexes previous_ok and previous_err only to just demultiplex it differently + // in the next step. + let previous = previous_ok + .map(|(d, t, r)| (Ok(d), t, r)) + .concatenate(std::iter::once( + previous_err.map(|(d, t, r)| (Err(d), t, r)), + )); + // Break `previous` into: // On the one hand, "Ok" and "Err(UpsertError)", which we know how to deal with, and, // On the other hand, "Err(everything eles)", which we don't. From 28027321703b542fcc19d6d63bf01fdf152d2de9 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 18 Jan 2023 20:39:59 -0500 Subject: [PATCH 4/4] Inline persist_source_core This was largely redundant with persist_source. 
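After the previous commit, persist_source only forwarded its arguments to persist_source_core and returned the resulting streams unchanged, so the two functions are merged and render/sources.rs now calls persist_source directly.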
Signed-off-by: Moritz Hoffmann --- .../src/source/persist_source.rs | 40 ------------------- src/storage/src/render/sources.rs | 2 +- 2 files changed, 1 insertion(+), 41 deletions(-) diff --git a/src/storage-client/src/source/persist_source.rs b/src/storage-client/src/source/persist_source.rs index 973ca78b854bf..0e85c6f637855 100644 --- a/src/storage-client/src/source/persist_source.rs +++ b/src/storage-client/src/source/persist_source.rs @@ -70,46 +70,6 @@ pub fn persist_source( Stream, Rc, ) -where - G: Scope, - YFn: Fn(Instant, usize) -> bool + 'static, -{ - let (updates, errs, token) = persist_source_core( - scope, - source_id, - persist_clients, - metadata, - as_of, - until, - map_filter_project, - flow_control, - yield_fn, - ); - (updates, errs, token) -} - -/// Creates a new source that reads from a persist shard, distributing the work -/// of reading data to all timely workers. -/// -/// All times emitted will have been [advanced by] the given `as_of` frontier. -/// -/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by -#[allow(clippy::needless_borrow)] -pub fn persist_source_core( - scope: &G, - source_id: GlobalId, - persist_clients: Arc>, - metadata: CollectionMetadata, - as_of: Option>, - until: Antichain, - map_filter_project: Option<&mut MfpPlan>, - flow_control: Option>, - yield_fn: YFn, -) -> ( - Stream, - Stream, - Rc, -) where G: Scope, YFn: Fn(Instant, usize) -> bool + 'static, diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 15455d1707026..210e28e12ab8b 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -391,7 +391,7 @@ where }; let (previous_ok_stream, previous_err_stream, previous_token) = if let Some(previous_as_of) = previous_as_of { - let (ok_stream, err_stream, tok) = persist_source::persist_source_core( + let (ok_stream, err_stream, tok) = persist_source::persist_source( scope, id, persist_clients,