diff --git a/litebox/src/broker/mod.rs b/litebox/src/broker/mod.rs index 64b5eb03b..8607c4b63 100644 --- a/litebox/src/broker/mod.rs +++ b/litebox/src/broker/mod.rs @@ -2,9 +2,7 @@ // Licensed under the MIT license. use litebox_broker_local::{BrokerLocal, BrokerLocalError}; -use litebox_broker_protocol::{ - BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, LocalControlChannel, -}; +use litebox_broker_protocol::{CoreRequest, CoreResponse, LocalControlChannel}; use crate::sync::{Mutex, RawSyncPrimitivesProvider}; @@ -51,15 +49,14 @@ where let response = self .local .lock() - .request(BrokerRequest::Core(request)) + .request(request) .map_err(|error| match error { + BrokerLocalError::Channel(_) | BrokerLocalError::ChannelClosed => { + BrokerControlError::Transport + } BrokerLocalError::Broker(error) => BrokerControlError::Broker(error), BrokerLocalError::UnexpectedResponse(_) => BrokerControlError::UnexpectedResponse, - _ => BrokerControlError::Transport, })?; - match response { - BrokerResponse::Core(response) => Ok(response), - _ => Err(BrokerControlError::UnexpectedResponse), - } + Ok(response) } } diff --git a/litebox_broker_local/src/error.rs b/litebox_broker_local/src/error.rs index 724b8a889..98201c365 100644 --- a/litebox_broker_local/src/error.rs +++ b/litebox_broker_local/src/error.rs @@ -1,48 +1,16 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -use litebox_broker_protocol::{BrokerResponse, ErrorCode, ProtocolVersion}; +use litebox_broker_protocol::{BrokerResponse, ErrorCode}; use thiserror::Error; -/// Errors returned by the broker-local control adapter. +/// Errors returned by active broker-local control requests. #[derive(Debug, Error)] -#[non_exhaustive] pub enum BrokerLocalError { #[error("broker channel failed: {0}")] Channel(#[source] E), - #[error("broker local adapter has not negotiated protocol version")] - NotNegotiated, - #[error("broker local adapter already negotiated")] - AlreadyNegotiated, #[error("broker closed the channel")] ChannelClosed, - #[error( - "broker accepted incompatible protocol negotiation: requested {requested:?}, broker supports {broker_protocol_version:?}" - )] - IncompatibleNegotiation { - /// Protocol version requested by this local adapter. - requested: ProtocolVersion, - /// Protocol version advertised by the broker. - broker_protocol_version: ProtocolVersion, - }, - #[error( - "broker local adapter cannot request protocol version {requested:?}; local adapter supports {local_protocol_version:?}" - )] - UnsupportedLocalVersion { - /// Protocol version requested by the caller. - requested: ProtocolVersion, - /// Protocol version supported by this local implementation. - local_protocol_version: ProtocolVersion, - }, - #[error( - "broker does not support requested protocol version {requested:?}; broker supports {broker_protocol_version:?}" - )] - UnsupportedVersion { - /// Protocol version requested by this local adapter. - requested: ProtocolVersion, - /// Protocol version advertised by the broker. - broker_protocol_version: ProtocolVersion, - }, #[error("broker rejected request: {0}")] Broker(#[source] ErrorCode), #[error("broker returned unexpected response: {0:?}")] diff --git a/litebox_broker_local/src/event.rs b/litebox_broker_local/src/event.rs index 1a538735b..2851289fd 100644 --- a/litebox_broker_local/src/event.rs +++ b/litebox_broker_local/src/event.rs @@ -2,8 +2,8 @@ // Licensed under the MIT license. use litebox_broker_protocol::{ - AddEventRequest, BrokerRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse, - CoreRequest, CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse, + AddEventRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse, CoreRequest, + CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse, LocalControlChannel, ObjectHandle, ReadinessState, WaitEventRequest, }; @@ -20,25 +20,24 @@ impl BrokerLocal { &mut self, initial_count: u64, ) -> Result { - match self.request(event_request(EventRequest::Create(CreateEventRequest { - initial_count, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Create(response))) => { - Ok(response.handle) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = + self.request_event(EventRequest::Create(CreateEventRequest { initial_count }))?; + match response { + EventResponse::Create(response) => Ok(response.handle), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } /// Checks whether an event wait would complete now. pub fn wait_event(&mut self, handle: ObjectHandle) -> Result { - match self.request(event_request(EventRequest::Wait(WaitEventRequest { - handle, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Wait(response))) => { - Ok(response.readiness) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = self.request_event(EventRequest::Wait(WaitEventRequest { handle }))?; + match response { + EventResponse::Wait(response) => Ok(response.readiness), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -48,14 +47,12 @@ impl BrokerLocal { handle: ObjectHandle, value: u64, ) -> Result { - match self.request(event_request(EventRequest::Add(AddEventRequest { - handle, - value, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Add(response))) => { - Ok(response.readiness) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = self.request_event(EventRequest::Add(AddEventRequest { handle, value }))?; + match response { + EventResponse::Add(response) => Ok(response.readiness), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -65,18 +62,18 @@ impl BrokerLocal { handle: ObjectHandle, mode: EventConsumeMode, ) -> Result { - match self.request(event_request(EventRequest::Consume(ConsumeEventRequest { - handle, - mode, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Consume(response))) => { - Ok(response) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = + self.request_event(EventRequest::Consume(ConsumeEventRequest { handle, mode }))?; + match response { + EventResponse::Consume(response) => Ok(response), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } -} -const fn event_request(request: EventRequest) -> BrokerRequest { - BrokerRequest::Core(CoreRequest::Event(request)) + fn request_event(&mut self, request: EventRequest) -> Result { + let CoreResponse::Event(response) = self.request(CoreRequest::Event(request))?; + Ok(response) + } } diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index 41ace4cf4..72b6b3059 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -16,7 +16,8 @@ mod error; mod event; use litebox_broker_protocol::{ - BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, LocalControlChannel, + BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, ErrorCode, + LocalControlChannel, }; pub use error::{BrokerLocalError, Result}; @@ -24,200 +25,131 @@ pub use error::{BrokerLocalError, Result}; /// Typed broker-local control adapter for broker operations. pub struct BrokerLocal { channel: T, - state: ConnectionState, } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum ConnectionState { - AwaitingNegotiation, - Active, -} - -impl BrokerLocal { - /// Creates a broker-local control adapter over an already-connected control channel. - pub const fn new(channel: T) -> Self { - Self { - channel, - state: ConnectionState::AwaitingNegotiation, - } - } - +impl BrokerLocal { /// Returns the underlying control channel for deployment-specific configuration. pub fn control_channel_mut(&mut self) -> &mut T { &mut self.channel } -} -impl BrokerLocal { - /// Sends one broker request. - /// - /// Negotiation is the only request allowed before the connection is active. - #[expect( - clippy::match_wildcard_for_single_variants, - reason = "wildcards keep state-machine fallbacks grouped by behavior" - )] - pub fn request(&mut self, request: BrokerRequest) -> Result { - match self.state { - ConnectionState::AwaitingNegotiation => match request { - BrokerRequest::Negotiate { protocol_version } => { - if protocol_version != BROKER_PROTOCOL_VERSION { - return Err(BrokerLocalError::UnsupportedLocalVersion { - requested: protocol_version, - local_protocol_version: BROKER_PROTOCOL_VERSION, - }); - } - - match self.raw_request(BrokerRequest::Negotiate { protocol_version })? { - BrokerResponse::Negotiated { - broker_protocol_version, - } => { - if protocol_version != broker_protocol_version { - return Err(BrokerLocalError::IncompatibleNegotiation { - requested: protocol_version, - broker_protocol_version, - }); - } - self.state = ConnectionState::Active; - Ok(BrokerResponse::Negotiated { - broker_protocol_version, - }) - } - BrokerResponse::VersionMismatch { - broker_protocol_version, - } => Err(BrokerLocalError::UnsupportedVersion { - requested: protocol_version, - broker_protocol_version, - }), - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), - response => Err(BrokerLocalError::UnexpectedResponse(response)), - } - } - _ => Err(BrokerLocalError::NotNegotiated), - }, - ConnectionState::Active => match request { - BrokerRequest::Negotiate { .. } => Err(BrokerLocalError::AlreadyNegotiated), - request => match self.raw_request(request)? { - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), - response => Ok(response), - }, + /// Negotiates the broker protocol over an already-connected control channel. + pub fn negotiate(mut channel: T) -> Result { + let requested = BROKER_PROTOCOL_VERSION; + match raw_request( + &mut channel, + BrokerRequest::Negotiate { + protocol_version: requested, }, + )? { + response @ BrokerResponse::Negotiated { + broker_protocol_version, + } => { + if requested != broker_protocol_version { + return Err(BrokerLocalError::UnexpectedResponse(response)); + } + Ok(Self { channel }) + } + BrokerResponse::VersionMismatch { .. } => { + Err(BrokerLocalError::Broker(ErrorCode::UnsupportedVersion)) + } + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), + response @ BrokerResponse::Core(_) => { + Err(BrokerLocalError::UnexpectedResponse(response)) + } } } - fn raw_request(&mut self, request: BrokerRequest) -> Result { - self.channel - .send_request(&request) - .map_err(BrokerLocalError::Channel)?; - self.channel - .recv_response() - .map_err(BrokerLocalError::Channel)? - .ok_or(BrokerLocalError::ChannelClosed) + /// Sends one active BrokerCore request. + pub fn request(&mut self, request: CoreRequest) -> Result { + match raw_request(&mut self.channel, BrokerRequest::Core(request))? { + BrokerResponse::Core(response) => Ok(response), + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), + response => Err(BrokerLocalError::UnexpectedResponse(response)), + } } } +fn raw_request( + channel: &mut T, + request: BrokerRequest, +) -> Result { + channel + .send_request(&request) + .map_err(BrokerLocalError::Channel)?; + channel + .recv_response() + .map_err(BrokerLocalError::Channel)? + .ok_or(BrokerLocalError::ChannelClosed) +} + #[cfg(test)] mod tests { use super::*; use core::convert::Infallible; - use litebox_broker_protocol::ProtocolVersion; - - #[test] - fn event_operations_require_negotiation_without_sending() { - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); - - assert!(matches!( - local.create_event(), - Err(BrokerLocalError::NotNegotiated) - )); - assert_eq!(local.channel.sent_request, None); - } + use litebox_broker_protocol::{ + CreateEventRequest, CreateEventResponse, EventRequest, EventResponse, ObjectHandle, + ProtocolVersion, + }; #[test] - fn negotiation_request_activates_local_connection() { - let requested = BROKER_PROTOCOL_VERSION; + fn negotiate_returns_active_local_connection() { let channel = FakeControlChannel::new(Some(BrokerResponse::Negotiated { broker_protocol_version: BROKER_PROTOCOL_VERSION, })); - let mut local = BrokerLocal::new(channel); + let local = BrokerLocal::negotiate(channel).unwrap(); - assert_eq!( - local - .request(BrokerRequest::Negotiate { - protocol_version: requested - }) - .unwrap(), - BrokerResponse::Negotiated { - broker_protocol_version: BROKER_PROTOCOL_VERSION - } - ); assert_eq!( local.channel.sent_request, Some(BrokerRequest::Negotiate { - protocol_version: requested + protocol_version: BROKER_PROTOCOL_VERSION }) ); - assert_eq!(local.state, ConnectionState::Active); } #[test] - fn negotiation_request_rejects_locally_unsupported_version_without_sending() { - let too_new = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); + fn active_request_sends_core_request() { + let handle = ObjectHandle(7); + let request = CoreRequest::Event(EventRequest::Create(CreateEventRequest { + initial_count: 0, + })); + let response = CoreResponse::Event(EventResponse::Create(CreateEventResponse { handle })); + let channel = FakeControlChannel::new(Some(BrokerResponse::Core(response.clone()))); + let mut local = BrokerLocal { channel }; - assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: too_new - }), - Err(BrokerLocalError::UnsupportedLocalVersion { - requested, - local_protocol_version - }) if requested == too_new && local_protocol_version == BROKER_PROTOCOL_VERSION - )); - assert_eq!(local.state, ConnectionState::AwaitingNegotiation); - assert_eq!(local.channel.sent_request, None); + assert_eq!(local.request(request.clone()).unwrap(), response); + assert_eq!( + local.channel.sent_request, + Some(BrokerRequest::Core(request)) + ); } #[test] - fn negotiation_request_rejects_broker_different_version_response() { + fn negotiate_rejects_broker_different_version_response() { let broker_protocol_version = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); let channel = FakeControlChannel::new(Some(BrokerResponse::Negotiated { broker_protocol_version, })); - let mut local = BrokerLocal::new(channel); assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }), - Err(BrokerLocalError::IncompatibleNegotiation { - requested, + BrokerLocal::negotiate(channel), + Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Negotiated { broker_protocol_version: broker - }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version + })) if broker == broker_protocol_version )); - assert_eq!(local.state, ConnectionState::AwaitingNegotiation); - assert_eq!( - local.channel.sent_request, - Some(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }) - ); } #[test] - fn active_connection_rejects_negotiation_without_sending() { - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); - local.state = ConnectionState::Active; + fn negotiate_rejects_broker_unsupported_version_response() { + let broker_protocol_version = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); + let channel = FakeControlChannel::new(Some(BrokerResponse::VersionMismatch { + broker_protocol_version, + })); assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }), - Err(BrokerLocalError::AlreadyNegotiated) + BrokerLocal::negotiate(channel), + Err(BrokerLocalError::Broker(ErrorCode::UnsupportedVersion)) )); - assert_eq!(local.channel.sent_request, None); } struct FakeControlChannel { diff --git a/litebox_broker_userland/tests/userland_broker.rs b/litebox_broker_userland/tests/userland_broker.rs index 883b65f38..d0ccb967c 100644 --- a/litebox_broker_userland/tests/userland_broker.rs +++ b/litebox_broker_userland/tests/userland_broker.rs @@ -10,9 +10,7 @@ use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::{ - BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, ReadinessState, -}; +use litebox_broker_protocol::ReadinessState; use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; #[test] @@ -23,18 +21,7 @@ fn separate_process_broker_serves_event_object_requests() { channel .set_io_timeout(Some(Duration::from_secs(5))) .unwrap(); - let mut local = BrokerLocal::new(channel); - - assert_eq!( - local - .request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION, - }) - .unwrap(), - BrokerResponse::Negotiated { - broker_protocol_version: BROKER_PROTOCOL_VERSION - } - ); + let mut local = BrokerLocal::negotiate(channel).unwrap(); let handle = local.create_event().unwrap(); assert_eq!( diff --git a/litebox_runner_linux_userland/src/broker.rs b/litebox_runner_linux_userland/src/broker.rs index f7e5b38c0..1d7a2525e 100644 --- a/litebox_runner_linux_userland/src/broker.rs +++ b/litebox_runner_linux_userland/src/broker.rs @@ -9,7 +9,6 @@ use std::{ use anyhow::{Context as _, Result}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::{BROKER_PROTOCOL_VERSION, BrokerRequest}; use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; const SETUP_TIMEOUT: Duration = Duration::from_secs(5); @@ -52,12 +51,7 @@ fn connect_with_retry(socket_path: &Path, setup_deadline: Instant) -> Result {