Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions diagnostics/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ fn quantize(time: Duration, interval: Duration) -> Duration {
}

/// Quantize timestamps in a collection's inner stream.
fn quantize_collection<D>(
collection: VecCollection<Duration, D, i64>,
fn quantize_collection<'scope, D>(
collection: VecCollection<'scope, Duration, D, i64>,
interval: Duration,
) -> VecCollection<Duration, D, i64>
) -> VecCollection<'scope, Duration, D, i64>
where
D: differential_dataflow::Data,
{
Expand Down Expand Up @@ -400,11 +400,11 @@ fn install_loggers(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct TimelyCollections {
operators: VecCollection<Duration, (usize, String, Vec<usize>), i64>,
channels: VecCollection<Duration, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<Duration, usize, i64>,
messages: VecCollection<Duration, usize, i64>,
struct TimelyCollections<'scope> {
operators: VecCollection<'scope, Duration, (usize, String, Vec<usize>), i64>,
channels: VecCollection<'scope, Duration, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<'scope, Duration, usize, i64>,
messages: VecCollection<'scope, Duration, usize, i64>,
}

#[derive(Default)]
Expand All @@ -414,10 +414,10 @@ struct TimelyDemuxState {
}

/// Build timely logging collections and arrangements.
fn construct_timely(
scope: &mut Scope<Duration>,
stream: Stream<Duration, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections) {
fn construct_timely<'scope>(
scope: &mut Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections<'scope>) {
type OpUpdate = ((usize, String, Vec<usize>), Duration, i64);
type ChUpdate = ((usize, Vec<usize>, (usize, usize), (usize, usize)), Duration, i64);
type ElUpdate = (usize, Duration, i64);
Expand Down Expand Up @@ -534,21 +534,21 @@ fn construct_timely(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct DifferentialCollections {
arrangement_batches: VecCollection<Duration, usize, i64>,
arrangement_records: VecCollection<Duration, usize, i64>,
sharing: VecCollection<Duration, usize, i64>,
batcher_records: VecCollection<Duration, usize, i64>,
batcher_size: VecCollection<Duration, usize, i64>,
batcher_capacity: VecCollection<Duration, usize, i64>,
batcher_allocations: VecCollection<Duration, usize, i64>,
struct DifferentialCollections<'scope> {
arrangement_batches: VecCollection<'scope, Duration, usize, i64>,
arrangement_records: VecCollection<'scope, Duration, usize, i64>,
sharing: VecCollection<'scope, Duration, usize, i64>,
batcher_records: VecCollection<'scope, Duration, usize, i64>,
batcher_size: VecCollection<'scope, Duration, usize, i64>,
batcher_capacity: VecCollection<'scope, Duration, usize, i64>,
batcher_allocations: VecCollection<'scope, Duration, usize, i64>,
}

/// Build differential logging collections and arrangements.
fn construct_differential(
scope: &mut Scope<Duration>,
stream: Stream<Duration, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections) {
fn construct_differential<'scope>(
scope: &mut Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections<'scope>) {
type Update = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<T>(edges: VecCollection<T, Edge>, roots: VecCollection<T, Node>) -> VecCollection<T, (Node, u32)>
fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)>
where
T: timely::progress::Timestamp + Lattice + Ord,
{
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ mod reachability {
/// Compute the set of nodes reachable from `roots` along directed `edges`.
///
/// Returns `(node, ())` for each reachable node.
pub fn reach(
edges: Collection<Time, RecordedUpdates<(Node, Node, Time, Diff)>>,
roots: Collection<Time, RecordedUpdates<(Node, (), Time, Diff)>>,
) -> Collection<Time, RecordedUpdates<(Node, (), Time, Diff)>>
pub fn reach<'scope>(
edges: Collection<'scope, Time, RecordedUpdates<(Node, Node, Time, Diff)>>,
roots: Collection<'scope, Time, RecordedUpdates<(Node, (), Time, Diff)>>,
) -> Collection<'scope, Time, RecordedUpdates<(Node, (), Time, Diff)>>
{
let outer = edges.inner.scope();

Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<T>(edges: VecCollection<T, Edge>, roots: VecCollection<T, Node>) -> VecCollection<T, (Node, u32)>
fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)>
where
T: timely::progress::Timestamp + Lattice + Ord,
{
Expand Down
24 changes: 12 additions & 12 deletions differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use differential_dataflow::operators::arrange::{Arranged, TraceAgent};

type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K, V, T, R>>;
type Arrange<T,K,V,R> = Arranged<TraceValHandle<K, V, T, R>>;
type Arrange<'scope, T,K,V,R> = Arranged<'scope, TraceValHandle<K, V, T, R>>;

/// An evolving set of edges.
///
Expand All @@ -78,17 +78,17 @@ type Arrange<T,K,V,R> = Arranged<TraceValHandle<K, V, T, R>>;
///
/// An edge variable provides arranged representations of its contents, even before they are
/// completely defined, in support of recursively defined productions.
pub struct EdgeVariable<T: Timestamp + Lattice> {
variable: VecVariable<T, Edge, Diff>,
collection: VecCollection<T, Edge, Diff>,
current: VecCollection<T, Edge, Diff>,
forward: Option<Arrange<T, Node, Node, Diff>>,
reverse: Option<Arrange<T, Node, Node, Diff>>,
pub struct EdgeVariable<'scope, T: Timestamp + Lattice> {
variable: VecVariable<'scope, T, Edge, Diff>,
collection: VecCollection<'scope, T, Edge, Diff>,
current: VecCollection<'scope, T, Edge, Diff>,
forward: Option<Arrange<'scope, T, Node, Node, Diff>>,
reverse: Option<Arrange<'scope, T, Node, Node, Diff>>,
}

impl<T: Timestamp + Lattice> EdgeVariable<T> {
impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> {
/// Creates a new variable initialized with `source`.
pub fn from(source: VecCollection<T, Edge>, step: T::Summary) -> Self {
pub fn from(source: VecCollection<'scope, T, Edge>, step: T::Summary) -> Self {
let (variable, collection) = VecVariable::new(&mut source.scope(), step);
EdgeVariable {
variable,
Expand All @@ -99,7 +99,7 @@ impl<T: Timestamp + Lattice> EdgeVariable<T> {
}
}
/// Concatenates `production` into the definition of the variable.
pub fn add_production(&mut self, production: VecCollection<T, Edge, Diff>) {
pub fn add_production(&mut self, production: VecCollection<'scope, T, Edge, Diff>) {
self.current = self.current.clone().concat(production);
}
/// Finalizes the variable, connecting its recursive definition.
Expand All @@ -112,14 +112,14 @@ impl<T: Timestamp + Lattice> EdgeVariable<T> {
self.variable.set(distinct);
}
/// The collection arranged in the forward direction.
pub fn forward(&mut self) -> &Arrange<T, Node, Node, Diff> {
pub fn forward(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> {
if self.forward.is_none() {
self.forward = Some(self.collection.clone().arrange_by_key());
}
self.forward.as_ref().unwrap()
}
/// The collection arranged in the reverse direction.
pub fn reverse(&mut self) -> &Arrange<T, Node, Node, Diff> {
pub fn reverse(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> {
if self.reverse.is_none() {
self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key());
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/interpreted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn main() {
}).unwrap();
}

fn interpret<T>(edges: VecCollection<T, Edge>, relations: &[(usize, usize)]) -> VecCollection<T, Vec<Node>>
fn interpret<'scope, T>(edges: VecCollection<'scope, T, Edge>, relations: &[(usize, usize)]) -> VecCollection<'scope, T, Vec<Node>>
where
T: timely::progress::Timestamp + Lattice + Hash + Ord,
{
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<T>(edges: VecCollection<T, Edge, MinSum>, roots: VecCollection<T, Node, MinSum>) -> VecCollection<T, Node, MinSum>
fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge, MinSum>, roots: VecCollection<'scope, T, Node, MinSum>) -> VecCollection<'scope, T, Node, MinSum>
where
T: timely::progress::Timestamp + Lattice + Ord,
{
Expand Down
32 changes: 16 additions & 16 deletions differential-dataflow/examples/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ fn main() {
///
/// The computation to determine this, and to maintain it as times change, is an iterative
/// computation that propagates times and maintains the minimal elements at each location.
fn frontier<G, T>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
times: VecCollection<G, (Location, T)>,
) -> VecCollection<G, (Location, T)>
fn frontier<'scope, G, T>(
nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>,
edges: VecCollection<'scope, G, (Source, Target)>,
times: VecCollection<'scope, G, (Location, T)>,
) -> VecCollection<'scope, G, (Location, T)>
where
G: Timestamp + Lattice + Ord,
T: Timestamp<Summary: differential_dataflow::ExchangeData>+std::hash::Hash,
{
// Translate node and edge transitions into a common Location to Location edge with an associated Summary.
let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default())));
let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(edges);
let transitions: VecCollection<'scope, G, (Location, (Location, T::Summary))> = nodes.concat(edges);

times
.clone()
Expand All @@ -148,10 +148,10 @@ where
}

/// Summary paths from locations to operator zero inputs.
fn summarize<G, T>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
) -> VecCollection<G, (Location, (Location, T::Summary))>
fn summarize<'scope, G, T>(
nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>,
edges: VecCollection<'scope, G, (Source, Target)>,
) -> VecCollection<'scope, G, (Location, (Location, T::Summary))>
where
G: Timestamp + Lattice + Ord,
T: Timestamp<Summary: differential_dataflow::ExchangeData+std::hash::Hash>,
Expand All @@ -167,7 +167,7 @@ where
// Retain node connections along "default" timestamp summaries.
let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary)));
let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default())));
let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(edges);
let transitions: VecCollection<'scope, G, (Location, (Location, T::Summary))> = nodes.concat(edges);

zero_inputs
.clone()
Expand All @@ -193,10 +193,10 @@ where


/// Identifies cycles along paths that do not increment timestamps.
fn find_cycles<G: Timestamp, T: Timestamp>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
) -> VecCollection<G, (Location, Location)>
fn find_cycles<'scope, G: Timestamp, T: Timestamp>(
nodes: VecCollection<'scope, G, (Target, Source, T::Summary)>,
edges: VecCollection<'scope, G, (Source, Target)>,
) -> VecCollection<'scope, G, (Location, Location)>
where
G: Timestamp + Lattice + Ord,
T: Timestamp<Summary: differential_dataflow::ExchangeData>,
Expand All @@ -211,7 +211,7 @@ where
}
});
let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target)));
let transitions: VecCollection<G, (Location, Location)> = nodes.concat(edges);
let transitions: VecCollection<'scope, G, (Location, Location)> = nodes.concat(edges);

