From 7180fc1bbddaf7ad24fc32780720003bf3fb26c9 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 15 Jun 2026 17:21:25 +0100 Subject: [PATCH 1/2] Fixed deadlock between MessageCommitter/SessionBase --- .../ydb/topic/read/impl/MessageCommitterImpl.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java index d3091c237..48f722340 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java @@ -66,15 +66,16 @@ public CompletableFuture commit(OffsetsRange range) { logger.debug("{} Offset range {} is requested to be committed. Last committed offset is {} (commit lag is {})", session, range, lastCommittedOffset, range.getStart() - lastCommittedOffset); + if (!session.commitOffsets(Collections.singletonList(range))) { + logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped", + session, range); + future.completeExceptionally(partitionIsClosedException()); + return future; + } + commitFuturesLock.lock(); try { - if (session.commitOffsets(Collections.singletonList(range))) { - commitFutures.put(range.getEnd(), future); - } else { - logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped", - session, range); - future.completeExceptionally(partitionIsClosedException()); - } + commitFutures.put(range.getEnd(), future); } finally { commitFuturesLock.unlock(); } From e7f2a871c752343790836cefc867424d719ab3c4 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 15 Jun 2026 19:51:18 +0100 Subject: [PATCH 2/2] Fixed data race on commitFutures --- .../topic/read/impl/MessageCommitterImpl.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java index 48f722340..09dce0dfb 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageCommitterImpl.java @@ -62,22 +62,32 @@ public void confirmCommit(long committedOffset) { @Override public CompletableFuture commit(OffsetsRange range) { - CompletableFuture future = new CompletableFuture<>(); logger.debug("{} Offset range {} is requested to be committed. Last committed offset is {} (commit lag is {})", session, range, lastCommittedOffset, range.getStart() - lastCommittedOffset); + CompletableFuture future; + commitFuturesLock.lock(); + try { + future = commitFutures.get(range.getEnd()); + if (future == null) { + future = new CompletableFuture<>(); + commitFutures.put(range.getEnd(), future); + } + } finally { + commitFuturesLock.unlock(); + } + if (!session.commitOffsets(Collections.singletonList(range))) { logger.info("{} Offset range {} is requested to be committed, but partition session is already stopped", session, range); future.completeExceptionally(partitionIsClosedException()); - return future; - } - commitFuturesLock.lock(); - try { - commitFutures.put(range.getEnd(), future); - } finally { - commitFuturesLock.unlock(); + commitFuturesLock.lock(); + try { + commitFutures.remove(range.getEnd()); + } finally { + commitFuturesLock.unlock(); + } } return future;