From 6ad42bcdba2cabd46e4fc6df4e5c6a9137401cda Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 16 Jun 2026 09:21:55 +0100 Subject: [PATCH] Fixed StartPartitionSessionResponse for reading without consumer --- .../tech/ydb/topic/read/impl/ReadSession.java | 16 ++-- .../topic/TopicReadersIntegrationTest.java | 81 +++++++++++++++---- 2 files changed, 75 insertions(+), 22 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index 830c26efa..ec7bc6c56 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -206,12 +206,19 @@ public void confirm(StartPartitionSessionSettings options) { long readFrom = committed; long commitTo = committed; + + YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder resp = YdbTopic.StreamReadMessage + .StartPartitionSessionResponse.newBuilder() + .setPartitionSessionId(psid); + if (options != null) { if (options.getReadOffset() != null) { readFrom = options.getReadOffset(); + resp.setReadOffset(readFrom); } if (options.getCommitOffset() != null) { commitTo = options.getCommitOffset(); + resp.setCommitOffset(commitTo); } } @@ -225,13 +232,8 @@ public CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) logger.info("[{}] Sending StartPartitionSessionResponse for {} and consumer \"{}\" with readOffset " + "{} and commitOffset {}", traceID, partition, consumerName, readFrom, commitTo); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() - .setStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionResponse - .newBuilder() - .setPartitionSessionId(psid) - .setReadOffset(readFrom) - .setCommitOffset(commitTo) - .build() - ).build()); + .setStartPartitionSessionResponse(resp.build()) + .build()); } }); } diff --git a/topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java index 7e2a3246c..3c031aa6e 100644 --- a/topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java @@ -1,9 +1,10 @@ package tech.ydb.topic; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -111,14 +112,14 @@ public void singleThreadExecutorTest() throws Exception { .setConsumerName(TEST_CONSUMER1) .build(); - Semaphore messageCount = new Semaphore(0); + CountDownLatch read = new CountDownLatch(1); CompletableFuture processing = new CompletableFuture<>(); ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor")); AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() .setExecutor(executor) .setEventHandler((event) -> { - messageCount.release(); + read.countDown(); processing.join(); }).build() ); @@ -126,38 +127,88 @@ public void singleThreadExecutorTest() throws Exception { reader.init().join(); // wait for message committing - messageCount.acquireUninterruptibly(); + Assert.assertTrue(read.await(5, TimeUnit.SECONDS)); // stop reader - CompletableFuture f = reader.shutdown(); + CompletableFuture shutdown = reader.shutdown(); processing.completeExceptionally(new RuntimeException("shutdown")); - f.get(5, TimeUnit.SECONDS); + shutdown.get(5, TimeUnit.SECONDS); executor.shutdownNow(); } @Test - public void readAllTest() { + public void readAllTest() throws InterruptedException { ReaderSettings readerSettings = ReaderSettings.newBuilder() .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER1) .build(); - AtomicLong counter = new AtomicLong(); - CompletableFuture result = new CompletableFuture<>(); + AtomicLong offset = new AtomicLong(); + CountDownLatch read = new CountDownLatch(6); + AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setEventHandler((DataReceivedEvent event) -> { + for (Message msg: event.getMessages()) { + Assert.assertEquals(offset.getAndIncrement(), msg.getOffset()); + read.countDown(); + } + }).build()); + + reader.init().join(); + Assert.assertTrue(read.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(6, offset.get()); + reader.shutdown().join(); + } + + @Test + public void readAllByPartitionIdTest() throws InterruptedException { + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(TEST_TOPIC) + .setPartitionIds(Arrays.asList(0L)) + .build()) + .setConsumerName(TEST_CONSUMER1) + .build(); + + AtomicLong offset = new AtomicLong(); + CountDownLatch read = new CountDownLatch(6); + AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() + .setEventHandler((DataReceivedEvent event) -> { + for (Message msg: event.getMessages()) { + Assert.assertEquals(offset.getAndIncrement(), msg.getOffset()); + read.countDown(); + } + }).build()); + + reader.init().join(); + Assert.assertTrue(read.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(6, offset.get()); + reader.shutdown().join(); + } + + @Test + public void readAllWithoutConsumerTest() throws InterruptedException { + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder() + .setPath(TEST_TOPIC) + .setPartitionIds(Arrays.asList(0L)) + .build()) + .withoutConsumer() + .build(); + + AtomicLong offset = new AtomicLong(); + CountDownLatch read = new CountDownLatch(6); AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder() .setEventHandler((DataReceivedEvent event) -> { for (Message msg: event.getMessages()) { - Assert.assertEquals(msg.getOffset(), counter.get()); - long read = counter.incrementAndGet(); - if (new String(msg.getData()).equals("stop")) { - result.complete(read); - } + Assert.assertEquals(offset.getAndIncrement(), msg.getOffset()); + read.countDown(); } }).build()); reader.init().join(); - Assert.assertEquals(Long.valueOf(6), result.join()); + Assert.assertTrue(read.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(6, offset.get()); reader.shutdown().join(); }