Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
38eba34
Add `row_schema` field to `SpaceInfo`
a-rodin Dec 5, 2024
927c95f
WIP reading of fields
a-rodin Dec 9, 2024
b8c9124
Store column data types as strings
a-rodin Dec 9, 2024
582a2ac
Rename `secondary_index_map` to `secondary_index_types`
a-rodin Dec 10, 2024
ddd7de2
Merge branch 'main' into row_schema
a-rodin Dec 12, 2024
858aaa7
Add a function that reads arbitrary archived structs
a-rodin Dec 13, 2024
cff4ebf
Support more types and correct padding in `parse_archived_row` function
a-rodin Dec 16, 2024
ed0f458
Make code more DRY
a-rodin Dec 16, 2024
b17598e
Support `f64` and `f32` data types
a-rodin Dec 16, 2024
7d839e3
Make usage of commas in the match arms more consistent
a-rodin Dec 16, 2024
276b5bc
Rename the test for `parse_archived_row`
a-rodin Dec 16, 2024
ff90def
Merge branch 'main' into parse_rkyv_data
a-rodin Dec 16, 2024
5e606b8
Add `primary_key_type` field
a-rodin Dec 17, 2024
fc147ac
Change `primary_key_type` to `primary_key_fields` in `SpaceInfo`
a-rodin Dec 17, 2024
3ef34f0
Fix `test_as_bytes` test
a-rodin Dec 17, 2024
6ca08cc
Storea and read vectors of index records in index pages
a-rodin Dec 17, 2024
cc1e313
WIP reading of data pages
a-rodin Dec 17, 2024
6899c9d
Merge branch 'parse_rkyv_data' into row_schema
a-rodin Dec 17, 2024
e719f0b
An implementation of reading rows from the database
a-rodin Dec 17, 2024
6f62940
corrections
Handy-caT Dec 17, 2024
7cfd277
Implement `DataType` for numerical types
a-rodin Dec 18, 2024
aaf7d45
Support all data types
a-rodin Dec 19, 2024
ea2529d
Run `cargo fmt`
a-rodin Dec 19, 2024
0ba0db1
Remove unused imports
a-rodin Dec 19, 2024
ac882f9
Start implementing a test for reading row data
a-rodin Dec 19, 2024
cedd6b0
Create a mock database for `test_read_table_data` test
a-rodin Dec 23, 2024
e4b3386
Read vectors of `IndexValue` instead of vectors of `IndexData`
a-rodin Dec 23, 2024
878f061
Make `test_read_table_data` test pass
a-rodin Dec 23, 2024
de423bb
Merge branch 'main' into row_schema
a-rodin Dec 23, 2024
cf90eda
Run `cargo fmt`
a-rodin Dec 23, 2024
bdf019f
Add `parse_data_page` function
a-rodin Dec 23, 2024
e9ffb39
Support more primary key data types
a-rodin Dec 24, 2024
6cc2530
Make intervals closed
a-rodin Dec 24, 2024
4b9c15c
Fix a broken test
a-rodin Dec 24, 2024
9d22e37
Add `read_rows_schema` function
a-rodin Dec 24, 2024
6dd3b95
Add `Display` trait to `DataValueType`
a-rodin Dec 24, 2024
94f25fd
Add `PageIterator`
a-rodin Dec 25, 2024
0fc7a8a
Add `LinkIterator`
a-rodin Dec 25, 2024
3de33c1
Use absolute seek instead of relative seeks
a-rodin Dec 25, 2024
5db7cb2
Add `DataIterator` and a test for it
a-rodin Dec 25, 2024
a8a2412
Remove an unused import
a-rodin Dec 25, 2024
b46ff54
Infer the type of the primary key from SpaceInfo
a-rodin Dec 27, 2024
1812810
Make the code more DRY
a-rodin Dec 27, 2024
7a49d0d
Remove unused imports and unneeded `mut`
a-rodin Dec 30, 2024
27ff08d
Merge branch 'main' into row_schema
a-rodin Dec 30, 2024
c52f5b8
Merge branch `main`
a-rodin Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ pub use link::Link;
pub use data_bucket_codegen::SizeMeasure;
pub use page::{
map_data_pages_to_general, map_index_pages_to_general, map_tree_index, map_unique_tree_index,
parse_data_page, parse_index_page, parse_page, persist_page, read_index_pages, seek_by_link,
seek_to_page_start, update_at, Data as DataPage, DataType, General as GeneralPage,
GeneralHeader, IndexPage as IndexData, Interval, PageType, SpaceInfo as SpaceInfoData,
DATA_VERSION, GENERAL_HEADER_SIZE, INNER_PAGE_SIZE, PAGE_SIZE,
parse_data_page, parse_index_page, parse_page, persist_page, read_data_pages, read_index_pages,
read_rows_schema, seek_by_link, seek_to_page_start, update_at, Data as DataPage,
General as GeneralPage, GeneralHeader, IndexPage as IndexData, Interval, PageType,
SpaceInfo as SpaceInfoData, DATA_VERSION, GENERAL_HEADER_SIZE, INNER_PAGE_SIZE, PAGE_SIZE,
};
pub use persistence::{PersistableIndex, PersistableTable};
pub use util::{align, Persistable, SizeMeasurable};
8 changes: 0 additions & 8 deletions src/page/data_type.rs

