diff --git a/advent_of_code_2017/src/bin/day_09.rs b/advent_of_code_2017/src/bin/day_09.rs index 8a9a0f310..cfda8ed06 100644 --- a/advent_of_code_2017/src/bin/day_09.rs +++ b/advent_of_code_2017/src/bin/day_09.rs @@ -107,7 +107,7 @@ where unit_ranges .iterate(|scope, ranges| - // Each available range, of size less than usize::max_value(), advertises itself as the range + // Each available range, of size less than usize::MAX, advertises itself as the range // twice as large, aligned to integer multiples of its size. Each range, which may contain at // most two elements, then summarizes itself using the `combine` function. Finally, we re-add // the initial `unit_ranges` intervals, so that the set of ranges grows monotonically. diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs deleted file mode 100644 index b2e9bf5dd..000000000 --- a/differential-dataflow/examples/spines.rs +++ /dev/null @@ -1,104 +0,0 @@ -use timely::dataflow::operators::probe::Handle; - -use differential_dataflow::input::Input; - -use mimalloc::MiMalloc; - -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; - -fn main() { - - let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); - let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); - - let mode: String = std::env::args().nth(3).unwrap(); - - println!("Running [{:?}] arrangement", mode); - - let timer1 = ::std::time::Instant::now(); - let timer2 = timer1.clone(); - - // define a new computational scope, in which to run BFS - timely::execute_from_args(std::env::args(), move |worker| { - - // define BFS dataflow; return handles to roots and edges inputs - let mut probe = Handle::new(); - let (mut data_input, mut keys_input) = worker.dataflow(|scope| { - - use differential_dataflow::operators::{arrange::Arrange}; - - let (data_input, data) = scope.new_collection::(); - let (keys_input, keys) = scope.new_collection::(); - - match mode.as_str() { - "old" => { - use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; - let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - keys.join_core(data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, - "rhh" => { - use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - keys.join_core(data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, - _ => { - println!("unrecognized mode: {:?}", mode) - } - } - - (data_input, keys_input) - }); - - // Load up data in batches. - let mut counter = 0; - while counter < 10 * keys { - let mut i = worker.index(); - while i < size { - let val = (counter + i) % keys; - data_input.insert(format!("{:?}", val)); - i += worker.peers(); - } - counter += size; - data_input.advance_to(data_input.time() + 1); - data_input.flush(); - keys_input.advance_to(keys_input.time() + 1); - keys_input.flush(); - while probe.less_than(data_input.time()) { - worker.step(); - } - } - println!("{:?}\tloading complete", timer1.elapsed()); - - let mut queries = 0; - - while queries < 10 * keys { - let mut i = worker.index(); - while i < size { - let val = (queries + i) % keys; - keys_input.insert(format!("{:?}", val)); - i += worker.peers(); - } - queries += size; - data_input.advance_to(data_input.time() + 1); - data_input.flush(); - keys_input.advance_to(keys_input.time() + 1); - keys_input.flush(); - while probe.less_than(data_input.time()) { - worker.step(); - } - } - - println!("{:?}\tqueries complete", timer1.elapsed()); - - // loop { } - - }).unwrap(); - - println!("{:?}\tshut down", timer2.elapsed()); - -} diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 830cd0f6e..8a9428993 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -11,7 +11,7 @@ use crate::lattice::Lattice; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)> where - T: Timestamp + Lattice + Ord, + T: Timestamp + Lattice, N: ExchangeData+Hash, { let edges = edges.arrange_by_key(); diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index b9d2dd44c..fce691af5 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -21,7 +21,7 @@ use crate::operators::iterate::Variable; /// could be good insurance here. pub fn bidijkstra<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, goals: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T, ((N,N), u32)> where - T: Timestamp + Lattice + Ord, + T: Timestamp + Lattice, N: ExchangeData+Hash, { let forward = edges.clone().arrange_by_key(); diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 915bf545d..a1afe57ff 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -15,7 +15,7 @@ use crate::difference::{Abelian, Multiply}; /// method to limit the introduction of labels. pub fn propagate<'scope, T, N, L, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>) -> VecCollection<'scope, T,(N,L),R> where - T: Timestamp + Lattice + Ord + Hash, + T: Timestamp + Lattice + Hash, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, @@ -32,7 +32,7 @@ where /// method to limit the introduction of labels. pub fn propagate_at<'scope, T, N, L, F, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>, logic: F) -> VecCollection<'scope, T,(N,L),R> where - T: Timestamp + Lattice + Ord + Hash, + T: Timestamp + Lattice + Hash, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index 7dcb1de7f..c66f93ba3 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -14,7 +14,7 @@ use super::propagate::propagate; /// Returns the subset of edges in the same strongly connected component. pub fn strongly_connected<'scope, T, N, R>(graph: VecCollection<'scope, T, (N,N), R>) -> VecCollection<'scope, T, (N,N), R> where - T: Timestamp + Lattice + Ord + Hash, + T: Timestamp + Lattice + Hash, N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, @@ -39,7 +39,7 @@ where fn trim_edges<'scope, T, N, R>(cycle: VecCollection<'scope, T, (N,N), R>, edges: VecCollection<'scope, T, (N,N), R>) -> VecCollection<'scope, T, (N,N), R> where - T: Timestamp + Lattice + Ord + Hash, + T: Timestamp + Lattice + Hash, N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index 9c58cac61..fec21f664 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -11,12 +11,12 @@ use crate::hashable::Hashable; fn _color<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T,(N,Option)> where - T: Timestamp + Lattice + Ord + Hash, + T: Timestamp + Lattice + Hash, N: ExchangeData+Hash, { // need some bogus initial values. let start = edges.clone() - .map(|(x,_y)| (x,u32::max_value())) + .map(|(x,_y)| (x,u32::MAX)) .distinct(); // repeatedly apply color-picking logic. @@ -45,7 +45,7 @@ pub fn sequence<'scope, T, N, V, F>( edges: VecCollection<'scope, T, (N,N)>, logic: F) -> VecCollection<'scope, T, (N,Option)> where - T: Timestamp + Lattice + Hash + Ord, + T: Timestamp + Lattice + Hash, N: ExchangeData+Hashable, V: ExchangeData, F: Fn(&N, &[(&V, isize)])->V+'static diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index 90156d688..e0dbc5abd 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -32,7 +32,7 @@ where fn prefix_sum_at(self, locations: VecCollection<'scope, T, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { let combine1 = ::std::rc::Rc::new(combine); - let combine2 = combine1.clone(); + let combine2 = ::std::rc::Rc::clone(&combine1); let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y)); broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y)) @@ -54,7 +54,7 @@ where .clone() .iterate(|scope, ranges| { - // Each available range, of size less than usize::max_value(), advertises itself as the range + // Each available range, of size less than usize::MAX, advertises itself as the range // twice as large, aligned to integer multiples of its size. Each range, which may contain at // most two elements, then summarizes itself using the `combine` function. Finally, we re-add // the initial `unit_ranges` intervals, so that the set of ranges grows monotonically. @@ -79,7 +79,7 @@ pub fn broadcast<'scope, T, K, D, F>( zero: D, combine: F) -> VecCollection<'scope, T, ((usize, K), D)> where - T: Timestamp + Lattice + Ord + ::std::fmt::Debug, + T: Timestamp + Lattice, K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs index def7c9ae4..ba1522aca 100644 --- a/differential-dataflow/src/capture.rs +++ b/differential-dataflow/src/capture.rs @@ -304,7 +304,7 @@ pub mod source { let mut antichain = MutableAntichain::new(); antichain.update_iter(Some((T::minimum(), workers as i64))); let shared_frontier = Rc::new(RefCell::new(antichain)); - let shared_frontier2 = shared_frontier.clone(); + let shared_frontier2 = Rc::clone(&shared_frontier); // Step 1: The MESSAGES operator. let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope); @@ -329,7 +329,7 @@ pub mod source { let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone()); let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone()); // Capture the shared frontier to read out frontier updates to apply. - let local_frontier = shared_frontier.clone(); + let local_frontier = Rc::clone(&shared_frontier); // move |_frontiers| { // First check to ensure that we haven't been terminated by someone dropping our tokens. @@ -723,183 +723,3 @@ pub mod sink { }); } } - -// pub mod kafka { - -// use serde::{Serialize, Deserialize}; -// use timely::scheduling::SyncActivator; -// use rdkafka::{ClientContext, config::ClientConfig}; -// use rdkafka::consumer::{BaseConsumer, ConsumerContext}; -// use rdkafka::error::{KafkaError, RDKafkaError}; -// use super::BytesSink; - -// use std::hash::Hash; -// use timely::progress::Timestamp; -// use timely::dataflow::{Scope, Stream}; -// use crate::ExchangeData; -// use crate::lattice::Lattice; - -// /// Creates a Kafka source from supplied configuration information. -// pub fn create_source(scope: T, addr: &str, topic: &str, group: &str) -> (Box, Stream>) -// where -// T: Scope, -// D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>, -// T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice, -// R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>, -// { -// super::source::build(scope, |activator| { -// let source = KafkaSource::new(addr, topic, group, activator); -// // An iterator combinator that yields every "duration" even if more items exist. -// // The implementation of such an iterator exists in the git history, or can be rewritten easily. -// super::YieldingIter::new_from(Iter::::new_from(source), std::time::Duration::from_millis(10)) -// }) -// } - -// pub fn create_sink(stream: &Stream>, addr: &str, topic: &str) -> Box -// where -// T: Scope, -// D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>, -// T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice, -// R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>, -// { -// use std::rc::Rc; -// use std::cell::RefCell; -// use crate::hashable::Hashable; - -// let sink = KafkaSink::new(addr, topic); -// let result = Rc::new(RefCell::new(sink)); -// let sink_hash = (addr.to_string(), topic.to_string()).hashed(); -// super::sink::build( -// &stream, -// sink_hash, -// Rc::downgrade(&result), -// Rc::downgrade(&result), -// ); -// Box::new(result) - -// } - -// pub struct KafkaSource { -// consumer: BaseConsumer, -// } - -// impl KafkaSource { -// pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self { -// let mut kafka_config = ClientConfig::new(); -// // This is mostly cargo-cult'd in from `source/kafka.rs`. -// kafka_config.set("bootstrap.servers", &addr.to_string()); -// kafka_config -// .set("enable.auto.commit", "false") -// .set("auto.offset.reset", "earliest"); - -// kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds -// kafka_config.set("fetch.message.max.bytes", "134217728"); -// kafka_config.set("group.id", group); -// kafka_config.set("isolation.level", "read_committed"); -// let activator = ActivationConsumerContext(activator); -// let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap(); -// use rdkafka::consumer::Consumer; -// consumer.subscribe(&[topic]).unwrap(); -// Self { -// consumer, -// } -// } -// } - -// pub struct Iter { -// pub source: KafkaSource, -// phantom: std::marker::PhantomData<(D, T, R)>, -// } - -// impl Iter { -// /// Constructs a new iterator from a bytes source. -// pub fn new_from(source: KafkaSource) -> Self { -// Self { -// source, -// phantom: std::marker::PhantomData, -// } -// } -// } - -// impl Iterator for Iter -// where -// D: for<'a>Deserialize<'a>, -// T: for<'a>Deserialize<'a>, -// R: for<'a>Deserialize<'a>, -// { -// type Item = super::Message; -// fn next(&mut self) -> Option { -// use rdkafka::message::Message; -// self.source -// .consumer -// .poll(std::time::Duration::from_millis(0)) -// .and_then(|result| result.ok()) -// .and_then(|message| { -// message.payload().and_then(|message| bincode::deserialize::>(message).ok()) -// }) -// } -// } - -// /// An implementation of [`ConsumerContext`] that unparks the wrapped thread -// /// when the message queue switches from nonempty to empty. -// struct ActivationConsumerContext(SyncActivator); - -// impl ClientContext for ActivationConsumerContext { } - -// impl ActivationConsumerContext { -// fn activate(&self) { -// self.0.activate().unwrap(); -// } -// } - -// impl ConsumerContext for ActivationConsumerContext { -// fn message_queue_nonempty_callback(&self) { -// self.activate(); -// } -// } - -// use std::time::Duration; -// use rdkafka::producer::DefaultProducerContext; -// use rdkafka::producer::{BaseRecord, ThreadedProducer}; - -// pub struct KafkaSink { -// topic: String, -// producer: ThreadedProducer, -// } - -// impl KafkaSink { -// pub fn new(addr: &str, topic: &str) -> Self { -// let mut config = ClientConfig::new(); -// config.set("bootstrap.servers", &addr); -// config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20)); -// config.set("queue.buffering.max.messages", &format!("{}", 10_000_000)); -// config.set("queue.buffering.max.ms", &format!("{}", 10)); -// let producer = config -// .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext) -// .expect("creating kafka producer for kafka sinks failed"); -// Self { -// producer, -// topic: topic.to_string(), -// } -// } -// } - -// impl BytesSink for KafkaSink { -// fn poll(&mut self, bytes: &[u8]) -> Option { -// let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes); - -// self.producer.send(record).err().map(|(e, _)| { -// if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e { -// Duration::from_secs(1) -// } else { -// // TODO(frank): report this error upwards so the user knows the sink is dead. -// Duration::from_secs(1) -// } -// }) -// } -// fn done(&self) -> bool { -// self.producer.in_flight_count() == 0 -// } -// } - -// } diff --git a/differential-dataflow/src/dynamic/pointstamp.rs b/differential-dataflow/src/dynamic/pointstamp.rs index ee390b4f4..cf2bdf4fe 100644 --- a/differential-dataflow/src/dynamic/pointstamp.rs +++ b/differential-dataflow/src/dynamic/pointstamp.rs @@ -87,7 +87,7 @@ impl std::ops::Deref for PointStamp { // Implement timely dataflow's `PartialOrder` trait. use timely::order::PartialOrder; -impl PartialOrder for PointStamp { +impl PartialOrder for PointStamp { fn less_equal(&self, other: &Self) -> bool { // Every present coordinate must be less-equal the corresponding coordinate, // where absent corresponding coordinates are `T::minimum()`. Coordinates @@ -224,7 +224,7 @@ impl Timestamp for PointStamp { // Implement differential dataflow's `Lattice` trait. // This extends the `PartialOrder` implementation with additional structure. use crate::lattice::Lattice; -impl Lattice for PointStamp { +impl Lattice for PointStamp { #[inline(always)] fn join(&self, other: &Self) -> Self { let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index e43323c7d..01292ec0a 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -182,20 +182,6 @@ impl InputSession { pub fn remove(&mut self, element: D) { self.update(element,-1); } } -// impl InputSession { -// /// Adds an element to the collection. -// pub fn insert(&mut self, element: D) { self.update(element, 1); } -// /// Removes an element from the collection. -// pub fn remove(&mut self, element: D) { self.update(element,-1); } -// } - -// impl InputSession { -// /// Adds an element to the collection. -// pub fn insert(&mut self, element: D) { self.update(element, 1); } -// /// Removes an element from the collection. -// pub fn remove(&mut self, element: D) { self.update(element,-1); } -// } - impl InputSession { /// Introduces a handle as collection. diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 630c4258a..d58957356 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -11,7 +11,6 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; use crate::trace::{Trace, TraceReader, BatchReader}; -use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; @@ -26,7 +25,7 @@ use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; /// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted /// from the dataflow in which it was defined, and imported into other dataflows. pub struct TraceAgent { - trace: Rc>>, + trace: Rc>>, queues: Weak>>>, logical_compaction: Antichain, physical_compaction: Antichain, @@ -81,7 +80,7 @@ impl TraceAgent { where Tr: Trace, { - let trace = Rc::new(RefCell::new(TraceBox::new(trace))); + let trace = Rc::new(RefCell::new(trace_box::TraceBox::new(trace))); let queues = Rc::new(RefCell::new(Vec::new())); if let Some(logging) = &logging { @@ -91,7 +90,7 @@ impl TraceAgent { } let reader = TraceAgent { - trace: trace.clone(), + trace: Rc::clone(&trace), queues: Rc::downgrade(&queues), logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(), physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(), @@ -148,13 +147,13 @@ impl TraceAgent { &self.operator } - /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain + /// Obtain a reference to the inner [`trace_box::TraceBox`]. It is the caller's obligation to maintain /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior /// to mutate the trace box. Keeping strong references can prevent resource reclamation. /// /// This method is subject to changes and removal and should not be considered part of a stable /// interface. - pub fn trace_box_unstable(&self) -> Rc>> { + pub fn trace_box_unstable(&self) -> Rc>> { Rc::clone(&self.trace) } } @@ -291,7 +290,7 @@ impl TraceAgent { let queue = self.new_listener(activator); let activator = scope.activator_for(info.address); - *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator)); + *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator)); capabilities.borrow_mut().as_mut().unwrap().insert(capability); @@ -424,7 +423,7 @@ impl TraceAgent { let queue = self.new_listener(activator); let activator = scope.activator_for(info.address); - *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator)); + *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator)); capabilities.borrow_mut().as_mut().unwrap().insert(capability); @@ -485,25 +484,6 @@ impl ShutdownButton { *self.reference.borrow_mut() = None; self.activator.activate(); } - /// Hotwires the button to one that is pressed if dropped. - pub fn press_on_drop(self) -> ShutdownDeadmans { - ShutdownDeadmans { - button: self - } - } -} - -/// A deadman's switch version of a shutdown button. -/// -/// This type hosts a shutdown button and will press it when dropped. -pub struct ShutdownDeadmans { - button: ShutdownButton, -} - -impl Drop for ShutdownDeadmans { - fn drop(&mut self) { - self.button.press(); - } } impl Clone for TraceAgent { @@ -521,8 +501,8 @@ impl Clone for TraceAgent { self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow()); TraceAgent { - trace: self.trace.clone(), - queues: self.queues.clone(), + trace: Rc::clone(&self.trace), + queues: Weak::clone(&self.queues), logical_compaction: self.logical_compaction.clone(), physical_compaction: self.physical_compaction.clone(), operator: self.operator.clone(), @@ -547,3 +527,67 @@ impl Drop for TraceAgent { self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow()); } } + +/// A trace wrapper suitable for use through shared reference counted ownership. +/// +/// The wrapper mainly accumulates the expressed compaction constraints from many, +/// and presents their implications to the wrapped trace. +pub mod trace_box { + + use timely::progress::{frontier::{AntichainRef, MutableAntichain}}; + + use crate::trace::TraceReader; + + /// A wrapper around a trace which tracks the frontiers of all referees. + /// + /// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case. + /// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers + /// may influence. + pub struct TraceBox { + /// accumulated holds on times for advancement. + pub (crate) logical_compaction: MutableAntichain, + /// accumulated holds on times for distinction. + pub (crate) physical_compaction: MutableAntichain, + /// The wrapped trace. + pub (crate) trace: Tr, + } + + impl TraceBox { + /// Moves an existing trace into a shareable trace wrapper. + /// + /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing + /// process will fish these out and make sure that they are used for the initial read capabilities. + pub fn new(mut trace: Tr) -> Self { + + let mut logical_compaction = MutableAntichain::new(); + logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1))); + let mut physical_compaction = MutableAntichain::new(); + physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1))); + + TraceBox { + logical_compaction, + physical_compaction, + trace, + } + } + /// Borrowed access to the underlying trace. + /// + /// This is used to inspect batches for purposes of resource accounting in external systems. + pub fn trace(&self) -> &Tr { &self.trace } + /// Replaces elements of `lower` with those of `upper`. + #[inline] + pub fn adjust_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { + self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); + self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); + self.trace.set_logical_compaction(self.logical_compaction.frontier()); + } + /// Replaces elements of `lower` with those of `upper`. + #[inline] + pub fn adjust_physical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { + self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); + self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); + self.trace.set_physical_compaction(self.physical_compaction.frontier()); + } + } + +} diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index de533a08f..3b5a63fb6 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -41,10 +41,7 @@ use super::TraceAgent; /// /// An `Arranged` allows multiple differential operators to share the resources (communication, /// computation, memory) required to produce and maintain an indexed representation of a collection. -pub struct Arranged<'scope, Tr> -where - Tr: TraceReader+Clone, -{ +pub struct Arranged<'scope, Tr: TraceReader> { /// A stream containing arranged updates. /// /// This stream contains the same batches of updates the trace itself accepts, so there should @@ -53,14 +50,9 @@ where pub stream: Stream<'scope, Tr::Time, Vec>, /// A shared trace, updated by the `Arrange` operator and readable by others. pub trace: Tr, - // TODO : We might have an `Option>` here, which `as_collection` sets and - // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`. } -impl<'scope, Tr> Clone for Arranged<'scope, Tr> -where - Tr: TraceReader + Clone, -{ +impl<'scope, Tr: TraceReader+Clone> Clone for Arranged<'scope, Tr> { fn clone(&self) -> Self { Arranged { stream: self.stream.clone(), @@ -72,10 +64,7 @@ where use ::timely::progress::timestamp::Refines; use timely::Container; -impl<'scope, Tr> Arranged<'scope, Tr> -where - Tr: TraceReader + Clone, -{ +impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// Brings an arranged collection into a nested scope. /// /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps @@ -224,10 +213,7 @@ where use crate::difference::Multiply; // Direct join implementations. -impl<'scope, Tr1> Arranged<'scope, Tr1> -where - Tr1: TraceReader + Clone + 'static, -{ +impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { /// A convenience method to join and produce `VecCollection` output. /// /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion. @@ -260,10 +246,7 @@ where // Direct reduce implementations. use crate::difference::Abelian; -impl<'scope, Tr1> Arranged<'scope, Tr1> -where - Tr1: TraceReader + Clone + 'static, -{ +impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { /// A direct implementation of `ReduceCore::reduce_abelian`. pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent> where @@ -304,10 +287,7 @@ where } -impl<'scope, Tr> Arranged<'scope, Tr> -where - Tr: TraceReader + Clone, -{ +impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// Brings an arranged collection out of a nested region. /// /// This method only applies to *regions*, which are subscopes with the same timestamp @@ -322,10 +302,7 @@ where } /// A type that can be arranged as if a collection of updates. -pub trait Arrange<'scope, T, C> : Sized -where - T: Timestamp + Lattice, -{ +pub trait Arrange<'scope, T: Timestamp+Lattice, C> : Sized { /// Arranges updates into a shared trace. fn arrange(self) -> Arranged<'scope, TraceAgent> where @@ -389,7 +366,7 @@ where // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); - let activator = Some(scope.activator_for(info.address.clone())); + let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address))); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); // If there is default exertion logic set, install it. if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 70f9fb866..fedbfdc23 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -69,4 +69,4 @@ pub mod upsert; pub use self::writer::TraceWriter; pub use self::agent::{TraceAgent, ShutdownButton}; -pub use self::arrangement::{Arranged, Arrange}; \ No newline at end of file +pub use self::arrangement::{Arranged, Arrange}; diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 5e336fc3d..60fb51f26 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -159,7 +159,7 @@ where // Tracks the lower envelope of times in `priority_queue`. let mut capabilities = Antichain::>::new(); // Form the trace we will both use internally and publish. - let activator = Some(scope.activator_for(info.address.clone())); + let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address))); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { @@ -244,7 +244,7 @@ where // Attempt to find the key in the trace. trace_cursor.seek_key(&trace_storage, key_con.index(0)); - if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) { + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(key_con.index(0))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 8df11690c..4d3a2d031 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -9,11 +9,10 @@ use std::cell::RefCell; use timely::progress::Antichain; use crate::trace::{Trace, Batch, BatchReader}; -use crate::trace::wrappers::rc::TraceBox; - use super::TraceAgentQueueWriter; use super::TraceReplayInstruction; +use super::agent::trace_box::TraceBox; /// Write endpoint for a sequence of batches. /// diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index abc3d20db..b0de75c62 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -14,7 +14,7 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `count` differential dataflow method. -pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: Semigroup> : Sized { +pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice, K: ExchangeData, R: Semigroup> : Sized { /// Counts the number of occurrences of each element. /// /// # Examples @@ -44,7 +44,7 @@ pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: Excha impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R> where - T: Timestamp + TotalOrder + Lattice + Ord, + T: Timestamp + TotalOrder + Lattice, { fn count_total_core + 'static>(self) -> VecCollection<'scope, T, (K, R), R2> { self.arrange_by_self_named("Arrange: CountTotal") diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 00277ced3..477bc4b92 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -68,21 +68,21 @@ impl, D> PushInto for EffortBuilder { /// [`AsCollection`]: crate::collection::AsCollection pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, arranged2: Arranged<'scope, Tr2>, mut result: L) -> Stream<'scope, Tr1::Time, CB::Container> where - Tr1: TraceReader+Clone+'static, - Tr2: for<'a> TraceReader=Tr1::Key<'a>, Time = Tr1::Time>+Clone+'static, + Tr1: TraceReader+'static, + Tr2: for<'a> TraceReader=Tr1::Key<'a>, Time = Tr1::Time>+'static, L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,&Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession>)+'static, CB: ContainerBuilder, { // Rename traces for symmetry from here on out. - let mut trace1 = arranged1.trace.clone(); - let mut trace2 = arranged2.trace.clone(); + let mut trace1 = arranged1.trace; + let mut trace2 = arranged2.trace; let scope = arranged1.stream.scope(); arranged1.stream.binary_frontier(arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| { // Acquire an activator to reschedule the operator when it has unfinished work. use timely::scheduling::Activator; - let activations = scope.activations().clone(); + let activations = scope.activations(); let activator = Activator::new(info.address, activations); // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. @@ -310,7 +310,7 @@ where /// dataflow system a chance to run operators that can consume and aggregate the data. struct Deferred where - T: Timestamp+Lattice+Ord, + T: Timestamp+Lattice, C1: Cursor, C2: for<'a> Cursor=C1::Key<'a>, Time=T>, { @@ -326,7 +326,7 @@ impl Deferred where C1: Cursor, C2: for<'a> Cursor=C1::Key<'a>, Time=T>, - T: Timestamp+Lattice+Ord, + T: Timestamp+Lattice, { fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { Deferred { diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 06fbddc99..046a5e32c 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -33,7 +33,7 @@ use crate::trace::TraceReader; /// key's computation to another, and will likely introduce non-determinism. pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent> where - Tr1: TraceReader + Clone + 'static, + Tr1: TraceReader + 'static, Tr2: for<'a> Trace=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static, Bu: Builder, L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, @@ -44,6 +44,7 @@ where // fabricate a data-parallel operator using the `unary_notify` pattern. let stream = { + let mut source_trace = trace.trace; let result_trace = &mut result_trace; let scope = trace.stream.scope(); trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { @@ -51,15 +52,13 @@ where // Acquire a logger for arrange events. let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); - let activator = Some(scope.activator_for(operator_info.address.clone())); + let activator = Some(scope.activator_for(std::rc::Rc::clone(&operator_info.address))); let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator); // If there is default exert logic set, install it. if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { empty.set_exert_logic(exert_logic); } - let mut source_trace = trace.trace.clone(); - let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); *result_trace = Some(output_reader.clone()); diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index d7716a0db..f712ae7ba 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -17,7 +17,7 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. -pub trait ThresholdTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: ExchangeData+Semigroup> : Sized { +pub trait ThresholdTotal<'scope, T: Timestamp + TotalOrder + Lattice, K: ExchangeData, R: ExchangeData+Semigroup> : Sized { /// Reduces the collection to one occurrence of each distinct element. fn threshold_semigroup(self, thresh: F) -> VecCollection<'scope, T, K, R2> where @@ -86,7 +86,7 @@ pub trait ThresholdTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: E impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R> where - T: Timestamp + TotalOrder + Lattice + Ord, + T: Timestamp + TotalOrder + Lattice, { fn threshold_semigroup(self, thresh: F) -> VecCollection<'scope, T, K, R2> where diff --git a/differential-dataflow/src/trace/implementations/chainless_batcher.rs b/differential-dataflow/src/trace/implementations/chainless_batcher.rs deleted file mode 100644 index de4e6c0ee..000000000 --- a/differential-dataflow/src/trace/implementations/chainless_batcher.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! A `Batcher` implementation based on merge sort. - -use timely::progress::frontier::AntichainRef; -use timely::progress::{frontier::Antichain, Timestamp}; - -use crate::logging::Logger; -use crate::trace; - -/// A type that can be used as storage within a merge batcher. -pub trait BatcherStorage : Default + Sized { - /// Number of contained updates. - fn len(&self) -> usize; - /// Merges two storage containers into one. - /// - /// This is expected to consolidate updates as it goes. - fn merge(self, other: Self) -> Self; - /// Extracts elements not greater or equal to the frontier. - fn split(&mut self, frontier: AntichainRef) -> Self; - /// Ensures `frontier` is less or equal to all contained times. - /// - /// Consider merging with `split`, but needed for new stores as well. - fn lower(&self, frontier: &mut Antichain); -} - -/// A batcher that simple merges `BatcherStorage` implementors. -pub struct Batcher> { - /// Each store is at least twice the size of the next. - storages: Vec, - /// The lower bound of timestamps of the maintained updates. - lower: Antichain, - /// The previosly minted frontier. - prior: Antichain, - - /// Logger for size accounting. - _logger: Option, - /// Timely operator ID. - _operator_id: usize, -} - -impl> Batcher { - /// Ensures lists decrease in size geometrically. - fn tidy(&mut self) { - self.storages.retain(|x| x.len() > 0); - self.storages.sort_by_key(|x| x.len()); - self.storages.reverse(); - while let Some(pos) = (1..self.storages.len()).position(|i| self.storages[i-1].len() < 2 * self.storages[i].len()) { - while self.storages.len() > pos + 1 { - let x = self.storages.pop().unwrap(); - let y = self.storages.pop().unwrap(); - self.storages.push(x.merge(y)); - self.storages.sort_by_key(|x| x.len()); - self.storages.reverse(); - } - } - } -} - -impl> trace::Batcher for Batcher { - type Time = T; - type Input = S; - type Output = S; - - fn new(logger: Option, operator_id: usize) -> Self { - Self { - storages: Vec::default(), - lower: Default::default(), - prior: Antichain::from_elem(T::minimum()), - _logger: logger, - _operator_id: operator_id, - } - } - - fn push_container(&mut self, batch: &mut Self::Input) { - if batch.len() > 0 { - // TODO: This appears to be optional based on `frontier` only being called after `seal`. - // For the moment, the trait doesn't promise this, but keep eyes on the cost. - batch.lower(&mut self.lower); - self.storages.push(std::mem::take(batch)); - self.tidy(); - } - } - - fn seal>(&mut self, upper: Antichain) -> B::Output { - let description = trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new()); - self.prior = upper.clone(); - if let Some(mut store) = self.storages.pop() { - self.lower.clear(); - let mut ship = store.split(upper.borrow()); - let mut keep = store; - while let Some(mut store) = self.storages.pop() { - let split = store.split(upper.borrow()); - ship = ship.merge(split); - keep = keep.merge(store); - } - keep.lower(&mut self.lower); - self.storages.push(keep); - B::seal(&mut vec![ship], description) - } - else { - B::seal(&mut vec![], description) - } - } - - fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { self.lower.borrow() } -} diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs deleted file mode 100644 index 26ac1ffee..000000000 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ /dev/null @@ -1,532 +0,0 @@ -//! A slice container that Huffman encodes its contents. - -use std::collections::BTreeMap; -use timely::container::PushInto; - -use crate::trace::implementations::{BatchContainer, OffsetList}; - -use self::wrapper::Wrapped; -use self::encoded::Encoded; -use self::huffman::Huffman; - -/// A container that contains slices `[B]` as items. -pub struct HuffmanContainer { - /// Either encoded data or raw data. - inner: Result<(Huffman, Vec), Vec>, - /// Offsets that bound each contained slice. - /// - /// The length will be one greater than the number of contained items. - offsets: OffsetList, - /// Counts of the number of each pattern we've seen. - stats: BTreeMap -} - -impl HuffmanContainer { - /// Prints statistics about encoded containers. - pub fn print(&self) { - if let Ok((_huff, bytes)) = &self.inner { - println!("Bytes: {:?}, Symbols: {:?}", bytes.len(), self.stats.values().sum::()); - } - } -} - -impl<'a, B: Ord + Clone + 'static> PushInto<&'a Vec> for HuffmanContainer { - fn push_into(&mut self, item: &'a Vec) { - for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } - match &mut self.inner { - Ok((huffman, bytes)) => { - bytes.extend(huffman.encode(item.iter())); - self.offsets.push(bytes.len()); - }, - Err(raw) => { - raw.extend(item.iter().cloned()); - self.offsets.push(raw.len()); - } - } - } -} - -impl<'a, B: Ord + Clone + 'static> PushInto> for HuffmanContainer { - fn push_into(&mut self, item: Wrapped<'a, B>) { - match item.decode() { - Ok(decoded) => { - for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; } - - }, - Err(symbols) => { - for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } - } - } - match (item.decode(), &mut self.inner) { - (Ok(decoded), Ok((huffman, bytes))) => { - bytes.extend(huffman.encode(decoded)); - self.offsets.push(bytes.len()); - } - (Ok(decoded), Err(raw)) => { - raw.extend(decoded.cloned()); - self.offsets.push(raw.len()); - } - (Err(symbols), Ok((huffman, bytes))) => { - bytes.extend(huffman.encode(symbols.iter())); - self.offsets.push(bytes.len()); - } - (Err(symbols), Err(raw)) => { - raw.extend(symbols.iter().cloned()); - self.offsets.push(raw.len()); - } - } - } -} - -impl BatchContainer for HuffmanContainer { - type Owned = Vec; - type ReadItem<'a> = Wrapped<'a, B>; - - fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { - match item.decode() { - Ok(decode) => decode.cloned().collect(), - Err(bytes) => bytes.to_vec(), - } - } - fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { - other.clear(); - match item.decode() { - Ok(decode) => other.extend(decode.cloned()), - Err(bytes) => other.extend_from_slice(bytes), - } - } - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - - fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } - - fn clear(&mut self) { *self = Self::default(); } - - fn with_capacity(size: usize) -> Self { - let mut offsets = OffsetList::with_capacity(size + 1); - offsets.push(0); - Self { - inner: Err(Vec::with_capacity(size)), - offsets, - stats: Default::default(), - } - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - - if cont1.len() > 0 { cont1.print(); } - if cont2.len() > 0 { cont2.print(); } - - let mut counts = BTreeMap::default(); - for (symbol, count) in cont1.stats.iter() { - *counts.entry(symbol.clone()).or_insert(0) += count; - } - for (symbol, count) in cont2.stats.iter() { - *counts.entry(symbol.clone()).or_insert(0) += count; - } - - let bytes = Vec::with_capacity(counts.values().cloned().sum::() as usize); - let huffman = Huffman::create_from(counts); - let inner = Ok((huffman, bytes)); - // : Err(Vec::with_capacity(length)) - - let length = cont1.offsets.len() + cont2.offsets.len() - 2; - let mut offsets = OffsetList::with_capacity(length + 1); - offsets.push(0); - Self { - inner, - offsets, - stats: Default::default(), - } - } - fn index(&self, index: usize) -> Self::ReadItem<'_> { - let lower = self.offsets.index(index); - let upper = self.offsets.index(index+1); - match &self.inner { - Ok((huffman, bytes)) => Wrapped::encoded(Encoded::new(huffman, &bytes[lower .. upper])), - Err(raw) => Wrapped::decoded(&raw[lower .. upper]), - } - } - fn len(&self) -> usize { - self.offsets.len() - 1 - } -} -/// Default implementation introduces a first offset. -impl Default for HuffmanContainer { - fn default() -> Self { - let mut offsets = OffsetList::with_capacity(1); - offsets.push(0); - Self { - inner: Err(Vec::new()), - offsets, - stats: Default::default(), - } - } -} - -mod wrapper { - - use super::Encoded; - - pub struct Wrapped<'a, B: Ord> { - pub(crate) inner: Result, &'a [B]>, - } - - impl<'a, B: Ord> Wrapped<'a, B> { - /// Returns either a decoding iterator, or just the bytes themselves. - pub fn decode(&'a self) -> Result + 'a, &'a [B]> { - match &self.inner { - Ok(encoded) => Ok(encoded.decode()), - Err(symbols) => Err(symbols), - } - } - /// A wrapper around an encoded sequence. - pub fn encoded(e: Encoded<'a, B>) -> Self { Self { inner: Ok(e) } } - /// A wrapper around a decoded sequence. - pub fn decoded(d: &'a [B]) -> Self { Self { inner: Err(d) } } - } - - impl<'a, B: Ord> Copy for Wrapped<'a, B> { } - impl<'a, B: Ord> Clone for Wrapped<'a, B> { - fn clone(&self) -> Self { *self } - } - - use std::cmp::Ordering; - impl<'a, 'b, B: Ord> PartialEq> for Wrapped<'b, B> { - fn eq(&self, other: &Wrapped<'a, B>) -> bool { - match (self.decode(), other.decode()) { - (Ok(decode1), Ok(decode2)) => decode1.eq(decode2), - (Ok(decode1), Err(bytes2)) => decode1.eq(bytes2.iter()), - (Err(bytes1), Ok(decode2)) => bytes1.iter().eq(decode2), - (Err(bytes1), Err(bytes2)) => bytes1.eq(bytes2), - } - } - } - impl<'a, B: Ord> Eq for Wrapped<'a, B> { } - impl<'a, 'b, B: Ord> PartialOrd> for Wrapped<'b, B> { - fn partial_cmp(&self, other: &Wrapped<'a, B>) -> Option { - match (self.decode(), other.decode()) { - (Ok(decode1), Ok(decode2)) => decode1.partial_cmp(decode2), - (Ok(decode1), Err(bytes2)) => decode1.partial_cmp(bytes2.iter()), - (Err(bytes1), Ok(decode2)) => bytes1.iter().partial_cmp(decode2), - (Err(bytes1), Err(bytes2)) => bytes1.partial_cmp(bytes2), - } - } - } - impl<'a, B: Ord> Ord for Wrapped<'a, B> { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() - } - } -} - -/// Wrapper around a Huffman decoder and byte slices, decodeable to a byte sequence. -mod encoded { - - use super::Huffman; - - /// Welcome to GATs! - pub struct Encoded<'a, B: Ord> { - /// Text that decorates the data. - huffman: &'a Huffman, - /// The data itself. - bytes: &'a [u8], - } - - impl<'a, B: Ord> Encoded<'a, B> { - /// Returns either a decoding iterator, or just the bytes themselves. - pub fn decode(&'a self) -> impl Iterator + 'a { - self.huffman.decode(self.bytes.iter().cloned()) - } - pub fn new(huffman: &'a Huffman, bytes: &'a [u8]) -> Self { - Self { huffman, bytes } - } - } - - impl<'a, B: Ord> Copy for Encoded<'a, B> { } - impl<'a, B: Ord> Clone for Encoded<'a, B> { - fn clone(&self) -> Self { *self } - } -} - -mod huffman { - - use std::collections::BTreeMap; - use std::convert::TryInto; - - use self::decoder::Decoder; - use self::encoder::Encoder; - - /// Encoding and decoding state for Huffman codes. - pub struct Huffman { - /// byte indexed description of what to blat down for encoding. - /// An entry `(bits, code)` indicates that the low `bits` of `code` should be blatted down. - /// Probably every `code` fits in a `u64`, unless there are crazy frequencies? - encode: BTreeMap, - /// Byte-by-byte decoder. - decode: [Decode; 256], - } - impl Huffman { - - /// Encodes the provided symbols as a sequence of bytes. - /// - /// The last byte may only contain partial information, but it should be recorded as presented, - /// as we haven't a way to distinguish (e.g. a `Result` return type). - pub fn encode<'a, I>(&'a self, symbols: I) -> Encoder<'a, T, I::IntoIter> - where - I: IntoIterator, - { - Encoder::new(&self.encode, symbols.into_iter()) - } - - /// Decodes the provided bytes as a sequence of symbols. - pub fn decode(&self, bytes: I) -> Decoder<'_, T, I::IntoIter> - where - I: IntoIterator - { - Decoder::new(&self.decode, bytes.into_iter()) - } - - pub fn create_from(counts: BTreeMap) -> Self where T: Clone { - - if counts.is_empty() { - return Self { - encode: Default::default(), - decode: Decode::map(), - }; - } - - let mut heap = std::collections::BinaryHeap::new(); - for (item, count) in counts { - heap.push((-count, Node::Leaf(item))); - } - let mut tree = Vec::with_capacity(2 * heap.len() - 1); - while heap.len() > 1 { - let (count1, least1) = heap.pop().unwrap(); - let (count2, least2) = heap.pop().unwrap(); - let fork = Node::Fork(tree.len(), tree.len()+1); - tree.push(least1); - tree.push(least2); - heap.push((count1 + count2, fork)); - } - tree.push(heap.pop().unwrap().1); - - let mut levels = Vec::with_capacity(1 + tree.len()/2); - let mut todo = vec![(tree.last().unwrap(), 0)]; - while let Some((node, level)) = todo.pop() { - match node { - Node::Leaf(sym) => { levels.push((level, sym)); }, - Node::Fork(l,r) => { - todo.push((&tree[*l], level + 1)); - todo.push((&tree[*r], level + 1)); - }, - } - } - levels.sort_by(|x,y| x.0.cmp(&y.0)); - let mut code: u64 = 0; - let mut prev_level = 0; - let mut encode = BTreeMap::new(); - let mut decode = Decode::map(); - for (level, sym) in levels { - if prev_level != level { - code <<= level - prev_level; - prev_level = level; - } - encode.insert(sym.clone(), (level, code)); - Self::insert_decode(&mut decode, sym, level, code << (64-level)); - - code += 1; - } - - for (index, entry) in decode.iter().enumerate() { - if entry.any_void() { - panic!("VOID FOUND: {:?}", index); - } - } - - Huffman { - encode, - decode, - } - } - - /// Inserts a symbol, and - fn insert_decode(map: &mut [Decode; 256], symbol: &T, bits: usize, code: u64) where T: Clone { - let byte: u8 = (code >> 56).try_into().unwrap(); - if bits <= 8 { - for off in 0 .. (1 << (8 - bits)) { - map[(byte as usize) + off] = Decode::Symbol(symbol.clone(), bits); - } - } - else { - if let Decode::Void = &map[byte as usize] { - map[byte as usize] = Decode::Further(Box::new(Decode::map())); - } - if let Decode::Further(next_map) = &mut map[byte as usize] { - Self::insert_decode(next_map, symbol, bits - 8, code << 8); - } - } - } - } - /// Tree structure for Huffman bit length determination. - #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)] - enum Node { - Leaf(T), - Fork(usize, usize), - } - - /// Decoder - #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)] - pub enum Decode { - /// An as-yet unfilled slot. - #[default] - Void, - /// The symbol, and the number of bits consumed. - Symbol(T, usize), - /// An additional map to push subsequent bytes at. - Further(Box<[Decode; 256]>), - } - - impl Decode { - /// Tests to see if the map contains any invalid values. - /// - /// A correctly initialized map will have no invalid values. - /// A map with invalid values will be unable to decode some - /// input byte sequences. - fn any_void(&self) -> bool { - match self { - Decode::Void => true, - Decode::Symbol(_,_) => false, - Decode::Further(map) => map.iter().any(|m| m.any_void()), - } - } - /// Creates a new map containing invalid values. - fn map() -> [Decode; 256] { - let mut vec = Vec::with_capacity(256); - for _ in 0 .. 256 { - vec.push(Decode::Void); - } - vec.try_into().ok().unwrap() - } - } - - - /// A tabled Huffman decoder, written as an iterator. - mod decoder { - - use super::Decode; - - #[derive(Copy, Clone)] - pub struct Decoder<'a, T, I> { - decode: &'a [Decode; 256], - bytes: I, - pending_byte: u16, - pending_bits: usize, - } - - impl<'a, T, I> Decoder<'a, T, I> { - pub fn new(decode: &'a [Decode; 256], bytes: I) -> Self { - Self { - decode, - bytes, - pending_byte: 0, - pending_bits: 0, - } - } - } - - impl<'a, T, I: Iterator> Iterator for Decoder<'a, T, I> { - type Item = &'a T; - fn next(&mut self) -> Option<&'a T> { - // We must navigate `self.decode`, restocking bits whenever possible. - // We stop if ever there are not enough bits remaining. - let mut map = self.decode; - loop { - if self.pending_bits < 8 { - if let Some(next_byte) = self.bytes.next() { - self.pending_byte = (self.pending_byte << 8) + next_byte as u16; - self.pending_bits += 8; - } - else { - return None; - } - } - let byte = (self.pending_byte >> (self.pending_bits - 8)) as usize; - match &map[byte] { - Decode::Void => { panic!("invalid decoding map"); } - Decode::Symbol(s, bits) => { - self.pending_bits -= bits; - self.pending_byte &= (1 << self.pending_bits) - 1; - return Some(s); - } - Decode::Further(next_map) => { - self.pending_bits -= 8; - self.pending_byte &= (1 << self.pending_bits) - 1; - map = next_map; - } - } - } - } - } - } - - /// A tabled Huffman encoder, written as an iterator. - mod encoder { - - use std::collections::BTreeMap; - - #[derive(Copy, Clone)] - pub struct Encoder<'a, T, I> { - encode: &'a BTreeMap, - symbols: I, - pending_byte: u64, - pending_bits: usize, - } - - impl<'a, T, I> Encoder<'a, T, I> { - pub fn new(encode: &'a BTreeMap, symbols: I) -> Self { - Self { - encode, - symbols, - pending_byte: 0, - pending_bits: 0, - } - } - } - - impl<'a, T: Ord, I> Iterator for Encoder<'a, T, I> - where - I: Iterator, - { - type Item = u8; - fn next(&mut self) -> Option { - // We repeatedly ship bytes out of `self.pending_byte`, restocking from `self.symbols`. - while self.pending_bits < 8 { - if let Some(symbol) = self.symbols.next() { - let (bits, code) = self.encode.get(symbol).unwrap(); - self.pending_byte <<= bits; - self.pending_byte += code; - self.pending_bits += bits; - } - else { - // We have run out of symbols. Perhaps there is a final fractional byte to ship? - if self.pending_bits > 0 { - let byte = self.pending_byte << (8 - self.pending_bits); - self.pending_bits = 0; - self.pending_byte = 0; - return Some(byte as u8); - } - else { - return None; - } - } - } - - let byte = self.pending_byte >> (self.pending_bits - 8); - self.pending_bits -= 8; - self.pending_byte &= (1 << self.pending_bits) - 1; - Some(byte as u8) - } - } - } - -} diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index d7656fd0e..aa41300fb 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -344,7 +344,7 @@ pub mod container { where D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, - R: crate::difference::Semigroup + Clone + 'static, + R: crate::difference::Semigroup + 'static, { type Chunk = Vec<(D, T, R)>; type Time = T; @@ -599,7 +599,7 @@ pub mod container { use crate::difference::Semigroup; use super::InternalMerge; - impl InternalMerge for Vec<(D, T, R)> { + impl InternalMerge for Vec<(D, T, R)> { type TimeOwned = T; fn len(&self) -> usize { Vec::len(self) } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 836c6f612..2b14659f1 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -41,10 +41,7 @@ pub mod spine_fueled; pub mod merge_batcher; -pub mod chainless_batcher; pub mod ord_neu; -pub mod rhh; -pub mod huffman_container; pub mod chunker; // Opinionated takes on default spines. @@ -71,7 +68,7 @@ pub trait Update { /// Values associated with the key. type Val: Ord + Clone + 'static; /// Time at which updates occur. - type Time: Ord + Clone + Lattice + timely::progress::Timestamp; + type Time: Lattice + timely::progress::Timestamp; /// Way in which updates occur. type Diff: Ord + Semigroup + 'static; } @@ -80,7 +77,7 @@ impl Update for ((K, V), T, R) where K: Ord+Clone+'static, V: Ord+Clone+'static, - T: Ord+Clone+Lattice+timely::progress::Timestamp, + T: Lattice+timely::progress::Timestamp, R: Ord+Semigroup+'static, { type Key = K; @@ -378,7 +375,7 @@ where KBC: for<'a> BatchContainer: PartialEq<&'a K>>, V: Ord + Clone + 'static, VBC: for<'a> BatchContainer: PartialEq<&'a V>>, - T: Timestamp + Lattice + Clone + 'static, + T: Timestamp + Lattice + 'static, R: Ord + Semigroup + 'static, { type Key<'a> = K; @@ -561,7 +558,7 @@ pub mod containers { &self[index] } fn get(&self, index: usize) -> Option> { - <[T]>::get(&self, index) + <[T]>::get(self, index) } fn len(&self) -> usize { self[..].len() diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 0d5315f6e..00ac3009f 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -28,9 +28,6 @@ pub type OrdValBatcher = MergeBatcher, ContainerChu /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; -// /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; - /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. @@ -38,9 +35,6 @@ pub type OrdKeyBatcher = MergeBatcher, ContainerChunk /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; -// /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; - pub use layers::{Vals, Upds}; /// Layers are containers of lists of some type. /// diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs deleted file mode 100644 index 7308b3a49..000000000 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ /dev/null @@ -1,883 +0,0 @@ -//! Batch implementation based on Robin Hood Hashing. -//! -//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means -//! that these implementations should only be used with each other, under -//! the same `hash` function, or for types that also order by `(hash(X), X)`, -//! for example wrapped types that implement `Ord` that way. - -use std::rc::Rc; -use std::cmp::Ordering; - -use serde::{Deserialize, Serialize}; - -use crate::Hashable; -use crate::trace::implementations::chunker::ContainerChunker; -use crate::trace::implementations::merge_batcher::MergeBatcher; -use crate::trace::implementations::merge_batcher::container::VecInternalMerger; -use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::rc_blanket_impls::RcBuilder; - -use super::{Layout, Vector}; - -use self::val_batch::{RhhValBatch, RhhValBuilder}; - -/// A trace implementation using a spine of ordered lists. -pub type VecSpine = Spine>>>; -/// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, V), T, R>>; -/// A builder for ordered lists. -pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; - -// /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; - -/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`. -pub trait HashOrdered: Hashable { } - -impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { } - -/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`. -#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)] -pub struct HashWrapper { - /// The inner value, freely modifiable. - pub inner: T -} - -impl> PartialOrd for HashWrapper { - fn partial_cmp(&self, other: &Self) -> Option { - let this_hash = self.inner.hashed(); - let that_hash = other.inner.hashed(); - (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner)) - } -} - -impl> Ord for HashWrapper { - fn cmp(&self, other: &Self) -> Ordering { - self.partial_cmp(other).unwrap() - } -} - -impl HashOrdered for HashWrapper { } - -impl Hashable for HashWrapper { - type Output = T::Output; - fn hashed(&self) -> Self::Output { self.inner.hashed() } -} - -impl HashOrdered for &HashWrapper { } - -impl Hashable for &HashWrapper { - type Output = T::Output; - fn hashed(&self) -> Self::Output { self.inner.hashed() } -} - -mod val_batch { - - use std::convert::TryInto; - use std::marker::PhantomData; - use serde::{Deserialize, Serialize}; - use timely::container::PushInto; - use timely::progress::{Antichain, frontier::AntichainRef}; - - use crate::hashable::Hashable; - use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::implementations::layout; - - use super::{Layout, HashOrdered}; - - /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`. - /// - /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`, - /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict - /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule: - /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot. - /// - /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all - /// keys for valid updates must have some associated values with updates. This is the same type of - /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values. - /// - /// We will use the `Hashable` trait here, but any consistent hash function should work out ok. - /// We specifically want to use the highest bits of the result (we will) because the low bits have - /// likely been spent shuffling the data between workers (by key), and are likely low entropy. - #[derive(Debug, Serialize, Deserialize)] - pub struct RhhValStorage - where - layout::Key: Default + HashOrdered, - { - - /// The requested capacity for `keys`. We use this when determining where a key with a certain hash - /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method, - /// otherwise we would just use that. - pub key_capacity: usize, - /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`. - /// When that capacity is zero or one, this is set to zero instead. - pub divisor: u64, - /// The number of present keys, distinct from `keys.len()` which contains - pub key_count: usize, - - /// An ordered list of keys, corresponding to entries in `keys_offs`. - pub keys: L::KeyContainer, - /// Offsets used to provide indexes from keys to values. - /// - /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. - pub keys_offs: L::OffsetContainer, - /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. - pub vals: L::ValContainer, - /// Offsets used to provide indexes from values to updates. - /// - /// This list has a special representation that any empty range indicates the singleton - /// element just before the range, as if the start were decremented by one. The empty - /// range is otherwise an invalid representation, and we borrow it to compactly encode - /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). - /// - /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. - pub vals_offs: L::OffsetContainer, - /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`. - pub times: L::TimeContainer, - /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`. - pub diffs: L::DiffContainer, - } - - impl RhhValStorage - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. - fn values_for_key(&self, index: usize) -> (usize, usize) { - let lower = self.keys_offs.index(index); - let upper = self.keys_offs.index(index+1); - // Looking up values for an invalid key indicates something is wrong. - assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index); - (lower, upper) - } - /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. - fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs.index(index); - let upper = self.vals_offs.index(index+1); - // We use equal lower and upper to encode "singleton update; just before here". - // It should only apply when there is a prior element, so `lower` should be greater than zero. - if lower == upper { - assert!(lower > 0); - lower -= 1; - } - (lower, upper) - } - - /// Inserts the key at its desired location, or nearby. - /// - /// Because there may be collisions, they key may be placed just after its desired location. - /// If necessary, this method will introduce default keys and copy the offsets to create space - /// after which to insert the key. These will be indicated by `None` entries in the `hash` vector. - /// - /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, - /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may - /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option) { - let desired = self.desired_location(&key); - // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, - // push additional blank entries in. - while self.keys.len() < desired { - // We insert a default (dummy) key and repeat the offset to indicate this. - let current_offset = self.keys_offs.index(self.keys.len()); - self.keys.push_own(& as Default>::default()); - self.keys_offs.push_ref(current_offset); - } - - // Now we insert the key. Even if it is no longer the desired location because of contention. - // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.push_ref(key); - if let Some(offset) = offset { - self.keys_offs.push_ref(offset); - } - self.key_count += 1; - } - - /// Inserts a reference to an owned key, inefficiently. Should be removed. - fn insert_key_own(&mut self, key: &layout::Key, offset: Option) { - let mut key_con = L::KeyContainer::with_capacity(1); - key_con.push_own(&key); - self.insert_key(key_con.index(0), offset) - } - - /// Indicates both the desired location and the hash signature of the key. - fn desired_location(&self, key: &K) -> usize { - if self.divisor == 0 { 0 } - else { - (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze") - } - } - - /// Returns true if one should advance one's index in the search for `key`. - fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool { - // Ideally this short-circuits, as `self.keys[index]` is bogus data. - !self.live_key(index) || self.keys.index(index).lt(&::reborrow(key)) - } - - /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range. - fn live_key(&self, index: usize) -> bool { - self.keys_offs.index(index) != self.keys_offs.index(index+1) - } - - /// Advances `index` until it references a live key, or is `keys.len()`. - fn advance_to_live_key(&self, index: &mut usize) { - while *index < self.keys.len() && !self.live_key(*index) { - *index += 1; - } - } - - /// A value large enough that any `u64` divided by it is less than `capacity`. - /// - /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one. - /// In those cases, we'll return `0` to communicate the exception, for which we should - /// just return `0` when announcing a target location (and a zero capacity that we insert - /// into becomes a bug). - fn divisor_for_capacity(capacity: usize) -> u64 { - let capacity: u64 = capacity.try_into().expect("usize exceeds u64"); - if capacity == 0 || capacity == 1 { 0 } - else { - ((1 << 63) / capacity) << 1 - } - } - } - - /// An immutable collection of update tuples, from a contiguous interval of logical times. - /// - /// The `L` parameter captures how the updates should be laid out, and `C` determines which - /// merge batcher to select. - #[derive(Serialize, Deserialize)] - #[serde(bound = " - L::KeyContainer: Serialize + for<'a> Deserialize<'a>, - L::ValContainer: Serialize + for<'a> Deserialize<'a>, - L::OffsetContainer: Serialize + for<'a> Deserialize<'a>, - L::TimeContainer: Serialize + for<'a> Deserialize<'a>, - L::DiffContainer: Serialize + for<'a> Deserialize<'a>, - ")] - pub struct RhhValBatch - where - layout::Key: Default + HashOrdered, - { - /// The updates themselves. - pub storage: RhhValStorage, - /// Description of the update times this layer represents. - pub description: Description>, - /// The number of updates reflected in the batch. - /// - /// We track this separately from `storage` because due to the singleton optimization, - /// we may have many more updates than `storage.updates.len()`. It should equal that - /// length, plus the number of singleton optimizations employed. - pub updates: usize, - } - - impl WithLayout for RhhValBatch - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Layout = L; - } - - impl BatchReader for RhhValBatch - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Cursor = RhhValCursor; - fn cursor(&self) -> Self::Cursor { - let mut cursor = RhhValCursor { - key_cursor: 0, - val_cursor: 0, - phantom: std::marker::PhantomData, - }; - cursor.step_key(self); - cursor - } - fn len(&self) -> usize { - // Normally this would be `self.updates.len()`, but we have a clever compact encoding. - // Perhaps we should count such exceptions to the side, to provide a correct accounting. - self.updates - } - fn description(&self) -> &Description> { &self.description } - } - - impl Batch for RhhValBatch - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Merger = RhhValMerger; - - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { - RhhValMerger::new(self, other, compaction_frontier) - } - - fn empty(lower: Antichain, upper: Antichain) -> Self { - use timely::progress::Timestamp; - Self { - storage: RhhValStorage { - keys: L::KeyContainer::with_capacity(0), - keys_offs: L::OffsetContainer::with_capacity(0), - vals: L::ValContainer::with_capacity(0), - vals_offs: L::OffsetContainer::with_capacity(0), - times: L::TimeContainer::with_capacity(0), - diffs: L::DiffContainer::with_capacity(0), - key_count: 0, - key_capacity: 0, - divisor: 0, - }, - description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())), - updates: 0, - } - } - } - - /// State for an in-progress merge. - pub struct RhhValMerger - where - layout::Key: Default + HashOrdered, - { - /// Key position to merge next in the first batch. - key_cursor1: usize, - /// Key position to merge next in the second batch. - key_cursor2: usize, - /// result that we are currently assembling. - result: RhhValStorage, - /// description - description: Description>, - - /// Local stash of updates, to use for consolidation. - /// - /// We could emulate a `ChangeBatch` here, with related compaction smarts. - /// A `ChangeBatch` itself needs an `i64` diff type, which we have not. - update_stash: Vec<(layout::Time, layout::Diff)>, - /// Counts the number of singleton-optimized entries, that we may correctly count the updates. - singletons: usize, - } - - impl Merger> for RhhValMerger - where - layout::Key: Default + HashOrdered, - RhhValBatch: Batch>, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef>) -> Self { - - assert!(batch1.upper() == batch2.lower()); - use crate::lattice::Lattice; - let mut since = batch1.description().since().join(batch2.description().since()); - since = since.join(&compaction_frontier.to_owned()); - - let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); - - // This is a massive overestimate on the number of keys, but we don't have better information. - // An over-estimate can be a massive problem as well, with sparse regions being hard to cross. - let max_cap = batch1.len() + batch2.len(); - let rhh_cap = 2 * max_cap; - - let batch1 = &batch1.storage; - let batch2 = &batch2.storage; - - let mut storage = RhhValStorage { - keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), - vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), - vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), - times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times), - diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs), - key_count: 0, - key_capacity: rhh_cap, - divisor: RhhValStorage::::divisor_for_capacity(rhh_cap), - }; - - // Mark explicit types because type inference fails to resolve it. - let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.push_ref(0); - let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.push_ref(0); - - RhhValMerger { - key_cursor1: 0, - key_cursor2: 0, - result: storage, - description, - update_stash: Vec::new(), - singletons: 0, - } - } - fn done(self) -> RhhValBatch { - RhhValBatch { - updates: self.result.times.len() + self.singletons, - storage: self.result, - description: self.description, - } - } - fn work(&mut self, source1: &RhhValBatch, source2: &RhhValBatch, fuel: &mut isize) { - - // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.times.len(); - let mut effort = 0isize; - - source1.storage.advance_to_live_key(&mut self.key_cursor1); - source2.storage.advance_to_live_key(&mut self.key_cursor2); - - // While both mergees are still active, perform single-key merges. - while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { - self.merge_key(&source1.storage, &source2.storage); - source1.storage.advance_to_live_key(&mut self.key_cursor1); - source2.storage.advance_to_live_key(&mut self.key_cursor2); - // An (incomplete) accounting of the work we've done. - effort = (self.result.times.len() - starting_updates) as isize; - } - - // Merging is complete, and only copying remains. - // Key-by-key copying allows effort interruption, and compaction. - while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { - self.copy_key(&source1.storage, self.key_cursor1); - self.key_cursor1 += 1; - source1.storage.advance_to_live_key(&mut self.key_cursor1); - effort = (self.result.times.len() - starting_updates) as isize; - } - while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { - self.copy_key(&source2.storage, self.key_cursor2); - self.key_cursor2 += 1; - source2.storage.advance_to_live_key(&mut self.key_cursor2); - effort = (self.result.times.len() - starting_updates) as isize; - } - - *fuel -= effort; - } - } - - // Helper methods in support of merging batches. - impl RhhValMerger - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - /// Copy the next key in `source`. - /// - /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. - /// If the result does not wholly cancel, they key will be present in `self` with the - /// compacted values and updates. - /// - /// The caller should be certain to update the cursor, as this method does not do this. - fn copy_key(&mut self, source: &RhhValStorage, cursor: usize) { - // Capture the initial number of values to determine if the merge was ultimately non-empty. - let init_vals = self.result.vals.len(); - let (mut lower, upper) = source.values_for_key(cursor); - while lower < upper { - self.stash_updates_for_val(source, lower); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source.vals.index(lower)); - } - lower += 1; - } - - // If we have pushed any values, copy the key as well. - if self.result.vals.len() > init_vals { - self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len())); - } - } - /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. - /// - /// This method only merges a single key. It applies all compaction necessary, and may result in no output - /// if the updates cancel either directly or after compaction. - fn merge_key(&mut self, source1: &RhhValStorage, source2: &RhhValStorage) { - - use ::std::cmp::Ordering; - match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) { - Ordering::Less => { - self.copy_key(source1, self.key_cursor1); - self.key_cursor1 += 1; - }, - Ordering::Equal => { - // Keys are equal; must merge all values from both sources for this one key. - let (lower1, upper1) = source1.values_for_key(self.key_cursor1); - let (lower2, upper2) = source2.values_for_key(self.key_cursor2); - if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off)); - } - // Increment cursors in either case; the keys are merged. - self.key_cursor1 += 1; - self.key_cursor2 += 1; - }, - Ordering::Greater => { - self.copy_key(source2, self.key_cursor2); - self.key_cursor2 += 1; - }, - } - } - /// Merge two ranges of values into `self`. - /// - /// If the compacted result contains values with non-empty updates, the function returns - /// an offset that should be recorded to indicate the upper extent of the result values. - fn merge_vals( - &mut self, - (source1, mut lower1, upper1): (&RhhValStorage, usize, usize), - (source2, mut lower2, upper2): (&RhhValStorage, usize, usize), - ) -> Option { - // Capture the initial number of values to determine if the merge was ultimately non-empty. - let init_vals = self.result.vals.len(); - while lower1 < upper1 && lower2 < upper2 { - // We compare values, and fold in updates for the lowest values; - // if they are non-empty post-consolidation, we write the value. - // We could multi-way merge and it wouldn't be very complicated. - use ::std::cmp::Ordering; - match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) { - Ordering::Less => { - // Extend stash by updates, with logical compaction applied. - self.stash_updates_for_val(source1, lower1); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source1.vals.index(lower1)); - } - lower1 += 1; - }, - Ordering::Equal => { - self.stash_updates_for_val(source1, lower1); - self.stash_updates_for_val(source2, lower2); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source1.vals.index(lower1)); - } - lower1 += 1; - lower2 += 1; - }, - Ordering::Greater => { - // Extend stash by updates, with logical compaction applied. - self.stash_updates_for_val(source2, lower2); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source2.vals.index(lower2)); - } - lower2 += 1; - }, - } - } - // Merging is complete, but we may have remaining elements to push. - while lower1 < upper1 { - self.stash_updates_for_val(source1, lower1); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source1.vals.index(lower1)); - } - lower1 += 1; - } - while lower2 < upper2 { - self.stash_updates_for_val(source2, lower2); - if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push_ref(off); - self.result.vals.push_ref(source2.vals.index(lower2)); - } - lower2 += 1; - } - - // Values being pushed indicate non-emptiness. - if self.result.vals.len() > init_vals { - Some(self.result.vals.len()) - } else { - None - } - } - - /// Transfer updates for an indexed value in `source` into `self`, with compaction applied. - fn stash_updates_for_val(&mut self, source: &RhhValStorage, index: usize) { - let (lower, upper) = source.updates_for_value(index); - for i in lower .. upper { - // NB: Here is where we would need to look back if `lower == upper`. - let time = source.times.index(i); - let diff = source.diffs.index(i); - let mut new_time = L::TimeContainer::into_owned(time); - use crate::lattice::Lattice; - new_time.advance_by(self.description.since().borrow()); - self.update_stash.push((new_time, L::DiffContainer::into_owned(diff))); - } - } - - /// Consolidates `self.updates_stash` and produces the offset to record, if any. - fn consolidate_updates(&mut self) -> Option { - use crate::consolidation; - consolidation::consolidate(&mut self.update_stash); - if !self.update_stash.is_empty() { - // If there is a single element, equal to a just-prior recorded update, - // we push nothing and report an unincremented offset to encode this case. - let time_diff = self.result.times.last().zip(self.result.diffs.last()); - let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { - // TODO: The use of `into_owned` is a work-around for not having reference types. - *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2) - }); - if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { - // Just clear out update_stash, as we won't drain it here. - self.update_stash.clear(); - self.singletons += 1; - } - else { - // Conventional; move `update_stash` into `updates`. - for (time, diff) in self.update_stash.drain(..) { - self.result.times.push_own(&time); - self.result.diffs.push_own(&diff); - } - } - Some(self.result.times.len()) - } else { - None - } - } - } - - - /// A cursor through a Robin Hood Hashed list of keys, vals, and such. - /// - /// The important detail is that not all of `keys` represent valid keys. - /// We must consult `storage.hashed` to see if the associated data is valid. - /// Importantly, we should skip over invalid keys, rather than report them as - /// invalid through `key_valid`: that method is meant to indicate the end of - /// the cursor, rather than internal state. - pub struct RhhValCursor - where - layout::Key: Default + HashOrdered, - { - /// Absolute position of the current key. - key_cursor: usize, - /// Absolute position of the current value. - val_cursor: usize, - /// Phantom marker for Rust happiness. - phantom: PhantomData, - } - - use crate::trace::implementations::WithLayout; - impl WithLayout for RhhValCursor - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Layout = L; - } - - impl Cursor for RhhValCursor - where - layout::Key: Default + HashOrdered, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Storage = RhhValBatch; - - fn get_key<'a>(&self, storage: &'a RhhValBatch) -> Option> { storage.storage.keys.get(self.key_cursor) } - fn get_val<'a>(&self, storage: &'a RhhValBatch) -> Option> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } } - fn key<'a>(&self, storage: &'a RhhValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } - fn val<'a>(&self, storage: &'a RhhValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } - fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch, mut logic: L2) { - let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); - for index in lower .. upper { - let time = storage.storage.times.index(index); - let diff = storage.storage.diffs.index(index); - logic(time, diff); - } - } - fn key_valid(&self, storage: &RhhValBatch) -> bool { self.key_cursor < storage.storage.keys.len() } - fn val_valid(&self, storage: &RhhValBatch) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } - fn step_key(&mut self, storage: &RhhValBatch){ - // We advance the cursor by one for certain, and then as long as we need to find a valid key. - self.key_cursor += 1; - storage.storage.advance_to_live_key(&mut self.key_cursor); - - if self.key_valid(storage) { - self.rewind_vals(storage); - } - else { - self.key_cursor = storage.storage.keys.len(); - } - } - fn seek_key(&mut self, storage: &RhhValBatch, key: Self::Key<'_>) { - // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); - let desired = storage.storage.desired_location(&key); - // Advance the cursor, if `desired` is ahead of it. - if self.key_cursor < desired { - self.key_cursor = desired; - } - // Advance the cursor as long as we have not found a value greater or equal to `key`. - // We may have already passed `key`, and confirmed its absence, but our goal is to - // find the next key afterwards so that users can, for example, alternately iterate. - while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) { - // TODO: Based on our encoding, we could skip logarithmically over empty regions by galloping - // through `storage.keys_offs`, which stays put for dead space. - self.key_cursor += 1; - } - - if self.key_valid(storage) { - self.rewind_vals(storage); - } - } - fn step_val(&mut self, storage: &RhhValBatch) { - self.val_cursor += 1; - if !self.val_valid(storage) { - self.val_cursor = storage.storage.values_for_key(self.key_cursor).1; - } - } - fn seek_val(&mut self, storage: &RhhValBatch, val: Self::Val<'_>) { - self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| ::reborrow(x).lt(&::reborrow(val))); - } - fn rewind_keys(&mut self, storage: &RhhValBatch) { - self.key_cursor = 0; - storage.storage.advance_to_live_key(&mut self.key_cursor); - - if self.key_valid(storage) { - self.rewind_vals(storage) - } - } - fn rewind_vals(&mut self, storage: &RhhValBatch) { - self.val_cursor = storage.storage.values_for_key(self.key_cursor).0; - } - } - - /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder - where - layout::Key: Default + HashOrdered, - { - result: RhhValStorage, - singleton: Option<(layout::Time, layout::Diff)>, - /// Counts the number of singleton optimizations we performed. - /// - /// This number allows us to correctly gauge the total number of updates reflected in a batch, - /// even though `updates.len()` may be much shorter than this amount. - singletons: usize, - _marker: PhantomData, - } - - impl RhhValBuilder - where - layout::Key: Default + HashOrdered, - { - /// Pushes a single update, which may set `self.singleton` rather than push. - /// - /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. - /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities - /// to encode a singleton update with an "absert" update: repeating the most recent offset. - /// This otherwise invalid state encodes "look back one element". - /// - /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the - /// previously pushed update exactly. In that case, we do not push the update into `updates`. - /// The update tuple is retained in `self.singleton` in case we see another update and need - /// to recover the singleton to push it into `updates` to join the second update. - fn push_update(&mut self, time: layout::Time, diff: layout::Diff) { - // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - // TODO: The use of `into_owned` is a bandage for not having references we can compare. - if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) { - assert!(self.singleton.is_none()); - self.singleton = Some((time, diff)); - } - else { - // If we have pushed a single element, we need to copy it out to meet this one. - if let Some((time, diff)) = self.singleton.take() { - self.result.times.push_own(&time); - self.result.diffs.push_own(&diff); - } - self.result.times.push_own(&time); - self.result.diffs.push_own(&diff); - } - } - } - - impl Builder for RhhValBuilder - where - layout::Key: Default + HashOrdered, - CI: for<'a> BuilderInput = layout::Key, Time=layout::Time, Diff=layout::Diff>, - for<'a> L::ValContainer: PushInto>, - for<'a> layout::KeyRef<'a, L>: HashOrdered, - { - type Input = CI; - type Time = layout::Time; - type Output = RhhValBatch; - - fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { - - // Double the capacity for RHH; probably excessive. - let rhh_capacity = 2 * keys; - let divisor = RhhValStorage::::divisor_for_capacity(rhh_capacity); - // We want some additive slop, in case we spill over. - // This number magically chosen based on nothing in particular. - // Worst case, we will re-alloc and copy if we spill beyond this. - let keys = rhh_capacity + 10; - - // We don't introduce zero offsets as they will be introduced by the first `push` call. - Self { - result: RhhValStorage { - keys: L::KeyContainer::with_capacity(keys), - keys_offs: L::OffsetContainer::with_capacity(keys + 1), - vals: L::ValContainer::with_capacity(vals), - vals_offs: L::OffsetContainer::with_capacity(vals + 1), - times: L::TimeContainer::with_capacity(upds), - diffs: L::DiffContainer::with_capacity(upds), - key_count: 0, - key_capacity: rhh_capacity, - divisor, - }, - singleton: None, - singletons: 0, - _marker: PhantomData, - } - } - - #[inline] - fn push(&mut self, chunk: &mut Self::Input) { - for item in chunk.drain() { - let (key, val, time, diff) = CI::into_parts(item); - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push_ref(self.result.times.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push_into(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push_ref(self.result.times.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push_ref(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push_into(val); - // Insert the key, but with no specified offset. - self.result.insert_key_own(&key, None); - } - } - } - - #[inline(never)] - fn done(mut self, description: Description) -> RhhValBatch { - // Record the final offsets - self.result.vals_offs.push_ref(self.result.times.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push_ref(self.result.vals.len()); - RhhValBatch { - updates: self.result.times.len() + self.singletons, - storage: self.result, - description, - } - } - - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); - let mut builder = Self::with_capacity(keys, vals, upds); - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); - } - - builder.done(description) - } - } - -} - -mod key_batch { - - // Copy the above, once it works! - -} diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index b7be06b9c..0df6b3bc8 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -378,21 +378,6 @@ impl Spine { }) } - /// Describes the merge progress of layers in the trace. - /// - /// Intended for diagnostics rather than public consumption. - #[allow(dead_code)] - fn describe(&self) -> Vec<(usize, usize)> { - self.merging - .iter() - .map(|b| match b { - MergeState::Vacant => (0, 0), - x @ MergeState::Single(_) => (1, x.len()), - x @ MergeState::Double(_) => (2, x.len()), - }) - .collect() - } - /// Allocates a fueled `Spine` with a specified effort multiplier. /// /// This trace will merge batches progressively, with each inserted batch applying a multiple @@ -870,7 +855,7 @@ impl MergeVariant { /// The result is either `None`, for structurally empty batches, /// or a batch and optionally input batches from which it derived. fn complete(mut self) -> Option<(B, Option<(B, B)>)> { - let mut fuel = isize::max_value(); + let mut fuel = isize::MAX; self.work(&mut fuel); if let MergeVariant::Complete(batch) = self { batch } else { panic!("Failed to complete a merge!"); } diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index cea7c8b27..137fe5496 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -27,7 +27,7 @@ impl Clone for TraceEnter { impl WithLayout for TraceEnter where - Tr: TraceReader, + Tr: TraceReader, TInner: Refines+Lattice, { type Layout = ( @@ -41,7 +41,7 @@ where impl TraceReader for TraceEnter where - Tr: TraceReader, + Tr: TraceReader, TInner: Refines+Lattice, { type Batch = BatchEnter; diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 7a4739f08..2851eaf38 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -42,7 +42,7 @@ where impl WithLayout for TraceEnter where - Tr: TraceReader, + Tr: TraceReader, TInner: Refines+Lattice, F: Clone, G: Clone, @@ -58,7 +58,7 @@ where impl TraceReader for TraceEnter where - Tr: TraceReader, + Tr: TraceReader, TInner: Refines+Lattice, F: 'static, F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone, diff --git a/differential-dataflow/src/trace/wrappers/mod.rs b/differential-dataflow/src/trace/wrappers/mod.rs index aebaa72a6..ddebc66fb 100644 --- a/differential-dataflow/src/trace/wrappers/mod.rs +++ b/differential-dataflow/src/trace/wrappers/mod.rs @@ -3,4 +3,3 @@ pub mod enter; pub mod enter_at; pub mod frontier; -pub mod rc; diff --git a/differential-dataflow/src/trace/wrappers/rc.rs b/differential-dataflow/src/trace/wrappers/rc.rs deleted file mode 100644 index 76a44c129..000000000 --- a/differential-dataflow/src/trace/wrappers/rc.rs +++ /dev/null @@ -1,154 +0,0 @@ -//! A reference-counted wrapper sharing one owned trace. -//! -//! The types in this module, `TraceBox` and `TraceRc` and meant to parallel `RcBox` and `Rc` in `std::rc`. -//! -//! The first typee is an owned trace with some information about the cumulative requirements of the shared -//! handles. This is roughly how much progress has each made, so we know which "read capabilities" they have -//! collectively dropped, and when it is safe to inform the trace of such progress. -//! -//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read -//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances -//! `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the -//! exception that the trace may not compact its representation as fast as if it were exclusively owned. - -use std::rc::Rc; -use std::cell::RefCell; - -use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}}; - -use crate::trace::TraceReader; - -/// A wrapper around a trace which tracks the frontiers of all referees. -/// -/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case. -/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers -/// may influence. -pub struct TraceBox { - /// accumulated holds on times for advancement. - pub logical_compaction: MutableAntichain, - /// accumulated holds on times for distinction. - pub physical_compaction: MutableAntichain, - /// The wrapped trace. - pub trace: Tr, -} - -impl TraceBox { - /// Moves an existing trace into a shareable trace wrapper. - /// - /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing - /// process will fish these out and make sure that they are used for the initial read capabilities. - pub fn new(mut trace: Tr) -> Self { - - let mut logical_compaction = MutableAntichain::new(); - logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1))); - let mut physical_compaction = MutableAntichain::new(); - physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1))); - - TraceBox { - logical_compaction, - physical_compaction, - trace, - } - } - /// Replaces elements of `lower` with those of `upper`. - #[inline] - pub fn adjust_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { - self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); - self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.set_logical_compaction(self.logical_compaction.frontier()); - } - /// Replaces elements of `lower` with those of `upper`. - #[inline] - pub fn adjust_physical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { - self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1))); - self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.set_physical_compaction(self.physical_compaction.frontier()); - } -} - -/// A handle to a shared trace. -/// -/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its -/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as -/// if it is a privately maintained trace, despite being backed by shared resources. -pub struct TraceRc { - logical_compaction: Antichain, - physical_compaction: Antichain, - /// Wrapped trace. Please be gentle when using. - pub wrapper: Rc>>, -} - -use crate::trace::WithLayout; -impl WithLayout for TraceRc { - type Layout = Tr::Layout; -} - -impl TraceReader for TraceRc { - - type Batch = Tr::Batch; - type Storage = Tr::Storage; - type Cursor = Tr::Cursor; - - /// Sets frontier to now be elements in `frontier`. - /// - /// This change may not have immediately observable effects. It informs the shared trace that this - /// handle no longer requires access to times other than those in the future of `frontier`, but if - /// there are other handles to the same trace, it may not yet be able to compact. - fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { - self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier); - self.logical_compaction = frontier.to_owned(); - } - fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() } - /// Allows the trace to compact batches of times before `frontier`. - fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { - self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier); - self.physical_compaction = frontier.to_owned(); - } - fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() } - /// Creates a new cursor over the wrapped trace. - fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> { - ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) - } - - fn map_batches(&self, f: F) { - ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f) - } -} - -impl TraceRc { - /// Allocates a new handle from an existing wrapped wrapper. - pub fn make_from(trace: Tr) -> (Self, Rc>>) { - - let wrapped = Rc::new(RefCell::new(TraceBox::new(trace))); - - let handle = TraceRc { - logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(), - physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(), - wrapper: wrapped.clone(), - }; - - (handle, wrapped) - } -} - -impl Clone for TraceRc { - fn clone(&self) -> Self { - // increase ref counts for this frontier - self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow()); - self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow()); - TraceRc { - logical_compaction: self.logical_compaction.clone(), - physical_compaction: self.physical_compaction.clone(), - wrapper: self.wrapper.clone(), - } - } -} - -impl Drop for TraceRc { - fn drop(&mut self) { - self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow()); - self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow()); - self.logical_compaction = Antichain::new(); - self.physical_compaction = Antichain::new(); - } -} diff --git a/differential-dataflow/tests/bfs.rs b/differential-dataflow/tests/bfs.rs index 2362bf6af..6093287c4 100644 --- a/differential-dataflow/tests/bfs.rs +++ b/differential-dataflow/tests/bfs.rs @@ -101,7 +101,7 @@ fn bfs_sequential( if time <= round { *edges.entry((src, dst)).or_insert(0) += diff; } } - let mut dists = vec![usize::max_value(); nodes]; + let mut dists = vec![usize::MAX; nodes]; for (&key, &val) in roots.iter() { if val > 0 { dists[key] = 0; } } @@ -111,7 +111,7 @@ fn bfs_sequential( changes = false; for (&(src, dst), &cnt) in edges.iter() { if cnt > 0 { - if dists[src] != usize::max_value() && dists[dst] > dists[src] + 1 { + if dists[src] != usize::MAX && dists[dst] > dists[src] + 1 { dists[dst] = dists[src] + 1; changes = true; } @@ -121,7 +121,7 @@ fn bfs_sequential( let mut new_counts = vec![0; nodes]; for &value in dists.iter() { - if value != usize::max_value() { + if value != usize::MAX { new_counts[value] += 1; } } diff --git a/dogsdogsdogs/examples/dogsdogsdogs.rs b/dogsdogsdogs/examples/dogsdogsdogs.rs index be3ce2f90..110ec3922 100644 --- a/dogsdogsdogs/examples/dogsdogsdogs.rs +++ b/dogsdogsdogs/examples/dogsdogsdogs.rs @@ -45,7 +45,7 @@ fn main() { let (edges_input, edges) = scope.new_collection(); // determine stream of (prefix, count, index) indicating relation with fewest extensions. - let counts = edges.map(|p| (p, usize::max_value(), usize::max_value())); + let counts = edges.map(|p| (p, usize::MAX, usize::MAX)); let counts0 = index_xz.count(counts, 0); let counts1 = index_yz.count(counts0, 1); diff --git a/dogsdogsdogs/examples/ngo.rs b/dogsdogsdogs/examples/ngo.rs index 9c1177a58..4bdabe399 100644 --- a/dogsdogsdogs/examples/ngo.rs +++ b/dogsdogsdogs/examples/ngo.rs @@ -62,8 +62,8 @@ where let winners = cand_count1.concat(cand_count2) .reduce(|_srcdst, counts, output| { if counts.len() == 2 { - let mut min_cnt = isize::max_value(); - let mut min_idx = usize::max_value(); + let mut min_cnt = isize::MAX; + let mut min_idx = usize::MAX; for &(&idx, cnt) in counts.iter() { if min_cnt > cnt { min_idx = idx; diff --git a/experiments/src/bin/arrange.rs b/experiments/src/bin/arrange.rs index 6b022a327..96a03e9ad 100644 --- a/experiments/src/bin/arrange.rs +++ b/experiments/src/bin/arrange.rs @@ -28,7 +28,7 @@ fn main() { let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let recs: usize = std::env::args().nth(2).unwrap().parse().unwrap(); let rate: usize = std::env::args().nth(3).unwrap().parse().unwrap(); - let work: usize = std::env::args().nth(4).unwrap().parse().unwrap_or(usize::max_value()); + let work: usize = std::env::args().nth(4).unwrap().parse().unwrap_or(usize::MAX); let comp: Comp = match std::env::args().nth(5).unwrap().as_str() { "exchange" => Comp::Exchange, "arrange" => Comp::Arrange, @@ -224,4 +224,4 @@ fn main() { } }).unwrap(); -} \ No newline at end of file +} diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 838836d5e..c29be8403 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -152,11 +152,11 @@ fn main() { q4.insert((rng3.gen_range(0, nodes), rng3.gen_range(0, nodes))); } - q1.advance_to(usize::max_value()); q1.flush(); // q1 queries start now. - q2.advance_to(usize::max_value()); q2.flush(); // q2 queries start here. - q3.advance_to(usize::max_value()); q3.flush(); // q3 queries start here. - q4.advance_to(usize::max_value()); q4.flush(); // q4 queries start here. - state.advance_to(usize::max_value()); state.flush(); + q1.advance_to(usize::MAX); q1.flush(); // q1 queries start now. + q2.advance_to(usize::MAX); q2.flush(); // q2 queries start here. + q3.advance_to(usize::MAX); q3.flush(); // q3 queries start here. + q4.advance_to(usize::MAX); q4.flush(); // q4 queries start here. + state.advance_to(usize::MAX); state.flush(); graph.advance_to(1); graph.flush(); // finish graph loading work. diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 8801a1948..fca927d38 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -115,7 +115,7 @@ fn main() { } query.advance_to(1); query.flush(); - state.advance_to(usize::max_value()); state.flush(); + state.advance_to(usize::MAX); state.flush(); graph.advance_to(1); graph.flush(); // finish graph loading work. diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index e1f26ad29..8344cb1f4 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -157,7 +157,7 @@ fn main() { q2.advance_to(1); q2.flush(); // q2 queries start here. q3.advance_to(1); q3.flush(); // q3 queries start here. q4.advance_to(1); q4.flush(); // q4 queries start here. - state.advance_to(usize::max_value()); state.flush(); + state.advance_to(usize::MAX); state.flush(); graph.advance_to(1); graph.flush(); // finish graph loading work. diff --git a/tpchlike/src/bin/arrange.rs b/tpchlike/src/bin/arrange.rs index c9ffcd48e..4cd6ee81b 100644 --- a/tpchlike/src/bin/arrange.rs +++ b/tpchlike/src/bin/arrange.rs @@ -173,7 +173,7 @@ fn main() { } // Finish outstanding work before stopping the timer. - let next_round = usize::max_value(); + let next_round = usize::MAX; inputs.0.as_mut().map(|x| x.advance_to(next_round)); inputs.1.as_mut().map(|x| x.advance_to(next_round)); inputs.2.as_mut().map(|x| x.advance_to(next_round)); @@ -243,4 +243,4 @@ where T: for<'a> From<&'a str> { result.reverse(); result -} \ No newline at end of file +} diff --git a/tpchlike/src/bin/just-arrange.rs b/tpchlike/src/bin/just-arrange.rs index ce78720ae..f4b9393b7 100644 --- a/tpchlike/src/bin/just-arrange.rs +++ b/tpchlike/src/bin/just-arrange.rs @@ -150,7 +150,7 @@ fn main() { } // Finish outstanding work before stopping the timer. - let next_round = usize::max_value(); + let next_round = usize::MAX; inputs.0.as_mut().map(|x| x.advance_to(next_round)); inputs.1.as_mut().map(|x| x.advance_to(next_round)); inputs.2.as_mut().map(|x| x.advance_to(next_round)); @@ -220,4 +220,4 @@ where T: for<'a> From<&'a str> { result.reverse(); result -} \ No newline at end of file +} diff --git a/tpchlike/src/bin/stream-concurrent.rs b/tpchlike/src/bin/stream-concurrent.rs index 0aae85e1d..85c0b449b 100644 --- a/tpchlike/src/bin/stream-concurrent.rs +++ b/tpchlike/src/bin/stream-concurrent.rs @@ -146,7 +146,7 @@ fn main() { } // Finish outstanding work before stopping the timer. - let next_round = usize::max_value(); + let next_round = usize::MAX; inputs.0.as_mut().map(|x| x.advance_to(next_round)); inputs.1.as_mut().map(|x| x.advance_to(next_round)); inputs.2.as_mut().map(|x| x.advance_to(next_round)); diff --git a/tpchlike/src/bin/stream.rs b/tpchlike/src/bin/stream.rs index d836cd19d..fbdc6385f 100644 --- a/tpchlike/src/bin/stream.rs +++ b/tpchlike/src/bin/stream.rs @@ -150,7 +150,7 @@ fn main() { } // Finish outstanding work before stopping the timer. - let next_round = usize::max_value(); + let next_round = usize::MAX; inputs.0.as_mut().map(|x| x.advance_to(next_round)); inputs.1.as_mut().map(|x| x.advance_to(next_round)); inputs.2.as_mut().map(|x| x.advance_to(next_round)); diff --git a/tpchlike/src/queries/query20.rs b/tpchlike/src/queries/query20.rs index 8c5476fce..26f0525e6 100644 --- a/tpchlike/src/queries/query20.rs +++ b/tpchlike/src/queries/query20.rs @@ -89,7 +89,7 @@ where G::Timestamp: Lattice+TotalOrder+Ord { let key: u64 = key; let avail2: isize = avail2; if avail1 > avail2 as i32 / 2 { - Some((key & (u32::max_value() as u64)) as usize) + Some((key & (u32::MAX as u64)) as usize) } else { None } }); @@ -137,4 +137,4 @@ where if starts_with(&n.name, b"CANADA") { Some((nm,ad)) } else { None } ) .probe_with(probe); -} \ No newline at end of file +}