From 7fbd67612625fabf32d7bd1de1ce57c13ff347b2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 19 Apr 2026 17:09:30 -0400 Subject: [PATCH 1/2] Introduce Cursor::populate_key --- differential-dataflow/src/trace/cursor/mod.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index d0d973249..71ff646e2 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -55,6 +55,34 @@ pub trait Cursor : LayoutExt { /// Rewinds the cursor to the first value for current key. fn rewind_vals(&mut self, storage: &Self::Storage); + /// Loads `target` with all updates associated with the supplied `key`. + fn populate_key<'a>(&mut self, storage: &'a Self::Storage, key: Self::Key<'a>, meet: Option<&Self::Time>, target: &mut crate::operators::EditList<'a, Self>) where Self : Sized { + target.clear(); + self.seek_key(storage, key); + if self.get_key(storage) == Some(key) { + self.rewind_vals(storage); + if let Some(meet) = meet { + while let Some(val) = self.get_val(storage) { + self.map_times(storage, |time, diff| { + use crate::lattice::Lattice; + let mut time = Self::owned_time(time); + time.join_assign(meet); + target.push(time, Self::owned_diff(diff)) + }); + target.seal(val); + self.step_val(storage); + } + } + else { + while let Some(val) = self.get_val(storage) { + self.map_times(storage, |time, diff| target.push(Self::owned_time(time), Self::owned_diff(diff))); + target.seal(val); + self.step_val(storage); + } + } + } + } + /// Rewinds the cursor and outputs its contents to a Vec fn to_vec(&mut self, storage: &Self::Storage, into_key: IK, into_val: IV) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> where From d6f9667a788f658fd8658fe6fd115b23063c1d33 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 19 Apr 2026 17:09:54 -0400 Subject: [PATCH 2/2] Update EditList, ValueHistory --- differential-dataflow/src/operators/join.rs | 8 +--- differential-dataflow/src/operators/mod.rs | 37 +++++-------------- differential-dataflow/src/operators/reduce.rs | 14 ++----- differential-dataflow/src/trace/cursor/mod.rs | 2 +- 4 files changed, 16 insertions(+), 45 deletions(-) diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index edf7120f6..c1b245508 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -370,12 +370,8 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace_key), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, |time| { - let mut time = C1::owned_time(time); - time.join_assign(meet); - time - }); - thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time)); + thinker.history1.edits.load(trace, trace_storage, trace_key, Some(meet)); + thinker.history2.edits.load(batch, batch_storage, batch_key, None); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index eb7880aab..a92c7d71c 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -19,7 +19,7 @@ use crate::lattice::Lattice; use crate::trace::Cursor; /// An accumulation of (value, time, diff) updates. -struct EditList { +pub struct EditList { values: Vec<(V, usize)>, edits: Vec<(T, D)>, } @@ -33,35 +33,29 @@ impl EditList { edits: Vec::new(), } } - /// Loads the contents of a cursor. - fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) + /// Loads the contents of a cursor at `key`, advancing times by `meet` if supplied. + fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, key: C::Key<'a>, meet: Option<&T>) where C: Cursor = V, Time = T, Diff = D>, - L: Fn(C::TimeGat<'_>) -> T, { - self.clear(); - while let Some(val) = cursor.get_val(storage) { - cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1))); - self.seal(val); - cursor.step_val(storage); - } + cursor.populate_key(storage, key, meet, self); } /// Clears the list of edits. #[inline] - fn clear(&mut self) { + pub fn clear(&mut self) { self.values.clear(); self.edits.clear(); } fn len(&self) -> usize { self.edits.len() } /// Inserts a new edit for an as-yet undetermined value. #[inline] - fn push(&mut self, time: T, diff: D) { + pub fn push(&mut self, time: T, diff: D) { // TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible. self.edits.push((time, diff)); } /// Associates all edits pushed since the previous `seal_value` call with `value`. #[inline] - fn seal(&mut self, value: V) { + pub fn seal(&mut self, value: V) { let prev = self.values.last().map(|x| x.1).unwrap_or(0); crate::consolidation::consolidate_from(&mut self.edits, prev); if self.edits.len() > prev { @@ -98,33 +92,22 @@ impl V self.history.clear(); self.buffer.clear(); } - fn load<'a, C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) - where - C: Cursor = V, Time = T, Diff = D>, - L: Fn(C::TimeGat<'_>) -> T, - { - self.edits.load(cursor, storage, logic); - } /// Loads and replays a specified key. /// /// If the key is absent, the replayed history will be empty. - fn replay_key<'a, 'history, C, L>( + fn replay_key<'a, 'history, C>( &'history mut self, cursor: &mut C, storage: &'a C::Storage, key: C::Key<'a>, - logic: L, + meet: Option<&T>, ) -> HistoryReplay<'history, V, T, D> where C: Cursor = V, Time = T, Diff = D>, - L: Fn(C::TimeGat<'_>) -> T, { self.clear(); - cursor.seek_key(storage, key); - if cursor.get_key(storage) == Some(key) { - self.load(cursor, storage, logic); - } + cursor.populate_key(storage, key, meet, &mut self.edits); self.replay() } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index b6ebb4bb2..60b4406c5 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -388,7 +388,7 @@ mod history_replay { // loaded times by performing the lattice `join` with this value. // Load the batch contents. - let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time)); + let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, None); // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet // can be used to advance other historical times, which may consolidate their representation. As @@ -412,17 +412,9 @@ mod history_replay { // Load the input and output histories. let mut input_replay = - self.input_history.replay_key(source_cursor, source_storage, key, |time| { - let mut time = C1::owned_time(time); - if let Some(meet) = meet.as_ref() { time.join_assign(meet); } - time - }); + self.input_history.replay_key(source_cursor, source_storage, key, meet.as_ref()); let mut output_replay = - self.output_history.replay_key(output_cursor, output_storage, key, |time| { - let mut time = C2::owned_time(time); - if let Some(meet) = meet.as_ref() { time.join_assign(meet); } - time - }); + self.output_history.replay_key(output_cursor, output_storage, key, meet.as_ref()); self.synth_times.clear(); self.times_current.clear(); diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index 71ff646e2..b12ddb690 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -56,7 +56,7 @@ pub trait Cursor : LayoutExt { fn rewind_vals(&mut self, storage: &Self::Storage); /// Loads `target` with all updates associated with the supplied `key`. - fn populate_key<'a>(&mut self, storage: &'a Self::Storage, key: Self::Key<'a>, meet: Option<&Self::Time>, target: &mut crate::operators::EditList<'a, Self>) where Self : Sized { + fn populate_key<'a>(&mut self, storage: &'a Self::Storage, key: Self::Key<'a>, meet: Option<&Self::Time>, target: &mut crate::operators::EditList, Self::Time, Self::Diff>) { target.clear(); self.seek_key(storage, key); if self.get_key(storage) == Some(key) {