Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion litebox/src/broker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::event::{counter::EventCounterError, polling::TryOpError};

/// Error returned by the deployment-provided broker control path.
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub(crate) enum BrokerControlError {
#[error("broker control transport failed")]
Transport,
Expand Down
16 changes: 6 additions & 10 deletions litebox/src/event/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use litebox_broker_protocol::EventConsumeMode as EventCounterReadMode;
use litebox_broker_protocol::{
AddEventRequest, ConsumeEventRequest, ConsumeEventResponse, CoreRequest, CoreResponse,
CreateEventRequest, EventRequest, EventResponse, ObjectHandle, ReadinessState,
WaitEventRequest, WaitOutcome,
WaitEventRequest,
};
use thiserror::Error;

Expand Down Expand Up @@ -64,7 +64,7 @@ where
CreateEventRequest { initial_count },
)))
.map_err(BrokerObjectError::from)
.and_then(event_response_from_core)
.map(event_response_from_core)
.map_err(EventCounterError::from)?;
let EventResponse::Create(response) = response else {
return Err(BrokerObjectError::UnexpectedResponse.into());
Expand Down Expand Up @@ -140,7 +140,7 @@ where
self.broker
.request(CoreRequest::Event(request))
.map_err(BrokerObjectError::from)
.and_then(event_response_from_core)
.map(event_response_from_core)
}
}

Expand All @@ -161,10 +161,7 @@ where
let EventResponse::Wait(response) = response else {
return Events::empty();
};
let (WaitOutcome::Ready(readiness) | WaitOutcome::WouldBlock(readiness)) = response.outcome
else {
return Events::empty();
};
let readiness = response.readiness;
let mut events = Events::empty();
if readiness.read_ready {
events |= Events::IN;
Expand All @@ -176,9 +173,8 @@ where
}
}

