diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java index d3091c237..09dce0dfb 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java @@ -62,23 +62,34 @@ public void confirmCommit(long committedOffset) { @Override public CompletableFuture commit(OffsetsRange range) { - CompletableFuture future = new CompletableFuture<>(); logger.debug("{} Offset range {} is requested to be committed. Last committed offset is {} (commit lag is {})", session, range, lastCommittedOffset, range.getStart() - lastCommittedOffset); + CompletableFuture future; commitFuturesLock.lock(); try { - if (session.commitOffsets(Collections.singletonList(range))) { + future = commitFutures.get(range.getEnd()); + if (future == null) { + future = new CompletableFuture<>(); commitFutures.put(range.getEnd(), future); - } else { - logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped", - session, range); - future.completeExceptionally(partitionIsClosedException()); } } finally { commitFuturesLock.unlock(); } + if (!session.commitOffsets(Collections.singletonList(range))) { + logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped", + session, range); + future.completeExceptionally(partitionIsClosedException()); + + commitFuturesLock.lock(); + try { + commitFutures.remove(range.getEnd()); + } finally { + commitFuturesLock.unlock(); + } + } + return future; }