// Repeatedly restrict to locations with an incoming path.
transitions
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/stackoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<T>(edges: VecCollection<T, Edge>, roots: VecCollection<T, Node>) -> VecCollection<T, (Node, u32)>
fn bfs<'scope, T>(edges: VecCollection<'scope, T, Edge>, roots: VecCollection<'scope, T, Node>) -> VecCollection<'scope, T, (Node, u32)>
where
T: timely::progress::Timestamp + Lattice + Ord,
{
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::operators::*;
use crate::lattice::Lattice;

/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs<T, N>(edges: VecCollection<T, (N,N)>, roots: VecCollection<T, N>) -> VecCollection<T, (N,u32)>
pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)>
where
T: Timestamp + Lattice + Ord,
N: ExchangeData+Hash,
Expand All @@ -22,7 +22,7 @@ use crate::trace::TraceReader;
use crate::operators::arrange::Arranged;

/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs_arranged<N, Tr>(edges: Arranged<Tr>, roots: VecCollection<Tr::Time, N>) -> VecCollection<Tr::Time, (N, u32)>
pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Expand Down
12 changes: 6 additions & 6 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::operators::iterate::Variable;
/// Goals that cannot reach from the source to the target are relatively expensive, as
/// the entire graph must be explored to confirm this. A graph connectivity pre-filter
/// could be good insurance here.
pub fn bidijkstra<T, N>(edges: VecCollection<T, (N,N)>, goals: VecCollection<T, (N,N)>) -> VecCollection<T, ((N,N), u32)>
pub fn bidijkstra<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, goals: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T, ((N,N), u32)>
where
T: Timestamp + Lattice + Ord,
N: ExchangeData+Hash,
Expand All @@ -33,11 +33,11 @@ use crate::trace::TraceReader;
use crate::operators::arrange::Arranged;