This file was deleted.

290 changes: 290 additions & 0 deletions src/page/iterators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
use std::io::Read;

use rkyv::{de::Pool, rancor::Strategy, Archive, DeserializeUnsized};

use crate::{
page::util::parse_general_header,
persistence::data::{rkyv_data::parse_archived_row, DataTypeValue},
IndexData, Link,
};

use super::{index::{ArchivedIndexValue, IndexValue}, seek_by_link, seek_to_page_start, Interval, SpaceInfo};

pub struct PageIterator {
intervals: Vec<Interval>,
current_intervals_index: usize,
current_position_in_interval: usize,
}

impl PageIterator {
pub fn new(intervals: Vec<Interval>) -> PageIterator {
PageIterator {
current_intervals_index: 0,
current_position_in_interval: if intervals.len() > 0 {
intervals[0].0
} else {
0
},
intervals,
}
}
}

impl Iterator for PageIterator {
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
let mut result: Option<Self::Item> = None;

if self.current_intervals_index >= self.intervals.len() {
result = None
} else if self.current_position_in_interval
>= self.intervals[self.current_intervals_index].0
&& self.current_position_in_interval <= self.intervals[self.current_intervals_index].1
{
result = Some(self.current_position_in_interval as u32);
self.current_position_in_interval += 1;
} else if self.current_position_in_interval > self.intervals[self.current_intervals_index].1
{
self.current_intervals_index += 1;
if self.current_intervals_index >= self.intervals.len() {
result = None;
} else {
self.current_position_in_interval = self.intervals[self.current_intervals_index].0;
result = Some(self.current_position_in_interval as u32);
self.current_position_in_interval += 1;
}
}

result
}
}

pub struct LinksIterator<'a> {
file: &'a mut std::fs::File,
page_id: u32,
links: Option<Vec<Link>>,
link_index: usize,
primary_key_type: String,
}

impl<'a> LinksIterator<'a> {
pub fn new(
file: &'a mut std::fs::File,
page_id: u32,
space_info: &SpaceInfo,
) -> LinksIterator<'a> {
let primary_key_fields = &space_info.primary_key_fields;
let primary_key_type = space_info
.row_schema
.iter()
.filter(|(field_name, _field_type)| field_name == &primary_key_fields[0])
.map(|(_field_name, field_type)| field_type)
.take(1)
.collect::<Vec<&String>>()[0];
LinksIterator {
file,
page_id,
links: None,
link_index: 0,
primary_key_type: primary_key_type.clone(),
}
}
}

