diff --git a/src/lib.rs b/src/lib.rs index 1a4af0e..3351a68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,10 +9,10 @@ pub use link::Link; pub use data_bucket_codegen::SizeMeasure; pub use page::{ map_data_pages_to_general, map_index_pages_to_general, map_tree_index, map_unique_tree_index, - parse_data_page, parse_index_page, parse_page, persist_page, read_index_pages, seek_by_link, - seek_to_page_start, update_at, Data as DataPage, DataType, General as GeneralPage, - GeneralHeader, IndexPage as IndexData, Interval, PageType, SpaceInfo as SpaceInfoData, - DATA_VERSION, GENERAL_HEADER_SIZE, INNER_PAGE_SIZE, PAGE_SIZE, + parse_data_page, parse_index_page, parse_page, persist_page, read_data_pages, read_index_pages, + read_rows_schema, seek_by_link, seek_to_page_start, update_at, Data as DataPage, + General as GeneralPage, GeneralHeader, IndexPage as IndexData, Interval, PageType, + SpaceInfo as SpaceInfoData, DATA_VERSION, GENERAL_HEADER_SIZE, INNER_PAGE_SIZE, PAGE_SIZE, }; pub use persistence::{PersistableIndex, PersistableTable}; pub use util::{align, Persistable, SizeMeasurable}; diff --git a/src/page/data_type.rs b/src/page/data_type.rs deleted file mode 100644 index c70ac1a..0000000 --- a/src/page/data_type.rs +++ /dev/null @@ -1,8 +0,0 @@ -use rkyv::{Archive, Deserialize, Serialize}; - -#[derive(Archive, Clone, Deserialize, Debug, PartialEq, Serialize)] -pub enum DataType { - String = 0, - Integer = 1, // 64-bit integer - Float = 2, // 64-bit float -} diff --git a/src/page/iterators.rs b/src/page/iterators.rs new file mode 100644 index 0000000..f0c0634 --- /dev/null +++ b/src/page/iterators.rs @@ -0,0 +1,290 @@ +use std::io::Read; + +use rkyv::{de::Pool, rancor::Strategy, Archive, DeserializeUnsized}; + +use crate::{ + page::util::parse_general_header, + persistence::data::{rkyv_data::parse_archived_row, DataTypeValue}, + IndexData, Link, +}; + +use super::{index::{ArchivedIndexValue, IndexValue}, seek_by_link, seek_to_page_start, Interval, SpaceInfo}; + +pub struct PageIterator { + intervals: Vec, + current_intervals_index: usize, + current_position_in_interval: usize, +} + +impl PageIterator { + pub fn new(intervals: Vec) -> PageIterator { + PageIterator { + current_intervals_index: 0, + current_position_in_interval: if intervals.len() > 0 { + intervals[0].0 + } else { + 0 + }, + intervals, + } + } +} + +impl Iterator for PageIterator { + type Item = u32; + + fn next(&mut self) -> Option { + let mut result: Option = None; + + if self.current_intervals_index >= self.intervals.len() { + result = None + } else if self.current_position_in_interval + >= self.intervals[self.current_intervals_index].0 + && self.current_position_in_interval <= self.intervals[self.current_intervals_index].1 + { + result = Some(self.current_position_in_interval as u32); + self.current_position_in_interval += 1; + } else if self.current_position_in_interval > self.intervals[self.current_intervals_index].1 + { + self.current_intervals_index += 1; + if self.current_intervals_index >= self.intervals.len() { + result = None; + } else { + self.current_position_in_interval = self.intervals[self.current_intervals_index].0; + result = Some(self.current_position_in_interval as u32); + self.current_position_in_interval += 1; + } + } + + result + } +} + +pub struct LinksIterator<'a> { + file: &'a mut std::fs::File, + page_id: u32, + links: Option>, + link_index: usize, + primary_key_type: String, +} + +impl<'a> LinksIterator<'a> { + pub fn new( + file: &'a mut std::fs::File, + page_id: u32, + space_info: &SpaceInfo, + ) -> LinksIterator<'a> { + let primary_key_fields = &space_info.primary_key_fields; + let primary_key_type = space_info + .row_schema + .iter() + .filter(|(field_name, _field_type)| field_name == &primary_key_fields[0]) + .map(|(_field_name, field_type)| field_type) + .take(1) + .collect::>()[0]; + LinksIterator { + file, + page_id, + links: None, + link_index: 0, + primary_key_type: primary_key_type.clone(), + } + } +} + +fn parse_links(buffer: &[u8]) -> Vec +where T: Archive, + [ArchivedIndexValue]: DeserializeUnsized<[IndexValue], Strategy> +{ + let archived = unsafe { + rkyv::access_unchecked::< as Archive>::Archived>( + &buffer[..], + ) + }; + let index_records = + rkyv::deserialize::, rkyv::rancor::Error>(archived) + .expect("data should be valid") + .index_values; + + index_records + .iter() + .map(|index_value| index_value.link) + .collect::>() +} + +impl Iterator for LinksIterator<'_> { + type Item = Link; + + fn next(&mut self) -> Option { + if self.links.is_none() { + seek_to_page_start(&mut self.file, self.page_id).expect("page should be seekable"); + let header = parse_general_header(&mut self.file).expect("header should be readable"); + + let mut buffer: Vec = vec![0u8; header.data_length as usize]; + self.file + .read_exact(&mut buffer) + .expect("index data should be readable"); + + self.links = Some(match self.primary_key_type.as_str() { + "String" => parse_links::(&buffer), + "i128" => parse_links::(&buffer), + "i64" => parse_links::(&buffer), + "i32" => parse_links::(&buffer), + "i16" => parse_links::(&buffer), + "i8" => parse_links::(&buffer), + "u128" => parse_links::(&buffer), + "u64" => parse_links::(&buffer), + "u32" => parse_links::(&buffer), + "u16" => parse_links::(&buffer), + "u8" => parse_links::(&buffer), + "f64" => parse_links::(&buffer), + "f32" => parse_links::(&buffer), + _ => panic!( + "Unsupported primary key data type `{}`", + self.primary_key_type + ), + }); + } + + if self.link_index < self.links.as_deref().unwrap().len() { + let result = Some(self.links.as_deref().unwrap()[self.link_index]); + self.link_index += 1; + result + } else { + None + } + } +} + +pub struct DataIterator<'a> { + file: &'a mut std::fs::File, + schema: Vec<(String, String)>, + links: Vec, + link_index: usize, +} + +impl DataIterator<'_> { + pub fn new( + file: &mut std::fs::File, + schema: Vec<(String, String)>, + mut links: Vec, + ) -> DataIterator<'_> { + links.sort_by(|a, b| { + (a.page_id, a.offset) + .partial_cmp(&(b.page_id, b.offset)) + .unwrap() + }); + + DataIterator { + file, + schema, + links, + link_index: 0, + } + } +} + +impl Iterator for DataIterator<'_> { + type Item = Vec; + + fn next(&mut self) -> Option { + if self.link_index >= self.links.len() { + return None; + } + + let current_link = self.links[self.link_index]; + seek_by_link(&mut self.file, current_link).expect("the seek should be successful"); + let mut buffer = vec![0u8; current_link.length as usize]; + self.file + .read_exact(&mut buffer) + .expect("the data should be read"); + let row = parse_archived_row(&buffer, &self.schema); + + self.link_index += 1; + + Some(row) + } +} + +#[cfg(test)] +mod test { + use crate::{ + page::{iterators::DataIterator, util::parse_space_info, PageId}, + persistence::data::DataTypeValue, + Interval, Link, PAGE_SIZE, + }; + + use super::{LinksIterator, PageIterator}; + + #[test] + fn test_page_iterator() { + let interval1 = Interval(1, 2); + let interval2 = Interval(5, 7); + let page_iterator = PageIterator::new(vec![interval1, interval2]); + let collected = page_iterator.collect::>(); + assert_eq!(collected, vec![1, 2, 5, 6, 7]); + } + + #[test] + fn test_page_iterator_empty() { + let page_iterator = PageIterator::new(vec![]); + let collected = page_iterator.collect::>(); + assert_eq!(collected, Vec::::new()); + } + + #[test] + fn test_links_iterator() { + let filename = "tests/data/table_links_test.wt"; + super::super::util::test::create_test_database_file(filename); + + let mut file = std::fs::File::open(filename).unwrap(); + let space_info = parse_space_info::(&mut file).unwrap(); + let links = LinksIterator::<'_>::new(&mut file, 1, &space_info); + assert_eq!( + links.collect::>(), + vec![ + Link { + page_id: PageId(2), + offset: 0, + length: 24 + }, + Link { + page_id: PageId(2), + offset: 24, + length: 28 + } + ] + ); + } + + #[test] + fn test_pages_and_links_iterators() { + let filename = "tests/data/table_pages_and_links_test.wt"; + super::super::util::test::create_test_database_file(filename); + + let mut file = std::fs::File::open(filename).unwrap(); + let space_info = parse_space_info::(&mut file).unwrap(); + let index_intervals = space_info.primary_key_intervals.clone(); + + let pages_ids = PageIterator::new(index_intervals).collect::>(); + assert_eq!(pages_ids, vec![1]); + + let links = + LinksIterator::<'_>::new(&mut file, pages_ids[0], &space_info).collect::>(); + let data_iterator: DataIterator<'_> = + DataIterator::new(&mut file, space_info.row_schema, links); + assert_eq!( + data_iterator.collect::>(), + vec![ + vec![ + DataTypeValue::I32(1), + DataTypeValue::String("first string".to_string()) + ], + vec![ + DataTypeValue::I32(2), + DataTypeValue::String("second string".to_string()) + ] + ] + ); + } +} diff --git a/src/page/mod.rs b/src/page/mod.rs index 67bd103..c511210 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -1,23 +1,23 @@ mod data; -mod data_type; mod header; mod index; +mod iterators; mod space_info; mod ty; mod util; - use derive_more::{Display, From}; use rkyv::{Archive, Deserialize, Serialize}; pub use data::Data; -pub use data_type::DataType; pub use header::{GeneralHeader, DATA_VERSION}; pub use index::{map_tree_index, map_unique_tree_index, IndexPage}; +pub use iterators::{DataIterator, LinksIterator, PageIterator}; pub use space_info::{Interval, SpaceInfo}; pub use ty::PageType; pub use util::{ map_data_pages_to_general, map_index_pages_to_general, parse_data_page, parse_index_page, - parse_page, persist_page, read_index_pages, seek_by_link, seek_to_page_start, update_at, + parse_page, parse_space_info, persist_page, read_data_pages, read_index_pages, + read_rows_schema, seek_by_link, seek_to_page_start, update_at, }; // TODO: Move to config diff --git a/src/page/space_info.rs b/src/page/space_info.rs index 37f7f1e..2e903da 100644 --- a/src/page/space_info.rs +++ b/src/page/space_info.rs @@ -9,7 +9,6 @@ use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::util::Persistable; -use crate::DataType; use crate::{space, Link}; pub type SpaceName = String; @@ -23,12 +22,14 @@ pub struct SpaceInfo { pub id: space::Id, pub page_count: u32, pub name: SpaceName, + pub row_schema: Vec<(String, String)>, + pub primary_key_fields: Vec, pub primary_key_intervals: Vec, + pub secondary_index_types: Vec<(String, String)>, pub secondary_index_intervals: HashMap>, pub data_intervals: Vec, pub pk_gen_state: Pk, pub empty_links_list: Vec, - pub secondary_index_map: HashMap, } /// Represents some interval between values. @@ -66,12 +67,14 @@ mod test { id: 0.into(), page_count: 0, name: "Test".to_string(), + row_schema: vec![], + primary_key_fields: vec![], primary_key_intervals: vec![], secondary_index_intervals: HashMap::new(), data_intervals: vec![], pk_gen_state: (), empty_links_list: vec![], - secondary_index_map: HashMap::new(), + secondary_index_types: vec![], }; let bytes = info.as_bytes(); assert!(bytes.as_ref().len() < INNER_PAGE_SIZE) diff --git a/src/page/util.rs b/src/page/util.rs index 013fa98..0e0c715 100644 --- a/src/page/util.rs +++ b/src/page/util.rs @@ -5,13 +5,15 @@ use eyre::eyre; use rkyv::api::high::HighDeserializer; use rkyv::Archive; +use super::index::IndexValue; +use super::{Interval, SpaceInfo}; use crate::page::header::GeneralHeader; use crate::page::ty::PageType; use crate::page::General; +use crate::persistence::data::rkyv_data::parse_archived_row; +use crate::persistence::data::DataTypeValue; use crate::{DataPage, GeneralPage, IndexData, Link, Persistable, GENERAL_HEADER_SIZE, PAGE_SIZE}; -use super::{Interval, SpaceInfo}; - pub fn map_index_pages_to_general(pages: Vec>) -> Vec>> { let mut header = &mut GeneralHeader::new(0.into(), PageType::Index, 0.into()); let mut general_pages = vec![]; @@ -91,16 +93,15 @@ where } pub fn seek_to_page_start(file: &mut std::fs::File, index: u32) -> eyre::Result<()> { - let current_position: u64 = file.stream_position()?; - let start_pos = index as i64 * PAGE_SIZE as i64; - file.seek(io::SeekFrom::Current(start_pos - current_position as i64))?; + file.seek(io::SeekFrom::Start(index as u64 * PAGE_SIZE as u64))?; Ok(()) } pub fn seek_by_link(file: &mut std::fs::File, link: Link) -> eyre::Result<()> { - seek_to_page_start(file, link.page_id.0)?; - file.seek(io::SeekFrom::Current(link.offset as i64))?; + file.seek(io::SeekFrom::Start( + link.page_id.0 as u64 * PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64 + link.offset as u64, + ))?; Ok(()) } @@ -132,7 +133,7 @@ pub fn update_at( Ok(()) } -fn parse_general_header(file: &mut std::fs::File) -> eyre::Result { +pub fn parse_general_header(file: &mut std::fs::File) -> eyre::Result { let mut buffer = [0; GENERAL_HEADER_SIZE]; file.read_exact(&mut buffer)?; let archived = @@ -191,10 +192,34 @@ pub fn parse_data_page( }) } +pub fn parse_data_record( + file: &mut std::fs::File, + index: u32, + offset: u32, + length: u32, + schema: &Vec<(String, String)>, +) -> eyre::Result> { + seek_to_page_start(file, index)?; + let header = parse_general_header(file)?; + if header.page_type != PageType::Data { + return Err(eyre::Report::msg(format!( + "The type of the page with index {} is not `Data`", + index + ))); + } + file.seek(io::SeekFrom::Current(offset as i64))?; + let mut buffer = vec![0u8; length as usize]; + file.read_exact(&mut buffer)?; + + let parsed_record = parse_archived_row(&buffer, &schema); + + Ok(parsed_record) +} + pub fn parse_index_page( file: &mut std::fs::File, index: u32, -) -> eyre::Result>> +) -> eyre::Result>> where T: Archive, ::Archived: rkyv::Deserialize>, @@ -206,12 +231,11 @@ where file.read_exact(&mut buffer)?; let archived = unsafe { rkyv::access_unchecked::< as Archive>::Archived>(&buffer[..]) }; - let index: IndexData = rkyv::deserialize(archived).expect("data should be valid"); + let index_records: Vec> = rkyv::deserialize::, _>(archived) + .expect("data should be valid") + .index_values; - Ok(GeneralPage { - header, - inner: index, - }) + Ok(index_records) } pub fn parse_space_info( @@ -230,11 +254,11 @@ pub fn parse_space_info( Ok(space_info) } -pub fn read_index_pages( +pub fn read_secondary_index_pages( file: &mut std::fs::File, index_name: &str, intervals: Vec, -) -> eyre::Result>> +) -> eyre::Result>> where T: Archive, ::Archived: rkyv::Deserialize>, @@ -260,19 +284,106 @@ where } } - let mut result: Vec> = vec![]; + let mut result: Vec> = vec![]; + for interval in intervals.iter() { + for index in interval.0..=interval.1 { + let mut index_records = parse_index_page::(file, index as u32)?; + result.append(&mut index_records); + } + } + + Ok(result) +} + +pub fn read_index_pages( + file: &mut std::fs::File, + intervals: &Vec, +) -> eyre::Result>> +where + T: Archive, + ::Archived: rkyv::Deserialize>, +{ + let mut result: Vec> = vec![]; for interval in intervals.iter() { - for index in interval.0..interval.1 { - let index_page = parse_index_page::(file, index as u32)?; - result.push(index_page.inner); + for index in interval.0..=interval.1 { + let mut index_records = parse_index_page::(file, index as u32)?; + result.append(&mut index_records); } } + Ok(result) +} + +fn read_links( + mut file: &mut std::fs::File, + space_info: &SpaceInfo, +) -> eyre::Result> { + Ok( + read_index_pages::(&mut file, &space_info.primary_key_intervals)? + .iter() + .map(|index_value| index_value.link) + .collect::>(), + ) +} + +pub fn read_rows_schema( + file: &mut std::fs::File, +) -> eyre::Result> { + let space_info = parse_space_info::(file)?; + Ok(space_info.row_schema) +} + +pub fn read_data_pages( + mut file: &mut std::fs::File, +) -> eyre::Result>> { + let space_info = parse_space_info::(file)?; + let primary_key_fields = &space_info.primary_key_fields; + if primary_key_fields.len() != 1 { + panic!("Currently only single primary key is supported"); + } + + let primary_key_type = space_info + .row_schema + .iter() + .filter(|(field_name, _field_type)| field_name == &primary_key_fields[0]) + .map(|(_field_name, field_type)| field_type) + .take(1) + .collect::>()[0] + .as_str(); + let links = match primary_key_type { + "String" => read_links::(&mut file, &space_info)?, + "i128" => read_links::(&mut file, &space_info)?, + "i64" => read_links::(&mut file, &space_info)?, + "i32" => read_links::(&mut file, &space_info)?, + "i16" => read_links::(&mut file, &space_info)?, + "i8" => read_links::(&mut file, &space_info)?, + "u128" => read_links::(&mut file, &space_info)?, + "u64" => read_links::(&mut file, &space_info)?, + "u32" => read_links::(&mut file, &space_info)?, + "u16" => read_links::(&mut file, &space_info)?, + "u8" => read_links::(&mut file, &space_info)?, + "f64" => read_links::(&mut file, &space_info)?, + "f32" => read_links::(&mut file, &space_info)?, + _ => panic!("Unsupported primary key data type `{}`", primary_key_type), + }; + + let mut result: Vec> = vec![]; + for link in links { + let row = parse_data_record::( + &mut file, + link.page_id.0, + link.offset, + link.length, + &space_info.row_schema, + )?; + result.push(row); + } Ok(result) } #[cfg(test)] -mod test { +pub mod test { + use rkyv::{Archive, Deserialize, Serialize}; use scc::ebr::Guard; use scc::TreeIndex; use std::collections::HashMap; @@ -280,13 +391,16 @@ mod test { use std::path::Path; use crate::page::index::IndexValue; + use crate::page::util::read_secondary_index_pages; use crate::page::INNER_PAGE_SIZE; + use crate::persistence::data::DataTypeValue; use crate::{ - map_index_pages_to_general, map_unique_tree_index, DataType, GeneralHeader, GeneralPage, - IndexData, Interval, Link, PageType, SpaceInfoData, DATA_VERSION, PAGE_SIZE, + map_index_pages_to_general, map_unique_tree_index, read_data_pages, + GeneralHeader, GeneralPage, IndexData, Interval, Link, PageType, SpaceInfoData, + DATA_VERSION, PAGE_SIZE, }; - use super::{persist_page, read_index_pages}; + use super::persist_page; #[test] fn test_map() { @@ -349,6 +463,8 @@ mod test { id: 0.into(), page_count: 0, name: "Test".to_string(), + row_schema: vec![], + primary_key_fields: vec![], primary_key_intervals: vec![], secondary_index_intervals: HashMap::from([( "string_index".to_owned(), @@ -357,7 +473,7 @@ mod test { data_intervals: vec![], pk_gen_state: (), empty_links_list: vec![], - secondary_index_map: HashMap::from([("string_index".to_owned(), DataType::String)]), + secondary_index_types: vec![("string_index".to_string(), "String".to_string())], }; let space_info_page = GeneralPage { header: space_info_header, @@ -371,7 +487,7 @@ mod test { let mut index_pages = Vec::>>::new(); for interval in intervals { - for index in interval.0..interval.1 { + for index in interval.0..=interval.1 { let index_header = GeneralHeader { data_version: DATA_VERSION, space_id: 1.into(), @@ -408,9 +524,9 @@ mod test { if Path::new(filename).exists() { remove_file(filename).unwrap(); } - let mut file = std::fs::File::create(filename).unwrap(); + let mut file: std::fs::File = std::fs::File::create(filename).unwrap(); - let intervals = vec![Interval(1, 3), Interval(5, 8)]; + let intervals = vec![Interval(1, 2), Interval(5, 7)]; // create the space page let mut space_info_page = create_space_with_intervals(&intervals); @@ -423,16 +539,152 @@ mod test { // read the data let mut file = std::fs::File::open(filename).unwrap(); - let index_pages = read_index_pages::( + let index_pages = read_secondary_index_pages::( &mut file, "string_index", vec![Interval(1, 2), Interval(5, 6)], ) .unwrap(); - assert_eq!(index_pages[0].index_values.len(), 1); - assert_eq!(index_pages[0].index_values[0].key, "first_value"); - assert_eq!(index_pages[0].index_values[0].link.page_id, 2.into()); - assert_eq!(index_pages[0].index_values[0].link.offset, 0); - assert_eq!(index_pages[0].index_values[0].link.length, 0); + assert_eq!(index_pages.len(), 4); + assert_eq!(index_pages[0].key, "first_value"); + assert_eq!(index_pages[0].link.page_id, 2.into()); + assert_eq!(index_pages[0].link.offset, 0); + assert_eq!(index_pages[0].link.length, 0); + } + + #[derive(Archive, Debug, Deserialize, Serialize)] + struct TableStruct { + int1: i32, + string1: String, + } + + pub fn create_test_database_file(filename: &str) { + if Path::new(filename).exists() { + remove_file(filename).unwrap(); + } + let mut file: std::fs::File = std::fs::File::create(filename).unwrap(); + + let space_info_header = GeneralHeader { + data_version: DATA_VERSION, + space_id: 1.into(), + page_id: 0.into(), + previous_id: 0.into(), + next_id: 1.into(), + page_type: PageType::SpaceInfo, + data_length: 0u32, + }; + let space_info = SpaceInfoData { + id: 1.into(), + page_count: 4, + name: "test space".to_owned(), + row_schema: vec![ + ("int1".to_string(), "i32".to_string()), + ("string1".to_string(), "String".to_string()), + ], + primary_key_fields: vec!["int1".to_string()], + primary_key_intervals: vec![Interval(1, 1)], + secondary_index_types: vec![], + secondary_index_intervals: Default::default(), + data_intervals: vec![], + pk_gen_state: (), + empty_links_list: vec![], + }; + let mut space_info_page = GeneralPage { + header: space_info_header, + inner: space_info, + }; + persist_page(&mut space_info_page, &mut file).unwrap(); + + let index_header = GeneralHeader { + data_version: DATA_VERSION, + space_id: 1.into(), + page_id: 1.into(), + previous_id: 0.into(), + next_id: 2.into(), + page_type: PageType::Index, + data_length: 0, + }; + + let data_header = GeneralHeader { + data_version: DATA_VERSION, + space_id: 1.into(), + page_id: 2.into(), + previous_id: 2.into(), + next_id: 4.into(), + page_type: PageType::Data, + data_length: 0, + }; + + let data_row1 = TableStruct { + int1: 1, + string1: "first string".to_string(), + }; + + let data_row2 = TableStruct { + int1: 2, + string1: "second string".to_string(), + }; + + let data_row1_inner = rkyv::to_bytes::(&data_row1).unwrap(); + let data_row1_offset = 0; + let data_row1_length = data_row1_inner.len(); + + let data_row2_inner = rkyv::to_bytes::(&data_row2).unwrap(); + let data_row2_offset = data_row1_offset + data_row1_length; + let data_row2_length = data_row2_inner.len(); + + let data_rows12_buffer = [data_row1_inner, data_row2_inner].concat(); + + let mut data_page = GeneralPage::> { + header: data_header, + inner: data_rows12_buffer, + }; + + let index_data: IndexData = IndexData:: { + index_values: vec![ + IndexValue:: { + key: 1, + link: Link { + page_id: data_header.page_id, + offset: data_row1_offset as u32, + length: data_row1_length as u32, + }, + }, + IndexValue:: { + key: 2, + link: Link { + page_id: data_header.page_id, + offset: data_row2_offset as u32, + length: data_row2_length as u32, + }, + }, + ], + }; + let mut index_page = GeneralPage { + header: index_header, + inner: index_data, + }; + + persist_page(&mut index_page, &mut file).unwrap(); + persist_page(&mut data_page, &mut file).unwrap(); + } + + #[test] + fn test_read_table_data() { + let filename = "tests/data/table_with_rows.wt"; + create_test_database_file(filename); + + let mut file: std::fs::File = std::fs::File::open(filename).unwrap(); + let data_pages: Vec> = read_data_pages::(&mut file).unwrap(); + assert_eq!(data_pages[0][0], DataTypeValue::I32(1)); + assert_eq!( + data_pages[0][1], + DataTypeValue::String("first string".to_string()) + ); + assert_eq!(data_pages[1][0], DataTypeValue::I32(2)); + assert_eq!( + data_pages[1][1], + DataTypeValue::String("second string".to_string()) + ); } } diff --git a/src/persistence/data/mod.rs b/src/persistence/data/mod.rs new file mode 100644 index 0000000..290d568 --- /dev/null +++ b/src/persistence/data/mod.rs @@ -0,0 +1,12 @@ +pub mod rkyv_data; +mod types; +mod util; + +pub use types::DataTypeValue; + +pub trait DataType { + fn advance_accum(&self, accum: &mut usize); + fn from_pointer(&self, pointer: *const u8, start_pointer: *const u8) -> DataTypeValue; + fn advance_pointer_for_padding(&self, pointer: &mut *const u8, start_pointer: *const u8); + fn advance_pointer(&self, pointer: &mut *const u8); +} diff --git a/src/persistence/data/rkyv_data.rs b/src/persistence/data/rkyv_data.rs new file mode 100644 index 0000000..97f4811 --- /dev/null +++ b/src/persistence/data/rkyv_data.rs @@ -0,0 +1,152 @@ +use crate::persistence::data::types::DataTypeValue; +use std::str::FromStr; + +pub fn parse_archived_row, S2: AsRef>( + buf: &[u8], + columns: &Vec<(S1, S2)>, +) -> Vec { + let mut data_length: usize = { + let mut accum: usize = 0; + for column in columns.iter() { + let value = + DataTypeValue::from_str(column.1.as_ref()).expect("data type should be supported"); + let data_type = value.as_data_type(); + data_type.advance_accum(&mut accum); + } + accum + }; + if data_length % 4 != 0 { + data_length += 4 - data_length % 4; + } + + let start_pointer = unsafe { buf.as_ptr().add(buf.len()).sub(data_length) }; + let mut current_pointer = start_pointer; + let mut output: Vec<_> = vec![]; + for column in columns.iter() { + let value = + DataTypeValue::from_str(column.1.as_ref()).expect("data type should be supported"); + let data_type = value.as_data_type(); + let deserialized = data_type.from_pointer(current_pointer, start_pointer); + data_type.advance_pointer_for_padding(&mut current_pointer, start_pointer); + output.push(deserialized); + data_type.advance_pointer(&mut current_pointer); + } + output +} + +#[cfg(test)] +mod test { + use super::parse_archived_row; + use crate::persistence::data::types::DataTypeValue; + use rkyv::{Archive, Deserialize, Serialize}; + + #[derive(Archive, Serialize, Deserialize, Debug)] + struct Struct1 { + pub string1: String, + } + + #[test] + fn test_parse_archived_row() { + let buffer = rkyv::to_bytes::(&Struct1 { + string1: "000000000000000".to_string(), + }) + .unwrap(); + let parsed = parse_archived_row(&buffer, &vec![("string1", "String")]); + assert_eq!( + parsed, + [DataTypeValue::String("000000000000000".to_string())] + ) + } + + #[derive(Archive, Serialize, Deserialize, Debug)] + struct Struct2 { + pub int1: i32, + } + + #[test] + fn test_parse_archived_row_int() { + let buffer = rkyv::to_bytes::(&Struct2 { int1: 3 }).unwrap(); + let parsed = parse_archived_row(&buffer, &vec![("int1", "i32")]); + assert_eq!(parsed, [DataTypeValue::I32(3)]) + } + + #[derive(Archive, Serialize, Deserialize, Debug)] + struct Struct3 { + pub float1: f64, + } + + #[test] + fn test_parse_archived_row_float() { + let buffer = rkyv::to_bytes::(&Struct3 { + float1: 3.14159265358, + }) + .unwrap(); + let parsed = parse_archived_row(&buffer, &vec![("float1", "f64")]); + assert_eq!(parsed, [DataTypeValue::F64(3.14159265358)]) + } + + #[derive(Archive, Serialize, Deserialize, Debug)] + struct Struct4 { + pub string1: String, + pub int1: u32, + pub string2: String, + pub int2: u8, + pub int3: i8, + pub int4: u8, + pub int5: i32, + pub int6: u8, + pub string3: String, + pub int7: i8, + pub float1: f64, + } + + #[test] + fn test_parse_archived_row_many_fields() { + let buffer = rkyv::to_bytes::(&Struct4 { + string1: "000000000000000".to_string(), + int1: 20, + string2: "aaaaaaaa".to_string(), + int2: 3, + int3: 4, + int4: 5, + int5: 6, + int6: 7, + string3: "x".to_string(), + int7: 8, + float1: 3.14159265358, + }) + .unwrap(); + let parsed = parse_archived_row( + &buffer, + &vec![ + ("string1".to_string(), "String".to_string()), + ("int1".to_string(), "i32".to_string()), + ("string2".to_string(), "String".to_string()), + ("int2".to_string(), "u8".to_string()), + ("int3".to_string(), "i8".to_string()), + ("int4".to_string(), "u8".to_string()), + ("int5".to_string(), "i32".to_string()), + ("int6".to_string(), "u8".to_string()), + ("string3".to_string(), "String".to_string()), + ("int7".to_string(), "i8".to_string()), + ("float1".to_string(), "f64".to_string()), + ], + ); + assert_eq!( + parsed, + [ + DataTypeValue::String("000000000000000".to_string()), + DataTypeValue::I32(20), + DataTypeValue::String("aaaaaaaa".to_string()), + DataTypeValue::U8(3), + DataTypeValue::I8(4), + DataTypeValue::U8(5), + DataTypeValue::I32(6), + DataTypeValue::U8(7), + DataTypeValue::String("x".to_string()), + DataTypeValue::I8(8), + DataTypeValue::F64(3.14159265358f64), + ] + ) + } +} diff --git a/src/persistence/data/types.rs b/src/persistence/data/types.rs new file mode 100644 index 0000000..c3d0d31 --- /dev/null +++ b/src/persistence/data/types.rs @@ -0,0 +1,144 @@ +use std::str::FromStr; + +use derive_more::derive::Display; +use derive_more::From; +use rkyv::primitive::{ + ArchivedF32, ArchivedF64, ArchivedI128, ArchivedI16, ArchivedI32, ArchivedI64, ArchivedU128, + ArchivedU16, ArchivedU32, ArchivedU64, +}; +use rkyv::string::ArchivedString; + +use crate::persistence::data::util::{advance_accum_for_padding, advance_pointer_for_padding}; +use crate::persistence::data::DataType; + +#[derive(Debug, Display, From, PartialEq)] +pub enum DataTypeValue { + String(String), + I128(i128), + I64(i64), + I32(i32), + I16(i16), + I8(i8), + U128(u128), + U64(u64), + U32(u32), + U16(u16), + U8(u8), + F64(f64), + F32(f32), +} + +impl DataTypeValue { + pub fn as_data_type(&self) -> &dyn DataType { + match self { + Self::String(s) => s, + Self::I128(i) => i, + Self::I64(i) => i, + Self::I32(i) => i, + Self::I16(i) => i, + Self::I8(i) => i, + Self::U128(u) => u, + Self::U64(u) => u, + Self::U32(u) => u, + Self::U16(u) => u, + Self::U8(u) => u, + Self::F64(f) => f, + Self::F32(f) => f, + } + } +} + +impl FromStr for DataTypeValue { + type Err = (); + + fn from_str(s: &str) -> Result { + Ok(match s.as_ref() { + "String" => String::default().into(), + "i128" => i128::default().into(), + "i64" => i64::default().into(), + "i32" => i32::default().into(), + "i16" => i16::default().into(), + "i8" => i8::default().into(), + "u128" => u128::default().into(), + "u64" => u64::default().into(), + "u32" => u32::default().into(), + "u16" => u16::default().into(), + "u8" => u8::default().into(), + "f64" => f64::default().into(), + "f32" => f32::default().into(), + _ => unreachable!(), + }) + } +} + +impl DataType for String { + fn advance_accum(&self, accum: &mut usize) { + *accum = advance_accum_for_padding(*accum, 4); + *accum += size_of::(); + } + + fn from_pointer(&self, pointer: *const u8, start_pointer: *const u8) -> DataTypeValue { + let current_pointer = advance_pointer_for_padding(pointer, start_pointer, 4); + let archived_ptr: *const ArchivedString = current_pointer.cast(); + unsafe { (*archived_ptr).to_string() }.into() + } + + fn advance_pointer_for_padding(&self, pointer: &mut *const u8, start_pointer: *const u8) { + *pointer = advance_pointer_for_padding(*pointer, start_pointer, 4); + } + + fn advance_pointer(&self, pointer: &mut *const u8) { + *pointer = unsafe { pointer.add(size_of::()) }; + } +} + +macro_rules! impl_datatype { + ($datatype:ty, $archived_datatype:ty, $datatype_value:expr) => { + impl DataType for $datatype { + fn advance_accum(&self, accum: &mut usize) { + *accum = advance_accum_for_padding(*accum, size_of::<$archived_datatype>()); + *accum += size_of::<$archived_datatype>(); + } + + fn from_pointer(&self, pointer: *const u8, start_pointer: *const u8) -> DataTypeValue { + let current_pointer = advance_pointer_for_padding( + pointer, + start_pointer, + size_of::<$archived_datatype>(), + ); + let archived_ptr: *const $archived_datatype = current_pointer.cast(); + + $datatype_value(unsafe { (*archived_ptr) }.into()) + } + + fn advance_pointer_for_padding( + &self, + pointer: &mut *const u8, + start_pointer: *const u8, + ) { + *pointer = advance_pointer_for_padding( + *pointer, + start_pointer, + size_of::<$archived_datatype>(), + ); + } + + fn advance_pointer(&self, pointer: &mut *const u8) { + *pointer = unsafe { pointer.add(size_of::<$archived_datatype>()) }; + } + } + }; +} + +impl_datatype! {i128, ArchivedI128, DataTypeValue::I128} +impl_datatype! {i64, ArchivedI64, DataTypeValue::I64} +impl_datatype! {i32, ArchivedI32, DataTypeValue::I32} +impl_datatype! {i16, ArchivedI16, DataTypeValue::I16} +impl_datatype! {i8, i8, DataTypeValue::I8} +impl_datatype! {u128, ArchivedU128, DataTypeValue::U128} +impl_datatype! {u64, ArchivedU64, DataTypeValue::U64} +impl_datatype! {u32, ArchivedU32, DataTypeValue::U32} +impl_datatype! {u16, ArchivedU16, DataTypeValue::U16} +impl_datatype! {u8, u8, DataTypeValue::U8} +impl_datatype! {f64, ArchivedF64, DataTypeValue::F64} +impl_datatype! {f32, ArchivedF32, DataTypeValue::F32} diff --git a/src/persistence/data/util.rs b/src/persistence/data/util.rs new file mode 100644 index 0000000..ad816e1 --- /dev/null +++ b/src/persistence/data/util.rs @@ -0,0 +1,23 @@ +pub fn advance_accum_for_padding(mut accum: usize, padding: usize) -> usize { + if accum % padding != 0 { + accum += padding - accum % padding; + } + accum +} + +pub fn advance_pointer_for_padding( + mut current_pointer: *const u8, + start_pointer: *const u8, + padding: usize, +) -> *const u8 { + if unsafe { current_pointer.byte_offset_from(start_pointer) % padding as isize != 0 } { + current_pointer = unsafe { + current_pointer.add( + (padding as isize + - current_pointer.byte_offset_from(start_pointer) % padding as isize) + as usize, + ) + }; + } + current_pointer +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index 6890cba..692fc9b 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -1,3 +1,4 @@ +pub mod data; mod index; mod table; diff --git a/src/util/persistable.rs b/src/util/persistable.rs index 5b05d72..d140547 100644 --- a/src/util/persistable.rs +++ b/src/util/persistable.rs @@ -1,3 +1,28 @@ +use rkyv::rancor::Strategy; +use rkyv::ser::allocator::ArenaHandle; +use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; +use rkyv::util::AlignedVec; +use rkyv::Serialize; + pub trait Persistable { fn as_bytes(&self) -> impl AsRef<[u8]>; } + +impl Persistable for Vec +where + T: Persistable + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + >, +{ + fn as_bytes(&self) -> impl AsRef<[u8]> { + rkyv::to_bytes::(self).unwrap() + } +} + +impl Persistable for u8 { + fn as_bytes(&self) -> impl AsRef<[u8]> { + rkyv::to_bytes::(self).unwrap() + } +}