Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions differential-dataflow/src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,19 +400,18 @@ where
}
}

struct JoinThinker<'a, C1, C2>
where
C1: Cursor,
C2: Cursor<Time = C1::Time>,
{
pub history1: ValueHistory<'a, C1>,
pub history2: ValueHistory<'a, C2>,
struct JoinThinker<V1, V2, T, D1, D2> {
pub history1: ValueHistory<V1, T, D1>,
pub history2: ValueHistory<V2, T, D2>,
}

impl<'a, C1, C2> JoinThinker<'a, C1, C2>
impl<V1, V2, T, D1, D2> JoinThinker<V1, V2, T, D1, D2>
where
C1: Cursor,
C2: Cursor<Time = C1::Time>,
V1: Copy + Ord,
V2: Copy + Ord,
T: Ord + Clone + Lattice,
D1: Clone + crate::difference::Semigroup,
D2: Clone + crate::difference::Semigroup,
{
fn new() -> Self {
JoinThinker {
Expand All @@ -421,7 +420,7 @@ where
}
}

fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {
fn think<F: FnMut(V1, V2, T, &D1, &D2)>(&mut self, mut results: F) {

// for reasonably sized edits, do the dead-simple thing.
if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
Expand Down
67 changes: 35 additions & 32 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use crate::lattice::Lattice;
use crate::trace::Cursor;

/// An accumulation of (value, time, diff) updates.
struct EditList<'a, C: Cursor> {
values: Vec<(C::Val<'a>, usize)>,
edits: Vec<(C::Time, C::Diff)>,
struct EditList<V, T, D> {
values: Vec<(V, usize)>,
edits: Vec<(T, D)>,
}

impl<'a, C: Cursor> EditList<'a, C> {
impl<V: Copy, T: Ord, D: crate::difference::Semigroup> EditList<V, T, D> {
/// Creates an empty list of edits.
#[inline]
fn new() -> Self {
Expand All @@ -34,9 +34,10 @@ impl<'a, C: Cursor> EditList<'a, C> {
}
}
/// Loads the contents of a cursor.
fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where
L: Fn(C::TimeGat<'_>)->C::Time,
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.clear();
while let Some(val) = cursor.get_val(storage) {
Expand All @@ -54,20 +55,20 @@ impl<'a, C: Cursor> EditList<'a, C> {
fn len(&self) -> usize { self.edits.len() }
/// Inserts a new edit for an as-yet undetermined value.
#[inline]
fn push(&mut self, time: C::Time, diff: C::Diff) {
fn push(&mut self, time: T, diff: D) {
// TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
self.edits.push((time, diff));
}
/// Associates all edits pushed since the previous `seal_value` call with `value`.
#[inline]
fn seal(&mut self, value: C::Val<'a>) {
fn seal(&mut self, value: V) {
let prev = self.values.last().map(|x| x.1).unwrap_or(0);
crate::consolidation::consolidate_from(&mut self.edits, prev);
if self.edits.len() > prev {
self.values.push((value, self.edits.len()));
}
}
fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
fn map<F: FnMut(V, &T, &D)>(&self, mut logic: F) {
for index in 0 .. self.values.len() {
let lower = if index == 0 { 0 } else { self.values[index-1].1 };
let upper = self.values[index].1;
Expand All @@ -78,13 +79,13 @@ impl<'a, C: Cursor> EditList<'a, C> {
}
}

struct ValueHistory<'storage, C: Cursor> {
edits: EditList<'storage, C>,
history: Vec<(C::Time, C::Time, usize, usize)>, // (time, meet, value_index, edit_offset)
buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, // where we accumulate / collapse updates.
struct ValueHistory<V, T, D> {
edits: EditList<V, T, D>,
history: Vec<(T, T, usize, usize)>, // (time, meet, value_index, edit_offset)
buffer: Vec<((V, T), D)>, // where we accumulate / collapse updates.
}

impl<'storage, C: Cursor> ValueHistory<'storage, C> {
impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> ValueHistory<V, T, D> {
fn new() -> Self {
ValueHistory {
edits: EditList::new(),
Expand All @@ -97,25 +98,27 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
self.history.clear();
self.buffer.clear();
}
fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where
L: Fn(C::TimeGat<'_>)->C::Time,
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.edits.load(cursor, storage, logic);
}

/// Loads and replays a specified key.
///
/// If the key is absent, the replayed history will be empty.
fn replay_key<'history, L>(
fn replay_key<'a, 'history, C, L>(
&'history mut self,
cursor: &mut C,
storage: &'storage C::Storage,
key: C::Key<'storage>,
logic: L
) -> HistoryReplay<'storage, 'history, C>
storage: &'a C::Storage,
key: C::Key<'a>,
logic: L,
) -> HistoryReplay<'history, V, T, D>
where
L: Fn(C::TimeGat<'_>)->C::Time,
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.clear();
cursor.seek_key(storage, key);
Expand All @@ -126,7 +129,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
}

/// Organizes history based on current contents of edits.
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> {

self.buffer.clear();
self.history.clear();
Expand All @@ -150,34 +153,34 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> {
}
}

struct HistoryReplay<'storage, 'history, C: Cursor> {
replay: &'history mut ValueHistory<'storage, C>
struct HistoryReplay<'history, V, T, D> {
replay: &'history mut ValueHistory<V, T, D>,
}

impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::difference::Semigroup> HistoryReplay<'history, V, T, D> {
fn time(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.0) }
fn meet(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.1) }
fn edit(&self) -> Option<(V, &T, &D)> {
self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
}

fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
fn buffer(&self) -> &[((V, T), D)] {
&self.replay.buffer[..]
}

fn step(&mut self) {
let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
}
fn step_while_time_is(&mut self, time: &C::Time) -> bool {
fn step_while_time_is(&mut self, time: &T) -> bool {
let mut found = false;
while self.time() == Some(time) {
found = true;
self.step();
}
found
}
fn advance_buffer_by(&mut self, meet: &C::Time) {
fn advance_buffer_by(&mut self, meet: &T) {
for element in self.replay.buffer.iter_mut() {
(element.0).1 = (element.0).1.join(meet);
}
Expand Down
64 changes: 29 additions & 35 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@
pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

// Update `capabilities` to reflect pending times.
let mut frontier = Antichain::<Tr1::Time>::new();

Check warning on line 272 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`mut frontier` shadows a previous, unrelated binding
let mut owned_time = Tr1::Time::minimum();
for pos in 0 .. pending_time.len() {
Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
Expand Down Expand Up @@ -312,7 +312,6 @@
mod history_replay {

use timely::progress::Antichain;
use timely::PartialOrder;

use crate::lattice::Lattice;
use crate::trace::Cursor;
Expand All @@ -322,32 +321,28 @@

/// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
/// time order, maintaining consolidated representations of updates with respect to future interesting times.
pub struct HistoryReplayer<'a, C1, C2, C3, V>
where
C1: Cursor,
C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
V: Clone + Ord,
{
input_history: ValueHistory<'a, C1>,
output_history: ValueHistory<'a, C2>,
batch_history: ValueHistory<'a, C3>,
input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
output_buffer: Vec<(V, C2::Diff)>,
update_buffer: Vec<(V, C2::Diff)>,
output_produced: Vec<((V, C2::Time), C2::Diff)>,
synth_times: Vec<C1::Time>,
meets: Vec<C1::Time>,
times_current: Vec<C1::Time>,
temporary: Vec<C1::Time>,
pub struct HistoryReplayer<V1, V2, V, T, D1, D2> {
input_history: ValueHistory<V1, T, D1>,
output_history: ValueHistory<V2, T, D2>,
batch_history: ValueHistory<V1, T, D1>,
input_buffer: Vec<(V1, D1)>,
output_buffer: Vec<(V, D2)>,
update_buffer: Vec<(V, D2)>,
output_produced: Vec<((V, T), D2)>,
synth_times: Vec<T>,
meets: Vec<T>,
times_current: Vec<T>,
temporary: Vec<T>,
}

impl<'a, C1, C2, C3, V> HistoryReplayer<'a, C1, C2, C3, V>
impl<V1, V2, V, T, D1, D2> HistoryReplayer<V1, V2, V, T, D1, D2>
where
C1: Cursor,
C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
V1: Copy + Ord,
V2: Copy + Ord,
V: Clone + Ord,
T: Ord + Clone + Lattice,
D1: Clone + crate::difference::Semigroup,
D2: Clone + crate::difference::Semigroup,
{
pub fn new() -> Self {
HistoryReplayer {
Expand All @@ -365,24 +360,23 @@
}
}
#[inline(never)]
pub fn compute<L>(
pub fn compute<'a, K, C1, C2, C3, L>(

Check warning on line 363 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

this function has too many arguments (10/7)
&mut self,
key: C1::Key<'a>,
key: K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
(output_cursor, output_storage): (&mut C2, &'a C2::Storage),
(batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
times: &Vec<C1::Time>,
times: &Vec<T>,
logic: &mut L,
upper_limit: &Antichain<C1::Time>,
outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
new_interesting: &mut Vec<C1::Time>)
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V, T, D2)>)],
new_interesting: &mut Vec<T>)
where
L: FnMut(
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
)
C1: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
C2: Cursor<Key<'a> = K, Val<'a> = V2, ValOwn = V, Time = T, Diff = D2>,
C3: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
K: Copy + Ord,
L: FnMut(K, &[(V1, D1)], &mut Vec<(V, D2)>, &mut Vec<(V, D2)>),
{

// The work we need to perform is at times defined principally by the contents of `batch_cursor`
Expand Down
Loading