Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rust-version = "1.86"
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" }
#timely = { version = "0.28", default-features = false }
columnar = { version = "0.12", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[workspace.lints.clippy]
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Alternately, here is a fragment that computes the set of nodes reachable from a
```rust
let reachable =
roots.iterate(|scope, reach|
edges.enter(&scope)
edges.enter(scope)
.semijoin(reach)
.map(|(src, dst)| dst)
.concat(reach)
Expand Down Expand Up @@ -337,7 +337,7 @@ edges.iterate(|scope, inner| {
.map(|(node,_)| node);

// keep edges between active vertices
edges.enter(&scope)
edges.enter(scope)
.semijoin(active)
.map(|(src,dst)| (dst,src))
.semijoin(active)
Expand Down
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_06.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {

let stable = banks.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(banks.enter(&scope))
.concat(banks.enter(scope))
.distinct()
);

Expand All @@ -43,7 +43,7 @@ fn main() {
loop_point
.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(loop_point.enter(&scope))
.concat(loop_point.enter(scope))
.distinct()
)
.map(|_| ((),()))
Expand Down
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,10 +1104,10 @@ tvhftq (35)";

let total_weights: VecCollection<_,String> = weights
.iterate(|scope, inner| {
parents.enter(&scope)
parents.enter(scope)
.semijoin(inner)
.map(|(_, parent)| parent)
.concat(weights.enter(&scope))
.concat(weights.enter(scope))
});

parents
Expand Down
2 changes: 1 addition & 1 deletion advent_of_code_2017/src/bin/day_08.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ wui inc -120 if i > -2038";
.map(|_| ((0, String::new()), 0))
.iterate(|scope, valid| {

let edits = edits.enter(&scope);
let edits = edits.enter(scope);

valid
.prefix_sum_at(edits.map(|(key,_)| key), 0, |_k,x,y| *x + *y)
Expand Down
6 changes: 3 additions & 3 deletions advent_of_code_2017/src/bin/day_09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
if input.len() > 1 { result = combine(result, &(input[1].0).1); }
output.push((result, 1));
})
.concat(unit_ranges.enter(&scope))
.concat(unit_ranges.enter(scope))
)
}

Expand Down Expand Up @@ -154,10 +154,10 @@ where
.iterate(|scope, state| {
aggregates
.filter(|&((_, log),_)| log < 64) // the log = 64 interval doesn't help us here (overflows).
.enter(&scope)
.enter(scope)
.map(|((pos, log), data)| (pos, (log, data)))
.join_map(state, move |&pos, &(log, ref data), state| (pos + (1 << log), combine(state, data)))
.concat(init_state.enter(&scope))
.concat(init_state.enter(scope))
.distinct()
})
.consolidate()
Expand Down
4 changes: 2 additions & 2 deletions advent_of_code_2017/src/bin/day_12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2035,8 +2035,8 @@ fn main() {
let labels =
nodes
.iterate(|scope, label| {
let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);
let edges = edges.enter(scope);
let nodes = nodes.enter(scope);
label
.join_map(edges, |_src, &lbl, &tgt| (tgt, lbl))
.concat(nodes)
Expand Down
8 changes: 4 additions & 4 deletions diagnostics/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,15 @@ struct TimelyDemuxState {

/// Build timely logging collections and arrangements.
fn construct_timely<'scope>(
scope: &mut Scope<'scope, Duration>,
scope: Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections<'scope>) {
type OpUpdate = ((usize, String, Vec<usize>), Duration, i64);
type ChUpdate = ((usize, Vec<usize>, (usize, usize), (usize, usize)), Duration, i64);
type ElUpdate = (usize, Duration, i64);
type MsgUpdate = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope.clone());
let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope);
let mut input = demux.new_input(stream, Pipeline);

let (op_out, operates) = demux.new_output::<Vec<OpUpdate>>();
Expand Down Expand Up @@ -546,12 +546,12 @@ struct DifferentialCollections<'scope> {

/// Build differential logging collections and arrangements.
fn construct_differential<'scope>(
scope: &mut Scope<'scope, Duration>,
scope: Scope<'scope, Duration>,
stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections<'scope>) {
type Update = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope);
let mut input = demux.new_input(stream, Pipeline);

let (bat_out, batches) = demux.new_output::<Vec<Update>>();
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
scope.iterative::<u32,_,_>(|inner| {
data.enter_at(inner, |_| 0)
.consolidate()
.leave(&scope)
.leave(scope)
});

input
Expand Down
7 changes: 3 additions & 4 deletions differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::operators::*;
use timely::order::Product;
use timely::scheduling::Scheduler;

use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
Expand Down Expand Up @@ -109,8 +108,8 @@ fn main() {
// repeatedly update minimal distances each node can be reached from each root
roots.clone().iterate(|scope, dists| {

let edges = edges.enter(&scope);
let roots = roots.enter(&scope);
let edges = edges.enter(scope);
let roots = roots.enter(scope);

dists.arrange_by_key()
.join_core(edges, |_k,l,d| Some((*d, l+1)))
Expand Down Expand Up @@ -175,4 +174,4 @@ fn main() {
}
}
}).unwrap();
}
}
4 changes: 2 additions & 2 deletions differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ where
// repeatedly update minimal distances each node can be reached from each root
nodes.clone().iterate(|scope, inner| {

let nodes = nodes.enter(&scope);
let edges = edges.enter(&scope);
let nodes = nodes.enter(scope);
let edges = edges.enter(scope);

inner.join_map(edges, |_k,l,d| (*d, l+1))
.concat(nodes)
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/examples/columnar/columnar_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ mod distributor {
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
use timely::progress::Timestamp;
use timely::worker::AsWorker;
use timely::worker::Worker;

use crate::layout::ColumnarUpdate as Update;
use crate::{Updates, RecordedUpdates};
Expand Down Expand Up @@ -361,15 +361,15 @@ mod distributor {
>;
type Puller = LogPuller<Box<dyn timely::communication::Pull<Message<T, RecordedUpdates<U>>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = worker.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = ValDistributor {
marker: std::marker::PhantomData,
hashfunc: self.hashfunc,
pre_lens: Vec::new(),
};
(Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
(Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone()))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod reachability {
variable.set(result_col.clone());

// Leave the iterative scope.
result_col.leave(&outer)
result_col.leave(outer)
})
}
}
2 changes: 1 addition & 1 deletion differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ where
// Leave the dynamic iteration, stripping off the last timestamp coordinate.
next.leave_dynamic(1)
.inspect(|x| println!("{:?}", x))
.leave(&outer)
.leave(outer)
})

}
6 changes: 3 additions & 3 deletions differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct EdgeVariable<'scope, T: Timestamp + Lattice> {
impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> {
/// Creates a new variable initialized with `source`.
pub fn from(source: VecCollection<'scope, T, Edge>, step: T::Summary) -> Self {
let (variable, collection) = VecVariable::new(&mut source.scope(), step);
let (variable, collection) = VecVariable::new(source.scope(), step);
EdgeVariable {
variable,
collection,
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Query {
}

/// Creates a dataflow implementing the query, and returns input and trace handles.
pub fn render_in<T>(&self, scope: &mut Scope<T>) -> BTreeMap<String, RelationHandles<T>>
pub fn render_in<T>(&self, scope: Scope<T>) -> BTreeMap<String, RelationHandles<T>>
where
T: Timestamp + Lattice + ::timely::order::TotalOrder,
{
Expand All @@ -170,7 +170,7 @@ impl Query {
// create variables and result handles for each named relation.
for (name, (input, collection)) in input_map {
let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1));
let trace = edge_variable.collection.clone().leave(&scope).arrange_by_self().trace;
let trace = edge_variable.collection.clone().leave(scope).arrange_by_self().trace;
result_map.insert(name.clone(), RelationHandles { input, trace });
variable_map.insert(name.clone(), edge_variable);
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/iterate_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn main() {
}).as_collection().consolidate();
let result = wrap(result.inner).as_collection();
variable.set(result);
collection.leave(&scope)
collection.leave(scope)
});
})
}
2 changes: 1 addition & 1 deletion differential-dataflow/examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,6 @@ where
.as_collection(|k,()| *k);

variable.set(result.clone());
result.leave(&outer)
result.leave(outer)
})
}
2 changes: 1 addition & 1 deletion differential-dataflow/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,6 @@ where

// Bind the recursive variable, return its limit.
ranks_bind.set(pushed.clone());
pushed.leave(&outer)
pushed.leave(outer)
})
}
10 changes: 5 additions & 5 deletions differential-dataflow/examples/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ where
.clone()
.iterate(|scope, reach| {
transitions
.enter(&scope)
.enter(scope)
.join_map(reach, |_from, (dest, summ), time| (dest.clone(), summ.results_in(time)))
.flat_map(|(dest, time)| time.map(move |time| (dest, time)))
.concat(times.enter(&scope))
.concat(times.enter(scope))
.reduce(|_location, input, output: &mut Vec<(T, isize)>| {
// retain the lower envelope of times.
for (t1, _count1) in input.iter() {
Expand Down Expand Up @@ -173,10 +173,10 @@ where
.clone()
.iterate(|scope, summaries| {
transitions
.enter(&scope)
.enter(scope)
.join_map(summaries, |_middle, (from, summ1), (to, summ2)| (from.clone(), to.clone(), summ1.followed_by(summ2)))
.flat_map(|(from, to, summ)| summ.map(move |summ| (from, (to, summ))))
.concat(zero_inputs.enter(&scope))
.concat(zero_inputs.enter(scope))
.map(|(from, (to, summary))| ((from, to), summary))
.reduce(|_from_to, input, output| {
for (summary, _count) in input.iter() {
Expand Down Expand Up @@ -222,7 +222,7 @@ where
.map(|(_source, target)| target)
.distinct();
transitions
.enter(&scope)
.enter(scope)
.semijoin(active)
})
.consolidate()
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/examples/stackoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ where
// repeatedly update minimal distances each node can be reached from each root
nodes.clone().iterate(|scope, inner| {

let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);
let edges = edges.enter(scope);
let nodes = nodes.enter(scope);

inner.join_map(edges, |_k,l,d| (*d, l+1))
.concat(nodes)
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ where
// repeatedly update minimal distances each node can be reached from each root
nodes.clone().iterate(|scope, inner| {

let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);
let edges = edges.enter(scope);
let nodes = nodes.enter(scope);

inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
.concat(nodes)
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ where

reverse_bind.set(reverse_next);

reached.leave(&outer)
reached.leave(outer)
})
}
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ where

// nodes.filter(|_| false)
// .iterate(|scope, inner| {
// let edges = edges.enter(&scope);
// let nodes = nodes.enter_at(&scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
// let edges = edges.enter(scope);
// let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
// inner.join_map(edges, |_k,l,d| (d.clone(),l.clone()))
// .concat(nodes)
// .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
Expand Down Expand Up @@ -104,6 +104,6 @@ where

labels
.as_collection(|k,v| (k.clone(), v.clone()))
.leave(&outer)
.leave(outer)
})
}
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ where
let outer = graph.scope();
outer.scoped::<Product<_, usize>,_,_>("StronglyConnected", |scope| {
// Bring in edges and transposed edges.
let edges = graph.enter(&scope);
let edges = graph.enter(scope);
let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
// Create a new variable that will be intra-scc edges.
use crate::operators::iterate::Variable;
let (variable, inner) = Variable::new_from(edges.clone(), Product::new(Default::default(), 1));

let result = trim_edges(trim_edges(inner, edges), trans);
variable.set(result.clone());
result.leave(&outer)
result.leave(outer)
})
}

Expand Down Expand Up @@ -64,6 +64,6 @@ where
.join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))])
.filter(|(_,(l1,l2))| l1 == l2)
.map(|((x1,x2),_)| (x2,x1))
.leave_region(&outer)
.leave_region(outer)
})
}
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ where
.map(|(node, _state)| (node, None))
.iterate(|scope, new_state| {
// immutable content: edges and initial state.
let edges = edges.enter(&scope);
let old_state = state.enter(&scope);
let edges = edges.enter(scope);
let old_state = state.enter(scope);
// .map(|x| (x.0, Some(x.1)));

// break edges into forward and reverse directions.
Expand Down
Loading
Loading