Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions litebox/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// Licensed under the MIT license.

use litebox_broker_local::{BrokerLocal, BrokerLocalError};
use litebox_broker_protocol::{
BrokerRequest, BrokerResponse, CoreRequest, CoreResponse, LocalControlChannel,
};
use litebox_broker_protocol::{CoreRequest, CoreResponse, LocalControlChannel};

use crate::sync::{Mutex, RawSyncPrimitivesProvider};

Expand Down Expand Up @@ -51,15 +49,14 @@ where
let response = self
.local
.lock()
.request(BrokerRequest::Core(request))
.request(request)
.map_err(|error| match error {
BrokerLocalError::Channel(_) | BrokerLocalError::ChannelClosed => {
BrokerControlError::Transport
}
BrokerLocalError::Broker(error) => BrokerControlError::Broker(error),
BrokerLocalError::UnexpectedResponse(_) => BrokerControlError::UnexpectedResponse,
_ => BrokerControlError::Transport,
})?;
match response {
BrokerResponse::Core(response) => Ok(response),
_ => Err(BrokerControlError::UnexpectedResponse),
}
Ok(response)
}
}
36 changes: 2 additions & 34 deletions litebox_broker_local/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

use litebox_broker_protocol::{BrokerResponse, ErrorCode, ProtocolVersion};
use litebox_broker_protocol::{BrokerResponse, ErrorCode};
use thiserror::Error;

/// Errors returned by the broker-local control adapter.
/// Errors returned by active broker-local control requests.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum BrokerLocalError<E> {
#[error("broker channel failed: {0}")]
Channel(#[source] E),
#[error("broker local adapter has not negotiated protocol version")]
NotNegotiated,
#[error("broker local adapter already negotiated")]
AlreadyNegotiated,
#[error("broker closed the channel")]
ChannelClosed,
#[error(
"broker accepted incompatible protocol negotiation: requested {requested:?}, broker supports {broker_protocol_version:?}"
)]
IncompatibleNegotiation {
/// Protocol version requested by this local adapter.
requested: ProtocolVersion,
/// Protocol version advertised by the broker.
broker_protocol_version: ProtocolVersion,
},
#[error(
"broker local adapter cannot request protocol version {requested:?}; local adapter supports {local_protocol_version:?}"
)]
UnsupportedLocalVersion {
/// Protocol version requested by the caller.
requested: ProtocolVersion,
/// Protocol version supported by this local implementation.
local_protocol_version: ProtocolVersion,
},
#[error(
"broker does not support requested protocol version {requested:?}; broker supports {broker_protocol_version:?}"
)]
UnsupportedVersion {
/// Protocol version requested by this local adapter.
requested: ProtocolVersion,
/// Protocol version advertised by the broker.
broker_protocol_version: ProtocolVersion,
},
#[error("broker rejected request: {0}")]
Broker(#[source] ErrorCode),
#[error("broker returned unexpected response: {0:?}")]
Expand Down
67 changes: 32 additions & 35 deletions litebox_broker_local/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT license.

use litebox_broker_protocol::{
AddEventRequest, BrokerRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse,
CoreRequest, CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse,
AddEventRequest, BrokerResponse, ConsumeEventRequest, ConsumeEventResponse, CoreRequest,
CoreResponse, CreateEventRequest, EventConsumeMode, EventRequest, EventResponse,
LocalControlChannel, ObjectHandle, ReadinessState, WaitEventRequest,
};

Expand All @@ -20,25 +20,24 @@ impl<T: LocalControlChannel> BrokerLocal<T> {
&mut self,
initial_count: u64,
) -> Result<ObjectHandle, T::Error> {
match self.request(event_request(EventRequest::Create(CreateEventRequest {
initial_count,
})))? {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Create(response))) => {
Ok(response.handle)
}
response => Err(BrokerLocalError::UnexpectedResponse(response)),
let response =
self.request_event(EventRequest::Create(CreateEventRequest { initial_count }))?;
match response {
EventResponse::Create(response) => Ok(response.handle),
response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(
CoreResponse::Event(response),
))),
}
}

/// Checks whether an event wait would complete now.
pub fn wait_event(&mut self, handle: ObjectHandle) -> Result<ReadinessState, T::Error> {
match self.request(event_request(EventRequest::Wait(WaitEventRequest {
handle,
})))? {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Wait(response))) => {
Ok(response.readiness)
}
response => Err(BrokerLocalError::UnexpectedResponse(response)),
let response = self.request_event(EventRequest::Wait(WaitEventRequest { handle }))?;
match response {
EventResponse::Wait(response) => Ok(response.readiness),
response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(
CoreResponse::Event(response),
))),
}
}

Expand All @@ -48,14 +47,12 @@ impl<T: LocalControlChannel> BrokerLocal<T> {
handle: ObjectHandle,
value: u64,
) -> Result<ReadinessState, T::Error> {
match self.request(event_request(EventRequest::Add(AddEventRequest {
handle,
value,
})))? {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Add(response))) => {
Ok(response.readiness)
}
response => Err(BrokerLocalError::UnexpectedResponse(response)),
let response = self.request_event(EventRequest::Add(AddEventRequest { handle, value }))?;
match response {
EventResponse::Add(response) => Ok(response.readiness),
response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(
CoreResponse::Event(response),
))),
}
}

Expand All @@ -65,18 +62,18 @@ impl<T: LocalControlChannel> BrokerLocal<T> {
handle: ObjectHandle,
mode: EventConsumeMode,
) -> Result<ConsumeEventResponse, T::Error> {
match self.request(event_request(EventRequest::Consume(ConsumeEventRequest {
handle,
mode,
})))? {
BrokerResponse::Core(CoreResponse::Event(EventResponse::Consume(response))) => {
Ok(response)
}
response => Err(BrokerLocalError::UnexpectedResponse(response)),
let response =
self.request_event(EventRequest::Consume(ConsumeEventRequest { handle, mode }))?;
match response {
EventResponse::Consume(response) => Ok(response),
response => Err(BrokerLocalError::UnexpectedResponse(BrokerResponse::Core(
CoreResponse::Event(response),
))),
}
}
}

const fn event_request(request: EventRequest) -> BrokerRequest {
BrokerRequest::Core(CoreRequest::Event(request))
fn request_event(&mut self, request: EventRequest) -> Result<EventResponse, T::Error> {
let CoreResponse::Event(response) = self.request(CoreRequest::Event(request))?;
Ok(response)
}
}
Loading