Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions diagnostics/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::time::{Duration, Instant};

use differential_dataflow::collection::concatenate;
use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::TraceIntra;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::{AsCollection, VecCollection};

Expand Down Expand Up @@ -165,9 +165,9 @@ pub type DiagnosticEvent = Event<Duration, DiagnosticContainer>;
// ============================================================================

/// A key-value trace: key K, value V, time Duration, diff i64.
type ValTrace<K, V> = TraceAgent<ValSpine<K, V, Duration, i64>>;
type ValTrace<K, V> = TraceIntra<ValSpine<K, V, Duration, i64>>;
/// A key-only trace: key K, time Duration, diff i64.
type KeyTrace<K> = TraceAgent<KeySpine<K, Duration, i64>>;
type KeyTrace<K> = TraceIntra<KeySpine<K, Duration, i64>>;

/// Trace handles for timely logging arrangements.
pub struct TimelyTraces {
Expand Down
3 changes: 1 addition & 2 deletions differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ fn main() {
})
.probe_with(&mut probe)
.as_collection()
.arrange_by_key()
// .arrange::<OrdValSpineAbom>()
.arrange_by_key_inter()
.trace
});

Expand Down
12 changes: 5 additions & 7 deletions differential-dataflow/examples/columnar/columnar_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1966,15 +1966,13 @@ where
stream.as_collection()
}