fn parse_links<T>(buffer: &[u8]) -> Vec<Link>
where T: Archive,
[ArchivedIndexValue<T>]: DeserializeUnsized<[IndexValue<T>], Strategy<Pool, rkyv::rancor::Error>>
{
let archived = unsafe {
rkyv::access_unchecked::<<IndexData<T> as Archive>::Archived>(
&buffer[..],
)
};
let index_records =
rkyv::deserialize::<IndexData<T>, rkyv::rancor::Error>(archived)
.expect("data should be valid")
.index_values;

index_records
.iter()
.map(|index_value| index_value.link)
.collect::<Vec<_>>()
}

impl Iterator for LinksIterator<'_> {
type Item = Link;

fn next(&mut self) -> Option<Self::Item> {
if self.links.is_none() {
seek_to_page_start(&mut self.file, self.page_id).expect("page should be seekable");
let header = parse_general_header(&mut self.file).expect("header should be readable");

let mut buffer: Vec<u8> = vec![0u8; header.data_length as usize];
self.file
.read_exact(&mut buffer)
.expect("index data should be readable");

self.links = Some(match self.primary_key_type.as_str() {
"String" => parse_links::<String>(&buffer),
"i128" => parse_links::<i128>(&buffer),
"i64" => parse_links::<i64>(&buffer),
"i32" => parse_links::<i32>(&buffer),
"i16" => parse_links::<i16>(&buffer),
"i8" => parse_links::<i8>(&buffer),
"u128" => parse_links::<u128>(&buffer),
"u64" => parse_links::<u64>(&buffer),
"u32" => parse_links::<u32>(&buffer),
"u16" => parse_links::<u16>(&buffer),
"u8" => parse_links::<u8>(&buffer),
"f64" => parse_links::<f64>(&buffer),
"f32" => parse_links::<f32>(&buffer),
_ => panic!(
"Unsupported primary key data type `{}`",
self.primary_key_type
),
});
}

if self.link_index < self.links.as_deref().unwrap().len() {
let result = Some(self.links.as_deref().unwrap()[self.link_index]);
self.link_index += 1;
result
} else {
None
}
}
}

pub struct DataIterator<'a> {
file: &'a mut std::fs::File,
schema: Vec<(String, String)>,
links: Vec<Link>,
link_index: usize,
}

impl DataIterator<'_> {
pub fn new(
file: &mut std::fs::File,
schema: Vec<(String, String)>,
mut links: Vec<Link>,
) -> DataIterator<'_> {
links.sort_by(|a, b| {
(a.page_id, a.offset)
.partial_cmp(&(b.page_id, b.offset))
.unwrap()
});

DataIterator {
file,
schema,
links,
link_index: 0,
}
}
}

impl Iterator for DataIterator<'_> {
type Item = Vec<DataTypeValue>;

fn next(&mut self) -> Option<Self::Item> {
if self.link_index >= self.links.len() {
return None;
}

let current_link = self.links[self.link_index];
seek_by_link(&mut self.file, current_link).expect("the seek should be successful");
let mut buffer = vec![0u8; current_link.length as usize];
self.file
.read_exact(&mut buffer)
.expect("the data should be read");
let row = parse_archived_row(&buffer, &self.schema);

self.link_index += 1;

Some(row)
}
}

