Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions differential-dataflow/src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,8 @@ where
Ordering::Greater => batch.seek_key(batch_storage, trace_key),
Ordering::Equal => {

thinker.history1.edits.load(trace, trace_storage, |time| {
let mut time = C1::owned_time(time);
time.join_assign(meet);
time
});
thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));
thinker.history1.edits.load(trace, trace_storage, trace_key, Some(meet));
thinker.history2.edits.load(batch, batch_storage, batch_key, None);

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
Expand Down
37 changes: 10 additions & 27 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::lattice::Lattice;
use crate::trace::Cursor;

/// An accumulation of (value, time, diff) updates.
struct EditList<V, T, D> {
pub struct EditList<V, T, D> {
values: Vec<(V, usize)>,
edits: Vec<(T, D)>,
}
Expand All @@ -33,35 +33,29 @@ impl<V: Copy, T: Ord, D: crate::difference::Semigroup> EditList<V, T, D> {
edits: Vec::new(),
}
}
/// Loads the contents of a cursor.
fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
/// Loads the contents of a cursor at `key`, advancing times by `meet` if supplied.
fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, key: C::Key<'a>, meet: Option<&T>)
where
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.clear();
while let Some(val) = cursor.get_val(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1)));
self.seal(val);
cursor.step_val(storage);
}
cursor.populate_key(storage, key, meet, self);
}
/// Clears the list of edits.
#[inline]
fn clear(&mut self) {
pub fn clear(&mut self) {
self.values.clear();
self.edits.clear();
}
fn len(&self) -> usize { self.edits.len() }
/// Inserts a new edit for an as-yet undetermined value.
#[inline]
fn push(&mut self, time: T, diff: D) {
pub fn push(&mut self, time: T, diff: D) {
// TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
self.edits.push((time, diff));
}
/// Associates all edits pushed since the previous `seal_value` call with `value`.
#[inline]
fn seal(&mut self, value: V) {
pub fn seal(&mut self, value: V) {
let prev = self.values.last().map(|x| x.1).unwrap_or(0);
crate::consolidation::consolidate_from(&mut self.edits, prev);
if self.edits.len() > prev {
Expand Down Expand Up @@ -98,33 +92,22 @@ impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> V
self.history.clear();
self.buffer.clear();
}
fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.edits.load(cursor, storage, logic);
}

/// Loads and replays a specified key.
///
/// If the key is absent, the replayed history will be empty.
fn replay_key<'a, 'history, C, L>(
fn replay_key<'a, 'history, C>(
&'history mut self,
cursor: &mut C,
storage: &'a C::Storage,
key: C::Key<'a>,
logic: L,
meet: Option<&T>,
) -> HistoryReplay<'history, V, T, D>
where
C: Cursor<Val<'a> = V, Time = T, Diff = D>,
L: Fn(C::TimeGat<'_>) -> T,
{
self.clear();
cursor.seek_key(storage, key);
if cursor.get_key(storage) == Some(key) {
self.load(cursor, storage, logic);
}
cursor.populate_key(storage, key, meet, &mut self.edits);
self.replay()
}

Expand Down
14 changes: 3 additions & 11 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@
pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

// Update `capabilities` to reflect pending times.
let mut frontier = Antichain::<Tr1::Time>::new();

Check warning on line 272 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`mut frontier` shadows a previous, unrelated binding
let mut owned_time = Tr1::Time::minimum();
for pos in 0 .. pending_time.len() {
Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
Expand Down Expand Up @@ -360,7 +360,7 @@
}
}
#[inline(never)]
pub fn compute<'a, K, C1, C2, C3, L>(

Check warning on line 363 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

this function has too many arguments (10/7)
&mut self,
key: K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
Expand Down Expand Up @@ -388,7 +388,7 @@
// loaded times by performing the lattice `join` with this value.

// Load the batch contents.
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));
let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, None);

// We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
// can be used to advance other historical times, which may consolidate their representation. As
Expand All @@ -412,17 +412,9 @@

// Load the input and output histories.
let mut input_replay =
self.input_history.replay_key(source_cursor, source_storage, key, |time| {
let mut time = C1::owned_time(time);
if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
time
});
self.input_history.replay_key(source_cursor, source_storage, key, meet.as_ref());
let mut output_replay =
self.output_history.replay_key(output_cursor, output_storage, key, |time| {
let mut time = C2::owned_time(time);
if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
time
});
self.output_history.replay_key(output_cursor, output_storage, key, meet.as_ref());

self.synth_times.clear();
self.times_current.clear();
Expand Down
28 changes: 28 additions & 0 deletions differential-dataflow/src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,34 @@ pub trait Cursor : LayoutExt {
/// Rewinds the cursor to the first value for current key.
fn rewind_vals(&mut self, storage: &Self::Storage);

/// Loads `target` with all updates associated with the supplied `key`.
fn populate_key<'a>(&mut self, storage: &'a Self::Storage, key: Self::Key<'a>, meet: Option<&Self::Time>, target: &mut crate::operators::EditList<Self::Val<'a>, Self::Time, Self::Diff>) {
target.clear();
self.seek_key(storage, key);
if self.get_key(storage) == Some(key) {
self.rewind_vals(storage);
if let Some(meet) = meet {
while let Some(val) = self.get_val(storage) {
self.map_times(storage, |time, diff| {
use crate::lattice::Lattice;
let mut time = Self::owned_time(time);
time.join_assign(meet);
target.push(time, Self::owned_diff(diff))
});
target.seal(val);
self.step_val(storage);
}
}
else {
while let Some(val) = self.get_val(storage) {
self.map_times(storage, |time, diff| target.push(Self::owned_time(time), Self::owned_diff(diff)));
target.seal(val);
self.step_val(storage);
}
}
}
}

/// Rewinds the cursor and outputs its contents to a Vec
fn to_vec<K, IK, V, IV>(&mut self, storage: &Self::Storage, into_key: IK, into_val: IV) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)>
where
Expand Down
Loading