diff --git a/Cargo.toml b/Cargo.toml index 70d1a00..a7cf6e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = "0.5.1" uuid = { version = "1.11.0", features = ["v4"] } indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] } +tokio = { version = "1", features = ["full"] } diff --git a/src/page/index/page.rs b/src/page/index/page.rs index 680371d..24564b9 100644 --- a/src/page/index/page.rs +++ b/src/page/index/page.rs @@ -1,11 +1,14 @@ //! [`crate::page::IndexPage`] definition. use std::fmt::Debug; -use std::fs::File; use std::hash::Hash; -use std::io::{Read, Seek, SeekFrom, Write}; +use std::io::SeekFrom; use std::mem; +use crate::page::{IndexValue, PageId}; +use crate::{ + align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE, +}; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; @@ -14,11 +17,8 @@ use rkyv::ser::sharing::Share; use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; - -use crate::page::{IndexValue, PageId}; -use crate::{ - align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE, -}; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; pub fn get_index_page_size_from_data_length(length: usize) -> usize where @@ -147,7 +147,7 @@ impl IndexPage { } } - pub fn parse_index_page_utility( + pub async fn parse_index_page_utility( file: &mut File, page_id: PageId, ) -> eyre::Result> @@ -155,12 +155,12 @@ impl IndexPage { T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = GENERAL_HEADER_SIZE as i64; - file.seek(SeekFrom::Current(offset))?; + file.seek(SeekFrom::Current(offset)).await?; let mut size_bytes = vec![0u8; 2]; - file.read_exact(size_bytes.as_mut_slice())?; + file.read_exact(size_bytes.as_mut_slice()).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&size_bytes[0..2]) }; let size = @@ -168,13 +168,13 @@ impl IndexPage { let index_utility_len = Self::index_page_utility_length(size as usize); let mut index_utility_bytes = vec![0u8; index_utility_len]; - file.read_exact(index_utility_bytes.as_mut_slice())?; + file.read_exact(index_utility_bytes.as_mut_slice()).await?; let utility = Self::get_index_page_utility_from_bytes(index_utility_bytes.as_ref()); Ok(utility) } - pub fn persist_index_page_utility( + pub async fn persist_index_page_utility( file: &mut File, page_id: PageId, utility: IndexPageUtility, @@ -187,29 +187,33 @@ impl IndexPage { Strategy, Share>, rkyv::rancor::Error>, >, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; file.seek(SeekFrom::Current( GENERAL_HEADER_SIZE as i64 + u16::default().aligned_size() as i64, - ))?; - - let node_id_bytes = rkyv::to_bytes::(&utility.node_id)?; - file.write_all(node_id_bytes.as_slice())?; - let current_index_bytes = rkyv::to_bytes::(&utility.current_index)?; - file.write_all(current_index_bytes.as_slice())?; - let current_length_bytes = rkyv::to_bytes::(&utility.current_length)?; - file.write_all(current_length_bytes.as_slice())?; - let slots_bytes = rkyv::to_bytes::(&utility.slots)?; - file.write_all(slots_bytes.as_slice())?; + )) + .await?; + + let mut bytes = vec![]; + bytes + .extend_from_slice(rkyv::to_bytes::(&utility.node_id)?.as_slice()); + bytes.extend_from_slice( + rkyv::to_bytes::(&utility.current_index)?.as_slice(), + ); + bytes.extend_from_slice( + rkyv::to_bytes::(&utility.current_length)?.as_slice(), + ); + bytes.extend_from_slice(rkyv::to_bytes::(&utility.slots)?.as_slice()); + file.write_all(bytes.as_slice()).await?; Ok(()) } - fn read_value(file: &mut File) -> eyre::Result> + async fn read_value(file: &mut File) -> eyre::Result> where T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { let mut bytes = vec![0u8; align8(IndexValue::::default().aligned_size())]; - file.read_exact(bytes.as_mut_slice())?; + file.read_exact(bytes.as_mut_slice()).await?; let mut v = AlignedVec::<4>::new(); v.extend_from_slice(bytes.as_slice()); let archived = @@ -217,7 +221,7 @@ impl IndexPage { Ok(rkyv::deserialize(archived).expect("data should be valid")) } - pub fn read_value_with_index( + pub async fn read_value_with_index( file: &mut File, page_id: PageId, size: usize, @@ -227,11 +231,11 @@ impl IndexPage { T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, index); - file.seek(SeekFrom::Current(offset as i64))?; - Self::read_value(file) + file.seek(SeekFrom::Current(offset as i64)).await?; + Self::read_value(file).await } fn get_value_offset(size: usize, value_index: usize) -> usize @@ -249,7 +253,7 @@ impl IndexPage { offset } - pub fn persist_value( + pub async fn persist_value( file: &mut File, page_id: PageId, size: usize, @@ -266,25 +270,25 @@ impl IndexPage { >, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, value_index as usize); - file.seek(SeekFrom::Current(offset as i64))?; + file.seek(SeekFrom::Current(offset as i64)).await?; let bytes = rkyv::to_bytes::(&value)?; - file.write_all(bytes.as_slice())?; + file.write_all(bytes.as_slice()).await?; if value_index != size as u16 - 1 { - let mut value = Self::read_value(file)?; + let mut value = Self::read_value(file).await?; while value != IndexValue::default() { value_index += 1; - value = Self::read_value(file)?; + value = Self::read_value(file).await?; } } Ok(value_index + 1) } - pub fn remove_value( + pub async fn remove_value( file: &mut File, page_id: PageId, size: usize, @@ -300,13 +304,13 @@ impl IndexPage { >, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, value_index as usize); - file.seek(SeekFrom::Current(offset as i64))?; + file.seek(SeekFrom::Current(offset as i64)).await?; let value = IndexValue::::default(); let bytes = rkyv::to_bytes::(&value)?; - file.write_all(bytes.as_slice())?; + file.write_all(bytes.as_slice()).await?; Ok(()) } diff --git a/src/page/util.rs b/src/page/util.rs index c82f7fe..08079a6 100644 --- a/src/page/util.rs +++ b/src/page/util.rs @@ -1,9 +1,9 @@ -use std::io; -use std::io::prelude::*; - use eyre::eyre; use rkyv::api::high::HighDeserializer; use rkyv::Archive; +use std::io::SeekFrom; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use super::index::IndexValue; use super::SpaceInfoPage; @@ -69,44 +69,48 @@ pub fn map_data_pages_to_general( general_pages } -pub fn persist_page(page: &mut GeneralPage, file: &mut std::fs::File) -> eyre::Result<()> +pub async fn persist_page<'a, T>( + page: &'a mut GeneralPage, + file: &'a mut File, +) -> eyre::Result<()> where - T: Persistable, + T: Persistable + Send + Sync, { - use std::io::prelude::*; - - seek_to_page_start(file, page.header.page_id.0)?; + seek_to_page_start(file, page.header.page_id.0).await?; let page_count = page.header.page_id.0 as i64 + 1; let inner_bytes = page.inner.as_bytes(); page.header.data_length = inner_bytes.as_ref().len() as u32; - file.write_all(page.header.as_bytes().as_ref())?; - file.write_all(inner_bytes.as_ref())?; - let curr_position = file.stream_position()?; - file.seek(io::SeekFrom::Current( + file.write_all(page.header.as_bytes().as_ref()).await?; + file.write_all(inner_bytes.as_ref()).await?; + let curr_position = file.stream_position().await?; + file.seek(SeekFrom::Current( (page_count * PAGE_SIZE as i64) - curr_position as i64, - ))?; + )) + .await?; Ok(()) } -pub fn seek_to_page_start(file: &mut std::fs::File, index: u32) -> eyre::Result<()> { - file.seek(io::SeekFrom::Start(index as u64 * PAGE_SIZE as u64))?; +pub async fn seek_to_page_start(file: &mut File, index: u32) -> eyre::Result<()> { + file.seek(SeekFrom::Start(index as u64 * PAGE_SIZE as u64)) + .await?; Ok(()) } -pub fn seek_by_link(file: &mut std::fs::File, link: Link) -> eyre::Result<()> { - file.seek(io::SeekFrom::Start( +pub async fn seek_by_link(file: &mut File, link: Link) -> eyre::Result<()> { + file.seek(SeekFrom::Start( link.page_id.0 as u64 * PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64 + link.offset as u64, - ))?; + )) + .await?; Ok(()) } -pub fn update_at( - file: &mut std::fs::File, +pub async fn update_at( + file: &mut File, link: Link, new_data: &[u8], ) -> eyre::Result<()> { @@ -127,14 +131,14 @@ pub fn update_at( )); } - seek_by_link(file, link)?; - file.write_all(new_data)?; + seek_by_link(file, link).await?; + file.write_all(new_data).await?; Ok(()) } -pub fn parse_general_header(file: &mut std::fs::File) -> eyre::Result { +pub async fn parse_general_header(file: &mut File) -> eyre::Result { let mut buffer = [0; GENERAL_HEADER_SIZE]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&buffer[..]) }; let header = @@ -143,8 +147,8 @@ pub fn parse_general_header(file: &mut std::fs::File) -> eyre::Result( - file: &mut std::fs::File, +pub async fn parse_page( + file: &mut File, index: u32, ) -> eyre::Result> where @@ -152,8 +156,8 @@ where ::Archived: rkyv::Deserialize>, { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let length = if header.data_length == 0 { PAGE_SIZE } else { @@ -161,7 +165,7 @@ where }; let mut buffer: Vec = vec![0u8; length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let info = Page::from_bytes(buffer.as_ref()); Ok(GeneralPage { @@ -170,19 +174,19 @@ where }) } -pub fn parse_data_page( - file: &mut std::fs::File, +pub async fn parse_data_page( + file: &mut File, index: u32, ) -> eyre::Result>> { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let mut buffer = [0u8; INNER_PAGE_SIZE]; if header.next_id == 0.into() { #[allow(clippy::unused_io_amount)] - file.read(&mut buffer)?; + file.read(&mut buffer).await?; } else { - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; } let data = DataPage { @@ -220,19 +224,19 @@ pub fn parse_data_page( // Ok(parsed_record) // } -pub fn parse_index_page( - file: &mut std::fs::File, +pub async fn parse_index_page( + file: &mut File, index: u32, ) -> eyre::Result>> where T: Archive, ::Archived: rkyv::Deserialize>, { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let mut buffer: Vec = vec![0u8; header.data_length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::< as Archive>::Archived>(&buffer[..]) }; let index_records: Vec> = rkyv::deserialize::, _>(archived) @@ -242,14 +246,14 @@ where Ok(index_records) } -pub fn parse_space_info( - file: &mut std::fs::File, +pub async fn parse_space_info( + file: &mut File, ) -> eyre::Result { - file.seek(io::SeekFrom::Start(0))?; - let header = parse_general_header(file)?; + file.seek(SeekFrom::Start(0)).await?; + let header = parse_general_header(file).await?; let mut buffer = vec![0u8; header.data_length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&buffer[..]) }; let space_info: SpaceInfoPage = diff --git a/src/util/persistable.rs b/src/util/persistable.rs index eb342a3..8fb5620 100644 --- a/src/util/persistable.rs +++ b/src/util/persistable.rs @@ -1,4 +1,5 @@ use crate::SizeMeasurable; + use rkyv::de::Pool; use rkyv::rancor::Strategy; use rkyv::ser::allocator::ArenaHandle; @@ -8,7 +9,7 @@ use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; pub trait Persistable { - fn as_bytes(&self) -> impl AsRef<[u8]>; + fn as_bytes(&self) -> impl AsRef<[u8]> + Send; fn from_bytes(bytes: &[u8]) -> Self; }