/// Bi-directional Dijkstra search using arranged forward and reverse edge collections.
pub fn bidijkstra_arranged<N, Tr>(
forward: Arranged<Tr>,
reverse: Arranged<Tr>,
goals: VecCollection<Tr::Time, (N,N)>
) -> VecCollection<Tr::Time, ((N,N), u32)>
pub fn bidijkstra_arranged<'scope, N, Tr>(
forward: Arranged<'scope, Tr>,
reverse: Arranged<'scope, Tr>,
goals: VecCollection<'scope, Tr::Time, (N,N)>
) -> VecCollection<'scope, Tr::Time, ((N,N), u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::difference::{Abelian, Multiply};
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate<T, N, L, R>(edges: VecCollection<T, (N,N), R>, nodes: VecCollection<T,(N,L),R>) -> VecCollection<T,(N,L),R>
pub fn propagate<'scope, T, N, L, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>) -> VecCollection<'scope, T,(N,L),R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
Expand All @@ -30,7 +30,7 @@ where
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate_at<T, N, L, F, R>(edges: VecCollection<T, (N,N), R>, nodes: VecCollection<T,(N,L),R>, logic: F) -> VecCollection<T,(N,L),R>
pub fn propagate_at<'scope, T, N, L, F, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>, logic: F) -> VecCollection<'scope, T,(N,L),R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
Expand All @@ -51,7 +51,7 @@ use crate::operators::arrange::arrangement::Arranged;
/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
/// a method `logic` to specify the rounds in which we introduce various labels. The output
/// of `logic should be a number in the interval \[0,64\],
pub fn propagate_core<N, L, Tr, F, R>(edges: Arranged<Tr>, nodes: VecCollection<Tr::Time,(N,L),R>, logic: F) -> VecCollection<Tr::Time,(N,L),R>
pub fn propagate_core<'scope, N, L, Tr, F, R>(edges: Arranged<'scope, Tr>, nodes: VecCollection<'scope, Tr::Time,(N,L),R>, logic: F) -> VecCollection<'scope, Tr::Time,(N,L),R>
where
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::difference::{Abelian, Multiply};
use super::propagate::propagate;

