From 2bdb70adabc55b6e73b8e3e2875932e43880af41 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Sun, 9 Mar 2025 18:45:48 +0300 Subject: [PATCH 1/4] move to async i/o --- Cargo.toml | 1 + src/page/index/page.rs | 87 ++++++++++++++++++++++------------------- src/page/util.rs | 89 +++++++++++++++++++++--------------------- 3 files changed, 93 insertions(+), 84 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 70d1a00..a7cf6e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = "0.5.1" uuid = { version = "1.11.0", features = ["v4"] } indexset = { version = "0.11.2", features = ["concurrent", "cdc", "multimap"] } +tokio = { version = "1", features = ["full"] } diff --git a/src/page/index/page.rs b/src/page/index/page.rs index 680371d..74dc544 100644 --- a/src/page/index/page.rs +++ b/src/page/index/page.rs @@ -1,11 +1,14 @@ //! [`crate::page::IndexPage`] definition. use std::fmt::Debug; -use std::fs::File; use std::hash::Hash; -use std::io::{Read, Seek, SeekFrom, Write}; +use std::io::SeekFrom; use std::mem; +use crate::page::{IndexValue, PageId}; +use crate::{ + align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE, +}; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; @@ -14,11 +17,8 @@ use rkyv::ser::sharing::Share; use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; - -use crate::page::{IndexValue, PageId}; -use crate::{ - align, align8, seek_to_page_start, Link, Persistable, SizeMeasurable, GENERAL_HEADER_SIZE, -}; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; pub fn get_index_page_size_from_data_length(length: usize) -> usize where @@ -147,7 +147,7 @@ impl IndexPage { } } - pub fn parse_index_page_utility( + pub async fn parse_index_page_utility( file: &mut File, page_id: PageId, ) -> eyre::Result> @@ -155,12 +155,12 @@ impl IndexPage { T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = GENERAL_HEADER_SIZE as i64; - file.seek(SeekFrom::Current(offset))?; + file.seek(SeekFrom::Current(offset)).await?; let mut size_bytes = vec![0u8; 2]; - file.read_exact(size_bytes.as_mut_slice())?; + file.read_exact(size_bytes.as_mut_slice()).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&size_bytes[0..2]) }; let size = @@ -168,13 +168,13 @@ impl IndexPage { let index_utility_len = Self::index_page_utility_length(size as usize); let mut index_utility_bytes = vec![0u8; index_utility_len]; - file.read_exact(index_utility_bytes.as_mut_slice())?; + file.read_exact(index_utility_bytes.as_mut_slice()).await?; let utility = Self::get_index_page_utility_from_bytes(index_utility_bytes.as_ref()); Ok(utility) } - pub fn persist_index_page_utility( + pub async fn persist_index_page_utility( file: &mut File, page_id: PageId, utility: IndexPageUtility, @@ -187,29 +187,36 @@ impl IndexPage { Strategy, Share>, rkyv::rancor::Error>, >, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; file.seek(SeekFrom::Current( GENERAL_HEADER_SIZE as i64 + u16::default().aligned_size() as i64, - ))?; - - let node_id_bytes = rkyv::to_bytes::(&utility.node_id)?; - file.write_all(node_id_bytes.as_slice())?; - let current_index_bytes = rkyv::to_bytes::(&utility.current_index)?; - file.write_all(current_index_bytes.as_slice())?; - let current_length_bytes = rkyv::to_bytes::(&utility.current_length)?; - file.write_all(current_length_bytes.as_slice())?; - let slots_bytes = rkyv::to_bytes::(&utility.slots)?; - file.write_all(slots_bytes.as_slice())?; + )) + .await?; + + let mut bytes = vec![]; + bytes.copy_from_slice( + &mut rkyv::to_bytes::(&utility.node_id)?.as_slice(), + ); + bytes.copy_from_slice( + &mut rkyv::to_bytes::(&utility.current_index)?.as_slice(), + ); + bytes.copy_from_slice( + &mut rkyv::to_bytes::(&utility.current_length)?.as_slice(), + ); + bytes.copy_from_slice( + &mut rkyv::to_bytes::(&utility.slots)?.as_slice(), + ); + file.write_all(bytes.as_slice()).await?; Ok(()) } - fn read_value(file: &mut File) -> eyre::Result> + async fn read_value(file: &mut File) -> eyre::Result> where T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { let mut bytes = vec![0u8; align8(IndexValue::::default().aligned_size())]; - file.read_exact(bytes.as_mut_slice())?; + file.read_exact(bytes.as_mut_slice()).await?; let mut v = AlignedVec::<4>::new(); v.extend_from_slice(bytes.as_slice()); let archived = @@ -217,7 +224,7 @@ impl IndexPage { Ok(rkyv::deserialize(archived).expect("data should be valid")) } - pub fn read_value_with_index( + pub async fn read_value_with_index( file: &mut File, page_id: PageId, size: usize, @@ -227,11 +234,11 @@ impl IndexPage { T: Archive + Default + SizeMeasurable, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, index); - file.seek(SeekFrom::Current(offset as i64))?; - Self::read_value(file) + file.seek(SeekFrom::Current(offset as i64)).await?; + Self::read_value(file).await } fn get_value_offset(size: usize, value_index: usize) -> usize @@ -249,7 +256,7 @@ impl IndexPage { offset } - pub fn persist_value( + pub async fn persist_value( file: &mut File, page_id: PageId, size: usize, @@ -266,25 +273,25 @@ impl IndexPage { >, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, value_index as usize); - file.seek(SeekFrom::Current(offset as i64))?; + file.seek(SeekFrom::Current(offset as i64)).await?; let bytes = rkyv::to_bytes::(&value)?; - file.write_all(bytes.as_slice())?; + file.write_all(bytes.as_slice()).await?; if value_index != size as u16 - 1 { - let mut value = Self::read_value(file)?; + let mut value = Self::read_value(file).await?; while value != IndexValue::default() { value_index += 1; - value = Self::read_value(file)?; + value = Self::read_value(file).await?; } } Ok(value_index + 1) } - pub fn remove_value( + pub async fn remove_value( file: &mut File, page_id: PageId, size: usize, @@ -300,13 +307,13 @@ impl IndexPage { >, ::Archived: Deserialize>, { - seek_to_page_start(file, page_id.0)?; + seek_to_page_start(file, page_id.0).await?; let offset = Self::get_value_offset(size, value_index as usize); - file.seek(SeekFrom::Current(offset as i64))?; + file.seek(SeekFrom::Current(offset as i64)).await?; let value = IndexValue::::default(); let bytes = rkyv::to_bytes::(&value)?; - file.write_all(bytes.as_slice())?; + file.write_all(bytes.as_slice()).await?; Ok(()) } diff --git a/src/page/util.rs b/src/page/util.rs index c82f7fe..90fa853 100644 --- a/src/page/util.rs +++ b/src/page/util.rs @@ -1,9 +1,9 @@ -use std::io; -use std::io::prelude::*; - use eyre::eyre; use rkyv::api::high::HighDeserializer; use rkyv::Archive; +use std::io::SeekFrom; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use super::index::IndexValue; use super::SpaceInfoPage; @@ -69,44 +69,45 @@ pub fn map_data_pages_to_general( general_pages } -pub fn persist_page(page: &mut GeneralPage, file: &mut std::fs::File) -> eyre::Result<()> +pub async fn persist_page(page: &mut GeneralPage, file: &mut File) -> eyre::Result<()> where T: Persistable, { - use std::io::prelude::*; - - seek_to_page_start(file, page.header.page_id.0)?; + seek_to_page_start(file, page.header.page_id.0).await?; let page_count = page.header.page_id.0 as i64 + 1; let inner_bytes = page.inner.as_bytes(); page.header.data_length = inner_bytes.as_ref().len() as u32; - file.write_all(page.header.as_bytes().as_ref())?; - file.write_all(inner_bytes.as_ref())?; - let curr_position = file.stream_position()?; - file.seek(io::SeekFrom::Current( + file.write_all(page.header.as_bytes().as_ref()).await?; + file.write_all(inner_bytes.as_ref()).await?; + let curr_position = file.stream_position().await?; + file.seek(SeekFrom::Current( (page_count * PAGE_SIZE as i64) - curr_position as i64, - ))?; + )) + .await?; Ok(()) } -pub fn seek_to_page_start(file: &mut std::fs::File, index: u32) -> eyre::Result<()> { - file.seek(io::SeekFrom::Start(index as u64 * PAGE_SIZE as u64))?; +pub async fn seek_to_page_start(file: &mut File, index: u32) -> eyre::Result<()> { + file.seek(SeekFrom::Start(index as u64 * PAGE_SIZE as u64)) + .await?; Ok(()) } -pub fn seek_by_link(file: &mut std::fs::File, link: Link) -> eyre::Result<()> { - file.seek(io::SeekFrom::Start( +pub async fn seek_by_link(file: &mut File, link: Link) -> eyre::Result<()> { + file.seek(SeekFrom::Start( link.page_id.0 as u64 * PAGE_SIZE as u64 + GENERAL_HEADER_SIZE as u64 + link.offset as u64, - ))?; + )) + .await?; Ok(()) } -pub fn update_at( - file: &mut std::fs::File, +pub async fn update_at( + file: &mut File, link: Link, new_data: &[u8], ) -> eyre::Result<()> { @@ -127,14 +128,14 @@ pub fn update_at( )); } - seek_by_link(file, link)?; - file.write_all(new_data)?; + seek_by_link(file, link).await?; + file.write_all(new_data).await?; Ok(()) } -pub fn parse_general_header(file: &mut std::fs::File) -> eyre::Result { +pub async fn parse_general_header(file: &mut File) -> eyre::Result { let mut buffer = [0; GENERAL_HEADER_SIZE]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&buffer[..]) }; let header = @@ -143,8 +144,8 @@ pub fn parse_general_header(file: &mut std::fs::File) -> eyre::Result( - file: &mut std::fs::File, +pub async fn parse_page( + file: &mut File, index: u32, ) -> eyre::Result> where @@ -152,8 +153,8 @@ where ::Archived: rkyv::Deserialize>, { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let length = if header.data_length == 0 { PAGE_SIZE } else { @@ -161,7 +162,7 @@ where }; let mut buffer: Vec = vec![0u8; length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let info = Page::from_bytes(buffer.as_ref()); Ok(GeneralPage { @@ -170,19 +171,19 @@ where }) } -pub fn parse_data_page( - file: &mut std::fs::File, +pub async fn parse_data_page( + file: &mut File, index: u32, ) -> eyre::Result>> { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let mut buffer = [0u8; INNER_PAGE_SIZE]; if header.next_id == 0.into() { #[allow(clippy::unused_io_amount)] - file.read(&mut buffer)?; + file.read(&mut buffer).await?; } else { - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; } let data = DataPage { @@ -220,19 +221,19 @@ pub fn parse_data_page( // Ok(parsed_record) // } -pub fn parse_index_page( - file: &mut std::fs::File, +pub async fn parse_index_page( + file: &mut File, index: u32, ) -> eyre::Result>> where T: Archive, ::Archived: rkyv::Deserialize>, { - seek_to_page_start(file, index)?; - let header = parse_general_header(file)?; + seek_to_page_start(file, index).await?; + let header = parse_general_header(file).await?; let mut buffer: Vec = vec![0u8; header.data_length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::< as Archive>::Archived>(&buffer[..]) }; let index_records: Vec> = rkyv::deserialize::, _>(archived) @@ -242,14 +243,14 @@ where Ok(index_records) } -pub fn parse_space_info( - file: &mut std::fs::File, +pub async fn parse_space_info( + file: &mut File, ) -> eyre::Result { - file.seek(io::SeekFrom::Start(0))?; - let header = parse_general_header(file)?; + file.seek(SeekFrom::Start(0)).await?; + let header = parse_general_header(file).await?; let mut buffer = vec![0u8; header.data_length as usize]; - file.read_exact(&mut buffer)?; + file.read_exact(&mut buffer).await?; let archived = unsafe { rkyv::access_unchecked::<::Archived>(&buffer[..]) }; let space_info: SpaceInfoPage = From 0abaeb54d015fe1ece22a1989a796e2a6b280121 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 13:32:52 +0300 Subject: [PATCH 2/4] WIP --- src/page/util.rs | 38 ++++++++++++++++++++++---------------- src/util/persistable.rs | 3 ++- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/page/util.rs b/src/page/util.rs index 90fa853..b7cb57b 100644 --- a/src/page/util.rs +++ b/src/page/util.rs @@ -1,6 +1,7 @@ use eyre::eyre; use rkyv::api::high::HighDeserializer; use rkyv::Archive; +use std::future::Future; use std::io::SeekFrom; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; @@ -69,25 +70,30 @@ pub fn map_data_pages_to_general( general_pages } -pub async fn persist_page(page: &mut GeneralPage, file: &mut File) -> eyre::Result<()> +pub fn persist_page<'a, T>( + page: &'a mut GeneralPage, + file: &'a mut File, +) -> impl Future> + Send + use<'a, T> where - T: Persistable, + T: Persistable + Send + Sync, { - seek_to_page_start(file, page.header.page_id.0).await?; - - let page_count = page.header.page_id.0 as i64 + 1; - let inner_bytes = page.inner.as_bytes(); - page.header.data_length = inner_bytes.as_ref().len() as u32; - - file.write_all(page.header.as_bytes().as_ref()).await?; - file.write_all(inner_bytes.as_ref()).await?; - let curr_position = file.stream_position().await?; - file.seek(SeekFrom::Current( - (page_count * PAGE_SIZE as i64) - curr_position as i64, - )) - .await?; + async { + seek_to_page_start(file, page.header.page_id.0).await?; + + let page_count = page.header.page_id.0 as i64 + 1; + let inner_bytes = page.inner.as_bytes(); + page.header.data_length = inner_bytes.as_ref().len() as u32; + + file.write_all(page.header.as_bytes().as_ref()).await?; + file.write_all(inner_bytes.as_ref()).await?; + let curr_position = file.stream_position().await?; + file.seek(SeekFrom::Current( + (page_count * PAGE_SIZE as i64) - curr_position as i64, + )) + .await?; - Ok(()) + Ok(()) + } } pub async fn seek_to_page_start(file: &mut File, index: u32) -> eyre::Result<()> { diff --git a/src/util/persistable.rs b/src/util/persistable.rs index eb342a3..8fb5620 100644 --- a/src/util/persistable.rs +++ b/src/util/persistable.rs @@ -1,4 +1,5 @@ use crate::SizeMeasurable; + use rkyv::de::Pool; use rkyv::rancor::Strategy; use rkyv::ser::allocator::ArenaHandle; @@ -8,7 +9,7 @@ use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; pub trait Persistable { - fn as_bytes(&self) -> impl AsRef<[u8]>; + fn as_bytes(&self) -> impl AsRef<[u8]> + Send; fn from_bytes(bytes: &[u8]) -> Self; } From 07f8ef4a22153c22cf785b3430fda9b65a7c1793 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:05:48 +0300 Subject: [PATCH 3/4] Corrections --- src/page/index/page.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/page/index/page.rs b/src/page/index/page.rs index 74dc544..bd76b53 100644 --- a/src/page/index/page.rs +++ b/src/page/index/page.rs @@ -194,16 +194,16 @@ impl IndexPage { .await?; let mut bytes = vec![]; - bytes.copy_from_slice( + bytes.extend_from_slice( &mut rkyv::to_bytes::(&utility.node_id)?.as_slice(), ); - bytes.copy_from_slice( + bytes.extend_from_slice( &mut rkyv::to_bytes::(&utility.current_index)?.as_slice(), ); - bytes.copy_from_slice( + bytes.extend_from_slice( &mut rkyv::to_bytes::(&utility.current_length)?.as_slice(), ); - bytes.copy_from_slice( + bytes.extend_from_slice( &mut rkyv::to_bytes::(&utility.slots)?.as_slice(), ); file.write_all(bytes.as_slice()).await?; From 100c2759b4e13b8107162dfe4f6ba83f24afbbb2 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:37:43 +0300 Subject: [PATCH 4/4] clippy --- src/page/index/page.rs | 13 +++++-------- src/page/util.rs | 35 ++++++++++++++++------------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/src/page/index/page.rs b/src/page/index/page.rs index bd76b53..24564b9 100644 --- a/src/page/index/page.rs +++ b/src/page/index/page.rs @@ -194,18 +194,15 @@ impl IndexPage { .await?; let mut bytes = vec![]; + bytes + .extend_from_slice(rkyv::to_bytes::(&utility.node_id)?.as_slice()); bytes.extend_from_slice( - &mut rkyv::to_bytes::(&utility.node_id)?.as_slice(), - ); - bytes.extend_from_slice( - &mut rkyv::to_bytes::(&utility.current_index)?.as_slice(), - ); - bytes.extend_from_slice( - &mut rkyv::to_bytes::(&utility.current_length)?.as_slice(), + rkyv::to_bytes::(&utility.current_index)?.as_slice(), ); bytes.extend_from_slice( - &mut rkyv::to_bytes::(&utility.slots)?.as_slice(), + rkyv::to_bytes::(&utility.current_length)?.as_slice(), ); + bytes.extend_from_slice(rkyv::to_bytes::(&utility.slots)?.as_slice()); file.write_all(bytes.as_slice()).await?; Ok(()) } diff --git a/src/page/util.rs b/src/page/util.rs index b7cb57b..08079a6 100644 --- a/src/page/util.rs +++ b/src/page/util.rs @@ -1,7 +1,6 @@ use eyre::eyre; use rkyv::api::high::HighDeserializer; use rkyv::Archive; -use std::future::Future; use std::io::SeekFrom; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; @@ -70,30 +69,28 @@ pub fn map_data_pages_to_general( general_pages } -pub fn persist_page<'a, T>( +pub async fn persist_page<'a, T>( page: &'a mut GeneralPage, file: &'a mut File, -) -> impl Future> + Send + use<'a, T> +) -> eyre::Result<()> where T: Persistable + Send + Sync, { - async { - seek_to_page_start(file, page.header.page_id.0).await?; - - let page_count = page.header.page_id.0 as i64 + 1; - let inner_bytes = page.inner.as_bytes(); - page.header.data_length = inner_bytes.as_ref().len() as u32; - - file.write_all(page.header.as_bytes().as_ref()).await?; - file.write_all(inner_bytes.as_ref()).await?; - let curr_position = file.stream_position().await?; - file.seek(SeekFrom::Current( - (page_count * PAGE_SIZE as i64) - curr_position as i64, - )) - .await?; + seek_to_page_start(file, page.header.page_id.0).await?; - Ok(()) - } + let page_count = page.header.page_id.0 as i64 + 1; + let inner_bytes = page.inner.as_bytes(); + page.header.data_length = inner_bytes.as_ref().len() as u32; + + file.write_all(page.header.as_bytes().as_ref()).await?; + file.write_all(inner_bytes.as_ref()).await?; + let curr_position = file.stream_position().await?; + file.seek(SeekFrom::Current( + (page_count * PAGE_SIZE as i64) - curr_position as i64, + )) + .await?; + + Ok(()) } pub async fn seek_to_page_start(file: &mut File, index: u32) -> eyre::Result<()> {