Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ public void goneAndThrottlingExceptionSuccessScenarioQuery() {
@Test(groups = {"long-emulator"}, timeOut = TIMEOUT * 2)
@SuppressWarnings("unchecked")
public void goneExceptionFailureScenario() {
// Reduce retry wait time for faster test execution
System.setProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT", "5");
System.setProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS", "100");

CosmosClient cosmosClient = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
Expand Down Expand Up @@ -657,9 +661,7 @@ public void goneExceptionFailureScenario() {
RetryContext retryContext =
ex.getDiagnostics().clientSideRequestStatistics().getRetryContext();

//In CI pipeline, the emulator starts with strong consistency
assertThat(retryContext.getStatusAndSubStatusCodes().size()).isLessThanOrEqualTo(10);
assertThat(retryContext.getStatusAndSubStatusCodes().size()).isGreaterThanOrEqualTo(6);
assertThat(retryContext.getStatusAndSubStatusCodes().size()).isGreaterThanOrEqualTo(2);
int[] firstRetryStatusCodes = retryContext.getStatusAndSubStatusCodes().get(0);
int[] lastRetryStatusCodes = retryContext.getStatusAndSubStatusCodes()
.get(retryContext.getStatusAndSubStatusCodes().size() - 1);
Expand All @@ -670,6 +672,8 @@ public void goneExceptionFailureScenario() {
}
} finally {
safeCloseSyncClient(cosmosClient);
System.clearProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT");
System.clearProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import static org.assertj.core.api.Java6Assertions.assertThat;

public class NetworkFailureTest extends TestSuiteBase {
private static final int TIMEOUT = ClientRetryPolicy.MaxRetryCount * ClientRetryPolicy.RetryIntervalInMS + 60000;
private static final int TEST_MAX_RETRY_COUNT = 5;
private static final int TEST_RETRY_INTERVAL_MS = 100;
private static final int TIMEOUT = TEST_MAX_RETRY_COUNT * TEST_RETRY_INTERVAL_MS + 60000;
private final DocumentCollection collectionDefinition;

@Factory(dataProvider = "internalClientBuilders")
Expand All @@ -29,6 +31,10 @@ public NetworkFailureTest(AsyncDocumentClient.Builder clientBuilder) {

@Test(groups = { "long-emulator" }, timeOut = TIMEOUT)
public void createCollectionWithUnreachableHost() {
// Override retry constants for this test to avoid 120 × 1s = 2 min wait
System.setProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT", String.valueOf(TEST_MAX_RETRY_COUNT));
System.setProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS", String.valueOf(TEST_RETRY_INTERVAL_MS));

SpyClientUnderTestFactory.ClientWithGatewaySpy client = null;

try {
Expand Down Expand Up @@ -61,10 +67,13 @@ public void createCollectionWithUnreachableHost() {
validateResourceResponseFailure(createObservable, validator, TIMEOUT);
Instant after = Instant.now();
assertThat(after.toEpochMilli() - start.toEpochMilli())
.isGreaterThanOrEqualTo(ClientRetryPolicy.MaxRetryCount * ClientRetryPolicy.RetryIntervalInMS);
.isGreaterThanOrEqualTo(TEST_MAX_RETRY_COUNT * TEST_RETRY_INTERVAL_MS);

} finally {
safeClose(client);
// Restore default retry constants so other tests in the same JVM are not affected
System.clearProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT");
System.clearProperty("COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,10 @@ public <T> void changeFeedProcessor(
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.options(new ChangeFeedProcessorOptions()
.setLeaseRenewInterval(Duration.ofSeconds(20))
.setLeaseAcquireInterval(Duration.ofSeconds(10))
.setLeaseExpirationInterval(Duration.ofSeconds(30))
.setFeedPollDelay(Duration.ofSeconds(2))
.setLeaseRenewInterval(Duration.ofSeconds(5))
.setLeaseAcquireInterval(Duration.ofSeconds(2))
.setLeaseExpirationInterval(Duration.ofSeconds(10))
.setFeedPollDelay(Duration.ofMillis(500))
.setLeasePrefix("TEST")
.setMaxItemCount(10)
.setStartFromBeginning(true)
Expand All @@ -425,10 +425,14 @@ public <T> void changeFeedProcessor(
.timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT))
.subscribe();

// Wait for the feed processor to receive and process the documents.
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
// Poll until CFP is started instead of fixed sleep
long deadline = System.currentTimeMillis() + 2 * CHANGE_FEED_PROCESSOR_TIMEOUT;
while (System.currentTimeMillis() < deadline && !changeFeedProcessor.isStarted()) {
Thread.sleep(200);
}
assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();

// Poll until all documents are received
long remainingWork = 2 * CHANGE_FEED_PROCESSOR_TIMEOUT;
while (remainingWork > 0 && receivedDocuments.size() < createdDocuments.size()) {
remainingWork -= 100;
Expand All @@ -441,9 +445,12 @@ public <T> void changeFeedProcessor(
} finally {
changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();

// Wait for the feed processor to shutdown.
// Poll until CFP is stopped instead of fixed sleep
try {
Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);
long stopDeadline = System.currentTimeMillis() + CHANGE_FEED_PROCESSOR_TIMEOUT;
while (System.currentTimeMillis() < stopDeadline && changeFeedProcessor.isStarted()) {
Thread.sleep(200);
}
} catch (InterruptedException e) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2201,8 +2201,12 @@ void validateChangeFeedProcessing(
throw ex;
}

// Wait for the feed processor to receive and process the documents.
Thread.sleep(sleepTime);
// Poll until all documents are received instead of sleeping the full duration.
// This returns as soon as documents arrive, saving significant time in CI.
long deadline = System.currentTimeMillis() + sleepTime;
while (System.currentTimeMillis() < deadline && receivedDocuments.size() < createdDocuments.size()) {
Thread.sleep(100);
}

assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ
}

private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isReadRequest, boolean forceRefresh, boolean usePreferredLocations) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > Configs.getEndpointFailoverMaxRetryCount()) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}
Expand All @@ -308,10 +308,10 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
logger.debug("Failover happening. retryCount {}", this.failoverRetryCount);
if (this.failoverRetryCount > 1) {
//if retried both endpoints, follow regular retry interval.
retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
retryDelay = Duration.ofMillis(Configs.getEndpointFailoverRetryIntervalInMs());
}
} else {
retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
retryDelay = Duration.ofMillis(Configs.getEndpointFailoverRetryIntervalInMs());
}
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
}
Expand All @@ -332,14 +332,14 @@ private Mono<ShouldRetryResult> shouldRetryOnGatewayTimeout(CosmosException clie

//if operation is data plane read, metadata read, or query plan it can be retried on a different endpoint.
if (canPerformCrossRegionRetryOnGatewayReadTimeout) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > Configs.getEndpointFailoverMaxRetryCount()) {
logger.warn("shouldRetryOnHttpTimeout() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}

this.failoverRetryCount++;
this.retryContext = new RetryContext(this.failoverRetryCount, true);
Duration retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
Duration retryDelay = Duration.ofMillis(Configs.getEndpointFailoverRetryIntervalInMs());
return Mono.just(ShouldRetryResult.retryAfter(retryDelay));
}

Expand All @@ -352,7 +352,7 @@ private Mono<ShouldRetryResult> shouldRetryOnGatewayTimeout(CosmosException clie
}

private Mono<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, boolean usePreferredLocations) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > Configs.getEndpointFailoverMaxRetryCount()) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ public class Configs {
private static final boolean DEFAULT_CLIENT_LEAK_DETECTION_ENABLED = false;
private static final String CLIENT_LEAK_DETECTION_ENABLED = "COSMOS.CLIENT_LEAK_DETECTION_ENABLED";

// Config for endpoint failover retry policy
// These can be overridden in tests to speed up NetworkFailureTest
private static final String CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS = "COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS";
private static final int DEFAULT_CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS = 1000;
private static final String CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT = "COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT";
private static final int DEFAULT_CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT = 120;

private static final Object lockObject = new Object();
private static Boolean cachedIsHostnameValidationDisabled = null;

Expand Down Expand Up @@ -538,6 +545,14 @@ public static boolean isClientLeakDetectionEnabled() {
return DEFAULT_CLIENT_LEAK_DETECTION_ENABLED;
}

    /**
     * Returns the delay, in milliseconds, between endpoint-failover retries.
     * <p>
     * Reads the JVM config key {@code COSMOS.CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS},
     * falling back to the default of 1000 ms. Tests may override the system property
     * (and clear it afterwards) to shorten retry waits.
     *
     * @return the endpoint-failover retry interval in milliseconds
     */
    public static int getEndpointFailoverRetryIntervalInMs() {
        return getJVMConfigAsInt(CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS, DEFAULT_CLIENT_ENDPOINT_FAILOVER_RETRY_INTERVAL_IN_MS);
    }

    /**
     * Returns the maximum number of endpoint-failover retries before giving up.
     * <p>
     * Reads the JVM config key {@code COSMOS.CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT},
     * falling back to the default of 120. Tests may override the system property
     * (and clear it afterwards) to shorten retry loops.
     *
     * @return the maximum endpoint-failover retry count
     */
    public static int getEndpointFailoverMaxRetryCount() {
        return getJVMConfigAsInt(CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT, DEFAULT_CLIENT_ENDPOINT_FAILOVER_MAX_RETRY_COUNT);
    }

public int getUnavailableLocationsExpirationTimeInSeconds() {
return getJVMConfigAsInt(UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS, DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS);
}
Expand Down