225 changes: 127 additions & 98 deletions src/storage-client/src/source/persist_source.rs
@@ -16,23 +16,27 @@ use std::time::Instant;
 
 use mz_persist_client::operators::shard_source::shard_source;
 use mz_persist_types::codec_impls::UnitSchema;
+use timely::communication::Push;
 use timely::dataflow::channels::pact::Pipeline;
-use timely::dataflow::operators::OkErr;
+use timely::dataflow::channels::Bundle;
+use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use timely::dataflow::operators::{Capability, OkErr};
 use timely::dataflow::{Scope, Stream};
 use timely::progress::Antichain;
+use timely::scheduling::Activator;
 use tokio::sync::Mutex;
 
 use mz_expr::MfpPlan;
 use mz_persist_client::cache::PersistClientCache;
 use mz_persist_client::fetch::FetchedPart;
-use mz_repr::{Diff, GlobalId, Row, Timestamp};
-use mz_timely_util::builder_async::{Event, OperatorBuilder as AsyncOperatorBuilder};
+use mz_repr::{DatumVec, Diff, GlobalId, Row, Timestamp};
 
 use crate::controller::CollectionMetadata;
 use crate::types::errors::DataflowError;
 use crate::types::sources::SourceData;
 
 pub use mz_persist_client::operators::shard_source::FlowControl;
+use mz_timely_util::buffer::ConsolidateBuffer;
 
 /// Creates a new source that reads from a persist shard, distributing the work
 /// of reading data to all timely workers.
@@ -146,10 +150,12 @@ where
     G: Scope<Timestamp = mz_repr::Timestamp>,
     YFn: Fn(Instant, usize) -> bool + 'static,
 {
-    let mut builder = AsyncOperatorBuilder::new(
+    let scope = fetched.scope();
+    let mut builder = OperatorBuilder::new(
         format!("persist_source::decode_and_mfp({})", name),
-        fetched.scope(),
+        scope.clone(),
     );
+    let operator_info = builder.operator_info();
 
     let mut fetched_input = builder.new_input(fetched, Pipeline);
     let (mut updates_output, updates_stream) = builder.new_output();
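A note on the builder swap visible in this hunk: the old `AsyncOperatorBuilder` operator could yield the worker thread simply by `await`ing, while the synchronous `builder_rc::OperatorBuilder` closure must return and explicitly ask to be rescheduled via an `Activator` built from the operator's address (see the next hunk). A minimal std-only sketch of that contract, with `ToyActivator` and `operator_tick` as illustrative stand-ins rather than timely APIs:

```rust
use std::collections::VecDeque;

/// Toy stand-in for timely's `Activator`: it only records that the operator
/// asked to be scheduled again. (Illustrative; not the timely API.)
#[derive(Default)]
struct ToyActivator {
    requested: bool,
}

impl ToyActivator {
    fn activate(&mut self) {
        self.requested = true;
    }
}

/// One scheduling quantum of a synchronous operator: process a bounded amount
/// of queued work, then request reactivation instead of blocking or awaiting.
fn operator_tick(pending: &mut VecDeque<u64>, budget: usize, activator: &mut ToyActivator) {
    let mut done = 0;
    while done < budget {
        match pending.pop_front() {
            Some(item) => {
                // Stand-in for "decode one fetched part and emit its updates".
                let _ = item * 2;
                done += 1;
            }
            None => break,
        }
    }
    if !pending.is_empty() {
        // Mirrors `activator.activate()` in the diff: run me again soon.
        activator.activate();
    }
}

fn main() {
    let mut pending: VecDeque<u64> = (0..10).collect();
    let mut activator = ToyActivator::default();
    // A real scheduler drives this loop; here we spin while activation is requested.
    loop {
        activator.requested = false;
        operator_tick(&mut pending, 3, &mut activator);
        if !activator.requested {
            break;
        }
    }
    assert!(pending.is_empty());
}
```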
@@ -159,111 +165,134 @@ where
     let mut row_builder = Row::default();
 
     // Extract the MFP if it exists; leave behind an identity MFP in that case.
-    let mut map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());
+    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());
 
-    builder.build(move |mut initial_capabilities| async move {
-        initial_capabilities.clear();
+    builder.build(move |_caps| {
+        // Acquire an activator to reschedule the operator when it has unfinished work.
+        let activations = scope.activations();
+        let activator = Activator::new(&operator_info.address[..], activations);
+        // Maintain a list of work to do
+        let mut pending_work = std::collections::VecDeque::new();
+        let mut buffer = Default::default();
 
-        let mut buffer = Vec::new();
+        move |_frontier| {
+            fetched_input.for_each(|time, data| {
+                data.swap(&mut buffer);
+                let capability = time.retain();
+                for fetched_part in buffer.drain(..) {
+                    pending_work.push_back(PendingWork {
+                        capability: capability.clone(),
+                        fetched_part,
+                    })
+                }
+            });
 
-        while let Some(event) = fetched_input.next().await {
-            let cap = match event {
-                Event::Data(cap, data) => {
-                    data.swap(&mut buffer);
-                    cap
-                }
-                Event::Progress(_) => continue,
-            };
+            let mut work = 0;
+            let start_time = Instant::now();
+            let mut output = updates_output.activate();
+            let mut handle = ConsolidateBuffer::new(&mut output, 0);
+            while !pending_work.is_empty() && !yield_fn(start_time, work) {
+                let done = pending_work.front_mut().unwrap().do_work(
+                    &mut work,
+                    start_time,
+                    &yield_fn,
+                    &until,
+                    map_filter_project.as_ref(),
+                    &mut datum_vec,
+                    &mut row_builder,
+                    &mut handle,
+                );
+                if done {
+                    pending_work.pop_front();
+                }
+            }
+            if !pending_work.is_empty() {
+                activator.activate();
+            }
+        }
+    });
 
-            // SUBTLE: This operator yields back to timely whenever an await returns a
-            // Pending result from the overall async/await state machine `poll`. Since
-            // this is fetching from remote storage, it will yield and thus we can reset
-            // our yield counters here.
-            let mut decode_start = Instant::now();
+    updates_stream
+}
 
-            for fetched_part in buffer.drain(..) {
-                // Apply as much logic to `updates` as we can, before we emit anything.
-                let (updates_size_hint_min, updates_size_hint_max) = fetched_part.size_hint();
-                let mut updates =
-                    Vec::with_capacity(updates_size_hint_max.unwrap_or(updates_size_hint_min));
-                for ((key, val), time, diff) in fetched_part {
-                    if !until.less_equal(&time) {
-                        match (key, val) {
-                            (Ok(SourceData(Ok(row))), Ok(())) => {
-                                if let Some(mfp) = &mut map_filter_project {
-                                    let arena = mz_repr::RowArena::new();
-                                    let mut datums_local = datum_vec.borrow_with(&row);
-                                    for result in mfp.evaluate(
-                                        &mut datums_local,
-                                        &arena,
-                                        time,
-                                        diff,
-                                        |time| !until.less_equal(time),
-                                        &mut row_builder,
-                                    ) {
-                                        match result {
-                                            Ok((row, time, diff)) => {
-                                                // Additional `until` filtering due to temporal filters.
-                                                if !until.less_equal(&time) {
-                                                    updates.push((Ok(row), time, diff));
-                                                }
-                                            }
-                                            Err((err, time, diff)) => {
-                                                // Additional `until` filtering due to temporal filters.
-                                                if !until.less_equal(&time) {
-                                                    updates.push((Err(err), time, diff));
-                                                }
-                                            }
-                                        }
-                                    }
-                                } else {
-                                    updates.push((Ok(row), time, diff));
-                                }
-                            }
-                            (Ok(SourceData(Err(err))), Ok(())) => {
-                                updates.push((Err(err), time, diff));
-                            }
-                            // TODO(petrosagg): error handling
-                            (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
-                                panic!("decoding failed")
-                            }
-                        }
-                    }
-                    if yield_fn(decode_start, updates.len()) {
-                        // A large part of the point of yielding is to let later operators
-                        // reduce down the data, so emit what we have. Note that this means
-                        // we don't get to consolidate everything, but that's part of the
-                        // tradeoff in tuning yield_fn.
-                        differential_dataflow::consolidation::consolidate_updates(&mut updates);
-
-                        {
-                            // Do very fine-grained output activation/session
-                            // creation to ensure that we don't hold activated
-                            // outputs or sessions across await points, which
-                            // would prevent messages from being flushed from
-                            // the shared timely output buffer.
-                            let mut updates_output = updates_output.activate();
-                            updates_output.session(&cap).give_vec(&mut updates);
-                        }
-
-                        // Force a yield to give back the timely thread, reactivating on our
-                        // way out.
-                        tokio::task::yield_now().await;
-                        decode_start = Instant::now();
-                    }
-                }
-                differential_dataflow::consolidation::consolidate_updates(&mut updates);
-
-                // Do very fine-grained output activation/session creation
-                // to ensure that we don't hold activated outputs or
-                // sessions across await points, which would prevent
-                // messages from being flushed from the shared timely output
-                // buffer.
-                let mut updates_output = updates_output.activate();
-                updates_output.session(&cap).give_vec(&mut updates);
-            }
-        }
-    });
-
-    updates_stream
+/// Pending work to read from fetched parts
+struct PendingWork {
+    /// The time at which the work should happen.
+    capability: Capability<Timestamp>,
+    /// Pending fetched part.
+    fetched_part: FetchedPart<SourceData, (), Timestamp, Diff>,
+}
+
+impl PendingWork {
+    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
+    /// `yield_fn` whether more fuel is available.
+    fn do_work<P, YFn>(
+        &mut self,
+        work: &mut usize,
+        start_time: Instant,
+        yield_fn: YFn,
+        until: &Antichain<Timestamp>,
+        map_filter_project: Option<&MfpPlan>,
+        datum_vec: &mut DatumVec,
+        row_builder: &mut Row,
+        output: &mut ConsolidateBuffer<Timestamp, Result<Row, DataflowError>, Diff, P>,
+    ) -> bool
+    where
+        P: Push<Bundle<Timestamp, (Result<Row, DataflowError>, Timestamp, Diff)>>,
+        YFn: Fn(Instant, usize) -> bool,
+    {
+        while let Some(((key, val), time, diff)) = self.fetched_part.next() {
+            if until.less_equal(&time) {
+                continue;
+            }
+            match (key, val) {
+                (Ok(SourceData(Ok(row))), Ok(())) => {
+                    if let Some(mfp) = map_filter_project {
+                        let arena = mz_repr::RowArena::new();
+                        let mut datums_local = datum_vec.borrow_with(&row);
+                        for result in mfp.evaluate(
+                            &mut datums_local,
+                            &arena,
+                            time,
+                            diff,
+                            |time| !until.less_equal(time),
+                            row_builder,
+                        ) {
+                            match result {
+                                Ok((row, time, diff)) => {
+                                    // Additional `until` filtering due to temporal filters.
+                                    if !until.less_equal(&time) {
+                                        output.give_at(&self.capability, (Ok(row), time, diff));
+                                        *work += 1;
+                                    }
+                                }
+                                Err((err, time, diff)) => {
+                                    // Additional `until` filtering due to temporal filters.
+                                    if !until.less_equal(&time) {
+                                        output.give_at(&self.capability, (Err(err), time, diff));
+                                        *work += 1;
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        output.give_at(&self.capability, (Ok(row), time, diff));
+                        *work += 1;
+                    }
+                }
+                (Ok(SourceData(Err(err))), Ok(())) => {
+                    output.give_at(&self.capability, (Err(err), time, diff));
+                    *work += 1;
+                }
+                // TODO(petrosagg): error handling
+                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
+                    panic!("decoding failed")
+                }
+            }
+            if yield_fn(start_time, *work) {
+                return false;
+            }
+        }
+        true
+    }
 }
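The hunk above replaces implicit async yielding with an explicit fuel budget: each activation drains `pending_work` until `yield_fn(start_time, work)` trips, and `do_work` returns `false` so that a partially read part stays at the front of the queue and resumes on the next activation. A self-contained sketch of that resumable, fueled work-queue pattern; `Job` is a hypothetical stand-in for `PendingWork`, which really holds a `FetchedPart` and emits through a `ConsolidateBuffer`:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A resumable unit of work: a counter with `remaining` steps left.
/// Stand-in for `PendingWork` holding a partially read fetched part.
struct Job {
    remaining: usize,
}

impl Job {
    /// Process steps until finished or until `yield_fn` says to stop.
    /// Returns `true` when the job is exhausted, like `PendingWork::do_work`.
    fn do_work(
        &mut self,
        work: &mut usize,
        start: Instant,
        yield_fn: impl Fn(Instant, usize) -> bool,
    ) -> bool {
        while self.remaining > 0 {
            self.remaining -= 1;
            *work += 1;
            if yield_fn(start, *work) {
                return false; // out of fuel; resume from here next activation
            }
        }
        true
    }
}

fn main() {
    let mut queue: VecDeque<Job> = (0..4).map(|_| Job { remaining: 100 }).collect();
    // Example budget: yield after 64 units of work or 10ms, whichever comes first.
    let yield_fn =
        |start: Instant, work: usize| work >= 64 || start.elapsed() > Duration::from_millis(10);

    let mut activations = 0;
    while !queue.is_empty() {
        // One operator activation, mirroring the scheduling loop in the diff.
        let start = Instant::now();
        let mut work = 0;
        while !queue.is_empty() && !yield_fn(start, work) {
            if queue.front_mut().unwrap().do_work(&mut work, start, yield_fn) {
                queue.pop_front();
            }
        }
        activations += 1; // a real operator would call `activator.activate()` here
    }
    println!("drained in {activations} activations");
}
```

Keeping an interrupted job at the front of the queue preserves emission order per capability and bounds how long any single activation can hold the worker thread.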
26 changes: 22 additions & 4 deletions src/timely-util/src/buffer.rs
@@ -15,7 +15,6 @@
 
 use differential_dataflow::consolidation::consolidate_updates;
 use differential_dataflow::difference::Semigroup;
-use differential_dataflow::lattice::Lattice;
 use differential_dataflow::Data;
 use timely::communication::Push;
 use timely::dataflow::channels::Bundle;
@@ -42,7 +41,7 @@ use timely::progress::Timestamp;
 pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
 where
     P: Push<Bundle<T, (D, T, R)>> + 'a,
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     D: 'a,
 {
     // a buffer for records, to send at self.cap
@@ -56,7 +55,7 @@
 
 impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
 where
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     P: Push<Bundle<T, (D, T, R)>> + 'a,
 {
     /// Create a new [ConsolidateBuffer], wrapping the provided session.
@@ -94,6 +93,25 @@
             // Retain capability for the specified output port.
             self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
         }
+        self.give_internal(data);
+    }
+
+    /// Give an element to the buffer, using a pre-fabricated capability. Note that the capability
+    /// must be valid for the associated output.
+    pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
+        // Retain a cap for the current time, which will be used on flush.
+        if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
+            // Flush on capability change
+            self.flush();
+            // Retain capability.
+            self.cap = Some(cap.clone());
+        }
+        self.give_internal(data);
+    }
+
+    /// Give an element and possibly flush the buffer. Note that this needs to have access
+    /// to a capability, which the public functions ensure.
+    fn give_internal(&mut self, data: (D, T, R)) {
         self.buffer.push(data);
 
         // Limit, if possible, the lifetime of the allocations for data
@@ -138,7 +156,7 @@
 impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
 where
     P: Push<Bundle<T, (D, T, R)>> + 'a,
-    T: Clone + Lattice + Ord + Timestamp + 'a,
+    T: Data + Timestamp + 'a,
     D: 'a,
 {
     fn drop(&mut self) {
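The new `give_at` path above buffers updates under one capability and flushes them, consolidated, whenever the capability's time changes, with the `Drop` impl guaranteeing a final flush. A std-only model of that contract; `ToyConsolidateBuffer` is illustrative, while the real buffer consolidates via `differential_dataflow::consolidation::consolidate_updates` and pushes into a timely output session:

```rust
/// Minimal model of the `ConsolidateBuffer::give_at` contract: updates are
/// buffered per capability (here just a `u64` time) and flushed, consolidated,
/// whenever the time changes or the buffer is dropped. Illustrative only.
struct ToyConsolidateBuffer {
    cap: Option<u64>,
    buffer: Vec<(String, u64, i64)>,
    flushed: Vec<(String, u64, i64)>,
}

impl ToyConsolidateBuffer {
    fn new() -> Self {
        Self { cap: None, buffer: Vec::new(), flushed: Vec::new() }
    }

    /// Analogue of `give_at`: flush on capability change, then buffer.
    fn give_at(&mut self, time: u64, data: (String, u64, i64)) {
        if self.cap.map_or(true, |t| t != time) {
            self.flush();
            self.cap = Some(time);
        }
        self.buffer.push(data);
    }

    /// Consolidate (sum the diffs of equal records, dropping zeros) and emit,
    /// standing in for `consolidate_updates` plus the session push.
    fn flush(&mut self) {
        self.buffer.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));
        self.buffer.dedup_by(|next, kept| {
            if kept.0 == next.0 && kept.1 == next.1 {
                kept.2 += next.2;
                true
            } else {
                false
            }
        });
        self.buffer.retain(|u| u.2 != 0);
        self.flushed.append(&mut self.buffer);
    }
}

impl Drop for ToyConsolidateBuffer {
    fn drop(&mut self) {
        // Mirrors the real `Drop` impl: never lose buffered updates.
        self.flush();
    }
}

fn main() {
    let mut buf = ToyConsolidateBuffer::new();
    buf.give_at(1, ("a".into(), 1, 1));
    buf.give_at(1, ("a".into(), 1, 1)); // same time: buffered, will consolidate
    buf.give_at(2, ("b".into(), 2, 1)); // time changed: flushes the batch at time 1
    buf.flush();
    assert_eq!(buf.flushed, vec![("a".into(), 1, 2), ("b".into(), 2, 1)]);
}
```

Flushing on capability change is what lets the buffer hand each consolidated batch to a single session at a single timestamp, instead of re-acquiring a session per record.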