/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<T, N, R>(graph: VecCollection<T, (N,N), R>) -> VecCollection<T, (N,N), R>
pub fn strongly_connected<'scope, T, N, R>(graph: VecCollection<'scope, T, (N,N), R>) -> VecCollection<'scope, T, (N,N), R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData + Hash,
Expand All @@ -36,8 +36,8 @@ where
})
}

fn trim_edges<T, N, R>(cycle: VecCollection<T, (N,N), R>, edges: VecCollection<T, (N,N), R>)
-> VecCollection<T, (N,N), R>
fn trim_edges<'scope, T, N, R>(cycle: VecCollection<'scope, T, (N,N), R>, edges: VecCollection<'scope, T, (N,N), R>)
-> VecCollection<'scope, T, (N,N), R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData + Hash,
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::lattice::Lattice;
use crate::operators::*;
use crate::hashable::Hashable;

fn _color<T, N>(edges: VecCollection<T, (N,N)>) -> VecCollection<T,(N,Option<u32>)>
fn _color<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T,(N,Option<u32>)>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
Expand Down Expand Up @@ -40,10 +40,10 @@ where
/// a node "fires" once all of its neighbors with lower identifier have
/// fired, and we apply `logic` to the new state of lower neighbors and
/// the old state (input) of higher neighbors.
pub fn sequence<T, N, V, F>(
state: VecCollection<T, (N,V)>,
edges: VecCollection<T, (N,N)>,
logic: F) -> VecCollection<T, (N,Option<V>)>
pub fn sequence<'scope, T, N, V, F>(
state: VecCollection<'scope, T, (N,V)>,
edges: VecCollection<'scope, T, (N,N)>,
logic: F) -> VecCollection<'scope, T, (N,Option<V>)>
where
T: Timestamp + Lattice + Hash + Ord,
N: ExchangeData+Hashable,
Expand Down
Loading
Loading