Skip to content

Commit 5a60c7e

Browse files
committed
container: Preserve public API
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent b519cb2 commit 5a60c7e

File tree

15 files changed

+113
-94
lines changed

15 files changed

+113
-94
lines changed

kafkaesque/src/kafka_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ where
9696
D: Data,
9797
L: Fn(&[u8],
9898
&mut Capability<G::Timestamp>,
99-
&mut OutputHandle<G::Timestamp, Vec<D>, Tee<G::Timestamp, Vec<D>>>) -> bool+'static,
99+
&mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>) -> bool+'static,
100100
{
101101
use timely::dataflow::operators::generic::source;
102102
source(scope, name, move |capability, info| {

timely/src/dataflow/channels/pact.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use super::{Bundle, Message};
1919

2020
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
2121

22-
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
23-
pub trait ParallelizationContract<T: 'static, D: 'static> {
22+
/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors.
23+
pub trait ParallelizationContractCore<T: 'static, D: 'static> {
2424
/// Type implementing `Push` produced by this pact.
2525
type Pusher: Push<Bundle<T, D>>+'static;
2626
/// Type implementing `Pull` produced by this pact.
@@ -29,11 +29,15 @@ pub trait ParallelizationContract<T: 'static, D: 'static> {
2929
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
3030
}
3131

32+
/// A `ParallelizationContractCore` specialized for `Vec` containers
33+
/// TODO: Use trait aliases once stable.
34+
pub trait ParallelizationContract<T: 'static, D: 'static>: ParallelizationContractCore<T, Vec<D>> { }
35+
3236
/// A direct connection
3337
#[derive(Debug)]
3438
pub struct Pipeline;
3539

36-
impl<T: 'static, D: Container+'static> ParallelizationContract<T, D> for Pipeline {
40+
impl<T: 'static, D: Container+'static> ParallelizationContractCore<T, D> for Pipeline {
3741
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
3842
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
3943
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -45,6 +49,8 @@ impl<T: 'static, D: Container+'static> ParallelizationContract<T, D> for Pipelin
4549
}
4650
}
4751

52+
impl<T: 'static, D: Clone+'static> ParallelizationContract<T, D> for Pipeline { }
53+
4854
/// An exchange between multiple observers by data
4955
pub struct Exchange<C, D, F> { hash_func: F, phantom: PhantomData<(C, D)> }
5056

@@ -59,7 +65,7 @@ impl<'a, C: Container, D: Data, F: FnMut(&D)->u64+'static> Exchange<C, D, F> {
5965
}
6066

6167
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
62-
impl<'a, T: Eq+Data+Clone, C: Container<Inner=D>+ExchangeContainer+'static, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, C> for Exchange<C, D, F>
68+
impl<'a, T: Eq+Data+Clone, C: Container<Inner=D>+ExchangeContainer+'static, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for Exchange<C, D, F>
6369
where for<'b> &'b mut C: DrainContainer<Inner=D>,
6470
{
6571
// TODO: The closure in the type prevents us from naming it.
@@ -73,6 +79,8 @@ impl<'a, T: Eq+Data+Clone, C: Container<Inner=D>+ExchangeContainer+'static, D: D
7379
}
7480
}
7581

82+
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<Vec<D>, D, F> { }
83+
7684
impl<C, D, F> Debug for Exchange<C, D, F> {
7785
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7886
f.debug_struct("Exchange").finish()

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ impl<T: ::std::fmt::Debug, C: Container, P: Push<Bundle<T, C>>+::std::fmt::Debug
2727
let mut debug = f.debug_struct("UnorderedHandle");
2828
debug.field("time", &self.time);
2929
debug.field("pusher", &self.pusher);
30-
debug.finish_non_exhaustive()
30+
debug.field("buffer", &"...");
31+
debug.finish()
3132
}
3233
}
3334

timely/src/dataflow/channels/pushers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pub use self::tee::{Tee, TeeHelper};
1+
pub use self::tee::{Tee, TeeCore, TeeHelper};
22
pub use self::exchange::Exchange;
33
pub use self::counter::Counter;
44

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ use crate::communication::Push;
1212
type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;
1313

1414
/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
15-
pub struct Tee<T: 'static, D: 'static> {
15+
pub struct TeeCore<T: 'static, D: 'static> {
1616
buffer: D,
1717
shared: PushList<T, D>,
1818
}
1919

20-
impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
20+
/// [TeeCore] specialized to `Vec`-based container.
21+
pub type Tee<T, D> = TeeCore<T, Vec<D>>;
22+
23+
impl<T: Data, D: Container> Push<Bundle<T, D>> for TeeCore<T, D> {
2124
#[inline]
2225
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
2326
let mut pushers = self.shared.borrow_mut();
@@ -41,11 +44,11 @@ impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
4144
}
4245
}
4346

44-
impl<T, D: Container> Tee<T, D> {
47+
impl<T, D: Container> TeeCore<T, D> {
4548
/// Allocates a new pair of `Tee` and `TeeHelper`.
46-
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
49+
pub fn new() -> (TeeCore<T, D>, TeeHelper<T, D>) {
4750
let shared = Rc::new(RefCell::new(Vec::new()));
48-
let port = Tee {
51+
let port = TeeCore {
4952
buffer: D::Builder::with_capacity(D::default_length()).build(),
5053
shared: shared.clone(),
5154
};
@@ -54,7 +57,7 @@ impl<T, D: Container> Tee<T, D> {
5457
}
5558
}
5659

57-
impl<T, D> Debug for Tee<T, D>
60+
impl<T, D> Debug for TeeCore<T, D>
5861
where
5962
D: Debug,
6063
{

timely/src/dataflow/operators/enterleave.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::progress::{Source, Target};
2727
use crate::order::Product;
2828
use crate::{Container, ContainerBuilder, Data};
2929
use crate::communication::Push;
30-
use crate::dataflow::channels::pushers::{Counter, Tee};
30+
use crate::dataflow::channels::pushers::{Counter, TeeCore};
3131
use crate::dataflow::channels::{Bundle, Message};
3232

3333
use crate::worker::AsWorker;
@@ -87,7 +87,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container<Inner=D>, D> Ent
8787

8888
use crate::scheduling::Scheduler;
8989

90-
let (targets, registrar) = Tee::<T, C>::new();
90+
let (targets, registrar) = TeeCore::<T, C>::new();
9191
let ingress = IngressNub {
9292
targets: Counter::new(targets),
9393
phantom: ::std::marker::PhantomData,
@@ -129,7 +129,7 @@ impl<'a, G: Scope, D: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, D>
129129
let scope = self.scope();
130130

131131
let output = scope.subgraph.borrow_mut().new_output();
132-
let (targets, registrar) = Tee::<G::Timestamp, D>::new();
132+
let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
133133
let channel_id = scope.clone().new_identifier();
134134
self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id);
135135

@@ -143,7 +143,7 @@ impl<'a, G: Scope, D: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, D>
143143

144144

145145
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
146-
targets: Counter<TInner, TData, Tee<TInner, TData>>,
146+
targets: Counter<TInner, TData, TeeCore<TInner, TData>>,
147147
phantom: ::std::marker::PhantomData<TOuter>,
148148
activator: crate::scheduling::Activator,
149149
active: bool,
@@ -175,7 +175,7 @@ impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> Pus
175175

176176

177177
struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
178-
targets: Tee<TOuter, TData>,
178+
targets: TeeCore<TOuter, TData>,
179179
phantom: PhantomData<TInner>,
180180
}
181181

timely/src/dataflow/operators/feedback.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::progress::{Timestamp, PathSummary};
44
use crate::progress::frontier::Antichain;
55
use crate::order::Product;
66

7-
use crate::dataflow::channels::pushers::Tee;
7+
use crate::dataflow::channels::pushers::TeeCore;
88
use crate::dataflow::channels::pact::Pipeline;
99
use crate::dataflow::{CoreStream, Scope, Stream};
1010
use crate::dataflow::scopes::child::Iterative;
@@ -35,7 +35,7 @@ pub trait Feedback<G: Scope> {
3535
/// .connect_loop(handle);
3636
/// });
3737
/// ```
38-
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, Vec<D>>, Stream<G, D>);
38+
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);
3939

4040
/// Creates a `CoreStream` and a `Handle` to later bind the source of that `Stream`.
4141
///
@@ -58,7 +58,7 @@ pub trait Feedback<G: Scope> {
5858
/// .connect_loop(handle);
5959
/// });
6060
/// ```
61-
fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, CoreStream<G, D>);
61+
fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, CoreStream<G, D>);
6262
}
6363

6464
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
@@ -86,25 +86,25 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
8686
/// });
8787
/// });
8888
/// ```
89-
fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, D>, CoreStream<Iterative<'a, G, T>, D>);
89+
fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, CoreStream<Iterative<'a, G, T>, D>);
9090
}
9191

9292
impl<G: Scope> Feedback<G> for G {
93-
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, Vec<D>>, Stream<G, D>) {
93+
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
9494
self.feedback_core(summary)
9595
}
9696

97-
fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, CoreStream<G, D>) {
97+
fn feedback_core<D: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, D>, CoreStream<G, D>) {
9898

9999
let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
100100
let (output, stream) = builder.new_output();
101101

102-
(Handle { builder, summary, output }, stream)
102+
(HandleCore { builder, summary, output }, stream)
103103
}
104104
}
105105