fn event_response_from_core(response: CoreResponse) -> Result<EventResponse, BrokerObjectError> {
fn event_response_from_core(response: CoreResponse) -> EventResponse {
match response {
CoreResponse::Event(response) => Ok(response),
_ => Err(BrokerObjectError::UnexpectedResponse),
CoreResponse::Event(response) => response,
}
}
1 change: 0 additions & 1 deletion litebox/src/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ struct WriteEnd<Platform: RawSyncPrimitivesProvider + TimeProvider, T> {

/// Potential errors when writing or reading from a pipe
#[derive(Error, Debug)]
#[non_exhaustive]
enum PipeError {
#[error("this end has been shut down")]
ThisEndShutdown,
Expand Down
1 change: 0 additions & 1 deletion litebox/src/sync/lock_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ const CONFIG_ENABLE_RECORDING: bool = true;
const CONFIG_MAX_RECORDED_EVENTS: usize = 1_000_000;

/// The kind of lock that has been applied, either for locking or unlocking.
#[non_exhaustive]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum LockType {
RwLock,
Expand Down
22 changes: 6 additions & 16 deletions litebox_broker_core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

use crate::session::{ObjectEntry, ObjectRights};
use crate::{BrokerError, BrokerSession, Result};
use litebox_broker_protocol::{
EventConsumeMode, EventConsumption, ObjectHandle, ReadinessState, WaitOutcome,
};
use litebox_broker_protocol::{EventConsumeMode, EventConsumption, ObjectHandle, ReadinessState};

pub(crate) const MAX_EVENT_COUNT: u64 = u64::MAX - 1;

Expand All @@ -25,20 +23,13 @@ pub fn create(session: &BrokerSession, initial_count: u64) -> Result<ObjectHandl
/// Blocking is intentionally outside BrokerCore for the first proof of
/// concept. Userland or kernel deployments can block on deployment-specific
/// wait primitives after BrokerCore authorizes and reports readiness state.
pub fn wait(session: &BrokerSession, handle: ObjectHandle) -> Result<WaitOutcome> {
pub fn wait(session: &BrokerSession, handle: ObjectHandle) -> Result<ReadinessState> {
let required_rights = ObjectRights::WAIT;
session.with_authorized_object(handle, required_rights, |object| match object {
ObjectEntry::Event(event) => {
let readiness = ReadinessState {
read_ready: event.count > 0,
write_ready: event.count < MAX_EVENT_COUNT,
};
Ok(if readiness.read_ready {
WaitOutcome::Ready(readiness)
} else {
WaitOutcome::WouldBlock(readiness)
})
}
ObjectEntry::Event(event) => Ok(ReadinessState {
read_ready: event.count > 0,
write_ready: event.count < MAX_EVENT_COUNT,
}),
})
}

Expand Down Expand Up @@ -92,7 +83,6 @@ impl EventObject {
let value = match mode {
EventConsumeMode::All => self.count,
EventConsumeMode::One => 1,
_ => return Err(BrokerError::UnsupportedOperation),
};
self.count -= value;
Ok(EventConsumption {
Expand Down
6 changes: 3 additions & 3 deletions litebox_broker_core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ mod tests {
event,
};
use litebox_broker_protocol::{
EventConsumeMode, EventConsumption, ObjectHandle, ReadinessState, WaitOutcome,
EventConsumeMode, EventConsumption, ObjectHandle, ReadinessState,
};

#[test]
Expand Down Expand Up @@ -214,10 +214,10 @@ mod tests {

assert_eq!(
event::wait(&session, handle),
Ok(WaitOutcome::WouldBlock(ReadinessState {
Ok(ReadinessState {
read_ready: false,
write_ready: true,
}))
})
);
assert_eq!(
event::add(&session, handle, 1),
Expand Down
9 changes: 3 additions & 6 deletions litebox_broker_host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn handle_request(
})
}
}
_ => BrokerDispatch::close_after(
BrokerRequest::Core(_) => BrokerDispatch::close_after(
BrokerResponse::Error(ErrorCode::ProtocolState),
CloseReason::ProtocolViolation,
),
Expand All @@ -117,14 +117,12 @@ fn handle_active_request(session: &BrokerSession, request: BrokerRequest) -> Bro
BrokerRequest::Core(request) => {
BrokerDispatch::continue_after(handle_core_request(session, request))
}
_ => BrokerDispatch::continue_after(BrokerResponse::Error(ErrorCode::UnsupportedOperation)),
}
}

fn handle_core_request(session: &BrokerSession, request: CoreRequest) -> BrokerResponse {
match request {
CoreRequest::Event(request) => handle_event_request(session, request),
_ => BrokerResponse::Error(ErrorCode::UnsupportedOperation),
}
}

Expand All @@ -138,9 +136,9 @@ fn handle_event_request(session: &BrokerSession, request: EventRequest) -> Broke
})
}
EventRequest::Wait(request) => {
handle_core_result(event::wait(session, request.handle), |outcome| {
handle_core_result(event::wait(session, request.handle), |readiness| {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Wait(
WaitEventResponse { outcome },
WaitEventResponse { readiness },
)))
})
}
Expand All @@ -158,7 +156,6 @@ fn handle_event_request(session: &BrokerSession, request: EventRequest) -> Broke
BrokerResponse::Core(CoreResponse::Event(EventResponse::Consume(consumption)))
},
),
_ => BrokerResponse::Error(ErrorCode::UnsupportedOperation),
}
}

Expand Down
6 changes: 3 additions & 3 deletions litebox_broker_local/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use litebox_broker_protocol::{
AddEventRequest, BrokerRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse,
CoreRequest, CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse,
LocalControlChannel, ObjectHandle, ReadinessState, WaitEventRequest, WaitOutcome,
LocalControlChannel, ObjectHandle, ReadinessState, WaitEventRequest,
};

use crate::{BrokerLocal, BrokerLocalError, Result};
Expand All @@ -31,12 +31,12 @@ impl<T: LocalControlChannel> BrokerLocal<T> {
}

