Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
private ScatteringByteChannel sbc;
private boolean open;
private boolean returnEOF;
private long totalBytesReadFromNetwork;

// returned X-Goog-Generation header value
private Long xGoogGeneration;
Expand Down Expand Up @@ -128,6 +129,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
returnEOF = true;
} else {
totalRead += read;
totalBytesReadFromNetwork += read;
}
return totalRead;
} catch (Exception t) {
Expand Down Expand Up @@ -163,9 +165,25 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
open = false;
if (sbc != null) {
sbc.close();
try {
long requestedLength = apiaryReadRequest.getByteRangeSpec().length();
if (requestedLength >= 0
&& requestedLength < ByteRangeSpec.EFFECTIVE_INFINITY
&& totalBytesReadFromNetwork > requestedLength) {
java.util.logging.Logger.getLogger(ApiaryUnbufferedReadableByteChannel.class.getName())
.warning(
String.format(
"storage: received %d more bytes than requested from GCS for bucket '%s',"
+ " object '%s'",
totalBytesReadFromNetwork - requestedLength,
apiaryReadRequest.getObject().getBucket(),
apiaryReadRequest.getObject().getName()));
}
} finally {
open = false;
if (sbc != null) {
sbc.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,37 +199,50 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
open = false;
try {
if (leftovers != null) {
leftovers.close();
long readLimit = req.getReadLimit();
long receivedBytes = fetchOffset.get() - req.getReadOffset();
if (readLimit > 0 && receivedBytes > readLimit) {
java.util.logging.Logger.getLogger(GapicUnbufferedReadableByteChannel.class.getName())
.warning(
String.format(
"storage: received %d more bytes than requested from GCS for bucket '%s',"
+ " object '%s'",
receivedBytes - readLimit, req.getBucket(), req.getObject()));
}
ReadObjectObserver obs = readObjectObserver;
if (obs != null && !obs.cancellation.isDone()) {
obs.cancel();
drainQueue();
try {
// make sure our waiting doesn't lockup permanently
obs.cancellation.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
InterruptedIOException ioe = new InterruptedIOException();
ioe.initCause(e);
ioe.addSuppressed(new AsyncStorageTaskException());
throw ioe;
} catch (ExecutionException e) {
Throwable cause = e;
if (e.getCause() != null) {
cause = e.getCause();
} finally {
open = false;
try {
if (leftovers != null) {
leftovers.close();
}
ReadObjectObserver obs = readObjectObserver;
if (obs != null && !obs.cancellation.isDone()) {
obs.cancel();
drainQueue();
try {
// make sure our waiting doesn't lockup permanently
obs.cancellation.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
InterruptedIOException ioe = new InterruptedIOException();
ioe.initCause(e);
ioe.addSuppressed(new AsyncStorageTaskException());
throw ioe;
} catch (ExecutionException e) {
Throwable cause = e;
if (e.getCause() != null) {
cause = e.getCause();
}
IOException ioException = new IOException(cause);
ioException.addSuppressed(new AsyncStorageTaskException());
throw ioException;
} catch (TimeoutException ignore) {
}
IOException ioException = new IOException(cause);
ioException.addSuppressed(new AsyncStorageTaskException());
throw ioException;
} catch (TimeoutException ignore) {
}
} finally {
drainQueue();
}
} finally {
drainQueue();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,75 @@ public void readObject(
}
}

@Test
public void logsWarning_whenReceivingMoreBytesThanRequested()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
ReadObjectRequest reqWithLimit =
ReadObjectRequest.newBuilder()
.setObject(objectName)
.setReadOffset(0)
.setReadLimit(10)
.build();

StorageGrpc.StorageImplBase fakeStorage =
new StorageGrpc.StorageImplBase() {
@Override
public void readObject(
ReadObjectRequest request, StreamObserver<ReadObjectResponse> responseObserver) {
responseObserver.onNext(resp1); // sends 10 bytes
responseObserver.onNext(resp2); // sends another 10 bytes (total 20 > limit 10)
responseObserver.onCompleted();
}
};

java.util.logging.Logger logger =
java.util.logging.Logger.getLogger(GapicUnbufferedReadableByteChannel.class.getName());
java.util.List<java.util.logging.LogRecord> records = new java.util.ArrayList<>();
java.util.logging.Handler handler =
new java.util.logging.Handler() {
@Override
public void publish(java.util.logging.LogRecord record) {
records.add(record);
}

@Override
public void flush() {}

@Override
public void close() throws SecurityException {}
};
logger.addHandler(handler);

try (FakeServer server = FakeServer.of(fakeStorage);
StorageClient storageClient = StorageClient.create(server.storageSettings())) {
Retrier retrier = TestUtils.retrierFromStorageOptions(server.getGrpcStorageOptions());

UnbufferedReadableByteChannelSession<Object> session =
new UnbufferedReadSession<>(
ApiFutures.immediateFuture(reqWithLimit),
(start, resultFuture) ->
new GapicUnbufferedReadableByteChannel(
resultFuture,
new ZeroCopyServerStreamingCallable<>(
storageClient.readObjectCallable(),
ResponseContentLifecycleManager.noop()),
start,
Hasher.noop(),
retrier,
retryOnly(DataLossException.class)));
byte[] actualBytes = new byte[15];
try (UnbufferedReadableByteChannel c = session.open()) {
c.read(ByteBuffer.wrap(actualBytes));
}

boolean warningLogged =
records.stream().anyMatch(r -> r.getMessage().contains("more bytes than requested"));
assertThat(warningLogged).isTrue();
} finally {
logger.removeHandler(handler);
}
}

private static <E extends ApiException> ResultRetryAlgorithm<?> retryOnly(Class<E> c) {
return new BasicResultRetryAlgorithm<java.lang.Object>() {
@Override
Expand Down
Loading