/// Extract a `Collection<_, RecordedUpdates<U>>` from a columnar `Arranged`.
/// Extract a `Collection<_, RecordedUpdates<U>>` from a columnar batch stream.
///
/// Cursors through each batch and pushes `(key, val, time, diff)` refs into
/// a `ValColBuilder`, which sorts and consolidates on flush.
pub fn as_recorded_updates<U>(
arranged: differential_dataflow::operators::arrange::Arranged<
differential_dataflow::operators::arrange::TraceAgent<ValSpine<U::Key, U::Val, U::Time, U::Diff>>,
>,
) -> differential_dataflow::Collection<U::Time, RecordedUpdates<U>>
pub fn as_recorded_updates<'scope, U>(
stream: timely::dataflow::Stream<'scope, U::Time, Vec<std::rc::Rc<differential_dataflow::trace::implementations::ord_neu::OrdValBatch<layout::ColumnarLayout<U>>>>>,
) -> differential_dataflow::Collection<'scope, U::Time, RecordedUpdates<U>>
where
U: layout::ColumnarUpdate,
{
Expand All @@ -1983,7 +1981,7 @@ where
use differential_dataflow::trace::{BatchReader, Cursor};
use differential_dataflow::AsCollection;

arranged.stream
stream
.unary::<ValColBuilder<U>, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| {
move |input, output| {
input.for_each(|time, batches| {
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod reachability {
use differential_dataflow::Collection;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::operators::arrange::arrangement::arrange_intra;
use differential_dataflow::operators::join::join_traces;

use crate::columnar_support::*;
Expand Down Expand Up @@ -127,13 +127,13 @@ mod reachability {
let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };

let edges_arr = arrange_core::<_,
let edges_arr = arrange_intra::<_,
ValBatcher<Node, Node, IterTime, Diff>,
ValBuilder<Node, Node, IterTime, Diff>,
ValSpine<Node, Node, IterTime, Diff>,
>(edges_inner.inner, edges_pact, "Edges");

let reach_arr = arrange_core::<_,
let reach_arr = arrange_intra::<_,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
Expand All @@ -157,7 +157,7 @@ mod reachability {

// Arrange for reduce.
let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let combined_arr = arrange_core::<_,
let combined_arr = arrange_intra::<_,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
Expand All @@ -180,7 +180,7 @@ mod reachability {
});

// Extract RecordedUpdates from the Arranged's batch stream.
let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result);
let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result.stream);

variable.set(result_col.clone());

Expand Down
14 changes: 7 additions & 7 deletions differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ pub struct Query {
}

use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::operators::arrange::{Arranged, TraceInter};

type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K, V, T, R>>;
type TraceKeyHandle<K,T,R> = TraceInter<KeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceInter<ValSpine<K, V, T, R>>;
type Arrange<'scope, T,K,V,R> = Arranged<'scope, TraceValHandle<K, V, T, R>>;

/// An evolving set of edges.
Expand Down Expand Up @@ -114,14 +114,14 @@ impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> {
/// The collection arranged in the forward direction.
///
/// The arrangement is built from a clone of `self.collection` the first time
/// this method is called and is stored in `self.forward`; subsequent calls
/// return a reference to the stored arrangement without rebuilding it.
pub fn forward(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> {
// Only build the arrangement if none has been stored yet.
if self.forward.is_none() {
self.forward = Some(self.collection.clone().arrange_by_key());
self.forward = Some(self.collection.clone().arrange_by_key_inter());
}
// `self.forward` is `Some` here: it was either already set or assigned above.
self.forward.as_ref().unwrap()
}
/// The collection arranged in the reverse direction.
///
/// Each `(x, y)` record of a clone of `self.collection` is swapped to `(y, x)`
/// before arranging by key. The arrangement is built on the first call and
/// stored in `self.reverse`; subsequent calls return a reference to it.
pub fn reverse(&mut self) -> &Arrange<'scope, T, Node, Node, Diff> {
// Only build the arrangement if none has been stored yet.
if self.reverse.is_none() {
self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key());
self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key_inter());
}
// `self.reverse` is `Some` here: it was either already set or assigned above.
self.reverse.as_ref().unwrap()
}
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Query {
// create variables and result handles for each named relation.
for (name, (input, collection)) in input_map {
let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1));
let trace = edge_variable.collection.clone().leave(scope).arrange_by_self().trace;
let trace = edge_variable.collection.clone().leave(scope).arrange_by_self_inter().trace;
result_map.insert(name.clone(), RelationHandles { input, trace });
variable_map.insert(name.clone(), edge_variable);
}
Expand Down Expand Up @@ -199,7 +199,7 @@ impl Query {
transposed =
transposed
.join_core(to_join, |_k,&x,&y| Some((y,x)))
.arrange_by_key();
.arrange_by_key_inter();
}

// Reverse the direction before adding it as a production.
Expand Down
46 changes: 35 additions & 11 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ pub mod vec {
}

use crate::trace::{Trace, Builder};
use crate::operators::arrange::{Arranged, TraceAgent};
use crate::operators::arrange::{Arranged, TraceInter, TraceIntra};

impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
where
Expand Down Expand Up @@ -779,7 +779,7 @@ pub mod vec {
/// .trace;
/// });
/// ```
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent<T2>>
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceIntra<T2>>
where
T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
Expand All @@ -797,7 +797,7 @@ pub mod vec {
/// Unlike `reduce_arranged`, this method may be called with an empty `input`,
/// and it may not be safe to index into the first element.
/// At least one of the two collections will be non-empty.
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceIntra<T2>>
where
V: Clone+'static,
T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=T>+'static,
Expand Down Expand Up @@ -1027,29 +1027,29 @@ pub mod vec {
V: crate::ExchangeData,
R: crate::ExchangeData + Semigroup,
{
/// Arranges this collection's `((K, V), T, R)` updates into a trace of type `Tr`,
/// using `Ba` as the batcher and `Bu` as the builder, and passing `name` through
/// to the underlying arrange operator.
///
/// The input is wired up with an `Exchange` pact keyed on the hash of each
/// update's key (`(update.0).0`).
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceIntra<Tr>>
where
Ba: crate::trace::Batcher<Input=Vec<((K, V), T, R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: crate::trace::Trace<Time=T> + 'static,
{
// Route each update by the hash of its key component.
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into());
crate::operators::arrange::arrangement::arrange_core::<_, Ba, Bu, _>(self.inner, exchange, name)
crate::operators::arrange::arrangement::arrange_intra::<_, Ba, Bu, _>(self.inner, exchange, name)
}
}

// `Arrange` for collections of bare keys `K`: each record is paired with a unit
// value so that it is arranged as `((K, ()), T, R)` updates.
impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R>
where
T: Timestamp + Lattice + Ord,
{
/// Arranges the keys of this collection into a trace of type `Tr`, using `Ba`
/// as the batcher and `Bu` as the builder, and passing `name` through to the
/// underlying arrange operator.
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceIntra<Tr>>
where
Ba: crate::trace::Batcher<Input=Vec<((K,()),T,R)>, Time=T> + 'static,
Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: crate::trace::Trace<Time=T> + 'static,
{
// Route each update by the hash of its key component.
let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into());
// `self.map(|k| (k, ()))` attaches the unit value before arranging.
crate::operators::arrange::arrangement::arrange_core::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
crate::operators::arrange::arrangement::arrange_intra::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand All @@ -1063,14 +1063,26 @@ pub mod vec {
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// This trace is current for all times completed by the output stream, which can be used to
/// safely identify the stable times and values in the trace.
pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
pub fn arrange_by_key(self) -> Arranged<'scope, TraceIntra<ValSpine<K, V, T, R>>> {
self.arrange_by_key_named("ArrangeByKey")
}

/// As `arrange_by_key` but with the ability to name the arrangement.
///
/// Delegates to `arrange_named` with `ValBatcher` and `ValBuilder`, leaving the
/// trace type to be inferred from the return type; `name` is passed through unchanged.
pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceIntra<ValSpine<K, V, T, R>>> {
self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
}


/// As `arrange_by_key` but producing a `TraceInter` that can be imported into other dataflows.
pub fn arrange_by_key_inter(self) -> Arranged<'scope, TraceInter<ValSpine<K, V, T, R>>> {
self.arrange_by_key_inter_named("ArrangeByKey")
}

/// Variant of `arrange_by_key_inter` that lets the caller choose the arrangement's name.
///
/// Updates are routed through an `Exchange` pact keyed on the hash of each
/// update's key, then handed to `arrange_inter` with `ValBatcher` and `ValBuilder`.
pub fn arrange_by_key_inter_named(self, name: &str) -> Arranged<'scope, TraceInter<ValSpine<K, V, T, R>>> {
    // Route each update by the hash of its key component.
    let by_key_hash = timely::dataflow::channels::pact::Exchange::new(
        move |update: &((K, V), T, R)| {
            let ((key, _), _, _) = update;
            key.hashed().into()
        },
    );
    crate::operators::arrange::arrangement::arrange_inter::<
        _,
        ValBatcher<_, _, _, _>,
        ValBuilder<_, _, _, _>,
        _,
    >(self.inner, by_key_hash, name)
}
}

impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R>
Expand All @@ -1082,15 +1094,27 @@ pub mod vec {
/// This operator arranges a collection of records into a shared trace, whose contents it maintains.
/// This trace is current for all times complete in the output stream, which can be used to safely
/// identify the stable times and values in the trace.
pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
pub fn arrange_by_self(self) -> Arranged<'scope, TraceIntra<KeySpine<K, T, R>>> {
self.arrange_by_self_named("ArrangeBySelf")
}

/// As `arrange_by_self` but with the ability to name the arrangement.
///
/// Each record `k` is first mapped to `(k, ())`, and the result is arranged via
/// `arrange_named` with `KeyBatcher` and `KeyBuilder`; `name` is passed through unchanged.
pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceIntra<KeySpine<K, T, R>>> {
self.map(|k| (k, ()))
.arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
}


/// As `arrange_by_self` but producing a `TraceInter` that can be imported into other dataflows.
pub fn arrange_by_self_inter(self) -> Arranged<'scope, TraceInter<KeySpine<K, T, R>>> {
self.arrange_by_self_inter_named("ArrangeBySelf")
}

/// Variant of `arrange_by_self_inter` that lets the caller choose the arrangement's name.
///
/// Each record is paired with a unit value, routed through an `Exchange` pact
/// keyed on the hash of the record, and handed to `arrange_inter` with
/// `KeyBatcher` and `KeyBuilder`.
pub fn arrange_by_self_inter_named(self, name: &str) -> Arranged<'scope, TraceInter<KeySpine<K, T, R>>> {
    // Route each update by the hash of its key component.
    let by_key_hash = timely::dataflow::channels::pact::Exchange::new(
        move |update: &((K, ()), T, R)| {
            let ((key, _), _, _) = update;
            key.hashed().into()
        },
    );
    // Attach the unit value so records take the `((K, ()), T, R)` update shape.
    let keyed = self.map(|key| (key, ()));
    crate::operators::arrange::arrangement::arrange_inter::<
        _,
        KeyBatcher<_, _, _>,
        KeyBuilder<_, _, _>,
        _,
    >(keyed.inner, by_key_hash, name)
}
}

impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
Expand Down
Loading
Loading