diff --git a/Cargo.toml b/Cargo.toml index 4a63c17..a9ac4ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.8.13" +version = "0.8.15" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "0.8.6" } +worktable_codegen = { path = "codegen", version = "0.8.14" } eyre = "0.6.12" derive_more = { version = "2.0.1", features = ["from", "error", "display", "into"] } @@ -29,7 +29,7 @@ futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } data_bucket = "0.3.6" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -# data_bucket = { path = "../DataBucket", version = "0.3.4" } +# data_bucket = { path = "../DataBucket", version = "0.3.6" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 5cd8c12..643768b 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.8.6" +version = "0.8.14" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 9da2983..5b9b13d 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -36,7 +36,7 @@ impl Generator { fn gen_full_row_delete(&mut self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); - let delete_logic = self.gen_delete_logic(); + let delete_logic = self.gen_delete_logic(true); let full_row_lock = self.gen_full_lock_for_update(); quote! { @@ -58,7 +58,7 @@ impl Generator { fn gen_full_row_delete_without_lock(&mut self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); - let delete_logic = self.gen_delete_logic(); + let delete_logic = self.gen_delete_logic(false); quote! { pub fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { @@ -68,7 +68,7 @@ impl Generator { } } - fn gen_delete_logic(&self) -> TokenStream { + fn gen_delete_logic(&self, is_locked: bool) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); let secondary_events_ident = name_generator.get_space_secondary_index_events_ident(); @@ -97,15 +97,33 @@ impl Generator { self.0.data.delete(link).map_err(WorkTableError::PagesError)?; } }; - - quote! { - let link = self.0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - let row = self.select(pk.clone()).unwrap(); - #process + if is_locked { + quote! { + let link = match self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); // Releases locks + self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks + return Err(e); + } + }; + let row = self.select(pk.clone()).unwrap(); + #process + } + } else { + quote! { + let link = self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + let row = self.select(pk.clone()).unwrap(); + #process + } } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index c84638b..dae5fb4 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -92,11 +92,19 @@ impl Generator { #full_row_lock }; - let link = self.0 + let link = match self.0 .pk_map .get(&pk) .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let row_old = self.0.data.select_non_ghosted(link)?; self.0.update_state.insert(pk.clone(), row_old); @@ -469,11 +477,19 @@ impl Generator { #custom_lock }; - let link = self.0 + let link = match self.0 .pk_map .get(&pk) .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -702,10 +718,18 @@ impl Generator { #custom_lock }; - let link = self.0.indexes.#index + let link = match self.0.indexes.#index .get(#by) .map(|kv| kv.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index 0e824cb..ac7103b 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -158,6 +158,43 @@ impl Data { Ok(link) } + #[allow(clippy::missing_safety_doc)] + pub unsafe fn try_save_row_by_link( + &self, + row: &Row, + mut link: Link, + ) -> Result<(Link, Option), ExecutionError> + where + Row: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + >, + { + let bytes = rkyv::to_bytes(row).map_err(|_| ExecutionError::SerializeError)?; + let length = bytes.len() as u32; + if length > link.length { + return Err(ExecutionError::InvalidLink); + } + + let link_diff = link.length - length; + let link_left = if link_diff > 0 { + link.length -= link_diff; + Some(Link { + page_id: link.page_id, + offset: link.offset + link.length, + length: link_diff, + }) + } else { + None + }; + + let inner_data = unsafe { &mut *self.inner_data.get() }; + inner_data[link.offset as usize..][..link.length as usize] + .copy_from_slice(bytes.as_slice()); + + Ok((link, link_left)) + } + /// # Safety /// This function is `unsafe` because it returns a mutable reference to an archived row. /// The caller must ensure that there are no other references to the same data diff --git a/src/in_memory/empty_link_registry.rs b/src/in_memory/empty_link_registry.rs new file mode 100644 index 0000000..35d5e06 --- /dev/null +++ b/src/in_memory/empty_link_registry.rs @@ -0,0 +1,199 @@ +use crate::in_memory::DATA_INNER_LENGTH; +use data_bucket::Link; +use indexset::concurrent::multimap::BTreeMultiMap; +use indexset::concurrent::set::BTreeSet; +use parking_lot::FairMutex; + +/// A link wrapper that implements `Ord` based on absolute index calculation. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct IndexOrdLink(pub Link); + +impl IndexOrdLink { + /// Calculates the absolute index of the link. + fn absolute_index(&self) -> u64 { + let page_id: u32 = self.0.page_id.into(); + (page_id as u64 * DATA_LENGTH as u64) + self.0.offset as u64 + } + + fn unite_with_right_neighbor(&self, other: &Self) -> Option { + let self_end = self.absolute_index() + self.0.length as u64; + let other_start = other.absolute_index(); + + if self.0.page_id != other.0.page_id { + return None; + } + + if self_end == other_start { + let new_length = self.0.length + other.0.length; + Some(IndexOrdLink(Link { + page_id: self.0.page_id, + offset: self.0.offset, + length: new_length, + })) + } else { + None + } + } + + fn unite_with_left_neighbor(&self, other: &Self) -> Option { + let other_end = other.absolute_index() + other.0.length as u64; + let self_start = self.absolute_index(); + + if self.0.page_id != other.0.page_id { + return None; + } + + if other_end == self_start { + let new_offset = other.0.offset; + let new_length = self.0.length + other.0.length; + Some(IndexOrdLink(Link { + page_id: other.0.page_id, + offset: new_offset, + length: new_length, + })) + } else { + None + } + } +} + +impl PartialOrd for IndexOrdLink { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for IndexOrdLink { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.absolute_index().cmp(&other.absolute_index()) + } +} + +#[derive(Debug)] +pub struct EmptyLinkRegistry { + index_ord_links: BTreeSet>, + length_ord_links: BTreeMultiMap, + op_lock: FairMutex<()>, +} + +impl Default for EmptyLinkRegistry { + fn default() -> Self { + Self { + index_ord_links: BTreeSet::new(), + length_ord_links: BTreeMultiMap::new(), + op_lock: Default::default(), + } + } +} + +impl EmptyLinkRegistry { + pub fn push(&self, link: Link) { + let mut index_ord_link = IndexOrdLink(link); + let _g = self.op_lock.lock(); + + { + let mut iter = self.index_ord_links.range(..index_ord_link).rev(); + if let Some(possible_left_neighbor) = iter.next() { + let possible_left_neighbor = *possible_left_neighbor; + if let Some(united_link) = + index_ord_link.unite_with_left_neighbor(&possible_left_neighbor) + { + drop(iter); + + // Remove left neighbor + self.index_ord_links.remove(&possible_left_neighbor); + self.length_ord_links + .remove(&possible_left_neighbor.0.length, &possible_left_neighbor.0); + + index_ord_link = united_link; + } + } + } + + { + let mut iter = self.index_ord_links.range(index_ord_link..); + if let Some(possible_right_neighbor) = iter.next() { + let possible_right_neighbor = *possible_right_neighbor; + if let Some(united_link) = + index_ord_link.unite_with_right_neighbor(&possible_right_neighbor) + { + drop(iter); + + // Remove right neighbor + self.index_ord_links.remove(&possible_right_neighbor); + self.length_ord_links.remove( + &possible_right_neighbor.0.length, + &possible_right_neighbor.0, + ); + + index_ord_link = united_link; + } + } + } + + self.index_ord_links.insert(index_ord_link); + self.length_ord_links + .insert(index_ord_link.0.length, index_ord_link.0); + } + + pub fn pop_max(&self) -> Option { + let _g = self.op_lock.lock(); + + let mut iter = self.length_ord_links.iter().rev(); + let (len, max_length_link) = iter.next()?; + let index_ord_link = IndexOrdLink(*max_length_link); + drop(iter); + + self.length_ord_links.remove(len, max_length_link); + self.index_ord_links.remove(&index_ord_link); + + Some(index_ord_link.0) + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.index_ord_links.iter().map(|l| l.0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty_link_registry_insert_and_pop() { + let registry = EmptyLinkRegistry::::default(); + + let link1 = Link { + page_id: 1.into(), + offset: 0, + length: 100, + }; + + let link2 = Link { + page_id: 1.into(), + offset: 100, + length: 150, + }; + + let link3 = Link { + page_id: 2.into(), + offset: 0, + length: 200, + }; + + registry.push(link1); + registry.push(link2); + registry.push(link3); + + // After inserting link1 and link2, they should be united + let united_link = Link { + page_id: 1.into(), + offset: 0, + length: 250, + }; + + assert_eq!(registry.pop_max(), Some(united_link)); + assert_eq!(registry.pop_max(), Some(link3)); + assert_eq!(registry.pop_max(), None); + } +} diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index 1a14334..addfaae 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -1,4 +1,5 @@ mod data; +mod empty_link_registry; mod pages; mod row; diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 65823b9..9432979 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -6,7 +6,6 @@ use std::{ use data_bucket::page::PageId; use derive_more::{Display, Error, From}; -use lockfree::stack::Stack; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ @@ -17,6 +16,7 @@ use rkyv::{ util::AlignedVec, }; +use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::{ in_memory::{ DATA_INNER_LENGTH, Data, DataExecutionError, @@ -37,9 +37,7 @@ where /// Pages vector. Currently, not lock free. pages: RwLock::WrappedRow, DATA_LENGTH>>>>, - /// Stack with empty [`Link`]s. It stores [`Link`]s of rows that was deleted. - // TODO: Proper empty links registry + defragmentation - empty_links: Stack, + empty_links: EmptyLinkRegistry, /// Count of saved rows. row_count: AtomicU64, @@ -68,7 +66,7 @@ where Self { // We are starting ID's from `1` because `0`'s page in file is info page. pages: RwLock::new(vec![Arc::new(Data::new(1.into()))]), - empty_links: Stack::new(), + empty_links: EmptyLinkRegistry::::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(1), current_page_id: AtomicU32::new(1), @@ -83,7 +81,7 @@ where let last_page_id = vec.len(); Self { pages: RwLock::new(vec), - empty_links: Stack::new(), + empty_links: EmptyLinkRegistry::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(last_page_id as u32), current_page_id: AtomicU32::new(last_page_id as u32), @@ -104,24 +102,35 @@ where { let general_row = ::WrappedRow::from_inner(row); - if let Some(link) = self.empty_links.pop() { + //println!("Popping empty link"); + if let Some(link) = self.empty_links.pop_max() { + //println!("Empty link len {}", self.empty_links.len()); let pages = self.pages.read().unwrap(); let current_page: usize = page_id_mapper(link.page_id.into()); let page = &pages[current_page]; - if let Err(e) = unsafe { page.save_row_by_link(&general_row, link) } { - match e { + match unsafe { page.try_save_row_by_link(&general_row, link) } { + Ok((link, left_link)) => { + if let Some(l) = left_link { + //println!("Pushing empty link"); + self.empty_links.push(l); + //println!("Pushed empty link"); + } + return Ok(link); + } + // Ok(l) => return Ok(l), + Err(e) => match e { DataExecutionError::InvalidLink => { + //println!("Pushing empty link"); self.empty_links.push(link); + //println!("Pushed empty link"); } DataExecutionError::PageIsFull { .. } | DataExecutionError::PageTooSmall { .. } | DataExecutionError::SerializeError | DataExecutionError::DeserializeError => return Err(e.into()), - } - } else { - return Ok(link); - }; + }, + } } loop { @@ -311,7 +320,9 @@ where } pub fn delete(&self, link: Link) -> Result<(), ExecutionError> { + //println!("Pushing empty link"); self.empty_links.push(link); + //println!("Pushed empty link"); Ok(()) } @@ -337,20 +348,15 @@ where } pub fn get_empty_links(&self) -> Vec { - let mut res = vec![]; - for l in self.empty_links.pop_iter() { - res.push(l) - } - - res + self.empty_links.iter().collect() } pub fn with_empty_links(mut self, links: Vec) -> Self { - let stack = Stack::new(); + let registry = EmptyLinkRegistry::default(); for l in links { - stack.push(l) + registry.push(l) } - self.empty_links = stack; + self.empty_links = registry; self } @@ -496,7 +502,7 @@ mod tests { let link = pages.insert(row).unwrap(); pages.delete(link).unwrap(); - assert_eq!(pages.empty_links.pop(), Some(link)); + assert_eq!(pages.empty_links.pop_max(), Some(link)); pages.empty_links.push(link); let row = TestRow { a: 20, b: 20 }; diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 57a6e76..4c5a230 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -35,6 +35,12 @@ impl Hash for Lock { } } +impl Drop for Lock { + fn drop(&mut self) { + self.unlock() + } +} + impl Lock { pub fn new(id: u16) -> Self { Self { diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index 834b0de..b6dc787 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -1,9 +1,8 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use parking_lot::Mutex; - use worktable::prelude::*; use worktable::worktable; @@ -174,9 +173,13 @@ async fn test_update_string_by_non_unique() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)) } #[tokio::test] @@ -448,9 +451,13 @@ async fn test_update_many_strings_by_non_unique() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)); } #[tokio::test] @@ -512,9 +519,13 @@ async fn test_update_many_strings_by_string() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -770,6 +781,70 @@ async fn update_parallel_more_strings_with_select_non_unique() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn delete_parallel() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let deleted_state = Arc::new(Mutex::new(HashSet::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let h1 = tokio::spawn(async move { + for i in 1_000..6_000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: shared.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = shared.insert(row.clone()).unwrap(); + } + }); + let shared = table.clone(); + let shared_deleted_state = deleted_state.clone(); + let h2 = tokio::spawn(async move { + for _ in 0..1_000 { + let id_to_update = fastrand::u64(0..1000); + let _ = shared.delete(id_to_update.into()).await; + { + let mut guard = shared_deleted_state.lock(); + guard.insert(id_to_update); + } + } + }); + for _ in 0..5_000 { + let val = fastrand::u8(0..100); + let vals = table + .select_by_exchange(format!("test_{val}")) + .execute() + .unwrap(); + for v in vals { + assert_eq!(v.exchange, format!("test_{val}")) + } + } + h1.await.unwrap(); + h2.await.unwrap(); + + for id in deleted_state.lock_arc().iter() { + let row = table.select(*id); + assert!(row.is_none()) + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn update_parallel_more_strings_with_select_unique() { let table = Arc::new(TestMoreStringsWorkTable::default()); @@ -843,8 +918,58 @@ async fn update_parallel_more_strings_with_select_unique() { let row = table.select(*id).unwrap(); assert_eq!(&row.exchange, e) } - for (id, a) in a_state.lock_arc().iter() { +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn upsert_parallel() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h1 = tokio::spawn(async move { + for _ in 0..4_000 { + let val = fastrand::u8(0..100); + let id_to_update = fastrand::u64(0..1000); + let row = TestMoreStringsRow { + id: id_to_update, + test: id_to_update as i64, + another: 1, + exchange: format!("test_{val}"), + some_string: format!("some_{val}"), + other_srting: format!("other_{id_to_update}"), + }; + shared.upsert(row.clone()).await.unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + for _ in 0..20_000 { + let val = fastrand::i64(0..1000); + let res = table.select_by_test(val); + assert!(res.is_some()) + } + h1.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { let row = table.select(*id).unwrap(); - assert_eq!(&row.another, a) + assert_eq!(&row.exchange, e) } }