Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = "0.5.1"
uuid = { version = "1.11.0", features = ["v4"] }
indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] }
tokio = { version = "1", features = ["full"] }
84 changes: 44 additions & 40 deletions src/page/index/page.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! [`crate::page::IndexPage`] definition.

use std::fmt::Debug;
use std::fs::File;
use std::hash::Hash;
use std::io::{Read, Seek, SeekFrom, Write};
use std::io::SeekFrom;
use std::mem;

use crate::page::{IndexValue, PageId};
use crate::{
align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE,
};
use indexset::core::pair::Pair;
use rkyv::de::Pool;
use rkyv::rancor::Strategy;
Expand All @@ -14,11 +17,8 @@ use rkyv::ser::sharing::Share;
use rkyv::ser::Serializer;
use rkyv::util::AlignedVec;
use rkyv::{Archive, Deserialize, Serialize};

use crate::page::{IndexValue, PageId};
use crate::{
align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE,
};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

pub fn get_index_page_size_from_data_length<T>(length: usize) -> usize
where
Expand Down Expand Up @@ -147,34 +147,34 @@ impl<T> IndexPage<T> {
}
}

pub fn parse_index_page_utility(
pub async fn parse_index_page_utility(
file: &mut File,
page_id: PageId,
) -> eyre::Result<IndexPageUtility<T>>
where
T: Archive + Default + SizeMeasurable,
<T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
seek_to_page_start(file, page_id.0)?;
seek_to_page_start(file, page_id.0).await?;
let offset = GENERAL_HEADER_SIZE as i64;
file.seek(SeekFrom::Current(offset))?;
file.seek(SeekFrom::Current(offset)).await?;

let mut size_bytes = vec![0u8; 2];
file.read_exact(size_bytes.as_mut_slice())?;
file.read_exact(size_bytes.as_mut_slice()).await?;
let archived =
unsafe { rkyv::access_unchecked::<<u16 as Archive>::Archived>(&size_bytes[0..2]) };
let size =
rkyv::deserialize::<u16, rkyv::rancor::Error>(archived).expect("data should be valid");

let index_utility_len = Self::index_page_utility_length(size as usize);
let mut index_utility_bytes = vec![0u8; index_utility_len];
file.read_exact(index_utility_bytes.as_mut_slice())?;
file.read_exact(index_utility_bytes.as_mut_slice()).await?;
let utility = Self::get_index_page_utility_from_bytes(index_utility_bytes.as_ref());

Ok(utility)
}

pub fn persist_index_page_utility(
pub async fn persist_index_page_utility(
file: &mut File,
page_id: PageId,
utility: IndexPageUtility<T>,
Expand All @@ -187,37 +187,41 @@ impl<T> IndexPage<T> {
Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
>,
{
seek_to_page_start(file, page_id.0)?;
seek_to_page_start(file, page_id.0).await?;
file.seek(SeekFrom::Current(
GENERAL_HEADER_SIZE as i64 + u16::default().aligned_size() as i64,
))?;

let node_id_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&utility.node_id)?;
file.write_all(node_id_bytes.as_slice())?;
let current_index_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&utility.current_index)?;
file.write_all(current_index_bytes.as_slice())?;
let current_length_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&utility.current_length)?;
file.write_all(current_length_bytes.as_slice())?;
let slots_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&utility.slots)?;
file.write_all(slots_bytes.as_slice())?;
))
.await?;

let mut bytes = vec![];
bytes
.extend_from_slice(rkyv::to_bytes::<rkyv::rancor::Error>(&utility.node_id)?.as_slice());
bytes.extend_from_slice(
rkyv::to_bytes::<rkyv::rancor::Error>(&utility.current_index)?.as_slice(),
);
bytes.extend_from_slice(
rkyv::to_bytes::<rkyv::rancor::Error>(&utility.current_length)?.as_slice(),
);
bytes.extend_from_slice(rkyv::to_bytes::<rkyv::rancor::Error>(&utility.slots)?.as_slice());
file.write_all(bytes.as_slice()).await?;
Ok(())
}

fn read_value(file: &mut File) -> eyre::Result<IndexValue<T>>
async fn read_value(file: &mut File) -> eyre::Result<IndexValue<T>>
where
T: Archive + Default + SizeMeasurable,
<T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
let mut bytes = vec![0u8; align8(IndexValue::<T>::default().aligned_size())];
file.read_exact(bytes.as_mut_slice())?;
file.read_exact(bytes.as_mut_slice()).await?;
let mut v = AlignedVec::<4>::new();
v.extend_from_slice(bytes.as_slice());
let archived =
unsafe { rkyv::access_unchecked::<<IndexValue<T> as Archive>::Archived>(&v[..]) };
Ok(rkyv::deserialize(archived).expect("data should be valid"))
}

pub fn read_value_with_index(
pub async fn read_value_with_index(
file: &mut File,
page_id: PageId,
size: usize,
Expand All @@ -227,11 +231,11 @@ impl<T> IndexPage<T> {
T: Archive + Default + SizeMeasurable,
<T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
seek_to_page_start(file, page_id.0)?;
seek_to_page_start(file, page_id.0).await?;

let offset = Self::get_value_offset(size, index);
file.seek(SeekFrom::Current(offset as i64))?;
Self::read_value(file)
file.seek(SeekFrom::Current(offset as i64)).await?;
Self::read_value(file).await
}

fn get_value_offset(size: usize, value_index: usize) -> usize
Expand All @@ -249,7 +253,7 @@ impl<T> IndexPage<T> {
offset
}

pub fn persist_value(
pub async fn persist_value(
file: &mut File,
page_id: PageId,
size: usize,
Expand All @@ -266,25 +270,25 @@ impl<T> IndexPage<T> {
>,
<T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
seek_to_page_start(file, page_id.0)?;
seek_to_page_start(file, page_id.0).await?;

let offset = Self::get_value_offset(size, value_index as usize);
file.seek(SeekFrom::Current(offset as i64))?;
file.seek(SeekFrom::Current(offset as i64)).await?;
let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&value)?;
file.write_all(bytes.as_slice())?;
file.write_all(bytes.as_slice()).await?;

if value_index != size as u16 - 1 {
let mut value = Self::read_value(file)?;
let mut value = Self::read_value(file).await?;
while value != IndexValue::default() {
value_index += 1;
value = Self::read_value(file)?;
value = Self::read_value(file).await?;
}
}

Ok(value_index + 1)
}

pub fn remove_value(
pub async fn remove_value(
file: &mut File,
page_id: PageId,
size: usize,
Expand All @@ -300,13 +304,13 @@ impl<T> IndexPage<T> {
>,
<T as Archive>::Archived: Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
{
seek_to_page_start(file, page_id.0)?;
seek_to_page_start(file, page_id.0).await?;

let offset = Self::get_value_offset(size, value_index as usize);
file.seek(SeekFrom::Current(offset as i64))?;
file.seek(SeekFrom::Current(offset as i64)).await?;
let value = IndexValue::<T>::default();
let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&value)?;
file.write_all(bytes.as_slice())?;
file.write_all(bytes.as_slice()).await?;

Ok(())
}
Expand Down
Loading
Loading