diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index f0fac8d2b..c16490865 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -243,10 +243,10 @@ fn quantize(time: Duration, interval: Duration) -> Duration { } /// Quantize timestamps in a collection's inner stream. -fn quantize_collection( - collection: VecCollection, +fn quantize_collection<'scope, D>( + collection: VecCollection<'scope, Duration, D, i64>, interval: Duration, -) -> VecCollection +) -> VecCollection<'scope, Duration, D, i64> where D: differential_dataflow::Data, { @@ -400,11 +400,11 @@ fn install_loggers( // ============================================================================ /// Internal: collections before arrangement, used for the cross-join. -struct TimelyCollections { - operators: VecCollection), i64>, - channels: VecCollection, (usize, usize), (usize, usize)), i64>, - elapsed: VecCollection, - messages: VecCollection, +struct TimelyCollections<'scope> { + operators: VecCollection<'scope, Duration, (usize, String, Vec), i64>, + channels: VecCollection<'scope, Duration, (usize, Vec, (usize, usize), (usize, usize)), i64>, + elapsed: VecCollection<'scope, Duration, usize, i64>, + messages: VecCollection<'scope, Duration, usize, i64>, } #[derive(Default)] @@ -414,10 +414,10 @@ struct TimelyDemuxState { } /// Build timely logging collections and arrangements. -fn construct_timely( - scope: &mut Scope, - stream: Stream>, -) -> (TimelyTraces, TimelyCollections) { +fn construct_timely<'scope>( + scope: &mut Scope<'scope, Duration>, + stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>, +) -> (TimelyTraces, TimelyCollections<'scope>) { type OpUpdate = ((usize, String, Vec), Duration, i64); type ChUpdate = ((usize, Vec, (usize, usize), (usize, usize)), Duration, i64); type ElUpdate = (usize, Duration, i64); @@ -534,21 +534,21 @@ fn construct_timely( // ============================================================================ /// Internal: collections before arrangement, used for the cross-join. -struct DifferentialCollections { - arrangement_batches: VecCollection, - arrangement_records: VecCollection, - sharing: VecCollection, - batcher_records: VecCollection, - batcher_size: VecCollection, - batcher_capacity: VecCollection, - batcher_allocations: VecCollection, +struct DifferentialCollections<'scope> { + arrangement_batches: VecCollection<'scope, Duration, usize, i64>, + arrangement_records: VecCollection<'scope, Duration, usize, i64>, + sharing: VecCollection<'scope, Duration, usize, i64>, + batcher_records: VecCollection<'scope, Duration, usize, i64>, + batcher_size: VecCollection<'scope, Duration, usize, i64>, + batcher_capacity: VecCollection<'scope, Duration, usize, i64>, + batcher_allocations: VecCollection<'scope, Duration, usize, i64>, } /// Build differential logging collections and arrangements. -fn construct_differential( - scope: &mut Scope, - stream: Stream>, -) -> (DifferentialTraces, DifferentialCollections) { +fn construct_differential<'scope>( + scope: &mut Scope<'scope, Duration>, + stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>, +) -> (DifferentialTraces, DifferentialCollections<'scope>) { type Update = (usize, Duration, i64); let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone()); diff --git a/differential-dataflow/examples/bfs.rs b/differential-dataflow/examples/bfs.rs index db97f3938..f8abe4873 100644 --- a/differential-dataflow/examples/bfs.rs +++ b/differential-dataflow/examples/bfs.rs @@ -90,7 +90,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)> where T: timely::progress::Timestamp + Lattice + Ord, { diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index a6a2b9ccd..3ccaec945 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -109,10 +109,10 @@ mod reachability { /// Compute the set of nodes reachable from `roots` along directed `edges`. /// /// Returns `(node, ())` for each reachable node. - pub fn reach( - edges: Collection>, - roots: Collection>, - ) -> Collection> + pub fn reach<'scope>( + edges: Collection<'scope, Time, RecordedUpdates<(Node, Node, Time, Diff)>>, + roots: Collection<'scope, Time, RecordedUpdates<(Node, (), Time, Diff)>>, + ) -> Collection<'scope, Time, RecordedUpdates<(Node, (), Time, Diff)>> { let outer = edges.inner.scope(); diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index c0206fbde..89ca2b2ae 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -90,7 +90,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)> where T: timely::progress::Timestamp + Lattice + Ord, { diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 4725cf4af..2f4ae3763 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -68,7 +68,7 @@ use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; type TraceKeyHandle = TraceAgent>; type TraceValHandle = TraceAgent>; -type Arrange = Arranged>; +type Arrange<'scope, T,K,V,R> = Arranged<'scope, TraceValHandle>; /// An evolving set of edges. /// @@ -78,17 +78,17 @@ type Arrange = Arranged>; /// /// An edge variable provides arranged representations of its contents, even before they are /// completely defined, in support of recursively defined productions. -pub struct EdgeVariable { - variable: VecVariable, - collection: VecCollection, - current: VecCollection, - forward: Option>, - reverse: Option>, +pub struct EdgeVariable<'scope, T: Timestamp + Lattice> { + variable: VecVariable<'scope, T, Edge, Diff>, + collection: VecCollection<'scope, T, Edge, Diff>, + current: VecCollection<'scope, T, Edge, Diff>, + forward: Option>, + reverse: Option>, } -impl EdgeVariable { +impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> { /// Creates a new variable initialized with `source`. - pub fn from(source: VecCollection, step: T::Summary) -> Self { + pub fn from(source: VecCollection<'scope, T, Edge>, step: T::Summary) -> Self { let (variable, collection) = VecVariable::new(&mut source.scope(), step); EdgeVariable { variable, @@ -99,7 +99,7 @@ impl EdgeVariable { } } /// Concatenates `production` into the definition of the variable. - pub fn add_production(&mut self, production: VecCollection) { + pub fn add_production(&mut self, production: VecCollection<'scope, T, Edge, Diff>) { self.current = self.current.clone().concat(production); } /// Finalizes the variable, connecting its recursive definition. @@ -112,14 +112,14 @@ impl EdgeVariable { self.variable.set(distinct); } /// The collection arranged in the forward direction. - pub fn forward(&mut self) -> &Arrange { + pub fn forward(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> { if self.forward.is_none() { self.forward = Some(self.collection.clone().arrange_by_key()); } self.forward.as_ref().unwrap() } /// The collection arranged in the reverse direction. - pub fn reverse(&mut self) -> &Arrange { + pub fn reverse(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> { if self.reverse.is_none() { self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key()); } diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index 31a622108..aab1e097c 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -36,7 +36,7 @@ fn main() { }).unwrap(); } -fn interpret(edges: VecCollection, relations: &[(usize, usize)]) -> VecCollection> +fn interpret<'scope, T>(edges: VecCollection<'scope, T, Edge>, relations: &[(usize, usize)]) -> VecCollection<'scope, T, Vec> where T: timely::progress::Timestamp + Lattice + Hash + Ord, { diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index f658cce0f..027384cd8 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -121,7 +121,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge, MinSum>, roots: VecCollection<'scope, T, Node, MinSum>) -> VecCollection<'scope, T, Node, MinSum> where T: timely::progress::Timestamp + Lattice + Ord, { diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs index 0229263d8..20550b7ea 100644 --- a/differential-dataflow/examples/progress.rs +++ b/differential-dataflow/examples/progress.rs @@ -113,11 +113,11 @@ fn main() { /// /// The computation to determine this, and to maintain it as times change, is an iterative /// computation that propagates times and maintains the minimal elements at each location. -fn frontier( - nodes: VecCollection, - edges: VecCollection, - times: VecCollection, -) -> VecCollection +fn frontier<'scope, G, T>( + nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>, + edges: VecCollection<'scope, G, (Source, Target)>, + times: VecCollection<'scope, G, (Location, T)>, +) -> VecCollection<'scope, G, (Location, T)> where G: Timestamp + Lattice + Ord, T: Timestamp+std::hash::Hash, @@ -125,7 +125,7 @@ where // Translate node and edge transitions into a common Location to Location edge with an associated Summary. let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary))); let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default()))); - let transitions: VecCollection = nodes.concat(edges); + let transitions: VecCollection<'scope, G, (Location, (Location, T::Summary))> = nodes.concat(edges); times .clone() @@ -148,10 +148,10 @@ where } /// Summary paths from locations to operator zero inputs. -fn summarize( - nodes: VecCollection, - edges: VecCollection, -) -> VecCollection +fn summarize<'scope, G, T>( + nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>, + edges: VecCollection<'scope, G, (Source, Target)>, +) -> VecCollection<'scope, G, (Location, (Location, T::Summary))> where G: Timestamp + Lattice + Ord, T: Timestamp, @@ -167,7 +167,7 @@ where // Retain node connections along "default" timestamp summaries. let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary))); let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default()))); - let transitions: VecCollection = nodes.concat(edges); + let transitions: VecCollection<'scope, G, (Location, (Location, T::Summary))> = nodes.concat(edges); zero_inputs .clone() @@ -193,10 +193,10 @@ where /// Identifies cycles along paths that do not increment timestamps. -fn find_cycles( - nodes: VecCollection, - edges: VecCollection, -) -> VecCollection +fn find_cycles<'scope, G: Timestamp, T: Timestamp>( + nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>, + edges: VecCollection<'scope, G, (Source, Target)>, +) -> VecCollection<'scope, G, (Location, Location)> where G: Timestamp + Lattice + Ord, T: Timestamp, @@ -211,7 +211,7 @@ where } }); let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target))); - let transitions: VecCollection = nodes.concat(edges); + let transitions: VecCollection<'scope, G, (Location, Location)> = nodes.concat(edges); // Repeatedly restrict to locations with an incoming path. transitions diff --git a/differential-dataflow/examples/stackoverflow.rs b/differential-dataflow/examples/stackoverflow.rs index 5d25b19b1..234930932 100644 --- a/differential-dataflow/examples/stackoverflow.rs +++ b/differential-dataflow/examples/stackoverflow.rs @@ -104,7 +104,7 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)> where T: timely::progress::Timestamp + Lattice + Ord, { diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index a9d5ed307..e1756b073 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -9,7 +9,7 @@ use crate::operators::*; use crate::lattice::Lattice; /// Returns pairs (node, dist) indicating distance of each node from a root. -pub fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)> where T: Timestamp + Lattice + Ord, N: ExchangeData+Hash, @@ -22,7 +22,7 @@ use crate::trace::TraceReader; use crate::operators::arrange::Arranged; /// Returns pairs (node, dist) indicating distance of each node from a root. -pub fn bfs_arranged(edges: Arranged, roots: VecCollection) -> VecCollection +pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)> where N: ExchangeData+Hash, Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 82f47ecec..f6dfa520e 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -19,7 +19,7 @@ use crate::operators::iterate::Variable; /// Goals that cannot reach from the source to the target are relatively expensive, as /// the entire graph must be explored to confirm this. A graph connectivity pre-filter /// could be good insurance here. -pub fn bidijkstra(edges: VecCollection, goals: VecCollection) -> VecCollection +pub fn bidijkstra<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, goals: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T, ((N,N), u32)> where T: Timestamp + Lattice + Ord, N: ExchangeData+Hash, @@ -33,11 +33,11 @@ use crate::trace::TraceReader; use crate::operators::arrange::Arranged; /// Bi-directional Dijkstra search using arranged forward and reverse edge collections. -pub fn bidijkstra_arranged( - forward: Arranged, - reverse: Arranged, - goals: VecCollection -) -> VecCollection +pub fn bidijkstra_arranged<'scope, N, Tr>( + forward: Arranged<'scope, Tr>, + reverse: Arranged<'scope, Tr>, + goals: VecCollection<'scope, Tr::Time, (N,N)> +) -> VecCollection<'scope, Tr::Time, ((N,N), u32)> where N: ExchangeData+Hash, Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 54efb079d..033ec3e9f 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -13,7 +13,7 @@ use crate::difference::{Abelian, Multiply}; /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate(edges: VecCollection, nodes: VecCollection) -> VecCollection +pub fn propagate<'scope, T, N, L, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>) -> VecCollection<'scope, T,(N,L),R> where T: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, @@ -30,7 +30,7 @@ where /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate_at(edges: VecCollection, nodes: VecCollection, logic: F) -> VecCollection +pub fn propagate_at<'scope, T, N, L, F, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>, logic: F) -> VecCollection<'scope, T,(N,L),R> where T: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, @@ -51,7 +51,7 @@ use crate::operators::arrange::arrangement::Arranged; /// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows /// a method `logic` to specify the rounds in which we introduce various labels. The output /// of `logic should be a number in the interval \[0,64\], -pub fn propagate_core(edges: Arranged, nodes: VecCollection, logic: F) -> VecCollection +pub fn propagate_core<'scope, N, L, Tr, F, R>(edges: Arranged<'scope, Tr>, nodes: VecCollection<'scope, Tr::Time,(N,L),R>, logic: F) -> VecCollection<'scope, Tr::Time,(N,L),R> where N: ExchangeData+Hash, R: ExchangeData+Abelian, diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index f158cdc08..d50a5a05d 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -12,7 +12,7 @@ use crate::difference::{Abelian, Multiply}; use super::propagate::propagate; /// Returns the subset of edges in the same strongly connected component. -pub fn strongly_connected(graph: VecCollection) -> VecCollection +pub fn strongly_connected<'scope, T, N, R>(graph: VecCollection<'scope, T, (N,N), R>) -> VecCollection<'scope, T, (N,N), R> where T: Timestamp + Lattice + Ord + Hash, N: ExchangeData + Hash, @@ -36,8 +36,8 @@ where }) } -fn trim_edges(cycle: VecCollection, edges: VecCollection) - -> VecCollection +fn trim_edges<'scope, T, N, R>(cycle: VecCollection<'scope, T, (N,N), R>, edges: VecCollection<'scope, T, (N,N), R>) + -> VecCollection<'scope, T, (N,N), R> where T: Timestamp + Lattice + Ord + Hash, N: ExchangeData + Hash, diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index 8ba203fa4..e264ed421 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -9,7 +9,7 @@ use crate::lattice::Lattice; use crate::operators::*; use crate::hashable::Hashable; -fn _color(edges: VecCollection) -> VecCollection)> +fn _color<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T,(N,Option)> where T: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, @@ -40,10 +40,10 @@ where /// a node "fires" once all of its neighbors with lower identifier have /// fired, and we apply `logic` to the new state of lower neighbors and /// the old state (input) of higher neighbors. -pub fn sequence( - state: VecCollection, - edges: VecCollection, - logic: F) -> VecCollection)> +pub fn sequence<'scope, T, N, V, F>( + state: VecCollection<'scope, T, (N,V)>, + edges: VecCollection<'scope, T, (N,N)>, + logic: F) -> VecCollection<'scope, T, (N,Option)> where T: Timestamp + Lattice + Hash + Ord, N: ExchangeData+Hashable, diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index 8c0678a53..c348613e7 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -8,7 +8,7 @@ use crate::operators::*; use crate::difference::Abelian; /// Assign unique identifiers to elements of a collection. -pub trait Identifiers { +pub trait Identifiers<'scope, T: Timestamp, D: ExchangeData, R: ExchangeData+Abelian> { /// Assign unique identifiers to elements of a collection. /// /// # Example @@ -27,16 +27,16 @@ pub trait Identifiers { /// .assert_empty(); /// }); /// ``` - fn identifiers(self) -> VecCollection; + fn identifiers(self) -> VecCollection<'scope, T, (D, u64), R>; } -impl Identifiers for VecCollection +impl<'scope, T, D, R> Identifiers<'scope, T, D, R> for VecCollection<'scope, T, D, R> where T: Timestamp + Lattice, D: ExchangeData + ::std::hash::Hash, R: ExchangeData + Abelian, { - fn identifiers(self) -> VecCollection { + fn identifiers(self) -> VecCollection<'scope, T, (D, u64), R> { // The design here is that we iteratively develop a collection // of pairs (round, record), where each pair is a proposal that diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index f7401d892..e8d1b0f4e 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -7,7 +7,7 @@ use crate::lattice::Lattice; use crate::operators::*; /// Extension trait for the prefix_sum method. -pub trait PrefixSum { +pub trait PrefixSum<'scope, T: Timestamp, K, D> { /// Computes the prefix sum for each element in the collection. /// /// The prefix sum is data-parallel, in the sense that the sums are computed independently for @@ -16,10 +16,10 @@ pub trait PrefixSum { fn prefix_sum(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; /// Determine the prefix sum at each element of `location`. - fn prefix_sum_at(self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; + fn prefix_sum_at(self, locations: VecCollection<'scope, T, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; } -impl PrefixSum for VecCollection +impl<'scope, T, K, D> PrefixSum<'scope, T, K, D> for VecCollection<'scope, T, ((usize, K), D)> where T: Timestamp + Lattice, K: ExchangeData + ::std::hash::Hash, @@ -29,7 +29,7 @@ where self.clone().prefix_sum_at(self.map(|(x,_)| x), zero, combine) } - fn prefix_sum_at(self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { + fn prefix_sum_at(self, locations: VecCollection<'scope, T, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { let combine1 = ::std::rc::Rc::new(combine); let combine2 = combine1.clone(); @@ -40,7 +40,7 @@ where } /// Accumulate data in `collection` into all powers-of-two intervals containing them. -pub fn aggregate(collection: VecCollection, combine: F) -> VecCollection +pub fn aggregate<'scope, T, K, D, F>(collection: VecCollection<'scope, T, ((usize, K), D)>, combine: F) -> VecCollection<'scope, T, ((usize, usize, K), D)> where T: Timestamp + Lattice, K: ExchangeData + ::std::hash::Hash, @@ -73,18 +73,17 @@ where } /// Produces the accumulated values at each of the `usize` locations in `queries`. -pub fn broadcast( - ranges: VecCollection, - queries: VecCollection, +pub fn broadcast<'scope, T, K, D, F>( + ranges: VecCollection<'scope, T, ((usize, usize, K), D)>, + queries: VecCollection<'scope, T, (usize, K)>, zero: D, - combine: F) -> VecCollection + combine: F) -> VecCollection<'scope, T, ((usize, K), D)> where T: Timestamp + Lattice + Ord + ::std::fmt::Debug, K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, { - let zero0 = zero.clone(); let zero1 = zero.clone(); let zero2 = zero.clone(); diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index d33463863..8d5377498 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -21,7 +21,7 @@ use crate::difference::Abelian; /// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions /// on the containers, and streams of containers, are left to the container implementor to describe. #[derive(Clone)] -pub struct Collection { +pub struct Collection<'scope, T: Timestamp, C: 'static> { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is @@ -30,10 +30,10 @@ pub struct Collection { /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave /// unexpectedly. - pub inner: Stream, + pub inner: Stream<'scope, T, C>, } -impl Collection { +impl<'scope, T: Timestamp, C> Collection<'scope, T, C> { /// Creates a new Collection from a timely dataflow stream. /// /// This method seems to be rarely used, with the `as_collection` method on streams being a more @@ -43,9 +43,9 @@ impl Collection { /// /// This stream should satisfy the timestamp invariant as documented on [Collection]; this /// method does not check it. - pub fn new(stream: Stream) -> Self { Self { inner: stream } } + pub fn new(stream: Stream<'scope, T, C>) -> Self { Self { inner: stream } } } -impl Collection { +impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> { /// Creates a new collection accumulating the contents of the two collections. /// /// Despite the name, differential dataflow collections are unordered. This method is so named because the @@ -108,7 +108,7 @@ impl Collection { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region(self, child: &Scope) -> Self { + pub fn enter_region<'inner>(self, child: &Scope<'inner, T>) -> Collection<'inner, T, C> { self.inner .enter(child) .as_collection() @@ -157,7 +157,7 @@ impl Collection { Self::new(self.inner.probe_with(handle)) } /// The scope containing the underlying timely dataflow stream. - pub fn scope(&self) -> Scope { + pub fn scope(&self) -> Scope<'scope, T> { self.inner.scope() } @@ -214,7 +214,7 @@ impl Collection { /// data.assert_eq(result); /// }); /// ``` - pub fn enter(self, child: &Scope) -> Collection>::InnerContainer> + pub fn enter<'inner, TInner>(self, child: &Scope<'inner, TInner>) -> Collection<'inner, TInner, >::InnerContainer> where C: containers::Enter, TInner: Refines, @@ -264,7 +264,7 @@ impl Collection { use timely::progress::timestamp::Refines; /// Methods requiring a nested scope. -impl Collection +impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> { /// Returns the final value of a Collection from a nested scope to its containing scope. /// @@ -287,7 +287,7 @@ impl Collection /// data.assert_eq(result); /// }); /// ``` - pub fn leave(self, outer: &Scope) -> Collection>::OuterContainer> + pub fn leave<'outer, TOuter>(self, outer: &Scope<'outer, TOuter>) -> Collection<'outer, TOuter, >::OuterContainer> where TOuter: Timestamp, T: Refines, @@ -306,10 +306,7 @@ impl Collection /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region(self, outer: &Scope) -> Self - where - C: Clone + 'static, - { + pub fn leave_region<'outer>(self, outer: &Scope<'outer, T>) -> Collection<'outer, T, C> { self.inner .leave(outer) .as_collection() @@ -351,10 +348,10 @@ pub mod vec { /// defaults to) `isize`, representing changes to the occurrence count of each record. /// /// This type definition instantiates the [`Collection`] type with a `Vec<(D, T, R)>`. - pub type Collection = super::Collection>; + pub type Collection<'scope, T, D, R = isize> = super::Collection<'scope, T, Vec<(D, T, R)>>; - impl Collection { + impl<'scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<'scope, T, D, R> { /// Creates a new collection by applying the supplied function to each input element. /// /// # Examples @@ -369,7 +366,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn map(self, mut logic: L) -> Collection + pub fn map(self, mut logic: L) -> Collection<'scope, T, D2, R> where D2: Clone+'static, L: FnMut(D) -> D2 + 'static, @@ -396,7 +393,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn map_in_place(self, mut logic: L) -> Collection + pub fn map_in_place(self, mut logic: L) -> Collection<'scope, T, D, R> where L: FnMut(&mut D) + 'static, { @@ -420,7 +417,7 @@ pub mod vec { /// .flat_map(|x| 0 .. x); /// }); /// ``` - pub fn flat_map(self, mut logic: L) -> Collection + pub fn flat_map(self, mut logic: L) -> Collection<'scope, T, I::Item, R> where T: Clone, I: IntoIterator, @@ -444,7 +441,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn filter(self, mut logic: L) -> Collection + pub fn filter(self, mut logic: L) -> Collection<'scope, T, D, R> where L: FnMut(&D) -> bool + 'static, { @@ -472,7 +469,7 @@ pub mod vec { /// x1.assert_eq(x2); /// }); /// ``` - pub fn explode(self, mut logic: L) -> Collection>::Output> + pub fn explode(self, mut logic: L) -> Collection<'scope, T, D2, >::Output> where D2: Clone+'static, R2: Semigroup+Multiply, @@ -506,7 +503,7 @@ pub mod vec { /// ); /// }); /// ``` - pub fn join_function(self, mut logic: L) -> Collection>::Output> + pub fn join_function(self, mut logic: L) -> Collection<'scope, T, D2, >::Output> where T: Lattice, D2: Clone+'static, @@ -542,7 +539,7 @@ pub mod vec { /// data.assert_eq(result); /// }); /// ``` - pub fn enter_at(self, child: &Iterative, mut initial: F) -> Collection, D, R> + pub fn enter_at<'inner, TInner, F>(self, child: &Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product, D, R> where TInner: Timestamp+Hash, F: FnMut(&D) -> TInner + Clone + 'static, @@ -563,7 +560,7 @@ pub mod vec { /// ordered, they should have the same order or compare equal once `func` is applied to them (this /// is because we advance the timely capability with the same logic, and it must remain `less_equal` /// to all of the data timestamps). - pub fn delay(self, func: F) -> Collection + pub fn delay(self, func: F) -> Collection<'scope, T, D, R> where T: Hash, F: FnMut(&T) -> T + Clone + 'static, @@ -600,7 +597,7 @@ pub mod vec { /// .inspect(|x| println!("error: {:?}", x)); /// }); /// ``` - pub fn inspect(self, func: F) -> Collection + pub fn inspect(self, func: F) -> Collection<'scope, T, D, R> where F: FnMut(&(D, T, R))+'static, { @@ -626,7 +623,7 @@ pub mod vec { /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs)); /// }); /// ``` - pub fn inspect_batch(self, mut func: F) -> Collection + pub fn inspect_batch(self, mut func: F) -> Collection<'scope, T, D, R> where F: FnMut(&T, &[(D, T, R)])+'static, { @@ -666,7 +663,7 @@ pub mod vec { } /// Methods requiring an Abelian difference, to support negation. - impl Collection { + impl<'scope, T: Timestamp + Clone + 'static, D: Clone+'static, R: Abelian+'static> Collection<'scope, T, D, R> { /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation @@ -705,7 +702,7 @@ pub mod vec { use crate::trace::{Trace, Builder}; use crate::operators::arrange::{Arranged, TraceAgent}; - impl Collection + impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R> where T: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, @@ -739,13 +736,13 @@ pub mod vec { /// }); /// }); /// ``` - pub fn reduce(self, logic: L) -> Collection + pub fn reduce(self, logic: L) -> Collection<'scope, T, (K, V2), R2> where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { self.reduce_named("Reduce", logic) } /// As `reduce` with the ability to name the operator. - pub fn reduce_named(self, name: &str, logic: L) -> Collection + pub fn reduce_named(self, name: &str, logic: L) -> Collection<'scope, T, (K, V2), R2> where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { use crate::trace::implementations::{ValBuilder, ValSpine}; @@ -782,7 +779,7 @@ pub mod vec { /// .trace; /// }); /// ``` - pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent> where T2: for<'a> Trace= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, @@ -800,7 +797,7 @@ pub mod vec { /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - pub fn reduce_core(self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent> where V: Clone+'static, T2: for<'a> Trace=&'a K, ValOwn = V, Time=T>+'static, @@ -816,7 +813,7 @@ pub mod vec { } } - impl Collection + impl<'scope, T, K, R1> Collection<'scope, T, K, R1> where T: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, @@ -837,7 +834,7 @@ pub mod vec { /// .distinct(); /// }); /// ``` - pub fn distinct(self) -> Collection { + pub fn distinct(self) -> Collection<'scope, T, K, isize> { self.distinct_core() } @@ -846,7 +843,7 @@ pub mod vec { /// This method allows `distinct` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - pub fn distinct_core>(self) -> Collection { + pub fn distinct_core>(self) -> Collection<'scope, T, K, R2> { self.threshold_named("Distinct", |_,_| R2::from(1i8)) } @@ -868,12 +865,12 @@ pub mod vec { /// .threshold(|_,c| c % 2); /// }); /// ``` - pub fn thresholdR2+'static>(self, thresh: F) -> Collection { + pub fn thresholdR2+'static>(self, thresh: F) -> Collection<'scope, T, K, R2> { self.threshold_named("Threshold", thresh) } /// A `threshold` with the ability to name the operator. - pub fn threshold_namedR2+'static>(self, name: &str, mut thresh: F) -> Collection { + pub fn threshold_namedR2+'static>(self, name: &str, mut thresh: F) -> Collection<'scope, T, K, R2> { use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) @@ -887,7 +884,7 @@ pub mod vec { } - impl Collection + impl<'scope, T, K, R> Collection<'scope, T, K, R> where T: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, @@ -908,14 +905,14 @@ pub mod vec { /// .count(); /// }); /// ``` - pub fn count(self) -> Collection { self.count_core() } + pub fn count(self) -> Collection<'scope, T, (K, R), isize> { self.count_core() } /// Count for general integer differences. /// /// This method allows `count` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - pub fn count_core + 'static>(self) -> Collection { + pub fn count_core + 'static>(self) -> Collection<'scope, T, (K, R), R2> { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") .reduce_abelian::<_,ValBuilder,ValSpine,_>( @@ -928,7 +925,7 @@ pub mod vec { } /// Methods which require data be arrangeable. - impl Collection + impl<'scope, T, D, R> Collection<'scope, T, D, R> where T: Timestamp + Clone + 'static + Lattice, D: crate::ExchangeData+Hashable, @@ -1023,14 +1020,14 @@ pub mod vec { use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder}; use crate::operators::arrange::Arrange; - impl Arrange> for Collection + impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R> where T: Timestamp + Lattice, K: crate::ExchangeData + Hashable, V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> where Ba: crate::trace::Batcher, Time=T> + 'static, Bu: crate::trace::Builder, @@ -1041,11 +1038,11 @@ pub mod vec { } } - impl Arrange> for Collection + impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R> where T: Timestamp + Lattice + Ord, { - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> where Ba: crate::trace::Batcher, Time=T> + 'static, Bu: crate::trace::Builder, @@ -1057,7 +1054,7 @@ pub mod vec { } - impl Collection + impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R> where T: Timestamp + Lattice + Ord, { @@ -1066,17 +1063,17 @@ pub mod vec { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(self) -> Arranged>> { + pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } } - impl Collection + impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R> where T: Timestamp + Lattice + Ord, { @@ -1085,18 +1082,18 @@ pub mod vec { /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(self) -> Arranged>> { + pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } } - impl Collection + impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R> where T: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, @@ -1122,7 +1119,7 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn join(self, other: Collection) -> Collection>::Output> + pub fn join(self, other: Collection<'scope, T, (K,V2), R2>) -> Collection<'scope, T, (K,(V,V2)), >::Output> where K: crate::ExchangeData, V2: crate::ExchangeData, @@ -1149,7 +1146,7 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn join_map(self, other: Collection, mut logic: L) -> Collection>::Output> + pub fn join_map(self, other: Collection<'scope, T, (K, V2), R2>, mut logic: L) -> Collection<'scope, T, D, >::Output> where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_key(); @@ -1177,7 +1174,7 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn semijoin(self, other: Collection) -> Collection>::Output> + pub fn semijoin(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), >::Output> where R: Multiply { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_self(); @@ -1209,7 +1206,7 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn antijoin(self, other: Collection) -> Collection + pub fn antijoin(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), R> where R: Multiply, R: Abelian+'static { self.clone().concat(self.semijoin(other).negate()) } @@ -1220,9 +1217,6 @@ pub mod vec { /// which produces something implementing `IntoIterator`, where the output collection will have an entry for /// every value returned by the iterator. /// - /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait - /// contains the implementations for collections. - /// /// # Examples /// /// ``` @@ -1242,7 +1236,7 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn join_core (self, stream2: Arranged, result: L) -> Collection>::Output> + pub fn join_core (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,>::Output> where Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=T>+Clone+'static, R: Multiply, @@ -1256,17 +1250,17 @@ pub mod vec { } /// Conversion to a differential dataflow Collection. -pub trait AsCollection { +pub trait AsCollection<'scope, T: Timestamp, C> { /// Converts the type to a differential dataflow collection. - fn as_collection(self) -> Collection; + fn as_collection(self) -> Collection<'scope, T, C>; } -impl AsCollection for Stream { +impl<'scope, T: Timestamp, C> AsCollection<'scope, T, C> for Stream<'scope, T, C> { /// Converts the type to a differential dataflow collection. /// /// By calling this method, you guarantee that the timestamp invariant (as documented on /// [Collection]) is upheld. This method will not check it. - fn as_collection(self) -> Collection { + fn as_collection(self) -> Collection<'scope, T, C> { Collection::::new(self) } } @@ -1292,11 +1286,11 @@ impl AsCollection for Stream { /// .assert_eq(data); /// }); /// ``` -pub fn concatenate(scope: &mut Scope, iterator: I) -> Collection +pub fn concatenate<'scope, T, C, I>(scope: &mut Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C> where T: Timestamp, C: Container, - I: IntoIterator>, + I: IntoIterator>, { scope .concatenate(iterator.into_iter().map(|x| x.inner)) diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs index 9efe58f6c..0a615ffbf 100644 --- a/differential-dataflow/src/dynamic/mod.rs +++ b/differential-dataflow/src/dynamic/mod.rs @@ -25,7 +25,7 @@ use crate::collection::AsCollection; use crate::dynamic::pointstamp::PointStamp; use crate::dynamic::pointstamp::PointStampSummary; -impl VecCollection>, D, R> +impl<'scope, D, R, T, TOuter> VecCollection<'scope, Product>, D, R> where D: Data, R: Semigroup+'static, diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 0ef606ac7..03045270c 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -16,7 +16,7 @@ use crate::difference::Semigroup; use crate::collection::{VecCollection, AsCollection}; /// Create a new collection and input handle to control the collection. -pub trait Input : TimelyInput { +pub trait Input<'scope> : TimelyInput<'scope> { /// Create a new collection and input handle to subsequently control the collection. /// /// # Examples @@ -41,7 +41,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession, VecCollection) + fn new_collection(&mut self) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, I::Item, isize>) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,24 +93,24 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) where I: IntoIterator+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; -impl Input for Scope { - fn new_collection(&mut self) -> (InputSession, VecCollection) +impl<'scope, T: Timestamp + Lattice + timely::order::TotalOrder> Input<'scope> for Scope<'scope, T> { + fn new_collection(&mut self) -> (InputSession, VecCollection<'scope, T, D, R>) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection<'scope, T, I::Item, isize>) where I: IntoIterator+'static, I::Item: Data { self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection<'scope, T, D, R>) where D: Data, R: Semigroup+'static, @@ -199,7 +199,7 @@ impl InputSession { impl InputSession { /// Introduces a handle as collection. - pub fn to_collection(&mut self, scope: &mut Scope) -> VecCollection + pub fn to_collection<'scope>(&mut self, scope: &mut Scope<'scope, T>) -> VecCollection<'scope, T, D, R> where T: timely::order::TotalOrder, { diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 7ae253d2c..61118a497 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -216,13 +216,13 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import(&mut self, scope: &Scope) -> Arranged> + pub fn import<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent> { self.import_named(scope, "ArrangedSource") } /// Same as `import`, but allows to name the source. - pub fn import_named(&mut self, scope: &Scope, name: &str) -> Arranged> + pub fn import_named<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent> { // Drop ShutdownButton and return only the arrangement. self.import_core(scope, name).0 @@ -275,7 +275,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core(&mut self, scope: &Scope, name: &str) -> (Arranged>, ShutdownButton>) + pub fn import_core<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent>, ShutdownButton>) { let trace = self.clone(); @@ -388,7 +388,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier(&mut self, scope: &Scope, name: &str) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) where Tr: TraceReader, { @@ -405,7 +405,7 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core(&mut self, scope: &Scope, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier_core<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str, since: Antichain, until: Antichain) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) where Tr: TraceReader, { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 4034bb9a1..7435e692e 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -43,7 +43,7 @@ use super::TraceAgent; /// /// An `Arranged` allows multiple differential operators to share the resources (communication, /// computation, memory) required to produce and maintain an indexed representation of a collection. -pub struct Arranged +pub struct Arranged<'scope, Tr> where Tr: TraceReader+Clone, { @@ -52,14 +52,14 @@ where /// This stream contains the same batches of updates the trace itself accepts, so there should /// be no additional overhead to receiving these records. The batches can be navigated just as /// the batches in the trace, by key and by value. - pub stream: Stream>, + pub stream: Stream<'scope, Tr::Time, Vec>, /// A shared trace, updated by the `Arrange` operator and readable by others. pub trace: Tr, // TODO : We might have an `Option>` here, which `as_collection` sets and // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`. } -impl Clone for Arranged +impl<'scope, Tr> Clone for Arranged<'scope, Tr> where Tr: TraceReader + Clone, { @@ -74,7 +74,7 @@ where use ::timely::progress::timestamp::Refines; use timely::Container; -impl Arranged +impl<'scope, Tr> Arranged<'scope, Tr> where Tr: TraceReader + Clone, { @@ -83,10 +83,9 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter(self, child: &Scope) - -> Arranged> - where - TInner: Refines+Lattice, + pub fn enter<'inner, TInner>(self, child: &Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter> + where + TInner: Refines+Lattice, { Arranged { stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)), @@ -98,7 +97,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn enter_region(self, child: &Scope) -> Self { + pub fn enter_region<'inner>(self, child: &Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> { Arranged { stream: self.stream.enter(child), trace: self.trace, @@ -110,13 +109,12 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter_at(self, child: &Scope, logic: F, prior: P) - -> Arranged> - where - TInner: Refines+Lattice+'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, - P: FnMut(&TInner)->Tr::Time+Clone+'static, - { + pub fn enter_at<'inner, TInner, F, P>(self, child: &Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt> + where + TInner: Refines+Lattice+'static, + F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, + P: FnMut(&TInner)->Tr::Time+Clone+'static, + { let logic1 = logic.clone(); let logic2 = logic.clone(); Arranged { @@ -129,7 +127,7 @@ where /// /// This method is like `self.stream.flat_map`, except that it produces containers /// directly, rather than form a container of containers as `flat_map` would. - pub fn as_container(self, mut logic: L) -> crate::Collection + pub fn as_container(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item> where I: IntoIterator, L: FnMut(Tr::Batch) -> I+'static, @@ -152,7 +150,7 @@ where /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(self, mut logic: L) -> VecCollection + pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff> where L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static, { @@ -168,7 +166,7 @@ where /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic /// on the reference types. - pub fn as_vecs(self) -> VecCollection + pub fn as_vecs(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff> where K: crate::ExchangeData, V: crate::ExchangeData, @@ -181,7 +179,7 @@ where /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. - pub fn flat_map_ref(self, logic: L) -> VecCollection + pub fn flat_map_ref(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, @@ -196,7 +194,7 @@ where /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: Stream>, mut logic: L) -> VecCollection + pub fn flat_map_batches(stream: Stream<'scope, Tr::Time, Vec>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, @@ -228,14 +226,14 @@ where use crate::difference::Multiply; // Direct join implementations. -impl Arranged +impl<'scope, Tr1> Arranged<'scope, Tr1> where Tr1: TraceReader + Clone + 'static, { /// A convenience method to join and produce `VecCollection` output. /// /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion. - pub fn join_core(self, other: Arranged, mut result: L) -> VecCollection>::Output> + pub fn join_core(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,>::Output> where Tr2: for<'a> TraceReader=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static, Tr1::Diff: Multiply, @@ -264,12 +262,12 @@ where // Direct reduce implementations. use crate::difference::Abelian; -impl Arranged +impl<'scope, Tr1> Arranged<'scope, Tr1> where Tr1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent> where Tr2: for<'a> Trace< Key<'a>= Tr1::Key<'a>, @@ -291,7 +289,7 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent> where Tr2: for<'a> Trace< Key<'a>=Tr1::Key<'a>, @@ -308,7 +306,7 @@ where } -impl Arranged +impl<'scope, Tr> Arranged<'scope, Tr> where Tr: TraceReader + Clone, { @@ -316,8 +314,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn leave_region(self, outer: &Scope) -> Self - { + pub fn leave_region<'outer>(self, outer: &Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> { use timely::dataflow::operators::Leave; Arranged { stream: self.stream.leave(outer), @@ -327,12 +324,12 @@ where } /// A type that can be arranged as if a collection of updates. -pub trait Arrange : Sized +pub trait Arrange<'scope, T, C> : Sized where T: Timestamp + Lattice, { /// Arranges updates into a shared trace. - fn arrange(self) -> Arranged> + fn arrange(self) -> Arranged<'scope, TraceAgent> where Ba: Batcher + 'static, Bu: Builder, @@ -342,7 +339,7 @@ where } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged<'scope, TraceAgent> where Ba: Batcher + 'static, Bu: Builder, @@ -355,7 +352,7 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: Stream, pact: P, name: &str) -> Arranged> +pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent> where P: ParallelizationContract, Ba: Batcher + 'static, diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 33b62a0e8..0c88eebd9 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -128,10 +128,10 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: Stream, Tr::Time)>>, +pub fn arrange_from_upsert<'scope, Bu, Tr, K, V>( + stream: Stream<'scope, Tr::Time, Vec<(K, Option, Tr::Time)>>, name: &str, -) -> Arranged> +) -> Arranged<'scope, TraceAgent> where K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 8f4e7ad7a..abc3d20db 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -14,7 +14,7 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `count` differential dataflow method. -pub trait CountTotal : Sized { +pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: Semigroup> : Sized { /// Counts the number of occurrences of each element. /// /// # Examples @@ -30,7 +30,7 @@ pub trait CountTotal VecCollection { + fn count_total(self) -> VecCollection<'scope, T, (K, R), isize> { self.count_total_core() } @@ -39,20 +39,20 @@ pub trait CountTotal + 'static>(self) -> VecCollection; + fn count_total_core + 'static>(self) -> VecCollection<'scope, T, (K, R), R2>; } -impl CountTotal for VecCollection +impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R> where T: Timestamp + TotalOrder + Lattice + Ord, { - fn count_total_core + 'static>(self) -> VecCollection { + fn count_total_core + 'static>(self) -> VecCollection<'scope, T, (K, R), R2> { self.arrange_by_self_named("Arrange: CountTotal") .count_total_core() } } -impl CountTotal for Arranged +impl<'scope, K, Tr> CountTotal<'scope, Tr::Time, K, Tr::Diff> for Arranged<'scope, Tr> where Tr: for<'a> TraceReader< Key<'a> = &'a K, @@ -62,7 +62,7 @@ where >+Clone+'static, K: ExchangeData, { - fn count_total_core + 'static>(self) -> VecCollection { + fn count_total_core + 'static>(self) -> VecCollection<'scope, Tr::Time, (K, Tr::Diff), R2> { let mut trace = self.trace.clone(); diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 140e8888a..496f9f5c1 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -46,7 +46,7 @@ use crate::difference::{Semigroup, Abelian}; use crate::lattice::Lattice; /// An extension trait for the `iterate` method. -pub trait Iterate { +pub trait Iterate<'scope, T: Timestamp + Lattice, D: Data, R: Semigroup> { /// Iteratively apply `logic` to the source collection until convergence. /// /// Importantly, this method does not automatically consolidate results. @@ -73,15 +73,15 @@ pub trait Iterate { /// }); /// }); /// ``` - fn iterate(self, logic: F) -> VecCollection + fn iterate(self, logic: F) -> VecCollection<'scope, T, D, R> where - for<'a> F: FnOnce(Iterative, VecCollection, D, R>)->VecCollection, D, R>; + for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product, D, R>)->VecCollection<'inner, Product, D, R>; } -impl Iterate for VecCollection { - fn iterate(self, logic: F) -> VecCollection +impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Abelian+'static> Iterate<'scope, T, D, R> for VecCollection<'scope, T, D, R> { + fn iterate(self, logic: F) -> VecCollection<'scope, T, D, R> where - for<'a> F: FnOnce(Iterative, VecCollection, D, R>)->VecCollection, D, R>, + for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product, D, R>)->VecCollection<'inner, Product, D, R>, { let outer = self.inner.scope(); outer.scoped("Iterate", |subgraph| { @@ -99,10 +99,10 @@ impl Iterate Iterate for Scope { - fn iterate(self, logic: F) -> VecCollection +impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<'scope, T, D, R> for Scope<'scope, T> { + fn iterate(self, logic: F) -> VecCollection<'scope, T, D, R> where - for<'a> F: FnOnce(Iterative, VecCollection, D, R>)->VecCollection, D, R>, + for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product, D, R>)->VecCollection<'inner, Product, D, R>, { let outer = self.clone(); self.scoped("Iterate", |subgraph| { @@ -189,20 +189,20 @@ impl Iterate +pub struct Variable<'scope, T, C> where T: Timestamp + Lattice, C: Container, { - feedback: Handle, - source: Option>, + feedback: Handle<'scope, T, C>, + source: Option>, step: T::Summary, } /// A `Variable` specialized to a vector container of update triples (data, time, diff). -pub type VecVariable = Variable>; +pub type VecVariable<'scope, T, D, R> = Variable<'scope, T, Vec<(D, T, R)>>; -impl Variable +impl<'scope, T, C: Container> Variable<'scope, T, C> where T: Timestamp + Lattice, C: crate::collection::containers::ResultsIn, @@ -218,7 +218,7 @@ where /// will produce its fixed point in the outer scope. /// /// In a non-iterative scope the mechanics are the same, but the interpretation varies. - pub fn new(scope: &mut Scope, step: T::Summary) -> (Self, Collection) { + pub fn new(scope: &mut Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::::new(updates); (Self { feedback, source: None, step }, collection) @@ -249,7 +249,7 @@ where /// adding the source, doing the logic, then subtracting the source, it is appropriate to do. /// For example, if the logic modifies a few records it is possible to produce this update /// directly without using the backstop implementation this method provides. - pub fn new_from(source: Collection, step: T::Summary) -> (Self, Collection) where C: Clone + crate::collection::containers::Negate { + pub fn new_from(source: Collection<'scope, T, C>, step: T::Summary) -> (Self, Collection<'scope, T, C>) where C: Clone + crate::collection::containers::Negate { let (feedback, updates) = source.inner.scope().feedback(step.clone()); let collection = Collection::::new(updates).concat(source.clone()); (Variable { feedback, source: Some(source.negate()), step }, collection) @@ -259,7 +259,7 @@ where /// /// This method binds the `Variable` to be equal to the supplied collection, /// which may be recursively defined in terms of the variable itself. - pub fn set(mut self, mut result: Collection) { + pub fn set(mut self, mut result: Collection<'scope, T, C>) { if let Some(source) = self.source.take() { result = result.concat(source); } diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 7d514cfec..b01e3bf89 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -67,7 +67,7 @@ impl, D> PushInto for EffortBuilder { /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. /// /// [`AsCollection`]: crate::collection::AsCollection -pub fn join_traces(arranged1: Arranged, arranged2: Arranged, mut result: L) -> Stream +pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, arranged2: Arranged<'scope, Tr2>, mut result: L) -> Stream<'scope, Tr1::Time, CB::Container> where Tr1: TraceReader+Clone+'static, Tr2: for<'a> TraceReader=Tr1::Key<'a>, Time = Tr1::Time>+Clone+'static, diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 4fcaefd08..a695527a8 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -33,7 +33,7 @@ use crate::trace::TraceReader; /// the value updates, as appropriate for the container. It is critical that it clear the container as /// the operator has no ability to do this otherwise, and failing to do so represents a leak from one /// key's computation to another, and will likely introduce non-determinism. -pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L, mut push: P) -> Arranged> +pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent> where Tr1: TraceReader + Clone + 'static, Tr2: for<'a> Trace=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static, diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index 22dfec32b..d7716a0db 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -17,9 +17,9 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. -pub trait ThresholdTotal : Sized { +pub trait ThresholdTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: ExchangeData+Semigroup> : Sized { /// Reduces the collection to one occurrence of each distinct element. - fn threshold_semigroup(self, thresh: F) -> VecCollection + fn threshold_semigroup(self, thresh: F) -> VecCollection<'scope, T, K, R2> where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -39,7 +39,7 @@ pub trait ThresholdTotalR2+'static>(self, mut thresh: F) -> VecCollection { + fn threshold_totalR2+'static>(self, mut thresh: F) -> VecCollection<'scope, T, K, R2> { self.threshold_semigroup(move |key, new, old| { let mut new = thresh(key, new); if let Some(old) = old { @@ -69,7 +69,7 @@ pub trait ThresholdTotal VecCollection { + fn distinct_total(self) -> VecCollection<'scope, T, K, isize> { self.distinct_total_core() } @@ -78,17 +78,17 @@ pub trait ThresholdTotal+'static>(self) -> VecCollection { + fn distinct_total_core+'static>(self) -> VecCollection<'scope, T, K, R2> { self.threshold_total(|_,_| R2::from(1i8)) } } -impl ThresholdTotal for VecCollection +impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R> where T: Timestamp + TotalOrder + Lattice + Ord, { - fn threshold_semigroup(self, thresh: F) -> VecCollection + fn threshold_semigroup(self, thresh: F) -> VecCollection<'scope, T, K, R2> where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -98,7 +98,7 @@ where } } -impl ThresholdTotal for Arranged +impl<'scope, K, Tr> ThresholdTotal<'scope, Tr::Time, K, Tr::Diff> for Arranged<'scope, Tr> where Tr: for<'a> TraceReader< Key<'a>=&'a K, @@ -108,7 +108,7 @@ where >+Clone+'static, K: ExchangeData, { - fn threshold_semigroup(self, mut thresh: F) -> VecCollection + fn threshold_semigroup(self, mut thresh: F) -> VecCollection<'scope, Tr::Time, K, R2> where R2: Semigroup+'static, F: for<'a> FnMut(Tr::Key<'a>,&Tr::Diff,Option<&Tr::Diff>)->Option+'static, diff --git a/differential-dataflow/tests/bfs.rs b/differential-dataflow/tests/bfs.rs index d3a3bccb8..721f7709d 100644 --- a/differential-dataflow/tests/bfs.rs +++ b/differential-dataflow/tests/bfs.rs @@ -201,7 +201,7 @@ fn bfs_differential( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection +fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, usize)> where T: timely::progress::Timestamp + Lattice + Ord, { diff --git a/differential-dataflow/tests/scc.rs b/differential-dataflow/tests/scc.rs index 2e48b41f3..2c0b35d92 100644 --- a/differential-dataflow/tests/scc.rs +++ b/differential-dataflow/tests/scc.rs @@ -214,7 +214,7 @@ fn scc_differential( .collect() } -fn _strongly_connected(graph: VecCollection) -> VecCollection +fn _strongly_connected<'scope, T>(graph: VecCollection<'scope, T, Edge>) -> VecCollection<'scope, T, Edge> where T: timely::progress::Timestamp + Lattice + Ord + Hash, { @@ -225,7 +225,7 @@ where }) } -fn _trim_edges(cycle: VecCollection, edges: VecCollection) -> VecCollection +fn _trim_edges<'scope, T>(cycle: VecCollection<'scope, T, Edge>, edges: VecCollection<'scope, T, Edge>) -> VecCollection<'scope, T, Edge> where T: timely::progress::Timestamp + Lattice + Ord + Hash, { @@ -243,7 +243,7 @@ where .map(|((x1,x2),_)| (x2,x1)) } -fn _reachability(edges: VecCollection, nodes: VecCollection) -> VecCollection +fn _reachability<'scope, T>(edges: VecCollection<'scope, T, Edge>, nodes: VecCollection<'scope, T, (Node, Node)>) -> VecCollection<'scope, T, Edge> where T: timely::progress::Timestamp + Lattice + Ord + Hash, { diff --git a/dogsdogsdogs/src/calculus.rs b/dogsdogsdogs/src/calculus.rs index 8109d1262..a9bfe1589 100644 --- a/dogsdogsdogs/src/calculus.rs +++ b/dogsdogsdogs/src/calculus.rs @@ -21,23 +21,23 @@ use differential_dataflow::difference::Abelian; use crate::altneu::AltNeu; /// Produce a collection containing the changes at the moments they happen. -pub trait Differentiate { - fn differentiate(self, child: &Scope>) -> VecCollection, D, R>; +pub trait Differentiate<'scope, T: Timestamp, D: Data, R: Abelian> { + fn differentiate<'inner>(self, child: &Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R>; } /// Collect instantaneous changes back in to a collection. -pub trait Integrate { - fn integrate(self, outer: &Scope) -> VecCollection; +pub trait Integrate<'scope, T: Timestamp, D: Data, R: Abelian> { + fn integrate<'outer>(self, outer: &Scope<'outer, T>) -> VecCollection<'outer, T, D, R>; } -impl Differentiate for VecCollection +impl<'scope, T, D, R> Differentiate<'scope, T, D, R> for VecCollection<'scope, T, D, R> where T: Timestamp, D: Data, R: Abelian + 'static, { // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff). - fn differentiate(self, child: &Scope>) -> VecCollection, D, R> { + fn differentiate<'inner>(self, child: &Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R> { self.enter(child) .inner .flat_map(|(data, time, diff)| { @@ -51,14 +51,14 @@ where } } -impl Integrate for VecCollection, D, R> +impl<'scope, T, D, R> Integrate<'scope, T, D, R> for VecCollection<'scope, AltNeu, D, R> where T: Timestamp, D: Data, R: Abelian + 'static, { // We discard each `neu` variant and strip off the `alt` wrapper. - fn integrate(self, outer: &Scope) -> VecCollection { + fn integrate<'outer>(self, outer: &Scope<'outer, T>) -> VecCollection<'outer, T, D, R> { self.inner .filter(|(_d,t,_r)| !t.neu) .as_collection() diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index e2699ee0d..91549a6f3 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -19,37 +19,37 @@ pub mod operators; Implementors of `PrefixExtension` provide types and methods for extending a differential dataflow collection, via the three methods `count`, `propose`, and `validate`. **/ -pub trait PrefixExtender> { +pub trait PrefixExtender<'scope, T: Timestamp, R: Monoid+Multiply> { /// The required type of prefix to extend. type Prefix; /// The type to be produced as extension. type Extension; /// Annotates prefixes with the number of extensions the relation would propose. - fn count(&mut self, prefixes: VecCollection, index: usize) -> VecCollection; + fn count(&mut self, prefixes: VecCollection<'scope, T, (Self::Prefix, usize, usize), R>, index: usize) -> VecCollection<'scope, T, (Self::Prefix, usize, usize), R>; /// Extends each prefix with corresponding extensions. - fn propose(&mut self, prefixes: VecCollection) -> VecCollection; + fn propose(&mut self, prefixes: VecCollection<'scope, T, Self::Prefix, R>) -> VecCollection<'scope, T, (Self::Prefix, Self::Extension), R>; /// Restricts proposed extensions by those the extender would have proposed. - fn validate(&mut self, extensions: VecCollection) -> VecCollection; + fn validate(&mut self, extensions: VecCollection<'scope, T, (Self::Prefix, Self::Extension), R>) -> VecCollection<'scope, T, (Self::Prefix, Self::Extension), R>; } -pub trait ProposeExtensionMethod> { - fn propose_using>(self, extender: &mut PE) -> VecCollection; - fn extend(self, extenders: &mut [&mut dyn PrefixExtender]) -> VecCollection; +pub trait ProposeExtensionMethod<'scope, T: Timestamp, P: ExchangeData+Ord, R: Monoid+Multiply> { + fn propose_using>(self, extender: &mut PE) -> VecCollection<'scope, T, (P, PE::Extension), R>; + fn extend(self, extenders: &mut [&mut dyn PrefixExtender<'scope, T,R,Prefix=P,Extension=E>]) -> VecCollection<'scope, T, (P, E), R>; } -impl ProposeExtensionMethod for VecCollection +impl<'scope, T, P, R> ProposeExtensionMethod<'scope, T, P, R> for VecCollection<'scope, T, P, R> where T: Timestamp, P: ExchangeData+Ord, R: Monoid+Multiply+'static, { - fn propose_using(self, extender: &mut PE) -> VecCollection + fn propose_using(self, extender: &mut PE) -> VecCollection<'scope, T, (P, PE::Extension), R> where - PE: PrefixExtender + PE: PrefixExtender<'scope, T, R, Prefix=P> { extender.propose(self) } - fn extend(self, extenders: &mut [&mut dyn PrefixExtender]) -> VecCollection + fn extend(self, extenders: &mut [&mut dyn PrefixExtender<'scope, T,R,Prefix=P,Extension=E>]) -> VecCollection<'scope, T, (P, E), R> where E: ExchangeData+Ord { @@ -80,12 +80,12 @@ where } } -pub trait ValidateExtensionMethod, P, E> { - fn validate_using>(self, extender: &mut PE) -> VecCollection; +pub trait ValidateExtensionMethod<'scope, T: Timestamp, R: Monoid+Multiply, P, E> { + fn validate_using>(self, extender: &mut PE) -> VecCollection<'scope, T, (P, E), R>; } -impl, P, E> ValidateExtensionMethod for VecCollection { - fn validate_using>(self, extender: &mut PE) -> VecCollection { +impl<'scope, T: Timestamp, R: Monoid+Multiply, P, E> ValidateExtensionMethod<'scope, T, R, P, E> for VecCollection<'scope, T, (P, E), R> { + fn validate_using>(self, extender: &mut PE) -> VecCollection<'scope, T, (P, E), R> { extender.validate(self) } } @@ -136,7 +136,7 @@ where R: Monoid+Multiply+ExchangeData, { - pub fn index(collection: VecCollection) -> Self { + pub fn index<'scope>(collection: VecCollection<'scope, T, (K, V), R>) -> Self { // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation. // counts and validate can share the base arrangement let arranged = collection.clone().arrange_by_self(); @@ -179,7 +179,7 @@ where key_selector: F, } -impl PrefixExtender for CollectionExtender +impl<'scope, T, K, V, R, P, F> PrefixExtender<'scope, T, R> for CollectionExtender where T: Timestamp + Lattice + ExchangeData + Hash, K: ExchangeData+Hash+Default, @@ -191,17 +191,17 @@ where type Prefix = P; type Extension = V; - fn count(&mut self, prefixes: VecCollection, index: usize) -> VecCollection { + fn count(&mut self, prefixes: VecCollection<'scope, T, (P, usize, usize), R>, index: usize) -> VecCollection<'scope, T, (P, usize, usize), R> { let counts = self.indices.count_trace.import(&prefixes.scope()); operators::count::count(prefixes, counts, self.key_selector.clone(), index) } - fn propose(&mut self, prefixes: VecCollection) -> VecCollection { + fn propose(&mut self, prefixes: VecCollection<'scope, T, P, R>) -> VecCollection<'scope, T, (P, V), R> { let propose = self.indices.propose_trace.import(&prefixes.scope()); operators::propose::propose(prefixes, propose, self.key_selector.clone()) } - fn validate(&mut self, extensions: VecCollection) -> VecCollection { + fn validate(&mut self, extensions: VecCollection<'scope, T, (P, V), R>) -> VecCollection<'scope, T, (P, V), R> { let validate = self.indices.validate_trace.import(&extensions.scope()); operators::validate::validate(extensions, validate, self.key_selector.clone()) } diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index a92066f1e..34aebc738 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -9,12 +9,12 @@ use differential_dataflow::trace::TraceReader; /// For each triple, it extracts a key using `key_selector`, and finds the /// associated count in `arrangement`. If the found count is less than `count`, /// the `count` and `index` fields are overwritten with their new values. -pub fn count( - prefixes: VecCollection, - arrangement: Arranged, +pub fn count<'scope, Tr, K, R, F, P>( + prefixes: VecCollection<'scope, Tr::Time, (P, usize, usize), R>, + arrangement: Arranged<'scope, Tr>, key_selector: F, index: usize, -) -> VecCollection +) -> VecCollection<'scope, Tr::Time, (P, usize, usize), R> where Tr: TraceReader+Clone+'static, Tr::KeyContainer: differential_dataflow::trace::implementations::BatchContainer, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8bedb4cbf..e095d95a8 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -75,13 +75,13 @@ use differential_dataflow::trace::implementations::BatchContainer; /// Notice that the time is hoisted up into data. The expectation is that /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. -pub fn half_join( - stream: VecCollection, - arrangement: Arranged, +pub fn half_join<'scope, K, V, R, Tr, FF, CF, DOut, S>( + stream: VecCollection<'scope, Tr::Time, (K, V, Tr::Time), R>, + arrangement: Arranged<'scope, Tr>, frontier_func: FF, comparison: CF, mut output_func: S, -) -> VecCollection>::Output> +) -> VecCollection<'scope, Tr::Time, (DOut, Tr::Time), >::Output> where K: Hashable + ExchangeData, V: ExchangeData, @@ -140,14 +140,14 @@ type SessionFor<'a, 'b, T, CB> = /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. -pub fn half_join_internal_unsafe( - stream: VecCollection, - mut arrangement: Arranged, +pub fn half_join_internal_unsafe<'scope, K, V, R, Tr, FF, CF, Y, S, CB>( + stream: VecCollection<'scope, Tr::Time, (K, V, Tr::Time), R>, + mut arrangement: Arranged<'scope, Tr>, frontier_func: FF, comparison: CF, yield_function: Y, mut output_func: S, -) -> Stream +) -> Stream<'scope, Tr::Time, CB::Container> where K: Hashable + ExchangeData, V: ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs index 6b19c0bbd..2cc963f76 100644 --- a/dogsdogsdogs/src/operators/half_join2.rs +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -62,13 +62,13 @@ use timely::dataflow::operators::CapabilitySet; /// Notice that the time is hoisted up into data. The expectation is that /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. -pub fn half_join( - stream: VecCollection, - arrangement: Arranged, +pub fn half_join<'scope, K, V, R, Tr, FF, CF, DOut, S>( + stream: VecCollection<'scope, Tr::Time, (K, V, Tr::Time), R>, + arrangement: Arranged<'scope, Tr>, frontier_func: FF, comparison: CF, mut output_func: S, -) -> VecCollection>::Output> +) -> VecCollection<'scope, Tr::Time, (DOut, Tr::Time), >::Output> where K: Hashable + ExchangeData, V: ExchangeData, @@ -118,14 +118,14 @@ where /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. -pub fn half_join_internal_unsafe( - stream: VecCollection, - mut arrangement: Arranged, +pub fn half_join_internal_unsafe<'scope, K, V, R, Tr, FF, CF, Y, S, CB>( + stream: VecCollection<'scope, Tr::Time, (K, V, Tr::Time), R>, + mut arrangement: Arranged<'scope, Tr>, frontier_func: FF, comparison: CF, yield_function: Y, mut output_func: S, -) -> Stream +) -> Stream<'scope, Tr::Time, CB::Container> where K: Hashable + ExchangeData, V: ExchangeData, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index b15aba314..702bdf138 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -16,15 +16,15 @@ use differential_dataflow::trace::implementations::BatchContainer; /// This method takes a stream of prefixes and for each determines a /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. -pub fn lookup_map( - prefixes: VecCollection, - mut arrangement: Arranged, +pub fn lookup_map<'scope, D, K, R, Tr, F, DOut, ROut, S>( + prefixes: VecCollection<'scope, Tr::Time, D, R>, + mut arrangement: Arranged<'scope, Tr>, key_selector: F, mut output_func: S, supplied_key0: K, supplied_key1: K, supplied_key2: K, -) -> VecCollection +) -> VecCollection<'scope, Tr::Time, DOut, ROut> where Tr: for<'a> TraceReader< Time: std::hash::Hash, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 869e67aab..187c79256 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -11,11 +11,11 @@ use differential_dataflow::trace::TraceReader; /// create a join if the `prefixes` collection is also arranged and responds to changes that /// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case /// of delta queries. -pub fn propose( - prefixes: VecCollection, - arrangement: Arranged, +pub fn propose<'scope, Tr, K, F, P, V>( + prefixes: VecCollection<'scope, Tr::Time, P, Tr::Diff>, + arrangement: Arranged<'scope, Tr>, key_selector: F, -) -> VecCollection +) -> VecCollection<'scope, Tr::Time, (P, V), Tr::Diff> where Tr: for<'a> TraceReader< ValOwn = V, @@ -44,11 +44,11 @@ where /// Unlike `propose`, this method does not scale the multiplicity of matched /// prefixes by the number of matches in `arrangement`. This can be useful to /// avoid the need to prepare an arrangement of distinct extensions. -pub fn propose_distinct( - prefixes: VecCollection, - arrangement: Arranged, +pub fn propose_distinct<'scope, Tr, K, F, P, V>( + prefixes: VecCollection<'scope, Tr::Time, P, Tr::Diff>, + arrangement: Arranged<'scope, Tr>, key_selector: F, -) -> VecCollection +) -> VecCollection<'scope, Tr::Time, (P, V), Tr::Diff> where Tr: for<'a> TraceReader< ValOwn = V, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 0a28c2483..565bcf510 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -10,11 +10,11 @@ use differential_dataflow::trace::TraceReader; /// This method takes a stream of prefixes and for each determines a /// key with `key_selector` and then proposes all pair af the prefix /// and values associated with the key in `arrangement`. -pub fn validate( - extensions: VecCollection, - arrangement: Arranged, +pub fn validate<'scope, K, V, Tr, F, P>( + extensions: VecCollection<'scope, Tr::Time, (P, V), Tr::Diff>, + arrangement: Arranged<'scope, Tr>, key_selector: F, -) -> VecCollection +) -> VecCollection<'scope, Tr::Time, (P, V), Tr::Diff> where Tr: for<'a> TraceReader< Time: std::hash::Hash, diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index a3c29b5e3..c3d2a524a 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -12,7 +12,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged>>; +type Arrange<'s, G, K, V, R> = Arranged<'s, TraceAgent>>; type Node = u32; @@ -203,12 +203,12 @@ fn main() { }).unwrap(); } -fn interactive( - edges: Arrange, - tc_1: VecCollection, - tc_2: VecCollection, - sg_x: VecCollection -) -> VecCollection { +fn interactive<'s, G: timely::progress::Timestamp + Lattice>( + edges: Arrange<'s, G, Node, Node, isize>, + tc_1: VecCollection<'s, G, Node>, + tc_2: VecCollection<'s, G, Node>, + sg_x: VecCollection<'s, G, Node> +) -> VecCollection<'s, G, Node> { // descendants of tc_1: let tc_1_enter = tc_1.clone(); diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 89c9699a0..966cc4840 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -12,7 +12,7 @@ use differential_dataflow::operators::iterate::Variable; use differential_dataflow::lattice::Lattice; use differential_dataflow::difference::Present; -type EdgeArranged = Arranged>>; +type EdgeArranged<'s, G, K, V, R> = Arranged<'s, TraceAgent>>; type Node = u32; type Edge = (Node, Node); @@ -82,7 +82,7 @@ fn main() { use timely::order::Product; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn tc(edges: EdgeArranged) -> VecCollection { +fn tc<'s, T: timely::progress::Timestamp + Lattice + Default + timely::order::Empty>(edges: EdgeArranged<'s, T, Node, Node, Present>) -> VecCollection<'s, T, Edge, Present> { // repeatedly update minimal distances each node can be reached from each root let outer = edges.stream.scope(); @@ -108,7 +108,7 @@ fn tc } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn sg(edges: EdgeArranged) -> VecCollection { +fn sg<'s, T: timely::progress::Timestamp + Lattice + Default + timely::order::Empty>(edges: EdgeArranged<'s, T, Node, Node, Present>) -> VecCollection<'s, T, Edge, Present> { let peers = edges.clone().join_core(edges.clone(), |_,&x,&y| Some((x,y))).filter(|&(x,y)| x != y); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 0ba7ddd4b..032f5d5ec 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -258,13 +258,13 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn three_hop( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn three_hop<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let sources = goals.clone().map(|(x,_)| x); @@ -288,10 +288,10 @@ fn three_hop( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn _bidijkstra( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let outer = goals.scope(); @@ -362,7 +362,7 @@ fn _bidijkstra( } -fn connected_components(graph: Arrange) -> VecCollection +fn connected_components<'s, T: Timestamp + Lattice + Ord>(graph: Arrange<'s, T, Node, Node, isize>) -> VecCollection<'s, T, (Node, Node)> where T: Lattice + std::hash::Hash { diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 0a5ee0d25..8801a1948 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -226,13 +226,13 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn three_hop( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn three_hop<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let sources = goals.clone().map(|(x,_)| x); diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index ceda7913c..c12e92845 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -291,13 +291,13 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn three_hop( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn three_hop<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let sources = goals.clone().map(|(x,_)| x); @@ -321,10 +321,10 @@ fn three_hop( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn _bidijkstra( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let outer = goals.scope(); diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 9ee11bf7b..d7914c66e 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -194,14 +194,14 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged>>; +type Arrange<'s, T, K, V, R> = Arranged<'s, TraceAgent>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn three_hop( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn three_hop<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let sources = goals.clone().map(|(x,_)| x); @@ -225,10 +225,10 @@ fn three_hop( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn _bidijkstra( - forward_graph: Arrange, - reverse_graph: Arrange, - goals: VecCollection) -> VecCollection +fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( + forward_graph: Arrange<'s, T, Node, Node, isize>, + reverse_graph: Arrange<'s, T, Node, Node, isize>, + goals: VecCollection<'s, T, (Node, Node)>) -> VecCollection<'s, T, ((Node, Node), u32)> { let outer = goals.scope(); diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 46a89b4a2..02de62956 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -104,10 +104,10 @@ use differential_dataflow::operators::arrange::TraceAgent; type TraceHandle = TraceAgent; -fn reach( +fn reach<'s>( graph: &mut TraceHandle, - roots: VecCollection<(), Node, Diff> -) -> VecCollection<(), Node, Diff> { + roots: VecCollection<'s, (), Node, Diff> +) -> VecCollection<'s, (), Node, Diff> { let graph = graph.import(&roots.scope()); @@ -130,10 +130,10 @@ fn reach( } -fn bfs( +fn bfs<'s>( graph: &mut TraceHandle, - roots: VecCollection<(), Node, Diff> -) -> VecCollection<(), (Node, u32), Diff> { + roots: VecCollection<'s, (), Node, Diff> +) -> VecCollection<'s, (), (Node, u32), Diff> { let graph = graph.import(&roots.scope()); let roots = roots.map(|r| (r,0)); @@ -155,11 +155,11 @@ fn bfs( }) } -fn connected_components( - scope: &mut timely::dataflow::Scope<()>, +fn connected_components<'s>( + scope: &mut timely::dataflow::Scope<'s, ()>, forward: &mut TraceHandle, reverse: &mut TraceHandle, -) -> VecCollection<(), (Node, Node), Diff> { +) -> VecCollection<'s, (), (Node, Node), Diff> { let forward = forward.import(scope); let reverse = reverse.import(scope); diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index 7461d006a..c5294a37b 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -86,10 +86,10 @@ use differential_dataflow::operators::arrange::TraceAgent; type TraceHandle = TraceAgent; -fn reach( +fn reach<'s>( graph: &mut TraceHandle, - roots: VecCollection<(), Node> -) -> VecCollection<(), Node> { + roots: VecCollection<'s, (), Node> +) -> VecCollection<'s, (), Node> { let graph = graph.import(&roots.scope()); @@ -108,10 +108,10 @@ fn reach( } -fn bfs( +fn bfs<'s>( graph: &mut TraceHandle, - roots: VecCollection<(), Node> -) -> VecCollection<(), (Node, u32)> { + roots: VecCollection<'s, (), Node> +) -> VecCollection<'s, (), (Node, u32)> { let graph = graph.import(&roots.scope()); let roots = roots.map(|r| (r,0)); diff --git a/interactive/src/plan/concat.rs b/interactive/src/plan/concat.rs index 149b81049..2190b0359 100644 --- a/interactive/src/plan/concat.rs +++ b/interactive/src/plan/concat.rs @@ -19,10 +19,10 @@ impl Render for Concat { type Value = V; - fn render( + fn render<'scope>( &self, - scope: &mut Scope