Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,6 @@ postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres" }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }
vte = { git = "https://github.com/alacritty/vte" }

[patch."https://github.com/TimelyDataflow/timely-dataflow"]
timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "output_handle_cease" }
3 changes: 3 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,7 @@ allow-git = [
# Waiting on https://github.com/launchdarkly/rust-server-sdk/pull/20 to make
# it into a release.
"https://github.com/MaterializeInc/rust-server-sdk",

# TODO: Remove once the `output_handle_cease` changes land upstream in
# TimelyDataflow/timely-dataflow and the [patch] in Cargo.toml is dropped.
"https://github.com/antiguru/timely-dataflow",
]
3 changes: 2 additions & 1 deletion src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, DatumVec, Diff, Row, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::buffer::ConsolidateBuffer;
use mz_timely_util::replay::MzReplay;

use crate::compute_state::ComputeState;
use crate::logging::persist::persist_sink;
use crate::logging::{ConsolidateBuffer, DifferentialLog, LogVariant};
use crate::logging::{DifferentialLog, LogVariant};
use crate::typedefs::{KeysValsHandle, RowSpine};

/// Constructs the logging dataflow for differential logs.
Expand Down
95 changes: 0 additions & 95 deletions src/compute/src/logging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,8 @@ pub mod timely;

use std::time::Duration;

use ::timely::communication::Push;
use ::timely::dataflow::channels::Bundle;
use ::timely::dataflow::operators::capture::{Event, EventPusher};
use ::timely::dataflow::operators::generic::OutputHandle;
use ::timely::dataflow::operators::Capability;
use ::timely::dataflow::operators::CapabilityRef;
use ::timely::progress::Timestamp as TimelyTimestamp;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::ExchangeData;

use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_repr::Timestamp;
Expand Down Expand Up @@ -126,89 +117,3 @@ where
.push(Event::Progress(vec![(self.time_ms, -1)]));
}
}

/// A buffer that consolidates updates
///
/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is
/// backed by a capacity-limited buffer, which means that compaction only occurs within the
/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's
/// capacity.
///
/// A cap is retained whenever the current time changes to be able to flush on drop or when the time
/// changes again.
///
/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the buffer's capacity, at which point the buffer will be shipped.
///
/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if
/// time changes, or if the buffer capacity is reached.
pub struct ConsolidateBuffer<'a, T, D: ExchangeData, R: Semigroup, P>
where
    P: Push<Bundle<T, (D, T, R)>> + 'a,
    T: Clone + Lattice + Ord + TimelyTimestamp + 'a,
    D: 'a,
{
    // a buffer for records, to send at self.cap
    // Invariant: Buffer only contains data if cap is Some.
    buffer: Vec<(D, T, R)>,
    /// The wrapped output handle; a session is opened against it whenever the buffer flushes.
    output_handle: OutputHandle<'a, T, (D, T, R), P>,
    /// Capability at which buffered data will be sent. `None` until the first `give`,
    /// and replaced whenever an update arrives at a different time.
    cap: Option<Capability<T>>,
    /// Index of the output port for which capabilities are retained (see `give`).
    port: usize,
}

impl<'a, T, D: ExchangeData, R: Semigroup, P> ConsolidateBuffer<'a, T, D, R, P>
where
    T: Clone + Lattice + Ord + TimelyTimestamp + 'a,
    P: Push<Bundle<T, (D, T, R)>> + 'a,
{
    /// Create a new [ConsolidateBuffer], wrapping the provided session.
    ///
    /// * `output_handle`: The output to send data to.
    /// * 'port': The output port to retain capabilities for.
    pub fn new(output_handle: OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self {
        Self {
            output_handle,
            port,
            // No capability until the first `give`; the buffer invariant guarantees
            // it stays empty while `cap` is `None`.
            cap: None,
            // Sized to timely's default container capacity so consolidation amortizes
            // over a full container's worth of updates.
            buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()),
        }
    }

    /// Give an element to the buffer
    ///
    /// If `cap` carries a different time than the retained capability, the pending
    /// contents are flushed first and a capability for the new time is retained.
    /// When the buffer fills up, it is consolidated in place; if consolidation
    /// recovers less than half the capacity, the buffer is flushed as well.
    pub fn give(&mut self, cap: &CapabilityRef<T>, data: (D, T, R)) {
        // Retain a cap for the current time, which will be used on flush.
        if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
            // Flush on capability change. Must happen before replacing `self.cap`,
            // since buffered data is only valid to send at the old capability.
            self.flush();
            // Retain capability for the specified output port.
            self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
        }
        self.buffer.push(data);
        if self.buffer.len() == self.buffer.capacity() {
            // Consolidate while the consolidation frees at least half the buffer
            consolidate_updates(&mut self.buffer);
            if self.buffer.len() > self.buffer.capacity() / 2 {
                self.flush();
            }
        }
    }

    /// Flush the internal buffer to the underlying session
    ///
    /// A no-op when no capability is retained, in which case the buffer is empty
    /// (per the invariant on `buffer`). Note that the capability is retained after
    /// flushing; it is only replaced when a later time arrives in `give`.
    pub fn flush(&mut self) {
        if let Some(cap) = &self.cap {
            self.output_handle.session(cap).give_vec(&mut self.buffer);
        }
    }
}

impl<'a, T, D: ExchangeData, R: Semigroup, P> Drop for ConsolidateBuffer<'a, T, D, R, P>
where
    P: Push<Bundle<T, (D, T, R)>> + 'a,
    T: Clone + Lattice + Ord + TimelyTimestamp + 'a,
    D: 'a,
{
    /// Flush any buffered updates on drop so no data is lost when the buffer
    /// goes out of scope; this also releases the retained capability.
    fn drop(&mut self) {
        self.flush();
    }
}
3 changes: 2 additions & 1 deletion src/compute/src/logging/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ use mz_ore::cast::CastFrom;
use mz_ore::iter::IteratorExt;
use mz_repr::{Datum, Diff, Row, RowArena, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::buffer::ConsolidateBuffer;
use mz_timely_util::replay::MzReplay;

use crate::compute_state::ComputeState;
use crate::logging::persist::persist_sink;
use crate::logging::{ConsolidateBuffer, LogVariant, TimelyLog};
use crate::logging::{LogVariant, TimelyLog};
use crate::typedefs::{KeysValsHandle, RowSpine};

/// Constructs the logging dataflow for reachability logs.
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/logging/timely.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_repr::{datum_list_size, datum_size, Datum, DatumVec, Diff, Row, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::buffer::ConsolidateBuffer;
use mz_timely_util::replay::MzReplay;

use crate::compute_state::ComputeState;
use crate::logging::persist::persist_sink;
use crate::logging::{ConsolidateBuffer, LogVariant, TimelyLog};
use crate::logging::{LogVariant, TimelyLog};
use crate::typedefs::{KeysValsHandle, RowSpine};

/// Constructs the logging dataflow for timely logs.
Expand Down
Loading