From 077fe445649f59283d70505e69d435c58167f356 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Tue, 23 Jun 2026 18:33:09 -0700 Subject: [PATCH 1/6] Make broker local negotiation explicit Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox/src/broker/mod.rs | 19 +- litebox_broker_local/src/error.rs | 14 -- litebox_broker_local/src/event.rs | 61 +++-- litebox_broker_local/src/lib.rs | 222 +++++++----------- .../tests/userland_broker.rs | 17 +- litebox_runner_linux_userland/src/broker.rs | 8 +- 6 files changed, 123 insertions(+), 218 deletions(-) diff --git a/litebox/src/broker/mod.rs b/litebox/src/broker/mod.rs index 64b5eb03b..606f50f70 100644 --- a/litebox/src/broker/mod.rs +++ b/litebox/src/broker/mod.rs @@ -2,9 +2,7 @@ // Licensed under the MIT license. use litebox_broker_local::{BrokerLocal, BrokerLocalError}; -use litebox_broker_protocol::{ - BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, LocalControlChannel, -}; +use litebox_broker_protocol::{CoreRequest, CoreResponse, LocalControlChannel}; use crate::sync::{Mutex, RawSyncPrimitivesProvider}; @@ -51,15 +49,18 @@ where let response = self .local .lock() - .request(BrokerRequest::Core(request)) + .request(request) .map_err(|error| match error { + BrokerLocalError::Channel(_) | BrokerLocalError::ChannelClosed => { + BrokerControlError::Transport + } BrokerLocalError::Broker(error) => BrokerControlError::Broker(error), BrokerLocalError::UnexpectedResponse(_) => BrokerControlError::UnexpectedResponse, - _ => BrokerControlError::Transport, + BrokerLocalError::IncompatibleNegotiation { .. } + | BrokerLocalError::UnsupportedVersion { .. } => { + unreachable!("negotiation errors cannot occur on active broker requests") + } })?; - match response { - BrokerResponse::Core(response) => Ok(response), - _ => Err(BrokerControlError::UnexpectedResponse), - } + Ok(response) } } diff --git a/litebox_broker_local/src/error.rs b/litebox_broker_local/src/error.rs index 724b8a889..792b8ef63 100644 --- a/litebox_broker_local/src/error.rs +++ b/litebox_broker_local/src/error.rs @@ -6,14 +6,9 @@ use thiserror::Error; /// Errors returned by the broker-local control adapter. #[derive(Debug, Error)] -#[non_exhaustive] pub enum BrokerLocalError { #[error("broker channel failed: {0}")] Channel(#[source] E), - #[error("broker local adapter has not negotiated protocol version")] - NotNegotiated, - #[error("broker local adapter already negotiated")] - AlreadyNegotiated, #[error("broker closed the channel")] ChannelClosed, #[error( @@ -25,15 +20,6 @@ pub enum BrokerLocalError { /// Protocol version advertised by the broker. broker_protocol_version: ProtocolVersion, }, - #[error( - "broker local adapter cannot request protocol version {requested:?}; local adapter supports {local_protocol_version:?}" - )] - UnsupportedLocalVersion { - /// Protocol version requested by the caller. - requested: ProtocolVersion, - /// Protocol version supported by this local implementation. - local_protocol_version: ProtocolVersion, - }, #[error( "broker does not support requested protocol version {requested:?}; broker supports {broker_protocol_version:?}" )] diff --git a/litebox_broker_local/src/event.rs b/litebox_broker_local/src/event.rs index 1a538735b..63de775c3 100644 --- a/litebox_broker_local/src/event.rs +++ b/litebox_broker_local/src/event.rs @@ -2,8 +2,8 @@ // Licensed under the MIT license. use litebox_broker_protocol::{ - AddEventRequest, BrokerRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse, - CoreRequest, CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse, + AddEventRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse, CoreRequest, + CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse, LocalControlChannel, ObjectHandle, ReadinessState, WaitEventRequest, }; @@ -20,25 +20,20 @@ impl BrokerLocal { &mut self, initial_count: u64, ) -> Result { - match self.request(event_request(EventRequest::Create(CreateEventRequest { - initial_count, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Create(response))) => { - Ok(response.handle) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = + self.request_event(EventRequest::Create(CreateEventRequest { initial_count }))?; + match response { + EventResponse::Create(response) => Ok(response.handle), + response => Err(unexpected_event_response(response)), } } /// Checks whether an event wait would complete now. pub fn wait_event(&mut self, handle: ObjectHandle) -> Result { - match self.request(event_request(EventRequest::Wait(WaitEventRequest { - handle, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Wait(response))) => { - Ok(response.readiness) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = self.request_event(EventRequest::Wait(WaitEventRequest { handle }))?; + match response { + EventResponse::Wait(response) => Ok(response.readiness), + response => Err(unexpected_event_response(response)), } } @@ -48,14 +43,10 @@ impl BrokerLocal { handle: ObjectHandle, value: u64, ) -> Result { - match self.request(event_request(EventRequest::Add(AddEventRequest { - handle, - value, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Add(response))) => { - Ok(response.readiness) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = self.request_event(EventRequest::Add(AddEventRequest { handle, value }))?; + match response { + EventResponse::Add(response) => Ok(response.readiness), + response => Err(unexpected_event_response(response)), } } @@ -65,18 +56,20 @@ impl BrokerLocal { handle: ObjectHandle, mode: EventConsumeMode, ) -> Result { - match self.request(event_request(EventRequest::Consume(ConsumeEventRequest { - handle, - mode, - })))? { - BrokerResponse::Core(CoreResponse::Event(EventResponse::Consume(response))) => { - Ok(response) - } - response => Err(BrokerLocalError::UnexpectedResponse(response)), + let response = + self.request_event(EventRequest::Consume(ConsumeEventRequest { handle, mode }))?; + match response { + EventResponse::Consume(response) => Ok(response), + response => Err(unexpected_event_response(response)), } } + + fn request_event(&mut self, request: EventRequest) -> Result { + let CoreResponse::Event(response) = self.request(CoreRequest::Event(request))?; + Ok(response) + } } -const fn event_request(request: EventRequest) -> BrokerRequest { - BrokerRequest::Core(CoreRequest::Event(request)) +fn unexpected_event_response(response: EventResponse) -> BrokerLocalError { + BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(CoreResponse::Event(response))) } diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index 41ace4cf4..8b6128f14 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -16,7 +16,8 @@ mod error; mod event; use litebox_broker_protocol::{ - BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, LocalControlChannel, + BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, + LocalControlChannel, }; pub use error::{BrokerLocalError, Result}; @@ -24,24 +25,9 @@ pub use error::{BrokerLocalError, Result}; /// Typed broker-local control adapter for broker operations. pub struct BrokerLocal { channel: T, - state: ConnectionState, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum ConnectionState { - AwaitingNegotiation, - Active, } impl BrokerLocal { - /// Creates a broker-local control adapter over an already-connected control channel. - pub const fn new(channel: T) -> Self { - Self { - channel, - state: ConnectionState::AwaitingNegotiation, - } - } - /// Returns the underlying control channel for deployment-specific configuration. pub fn control_channel_mut(&mut self) -> &mut T { &mut self.channel @@ -49,175 +35,133 @@ impl BrokerLocal { } impl BrokerLocal { - /// Sends one broker request. - /// - /// Negotiation is the only request allowed before the connection is active. - #[expect( - clippy::match_wildcard_for_single_variants, - reason = "wildcards keep state-machine fallbacks grouped by behavior" - )] - pub fn request(&mut self, request: BrokerRequest) -> Result { - match self.state { - ConnectionState::AwaitingNegotiation => match request { - BrokerRequest::Negotiate { protocol_version } => { - if protocol_version != BROKER_PROTOCOL_VERSION { - return Err(BrokerLocalError::UnsupportedLocalVersion { - requested: protocol_version, - local_protocol_version: BROKER_PROTOCOL_VERSION, - }); - } - - match self.raw_request(BrokerRequest::Negotiate { protocol_version })? { - BrokerResponse::Negotiated { - broker_protocol_version, - } => { - if protocol_version != broker_protocol_version { - return Err(BrokerLocalError::IncompatibleNegotiation { - requested: protocol_version, - broker_protocol_version, - }); - } - self.state = ConnectionState::Active; - Ok(BrokerResponse::Negotiated { - broker_protocol_version, - }) - } - BrokerResponse::VersionMismatch { - broker_protocol_version, - } => Err(BrokerLocalError::UnsupportedVersion { - requested: protocol_version, - broker_protocol_version, - }), - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), - response => Err(BrokerLocalError::UnexpectedResponse(response)), - } - } - _ => Err(BrokerLocalError::NotNegotiated), - }, - ConnectionState::Active => match request { - BrokerRequest::Negotiate { .. } => Err(BrokerLocalError::AlreadyNegotiated), - request => match self.raw_request(request)? { - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), - response => Ok(response), - }, + /// Negotiates the broker protocol over an already-connected control channel. + pub fn negotiate(mut channel: T) -> Result { + let requested = BROKER_PROTOCOL_VERSION; + match raw_request( + &mut channel, + BrokerRequest::Negotiate { + protocol_version: requested, }, + )? { + BrokerResponse::Negotiated { + broker_protocol_version, + } => { + if requested != broker_protocol_version { + return Err(BrokerLocalError::IncompatibleNegotiation { + requested, + broker_protocol_version, + }); + } + Ok(Self { channel }) + } + BrokerResponse::VersionMismatch { + broker_protocol_version, + } => Err(BrokerLocalError::UnsupportedVersion { + requested, + broker_protocol_version, + }), + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), + response @ BrokerResponse::Core(_) => { + Err(BrokerLocalError::UnexpectedResponse(response)) + } } } - fn raw_request(&mut self, request: BrokerRequest) -> Result { - self.channel - .send_request(&request) - .map_err(BrokerLocalError::Channel)?; - self.channel - .recv_response() - .map_err(BrokerLocalError::Channel)? - .ok_or(BrokerLocalError::ChannelClosed) + /// Sends one active BrokerCore request. + pub fn request(&mut self, request: CoreRequest) -> Result { + match raw_request(&mut self.channel, BrokerRequest::Core(request))? { + BrokerResponse::Core(response) => Ok(response), + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), + response => Err(BrokerLocalError::UnexpectedResponse(response)), + } } } +fn raw_request( + channel: &mut T, + request: BrokerRequest, +) -> Result { + channel + .send_request(&request) + .map_err(BrokerLocalError::Channel)?; + channel + .recv_response() + .map_err(BrokerLocalError::Channel)? + .ok_or(BrokerLocalError::ChannelClosed) +} + #[cfg(test)] mod tests { use super::*; use core::convert::Infallible; - use litebox_broker_protocol::ProtocolVersion; + use litebox_broker_protocol::{ + CreateEventRequest, CreateEventResponse, EventRequest, EventResponse, ObjectHandle, + ProtocolVersion, + }; #[test] - fn event_operations_require_negotiation_without_sending() { - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); - - assert!(matches!( - local.create_event(), - Err(BrokerLocalError::NotNegotiated) - )); - assert_eq!(local.channel.sent_request, None); - } - - #[test] - fn negotiation_request_activates_local_connection() { - let requested = BROKER_PROTOCOL_VERSION; + fn negotiate_returns_active_local_connection() { let channel = FakeControlChannel::new(Some(BrokerResponse::Negotiated { broker_protocol_version: BROKER_PROTOCOL_VERSION, })); - let mut local = BrokerLocal::new(channel); + let local = BrokerLocal::negotiate(channel).unwrap(); - assert_eq!( - local - .request(BrokerRequest::Negotiate { - protocol_version: requested - }) - .unwrap(), - BrokerResponse::Negotiated { - broker_protocol_version: BROKER_PROTOCOL_VERSION - } - ); assert_eq!( local.channel.sent_request, Some(BrokerRequest::Negotiate { - protocol_version: requested + protocol_version: BROKER_PROTOCOL_VERSION }) ); - assert_eq!(local.state, ConnectionState::Active); } #[test] - fn negotiation_request_rejects_locally_unsupported_version_without_sending() { - let too_new = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); + fn active_request_sends_core_request() { + let handle = ObjectHandle(7); + let request = CoreRequest::Event(EventRequest::Create(CreateEventRequest { + initial_count: 0, + })); + let response = CoreResponse::Event(EventResponse::Create(CreateEventResponse { handle })); + let channel = FakeControlChannel::new(Some(BrokerResponse::Core(response.clone()))); + let mut local = BrokerLocal { channel }; - assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: too_new - }), - Err(BrokerLocalError::UnsupportedLocalVersion { - requested, - local_protocol_version - }) if requested == too_new && local_protocol_version == BROKER_PROTOCOL_VERSION - )); - assert_eq!(local.state, ConnectionState::AwaitingNegotiation); - assert_eq!(local.channel.sent_request, None); + assert_eq!(local.request(request.clone()).unwrap(), response); + assert_eq!( + local.channel.sent_request, + Some(BrokerRequest::Core(request)) + ); } #[test] - fn negotiation_request_rejects_broker_different_version_response() { + fn negotiate_rejects_broker_different_version_response() { let broker_protocol_version = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); let channel = FakeControlChannel::new(Some(BrokerResponse::Negotiated { broker_protocol_version, })); - let mut local = BrokerLocal::new(channel); assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }), + BrokerLocal::negotiate(channel), Err(BrokerLocalError::IncompatibleNegotiation { requested, broker_protocol_version: broker }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version )); - assert_eq!(local.state, ConnectionState::AwaitingNegotiation); - assert_eq!( - local.channel.sent_request, - Some(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }) - ); } #[test] - fn active_connection_rejects_negotiation_without_sending() { - let channel = FakeControlChannel::new(None); - let mut local = BrokerLocal::new(channel); - local.state = ConnectionState::Active; + fn negotiate_rejects_broker_unsupported_version_response() { + let broker_protocol_version = ProtocolVersion(BROKER_PROTOCOL_VERSION.0 + 1); + let channel = FakeControlChannel::new(Some(BrokerResponse::VersionMismatch { + broker_protocol_version, + })); assert!(matches!( - local.request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION - }), - Err(BrokerLocalError::AlreadyNegotiated) + BrokerLocal::negotiate(channel), + Err(BrokerLocalError::UnsupportedVersion { + requested, + broker_protocol_version: broker + }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version )); - assert_eq!(local.channel.sent_request, None); } struct FakeControlChannel { diff --git a/litebox_broker_userland/tests/userland_broker.rs b/litebox_broker_userland/tests/userland_broker.rs index 883b65f38..d0ccb967c 100644 --- a/litebox_broker_userland/tests/userland_broker.rs +++ b/litebox_broker_userland/tests/userland_broker.rs @@ -10,9 +10,7 @@ use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::{ - BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, ReadinessState, -}; +use litebox_broker_protocol::ReadinessState; use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; #[test] @@ -23,18 +21,7 @@ fn separate_process_broker_serves_event_object_requests() { channel .set_io_timeout(Some(Duration::from_secs(5))) .unwrap(); - let mut local = BrokerLocal::new(channel); - - assert_eq!( - local - .request(BrokerRequest::Negotiate { - protocol_version: BROKER_PROTOCOL_VERSION, - }) - .unwrap(), - BrokerResponse::Negotiated { - broker_protocol_version: BROKER_PROTOCOL_VERSION - } - ); + let mut local = BrokerLocal::negotiate(channel).unwrap(); let handle = local.create_event().unwrap(); assert_eq!( diff --git a/litebox_runner_linux_userland/src/broker.rs b/litebox_runner_linux_userland/src/broker.rs index f7e5b38c0..1d7a2525e 100644 --- a/litebox_runner_linux_userland/src/broker.rs +++ b/litebox_runner_linux_userland/src/broker.rs @@ -9,7 +9,6 @@ use std::{ use anyhow::{Context as _, Result}; use litebox_broker_local::BrokerLocal; -use litebox_broker_protocol::{BROKER_PROTOCOL_VERSION, BrokerRequest}; use litebox_broker_transport::unix_socket::UnixStreamLocalControlChannel; const SETUP_TIMEOUT: Duration = Duration::from_secs(5); @@ -52,12 +51,7 @@ fn connect_with_retry(socket_path: &Path, setup_deadline: Instant) -> Result { From e263e10ac3b42a2ba2063596338630c428c39376 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Tue, 23 Jun 2026 18:41:53 -0700 Subject: [PATCH 2/6] Split broker local negotiation errors Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox/src/broker/mod.rs | 4 ---- litebox_broker_local/src/error.rs | 20 ++++++++++++++-- litebox_broker_local/src/lib.rs | 39 ++++++++++++++++++------------- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/litebox/src/broker/mod.rs b/litebox/src/broker/mod.rs index 606f50f70..8607c4b63 100644 --- a/litebox/src/broker/mod.rs +++ b/litebox/src/broker/mod.rs @@ -56,10 +56,6 @@ where } BrokerLocalError::Broker(error) => BrokerControlError::Broker(error), BrokerLocalError::UnexpectedResponse(_) => BrokerControlError::UnexpectedResponse, - BrokerLocalError::IncompatibleNegotiation { .. } - | BrokerLocalError::UnsupportedVersion { .. } => { - unreachable!("negotiation errors cannot occur on active broker requests") - } })?; Ok(response) } diff --git a/litebox_broker_local/src/error.rs b/litebox_broker_local/src/error.rs index 792b8ef63..4efed710c 100644 --- a/litebox_broker_local/src/error.rs +++ b/litebox_broker_local/src/error.rs @@ -4,9 +4,9 @@ use litebox_broker_protocol::{BrokerResponse, ErrorCode, ProtocolVersion}; use thiserror::Error; -/// Errors returned by the broker-local control adapter. +/// Errors returned while negotiating the broker-local control adapter. #[derive(Debug, Error)] -pub enum BrokerLocalError { +pub enum BrokerLocalNegotiationError { #[error("broker channel failed: {0}")] Channel(#[source] E), #[error("broker closed the channel")] @@ -35,5 +35,21 @@ pub enum BrokerLocalError { UnexpectedResponse(BrokerResponse), } +/// Errors returned by active broker-local control requests. +#[derive(Debug, Error)] +pub enum BrokerLocalError { + #[error("broker channel failed: {0}")] + Channel(#[source] E), + #[error("broker closed the channel")] + ChannelClosed, + #[error("broker rejected request: {0}")] + Broker(#[source] ErrorCode), + #[error("broker returned unexpected response: {0:?}")] + UnexpectedResponse(BrokerResponse), +} + +/// Broker-local control adapter negotiation result type. +pub type NegotiationResult = core::result::Result>; + /// Broker-local control adapter result type. pub type Result = core::result::Result>; diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index 8b6128f14..ac6c4154f 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -20,7 +20,7 @@ use litebox_broker_protocol::{ LocalControlChannel, }; -pub use error::{BrokerLocalError, Result}; +pub use error::{BrokerLocalError, BrokerLocalNegotiationError, NegotiationResult, Result}; /// Typed broker-local control adapter for broker operations. pub struct BrokerLocal { @@ -36,19 +36,21 @@ impl BrokerLocal { impl BrokerLocal { /// Negotiates the broker protocol over an already-connected control channel. - pub fn negotiate(mut channel: T) -> Result { + pub fn negotiate(mut channel: T) -> NegotiationResult { let requested = BROKER_PROTOCOL_VERSION; match raw_request( &mut channel, BrokerRequest::Negotiate { protocol_version: requested, }, + BrokerLocalNegotiationError::Channel, + BrokerLocalNegotiationError::ChannelClosed, )? { BrokerResponse::Negotiated { broker_protocol_version, } => { if requested != broker_protocol_version { - return Err(BrokerLocalError::IncompatibleNegotiation { + return Err(BrokerLocalNegotiationError::IncompatibleNegotiation { requested, broker_protocol_version, }); @@ -57,20 +59,25 @@ impl BrokerLocal { } BrokerResponse::VersionMismatch { broker_protocol_version, - } => Err(BrokerLocalError::UnsupportedVersion { + } => Err(BrokerLocalNegotiationError::UnsupportedVersion { requested, broker_protocol_version, }), - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), + BrokerResponse::Error(error) => Err(BrokerLocalNegotiationError::Broker(error)), response @ BrokerResponse::Core(_) => { - Err(BrokerLocalError::UnexpectedResponse(response)) + Err(BrokerLocalNegotiationError::UnexpectedResponse(response)) } } } /// Sends one active BrokerCore request. pub fn request(&mut self, request: CoreRequest) -> Result { - match raw_request(&mut self.channel, BrokerRequest::Core(request))? { + match raw_request( + &mut self.channel, + BrokerRequest::Core(request), + BrokerLocalError::Channel, + BrokerLocalError::ChannelClosed, + )? { BrokerResponse::Core(response) => Ok(response), BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), response => Err(BrokerLocalError::UnexpectedResponse(response)), @@ -78,17 +85,17 @@ impl BrokerLocal { } } -fn raw_request( +fn raw_request( channel: &mut T, request: BrokerRequest, -) -> Result { - channel - .send_request(&request) - .map_err(BrokerLocalError::Channel)?; + channel_error: impl Fn(T::Error) -> E, + channel_closed: E, +) -> core::result::Result { + channel.send_request(&request).map_err(&channel_error)?; channel .recv_response() - .map_err(BrokerLocalError::Channel)? - .ok_or(BrokerLocalError::ChannelClosed) + .map_err(&channel_error)? + .ok_or(channel_closed) } #[cfg(test)] @@ -141,7 +148,7 @@ mod tests { assert!(matches!( BrokerLocal::negotiate(channel), - Err(BrokerLocalError::IncompatibleNegotiation { + Err(BrokerLocalNegotiationError::IncompatibleNegotiation { requested, broker_protocol_version: broker }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version @@ -157,7 +164,7 @@ mod tests { assert!(matches!( BrokerLocal::negotiate(channel), - Err(BrokerLocalError::UnsupportedVersion { + Err(BrokerLocalNegotiationError::UnsupportedVersion { requested, broker_protocol_version: broker }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version From d2361799e513f61724a9f140f681305902976c53 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Tue, 23 Jun 2026 18:45:35 -0700 Subject: [PATCH 3/6] Share broker local request errors Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_local/src/error.rs | 32 +++++++++++++------------------ litebox_broker_local/src/lib.rs | 27 ++++++++++---------------- 2 files changed, 23 insertions(+), 36 deletions(-) diff --git a/litebox_broker_local/src/error.rs b/litebox_broker_local/src/error.rs index 4efed710c..2056e3690 100644 --- a/litebox_broker_local/src/error.rs +++ b/litebox_broker_local/src/error.rs @@ -4,13 +4,24 @@ use litebox_broker_protocol::{BrokerResponse, ErrorCode, ProtocolVersion}; use thiserror::Error; -/// Errors returned while negotiating the broker-local control adapter. +/// Errors returned by active broker-local control requests. #[derive(Debug, Error)] -pub enum BrokerLocalNegotiationError { +pub enum BrokerLocalError { #[error("broker channel failed: {0}")] Channel(#[source] E), #[error("broker closed the channel")] ChannelClosed, + #[error("broker rejected request: {0}")] + Broker(#[source] ErrorCode), + #[error("broker returned unexpected response: {0:?}")] + UnexpectedResponse(BrokerResponse), +} + +/// Errors returned while negotiating the broker-local control adapter. +#[derive(Debug, Error)] +pub enum BrokerLocalNegotiationError { + #[error(transparent)] + Control(#[from] BrokerLocalError), #[error( "broker accepted incompatible protocol negotiation: requested {requested:?}, broker supports {broker_protocol_version:?}" )] @@ -29,23 +40,6 @@ pub enum BrokerLocalNegotiationError { /// Protocol version advertised by the broker. broker_protocol_version: ProtocolVersion, }, - #[error("broker rejected request: {0}")] - Broker(#[source] ErrorCode), - #[error("broker returned unexpected response: {0:?}")] - UnexpectedResponse(BrokerResponse), -} - -/// Errors returned by active broker-local control requests. -#[derive(Debug, Error)] -pub enum BrokerLocalError { - #[error("broker channel failed: {0}")] - Channel(#[source] E), - #[error("broker closed the channel")] - ChannelClosed, - #[error("broker rejected request: {0}")] - Broker(#[source] ErrorCode), - #[error("broker returned unexpected response: {0:?}")] - UnexpectedResponse(BrokerResponse), } /// Broker-local control adapter negotiation result type. diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index ac6c4154f..b4bab8213 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -43,8 +43,6 @@ impl BrokerLocal { BrokerRequest::Negotiate { protocol_version: requested, }, - BrokerLocalNegotiationError::Channel, - BrokerLocalNegotiationError::ChannelClosed, )? { BrokerResponse::Negotiated { broker_protocol_version, @@ -63,21 +61,16 @@ impl BrokerLocal { requested, broker_protocol_version, }), - BrokerResponse::Error(error) => Err(BrokerLocalNegotiationError::Broker(error)), + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error).into()), response @ BrokerResponse::Core(_) => { - Err(BrokerLocalNegotiationError::UnexpectedResponse(response)) + Err(BrokerLocalError::UnexpectedResponse(response).into()) } } } /// Sends one active BrokerCore request. pub fn request(&mut self, request: CoreRequest) -> Result { - match raw_request( - &mut self.channel, - BrokerRequest::Core(request), - BrokerLocalError::Channel, - BrokerLocalError::ChannelClosed, - )? { + match raw_request(&mut self.channel, BrokerRequest::Core(request))? { BrokerResponse::Core(response) => Ok(response), BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), response => Err(BrokerLocalError::UnexpectedResponse(response)), @@ -85,17 +78,17 @@ impl BrokerLocal { } } -fn raw_request( +fn raw_request( channel: &mut T, request: BrokerRequest, - channel_error: impl Fn(T::Error) -> E, - channel_closed: E, -) -> core::result::Result { - channel.send_request(&request).map_err(&channel_error)?; +) -> Result { + channel + .send_request(&request) + .map_err(BrokerLocalError::Channel)?; channel .recv_response() - .map_err(&channel_error)? - .ok_or(channel_closed) + .map_err(BrokerLocalError::Channel)? + .ok_or(BrokerLocalError::ChannelClosed) } #[cfg(test)] From bab6c6f9e0911ad443a9a7133d277987305f8d4d Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Tue, 23 Jun 2026 18:51:27 -0700 Subject: [PATCH 4/6] Use one broker local error type Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_local/src/error.rs | 30 +------------------------- litebox_broker_local/src/lib.rs | 36 +++++++++++-------------------- 2 files changed, 14 insertions(+), 52 deletions(-) diff --git a/litebox_broker_local/src/error.rs b/litebox_broker_local/src/error.rs index 2056e3690..98201c365 100644 --- a/litebox_broker_local/src/error.rs +++ b/litebox_broker_local/src/error.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -use litebox_broker_protocol::{BrokerResponse, ErrorCode, ProtocolVersion}; +use litebox_broker_protocol::{BrokerResponse, ErrorCode}; use thiserror::Error; /// Errors returned by active broker-local control requests. @@ -17,33 +17,5 @@ pub enum BrokerLocalError { UnexpectedResponse(BrokerResponse), } -/// Errors returned while negotiating the broker-local control adapter. -#[derive(Debug, Error)] -pub enum BrokerLocalNegotiationError { - #[error(transparent)] - Control(#[from] BrokerLocalError), - #[error( - "broker accepted incompatible protocol negotiation: requested {requested:?}, broker supports {broker_protocol_version:?}" - )] - IncompatibleNegotiation { - /// Protocol version requested by this local adapter. - requested: ProtocolVersion, - /// Protocol version advertised by the broker. - broker_protocol_version: ProtocolVersion, - }, - #[error( - "broker does not support requested protocol version {requested:?}; broker supports {broker_protocol_version:?}" - )] - UnsupportedVersion { - /// Protocol version requested by this local adapter. - requested: ProtocolVersion, - /// Protocol version advertised by the broker. - broker_protocol_version: ProtocolVersion, - }, -} - -/// Broker-local control adapter negotiation result type. -pub type NegotiationResult = core::result::Result>; - /// Broker-local control adapter result type. pub type Result = core::result::Result>; diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index b4bab8213..274078e60 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -16,11 +16,11 @@ mod error; mod event; use litebox_broker_protocol::{ - BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, + BROKER_PROTOCOL_VERSION, BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, ErrorCode, LocalControlChannel, }; -pub use error::{BrokerLocalError, BrokerLocalNegotiationError, NegotiationResult, Result}; +pub use error::{BrokerLocalError, Result}; /// Typed broker-local control adapter for broker operations. pub struct BrokerLocal { @@ -36,7 +36,7 @@ impl BrokerLocal { impl BrokerLocal { /// Negotiates the broker protocol over an already-connected control channel. - pub fn negotiate(mut channel: T) -> NegotiationResult { + pub fn negotiate(mut channel: T) -> Result { let requested = BROKER_PROTOCOL_VERSION; match raw_request( &mut channel, @@ -44,26 +44,20 @@ impl BrokerLocal { protocol_version: requested, }, )? { - BrokerResponse::Negotiated { + response @ BrokerResponse::Negotiated { broker_protocol_version, } => { if requested != broker_protocol_version { - return Err(BrokerLocalNegotiationError::IncompatibleNegotiation { - requested, - broker_protocol_version, - }); + return Err(BrokerLocalError::UnexpectedResponse(response)); } Ok(Self { channel }) } - BrokerResponse::VersionMismatch { - broker_protocol_version, - } => Err(BrokerLocalNegotiationError::UnsupportedVersion { - requested, - broker_protocol_version, - }), - BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error).into()), + BrokerResponse::VersionMismatch { .. } => { + Err(BrokerLocalError::Broker(ErrorCode::UnsupportedVersion)) + } + BrokerResponse::Error(error) => Err(BrokerLocalError::Broker(error)), response @ BrokerResponse::Core(_) => { - Err(BrokerLocalError::UnexpectedResponse(response).into()) + Err(BrokerLocalError::UnexpectedResponse(response)) } } } @@ -141,10 +135,9 @@ mod tests { assert!(matches!( BrokerLocal::negotiate(channel), - Err(BrokerLocalNegotiationError::IncompatibleNegotiation { - requested, + Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Negotiated { broker_protocol_version: broker - }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version + })) if broker == broker_protocol_version )); } @@ -157,10 +150,7 @@ mod tests { assert!(matches!( BrokerLocal::negotiate(channel), - Err(BrokerLocalNegotiationError::UnsupportedVersion { - requested, - broker_protocol_version: broker - }) if requested == BROKER_PROTOCOL_VERSION && broker == broker_protocol_version + Err(BrokerLocalError::Broker(ErrorCode::UnsupportedVersion)) )); } From 51c49a35e998a23302e32171945b84f7dc2f74c7 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Tue, 23 Jun 2026 19:05:20 -0700 Subject: [PATCH 5/6] Inline broker local event errors Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_local/src/event.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/litebox_broker_local/src/event.rs b/litebox_broker_local/src/event.rs index 63de775c3..2851289fd 100644 --- a/litebox_broker_local/src/event.rs +++ b/litebox_broker_local/src/event.rs @@ -24,7 +24,9 @@ impl BrokerLocal { self.request_event(EventRequest::Create(CreateEventRequest { initial_count }))?; match response { EventResponse::Create(response) => Ok(response.handle), - response => Err(unexpected_event_response(response)), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -33,7 +35,9 @@ impl BrokerLocal { let response = self.request_event(EventRequest::Wait(WaitEventRequest { handle }))?; match response { EventResponse::Wait(response) => Ok(response.readiness), - response => Err(unexpected_event_response(response)), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -46,7 +50,9 @@ impl BrokerLocal { let response = self.request_event(EventRequest::Add(AddEventRequest { handle, value }))?; match response { EventResponse::Add(response) => Ok(response.readiness), - response => Err(unexpected_event_response(response)), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -60,7 +66,9 @@ impl BrokerLocal { self.request_event(EventRequest::Consume(ConsumeEventRequest { handle, mode }))?; match response { EventResponse::Consume(response) => Ok(response), - response => Err(unexpected_event_response(response)), + response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core( + CoreResponse::Event(response), + ))), } } @@ -69,7 +77,3 @@ impl BrokerLocal { Ok(response) } } - -fn unexpected_event_response(response: EventResponse) -> BrokerLocalError { - BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(CoreResponse::Event(response))) -} From fc94cc04b5491d728a6e958ac2e249a2bd26f822 Mon Sep 17 00:00:00 2001 From: Weidong Cui Date: Wed, 24 Jun 2026 08:10:10 -0700 Subject: [PATCH 6/6] Use single BrokerLocal impl block Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- litebox_broker_local/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/litebox_broker_local/src/lib.rs b/litebox_broker_local/src/lib.rs index 274078e60..72b6b3059 100644 --- a/litebox_broker_local/src/lib.rs +++ b/litebox_broker_local/src/lib.rs @@ -27,14 +27,12 @@ pub struct BrokerLocal { channel: T, } -impl BrokerLocal { +impl BrokerLocal { /// Returns the underlying control channel for deployment-specific configuration. pub fn control_channel_mut(&mut self) -> &mut T { &mut self.channel } -} -impl BrokerLocal { /// Negotiates the broker protocol over an already-connected control channel. pub fn negotiate(mut channel: T) -> Result { let requested = BROKER_PROTOCOL_VERSION;