diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index cc4b8331a..c96358394 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -30,7 +30,7 @@ use std::time::{Duration, Instant}; use differential_dataflow::collection::concatenate; use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::{AsCollection, VecCollection}; @@ -165,9 +165,9 @@ pub type DiagnosticEvent = Event; // ============================================================================ /// A key-value trace: key K, value V, time Duration, diff i64. -type ValTrace = TraceAgent>; +type ValTrace = TraceIntra>; /// A key-only trace: key K, time Duration, diff i64. -type KeyTrace = TraceAgent>; +type KeyTrace = TraceIntra>; /// Trace handles for timely logging arrangements. pub struct TimelyTraces { diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index 2e5623679..5052489e1 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -86,8 +86,7 @@ fn main() { }) .probe_with(&mut probe) .as_collection() - .arrange_by_key() - // .arrange::() + .arrange_by_key_inter() .trace }); diff --git a/differential-dataflow/examples/columnar/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs index 1241702e7..21cc33db1 100644 --- a/differential-dataflow/examples/columnar/columnar_support.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -1966,15 +1966,13 @@ where stream.as_collection() } -/// Extract a `Collection<_, RecordedUpdates>` from a columnar `Arranged`. +/// Extract a `Collection<_, RecordedUpdates>` from a columnar batch stream. /// /// Cursors through each batch and pushes `(key, val, time, diff)` refs into /// a `ValColBuilder`, which sorts and consolidates on flush. -pub fn as_recorded_updates( - arranged: differential_dataflow::operators::arrange::Arranged< - differential_dataflow::operators::arrange::TraceAgent>, - >, -) -> differential_dataflow::Collection> +pub fn as_recorded_updates<'scope, U>( + stream: timely::dataflow::Stream<'scope, U::Time, Vec>>>>, +) -> differential_dataflow::Collection<'scope, U::Time, RecordedUpdates> where U: layout::ColumnarUpdate, { @@ -1983,7 +1981,7 @@ where use differential_dataflow::trace::{BatchReader, Cursor}; use differential_dataflow::AsCollection; - arranged.stream + stream .unary::, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| { move |input, output| { input.for_each(|time, batches| { diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 56380089c..fe61fbf38 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -96,7 +96,7 @@ mod reachability { use differential_dataflow::Collection; use differential_dataflow::AsCollection; use differential_dataflow::operators::iterate::Variable; - use differential_dataflow::operators::arrange::arrangement::arrange_core; + use differential_dataflow::operators::arrange::arrangement::arrange_intra; use differential_dataflow::operators::join::join_traces; use crate::columnar_support::*; @@ -127,13 +127,13 @@ mod reachability { let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let edges_arr = arrange_core::<_, + let edges_arr = arrange_intra::<_, ValBatcher, ValBuilder, ValSpine, >(edges_inner.inner, edges_pact, "Edges"); - let reach_arr = arrange_core::<_, + let reach_arr = arrange_intra::<_, ValBatcher, ValBuilder, ValSpine, @@ -157,7 +157,7 @@ mod reachability { // Arrange for reduce. let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; - let combined_arr = arrange_core::<_, + let combined_arr = arrange_intra::<_, ValBatcher, ValBuilder, ValSpine, @@ -180,7 +180,7 @@ mod reachability { }); // Extract RecordedUpdates from the Arranged's batch stream. - let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result); + let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result.stream); variable.set(result_col.clone()); diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 7b6117d24..358397943 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -64,10 +64,10 @@ pub struct Query { } use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceInter}; -type TraceKeyHandle = TraceAgent>; -type TraceValHandle = TraceAgent>; +type TraceKeyHandle = TraceInter>; +type TraceValHandle = TraceInter>; type Arrange<'scope, T,K,V,R> = Arranged<'scope, TraceValHandle>; /// An evolving set of edges. @@ -114,14 +114,14 @@ impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> { /// The collection arranged in the forward direction. pub fn forward(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> { if self.forward.is_none() { - self.forward = Some(self.collection.clone().arrange_by_key()); + self.forward = Some(self.collection.clone().arrange_by_key_inter()); } self.forward.as_ref().unwrap() } /// The collection arranged in the reverse direction. pub fn reverse(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> { if self.reverse.is_none() { - self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key()); + self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key_inter()); } self.reverse.as_ref().unwrap() } @@ -170,7 +170,7 @@ impl Query { // create variables and result handles for each named relation. for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.collection.clone().leave(scope).arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave(scope).arrange_by_self_inter().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } @@ -199,7 +199,7 @@ impl Query { transposed = transposed .join_core(to_join, |_k,&x,&y| Some((y,x))) - .arrange_by_key(); + .arrange_by_key_inter(); } // Reverse the direction before adding it as a production. diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 16755c27e..d5fa028b0 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -700,7 +700,7 @@ pub mod vec { } use crate::trace::{Trace, Builder}; - use crate::operators::arrange::{Arranged, TraceAgent}; + use crate::operators::arrange::{Arranged, TraceInter, TraceIntra}; impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R> where @@ -779,7 +779,7 @@ pub mod vec { /// .trace; /// }); /// ``` - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged<'scope, TraceIntra> where T2: for<'a> Trace= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, @@ -797,7 +797,7 @@ pub mod vec { /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged<'scope, TraceIntra> where V: Clone+'static, T2: for<'a> Trace=&'a K, ValOwn = V, Time=T>+'static, @@ -1027,14 +1027,14 @@ pub mod vec { V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { - fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceIntra> where Ba: crate::trace::Batcher, Time=T> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_, Ba, Bu, _>(self.inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_, Ba, Bu, _>(self.inner, exchange, name) } } @@ -1042,14 +1042,14 @@ pub mod vec { where T: Timestamp + Lattice + Ord, { - fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceIntra> where Ba: crate::trace::Batcher, Time=T> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) + crate::operators::arrange::arrangement::arrange_intra::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } @@ -1063,14 +1063,26 @@ pub mod vec { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent>> { + pub fn arrange_by_key(self) -> Arranged<'scope, TraceIntra>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceIntra>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } + + + /// As `arrange_by_key` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_key_inter(self) -> Arranged<'scope, TraceInter>> { + self.arrange_by_key_inter_named("ArrangeByKey") + } + + /// As `arrange_by_key_inter` but with the ability to name the arrangement. + pub fn arrange_by_key_inter_named(self, name: &str) -> Arranged<'scope, TraceInter>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(self.inner, exchange, name) + } } impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R> @@ -1082,15 +1094,27 @@ pub mod vec { /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent>> { + pub fn arrange_by_self(self) -> Arranged<'scope, TraceIntra>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceIntra>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } + + + /// As `arrange_by_self` but producing a `TraceInter` that can be imported into other dataflows. + pub fn arrange_by_self_inter(self) -> Arranged<'scope, TraceInter>> { + self.arrange_by_self_inter_named("ArrangeBySelf") + } + + /// As `arrange_by_self_inter` but with the ability to name the arrangement. + pub fn arrange_by_self_inter_named(self, name: &str) -> Arranged<'scope, TraceInter>> { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_inter::<_,KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(self.map(|k| (k, ())).inner, exchange, name) + } } impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R> diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 630c4258a..1ff212e48 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -15,19 +15,20 @@ use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; -use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged}; +use super::{TraceWriterInter, TraceInterQueueWriter, TraceInterQueueReader, Arranged}; use super::TraceReplayInstruction; +use super::writer::TraceWriterIntra; use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; -/// A `TraceReader` wrapper which can be imported into other dataflows. +/// Trace reader that can share a trace within a dataflow. /// -/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted -/// from the dataflow in which it was defined, and imported into other dataflows. -pub struct TraceAgent { +/// Unlike `TraceInter`, this trace reader cannot be shared across +/// dataflows, but in exchange doesn't need to be scheduled as its +/// frontiers advance, in the absence of updates. +pub struct TraceIntra { trace: Rc>>, - queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, temp_antichain: Antichain, @@ -37,11 +38,11 @@ pub struct TraceAgent { } use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgent { +impl WithLayout for TraceIntra { type Layout = Tr::Layout; } -impl TraceReader for TraceAgent { +impl TraceReader for TraceIntra { type Batch = Tr::Batch; type Storage = Tr::Storage; @@ -75,14 +76,13 @@ impl TraceReader for TraceAgent { fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } } -impl TraceAgent { - /// Creates a new agent from a trace reader. - pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) +impl TraceIntra { + /// Creates a new inner agent from a trace reader, returning the agent and a matching writer. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriterIntra) where Tr: Trace, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); - let queues = Rc::new(RefCell::new(Vec::new())); if let Some(logging) = &logging { logging.log( @@ -90,21 +90,132 @@ impl TraceAgent { ); } - let reader = TraceAgent { - trace: trace.clone(), - queues: Rc::downgrade(&queues), + let reader = TraceIntra { logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), + trace: Rc::clone(&trace), temp_antichain: Antichain::new(), operator, logging, }; - let writer = TraceWriter::new( - vec![Tr::Time::minimum()], - Rc::downgrade(&trace), - queues, - ); + let writer = TraceWriterIntra::new(vec![Tr::Time::minimum()], Rc::downgrade(&trace)); + + (reader, writer) + } + + /// The [OperatorInfo] of the underlying Timely operator + pub fn operator(&self) -> &OperatorInfo { + &self.operator + } + + /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain + /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior + /// to mutate the trace box. Keeping strong references can prevent resource reclamation. + /// + /// This method is subject to changes and removal and should not be considered part of a stable + /// interface. + pub fn trace_box_unstable(&self) -> Rc>> { + Rc::clone(&self.trace) + } +} + +impl Clone for TraceIntra { + fn clone(&self) -> Self { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } + ); + } + + // increase counts for wrapped `TraceBox`. + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); + + TraceIntra { + trace: Rc::clone(&self.trace), + logical_compaction: self.logical_compaction.clone(), + physical_compaction: self.physical_compaction.clone(), + operator: self.operator.clone(), + logging: self.logging.clone(), + temp_antichain: Antichain::new(), + } + } +} + +impl Drop for TraceIntra { + fn drop(&mut self) { + + if let Some(logging) = &self.logging { + logging.log( + crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } + ); + } + + // decrement borrow counts to remove all holds + let empty_frontier = Antichain::new(); + self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); + } +} + +/// Trace reader that can both share a trace within a dataflow and be imported into other dataflows. +/// +/// Unlike `TraceIntra`, this trace reader can be shared across dataflows, +/// but in exchange it must be scheduled whenever its frontiers advance. +/// This can increase the scheduling load. +pub struct TraceInter { + /// Inner agent maintaining the shared trace and compaction state. + inner: TraceIntra, + /// A sequence of private queues into which batches are written. + queues: Weak>>>, +} + +impl WithLayout for TraceInter { + type Layout = Tr::Layout; +} + +impl TraceReader for TraceInter { + + type Batch = Tr::Batch; + type Storage = Tr::Storage; + type Cursor = Tr::Cursor; + + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + self.inner.set_logical_compaction(frontier); + } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_logical_compaction() + } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { + self.inner.set_physical_compaction(frontier); + } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { + self.inner.get_physical_compaction() + } + fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { + self.inner.cursor_through(frontier) + } + fn map_batches(&self, f: F) { self.inner.map_batches(f) } +} + +impl TraceInter { + /// Creates a new agent from a trace reader. + pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriterInter) + where + Tr: Trace, + { + let queues = Rc::new(RefCell::new(Vec::new())); + let (inner, writer_inner) = TraceIntra::new(trace, operator, logging); + + let reader = TraceInter { + inner, + queues: Rc::downgrade(&queues), + }; + + let writer = TraceWriterInter::from_intra(writer_inner, queues); (reader, writer) } @@ -114,18 +225,18 @@ impl TraceAgent { /// The queue is first populated with existing batches from the trace, /// The queue will be immediately populated with existing historical batches from the trace, and until the reference /// is dropped the queue will receive new batches as produced by the source `arrange` operator. - pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader + pub fn new_listener(&mut self, activator: Activator) -> TraceInterQueueReader { // create a new queue for progress and batch information. let mut new_queue = VecDeque::new(); // add the existing batches from the trace let mut upper = None; - self.trace + self.inner.trace .borrow_mut() .trace .map_batches(|batch| { - new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(Tr::Time::minimum()))); + new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(::minimum()))); upper = Some(batch.upper().clone()); }); @@ -144,9 +255,7 @@ impl TraceAgent { } /// The [OperatorInfo] of the underlying Timely operator - pub fn operator(&self) -> &OperatorInfo { - &self.operator - } + pub fn operator(&self) -> &OperatorInfo { self.inner.operator() } /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior @@ -154,12 +263,13 @@ impl TraceAgent { /// /// This method is subject to changes and removal and should not be considered part of a stable /// interface. - pub fn trace_box_unstable(&self) -> Rc>> { - Rc::clone(&self.trace) - } + pub fn trace_box_unstable(&self) -> Rc>> { self.inner.trace_box_unstable() } + + /// Extracts the inner `TraceIntra`, discarding queue management. + pub fn into_intra(self) -> TraceIntra { self.inner } } -impl TraceAgent { +impl TraceInter { /// Copies an existing collection into the supplied scope. /// /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange` @@ -194,7 +304,7 @@ impl TraceAgent { /// let mut trace = worker.dataflow::(|scope| { /// // create input handle and collection. /// scope.new_collection_from(0 .. 10).1 - /// .arrange_by_self() + /// .arrange_by_self_inter() /// .trace /// }); /// @@ -215,14 +325,12 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent> - { + pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceInter> { self.import_named(scope, "ArrangedSource") } /// Same as `import`, but allows to name the source. - pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent> - { + pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceInter> { // Drop ShutdownButton and return only the arrangement. self.import_core(scope, name).0 } @@ -247,7 +355,7 @@ impl TraceAgent { /// let mut trace = worker.dataflow::(|scope| { /// // create input handle and collection. /// input.to_collection(scope) - /// .arrange_by_self() + /// .arrange_by_self_inter() /// .trace /// }); /// @@ -274,8 +382,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent>, ShutdownButton>) - { + pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceInter>, ShutdownButton>) { let trace = self.clone(); let mut shutdown_button = None; @@ -349,7 +456,7 @@ impl TraceAgent { /// let (mut handle, mut trace) = worker.dataflow::(|scope| { /// // create input handle and collection. /// let (handle, stream) = scope.new_collection(); - /// let trace = stream.arrange_by_self().trace; + /// let trace = stream.arrange_by_self_inter().trace; /// (handle, trace) /// }); /// @@ -387,10 +494,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) - where - Tr: TraceReader, - { + pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) { // This frontier describes our only guarantee on the compaction frontier. let since = self.get_logical_compaction().to_owned(); self.import_frontier_core(scope, name, since, Antichain::new()) @@ -404,10 +508,7 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain, until: Antichain) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) - where - Tr: TraceReader, - { + pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain, until: Antichain) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) { let trace = self.clone(); let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow()); @@ -506,44 +607,11 @@ impl Drop for ShutdownDeadmans { } } -impl Clone for TraceAgent { +impl Clone for TraceInter { fn clone(&self) -> Self { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } - ); - } - - // increase counts for wrapped `TraceBox`. - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); - - TraceAgent { - trace: self.trace.clone(), + TraceInter { + inner: self.inner.clone(), queues: self.queues.clone(), - logical_compaction: self.logical_compaction.clone(), - physical_compaction: self.physical_compaction.clone(), - operator: self.operator.clone(), - logging: self.logging.clone(), - temp_antichain: Antichain::new(), } } } - -impl Drop for TraceAgent { - fn drop(&mut self) { - - if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } - ); - } - - // decrement borrow counts to remove all holds - let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow()); - self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); - } -} diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index de533a08f..e798f588e 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -35,7 +35,7 @@ use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; -use super::TraceAgent; +use super::{TraceIntra, TraceInter}; /// An arranged collection of `(K,V)` values. /// @@ -264,8 +264,8 @@ impl<'scope, Tr1> Arranged<'scope, Tr1> where Tr1: TraceReader + Clone + 'static, { - /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent> + /// A direct implementation of `ReduceCore::reduce_abelian`, producing a `TraceIntra`. + pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceIntra> where Tr2: for<'a> Trace< Key<'a>= Tr1::Key<'a>, @@ -286,8 +286,46 @@ where }, push) } - /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent> + /// As `reduce_abelian` but producing a `TraceInter` that can be imported into other dataflows. + pub fn reduce_abelian_inter(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceInter> + where + Tr2: for<'a> Trace< + Key<'a>= Tr1::Key<'a>, + ValOwn: Data, + Time=Tr1::Time, + Diff: Abelian, + >+'static, + Bu: Builder, + L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, + { + self.reduce_core_inter::<_,Bu,Tr2,_>(name, move |key, input, output, change| { + if !input.is_empty() { + logic(key, input, change); + } + change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) })); + crate::consolidation::consolidate(change); + }, push) + } + + /// A direct implementation of `ReduceCore::reduce_core`, producing a `TraceIntra`. + pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceIntra> + where + Tr2: for<'a> Trace< + Key<'a>=Tr1::Key<'a>, + ValOwn: Data, + Time=Tr1::Time, + >+'static, + Bu: Builder, + L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, + { + use crate::operators::reduce::reduce_trace_intra; + reduce_trace_intra::<_,Bu,_,_,_>(self, name, logic, push) + } + + /// As `reduce_core` but producing a `TraceInter` that can be imported into other dataflows. + pub fn reduce_core_inter(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceInter> where Tr2: for<'a> Trace< Key<'a>=Tr1::Key<'a>, @@ -298,8 +336,8 @@ where L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, { - use crate::operators::reduce::reduce_trace; - reduce_trace::<_,Bu,_,_,_>(self, name, logic, push) + use crate::operators::reduce::reduce_trace_inter; + reduce_trace_inter::<_,Bu,_,_,_>(self, name, logic, push) } } @@ -321,13 +359,30 @@ where } } +impl<'scope, Tr> Arranged<'scope, TraceInter> +where + Tr: TraceReader, +{ + /// Converts an inter-dataflow trace to an intra-dataflow trace. + /// + /// This method is used primarily to harmonize types. It only unwraps the trace and discards + /// its ability to be imported into dataflows. It does not improve or alter the scheduling of + /// the backing dataflow operator. To get the improved scheduling, use `arrange_intra` instead. + pub fn into_intra<'outer>(self) -> Arranged<'scope, TraceIntra> { + Arranged { + stream: self.stream, + trace: self.trace.into_intra(), + } + } +} + /// A type that can be arranged as if a collection of updates. pub trait Arrange<'scope, T, C> : Sized where T: Timestamp + Lattice, { /// Arranges updates into a shared trace. - fn arrange(self) -> Arranged<'scope, TraceAgent> + fn arrange(self) -> Arranged<'scope, TraceIntra> where Ba: Batcher + 'static, Bu: Builder, @@ -337,7 +392,7 @@ where } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceIntra> where Ba: Batcher + 'static, Bu: Builder, @@ -345,12 +400,43 @@ where ; } -/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// Arranges a stream of updates into a trace that can be imported into other dataflows. +/// +/// The returned `TraceInter` can be shared across dataflows via `import`. Because this +/// requires propagating frontier advances outward even without data, the operator must be +/// continually scheduled, which adds scheduling overhead. Use `arrange_intra` if the trace +/// will only be consumed within the producing dataflow. +pub fn arrange_inter<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceInter> +where + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::Always) +} + +/// Arranges a stream of updates into a trace scoped to the producing dataflow. +/// +/// The returned `TraceIntra` can be shared by operators within the same dataflow, but it +/// cannot be imported into other dataflows (it has no `import` method). In exchange, the +/// operator is only scheduled when it holds capabilities, reducing scheduling overhead. +pub fn arrange_intra<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceIntra> +where + P: ParallelizationContract, + Ba: Batcher + 'static, + Bu: Builder, + Tr: Trace+'static, +{ + arrange_core::(stream, pact, name, timely::progress::operate::FrontierInterest::IfCapability).into_intra() +} + +/// Arranges a stream of updates by a key, configured with a name, a parallelization contract, +/// and a frontier interest policy. /// -/// This operator arranges a stream of values into a shared trace, whose contents it maintains. -/// It uses the supplied parallelization contract to distribute the data, which does not need to -/// be consistently by key (though this is the most common). -pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent> +/// This is the general form that both `arrange_inter` and `arrange_intra` delegate to. +/// The `FrontierInterest` parameter controls when the operator is notified of frontier changes. +pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str, interest: timely::progress::operate::FrontierInterest) -> Arranged<'scope, TraceInter> where P: ParallelizationContract, Ba: Batcher + 'static, @@ -372,38 +458,44 @@ where // held by the batcher, which may prevents the operator from sending an // empty batch. - let mut reader: Option> = None; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - // fabricate a data-parallel operator using the `unary_notify` pattern. - let reader_ref = &mut reader; let scope = stream.scope(); - let stream = stream.unary_frontier(pact, name, move |_capability, info| { + let mut builder = OperatorBuilder::new(name.to_owned(), scope); + let operator_info = builder.operator_info(); - // Acquire a logger for arrange events. - let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); + let mut input = builder.new_input(stream, pact); + builder.set_notify_for(0, interest); + let (mut output, stream) = builder.new_output(); - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Ba::new(logger.clone(), info.global_id); + // Acquire a logger for arrange events. + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); + // Where we will deposit received updates, and from which we extract batches. + let mut batcher = Ba::new(logger.clone(), operator_info.global_id); - let activator = Some(scope.activator_for(info.address.clone())); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } + let activator = Some(scope.activator_for(operator_info.address.clone())); + let mut empty_trace = Tr::new(operator_info.clone(), logger.clone(), activator); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); + } - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + let (trace, mut writer) = TraceInter::new(empty_trace, operator_info, logger); - *reader_ref = Some(reader_local); + builder.build(move |_capabilities| { + + // Capabilities for the lower envelope of updates in `batcher`. + let mut capabilities = Antichain::>::new(); // Initialize to the minimal input frontier. let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum()); - move |(input, frontier), output| { + move |frontiers| { + + let frontier = &frontiers[0]; + let mut output = output.activate(); // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. // We don't have to keep all capabilities, but we need to be able to form output messages @@ -465,7 +557,7 @@ where writer.insert(batch.clone(), Some(capability.time().clone())); // send the batch to downstream consumers, empty or not. - output.session(&capabilities.elements()[index]).give(batch); + output.give(&capabilities.elements()[index], &mut vec![batch]); } } @@ -500,5 +592,5 @@ where } }); - Arranged { stream, trace: reader.unwrap() } + Arranged { stream, trace } } diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 70f9fb866..aaffa28ef 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -57,8 +57,8 @@ pub enum TraceReplayInstruction { // Short names for strongly and weakly owned activators and shared queues. type BatchQueue = VecDeque>; -type TraceAgentQueueReader = Rc<(Activator, RefCell>)>; -type TraceAgentQueueWriter = Weak<(Activator, RefCell>)>; +type TraceInterQueueReader = Rc<(Activator, RefCell>)>; +type TraceInterQueueWriter = Weak<(Activator, RefCell>)>; pub mod writer; pub mod agent; @@ -66,7 +66,7 @@ pub mod arrangement; pub mod upsert; -pub use self::writer::TraceWriter; -pub use self::agent::{TraceAgent, ShutdownButton}; +pub use self::writer::{TraceWriterIntra, TraceWriterInter}; +pub use self::agent::{TraceIntra, TraceInter, ShutdownButton}; -pub use self::arrangement::{Arranged, Arrange}; \ No newline at end of file +pub use self::arrangement::{Arranged, Arrange}; diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 5e336fc3d..8a3e3b175 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -114,7 +114,7 @@ use crate::{ExchangeData, Hashable}; use crate::trace::implementations::containers::BatchContainer; -use super::TraceAgent; +use super::TraceInter; /// Arrange data from a stream of keyed upserts. /// @@ -129,7 +129,7 @@ use super::TraceAgent; pub fn arrange_from_upsert<'scope, Bu, Tr, K, V>( stream: Stream<'scope, Tr::Time, Vec<(K, Option, Tr::Time)>>, name: &str, -) -> Arranged<'scope, TraceAgent> +) -> Arranged<'scope, TraceInter> where K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, @@ -141,7 +141,7 @@ where >+'static, Bu: Builder, Output = Tr::Batch>, { - let mut reader: Option> = None; + let mut reader: Option> = None; // fabricate a data-parallel operator using the `unary_notify` pattern. let stream = { @@ -166,7 +166,7 @@ where empty_trace.set_exert_logic(exert_logic); } - let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); + let (mut reader_local, mut writer) = TraceInter::new(empty_trace, info, logger); // Capture the reader outside the builder scope. *reader = Some(reader_local.clone()); diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 8df11690c..a9fbe8cd5 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -1,7 +1,8 @@ -//! Write endpoint for a sequence of batches. +//! Write endpoints for a sequence of batches. //! -//! A `TraceWriter` accepts a sequence of batches and distributes them -//! to both a shared trace and to a sequence of private queues. +//! A `TraceWriterIntra` distributes batches to a shared trace (intra-dataflow sharing). +//! A `TraceWriterInter` additionally distributes them to a set of private queues +//! (inter-dataflow sharing). use std::rc::{Rc, Weak}; use std::cell::RefCell; @@ -12,35 +13,31 @@ use crate::trace::{Trace, Batch, BatchReader}; use crate::trace::wrappers::rc::TraceBox; -use super::TraceAgentQueueWriter; +use super::TraceInterQueueWriter; use super::TraceReplayInstruction; -/// Write endpoint for a sequence of batches. +/// Write endpoint that maintains the frontier and shared trace. /// -/// A `TraceWriter` accepts a sequence of batches and distributes them -/// to both a shared trace and to a sequence of private queues. -pub struct TraceWriter { +/// Used for intra-dataflow sharing where readers consume the trace directly +/// rather than via a replay queue. +pub struct TraceWriterIntra { /// Current upper limit. upper: Antichain, /// Shared trace, possibly absent (due to weakness). trace: Weak>>, - /// A sequence of private queues into which batches are written. - queues: Rc>>>, } -impl TraceWriter { - /// Creates a new `TraceWriter`. - pub fn new( - upper: Vec, - trace: Weak>>, - queues: Rc>>> - ) -> Self - { +impl TraceWriterIntra { + /// Creates a new `TraceWriterIntra`. + pub fn new(upper: Vec, trace: Weak>>) -> Self { let mut temp = Antichain::new(); temp.extend(upper); - Self { upper: temp, trace, queues } + Self { upper: temp, trace } } + /// The current upper frontier. + pub fn upper(&self) -> &Antichain { &self.upper } + /// Exerts merge effort, even without additional updates. pub fn exert(&mut self) { if let Some(trace) = self.trace.upgrade() { @@ -50,10 +47,9 @@ impl TraceWriter { /// Advances the trace by `batch`. /// - /// The `hint` argument is either `None` in the case of an empty batch, - /// or is `Some(time)` for a time less or equal to all updates in the - /// batch and which is suitable for use as a capability. - pub fn insert(&mut self, batch: Tr::Batch, hint: Option) { + /// Asserts the batch is a valid continuation of the current frontier, + /// updates the upper frontier, and pushes the batch into the shared trace. + pub fn insert(&mut self, batch: Tr::Batch) { // Something is wrong if not a sequence. if !(&self.upper == batch.lower()) { @@ -64,6 +60,65 @@ impl TraceWriter { self.upper.clone_from(batch.upper()); + // push data to the trace, if it still exists. + if let Some(trace) = self.trace.upgrade() { + trace.borrow_mut().trace.insert(batch); + } + } + + /// Inserts an empty batch up to `upper`. + pub fn seal(&mut self, upper: Antichain) { + if self.upper != upper { + self.insert(Tr::Batch::empty(self.upper.clone(), upper)); + } + } +} + +impl Drop for TraceWriterIntra { + fn drop(&mut self) { self.seal(Antichain::new()) } +} + +/// Write endpoint that distributes batches to both a shared trace and private queues. +/// +/// Used for inter-dataflow sharing: in addition to writing to the trace, each batch +/// is pushed onto any listener queues so that importing dataflows can observe it. +pub struct TraceWriterInter { + /// Inner writer maintaining the frontier and shared trace. + inner: TraceWriterIntra, + /// A sequence of private queues into which batches are written. + queues: Rc>>>, +} + +impl TraceWriterInter { + /// Creates a new `TraceWriterInter`. + pub fn new( + upper: Vec, + trace: Weak>>, + queues: Rc>>> + ) -> Self + { + Self { inner: TraceWriterIntra::new(upper, trace), queues } + } + + /// Creates a new `TraceWriterInter` from an existing `TraceWriterIntra` and queues. + pub fn from_intra( + inner: TraceWriterIntra, + queues: Rc>>> + ) -> Self + { + Self { inner, queues } + } + + /// Exerts merge effort, even without additional updates. + pub fn exert(&mut self) { self.inner.exert() } + + /// Advances the trace by `batch`. + /// + /// The `hint` argument is either `None` in the case of an empty batch, + /// or is `Some(time)` for a time less or equal to all updates in the + /// batch and which is suitable for use as a capability. + pub fn insert(&mut self, batch: Tr::Batch, hint: Option) { + // push information to each listener that still exists. let mut borrow = self.queues.borrow_mut(); for queue in borrow.iter_mut() { @@ -75,23 +130,18 @@ impl TraceWriter { } borrow.retain(|w| w.upgrade().is_some()); - // push data to the trace, if it still exists. - if let Some(trace) = self.trace.upgrade() { - trace.borrow_mut().trace.insert(batch); - } - + // push data to the trace and update the frontier. + self.inner.insert(batch); } /// Inserts an empty batch up to `upper`. pub fn seal(&mut self, upper: Antichain) { - if self.upper != upper { - self.insert(Tr::Batch::empty(self.upper.clone(), upper), None); + if *self.inner.upper() != upper { + self.insert(Tr::Batch::empty(self.inner.upper().clone(), upper), None); } } } -impl Drop for TraceWriter { - fn drop(&mut self) { - self.seal(Antichain::new()) - } +impl Drop for TraceWriterInter { + fn drop(&mut self) { self.seal(Antichain::new()) } } diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 00277ced3..72ba80745 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -9,8 +9,9 @@ use timely::{Accountable, ContainerBuilder}; use timely::container::PushInto; use timely::order::PartialOrder; use timely::progress::Timestamp; +use timely::progress::operate::FrontierInterest; use timely::dataflow::Stream; -use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session}; +use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession, Session}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; @@ -77,13 +78,26 @@ where let mut trace1 = arranged1.trace.clone(); let mut trace2 = arranged2.trace.clone(); + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + let scope = arranged1.stream.scope(); - arranged1.stream.binary_frontier(arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| { + let mut builder = OperatorBuilder::new("Join".to_owned(), scope); + let operator_info = builder.operator_info(); + + let mut input1 = builder.new_input(arranged1.stream, Pipeline); + let mut input2 = builder.new_input(arranged2.stream, Pipeline); + builder.set_notify_for(0, FrontierInterest::IfCapability); + builder.set_notify_for(1, FrontierInterest::IfCapability); + let (output, stream) = builder.new_output::(); + let mut output_builder = OutputBuilder::>::from(output); + + // Acquire an activator to reschedule the operator when it has unfinished work. + use timely::scheduling::Activator; + let activations = scope.activations().clone(); + let activator = Activator::new(operator_info.address.clone(), activations); - // Acquire an activator to reschedule the operator when it has unfinished work. - use timely::scheduling::Activator; - let activations = scope.activations().clone(); - let activator = Activator::new(info.address, activations); + builder.build(move |mut capabilities| { + let capability = capabilities.remove(0); // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as @@ -146,7 +160,12 @@ where let mut trace1_option = Some(trace1); let mut trace2_option = Some(trace2); - move |(input1, frontier1), (input2, frontier2), output| { + move |frontiers| { + + let frontier1 = &frontiers[0]; + let frontier2 = &frontiers[1]; + let mut output_session = output_builder.activate(); + let output = &mut output_session; // 1. Consuming input. // @@ -299,7 +318,9 @@ where } } } - }) + }); + + stream } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 06fbddc99..8715693ff 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -9,15 +9,45 @@ use crate::Data; use timely::progress::frontier::Antichain; use timely::progress::Timestamp; -use timely::dataflow::operators::Operator; +use timely::progress::operate::FrontierInterest; use timely::dataflow::channels::pact::Pipeline; -use crate::operators::arrange::{Arranged, TraceAgent}; +use crate::operators::arrange::{Arranged, TraceInter, TraceIntra}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; use crate::trace::TraceReader; +/// A key-wise reduction of values in an input trace, producing a `TraceInter`. +/// +/// The returned trace can be imported into other dataflows. Use `reduce_trace_intra` if +/// cross-dataflow import is not needed, to reduce scheduling overhead. +pub fn reduce_trace_inter<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, logic: L, push: P) -> Arranged<'scope, TraceInter> +where + Tr1: TraceReader + Clone + 'static, + Tr2: for<'a> Trace=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static, + Bu: Builder, + L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, +{ + reduce_trace::(trace, name, logic, push, FrontierInterest::Always) +} + +/// A key-wise reduction of values in an input trace, producing a `TraceIntra`. +/// +/// The returned trace cannot be imported into other dataflows (it has no `import` method). +/// In exchange, the operator is only scheduled when it holds capabilities. +pub fn reduce_trace_intra<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, logic: L, push: P) -> Arranged<'scope, TraceIntra> +where + Tr1: TraceReader + Clone + 'static, + Tr2: for<'a> Trace=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static, + Bu: Builder, + L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, + P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, +{ + reduce_trace::(trace, name, logic, push, FrontierInterest::IfCapability).into_intra() +} + /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. @@ -31,7 +61,10 @@ use crate::trace::TraceReader; /// the value updates, as appropriate for the container. It is critical that it clear the container as /// the operator has no ability to do this otherwise, and failing to do so represents a leak from one /// key's computation to another, and will likely introduce non-determinism. -pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent> +/// +/// The `interest` parameter controls when the operator is notified of frontier changes; +/// `reduce_trace_inter` and `reduce_trace_intra` are the common wrappers. +pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P, interest: FrontierInterest) -> Arranged<'scope, TraceInter> where Tr1: TraceReader + Clone + 'static, Tr2: for<'a> Trace=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static, @@ -39,32 +72,36 @@ where L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, { - let mut result_trace = None; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - // fabricate a data-parallel operator using the `unary_notify` pattern. - let stream = { + let scope = trace.stream.scope(); - let result_trace = &mut result_trace; - let scope = trace.stream.scope(); - trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { + let mut builder = OperatorBuilder::new(name.to_owned(), scope); + let operator_info = builder.operator_info(); - // Acquire a logger for arrange events. - let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); + let mut input = builder.new_input(trace.stream, Pipeline); + builder.set_notify_for(0, interest); + let (mut output, stream) = builder.new_output(); - let activator = Some(scope.activator_for(operator_info.address.clone())); - let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator); - // If there is default exert logic set, install it. - if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { - empty.set_exert_logic(exert_logic); - } + // Acquire a logger for reduce events. + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); - let mut source_trace = trace.trace.clone(); + let activator = Some(scope.activator_for(operator_info.address.clone())); + let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator); + // If there is default exert logic set, install it. + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { + empty.set_exert_logic(exert_logic); + } + + let mut source_trace = trace.trace.clone(); - let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); + let (mut output_reader, mut output_writer) = TraceInter::new(empty, operator_info, logger); - *result_trace = Some(output_reader.clone()); + let result_trace = output_reader.clone(); - let mut new_interesting_times = Vec::::new(); + builder.build(move |_capabilities| { + + let mut new_interesting_times = Vec::::new(); // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // sorted by (key, time), as well as capabilities for the lower envelope of the times. @@ -85,15 +122,18 @@ where let mut output_upper = Antichain::from_elem(::minimum()); let mut output_lower = Antichain::from_elem(::minimum()); - move |(input, frontier), output| { + move |frontiers| { - // The operator receives input batches, which it treats as contiguous and will collect and - // then process as one batch. It captures the input frontier from the batches, from the upstream - // trace, and from the input frontier, and retires the work through that interval. - // - // Reduce may retain capabilities and need to perform work and produce output at times that - // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)` - // may result in outputs at `(1, 1)` as well, even with no input at that time. + let frontier = &frontiers[0]; + let mut output = output.activate(); + + // The operator receives input batches, which it treats as contiguous and will collect and + // then process as one batch. It captures the input frontier from the batches, from the upstream + // trace, and from the input frontier, and retires the work through that interval. + // + // Reduce may retain capabilities and need to perform work and produce output at times that + // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)` + // may result in outputs at `(1, 1)` as well, even with no input at that time. let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); @@ -254,7 +294,7 @@ where let batch = builder.done(description); // ship batch to the output, and commit to the output trace. - output.session(&capabilities[index]).give(batch.clone()); + output.give(&capabilities[index], &mut vec![batch.clone()]); output_writer.insert(batch, Some(capabilities[index].time().clone())); output_lower.clear(); @@ -291,14 +331,12 @@ where output_reader.set_physical_compaction(upper_limit.borrow()); } - // Exert trace maintenance if we have been so requested. - output_writer.exert(); - } + // Exert trace maintenance if we have been so requested. + output_writer.exert(); } - ) - }; + }); - Arranged { stream, trace: result_trace.unwrap() } + Arranged { stream, trace: result_trace } } diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 1bb2c9cc1..ccf3c5027 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -52,7 +52,7 @@ fn test_import_vanilla() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); let (captured,) = worker.dataflow(move |scope| { @@ -116,7 +116,7 @@ fn test_import_completed_dataflow() { let (mut input, mut trace, probe) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); let (probe, _) = arranged.stream.probe(); (input, arranged.trace.clone(), probe) }); @@ -182,7 +182,7 @@ fn test_import_stalled_dataflow() { let arranged = input .to_collection(scope) - .arrange_by_self(); + .arrange_by_self_inter(); let (probe, _) = arranged.stream.probe(); (arranged.trace, probe) @@ -229,7 +229,7 @@ fn import_skewed() { let (mut input, mut trace) = worker.dataflow(|scope| { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() - .arrange_by_key(); + .arrange_by_key_inter(); (input, arranged.trace.clone()) }); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index ce484e3b7..d68a72753 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -7,7 +7,7 @@ use timely::dataflow::operators::Concatenate; use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; pub mod altneu; pub mod calculus; @@ -92,8 +92,8 @@ impl<'scope, T: Timestamp, R: Monoid+Multiply, P, E> ValidateExtensi // These are all defined here so that users can be assured a common layout. use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; -type TraceValHandle = TraceAgent>; -type TraceKeyHandle = TraceAgent>; +type TraceValHandle = TraceInter>; +type TraceKeyHandle = TraceInter>; pub struct CollectionIndex where @@ -139,16 +139,16 @@ where pub fn index<'scope>(collection: VecCollection<'scope, T, (K, V), R>) -> Self { // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement - let arranged = collection.clone().arrange_by_self(); + let arranged = collection.clone().arrange_by_self_inter(); // TODO: This could/should be arrangement to arrangement, via `reduce_abelian`, but the types are a mouthful at the moment. let counts = arranged .clone() .as_collection(|k,_v| k.clone()) .distinct() .map(|(k, _v)| k) - .arrange_by_self() + .arrange_by_self_inter() .trace; - let propose = collection.arrange_by_key().trace; + let propose = collection.arrange_by_key_inter().trace; let validate = arranged.trace; CollectionIndex { diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 234152d59..0c6bdfdb6 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -9,10 +9,10 @@ use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange<'s, G, K, V, R> = Arranged<'s, TraceAgent>>; +type Arrange<'s, G, K, V, R> = Arranged<'s, TraceIntra>>; type Node = u32; diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index f20e3678f..3749f95b3 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -5,14 +5,14 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::lattice::Lattice; use differential_dataflow::difference::Present; -type EdgeArranged<'s, G, K, V, R> = Arranged<'s, TraceAgent>>; +type EdgeArranged<'s, G, K, V, R> = Arranged<'s, TraceIntra>>; type Node = u32; type Edge = (Node, Node); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 838836d5e..78d4185b5 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -255,10 +255,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceIntra>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop<'s, T: Timestamp + Lattice + Ord>( diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 8801a1948..e00c5b69a 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -223,10 +223,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceIntra>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop<'s, T: Timestamp + Lattice + Ord>( diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index e1f26ad29..aedab3dab 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -288,10 +288,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceIntra>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop<'s, T: Timestamp + Lattice + Ord>( diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 29a6ee6b8..2ae6bce2f 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -191,10 +191,10 @@ fn main() { } use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceIntra; use differential_dataflow::operators::arrange::Arranged; -type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceIntra>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index fc6a5cb3d..826937366 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -46,7 +46,7 @@ fn main() { .to_stream(scope) .as_collection(); - edges.arrange_by_key().trace + edges.arrange_by_key_inter().trace }); while worker.step() { } @@ -84,7 +84,7 @@ fn main() { forward .import(scope) .as_collection(|&k,&v| (v,k)) - .arrange_by_key() + .arrange_by_key_inter() .trace }); while worker.step() { } @@ -100,9 +100,9 @@ fn main() { }).unwrap(); } -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach<'s>( graph: &mut TraceHandle, diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index 633bb986c..eb686bd0c 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -34,7 +34,7 @@ fn main() { let (mut graph, mut trace) = worker.dataflow(|scope| { let (graph_input, graph) = scope.new_collection(); - let graph_indexed = graph.arrange_by_key(); + let graph_indexed = graph.arrange_by_key_inter(); // let graph_indexed = graph.arrange_by_key(); (graph_input, graph_indexed.trace) }); @@ -81,10 +81,9 @@ fn main() { }).unwrap(); } -// use differential_dataflow::trace::implementations::ord::OrdValSpine; -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; -type TraceHandle = TraceAgent; +type TraceHandle = TraceInter; fn reach<'s>( graph: &mut TraceHandle, diff --git a/interactive/examples/ddir_col.rs b/interactive/examples/ddir_col.rs index 6ece2eeae..162ac6707 100644 --- a/interactive/examples/ddir_col.rs +++ b/interactive/examples/ddir_col.rs @@ -92,7 +92,7 @@ mod render { use differential_dataflow::operators::iterate::Variable; use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; use differential_dataflow::dynamic::feedback_summary; - use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; + use differential_dataflow::operators::arrange::{Arranged, TraceIntra}; use columnar::Columnar; use super::types::*; use interactive::ir::{Node, LinearOp, Program, RowLike, eval_fields, eval_field_into, eval_condition}; @@ -103,7 +103,7 @@ mod render { type ConcreteTime = Product>; pub type Col<'scope> = Collection<'scope, ConcreteTime, DdirRecordedUpdates>; - type Arr<'scope> = Arranged<'scope, TraceAgent>>; + type Arr<'scope> = Arranged<'scope, TraceIntra>>; enum Rendered<'scope> { Collection(Col<'scope>), @@ -115,7 +115,7 @@ mod render { match self { Rendered::Collection(c) => c.clone(), Rendered::Arrangement(a) => { - super::columnar_support::as_recorded_updates::(a.clone()) + super::columnar_support::as_recorded_updates::(a.stream.clone()) } } } @@ -123,9 +123,9 @@ mod render { match self { Rendered::Arrangement(a) => a.clone(), Rendered::Collection(c) => { - use differential_dataflow::operators::arrange::arrangement::arrange_core; + use differential_dataflow::operators::arrange::arrangement::arrange_intra; use super::columnar::ColValBatcher; - arrange_core::<_, ColValBatcher, ColValBuilder, ColValSpine>(c.inner.clone(), timely::dataflow::channels::pact::Pipeline, "Arrange") + arrange_intra::<_, ColValBatcher, ColValBuilder, ColValSpine>(c.inner.clone(), timely::dataflow::channels::pact::Pipeline, "Arrange") } } } diff --git a/interactive/examples/ddir_vec.rs b/interactive/examples/ddir_vec.rs index b3104f223..abf3ca832 100644 --- a/interactive/examples/ddir_vec.rs +++ b/interactive/examples/ddir_vec.rs @@ -9,7 +9,7 @@ use differential_dataflow::operators::iterate::VecVariable; use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; use differential_dataflow::dynamic::feedback_summary; use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; +use differential_dataflow::operators::arrange::{Arranged, TraceIntra}; use differential_dataflow::input::Input; use smallvec::smallvec as svec; @@ -20,7 +20,7 @@ use interactive::ir::{Node, LinearOp, Program, Diff, Id, Time, eval_fields, eval type Row = Vec; type DdirTime = Product>; type Col<'scope, T> = VecCollection<'scope, T, (Row, Row), Diff>; -type Arr<'scope, T> = Arranged<'scope, TraceAgent>>; +type Arr<'scope, T> = Arranged<'scope, TraceIntra>>; enum Rendered<'scope, T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice> { Collection(Col<'scope, T>), diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index 971646c04..2320d374b 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -180,7 +180,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), }) .probe_with(probe) .as_collection() - .arrange_by_key() + .arrange_by_key_inter() .trace; // release all blocks on merging. @@ -193,4 +193,4 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), println!("handles set"); Ok(()) -} \ No newline at end of file +} diff --git a/server/src/lib.rs b/server/src/lib.rs index cdf674b68..76f436526 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -9,13 +9,13 @@ use timely::dataflow::Scope; use timely::dataflow::operators::probe::Handle as ProbeHandle; // stuff for talking about shared trace types ... -use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::operators::arrange::TraceInter; use differential_dataflow::trace::implementations::ValSpine; // These are all defined here so that users can be assured a common layout. pub type RootTime = usize; type TraceSpine = ValSpine; -pub type TraceHandle = TraceAgent; +pub type TraceHandle = TraceInter; /// Arguments provided to each shared library to help build their dataflows and register their results. pub type Environment<'a, 'b> = (