diff --git a/datafusion/common/src/utils/proxy.rs b/datafusion/common/src/utils/proxy.rs index fddf834912544..42e374d238894 100644 --- a/datafusion/common/src/utils/proxy.rs +++ b/datafusion/common/src/utils/proxy.rs @@ -121,6 +121,7 @@ pub trait HashTableAllocExt { /// /// Returns the bucket where the element was inserted. /// Note that allocation counts capacity, not size. + /// This method assumes that the element is not already present. /// /// # Example: /// ``` @@ -134,7 +135,7 @@ pub trait HashTableAllocExt { /// assert_eq!(allocated, 64); /// /// // insert more values - /// for i in 0..100 { + /// for i in 2..100 { /// table.insert_accounted(i, hash_fn, &mut allocated); /// } /// assert_eq!(allocated, 400); @@ -161,22 +162,24 @@ where ) { let hash = hasher(&x); - // NOTE: `find_entry` does NOT grow! - match self.find_entry(hash, |y| y == &x) { - Ok(_occupied) => {} - Err(_absent) => { - if self.len() == self.capacity() { - // need to request more memory - let bump_elements = self.capacity().max(16); - let bump_size = bump_elements * size_of::(); - *accounting = (*accounting).checked_add(bump_size).expect("overflow"); + if cfg!(debug_assertions) { + // In debug mode, check that the element is not already present + debug_assert!( + self.find_entry(hash, |y| y == &x).is_err(), + "attempted to insert duplicate element into HashTableAllocExt::insert_accounted" + ); + } - self.reserve(bump_elements, &hasher); - } + if self.len() == self.capacity() { + // need to request more memory + let bump_elements = self.capacity().max(16); + let bump_size = bump_elements * size_of::(); + *accounting = (*accounting).checked_add(bump_size).expect("overflow"); - // still need to insert the element since first try failed - self.entry(hash, |y| y == &x, hasher).insert(x); - } + self.reserve(bump_elements, &hasher); } + + // the element is assumed to be absent (checked above in debug builds), so insert it without a lookup + self.insert_unique(hash, x, hasher); } } diff --git 
a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index ab95302bbb046..e2ffc9f2ef314 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -389,7 +389,7 @@ where // is value is already present in the set? let entry = self.map.find_mut(hash, |header| { // compare value if hashes match - if header.len != value_len { + if header.hash != hash { return false; } // value is stored inline so no need to consult buffer @@ -427,7 +427,7 @@ where // Check if the value is already present in the set let entry = self.map.find_mut(hash, |header| { // compare value if hashes match - if header.len != value_len { + if header.hash != hash { return false; } // Need to compare the bytes in the buffer diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 2de563472c789..7969244200568 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -273,11 +273,10 @@ where let value: &[u8] = value.as_ref(); let entry = self.map.find_mut(hash, |header| { - let v = self.builder.get_value(header.view_idx); - - if v.len() != value.len() { + if header.hash != hash { return false; } + let v = self.builder.get_value(header.view_idx); v == value }); diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index c46cde8786eb4..2b8a2cfa68897 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -128,7 +128,9 @@ where let hash = key.hash(state); let insert = self.map.entry( hash, - |&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) }, + |&(g, h)| unsafe { + hash == h && 
self.values.get_unchecked(g).is_eq(key) + }, |&(_, h)| h, );