Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ rust-version = "1.68" # 1.67 contains an UB we would tri
[dependencies]
bytes = "1"
crc32fast = "1.2"
serde = { version = "1", features = ["derive"] }
thiserror = "2"
tracing = "0.1.37"
arc-swap = "1.7"
rclite = "0.2"

[dev-dependencies]
criterion = "0.5"
Expand All @@ -26,3 +27,7 @@ tempfile = "3"
[[bench]]
name = "bench"
harness = false

# Uncomment for profiling
# [profile.bench]
# debug=true
12 changes: 12 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
We no longer need to record the position. Keeping the last record is sufficient.
Of course, it would be nicer to store that separately to avoid fragmentation.

Record refcount

Having no GC, however, means that on restart we will often have to read far more than is actually required.

We need a way to stop reading the past.

Truncating a non-existent queue.

Just truncate to the record position.
40 changes: 36 additions & 4 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ use crate::PersistAction;

pub const BLOCK_NUM_BYTES: usize = 32_768;

/// A block reader is expected to be positioned on a block as soon as it is initialized.
///
/// In other words, it is not necessary to call `next_block` a first time
/// before calling `block()`.
pub trait BlockRead {
type Session;

fn start_session(&self) -> Self::Session;

/// Loads the next block.
/// If `Ok(true)` is returned, the new block is available through
/// `.block()`.
///
/// If `Ok(false)` is returned, the end of the `BlockReader`
/// has been reached and the content of `block()` could be anything.
fn next_block(&mut self) -> io::Result<bool>;
fn next_block(&mut self, read_session: &mut Self::Session) -> io::Result<bool>;

/// A `BlockReader` is always positioned on a specific block.
///
Expand All @@ -25,8 +33,17 @@ pub trait BlockRead {
}

pub trait BlockWrite {
type Session;

fn start_write_session(&mut self) -> io::Result<Self::Session>;

fn make_room(&mut self, num_bytes: u64) -> io::Result<()>;

/// Must panic if buf is larger than `num_bytes_remaining_in_block`.
fn write(&mut self, buf: &[u8]) -> io::Result<()>;
/// Note that this trait does not have a `next_block()` method.
///
/// We automatically go to the next block after the current block has been entirely written.
fn write(&mut self, buf: &[u8], write_session: &mut Self::Session) -> io::Result<()>;
/// Persist the data following the `persist_action`.
fn persist(&mut self, persist_action: PersistAction) -> io::Result<()>;
/// Number of bytes that can be added in the block.
Expand All @@ -49,7 +66,11 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> {
}

impl BlockRead for ArrayReader<'_> {
fn next_block(&mut self) -> io::Result<bool> {
type Session = ();

fn start_session(&self) -> Self::Session {}

fn next_block(&mut self, _session: &mut Self::Session) -> io::Result<bool> {
if self.data.len() < BLOCK_NUM_BYTES {
return Ok(false);
}
Expand Down Expand Up @@ -81,7 +102,14 @@ impl From<VecBlockWriter> for Vec<u8> {
}

impl BlockWrite for VecBlockWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<()> {
type Session = ();
/// Resizes the buffer to `ceil_to_block(cursor + num_bytes)` bytes, filling any
/// newly added bytes with zeros.
///
/// NOTE(review): assuming `buffer` is a `Vec<u8>`, `resize` also truncates when the
/// buffer is already longer than the computed length — confirm this is intended.
fn make_room(&mut self, num_bytes: u64) -> io::Result<()> {
    // TODO consider just doubling for performance.
    let required_end = self.cursor + num_bytes as usize;
    let padded_len = ceil_to_block(required_end);
    self.buffer.resize(padded_len, 0u8);
    Ok(())
}
fn write(&mut self, buf: &[u8], _session: &mut Self::Session) -> io::Result<()> {
assert!(buf.len() <= self.num_bytes_remaining_in_block());
if self.cursor + buf.len() > self.buffer.len() {
let new_len = ceil_to_block((self.cursor + buf.len()) * 2 + 1);
Expand All @@ -99,4 +127,8 @@ impl BlockWrite for VecBlockWriter {
fn num_bytes_remaining_in_block(&self) -> usize {
BLOCK_NUM_BYTES - (self.cursor % BLOCK_NUM_BYTES)
}

/// Starts a write session.
///
/// The session type of this writer is `()`, so there is nothing to set up and
/// this always returns `Ok(())`.
fn start_write_session(&mut self) -> io::Result<Self::Session> {
Ok(())
}
}
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,13 @@ pub enum ReadRecordError {
#[error("Corruption")]
Corruption,
}

/// Errors describing why a header was rejected.
///
/// NOTE(review): presumably produced while validating a file header; the code that
/// constructs these variants is not visible here — confirm.
#[derive(Error, Debug)]
pub enum HeaderError {
/// The magic number did not match; `magic_number` is the value that was found.
#[error("invalid magic number: found {magic_number}")]
InvalidMagicNumber { magic_number: u32 },
/// The checksum was invalid.
#[error("invalid checksum")]
InvalidChecksum,
/// The version is not supported; `version` is the unsupported value.
#[error("unsupported version: {version}")]
UnsupportedVersion { version: u32 },
}
33 changes: 15 additions & 18 deletions src/frame/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ use std::io;

use thiserror::Error;

use crate::frame::{FrameType, FrameWriter, Header, HEADER_LEN};
use crate::rolling::{RollingReader, RollingWriter};
use crate::frame::{FrameType, Header, HEADER_LEN};
use crate::{BlockRead, BLOCK_NUM_BYTES};

pub struct FrameReader<R> {
reader: R,
pub(crate) reader: R,

/// In block cursor
cursor: usize,
pub(crate) cursor: usize,

// The current block is corrupted.
block_corrupted: bool,
Expand All @@ -35,8 +34,8 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
}
}

pub fn read(&self) -> &R {
&self.reader
/// Starts a read session by delegating to the underlying block reader.
///
/// The returned session is the one later passed to `read_frame`.
pub fn start_session(&self) -> R::Session {
self.reader.start_session()
}

// Returns the number of bytes remaining into
Expand All @@ -47,13 +46,16 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
crate::BLOCK_NUM_BYTES - self.cursor
}

fn go_to_next_block_if_necessary(&mut self) -> Result<(), ReadFrameError> {
fn go_to_next_block_if_necessary(
&mut self,
session: &mut R::Session,
) -> Result<(), ReadFrameError> {
let num_bytes_to_end_of_block = self.num_bytes_to_end_of_block();
let need_to_skip_block = self.block_corrupted || num_bytes_to_end_of_block < HEADER_LEN;
if !need_to_skip_block {
return Ok(());
}
if !self.reader.next_block()? {
if !self.reader.next_block(session)? {
return Err(ReadFrameError::NotAvailable);
}

Expand All @@ -79,8 +81,11 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
}

// Reads the next frame.
pub fn read_frame(&mut self) -> Result<(FrameType, &[u8]), ReadFrameError> {
self.go_to_next_block_if_necessary()?;
pub fn read_frame(
&mut self,
session: &mut R::Session,
) -> Result<(FrameType, &[u8]), ReadFrameError> {
self.go_to_next_block_if_necessary(session)?;
let header = self.get_frame_header()?;
self.cursor += HEADER_LEN;
if self.cursor + header.len() > BLOCK_NUM_BYTES {
Expand All @@ -103,11 +108,3 @@ impl<R: BlockRead + Unpin> FrameReader<R> {
Ok((header.frame_type(), frame_payload))
}
}

impl FrameReader<RollingReader> {
/// Consumes the frame reader and returns a `FrameWriter` built on the
/// `RollingWriter` obtained from the underlying `RollingReader`.
///
/// The rolling writer is forwarded by the reader's in-block `cursor` before
/// being wrapped.
/// NOTE(review): presumably so that writing resumes right after the last frame
/// that was read — confirm.
///
/// # Errors
/// Propagates any `io::Error` returned by `into_writer` or `forward`.
pub fn into_writer(self) -> io::Result<FrameWriter<RollingWriter>> {
let mut rolling_writer: RollingWriter = self.reader.into_writer()?;
rolling_writer.forward(self.cursor)?;
Ok(FrameWriter::create(rolling_writer))
}
}
50 changes: 30 additions & 20 deletions src/frame/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,42 @@
use crate::block_read_write::{ArrayReader, VecBlockWriter};
use crate::frame::header::{FrameType, HEADER_LEN};
use crate::frame::{FrameReader, FrameWriter, ReadFrameError};
use crate::{PersistAction, BLOCK_NUM_BYTES};
use crate::{BlockWrite as _, PersistAction, BLOCK_NUM_BYTES};

#[test]
fn test_frame_simple() {
let block_writer = {
let wrt: VecBlockWriter = VecBlockWriter::default();
let mut wrt: VecBlockWriter = VecBlockWriter::default();
let mut session = wrt.start_write_session().unwrap();

Check warning on line 12 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:12:9 | 12 | let mut session = wrt.start_write_session().unwrap(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value = note: `#[warn(clippy::let_unit_value)]` on by default help: omit the `let` binding and replace variable usages with `()` | 12 ~ wrt.start_write_session().unwrap(); 13 | let mut frame_writer = FrameWriter::create(wrt); 14 | 15 | frame_writer 16 ~ .write_frame(FrameType::First, &b"abc"[..], &mut ()) 17 | .unwrap(); 18 | frame_writer 19 ~ .write_frame(FrameType::Middle, &b"de"[..], &mut ()) 20 | .unwrap(); 21 | frame_writer 22 ~ .write_frame(FrameType::Last, &b"fgh"[..], &mut ()) |
let mut frame_writer = FrameWriter::create(wrt);

frame_writer
.write_frame(FrameType::First, &b"abc"[..])
.write_frame(FrameType::First, &b"abc"[..], &mut session)
.unwrap();
frame_writer
.write_frame(FrameType::Middle, &b"de"[..])
.write_frame(FrameType::Middle, &b"de"[..], &mut session)
.unwrap();
frame_writer
.write_frame(FrameType::Last, &b"fgh"[..])
.write_frame(FrameType::Last, &b"fgh"[..], &mut session)
.unwrap();
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer()
};
let buffer: Vec<u8> = block_writer.into();
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
let read_frame_res = frame_reader.read_frame();
let mut session = frame_reader.start_session();

Check warning on line 29 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:29:5 | 29 | let mut session = frame_reader.start_session(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 29 ~ frame_reader.start_session(); 30 ~ let read_frame_res = frame_reader.read_frame(&mut ()); 31 | assert_eq!(read_frame_res.unwrap(), (FrameType::First, &b"abc"[..])); 32 | assert_eq!( 33 ~ frame_reader.read_frame(&mut ()).unwrap(), 34 | (FrameType::Middle, &b"de"[..]) 35 | ); 36 | assert_eq!( 37 ~ frame_reader.read_frame(&mut ()).unwrap(), 38 | (FrameType::Last, &b"fgh"[..]) 39 | ); 40 | assert!(matches!( 41 ~ frame_reader.read_frame(&mut ()).unwrap_err(), |
let read_frame_res = frame_reader.read_frame(&mut session);
assert_eq!(read_frame_res.unwrap(), (FrameType::First, &b"abc"[..]));
assert_eq!(
frame_reader.read_frame().unwrap(),
frame_reader.read_frame(&mut session).unwrap(),
(FrameType::Middle, &b"de"[..])
);
assert_eq!(
frame_reader.read_frame().unwrap(),
frame_reader.read_frame(&mut session).unwrap(),
(FrameType::Last, &b"fgh"[..])
);
assert!(matches!(
frame_reader.read_frame().unwrap_err(),
frame_reader.read_frame(&mut session).unwrap_err(),
ReadFrameError::NotAvailable
));
}
Expand All @@ -44,29 +47,34 @@
fn test_frame_corruption_in_payload() -> io::Result<()> {
let mut buf: Vec<u8> = {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
frame_writer.write_frame(FrameType::First, &b"abc"[..])?;
let mut session = frame_writer.start_session()?;

Check warning on line 50 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:50:9 | 50 | let mut session = frame_writer.start_session()?; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 50 ~ frame_writer.start_session()?; 51 ~ frame_writer.write_frame(FrameType::First, &b"abc"[..], &mut ())?; 52 | frame_writer.persist(PersistAction::Flush)?; 53 ~ frame_writer.write_frame(FrameType::Middle, &b"de"[..], &mut ())?; |
frame_writer.write_frame(FrameType::First, &b"abc"[..], &mut session)?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..])?;
frame_writer.write_frame(FrameType::Middle, &b"de"[..], &mut session)?;
frame_writer.persist(PersistAction::Flush)?;
frame_writer.into_writer().into()
};
buf[8] = 0u8;
let mut frame_reader = FrameReader::open(ArrayReader::from(&buf[..]));
let mut session = frame_reader.start_session();

Check warning on line 59 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:59:5 | 59 | let mut session = frame_reader.start_session(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 59 ~ frame_reader.start_session(); 60 | assert!(matches!( 61 ~ frame_reader.read_frame(&mut ()), 62 | Err(ReadFrameError::Corruption) 63 | )); 64 | assert!(matches!( 65 ~ frame_reader.read_frame(&mut ()), |
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut session),
Err(ReadFrameError::Corruption)
));
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut session),
Ok((FrameType::Middle, b"de"))
));
Ok(())
}

fn repeat_empty_frame_util(repeat: usize) -> Vec<u8> {
let mut frame_writer = FrameWriter::create(VecBlockWriter::default());
let mut session = frame_writer.start_session().unwrap();

Check warning on line 73 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:73:5 | 73 | let mut session = frame_writer.start_session().unwrap(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 73 ~ frame_writer.start_session().unwrap(); 74 | for _ in 0..repeat { 75 | frame_writer 76 ~ .write_frame(FrameType::Full, &b""[..], &mut ()) |
for _ in 0..repeat {
frame_writer.write_frame(FrameType::Full, &b""[..]).unwrap();
frame_writer
.write_frame(FrameType::Full, &b""[..], &mut session)
.unwrap();
}
frame_writer.persist(PersistAction::Flush).unwrap();
frame_writer.into_writer().into()
Expand All @@ -77,12 +85,13 @@
let num_frames = 1 + BLOCK_NUM_BYTES / HEADER_LEN;
let buffer = repeat_empty_frame_util(num_frames);
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
let mut session = frame_reader.start_session();

Check warning on line 88 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:88:5 | 88 | let mut session = frame_reader.start_session(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 88 ~ frame_reader.start_session(); 89 | for _ in 0..num_frames { 90 ~ let read_frame_res = frame_reader.read_frame(&mut ()); 91 | assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); 92 | } 93 | assert!(matches!( 94 ~ frame_reader.read_frame(&mut &mut &mut &mut &mut &mut &mut &mut ()), |
for _ in 0..num_frames {
let read_frame_res = frame_reader.read_frame();
let read_frame_res = frame_reader.read_frame(&mut session);
assert!(matches!(read_frame_res, Ok((FrameType::Full, &[]))));
}
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut &mut &mut &mut &mut &mut &mut &mut session),

Check warning on line 94 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> src/frame/tests.rs:94:33 | 94 | frame_reader.read_frame(&mut &mut &mut &mut &mut &mut &mut &mut session), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `(&mut session)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
Err(ReadFrameError::NotAvailable)
));
Ok(())
Expand All @@ -96,20 +105,21 @@
let mut buffer = repeat_empty_frame_util(num_frames);
buffer[2000 * HEADER_LEN + 5] = 255u8;
let mut frame_reader = FrameReader::open(ArrayReader::from(&buffer[..]));
let mut session = frame_reader.start_session();

Check warning on line 108 in src/frame/tests.rs

View workflow job for this annotation

GitHub Actions / clippy

this let-binding has unit value

warning: this let-binding has unit value --> src/frame/tests.rs:108:5 | 108 | let mut session = frame_reader.start_session(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_unit_value help: omit the `let` binding and replace variable usages with `()` | 108 ~ frame_reader.start_session(); 109 | for _ in 0..2000 { 110 ~ let read_frame_res = frame_reader.read_frame(&mut ()); 111 | assert!(matches!(read_frame_res, Ok((FrameType::Full, &[])))); 112 | } 113 | assert!(matches!( 114 ~ frame_reader.read_frame(&mut ()), 115 | Err(ReadFrameError::Corruption) 116 | )); 117 | assert!(matches!( 118 ~ frame_reader.read_frame(&mut ()), 119 | Ok((FrameType::Full, &[])) 120 | )); 121 | assert!(matches!( 122 ~ frame_reader.read_frame(&mut ()), |
for _ in 0..2000 {
let read_frame_res = frame_reader.read_frame();
let read_frame_res = frame_reader.read_frame(&mut session);
assert!(matches!(read_frame_res, Ok((FrameType::Full, &[]))));
}
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut session),
Err(ReadFrameError::Corruption)
));
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut session),
Ok((FrameType::Full, &[]))
));
assert!(matches!(
frame_reader.read_frame(),
frame_reader.read_frame(&mut session),
Err(ReadFrameError::NotAvailable)
));
Ok(())
Expand Down
Loading
Loading