106106
impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
107-
fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, D>, CoreStream<Iterative<'a, G, T>, D>) {
107+
fn loop_variable<D: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, D>, CoreStream<Iterative<'a, G, T>, D>) {
108108
self.feedback_core(Product::new(Default::default(), summary))
109109
}
110110
}
@@ -128,11 +128,11 @@ pub trait ConnectLoop<G: Scope, D: Container> {
128128
/// .connect_loop(handle);
129129
/// });
130130
/// ```
131-
fn connect_loop(&self, _: Handle<G, D>);
131+
fn connect_loop(&self, _: HandleCore<G, D>);
132132
}
133133

134134
impl<G: Scope, D: Container> ConnectLoop<G, D> for CoreStream<G, D> {
135-
fn connect_loop(&self, helper: Handle<G, D>) {
135+
fn connect_loop(&self, helper: HandleCore<G, D>) {
136136

137137
let mut builder = helper.builder;
138138
let summary = helper.summary;
@@ -158,8 +158,11 @@ impl<G: Scope, D: Container> ConnectLoop<G, D> for CoreStream<G, D> {
158158

159159
/// A handle used to bind the source of a loop variable.
160160
#[derive(Debug)]
161-
pub struct Handle<G: Scope, D: Container> {
161+
pub struct HandleCore<G: Scope, D: Container> {
162162
builder: OperatorBuilder<G>,
163163
summary: <G::Timestamp as Timestamp>::Summary,
164-
output: OutputWrapper<G::Timestamp, D, Tee<G::Timestamp, D>>,
164+
output: OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>,
165165
}
166+
167+
/// A `HandleCore` specialized for using `Vec` as container
168+
pub type Handle<G, D> = HandleCore<G, Vec<D>>;

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use crate::progress::{Source, Target};
1616
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
1717

1818
use crate::dataflow::{CoreStream, Scope};
19-
use crate::dataflow::channels::pushers::Tee;
20-
use crate::dataflow::channels::pact::ParallelizationContract;
19+
use crate::dataflow::channels::pushers::TeeCore;
20+
use crate::dataflow::channels::pact::ParallelizationContractCore;
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::Container;
2323

@@ -109,15 +109,15 @@ impl<G: Scope> OperatorBuilder<G> {
109109
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
110110
pub fn new_input<D: Data, P>(&mut self, stream: &CoreStream<G, D>, pact: P) -> P::Puller
111111
where
112-
P: ParallelizationContract<G::Timestamp, D> {
112+
P: ParallelizationContractCore<G::Timestamp, D> {
113113
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
114114
self.new_input_connection(stream, pact, connection)
115115
}
116116

117117
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
118118
pub fn new_input_connection<D: Data, P>(&mut self, stream: &CoreStream<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
119119
where
120-
P: ParallelizationContract<G::Timestamp, D> {
120+
P: ParallelizationContractCore<G::Timestamp, D> {
121121

122122
let channel_id = self.scope.new_identifier();
123123
let logging = self.scope.logging();
@@ -133,16 +133,16 @@ impl<G: Scope> OperatorBuilder<G> {
133133
}
134134

135135
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
136-
pub fn new_output<D: Container>(&mut self) -> (Tee<G::Timestamp, D>, CoreStream<G, D>) {
136+
pub fn new_output<D: Container>(&mut self) -> (TeeCore<G::Timestamp, D>, CoreStream<G, D>) {
137137

138138
let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
139139
self.new_output_connection(connection)
140140
}
141141

142142
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
143-
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, D>, CoreStream<G, D>) {
143+
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (TeeCore<G::Timestamp, D>, CoreStream<G, D>) {
144144

145-
let (targets, registrar) = Tee::<G::Timestamp,D>::new();
145+
let (targets, registrar) = TeeCore::<G::Timestamp,D>::new();
146146
let source = Source::new(self.index, self.shape.outputs);
147147
let stream = CoreStream::new(source, registrar, self.scope.clone());
148148

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ use crate::progress::operate::SharedProgress;
1111
use crate::progress::frontier::{Antichain, MutableAntichain};
1212

1313
use crate::dataflow::{Scope, CoreStream};
14-
use crate::dataflow::channels::pushers::Tee;
14+
use crate::dataflow::channels::pushers::TeeCore;
1515
use crate::dataflow::channels::pushers::Counter as PushCounter;
1616
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
17-
use crate::dataflow::channels::pact::ParallelizationContract;
17+
use crate::dataflow::channels::pact::ParallelizationContractCore;
1818
use crate::dataflow::channels::pullers::Counter as PullCounter;
1919
use crate::dataflow::operators::capability::Capability;
2020
use crate::dataflow::operators::generic::handles::{InputHandle, new_input_handle, OutputWrapper};
@@ -60,7 +60,7 @@ impl<G: Scope> OperatorBuilder<G> {
6060
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
6161
pub fn new_input<D: Container, P>(&mut self, stream: &CoreStream<G, D>, pact: P) -> InputHandle<G::Timestamp, D, P::Puller>
6262
where
63-
P: ParallelizationContract<G::Timestamp, D> {
63+
P: ParallelizationContractCore<G::Timestamp, D> {
6464

6565
let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()];
6666
self.new_input_connection(stream, pact, connection)
@@ -76,7 +76,7 @@ impl<G: Scope> OperatorBuilder<G> {
7676
/// antichain indicating that there is no connection from the input to the output.
7777
pub fn new_input_connection<D: Data, P>(&mut self, stream: &CoreStream<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandle<G::Timestamp, D, P::Puller>
7878
where
79-
P: ParallelizationContract<G::Timestamp, D> {
79+
P: ParallelizationContractCore<G::Timestamp, D> {
8080

8181
let puller = self.builder.new_input_connection(stream, pact, connection);
8282

@@ -88,7 +88,7 @@ impl<G: Scope> OperatorBuilder<G> {
8888
}
8989

9090
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
91-
pub fn new_output<D: Container>(&mut self) -> (OutputWrapper<G::Timestamp, D, Tee<G::Timestamp, D>>, CoreStream<G, D>) {
91+
pub fn new_output<D: Container>(&mut self) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, CoreStream<G, D>) {
9292
let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
9393
self.new_output_connection(connection)
9494
}
@@ -101,7 +101,7 @@ impl<G: Scope> OperatorBuilder<G> {
101101
///
102102
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
103103
/// antichain indicating that there is no connection from the input to the output.
104-
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, Tee<G::Timestamp, D>>, CoreStream<G, D>) {
104+
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, CoreStream<G, D>) {
105105

106106
let (tee, stream) = self.builder.new_output_connection(connection);
107107

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ impl<T: Timestamp, D: Container, P: Push<Bundle<T, D>>> OutputWrapper<T, D, P> {
179179
///
180180
/// This method ensures that the only access to the push buffer is through the `OutputHandle`
181181
/// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
182-
pub fn activate(&mut self) -> OutputHandle<T, D, P> {
183-
OutputHandle {
182+
pub fn activate(&mut self) -> OutputHandleCore<T, D, P> {
183+
OutputHandleCore {
184184
push_buffer: &mut self.push_buffer,
185185
internal_buffer: &self.internal_buffer,
186186
}
@@ -189,12 +189,15 @@ impl<T: Timestamp, D: Container, P: Push<Bundle<T, D>>> OutputWrapper<T, D, P> {
189189

190190

191191
/// Handle to an operator's output stream.
192-
pub struct OutputHandle<'a, T: Timestamp, C: Container+'a, P: Push<Bundle<T, C>>+'a> {
192+
pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push<Bundle<T, C>>+'a> {
193193
push_buffer: &'a mut Buffer<T, C, PushCounter<T, C, P>>,
194194
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
195195
}
196196

197-
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>> OutputHandle<'a, T, C, P> {
197+
/// Handle specialized to `Vec`-based container.
198+
pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec<D>, P>;
199+
200+
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>> OutputHandleCore<'a, T, C, P> {
198201
/// Obtains a session that can send data at the timestamp associated with capability `cap`.
199202
///
200203
/// In order to send data at a future timestamp, obtain a capability for the new timestamp
@@ -223,7 +226,7 @@ impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>> OutputHandle<'a, T,
223226
}
224227
}
225228

226-
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>> Drop for OutputHandle<'a, T, C, P> {
229+
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>> Drop for OutputHandleCore<'a, T, C, P> {
227230
fn drop(&mut self) {
228231
self.push_buffer.cease();
229232
}

0 commit comments

Comments
 (0)