/// Checks whether an event wait would complete now.
pub fn wait_event(&mut self, handle: ObjectHandle) -> Result<WaitOutcome, T::Error> {
pub fn wait_event(&mut self, handle: ObjectHandle) -> Result<ReadinessState, T::Error> {
match self.request(event_request(EventRequest::Wait(WaitEventRequest {
handle,
})))? {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Wait(response))) => {
Ok(response.outcome)
Ok(response.readiness)
}
response => Err(BrokerLocalError::UnexpectedResponse(response)),
}
Expand Down
4 changes: 4 additions & 0 deletions litebox_broker_local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl<T: LocalControlChannel> BrokerLocal<T> {
/// Sends one broker request.
///
/// Negotiation is the only request allowed before the connection is active.
#[expect(
clippy::match_wildcard_for_single_variants,
reason = "wildcards keep state-machine fallbacks grouped by behavior"
)]
pub fn request(&mut self, request: BrokerRequest) -> Result<BrokerResponse, T::Error> {
match self.state {
ConnectionState::AwaitingNegotiation => match request {
Expand Down
31 changes: 12 additions & 19 deletions litebox_broker_protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ pub enum ErrorCode {
ResourceExhausted,
#[error("broker operation would block")]
WouldBlock,
/// Error code emitted by a newer broker and not understood by this local peer.
///
/// This variant is reserved for raw codes not assigned by this protocol
/// version.
#[error("unknown broker error code {0}")]
Unknown(u16),
}

impl ErrorCode {
Expand All @@ -42,19 +36,19 @@ impl ErrorCode {
/// concrete broker errors.
///
/// Converts a raw protocol error code to an error category.
pub const fn from_raw(raw: u16) -> Self {
pub const fn from_raw(raw: u16) -> Option<Self> {
match raw {
1 => Self::UnsupportedVersion,
2 => Self::MalformedRequest,
3 => Self::ProtocolState,
4 => Self::UnsupportedOperation,
5 => Self::Internal,
6 => Self::PolicyDenied,
7 => Self::UnknownObject,
8 => Self::InvalidRights,
9 => Self::ResourceExhausted,
10 => Self::WouldBlock,
raw => Self::Unknown(raw),
1 => Some(Self::UnsupportedVersion),
2 => Some(Self::MalformedRequest),
3 => Some(Self::ProtocolState),
4 => Some(Self::UnsupportedOperation),
5 => Some(Self::Internal),
6 => Some(Self::PolicyDenied),
7 => Some(Self::UnknownObject),
8 => Some(Self::InvalidRights),
9 => Some(Self::ResourceExhausted),
10 => Some(Self::WouldBlock),
_ => None,
}
}

Expand All @@ -71,7 +65,6 @@ impl ErrorCode {
Self::InvalidRights => 8,
Self::ResourceExhausted => 9,
Self::WouldBlock => 10,
Self::Unknown(raw) => raw,
}
}
}
15 changes: 2 additions & 13 deletions litebox_broker_protocol/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,8 @@ pub struct ReadinessState {
pub write_ready: bool,
}

/// Result of checking whether a broker event read wait would complete now.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum WaitOutcome {
/// The object is read-ready now.
Ready(ReadinessState),
/// The object is not read-ready; deployment-specific wait plumbing may block.
WouldBlock(ReadinessState),
}

/// How a broker event consume operation should remove readiness credits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum EventConsumeMode {
/// Consume all currently available credits.
All,
Expand Down Expand Up @@ -56,8 +45,8 @@ pub struct WaitEventRequest {
/// Response to an event wait request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WaitEventResponse {
/// Current wait outcome.
pub outcome: WaitOutcome,
/// Current readiness state.
pub readiness: ReadinessState,
}

/// Request to add readiness credits to an event.
Expand Down
2 changes: 1 addition & 1 deletion litebox_broker_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use error::ErrorCode;
pub use event::{
AddEventRequest, AddEventResponse, ConsumeEventRequest, ConsumeEventResponse,
CreateEventRequest, CreateEventResponse, EventConsumeMode, EventConsumption, ReadinessState,
WaitEventRequest, WaitEventResponse, WaitOutcome,
WaitEventRequest, WaitEventResponse,
};
pub use message::{
BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, EventRequest, EventResponse,
Expand Down
6 changes: 0 additions & 6 deletions litebox_broker_protocol/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::{
/// domain-specific operations are grouped below it so new object families do not
/// accumulate as unrelated top-level broker variants.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BrokerRequest {
/// Protocol negotiation request.
Negotiate {
Expand All @@ -26,15 +25,13 @@ pub enum BrokerRequest {

/// Request adapted by the broker host into a BrokerCore domain call.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CoreRequest {
/// Event object request family.
Event(EventRequest),
}

/// Broker-owned event object request.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum EventRequest {
/// Create a broker-owned event object.
Create(CreateEventRequest),
Expand All @@ -52,7 +49,6 @@ pub enum EventRequest {
/// grouped under [`CoreResponse`] so future object families can evolve without
/// turning the broker envelope into a flat operation/result list.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BrokerResponse {
/// Negotiation result.
Negotiated {
Expand All @@ -79,15 +75,13 @@ pub enum BrokerResponse {

/// Response returned by a BrokerCore domain request.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CoreResponse {
/// Event object response family.
Event(EventResponse),
}

/// Broker-owned event object response.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum EventResponse {
/// Create operation response.
Create(CreateEventResponse),
Expand Down
Loading