supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
+
/**
* Creates a new builder with the specified base URI.
* @param baseUri the base URI of the MCP server
@@ -801,6 +832,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
return this;
}
+ /**
+ * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
+ * when sending a message.
+ * @param authorizationErrorHandler the handler
+ * @return this builder
+ */
+ public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
+ this.authorizationErrorHandler = authorizationErrorHandler;
+ return this;
+ }
+
/**
* Sets the connection timeout for the HTTP client.
* @param connectTimeout the connection timeout duration
@@ -845,7 +887,7 @@ public HttpClientStreamableHttpTransport build() {
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
- httpRequestCustomizer, supportedProtocolVersions);
+ httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
}
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java
new file mode 100644
index 000000000..31e5ae95e
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2026-2026 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.net.http.HttpResponse;
+
+import io.modelcontextprotocol.spec.McpTransportException;
+
+/**
+ * Thrown when the MCP server responds with an authorization error (HTTP 401 or HTTP 403).
+ * Subclass of {@link McpTransportException} for targeted retry handling in
+ * {@link HttpClientStreamableHttpTransport}.
+ *
+ * @author Daniel Garnier-Moiroux
+ */
+public class McpHttpClientTransportAuthorizationException extends McpTransportException {
+
+ private final HttpResponse.ResponseInfo responseInfo;
+
+ public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
+ super(message);
+ this.responseInfo = responseInfo;
+ }
+
+ public HttpResponse.ResponseInfo getResponseInfo() {
+ return responseInfo;
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java
new file mode 100644
index 000000000..c98fac61d
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2026-2026 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.http.HttpResponse;
+
+import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
+import io.modelcontextprotocol.common.McpTransportContext;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Handle security-related errors in HTTP-client based transports. This class handles MCP
+ * server responses with status code 401 and 403.
+ *
+ * @see MCP
+ * Specification: Authorization
+ * @author Daniel Garnier-Moiroux
+ */
+public interface McpHttpClientAuthorizationErrorHandler {
+
+ /**
+ * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
+ * should be retried or not. If the publisher returns true, the original transport
+ * method (connect, sendMessage) will be replayed with the original arguments.
+ * Otherwise, the transport will throw an
+ * {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
+ *
+ * If the returned {@link Publisher} errors, the error will be propagated to the
+ * calling method, to be handled by the caller.
+ *
+ * The number of retries is bounded by {@link #maxRetries()}.
+ * @param responseInfo the HTTP response information
+ * @param context the MCP client transport context
+ * @return {@link Publisher} emitting true if the original request should be replayed,
+ * false otherwise.
+ */
+ Publisher handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
+
+ /**
+ * Maximum number of authorization error retries the transport will attempt. When the
+ * handler signals a retry via {@link #handle}, the transport will replay the original
+ * request at most this many times. If the authorization error persists after
+ * exhausting all retries, the transport will propagate the
+ * {@link McpHttpClientTransportAuthorizationException}.
+ *
+ * Defaults to {@code 1}.
+ * @return the maximum number of retries
+ */
+ default int maxRetries() {
+ return 1;
+ }
+
+ /**
+ * A no-op handler, used in the default use-case.
+ */
+ McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
+
+ /**
+ * Create a {@link McpHttpClientAuthorizationErrorHandler} from a synchronous handler.
+ * Will be subscribed on {@link Schedulers#boundedElastic()}. The handler may be
+ * blocking.
+ * @param handler the synchronous handler
+ * @return an async handler
+ */
+ static McpHttpClientAuthorizationErrorHandler fromSync(Sync handler) {
+ return (info, context) -> Mono.fromCallable(() -> handler.handle(info, context))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ /**
+ * Synchronous authorization error handler.
+ */
+ interface Sync {
+
+ /**
+ * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP
+ * request should be retried or not. If the return value is true, the original
+ * transport method (connect, sendMessage) will be replayed with the original
+ * arguments. Otherwise, the transport will throw an
+ * {@link McpHttpClientTransportAuthorizationException}, indicating the error
+ * status.
+ * @param responseInfo the HTTP response information
+ * @param context the MCP client transport context
+ * @return true if the original request should be replayed, false otherwise.
+ */
+ boolean handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
+
+ }
+
+ class Noop implements McpHttpClientAuthorizationErrorHandler {
+
+ @Override
+ public Publisher handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context) {
+ return Mono.just(false);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java
new file mode 100644
index 000000000..2812522f5
--- /dev/null
+++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2026-2026 the original author or authors.
+ */
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.http.HttpResponse;
+
+import io.modelcontextprotocol.common.McpTransportContext;
+import org.junit.jupiter.api.Test;
+import reactor.test.StepVerifier;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author Daniel Garnier-Moiroux
+ */
+class McpHttpClientAuthorizationErrorHandlerTest {
+
+ private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class);
+
+ private final McpTransportContext context = McpTransportContext.EMPTY;
+
+ @Test
+ void whenTrueThenRetry() {
+ McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
+ .fromSync((info, ctx) -> true);
+ StepVerifier.create(handler.handle(responseInfo, context)).expectNext(true).verifyComplete();
+ }
+
+ @Test
+ void whenFalseThenError() {
+ McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
+ .fromSync((info, ctx) -> false);
+ StepVerifier.create(handler.handle(responseInfo, context)).expectNext(false).verifyComplete();
+ }
+
+ @Test
+ void whenExceptionThenPropagate() {
+ McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
+ .fromSync((info, ctx) -> {
+ throw new IllegalStateException("sync handler error");
+ });
+ StepVerifier.create(handler.handle(responseInfo, context))
+ .expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
+ .verify();
+ }
+
+}
diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java
index b82d6eb2c..c4857e5b4 100644
--- a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java
+++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java
@@ -1,26 +1,24 @@
/*
- * Copyright 2025-2025 the original author or authors.
+ * Copyright 2025-2026 the original author or authors.
*/
package io.modelcontextprotocol.client.transport;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
+import java.util.function.Predicate;
import com.sun.net.httpserver.HttpServer;
-
+import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
+import io.modelcontextprotocol.common.McpTransportContext;
+import org.reactivestreams.Publisher;
import io.modelcontextprotocol.server.transport.TomcatTestUtil;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
@@ -28,14 +26,30 @@
import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.ProtocolVersions;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
/**
* Tests for error handling changes in HttpClientStreamableHttpTransport. Specifically
* tests the distinction between session-related errors and general transport errors for
* 404 and 400 status codes.
*
* @author Christian Tzolov
+ * @author Daniel Garnier-Moiroux
*/
@Timeout(15)
public class HttpClientStreamableHttpTransportErrorHandlingTest {
@@ -46,11 +60,17 @@ public class HttpClientStreamableHttpTransportErrorHandlingTest {
private HttpServer server;
- private AtomicReference serverResponseStatus = new AtomicReference<>(200);
+ private final AtomicInteger serverResponseStatus = new AtomicInteger(200);
+
+ private final AtomicInteger serverSseResponseStatus = new AtomicInteger(200);
- private AtomicReference currentServerSessionId = new AtomicReference<>(null);
+ private final AtomicReference currentServerSessionId = new AtomicReference<>(null);
- private AtomicReference lastReceivedSessionId = new AtomicReference<>(null);
+ private final AtomicReference lastReceivedSessionId = new AtomicReference<>(null);
+
+ private final AtomicInteger processedMessagesCount = new AtomicInteger(0);
+
+ private final AtomicInteger processedSseConnectCount = new AtomicInteger(0);
private McpClientTransport transport;
@@ -88,6 +108,20 @@ else if ("POST".equals(httpExchange.getRequestMethod())) {
else {
httpExchange.sendResponseHeaders(status, 0);
}
+ processedMessagesCount.incrementAndGet();
+ }
+ else if ("GET".equals(httpExchange.getRequestMethod())) {
+ int status = serverSseResponseStatus.get();
+ if (status == 200) {
+ httpExchange.getResponseHeaders().set("Content-Type", "text/event-stream");
+ httpExchange.sendResponseHeaders(200, 0);
+ String sseData = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{}}\n\n";
+ httpExchange.getResponseBody().write(sseData.getBytes());
+ }
+ else {
+ httpExchange.sendResponseHeaders(status, 0);
+ }
+ processedSseConnectCount.incrementAndGet();
}
httpExchange.close();
});
@@ -103,6 +137,7 @@ void stopServer() {
if (server != null) {
server.stop(0);
}
+ StepVerifier.create(transport.closeGracefully()).verifyComplete();
}
/**
@@ -334,6 +369,386 @@ else if (status == 404) {
StepVerifier.create(transport.closeGracefully()).verifyComplete();
}
+ @Nested
+ class AuthorizationError {
+
+ @Nested
+ class SendMessage {
+
+ @ParameterizedTest
+ @ValueSource(ints = { 401, 403 })
+ void invokeHandler(int httpStatus) {
+ serverResponseStatus.set(httpStatus);
+
+ AtomicReference capturedResponseInfo = new AtomicReference<>();
+ AtomicReference capturedContext = new AtomicReference<>();
+
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ capturedResponseInfo.set(responseInfo);
+ capturedContext.set(context);
+ return Mono.just(false);
+ })
+ .build();
+
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(httpStatus))
+ .verify();
+ assertThat(processedMessagesCount.get()).isEqualTo(1);
+ assertThat(capturedResponseInfo.get()).isNotNull();
+ assertThat(capturedResponseInfo.get().statusCode()).isEqualTo(httpStatus);
+ assertThat(capturedContext.get()).isNotNull();
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void defaultHandler() {
+ serverResponseStatus.set(401);
+
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST).build();
+
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(401))
+ .verify();
+ assertThat(processedMessagesCount.get()).isEqualTo(1);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void retry() {
+ serverResponseStatus.set(401);
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ serverResponseStatus.set(200);
+ return Mono.just(true);
+ })
+ .build();
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage())).verifyComplete();
+ // initial request + retry
+ assertThat(processedMessagesCount.get()).isEqualTo(2);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void retryAtMostOnce() {
+ serverResponseStatus.set(401);
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> Mono.just(true))
+ .build();
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(401))
+ .verify();
+ // initial request + 1 retry (maxRetries default is 1)
+ assertThat(processedMessagesCount.get()).isEqualTo(2);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void customMaxRetries() {
+ serverResponseStatus.set(401);
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler(new McpHttpClientAuthorizationErrorHandler() {
+ @Override
+ public Publisher handle(HttpResponse.ResponseInfo responseInfo,
+ McpTransportContext context) {
+ return Mono.just(true);
+ }
+
+ @Override
+ public int maxRetries() {
+ return 3;
+ }
+ })
+ .build();
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(401))
+ .verify();
+ // initial request + 3 retries
+ assertThat(processedMessagesCount.get()).isEqualTo(4);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void noRetry() {
+ serverResponseStatus.set(401);
+
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> Mono.just(false))
+ .build();
+
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(401))
+ .verify();
+ assertThat(processedMessagesCount.get()).isEqualTo(1);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void propagateHandlerError() {
+ serverResponseStatus.set(401);
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler(
+ (responseInfo, context) -> Mono.error(new IllegalStateException("handler error")))
+ .build();
+
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(throwable -> throwable instanceof IllegalStateException
+ && throwable.getMessage().equals("handler error"))
+ .verify();
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void emptyHandler() {
+ serverResponseStatus.set(401);
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> Mono.empty())
+ .build();
+
+ StepVerifier.create(authTransport.sendMessage(createTestRequestMessage()))
+ .expectErrorMatches(authorizationError(401))
+ .verify();
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ }
+
+ @Nested
+ class Connect {
+
+ @ParameterizedTest
+ @ValueSource(ints = { 401, 403 })
+ void invokeHandler(int httpStatus) {
+ serverSseResponseStatus.set(httpStatus);
+ @SuppressWarnings("unchecked")
+ AtomicReference capturedException = new AtomicReference<>();
+
+ AtomicReference capturedResponseInfo = new AtomicReference<>();
+ AtomicReference capturedContext = new AtomicReference<>();
+
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ capturedResponseInfo.set(responseInfo);
+ capturedContext.set(context);
+ return Mono.just(false);
+ })
+ .openConnectionOnStartup(true)
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));
+ assertThat(messages).isEmpty();
+ assertThat(capturedResponseInfo.get()).isNotNull();
+ assertThat(capturedResponseInfo.get().statusCode()).isEqualTo(httpStatus);
+ assertThat(capturedContext.get()).isNotNull();
+ assertThat(capturedException.get()).hasMessage("Authorization error connecting to SSE stream")
+ .asInstanceOf(type(McpHttpClientTransportAuthorizationException.class))
+ .extracting(McpHttpClientTransportAuthorizationException::getResponseInfo)
+ .extracting(HttpResponse.ResponseInfo::statusCode)
+ .isEqualTo(httpStatus);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void defaultHandler() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ StepVerifier.create(authTransport.connect(msg -> msg)).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));
+ assertThat(capturedException.get()).isInstanceOf(McpHttpClientTransportAuthorizationException.class);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void retry() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ serverSseResponseStatus.set(200);
+ return Mono.just(true);
+ })
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ var messageHandlerClosed = new AtomicBoolean(false);
+ StepVerifier
+ .create(authTransport
+ .connect(msg -> msg.doOnNext(messages::add).doFinally(s -> messageHandlerClosed.set(true))))
+ .verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(messageHandlerClosed).isTrue());
+ assertThat(processedSseConnectCount.get()).isEqualTo(2);
+ assertThat(messages).hasSize(1);
+ assertThat(capturedException.get()).isNull();
+ assertThat(messageHandlerClosed.get()).isTrue();
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void retryAtMostOnce() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ return Mono.just(true);
+ })
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(capturedException.get()).isNotNull());
+ // initial request + 1 retry (maxRetries default is 1)
+ assertThat(processedSseConnectCount.get()).isEqualTo(2);
+ assertThat(messages).isEmpty();
+ assertThat(capturedException.get()).isInstanceOf(McpHttpClientTransportAuthorizationException.class);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void customMaxRetries() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler(new McpHttpClientAuthorizationErrorHandler() {
+ @Override
+ public Publisher handle(HttpResponse.ResponseInfo responseInfo,
+ McpTransportContext context) {
+ return Mono.just(true);
+ }
+
+ @Override
+ public int maxRetries() {
+ return 3;
+ }
+ })
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(capturedException.get()).isNotNull());
+ // initial request + 3 retries
+ assertThat(processedSseConnectCount.get()).isEqualTo(4);
+ assertThat(messages).isEmpty();
+ assertThat(capturedException.get()).isInstanceOf(McpHttpClientTransportAuthorizationException.class);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void noRetry() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler((responseInfo, context) -> {
+ // if there was a retry, the request would succeed.
+ serverSseResponseStatus.set(200);
+ return Mono.just(false);
+ })
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));
+ assertThat(messages).isEmpty();
+ assertThat(capturedException.get()).isInstanceOf(McpHttpClientTransportAuthorizationException.class);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void emptyHandler() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler((responseInfo, context) -> Mono.empty())
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));
+ assertThat(messages).isEmpty();
+ assertThat(capturedException.get()).isInstanceOf(McpHttpClientTransportAuthorizationException.class);
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ @Test
+ void propagateHandlerError() {
+ serverSseResponseStatus.set(401);
+ AtomicReference capturedException = new AtomicReference<>();
+ var authTransport = HttpClientStreamableHttpTransport.builder(HOST)
+ .openConnectionOnStartup(true)
+ .authorizationErrorHandler(
+ (responseInfo, context) -> Mono.error(new IllegalStateException("handler error")))
+ .build();
+ authTransport.setExceptionHandler(capturedException::set);
+
+ var messages = new ArrayList();
+ StepVerifier.create(authTransport.connect(msg -> msg.doOnNext(messages::add))).verifyComplete();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(processedSseConnectCount.get()).isEqualTo(1));
+ assertThat(messages).isEmpty();
+ assertThat(capturedException.get()).isInstanceOf(IllegalStateException.class)
+ .hasMessage("handler error");
+
+ StepVerifier.create(authTransport.closeGracefully()).verifyComplete();
+ }
+
+ }
+
+ private static Predicate authorizationError(int httpStatus) {
+ return throwable -> throwable instanceof McpHttpClientTransportAuthorizationException
+ && throwable.getMessage().contains("Authorization error")
+ && ((McpHttpClientTransportAuthorizationException) throwable).getResponseInfo()
+ .statusCode() == httpStatus;
+ }
+
+ }
+
private McpSchema.JSONRPCRequest createTestRequestMessage() {
var initializeRequest = new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26,
McpSchema.ClientCapabilities.builder().roots(true).build(),