Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,19 @@ public void confirm(StartPartitionSessionSettings options) {

long readFrom = committed;
long commitTo = committed;

YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder resp = YdbTopic.StreamReadMessage
.StartPartitionSessionResponse.newBuilder()
.setPartitionSessionId(psid);

if (options != null) {
if (options.getReadOffset() != null) {
readFrom = options.getReadOffset();
resp.setReadOffset(readFrom);
}
if (options.getCommitOffset() != null) {
commitTo = options.getCommitOffset();
resp.setCommitOffset(commitTo);
}
}

Expand All @@ -225,13 +232,8 @@ public CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event)
logger.info("[{}] Sending StartPartitionSessionResponse for {} and consumer \"{}\" with readOffset "
+ "{} and commitOffset {}", traceID, partition, consumerName, readFrom, commitTo);
Comment thread
alex268 marked this conversation as resolved.
Comment thread
alex268 marked this conversation as resolved.
send(YdbTopic.StreamReadMessage.FromClient.newBuilder()
.setStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionResponse
.newBuilder()
.setPartitionSessionId(psid)
.setReadOffset(readFrom)
.setCommitOffset(commitTo)
.build()
).build());
.setStartPartitionSessionResponse(resp.build())
.build());
}
});
}
Expand Down
81 changes: 66 additions & 15 deletions topic/src/test/java/tech/ydb/topic/TopicReadersIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package tech.ydb.topic;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -111,53 +112,103 @@ public void singleThreadExecutorTest() throws Exception {
.setConsumerName(TEST_CONSUMER1)
.build();

Semaphore messageCount = new Semaphore(0);
CountDownLatch read = new CountDownLatch(1);
CompletableFuture<Boolean> processing = new CompletableFuture<>();

ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor"));
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setExecutor(executor)
.setEventHandler((event) -> {
messageCount.release();
read.countDown();
processing.join();
}).build()
);

reader.init().join();

// wait for message committing
messageCount.acquireUninterruptibly();
Assert.assertTrue(read.await(5, TimeUnit.SECONDS));

// stop reader
CompletableFuture<Void> f = reader.shutdown();
CompletableFuture<Void> shutdown = reader.shutdown();
processing.completeExceptionally(new RuntimeException("shutdown"));
f.get(5, TimeUnit.SECONDS);
shutdown.get(5, TimeUnit.SECONDS);

executor.shutdownNow();
}

@Test
public void readAllTest() {
public void readAllTest() throws InterruptedException {
ReaderSettings readerSettings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.setConsumerName(TEST_CONSUMER1)
.build();

AtomicLong counter = new AtomicLong();
CompletableFuture<Long> result = new CompletableFuture<>();
AtomicLong offset = new AtomicLong();
CountDownLatch read = new CountDownLatch(6);
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setEventHandler((DataReceivedEvent event) -> {
for (Message msg: event.getMessages()) {
Comment thread
alex268 marked this conversation as resolved.
Assert.assertEquals(offset.getAndIncrement(), msg.getOffset());
read.countDown();
}
}).build());

reader.init().join();
Assert.assertTrue(read.await(5, TimeUnit.SECONDS));
Assert.assertEquals(6, offset.get());
reader.shutdown().join();
}

@Test
public void readAllByPartitionIdTest() throws InterruptedException {
ReaderSettings readerSettings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder()
.setPath(TEST_TOPIC)
.setPartitionIds(Arrays.asList(0L))
.build())
.setConsumerName(TEST_CONSUMER1)
.build();

AtomicLong offset = new AtomicLong();
CountDownLatch read = new CountDownLatch(6);
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setEventHandler((DataReceivedEvent event) -> {
for (Message msg: event.getMessages()) {
Comment thread
alex268 marked this conversation as resolved.
Assert.assertEquals(offset.getAndIncrement(), msg.getOffset());
read.countDown();
}
}).build());

reader.init().join();
Assert.assertTrue(read.await(5, TimeUnit.SECONDS));
Assert.assertEquals(6, offset.get());
reader.shutdown().join();
}

@Test
public void readAllWithoutConsumerTest() throws InterruptedException {
ReaderSettings readerSettings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder()
.setPath(TEST_TOPIC)
.setPartitionIds(Arrays.asList(0L))
.build())
.withoutConsumer()
.build();

AtomicLong offset = new AtomicLong();
CountDownLatch read = new CountDownLatch(6);
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setEventHandler((DataReceivedEvent event) -> {
for (Message msg: event.getMessages()) {
Comment thread
alex268 marked this conversation as resolved.
Assert.assertEquals(msg.getOffset(), counter.get());
long read = counter.incrementAndGet();
if (new String(msg.getData()).equals("stop")) {
result.complete(read);
}
Assert.assertEquals(offset.getAndIncrement(), msg.getOffset());
read.countDown();
}
}).build());

reader.init().join();
Assert.assertEquals(Long.valueOf(6), result.join());
Assert.assertTrue(read.await(5, TimeUnit.SECONDS));
Assert.assertEquals(6, offset.get());
reader.shutdown().join();
}

Expand Down