#[cfg(test)]
mod test {
use crate::{
page::{iterators::DataIterator, util::parse_space_info, PageId},
persistence::data::DataTypeValue,
Interval, Link, PAGE_SIZE,
};

use super::{LinksIterator, PageIterator};

#[test]
fn test_page_iterator() {
let interval1 = Interval(1, 2);
let interval2 = Interval(5, 7);
let page_iterator = PageIterator::new(vec![interval1, interval2]);
let collected = page_iterator.collect::<Vec<_>>();
assert_eq!(collected, vec![1, 2, 5, 6, 7]);
}

#[test]
fn test_page_iterator_empty() {
let page_iterator = PageIterator::new(vec![]);
let collected = page_iterator.collect::<Vec<_>>();
assert_eq!(collected, Vec::<u32>::new());
}

#[test]
fn test_links_iterator() {
let filename = "tests/data/table_links_test.wt";
super::super::util::test::create_test_database_file(filename);

let mut file = std::fs::File::open(filename).unwrap();
let space_info = parse_space_info::<PAGE_SIZE>(&mut file).unwrap();
let links = LinksIterator::<'_>::new(&mut file, 1, &space_info);
assert_eq!(
links.collect::<Vec<_>>(),
vec![
Link {
page_id: PageId(2),
offset: 0,
length: 24
},
Link {
page_id: PageId(2),
offset: 24,
length: 28
}
]
);
}

#[test]
fn test_pages_and_links_iterators() {
let filename = "tests/data/table_pages_and_links_test.wt";
super::super::util::test::create_test_database_file(filename);

let mut file = std::fs::File::open(filename).unwrap();
let space_info = parse_space_info::<PAGE_SIZE>(&mut file).unwrap();
let index_intervals = space_info.primary_key_intervals.clone();

let pages_ids = PageIterator::new(index_intervals).collect::<Vec<_>>();
assert_eq!(pages_ids, vec![1]);

let links =
LinksIterator::<'_>::new(&mut file, pages_ids[0], &space_info).collect::<Vec<_>>();
let data_iterator: DataIterator<'_> =
DataIterator::new(&mut file, space_info.row_schema, links);
assert_eq!(
data_iterator.collect::<Vec<_>>(),
vec![
vec![
DataTypeValue::I32(1),
DataTypeValue::String("first string".to_string())
],
vec![
DataTypeValue::I32(2),
DataTypeValue::String("second string".to_string())
]
]
);
}
}
8 changes: 4 additions & 4 deletions src/page/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
mod data;
mod data_type;
mod header;
mod index;
mod iterators;
mod space_info;
mod ty;
mod util;

use derive_more::{Display, From};
use rkyv::{Archive, Deserialize, Serialize};

pub use data::Data;
pub use data_type::DataType;
pub use header::{GeneralHeader, DATA_VERSION};
pub use index::{map_tree_index, map_unique_tree_index, IndexPage};
pub use iterators::{DataIterator, LinksIterator, PageIterator};
pub use space_info::{Interval, SpaceInfo};
pub use ty::PageType;
pub use util::{
map_data_pages_to_general, map_index_pages_to_general, parse_data_page, parse_index_page,
parse_page, persist_page, read_index_pages, seek_by_link, seek_to_page_start, update_at,
parse_page, parse_space_info, persist_page, read_data_pages, read_index_pages,
read_rows_schema, seek_by_link, seek_to_page_start, update_at,
};

// TODO: Move to config
Expand Down
9 changes: 6 additions & 3 deletions src/page/space_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use rkyv::util::AlignedVec;
use rkyv::{Archive, Deserialize, Serialize};

use crate::util::Persistable;
use crate::DataType;
use crate::{space, Link};

pub type SpaceName = String;
Expand All @@ -23,12 +22,14 @@ pub struct SpaceInfo<Pk = ()> {
pub id: space::Id,
pub page_count: u32,
pub name: SpaceName,
pub row_schema: Vec<(String, String)>,
pub primary_key_fields: Vec<String>,
pub primary_key_intervals: Vec<Interval>,
pub secondary_index_types: Vec<(String, String)>,
pub secondary_index_intervals: HashMap<String, Vec<Interval>>,
pub data_intervals: Vec<Interval>,
pub pk_gen_state: Pk,
pub empty_links_list: Vec<Link>,
pub secondary_index_map: HashMap<String, DataType>,
}

/// Represents some interval between values.
Expand Down Expand Up @@ -66,12 +67,14 @@ mod test {
id: 0.into(),
page_count: 0,
name: "Test".to_string(),
row_schema: vec![],
primary_key_fields: vec![],
primary_key_intervals: vec![],
secondary_index_intervals: HashMap::new(),
data_intervals: vec![],
pk_gen_state: (),
empty_links_list: vec![],
secondary_index_map: HashMap::new(),
secondary_index_types: vec![],
};
let bytes = info.as_bytes();
assert!(bytes.as_ref().len() < INNER_PAGE_SIZE)
Expand Down
Loading