From f2041732f92b8e6a24400b8f9b7e427fa7e7b6a2 Mon Sep 17 00:00:00 2001 From: Robert Stephan Date: Fri, 15 May 2026 10:17:59 +0200 Subject: [PATCH 1/2] OcflS3Client with multiple S3 buckets --- .../main/java/io/ocfl/aws/OcflS3Client.java | 230 +++++++++++++----- .../ocfl/core/storage/cloud/CloudClient.java | 3 +- .../ocfl/core/storage/cloud/ListResult.java | 6 +- 3 files changed, 173 insertions(+), 66 deletions(-) diff --git a/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java b/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java index 979623b8..2a96900b 100644 --- a/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java +++ b/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java @@ -24,15 +24,6 @@ package io.ocfl.aws; -import io.ocfl.api.OcflRepository; -import io.ocfl.api.exception.OcflIOException; -import io.ocfl.api.util.Enforce; -import io.ocfl.core.storage.cloud.CloudClient; -import io.ocfl.core.storage.cloud.CloudObjectKey; -import io.ocfl.core.storage.cloud.HeadResult; -import io.ocfl.core.storage.cloud.KeyNotFoundException; -import io.ocfl.core.storage.cloud.ListResult; -import io.ocfl.core.util.UncheckedFiles; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -42,13 +33,28 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.Vector; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import io.ocfl.api.OcflRepository; +import io.ocfl.api.exception.OcflIOException; +import io.ocfl.api.util.Enforce; +import io.ocfl.core.storage.cloud.CloudClient; +import io.ocfl.core.storage.cloud.CloudObjectKey; +import io.ocfl.core.storage.cloud.HeadResult; +import io.ocfl.core.storage.cloud.KeyNotFoundException; +import io.ocfl.core.storage.cloud.ListResult; +import io.ocfl.core.util.UncheckedFiles; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -74,13 +80,32 @@ public class OcflS3Client implements CloudClient { private static final long EIGHT_MB = 8 * 1024 * 1024; + /** + * default function to retrieve bucket. + * Always returns the first bucket of list used in implementations that handle only one bucket + */ + private static final BiFunction, String> SELECT_FIRST_BUCKET = + (key, buckets) -> buckets.get(0); + + public static final BiFunction, String> SELECT_BUCKET_BY_CRC32_AND_MOD = + (key, buckets) -> { + if (buckets.size() == 1) { + return buckets.get(0); + } + byte[] bytes = key.getKey().getBytes(StandardCharsets.UTF_8); + Checksum crc32 = new CRC32(); + crc32.update(bytes, 0, bytes.length); + return buckets.get((int) (crc32.getValue() % buckets.size())); + }; + private final S3AsyncClient s3Client; private final S3TransferManager transferManager; - private final String bucket; + private final List buckets; private final String repoPrefix; private final CloudObjectKey.Builder keyBuilder; private final BiConsumer putObjectModifier; + private final BiFunction, String> selectBucket; /** * Used to create a new OcflS3Client instance. @@ -98,9 +123,19 @@ public static Builder builder() { * @param bucket s3 bucket */ public OcflS3Client(S3AsyncClient s3Client, String bucket) { - this(s3Client, bucket, null, null, null); + this(s3Client, List.of(bucket), null, null, null, SELECT_FIRST_BUCKET); } + /** + * @see OcflS3Client#builder() + * + * @param s3Client aws sdk s3 client + * @param buckets list of s3 bucket names + */ + public OcflS3Client(S3AsyncClient s3Client, List buckets) { + this(s3Client, buckets, null, null, null, null); + } + /** * @see OcflS3Client#builder() * @@ -116,12 +151,37 @@ public OcflS3Client( String prefix, S3TransferManager transferManager, BiConsumer putObjectModifier) { + this(s3Client, List.of(bucket), prefix, transferManager, putObjectModifier, null); + } + + /** + * @see OcflS3Client#builder() + * + * @param s3Client aws sdk s3 client, not null + * @param buckets list of s3 bucket names, not null + * @param prefix key prefix, may be null + * @param transferManager aws sdk s3 transfer manager, not null + * @param putObjectModifier hook for modifying putObject requests, may be null + */ + public OcflS3Client( + S3AsyncClient s3Client, + List buckets, + String prefix, + S3TransferManager transferManager, + BiConsumer putObjectModifier, + BiFunction, String> selectBucket) { this.s3Client = Enforce.notNull(s3Client, "s3Client cannot be null"); - this.bucket = Enforce.notBlank(bucket, "bucket cannot be blank"); + this.buckets = Enforce.notNull(buckets, "bucket cannot be null"); + for (String bucket : buckets) { + Enforce.notBlank(bucket, "bucket " + bucket + "cannot be empty"); + } this.repoPrefix = sanitizeRepoPrefix(prefix == null ? "" : prefix); this.transferManager = Enforce.notNull(transferManager, "transferManager cannot be null"); this.keyBuilder = CloudObjectKey.builder().prefix(repoPrefix); this.putObjectModifier = putObjectModifier != null ? putObjectModifier : (k, b) -> {}; + this.selectBucket = selectBucket != null + ? selectBucket + : (buckets.size() == 1 ? SELECT_FIRST_BUCKET : SELECT_BUCKET_BY_CRC32_AND_MOD); } private static String sanitizeRepoPrefix(String repoPrefix) { @@ -149,8 +209,8 @@ public void close() { * {@inheritDoc} */ @Override - public String bucket() { - return bucket; + public List buckets() { + return buckets; } /** @@ -176,7 +236,7 @@ public Future uploadFileAsync(Path srcPath, String dstPath) { public Future uploadFileAsync(Path srcPath, String dstPath, String contentType) { var fileSize = UncheckedFiles.size(srcPath); var dstKey = keyBuilder.buildFromPath(dstPath); - + String bucket = selectBucket.apply(dstKey, buckets); LOG.debug("Uploading {} to bucket {} key {} size {}", srcPath, bucket, dstKey, fileSize); var builder = PutObjectRequest.builder().contentType(contentType); @@ -226,6 +286,7 @@ public CloudObjectKey uploadFile(Path srcPath, String dstPath, String contentTyp @Override public CloudObjectKey uploadBytes(String dstPath, byte[] bytes, String contentType) { var dstKey = keyBuilder.buildFromPath(dstPath); + String bucket = selectBucket.apply(dstKey, buckets); LOG.debug("Writing bytes to bucket {} key {}", bucket, dstKey); var builder = PutObjectRequest.builder().contentType(contentType); @@ -249,6 +310,7 @@ public CloudObjectKey uploadBytes(String dstPath, byte[] bytes, String contentTy public CloudObjectKey copyObject(String srcPath, String dstPath) { var srcKey = keyBuilder.buildFromPath(srcPath); var dstKey = keyBuilder.buildFromPath(dstPath); + String bucket = selectBucket.apply(dstKey, buckets); LOG.debug("Copying {} to {} in bucket {}", srcKey, dstKey, bucket); @@ -277,14 +339,14 @@ public CloudObjectKey copyObject(String srcPath, String dstPath) { @Override public Path downloadFile(String srcPath, Path dstPath) { var srcKey = keyBuilder.buildFromPath(srcPath); + String bucket = selectBucket.apply(srcKey, buckets); LOG.debug("Downloading from bucket {} key {} to {}", bucket, srcKey, dstPath); try { transferManager - .downloadFile(req -> req.getObjectRequest(getReq -> - getReq.bucket(bucket).key(srcKey.getKey()).build()) - .destination(dstPath) - .build()) + .downloadFile(req -> req.getObjectRequest(getReq -> getReq.bucket(bucket).key(srcKey.getKey()).build()) + .destination(dstPath) + .build()) .completionFuture() .join(); } catch (RuntimeException e) { @@ -304,6 +366,7 @@ public Path downloadFile(String srcPath, Path dstPath) { @Override public InputStream downloadStream(String srcPath) { var srcKey = keyBuilder.buildFromPath(srcPath); + String bucket = selectBucket.apply(srcKey, buckets); LOG.debug("Streaming from bucket {} key {}", bucket, srcKey); try { @@ -329,6 +392,7 @@ public InputStream downloadStream(String srcPath) { @Override public InputStream downloadStreamRange(String srcPath, String range) { var srcKey = keyBuilder.buildFromPath(srcPath); + String bucket = selectBucket.apply(srcKey, buckets); LOG.debug("Streaming from bucket {} key {} range {}", bucket, srcKey, range); try { @@ -367,6 +431,7 @@ public String downloadString(String srcPath) { @Override public HeadResult head(String path) { var key = keyBuilder.buildFromPath(path); + String bucket = selectBucket.apply(key, buckets()); try { var s3Result = s3Client.headObject(HeadObjectRequest.builder() @@ -395,7 +460,14 @@ public HeadResult head(String path) { @Override public ListResult list(String prefix) { var prefixedPrefix = keyBuilder.buildFromPath(prefix); - return toListResult(ListObjectsV2Request.builder().bucket(bucket).prefix(prefixedPrefix.getKey())); + ListResult lrReturn = new ListResult(); + for (String bucket : buckets) { + ListResult lrBucket = + toListResult(ListObjectsV2Request.builder().bucket(bucket).prefix(prefixedPrefix.getKey())); + lrReturn.getDirectories().addAll(lrBucket.getDirectories()); + lrReturn.getObjects().addAll(lrBucket.getObjects()); + } + return lrReturn; } /** @@ -409,10 +481,16 @@ public ListResult listDirectory(String path) { prefix = prefix + "/"; } - LOG.debug("Listing directory {} in bucket {}", prefix, bucket); - - return toListResult( + ListResult lrReturn = new ListResult(); + for (String bucket : buckets) { + LOG.debug("Listing directory {} in bucket {}", prefix, bucket); + ListResult lrb = toListResult( ListObjectsV2Request.builder().bucket(bucket).delimiter("/").prefix(prefix)); + lrReturn.getDirectories().addAll(lrb.getDirectories()); + lrReturn.getObjects().addAll(lrb.getObjects()); + + } + return lrReturn; } /** @@ -426,22 +504,25 @@ public boolean directoryExists(String path) { prefix = prefix + "/"; } - LOG.debug("Checking existence of {} in bucket {}", prefix, bucket); - try { - var response = s3Client.listObjectsV2(ListObjectsV2Request.builder() - .bucket(bucket) - .delimiter("/") - .prefix(prefix) - .maxKeys(1) - .build()) + for (String bucket : buckets) { + var response = s3Client.listObjectsV2(ListObjectsV2Request.builder() + .bucket(bucket) + .delimiter("/") + .prefix(prefix) + .maxKeys(1) + .build()) .join(); - return response.contents().stream().findAny().isPresent() - || response.commonPrefixes().stream().findAny().isPresent(); + if (response.contents().stream().findAny().isPresent() + || response.commonPrefixes().stream().findAny().isPresent()) { + return true; + } + } } catch (RuntimeException e) { throw new OcflS3Exception("Failed to list objects under " + prefix, OcflS3Util.unwrapCompletionEx(e)); } + return false; } /** @@ -449,13 +530,14 @@ public boolean directoryExists(String path) { */ @Override public void deletePath(String path) { - LOG.debug("Deleting path {} in bucket {}", path, bucket); - - var keys = list(path).getObjects().stream() + for (String bucket : buckets) { + LOG.debug("Deleting path {} in bucket {}", path, bucket); + var keys = list(path).getObjects().stream() .map(ListResult.ObjectListing::getKey) .collect(Collectors.toList()); - deleteObjectsInternal(keys); + deleteObjectsInternal(keys); + } } /** @@ -474,29 +556,32 @@ public void deleteObjects(Collection objectPaths) { } private void deleteObjectsInternal(Collection objectKeys) { - LOG.debug("Deleting objects in bucket {}: {}", bucket, objectKeys); + for (String bucket : buckets) { + LOG.debug("Deleting objects in bucket {}: {}", bucket, objectKeys); - if (!objectKeys.isEmpty()) { - var objectIds = objectKeys.stream() + if (!objectKeys.isEmpty()) { + var objectIds = objectKeys.stream() .map(key -> ObjectIdentifier.builder().key(key.getKey()).build()) .collect(Collectors.toList()); - try { - var futures = new ArrayList>(); + try { + var futures = new ArrayList>(); - // Can only delete at most 1,000 objects per request - for (int i = 0; i < objectIds.size(); i += 999) { - var toDelete = objectIds.subList(i, Math.min(objectIds.size(), i + 999)); - futures.add(s3Client.deleteObjects(DeleteObjectsRequest.builder() + // Can only delete at most 1,000 objects per request + for (int i = 0; i < objectIds.size(); i += 999) { + var toDelete = objectIds.subList(i, Math.min(objectIds.size(), i + 999)); + futures.add(s3Client.deleteObjects(DeleteObjectsRequest.builder() .bucket(bucket) .delete(builder -> builder.objects(toDelete)) .build())); - } + } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})) + CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {})) .join(); - } catch (RuntimeException e) { - throw new OcflS3Exception("Failed to delete objects " + objectIds, OcflS3Util.unwrapCompletionEx(e)); + } catch (RuntimeException e) { + throw new OcflS3Exception("Failed to delete objects " + objectIds, + OcflS3Util.unwrapCompletionEx(e)); + } } } } @@ -517,7 +602,7 @@ public void safeDeleteObjects(Collection objectPaths) { try { deleteObjects(objectPaths); } catch (RuntimeException e) { - LOG.error("Failed to cleanup objects in bucket {}: {}", bucket, objectPaths, e); + LOG.error("Failed to cleanup objects: {}", objectPaths, e); } } @@ -526,17 +611,19 @@ public void safeDeleteObjects(Collection objectPaths) { */ @Override public boolean bucketExists() { - try { - s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build()) + for (String bucket : buckets) { + try { + s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build()) .join(); - return true; - } catch (RuntimeException e) { - var cause = OcflS3Util.unwrapCompletionEx(e); - if (wasNotFound(cause)) { - return false; + } catch (RuntimeException e) { + var cause = OcflS3Util.unwrapCompletionEx(e); + if (wasNotFound(cause)) { + return false; + } + throw new OcflS3Exception("Failed ot HEAD bucket " + bucket, cause); } - throw new OcflS3Exception("Failed ot HEAD bucket " + bucket, cause); } + return true; } private ListResult toListResult(ListObjectsV2Request.Builder requestBuilder) { @@ -617,10 +704,11 @@ private boolean wasNotFound(Throwable e) { public static class Builder { private S3AsyncClient s3Client; private S3TransferManager transferManager; - private String bucket; + private List buckets = new Vector(); private String repoPrefix; private BiConsumer putObjectModifier; + private BiFunction, String> selectBucket; /** * The AWS SDK S3 client. Required. @@ -667,7 +755,14 @@ public Builder transferManager(S3TransferManager transferManager) { * @return builder */ public Builder bucket(String bucket) { - this.bucket = Enforce.notBlank(bucket, "bucket cannot be blank"); + this.buckets.add(Enforce.notBlank(bucket, "bucket cannot be blank")); + return this; + } + + public Builder buckets(List buckets) { + for (String bucket : buckets) { + this.buckets.add(Enforce.notBlank(bucket, "bucket cannot be blank")); + } return this; } @@ -697,6 +792,17 @@ public Builder putObjectModifier(BiConsumer pu return this; } + /** + * Provides a hook to calculate the name of the S3 bucket from a list of buckets + * + * @param selectBucket hook for modifying putObject requests + * @return builder + */ + public Builder selectBucket(BiFunction, String> selectBucket) { + this.selectBucket = selectBucket; + return this; + } + /** * Constructs a new {@link OcflS3Client}. {@link #s3Client(S3AsyncClient)} and {@link #bucket(String)} must be set. *

@@ -706,7 +812,7 @@ public Builder putObjectModifier(BiConsumer pu * @return OcflS3Client */ public OcflS3Client build() { - return new OcflS3Client(s3Client, bucket, repoPrefix, transferManager, putObjectModifier); + return new OcflS3Client(s3Client, buckets, repoPrefix, transferManager, putObjectModifier, selectBucket); } } } diff --git a/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/CloudClient.java b/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/CloudClient.java index 33e50b8e..50176f8f 100644 --- a/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/CloudClient.java +++ b/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/CloudClient.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.nio.file.Path; import java.util.Collection; +import java.util.List; import java.util.concurrent.Future; /** @@ -44,7 +45,7 @@ public interface CloudClient { * * @return bucket name */ - String bucket(); + List buckets(); /** * The key prefix of all objects within the OCFL repository. This may be empty. Multiple different OCFL repositories diff --git a/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/ListResult.java b/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/ListResult.java index bf8d6e06..87201d51 100644 --- a/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/ListResult.java +++ b/ocfl-java-core/src/main/java/io/ocfl/core/storage/cloud/ListResult.java @@ -25,8 +25,8 @@ package io.ocfl.core.storage.cloud; import io.ocfl.api.util.Enforce; -import java.util.Collections; import java.util.List; +import java.util.Vector; /** * Encapsulates the results of a list operation @@ -37,8 +37,8 @@ public class ListResult { private List directories; public ListResult() { - this.objects = Collections.emptyList(); - this.directories = Collections.emptyList(); + this.objects = new Vector<>(); + this.directories = new Vector<>(); } /** From e3a78d4aee98c8785ff12e0a072f72dabd7c4f1a Mon Sep 17 00:00:00 2001 From: Robert Stephan Date: Fri, 15 May 2026 10:22:36 +0200 Subject: [PATCH 2/2] restore deletePath() --- .../src/main/java/io/ocfl/aws/OcflS3Client.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java b/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java index 2a96900b..b333190f 100644 --- a/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java +++ b/ocfl-java-aws/src/main/java/io/ocfl/aws/OcflS3Client.java @@ -530,14 +530,13 @@ public boolean directoryExists(String path) { */ @Override public void deletePath(String path) { - for (String bucket : buckets) { - LOG.debug("Deleting path {} in bucket {}", path, bucket); - var keys = list(path).getObjects().stream() + LOG.debug("Deleting path {}", path); + + var keys = list(path).getObjects().stream() .map(ListResult.ObjectListing::getKey) .collect(Collectors.toList()); - deleteObjectsInternal(keys); - } + deleteObjectsInternal(keys); } /**