From 65b4c17055d9b4e6a244c338e8e4525071c58603 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 11 Jun 2026 14:42:31 +0100 Subject: [PATCH 1/4] RetryableStream: moved initRequest to constructor arg --- .../ydb/topic/impl/TopicRetryableStream.java | 3 +- .../java/tech/ydb/topic/impl/TopicStream.java | 4 +- .../tech/ydb/topic/impl/TopicStreamBase.java | 8 +- .../tech/ydb/topic/impl/TopicStreamFail.java | 2 +- .../ydb/topic/write/impl/WriteSession.java | 5 -- .../ydb/topic/write/impl/WriteStream.java | 4 +- .../topic/write/impl/WriteStreamFactory.java | 87 +++++++++---------- .../topic/impl/TopicRetryableStreamTest.java | 9 +- .../tech/ydb/topic/impl/TopicStreamTest.java | 10 +-- .../write/impl/WriteStreamFactoryTest.java | 23 +++-- 10 files changed, 70 insertions(+), 85 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java index 18c3acf57..3cdb97368 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java @@ -33,7 +33,6 @@ public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, S } protected abstract TopicStream createNewStream(String debugId); - protected abstract W getInitRequest(); protected abstract void onNext(R message); @@ -53,7 +52,7 @@ public void start() { return; } - stream.start(getInitRequest(), this::onNext).whenComplete((status, th) -> { + stream.start(this::onNext).whenComplete((status, th) -> { realStream.compareAndSet(stream, null); if (status != null) { onStreamStop(status, retryConfig.getStatusRetryPolicy(status)); diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java index d435693ad..75236073b 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStream.java @@ -8,8 +8,8 @@ import tech.ydb.core.Status; public interface TopicStream { - CompletableFuture start(W initReq, Consumer messageHandler); - void send(W request); + CompletableFuture start(Consumer messageHandler); + void send(W request); void close(); } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java index 520bca53e..f091afe7f 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java @@ -15,13 +15,15 @@ public abstract class TopicStreamBase impl private final Logger logger; private final String debugId; private final GrpcReadWriteStream stream; + private final W initRequest; private final CompletableFuture streamStatus = new CompletableFuture<>(); private volatile String token; - public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream stream) { + public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream stream, W initRequest) { this.logger = logger; this.debugId = debugId; this.stream = stream; + this.initRequest = initRequest; this.token = stream.authToken(); } @@ -29,7 +31,7 @@ public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream protected abstract Status parseMessageStatus(R message); @Override - public CompletableFuture start(W initReq, Consumer messageHandler) { + public CompletableFuture start(Consumer messageHandler) { this.logger.debug("[{}] is about to start", debugId); this.stream.start((R msg) -> { Status messageStatus = parseMessageStatus(msg); @@ -48,7 +50,7 @@ public CompletableFuture start(W initReq, Consumer messageHandler) { }); if (!streamStatus.isDone()) { - stream.sendNext(initReq); + stream.sendNext(initRequest); } return streamStatus; diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicStreamFail.java b/topic/src/main/java/tech/ydb/topic/impl/TopicStreamFail.java index 20ef759a4..3d1965d7a 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicStreamFail.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicStreamFail.java @@ -21,7 +21,7 @@ public TopicStreamFail(Logger logger, String debugId, Status status) { } @Override - public CompletableFuture start(W initReq, Consumer messageHandler) { + public CompletableFuture start(Consumer messageHandler) { return CompletableFuture.completedFuture(status); } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java index f2a5a7d9a..2ecc04f71 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java @@ -51,11 +51,6 @@ protected Stream createNewStream(String id) { return streamFactory.createNewStream(id); } - @Override - protected FromClient getInitRequest() { - return streamFactory.initRequest(); - } - public void sendAll(List list) { for (SentMessage msg: list) { sender.sendMessage(msg); diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java index 10ccad609..3f0b1ae89 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java @@ -20,8 +20,8 @@ public class WriteStream extends TopicStreamBase implements WriteSession.Stream { private static final Logger logger = LoggerFactory.getLogger(WriteStream.class); - public WriteStream(String id, GrpcReadWriteStream stream) { - super(logger, id, stream); + public WriteStream(String id, GrpcReadWriteStream stream, FromClient initRequest) { + super(logger, id, stream, initRequest); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java index a1920a740..ba73cf49e 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java @@ -13,7 +13,6 @@ import tech.ydb.core.grpc.GrpcReadWriteStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.proto.topic.YdbTopic; import tech.ydb.proto.topic.YdbTopic.DescribeTopicRequest; import tech.ydb.proto.topic.YdbTopic.DescribeTopicResult; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage; @@ -30,17 +29,29 @@ public class WriteStreamFactory { private static final Logger logger = LoggerFactory.getLogger(WriteStreamFactory.class); private final String topicPath; - private final StreamWriteMessage.InitRequest initRequest; protected final TopicRpc rpc; + protected final String producerId; + protected final String messageGroupId; + protected final Long partitionId; private WriteStreamFactory(TopicRpc rpc, WriterSettings settings) { this.rpc = rpc; this.topicPath = settings.getTopicPath(); - String producerId = settings.getProducerId(); - String messageGroupId = settings.getMessageGroupId(); - Long partitionId = settings.getPartitionId(); + this.producerId = settings.getProducerId(); + this.messageGroupId = settings.getMessageGroupId(); + this.partitionId = settings.getPartitionId(); + if (messageGroupId != null && partitionId != null) { + throw new IllegalArgumentException("Both MessageGroupId and PartitionId are set in WriterSettings"); + } + } + + public String getTopicPath() { + return topicPath; + } + + public StreamWriteMessage.InitRequest buildInitRequest() { StreamWriteMessage.InitRequest.Builder req = StreamWriteMessage.InitRequest.newBuilder() .setPath(topicPath); @@ -48,29 +59,18 @@ private WriteStreamFactory(TopicRpc rpc, WriterSettings settings) { req.setProducerId(producerId); } if (messageGroupId != null) { - if (partitionId != null) { - throw new IllegalArgumentException("Both MessageGroupId and PartitionId are set in WriterSettings"); - } req.setMessageGroupId(messageGroupId); - } else if (partitionId != null) { + } + if (partitionId != null) { req.setPartitionId(partitionId); } - this.initRequest = req.build(); - } - - public String getTopicPath() { - return topicPath; + return req.build(); } public WriteSession.Stream createNewStream(String id) { - return new WriteStream(id, rpc.writeSession(id)); - } - - public YdbTopic.StreamWriteMessage.FromClient initRequest() { - return YdbTopic.StreamWriteMessage.FromClient.newBuilder() - .setInitRequest(initRequest) - .build(); + FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); + return new WriteStream(id, rpc.writeSession(id), init); } protected Result lookupNodeId(String id, long partitionId) { @@ -98,7 +98,7 @@ protected Result lookupNodeId(String id, long partitionId) { } protected Result lookupPartitionId(String id, String producerId) { - CompletableFuture> partitionId = new CompletableFuture<>(); + CompletableFuture> pidFuture = new CompletableFuture<>(); // create one-shot stream to detect partitionID for this producer logger.info("[{}] create probe stream for topic {} with producer {}", id, topicPath, producerId); @@ -113,7 +113,7 @@ protected Result lookupPartitionId(String id, String producerId) { Status status = Status.of(StatusCode.fromProto(resp.getStatus()), Issue.fromPb(resp.getIssuesList())); logger.warn("[{}] probe stream to topic {} with producer {} got error {}", id, topicPath, producerId, status); - partitionId.complete(Result.fail(status)); + pidFuture.complete(Result.fail(status)); return; } @@ -121,7 +121,7 @@ protected Result lookupPartitionId(String id, String producerId) { long pid = resp.getInitResponse().getPartitionId(); logger.info("[{}] probe stream to topic {} with producer {} has partition {}", id, topicPath, producerId, pid); - partitionId.complete(Result.success(pid)); + pidFuture.complete(Result.success(pid)); return; } @@ -129,7 +129,7 @@ protected Result lookupPartitionId(String id, String producerId) { producerId, resp.getClass().getName()); Issue issue = Issue.of("Unexpected message from stream with producer " + producerId, Issue.Severity.ERROR); - partitionId.complete(Result.fail(Status.of(StatusCode.BAD_REQUEST, issue))); + pidFuture.complete(Result.fail(Status.of(StatusCode.BAD_REQUEST, issue))); }); if (streamFuture.isDone()) { @@ -141,14 +141,15 @@ protected Result lookupPartitionId(String id, String producerId) { try { streamFuture.whenComplete((st, th) -> { Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); - if (!partitionId.isDone()) { + if (!pidFuture.isDone()) { logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, producerId, status); - partitionId.complete(Result.fail(status)); + pidFuture.complete(Result.fail(status)); } }); - stream.sendNext(initRequest()); - return partitionId.join(); + FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); + stream.sendNext(init); + return pidFuture.join(); } finally { if (!streamFuture.isDone()) { stream.close(); @@ -162,22 +163,19 @@ public static WriteStreamFactory of(TopicRpc rpc, WriterSettings settings) { } if (settings.getPartitionId() != null) { - return new DirectWriteByPartitionId(rpc, settings, settings.getPartitionId()); + return new DirectWriteByPartitionId(rpc, settings); } if (settings.getProducerId() != null) { - return new DirectWriteByProducerId(rpc, settings, settings.getProducerId()); + return new DirectWriteByProducerId(rpc, settings); } throw new IllegalArgumentException("Direct writing requires PartitionId or ProducerId in WriterSettings"); } private static class DirectWriteByPartitionId extends WriteStreamFactory { - private final long partitionId; - - private DirectWriteByPartitionId(TopicRpc rpc, WriterSettings settings, long partitionId) { + private DirectWriteByPartitionId(TopicRpc rpc, WriterSettings settings) { super(rpc, settings); - this.partitionId = partitionId; } @Override @@ -193,26 +191,24 @@ public WriteSession.Stream createNewStream(String id) { .withDirectMode(true) .withPreferredNodeID(nodeId.getValue()) .build(); - return new WriteStream(id, rpc.writeSession(settings)); + FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); + return new WriteStream(id, rpc.writeSession(settings), init); } } private static class DirectWriteByProducerId extends WriteStreamFactory { - private final String producerId; - - private DirectWriteByProducerId(TopicRpc rpc, WriterSettings settings, String producerId) { + private DirectWriteByProducerId(TopicRpc rpc, WriterSettings settings) { super(rpc, settings); - this.producerId = producerId; } @Override public WriteSession.Stream createNewStream(String id) { - Result partitionId = lookupPartitionId(id, producerId); - if (!partitionId.isSuccess()) { - return new WriteStream.Fail(id, partitionId.getStatus()); + Result partId = lookupPartitionId(id, producerId); + if (!partId.isSuccess()) { + return new WriteStream.Fail(id, partId.getStatus()); } - Result nodeId = lookupNodeId(id, partitionId.getValue()); + Result nodeId = lookupNodeId(id, partId.getValue()); if (!nodeId.isSuccess()) { return new WriteStream.Fail(id, nodeId.getStatus()); } @@ -223,7 +219,8 @@ public WriteSession.Stream createNewStream(String id) { .withDirectMode(true) .withPreferredNodeID(nodeId.getValue()) .build(); - return new WriteStream(id, rpc.writeSession(settings)); + FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); + return new WriteStream(id, rpc.writeSession(settings), init); } } } diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java index 3ef82d174..f09eab067 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetryableStreamTest.java @@ -42,14 +42,14 @@ private static class StreamHandle { StreamHandle(TopicStreamBase mocked) { this.stream = mocked; - Mockito.when(mocked.start(Mockito.any(), Mockito.any())).thenReturn(grpcFuture); + Mockito.when(mocked.start(Mockito.any())).thenReturn(grpcFuture); } StreamHandle() { Mockito.when(grpc.authToken()).thenReturn("token"); Mockito.when(grpc.start(Mockito.any())).thenReturn(grpcFuture); - stream = new TopicStreamBase(logger, "inner", grpc) { + stream = new TopicStreamBase(logger, "inner", grpc, EMPTY) { @Override protected Empty updateTokenMessage(String token) { return EMPTY; @@ -89,11 +89,6 @@ protected TopicStream createNewStream(String debugId) { return handles.get(handleIndex++).stream; } - @Override - protected Empty getInitRequest() { - return EMPTY; - } - @Override protected void onNext(Empty message) { receivedMessages.add(message); diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java index 642195fc2..c63b77e65 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicStreamTest.java @@ -26,7 +26,7 @@ private interface MockedStream extends GrpcReadWriteStream { TestStream(MockedStream mock) { - super(logger, "test", mock); + super(logger, "test", mock, msg("init")); } @Override @@ -65,7 +65,7 @@ public void baseTest() { List received = new ArrayList<>(); TestStream stream = new TestStream(mock); - CompletableFuture result = stream.start(msg("init"), received::add); + CompletableFuture result = stream.start(received::add); Mockito.verify(mock).start(observer.capture()); stream.send(msg("s1")); @@ -104,7 +104,7 @@ public void startStreamAndImmediatelyFinishTest() { MockedStream mock = buildMockedStream("token", status); TestStream stream = new TestStream(mock); - stream.start(msg("init-req"), msg -> {}); + stream.start(msg -> {}); Mockito.verify(mock).start(Mockito.any()); stream.send(msg("s1")); @@ -123,7 +123,7 @@ public void nonSuccessMessageStopsStreamTest() { List received = new ArrayList<>(); TestStream stream = new TestStream(mock); - CompletableFuture result = stream.start(msg("init"), received::add); + CompletableFuture result = stream.start(received::add); Mockito.verify(mock).start(observer.capture()); @@ -153,7 +153,7 @@ public void tokenUpdatesTest() { Mockito.when(mock.start(Mockito.any())).thenReturn(streamFuture); TestStream stream = new TestStream(mock); - stream.start(msg("init"), msg -> {}); + stream.start(msg -> {}); Mockito.verify(mock).start(Mockito.any()); diff --git a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java index ce48d8183..8d473449f 100644 --- a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java +++ b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java @@ -99,8 +99,7 @@ public void writeWithoutDeduplicationTest() { .setTopicPath("/test/topic") .build()); - YdbTopic.StreamWriteMessage.InitRequest req = factory.initRequest() - .getInitRequest(); + YdbTopic.StreamWriteMessage.InitRequest req = factory.buildInitRequest(); Assert.assertEquals("/test/topic", req.getPath()); Assert.assertEquals("", req.getProducerId()); Assert.assertFalse(req.hasMessageGroupId()); @@ -115,8 +114,7 @@ public void writeWithProducerIdTest() { .setProducerId("producer") .build()); - YdbTopic.StreamWriteMessage.InitRequest req = factory.initRequest() - .getInitRequest(); + YdbTopic.StreamWriteMessage.InitRequest req = factory.buildInitRequest(); Assert.assertEquals("/test/topic", req.getPath()); Assert.assertEquals("producer", req.getProducerId()); Assert.assertFalse(req.hasMessageGroupId()); @@ -132,8 +130,7 @@ public void writeWithProducerIdAndMessageGroupIdTest() { .setMessageGroupId("producer") .build()); - YdbTopic.StreamWriteMessage.InitRequest req = factory.initRequest() - .getInitRequest(); + YdbTopic.StreamWriteMessage.InitRequest req = factory.buildInitRequest(); Assert.assertEquals("/test/topic", req.getPath()); Assert.assertEquals("producer", req.getProducerId()); Assert.assertEquals("producer", req.getMessageGroupId()); @@ -148,7 +145,7 @@ public void writeWithPartitionIdTest() { .setPartitionId(5L) .build()); - YdbTopic.StreamWriteMessage.InitRequest req = factory.initRequest().getInitRequest(); + YdbTopic.StreamWriteMessage.InitRequest req = factory.buildInitRequest(); Assert.assertEquals(5L, req.getPartitionId()); Assert.assertFalse(req.hasMessageGroupId()); } @@ -216,7 +213,7 @@ public void directWriteByPartitionIdTestDescribeFailTest() { Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Assert.assertEquals(Status.of(StatusCode.UNAVAILABLE), res.join()); @@ -240,7 +237,7 @@ public void directWriteByPartitionIdTestPartitionNotFoundTest() { Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 3", Issue.Severity.ERROR)); Assert.assertEquals(expected, res.join()); @@ -301,7 +298,7 @@ public void directWriteByProducerIdProbeFailTest() { Assert.assertTrue(stream instanceof WriteStream.Fail); Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Assert.assertEquals(Status.of(StatusCode.UNAUTHORIZED), res.join()); stream.close(); // no effect @@ -330,7 +327,7 @@ public void directWriteByProducerIdProbeWrongResponseTest() { Assert.assertTrue(stream instanceof WriteStream.Fail); Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Assert.assertEquals(Status.of(StatusCode.INTERNAL_ERROR), res.join()); stream.close(); // no effect @@ -360,7 +357,7 @@ public void directWriteByProducerIdProbeUnexpectedResponseTest() { Assert.assertTrue(stream instanceof WriteStream.Fail); Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Issue issue = Issue.of("Unexpected message from stream with producer producer-1", Issue.Severity.ERROR); Assert.assertEquals(Status.of(StatusCode.BAD_REQUEST, issue), res.join()); @@ -396,7 +393,7 @@ public void directWriteByProducerIdPartitionNotFoundTest() { WriteSession.Stream stream = factory.createNewStream("s1"); Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null, null); + CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 5", Issue.Severity.ERROR)); Assert.assertEquals(expected, res.join()); From ce799aee712e30dff2596422acca6f2f46af7551 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 11 Jun 2026 21:01:18 +0100 Subject: [PATCH 2/4] Use generation for topic's direct write --- .../ydb/topic/write/impl/WriteSession.java | 9 +- .../write/impl/WriteStreamDirectFactory.java | 166 +++++++++ .../topic/write/impl/WriteStreamFactory.java | 171 +-------- .../tech/ydb/topic/write/impl/WriterImpl.java | 20 +- .../impl/WriteStreamDirectFactoryTest.java | 348 ++++++++++++++++++ .../write/impl/WriteStreamFactoryTest.java | 311 +--------------- 6 files changed, 545 insertions(+), 480 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java create mode 100644 topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java index 2ecc04f71..ee716e43b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java @@ -1,5 +1,6 @@ package tech.ydb.topic.write.impl; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiConsumer; import org.slf4j.Logger; @@ -10,7 +11,6 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer; -import tech.ydb.topic.TopicRpc; import tech.ydb.topic.impl.TopicRetryableStream; import tech.ydb.topic.impl.TopicStream; import tech.ydb.topic.settings.WriterSettings; @@ -38,10 +38,11 @@ public interface Listener { private final MessageSender sender; private final BiConsumer errorsHandler; - public WriteSession(String debugId, TopicRpc rpc, WriterSettings settings, Listener controller) { - super(logger, debugId, settings.getRetryConfig(), rpc.getScheduler()); + public WriteSession(String debugId, WriteStreamFactory factory, WriterSettings settings, + ScheduledExecutorService scheduler, Listener controller) { + super(logger, debugId, settings.getRetryConfig(), scheduler); this.listener = controller; - this.streamFactory = WriteStreamFactory.of(rpc, settings); + this.streamFactory = factory; this.sender = new MessageSender(debugId, settings.getCodec(), this::send); this.errorsHandler = settings.getErrorsHandler(); } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java new file mode 100644 index 000000000..c74b2c6c6 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java @@ -0,0 +1,166 @@ +package tech.ydb.topic.write.impl; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadWriteStream; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.topic.YdbTopic; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer; +import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.settings.WriterSettings; + +/** + * + * @author Aleksandr Gorshenin {@literal } + */ +public class WriteStreamDirectFactory extends WriteStreamFactory { + private static final Logger logger = LoggerFactory.getLogger(WriteStreamDirectFactory.class); + + public WriteStreamDirectFactory(TopicRpc rpc, WriterSettings settings) { + super(rpc, settings); + + if (settings.getPartitionId() == null && settings.getProducerId() == null) { + throw new IllegalArgumentException("Direct writing requires PartitionId or ProducerId in WriterSettings"); + } + } + + @Override + public WriteSession.Stream createNewStream(String id) { + Long partition = partitionId; + if (partition == null) { + Result pid = lookupPartitionId(id, producerId); + if (!pid.isSuccess()) { + return new WriteStream.Fail(id, pid.getStatus()); + } + partition = pid.getValue(); + } + + + Result location = lookupLocation(id, partition); + if (!location.isSuccess()) { + return new WriteStream.Fail(id, location.getStatus()); + } + + StreamWriteMessage.InitRequest.Builder req = StreamWriteMessage.InitRequest.newBuilder() + .setPath(topicPath) + .setPartitionWithGeneration(YdbTopic.PartitionWithGeneration.newBuilder() + .setPartitionId(partition) + .setGeneration(location.getValue().getGeneration()) + .build()); + + if (producerId != null) { + req.setProducerId(producerId); + } + if (messageGroupId != null) { + req.setMessageGroupId(messageGroupId); + } + + FromClient init = FromClient.newBuilder().setInitRequest(req).build(); + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() + .withTraceId(id) + .disableDeadline() + .withDirectMode(true) + .withPreferredNodeID(location.getValue().getNodeId()) + .build(); + + return new WriteStream(id, rpc.writeSession(settings), init); + } + + + protected Result lookupLocation(String id, long partitionId) { + logger.info("[{}] describe topic {} to look up node for partition {}", id, topicPath, partitionId); + Result describeTopic = rpc.describeTopic( + YdbTopic.DescribeTopicRequest.newBuilder().setIncludeLocation(true).setPath(topicPath).build(), + GrpcRequestSettings.newBuilder().withDeadline(Duration.ofMinutes(1)).build() + ).join(); + + if (!describeTopic.isSuccess()) { + logger.warn("[{}] describe topic {} failed with status {}", id, topicPath, describeTopic.getStatus()); + return Result.fail(describeTopic.getStatus()); + } + + // lookup for partition location + for (YdbTopic.DescribeTopicResult.PartitionInfo partition : describeTopic.getValue().getPartitionsList()) { + if (partition.getPartitionId() == partitionId) { + return Result.success(partition.getPartitionLocation()); + } + } + + logger.warn("[{}] topic {} doesn't have partition {}, direct writing failed", id, topicPath, partitionId); + Issue issue = Issue.of("Cannot find partition " + partitionId, Issue.Severity.ERROR); + return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)); + } + + private Result lookupPartitionId(String id, String producerId) { + CompletableFuture> pidFuture = new CompletableFuture<>(); + + // create one-shot stream to detect partitionID for this producer + logger.info("[{}] create probe stream for topic {} with producer {}", id, topicPath, producerId); + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() + .withTraceId(id + "-probe") + .withDeadline(Duration.ofMinutes(1)) + .build(); + GrpcReadWriteStream stream = rpc.writeSession(settings); + + CompletableFuture streamFuture = stream.start(resp -> { + if (resp.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { + Status status = Status.of(StatusCode.fromProto(resp.getStatus()), Issue.fromPb(resp.getIssuesList())); + logger.warn("[{}] probe stream to topic {} with producer {} got error {}", id, topicPath, + producerId, status); + pidFuture.complete(Result.fail(status)); + return; + } + + if (resp.hasInitResponse()) { + long pid = resp.getInitResponse().getPartitionId(); + logger.info("[{}] probe stream to topic {} with producer {} has partition {}", id, topicPath, + producerId, pid); + pidFuture.complete(Result.success(pid)); + return; + } + + logger.warn("[{}] probe stream to topic {} with producer {} got unexpected message {}", id, topicPath, + producerId, resp.getClass().getName()); + + Issue issue = Issue.of("Unexpected message from stream with producer " + producerId, Issue.Severity.ERROR); + pidFuture.complete(Result.fail(Status.of(StatusCode.BAD_REQUEST, issue))); + }); + + if (streamFuture.isDone()) { + logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, + producerId, streamFuture.join()); + return Result.fail(streamFuture.join()); + } + + try { + streamFuture.whenComplete((st, th) -> { + Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); + if (!pidFuture.isDone()) { + logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, + producerId, status); + pidFuture.complete(Result.fail(status)); + } + }); + YdbTopic.StreamWriteMessage.FromClient init = YdbTopic.StreamWriteMessage.FromClient.newBuilder() + .setInitRequest(buildInitRequest()) + .build(); + stream.sendNext(init); + return pidFuture.join(); + } finally { + if (!streamFuture.isDone()) { + stream.close(); + } + } + } +} diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java index ba73cf49e..2cc0459d5 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamFactory.java @@ -1,23 +1,9 @@ package tech.ydb.topic.write.impl; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.ydb.core.Issue; -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.grpc.GrpcReadWriteStream; -import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.proto.topic.YdbTopic.DescribeTopicRequest; -import tech.ydb.proto.topic.YdbTopic.DescribeTopicResult; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient; -import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.settings.WriterSettings; @@ -26,15 +12,13 @@ * @author Aleksandr Gorshenin */ public class WriteStreamFactory { - private static final Logger logger = LoggerFactory.getLogger(WriteStreamFactory.class); - - private final String topicPath; + protected final String topicPath; protected final TopicRpc rpc; protected final String producerId; protected final String messageGroupId; protected final Long partitionId; - private WriteStreamFactory(TopicRpc rpc, WriterSettings settings) { + public WriteStreamFactory(TopicRpc rpc, WriterSettings settings) { this.rpc = rpc; this.topicPath = settings.getTopicPath(); @@ -72,155 +56,4 @@ public WriteSession.Stream createNewStream(String id) { FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); return new WriteStream(id, rpc.writeSession(id), init); } - - protected Result lookupNodeId(String id, long partitionId) { - logger.info("[{}] describe topic {} to look up node for partition {}", id, topicPath, partitionId); - Result describeTopic = rpc.describeTopic( - DescribeTopicRequest.newBuilder().setIncludeLocation(true).setPath(topicPath).build(), - GrpcRequestSettings.newBuilder().withDeadline(Duration.ofMinutes(1)).build() - ).join(); - - if (!describeTopic.isSuccess()) { - logger.warn("[{}] describe topic {} failed with status {}", id, topicPath, describeTopic.getStatus()); - return Result.fail(describeTopic.getStatus()); - } - - // lookup for nodeID - for (DescribeTopicResult.PartitionInfo partition : describeTopic.getValue().getPartitionsList()) { - if (partition.getPartitionId() == partitionId) { - return Result.success(partition.getPartitionLocation().getNodeId()); - } - } - - logger.warn("[{}] topic {} doesn't have partition {}, direct writing failed", id, topicPath, partitionId); - Issue issue = Issue.of("Cannot find partition " + partitionId, Issue.Severity.ERROR); - return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)); - } - - protected Result lookupPartitionId(String id, String producerId) { - CompletableFuture> pidFuture = new CompletableFuture<>(); - - // create one-shot stream to detect partitionID for this producer - logger.info("[{}] create probe stream for topic {} with producer {}", id, topicPath, producerId); - GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() - .withTraceId(id + "-probe") - .withDeadline(Duration.ofMinutes(1)) - .build(); - GrpcReadWriteStream stream = rpc.writeSession(settings); - - CompletableFuture streamFuture = stream.start(resp -> { - if (resp.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { - Status status = Status.of(StatusCode.fromProto(resp.getStatus()), Issue.fromPb(resp.getIssuesList())); - logger.warn("[{}] probe stream to topic {} with producer {} got error {}", id, topicPath, - producerId, status); - pidFuture.complete(Result.fail(status)); - return; - } - - if (resp.hasInitResponse()) { - long pid = resp.getInitResponse().getPartitionId(); - logger.info("[{}] probe stream to topic {} with producer {} has partition {}", id, topicPath, - producerId, pid); - pidFuture.complete(Result.success(pid)); - return; - } - - logger.warn("[{}] probe stream to topic {} with producer {} got unexpected message {}", id, topicPath, - producerId, resp.getClass().getName()); - - Issue issue = Issue.of("Unexpected message from stream with producer " + producerId, Issue.Severity.ERROR); - pidFuture.complete(Result.fail(Status.of(StatusCode.BAD_REQUEST, issue))); - }); - - if (streamFuture.isDone()) { - logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, - producerId, streamFuture.join()); - return Result.fail(streamFuture.join()); - } - - try { - streamFuture.whenComplete((st, th) -> { - Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); - if (!pidFuture.isDone()) { - logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, - producerId, status); - pidFuture.complete(Result.fail(status)); - } - }); - FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); - stream.sendNext(init); - return pidFuture.join(); - } finally { - if (!streamFuture.isDone()) { - stream.close(); - } - } - } - - public static WriteStreamFactory of(TopicRpc rpc, WriterSettings settings) { - if (!settings.isDirectWrite()) { - return new WriteStreamFactory(rpc, settings); - } - - if (settings.getPartitionId() != null) { - return new DirectWriteByPartitionId(rpc, settings); - } - - if (settings.getProducerId() != null) { - return new DirectWriteByProducerId(rpc, settings); - } - - throw new IllegalArgumentException("Direct writing requires PartitionId or ProducerId in WriterSettings"); - } - - private static class DirectWriteByPartitionId extends WriteStreamFactory { - private DirectWriteByPartitionId(TopicRpc rpc, WriterSettings settings) { - super(rpc, settings); - } - - @Override - public WriteSession.Stream createNewStream(String id) { - Result nodeId = lookupNodeId(id, partitionId); - if (!nodeId.isSuccess()) { - return new WriteStream.Fail(id, nodeId.getStatus()); - } - - GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() - .withTraceId(id) - .disableDeadline() - .withDirectMode(true) - .withPreferredNodeID(nodeId.getValue()) - .build(); - FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); - return new WriteStream(id, rpc.writeSession(settings), init); - } - } - - private static class DirectWriteByProducerId extends WriteStreamFactory { - private DirectWriteByProducerId(TopicRpc rpc, WriterSettings settings) { - super(rpc, settings); - } - - @Override - public WriteSession.Stream createNewStream(String id) { - Result partId = lookupPartitionId(id, producerId); - if (!partId.isSuccess()) { - return new WriteStream.Fail(id, partId.getStatus()); - } - - Result nodeId = lookupNodeId(id, partId.getValue()); - if (!nodeId.isSuccess()) { - return new WriteStream.Fail(id, nodeId.getStatus()); - } - - GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() - .withTraceId(id) - .disableDeadline() - .withDirectMode(true) - .withPreferredNodeID(nodeId.getValue()) - .build(); - FromClient init = FromClient.newBuilder().setInitRequest(buildInitRequest()).build(); - return new WriteStream(id, rpc.writeSession(settings), init); - } - } } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index b209afc8a..6e8e8e2c6 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -47,12 +47,15 @@ public class WriterImpl { private Boolean isSeqNoProvided = null; - public WriterImpl(TopicRpc topicRpc, - WriterSettings settings, - Executor compressionExecutor, - @Nonnull CodecRegistry codecRegistry) { + public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor, + @Nonnull CodecRegistry codecRegistry) { + this(topicRpc, defaultFactory(topicRpc, settings), settings, compressionExecutor, codecRegistry); + } + + public WriterImpl(TopicRpc topicRpc, WriteStreamFactory factory, WriterSettings settings, + Executor compressionExecutor, @Nonnull CodecRegistry codecRegistry) { this.debugId = DebugTools.createDebugId(settings.getLogPrefix()); - this.stream = new WriteSession(debugId, topicRpc, settings, new ListenerImpl()); + this.stream = new WriteSession(debugId, factory, settings, topicRpc.getScheduler(), new ListenerImpl()); this.writeQueue = new WriterQueue(debugId, settings, codecRegistry, compressionExecutor, sendTask); logger.info("Writer with id {} created for topic \"{}\" with producerId \"{}\" and messageGroupId \"{}\"", @@ -180,4 +183,11 @@ public void run() { stream.sendAll(send); } } + + private static WriteStreamFactory defaultFactory(TopicRpc topicRpc, WriterSettings settings) { + if (settings.isDirectWrite()) { + return new WriteStreamDirectFactory(topicRpc, settings); + } + return new WriteStreamFactory(topicRpc, settings); + } } diff --git a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java new file mode 100644 index 000000000..bdb002116 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java @@ -0,0 +1,348 @@ +package tech.ydb.topic.write.impl; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import tech.ydb.core.Issue; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcReadWriteStream; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.topic.YdbTopic; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer; +import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.InitResponse; +import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.settings.WriterSettings; + +/** + * + * @author Aleksandr Gorshenin {@literal } + */ +public class WriteStreamDirectFactoryTest { + private static GrpcReadWriteStream mockGrpcStream() { + @SuppressWarnings("unchecked") + GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); + Mockito.when(grpc.authToken()).thenReturn(""); + Mockito.when(grpc.start(Mockito.any())).thenReturn(new CompletableFuture<>()); + return grpc; + } + + private static void mockStreamError(GrpcReadWriteStream mock, Status error) { + Mockito.when(mock.start(Mockito.any())).thenReturn(CompletableFuture.completedFuture(error)); + } + + private static void mockStreamResponse(GrpcReadWriteStream mock, FromServer response) { + CompletableFuture result = new CompletableFuture<>(); + + Mockito.when(mock.start(Mockito.any())).thenAnswer(iom -> { + GrpcReadStream.Observer obs = iom.getArgument(0); + obs.onNext(response); + return result; + }).thenReturn(result); + + Mockito.doAnswer((iom) -> { + result.complete(Status.SUCCESS); + return null; + }).when(mock).close(); + } + + private static YdbTopic.DescribeTopicResult.PartitionInfo partition(long partitionId, int nodeId, long generation) { + return YdbTopic.DescribeTopicResult.PartitionInfo.newBuilder() + .setPartitionId(partitionId) + .setPartitionLocation(YdbTopic.PartitionLocation.newBuilder() + .setNodeId(nodeId) + .setGeneration(generation) + .build()) + .build(); + } + + private static void mockDescribeResult(TopicRpc rpc, YdbTopic.DescribeTopicResult.PartitionInfo... partitions) { + Mockito.when(rpc.describeTopic(Mockito.any(), Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(Result.success( + YdbTopic.DescribeTopicResult.newBuilder().addAllPartitions(Arrays.asList(partitions)).build()) + )); + } + + private static void mockDescribeResult(TopicRpc rpc, Status status) { + Mockito.when(rpc.describeTopic(Mockito.any(), Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(Result.fail(status))); + } + + @Test + public void invalidDirectWriteTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/local/topic") + .setDirectWrite(true) // requires producerId or partitionId + .build(); + + Exception ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> new WriteStreamDirectFactory(rpc, settings) + ); + Assert.assertEquals("Direct writing requires PartitionId or ProducerId in WriterSettings", ex.getMessage()); + } + + @Test + public void directWriteByPartitionIdTest() { + GrpcReadWriteStream grpc = mockGrpcStream(); + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + mockDescribeResult(rpc, partition(1L, 10, 3L), partition(2L, 42, 1L), partition(3L, 23, 2L)); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(grpc); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/local/topic") + .setPartitionId(2L) + .setDirectWrite(true) + .build(); + + // just verify it doesn't throw and returns a factory for the correct topic + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, settings); + Assert.assertEquals("/local/topic", factory.getTopicPath()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream); + + ArgumentCaptor options = ArgumentCaptor.forClass(GrpcRequestSettings.class); + Mockito.verify(rpc).writeSession(options.capture()); + Assert.assertTrue(options.getValue().isDirectMode()); + Assert.assertEquals(42, options.getValue().getPreferredNodeID().intValue()); + + stream.start(null); + + ArgumentCaptor msg = ArgumentCaptor.forClass(FromClient.class); + Mockito.verify(grpc).sendNext(msg.capture()); + Assert.assertTrue(msg.getValue().hasInitRequest()); + Assert.assertEquals("/local/topic", msg.getValue().getInitRequest().getPath()); + Assert.assertFalse(msg.getValue().getInitRequest().hasPartitionId()); + Assert.assertEquals("", msg.getValue().getInitRequest().getProducerId()); + Assert.assertEquals(2L, msg.getValue().getInitRequest().getPartitionWithGeneration().getPartitionId()); + Assert.assertEquals(1L, msg.getValue().getInitRequest().getPartitionWithGeneration().getGeneration()); + } + + @Test + public void directWriteByPartitionIdTestDescribeFailTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + mockDescribeResult(rpc, Status.of(StatusCode.UNAVAILABLE)); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setPartitionId(3L) + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + + Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); + + Assert.assertTrue(stream instanceof WriteStream.Fail); + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Assert.assertEquals(Status.of(StatusCode.UNAVAILABLE), res.join()); + + stream.close(); // no effect + } + + @Test + public void directWriteByPartitionIdTestPartitionNotFoundTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + // result has partition 5, but we're looking for partition 3 + mockDescribeResult(rpc, partition(4L, 99, 1L), partition(5L, 100, 2L)); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setPartitionId(3L) + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + + Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); + + Assert.assertTrue(stream instanceof WriteStream.Fail); + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 3", Issue.Severity.ERROR)); + Assert.assertEquals(expected, res.join()); + + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + GrpcReadWriteStream probeGrpc = mockGrpcStream(); + GrpcReadWriteStream actualGrpc = mockGrpcStream(); + + FromServer initResponse = FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setInitResponse(InitResponse.newBuilder() + .setLastSeqNo(0) + .setPartitionId(7L) + .setSessionId("session") + .build()) + .build(); + + mockStreamResponse(probeGrpc, initResponse); + mockDescribeResult(rpc, partition(7L, 55, 3L)); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))) + .thenReturn(probeGrpc).thenReturn(actualGrpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream); + + ArgumentCaptor options = ArgumentCaptor.forClass(GrpcRequestSettings.class); + Mockito.verify(rpc, Mockito.times(2)).writeSession(options.capture()); + Assert.assertTrue(options.getValue().isDirectMode()); + Assert.assertEquals(55, options.getValue().getPreferredNodeID().intValue()); + + stream.start(null); + + ArgumentCaptor msg = ArgumentCaptor.forClass(FromClient.class); + Mockito.verify(actualGrpc).sendNext(msg.capture()); + Assert.assertTrue(msg.getValue().hasInitRequest()); + Assert.assertEquals("/test/topic", msg.getValue().getInitRequest().getPath()); + Assert.assertFalse(msg.getValue().getInitRequest().hasPartitionId()); + Assert.assertEquals("producer-1", msg.getValue().getInitRequest().getProducerId()); + Assert.assertEquals(7L, msg.getValue().getInitRequest().getPartitionWithGeneration().getPartitionId()); + Assert.assertEquals(3L, msg.getValue().getInitRequest().getPartitionWithGeneration().getGeneration()); + } + + @Test + public void directWriteByProducerIdProbeFailTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + GrpcReadWriteStream probeGrpc = mockGrpcStream(); + + mockStreamError(probeGrpc, Status.of(StatusCode.UNAUTHORIZED)); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); + + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Assert.assertEquals(Status.of(StatusCode.UNAUTHORIZED), res.join()); + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdProbeWrongResponseTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + GrpcReadWriteStream probeGrpc = mockGrpcStream(); + + FromServer initResponse = FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.INTERNAL_ERROR) + .build(); + mockStreamResponse(probeGrpc, initResponse); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); + + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Assert.assertEquals(Status.of(StatusCode.INTERNAL_ERROR), res.join()); + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdProbeUnexpectedResponseTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + GrpcReadWriteStream probeGrpc = mockGrpcStream(); + + FromServer initResponse = FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setUpdateTokenResponse(YdbTopic.UpdateTokenResponse.newBuilder().build()) + .build(); + mockStreamResponse(probeGrpc, initResponse); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); + + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Issue issue = Issue.of("Unexpected message from stream with producer producer-1", Issue.Severity.ERROR); + Assert.assertEquals(Status.of(StatusCode.BAD_REQUEST, issue), res.join()); + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdPartitionNotFoundTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + GrpcReadWriteStream probeGrpc = mockGrpcStream(); + + FromServer initResponse = FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setInitResponse(InitResponse.newBuilder() + .setLastSeqNo(0) + .setPartitionId(5L) + .setSessionId("session") + .build()) + .build(); + + mockStreamResponse(probeGrpc, initResponse); + mockDescribeResult(rpc, partition(1L, 55, 8L), partition(2L, 55, 7L)); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 5", Issue.Severity.ERROR)); + Assert.assertEquals(expected, res.join()); + } +} diff --git a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java index 8d473449f..267fdae10 100644 --- a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java +++ b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamFactoryTest.java @@ -1,22 +1,12 @@ package tech.ydb.topic.write.impl; -import java.util.Arrays; -import java.util.concurrent.CompletableFuture; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import tech.ydb.core.Issue; -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcReadWriteStream; -import tech.ydb.core.grpc.GrpcRequestSettings; -import tech.ydb.proto.StatusCodesProtos; import tech.ydb.proto.topic.YdbTopic; -import tech.ydb.proto.topic.YdbTopic.DescribeTopicResult; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient; import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer; import tech.ydb.topic.TopicRpc; @@ -26,57 +16,11 @@ * @author Aleksandr Gorshenin */ public class WriteStreamFactoryTest { - - @SuppressWarnings("unchecked") - private static GrpcReadWriteStream mockGrpcStream() { - GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); - Mockito.when(grpc.authToken()).thenReturn(""); - return grpc; - } - - private static void mockStreamError(GrpcReadWriteStream mock, Status error) { - Mockito.when(mock.start(Mockito.any())).thenReturn(CompletableFuture.completedFuture(error)); - } - - private static void mockStreamResponse(GrpcReadWriteStream mock, FromServer response) { - CompletableFuture result = new CompletableFuture<>(); - - Mockito.when(mock.start(Mockito.any())).thenAnswer(iom -> { - GrpcReadStream.Observer obs = iom.getArgument(0); - obs.onNext(response); - return result; - }).thenReturn(result); - - Mockito.doAnswer((iom) -> { - result.complete(Status.SUCCESS); - return null; - }).when(mock).close(); - } - - private static DescribeTopicResult.PartitionInfo partition(long partitionId, int nodeId) { - return DescribeTopicResult.PartitionInfo.newBuilder() - .setPartitionId(partitionId) - .setPartitionLocation(YdbTopic.PartitionLocation.newBuilder() - .setNodeId(nodeId) - .build()) - .build(); - } - - private static void mockDescribeResult(TopicRpc rpc, DescribeTopicResult.PartitionInfo... partitions) { - Mockito.when(rpc.describeTopic(Mockito.any(), Mockito.any())) - .thenReturn(CompletableFuture.completedFuture(Result.success( - DescribeTopicResult.newBuilder().addAllPartitions(Arrays.asList(partitions)).build()) - )); - } - - private static void mockDescribeResult(TopicRpc rpc, Status status) { - Mockito.when(rpc.describeTopic(Mockito.any(), Mockito.any())) - .thenReturn(CompletableFuture.completedFuture(Result.fail(status))); - } - @Test public void regularWriteTest() { - GrpcReadWriteStream grpc = mockGrpcStream(); + @SuppressWarnings("unchecked") + GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); + TopicRpc rpc = Mockito.mock(TopicRpc.class); Mockito.when(rpc.writeSession(Mockito.eq("s1"))).thenReturn(grpc); @@ -84,7 +28,7 @@ public void regularWriteTest() { .setTopicPath("/local/topic") .build(); - WriteStreamFactory factory = WriteStreamFactory.of(rpc, settings); + WriteStreamFactory factory = new WriteStreamFactory(rpc, settings); Assert.assertEquals("/local/topic", factory.getTopicPath()); WriteSession.Stream stream = factory.createNewStream("s1"); @@ -95,7 +39,7 @@ public void regularWriteTest() { @Test public void writeWithoutDeduplicationTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() + WriteStreamFactory factory = new WriteStreamFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") .build()); @@ -109,7 +53,7 @@ public void writeWithoutDeduplicationTest() { @Test public void writeWithProducerIdTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() + WriteStreamFactory factory = new WriteStreamFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") .setProducerId("producer") .build()); @@ -124,7 +68,7 @@ public void writeWithProducerIdTest() { @Test public void writeWithProducerIdAndMessageGroupIdTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() + WriteStreamFactory factory = new WriteStreamFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") .setProducerId("producer") .setMessageGroupId("producer") @@ -140,7 +84,7 @@ public void writeWithProducerIdAndMessageGroupIdTest() { @Test public void writeWithPartitionIdTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() + WriteStreamFactory factory = new WriteStreamFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") .setPartitionId(5L) .build()); @@ -158,244 +102,7 @@ public void messageGroupAndPartitionErrorTest() { .setMessageGroupId("group-1") .setPartitionId(5L) .build(); - Exception ex = Assert.assertThrows(IllegalArgumentException.class, () -> WriteStreamFactory.of(rpc, settings)); + Exception ex = Assert.assertThrows(IllegalArgumentException.class, () -> new WriteStreamFactory(rpc, settings)); Assert.assertEquals("Both MessageGroupId and PartitionId are set in WriterSettings", ex.getMessage()); } - - @Test - public void invalidDirectWriteTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath("/local/topic") - .setDirectWrite(true) // requires producerId or partitionId - .build(); - - Exception ex = Assert.assertThrows(IllegalArgumentException.class, () -> WriteStreamFactory.of(rpc, settings)); - Assert.assertEquals("Direct writing requires PartitionId or ProducerId in WriterSettings", ex.getMessage()); - } - - @Test - public void directWriteByPartitionIdTest() { - GrpcReadWriteStream grpc = mockGrpcStream(); - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - mockDescribeResult(rpc, partition(1L, 10), partition(2L, 42), partition(3L, 23)); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(grpc); - - WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath("/local/topic") - .setPartitionId(2L) - .setDirectWrite(true) - .build(); - - // just verify it doesn't throw and returns a factory for the correct topic - WriteStreamFactory factory = WriteStreamFactory.of(rpc, settings); - Assert.assertEquals("/local/topic", factory.getTopicPath()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream); - Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - } - - @Test - public void directWriteByPartitionIdTestDescribeFailTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - mockDescribeResult(rpc, Status.of(StatusCode.UNAVAILABLE)); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setPartitionId(3L) - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - - Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); - - Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Assert.assertEquals(Status.of(StatusCode.UNAVAILABLE), res.join()); - - stream.close(); // no effect - } - - @Test - public void directWriteByPartitionIdTestPartitionNotFoundTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - // result has partition 5, but we're looking for partition 3 - mockDescribeResult(rpc, partition(4L, 99), partition(5L, 100)); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setPartitionId(3L) - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - - Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); - - Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 3", Issue.Severity.ERROR)); - Assert.assertEquals(expected, res.join()); - - stream.close(); // no effect - } - - @Test - public void directWriteByProducerIdTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - GrpcReadWriteStream actualGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() - .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) - .setInitResponse(YdbTopic.StreamWriteMessage.InitResponse.newBuilder() - .setLastSeqNo(0) - .setPartitionId(7L) - .setSessionId("session") - .build()) - .build(); - - mockStreamResponse(probeGrpc, initResponse); - mockDescribeResult(rpc, partition(7L, 55)); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))) - .thenReturn(probeGrpc).thenReturn(actualGrpc); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setProducerId("producer-1") - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream); - Mockito.verify(rpc, Mockito.times(2)).writeSession(Mockito.any(GrpcRequestSettings.class)); - } - - @Test - public void directWriteByProducerIdProbeFailTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - mockStreamError(probeGrpc, Status.of(StatusCode.UNAUTHORIZED)); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setProducerId("producer-1") - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream.Fail); - Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Assert.assertEquals(Status.of(StatusCode.UNAUTHORIZED), res.join()); - stream.close(); // no effect - } - - @Test - public void directWriteByProducerIdProbeWrongResponseTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() - .setStatus(StatusCodesProtos.StatusIds.StatusCode.INTERNAL_ERROR) - .build(); - mockStreamResponse(probeGrpc, initResponse); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setProducerId("producer-1") - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream.Fail); - Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Assert.assertEquals(Status.of(StatusCode.INTERNAL_ERROR), res.join()); - stream.close(); // no effect - } - - @Test - public void directWriteByProducerIdProbeUnexpectedResponseTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() - .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) - .setUpdateTokenResponse(YdbTopic.UpdateTokenResponse.newBuilder().build()) - .build(); - mockStreamResponse(probeGrpc, initResponse); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setProducerId("producer-1") - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream.Fail); - Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Issue issue = Issue.of("Unexpected message from stream with producer producer-1", Issue.Severity.ERROR); - Assert.assertEquals(Status.of(StatusCode.BAD_REQUEST, issue), res.join()); - stream.close(); // no effect - } - - @Test - public void directWriteByProducerIdPartitionNotFoundTest() { - TopicRpc rpc = Mockito.mock(TopicRpc.class); - - GrpcReadWriteStream probeGrpc = mockGrpcStream(); -// GrpcReadWriteStream actualGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() - .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) - .setInitResponse(YdbTopic.StreamWriteMessage.InitResponse.newBuilder() - .setLastSeqNo(0) - .setPartitionId(5L) - .setSessionId("session") - .build()) - .build(); - - mockStreamResponse(probeGrpc, initResponse); - mockDescribeResult(rpc, partition(1L, 55), partition(2L, 55)); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); - - WriteStreamFactory factory = WriteStreamFactory.of(rpc, WriterSettings.newBuilder() - .setTopicPath("/test/topic") - .setProducerId("producer-1") - .setDirectWrite(true) - .build()); - - WriteSession.Stream stream = factory.createNewStream("s1"); - Assert.assertTrue(stream instanceof WriteStream.Fail); - CompletableFuture res = stream.start(null); - Assert.assertTrue(res.isDone()); - Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Cannot find partition 5", Issue.Severity.ERROR)); - Assert.assertEquals(expected, res.join()); - } } From 68dcd52a6672392a2096fb77d9a18fb6115732ac Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 15 Jun 2026 15:07:31 +0100 Subject: [PATCH 3/4] Removed message_group_id for direct writing --- .../write/impl/WriteStreamDirectFactory.java | 38 +-- .../topic/TopicWritersIntegrationTest.java | 50 ++++ .../impl/WriteStreamDirectFactoryTest.java | 243 ++++++++++++------ 3 files changed, 238 insertions(+), 93 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java index c74b2c6c6..91cc8cb3b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java @@ -37,17 +37,16 @@ public WriteStreamDirectFactory(TopicRpc rpc, WriterSettings settings) { @Override public WriteSession.Stream createNewStream(String id) { - Long partition = partitionId; - if (partition == null) { - Result pid = lookupPartitionId(id, producerId); + Long targetPartitionId = partitionId; + if (targetPartitionId == null) { + Result pid = lookupPartitionId(id); if (!pid.isSuccess()) { return new WriteStream.Fail(id, pid.getStatus()); } - partition = pid.getValue(); + targetPartitionId = pid.getValue(); } - - Result location = lookupLocation(id, partition); + Result location = lookupLocation(id, targetPartitionId); if (!location.isSuccess()) { return new WriteStream.Fail(id, location.getStatus()); } @@ -55,16 +54,13 @@ public WriteSession.Stream createNewStream(String id) { StreamWriteMessage.InitRequest.Builder req = StreamWriteMessage.InitRequest.newBuilder() .setPath(topicPath) .setPartitionWithGeneration(YdbTopic.PartitionWithGeneration.newBuilder() - .setPartitionId(partition) + .setPartitionId(targetPartitionId) .setGeneration(location.getValue().getGeneration()) .build()); if (producerId != null) { req.setProducerId(producerId); } - if (messageGroupId != null) { - req.setMessageGroupId(messageGroupId); - } FromClient init = FromClient.newBuilder().setInitRequest(req).build(); GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() @@ -77,9 +73,8 @@ public WriteSession.Stream createNewStream(String id) { return new WriteStream(id, rpc.writeSession(settings), init); } - - protected Result lookupLocation(String id, long partitionId) { - logger.info("[{}] describe topic {} to look up node for partition {}", id, topicPath, partitionId); + protected Result lookupLocation(String id, long targetPartitionId) { + logger.info("[{}] describe topic {} to look up node for partition {}", id, topicPath, targetPartitionId); Result describeTopic = rpc.describeTopic( YdbTopic.DescribeTopicRequest.newBuilder().setIncludeLocation(true).setPath(topicPath).build(), GrpcRequestSettings.newBuilder().withDeadline(Duration.ofMinutes(1)).build() @@ -92,17 +87,23 @@ protected Result lookupLocation(String id, long part // lookup for partition location for (YdbTopic.DescribeTopicResult.PartitionInfo partition : describeTopic.getValue().getPartitionsList()) { - if (partition.getPartitionId() == partitionId) { + if (partition.getPartitionId() == targetPartitionId) { + if (!partition.hasPartitionLocation()) { + logger.warn("[{}] partition {} has no valid location info", id, partitionId); + Issue issue = Issue.of("Partition " + targetPartitionId + " has no location", Issue.Severity.ERROR); + return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)); + } + return Result.success(partition.getPartitionLocation()); } } - logger.warn("[{}] topic {} doesn't have partition {}, direct writing failed", id, topicPath, partitionId); - Issue issue = Issue.of("Cannot find partition " + partitionId, Issue.Severity.ERROR); + logger.warn("[{}] topic {} doesn't have partition {}, direct writing failed", id, topicPath, targetPartitionId); + Issue issue = Issue.of("Cannot find partition " + targetPartitionId, Issue.Severity.ERROR); return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)); } - private Result lookupPartitionId(String id, String producerId) { + private Result lookupPartitionId(String id) { CompletableFuture> pidFuture = new CompletableFuture<>(); // create one-shot stream to detect partitionID for this producer @@ -146,10 +147,9 @@ private Result lookupPartitionId(String id, String producerId) { try { streamFuture.whenComplete((st, th) -> { Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th); - if (!pidFuture.isDone()) { + if (pidFuture.complete(Result.fail(status))) { logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath, producerId, status); - pidFuture.complete(Result.fail(status)); } }); YdbTopic.StreamWriteMessage.FromClient init = YdbTopic.StreamWriteMessage.FromClient.newBuilder() diff --git a/topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java index 9b746cdb7..b739cfdc9 100644 --- a/topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -21,17 +22,20 @@ import org.slf4j.LoggerFactory; import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.utils.FutureTools; import tech.ydb.test.junit4.GrpcTransportRule; import tech.ydb.topic.description.Consumer; import tech.ydb.topic.read.DeferredCommitter; import tech.ydb.topic.read.SyncReader; import tech.ydb.topic.settings.CreateTopicSettings; +import tech.ydb.topic.settings.PartitioningSettings; import tech.ydb.topic.settings.ReaderSettings; import tech.ydb.topic.settings.TopicReadSettings; import tech.ydb.topic.settings.TopicRetryConfig; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.AsyncWriter; +import tech.ydb.topic.write.InitResult; import tech.ydb.topic.write.Message; import tech.ydb.topic.write.QueueOverflowException; import tech.ydb.topic.write.SyncWriter; @@ -54,6 +58,7 @@ public class TopicWritersIntegrationTest { private final static String TEST_PRODUCER1 = "producer"; private final static String TEST_CONSUMER1 = "consumer"; + private final static int PARTITIONS_COUNT = 1; private static TopicClient client; @@ -74,6 +79,10 @@ public void initTopic() { logger.info("Create test topic {} ...", TEST_TOPIC); client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder() .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build()) + .setPartitioningSettings(PartitioningSettings.newBuilder() + .setMaxActivePartitions(PARTITIONS_COUNT) + .setMinActivePartitions(PARTITIONS_COUNT) + .build()) .build()) .join().expectSuccess("can't create a new topic"); } @@ -363,4 +372,45 @@ public void idempotentWriterTest() throws Exception { writer2.shutdown().join(); } + + @Test + public void wrongDirectWriteTest() throws Exception { + CountDownLatch closed = new CountDownLatch(1); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath(TEST_TOPIC) + .setDirectWrite(true) + .setPartitionId(PARTITIONS_COUNT + 1) // Invalid partition + .setRetryConfig(TopicRetryConfig.STANDARD) + .setErrorsHandler((t, u) -> { closed.countDown(); }) + .build(); + + AsyncWriter writer = client.createAsyncWriter(settings); + CompletableFuture f1 = writer.send(Message.of(new byte[] { 0x00 })); + CompletableFuture f2 = writer.init(); + + Assert.assertTrue(closed.await(5, TimeUnit.SECONDS)); + + CompletableFuture f3 = writer.shutdown(); + + Assert.assertTrue(f1.isCompletedExceptionally()); + Assert.assertTrue(f2.isCompletedExceptionally()); + Assert.assertFalse(f3.isCompletedExceptionally()); + + Exception ex1 = Assert.assertThrows(CompletionException.class, f1::join); + Exception ex2 = Assert.assertThrows(CompletionException.class, f2::join); + + Assert.assertTrue(ex1.getCause() instanceof RuntimeException); + Assert.assertTrue(ex2.getCause() instanceof UnexpectedResultException); + + String reason = "Cannot find partition " + (PARTITIONS_COUNT + 1) + " (S_ERROR)"; + Assert.assertEquals( + "Message sending was cancelled with Status{code = BAD_REQUEST(code=400010), issues = [" + reason + "]}", + ex1.getCause().getMessage() + ); + Assert.assertEquals( + "Cannot init write session, code: BAD_REQUEST, issues: [" + reason + "]", + ex2.getCause().getMessage() + ); + } } diff --git a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java index bdb002116..c58e1dc90 100644 --- a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java +++ b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java @@ -28,31 +28,57 @@ * @author Aleksandr Gorshenin {@literal } */ public class WriteStreamDirectFactoryTest { - private static GrpcReadWriteStream mockGrpcStream() { + private static class MockedStream { @SuppressWarnings("unchecked") - GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); - Mockito.when(grpc.authToken()).thenReturn(""); - Mockito.when(grpc.start(Mockito.any())).thenReturn(new CompletableFuture<>()); - return grpc; - } - - private static void mockStreamError(GrpcReadWriteStream mock, Status error) { - Mockito.when(mock.start(Mockito.any())).thenReturn(CompletableFuture.completedFuture(error)); - } - - private static void mockStreamResponse(GrpcReadWriteStream mock, FromServer response) { - CompletableFuture result = new CompletableFuture<>(); - - Mockito.when(mock.start(Mockito.any())).thenAnswer(iom -> { - GrpcReadStream.Observer obs = iom.getArgument(0); - obs.onNext(response); - return result; - }).thenReturn(result); - - Mockito.doAnswer((iom) -> { - result.complete(Status.SUCCESS); - return null; - }).when(mock).close(); + private final GrpcReadWriteStream grpc = Mockito.mock(GrpcReadWriteStream.class); + @SuppressWarnings("unchecked") + private final ArgumentCaptor> observer = ArgumentCaptor + .forClass(GrpcReadStream.Observer.class); + + private final CompletableFuture result = new CompletableFuture<>(); + private final ArgumentCaptor msg = ArgumentCaptor.forClass(FromClient.class); + + public MockedStream() { + Mockito.when(grpc.authToken()).thenReturn(""); + Mockito.when(grpc.start(observer.capture())).thenReturn(result); + } + + public FromClient verifyNextMsg() { + Mockito.verify(grpc).sendNext(msg.capture()); + return msg.getValue(); + } + + public void responseWith(FromServer response) { + Mockito.doAnswer((iom) -> { + observer.getValue().onNext(response); + return null; + }).when(grpc).sendNext(Mockito.any()); + } + + public void responseWith(Status status) { + Mockito.doAnswer((iom) -> { + result.complete(status); + return null; + }).when(grpc).sendNext(Mockito.any()); + } + + public void responseWith(Exception ex) { + Mockito.doAnswer((iom) -> { + result.completeExceptionally(ex); + return null; + }).when(grpc).sendNext(Mockito.any()); + } + + public void closeImmediatelly(Status status) { + result.complete(status); + } + + public void fail(FromServer response) { + Mockito.doAnswer((iom) -> { + observer.getValue().onNext(response); + return null; + }).when(grpc).sendNext(Mockito.any()); + } } private static YdbTopic.DescribeTopicResult.PartitionInfo partition(long partitionId, int nodeId, long generation) { @@ -94,11 +120,11 @@ public void invalidDirectWriteTest() { @Test public void directWriteByPartitionIdTest() { - GrpcReadWriteStream grpc = mockGrpcStream(); + MockedStream mocked = new MockedStream(); TopicRpc rpc = Mockito.mock(TopicRpc.class); mockDescribeResult(rpc, partition(1L, 10, 3L), partition(2L, 42, 1L), partition(3L, 23, 2L)); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(grpc); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(mocked.grpc); WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/local/topic") @@ -120,14 +146,13 @@ public void directWriteByPartitionIdTest() { stream.start(null); - ArgumentCaptor msg = ArgumentCaptor.forClass(FromClient.class); - Mockito.verify(grpc).sendNext(msg.capture()); - Assert.assertTrue(msg.getValue().hasInitRequest()); - Assert.assertEquals("/local/topic", msg.getValue().getInitRequest().getPath()); - Assert.assertFalse(msg.getValue().getInitRequest().hasPartitionId()); - Assert.assertEquals("", msg.getValue().getInitRequest().getProducerId()); - Assert.assertEquals(2L, msg.getValue().getInitRequest().getPartitionWithGeneration().getPartitionId()); - Assert.assertEquals(1L, msg.getValue().getInitRequest().getPartitionWithGeneration().getGeneration()); + FromClient msg = mocked.verifyNextMsg(); + Assert.assertTrue(msg.hasInitRequest()); + Assert.assertEquals("/local/topic", msg.getInitRequest().getPath()); + Assert.assertFalse(msg.getInitRequest().hasPartitionId()); + Assert.assertEquals("", msg.getInitRequest().getProducerId()); + Assert.assertEquals(2L, msg.getInitRequest().getPartitionWithGeneration().getPartitionId()); + Assert.assertEquals(1L, msg.getInitRequest().getPartitionWithGeneration().getGeneration()); } @Test @@ -178,31 +203,58 @@ public void directWriteByPartitionIdTestPartitionNotFoundTest() { stream.close(); // no effect } + @Test + public void directWriteByPartitionIdTestPartitionHasNoLocationTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + mockDescribeResult(rpc, YdbTopic.DescribeTopicResult.PartitionInfo.newBuilder() + .setPartitionId(3L) + .build()); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setPartitionId(3L) + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + + Mockito.verify(rpc, Mockito.never()).writeSession(Mockito.any(GrpcRequestSettings.class)); + + Assert.assertTrue(stream instanceof WriteStream.Fail); + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Status expected = Status.of(StatusCode.BAD_REQUEST, Issue.of("Partition 3 has no location", Issue.Severity.ERROR)); + Assert.assertEquals(expected, res.join()); + + stream.close(); // no effect + } + @Test public void directWriteByProducerIdTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - GrpcReadWriteStream actualGrpc = mockGrpcStream(); + MockedStream probe = new MockedStream(); + MockedStream actual = new MockedStream(); + + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))) + .thenReturn(probe.grpc).thenReturn(actual.grpc); + + mockDescribeResult(rpc, partition(7L, 55, 3L)); - FromServer initResponse = FromServer.newBuilder() + probe.responseWith(FromServer.newBuilder() .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) .setInitResponse(InitResponse.newBuilder() .setLastSeqNo(0) .setPartitionId(7L) .setSessionId("session") .build()) - .build(); - - mockStreamResponse(probeGrpc, initResponse); - mockDescribeResult(rpc, partition(7L, 55, 3L)); - - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))) - .thenReturn(probeGrpc).thenReturn(actualGrpc); + .build()); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") .setProducerId("producer-1") + .setMessageGroupId("producer-1") .setDirectWrite(true) .build()); @@ -216,25 +268,71 @@ public void directWriteByProducerIdTest() { stream.start(null); - ArgumentCaptor msg = ArgumentCaptor.forClass(FromClient.class); - Mockito.verify(actualGrpc).sendNext(msg.capture()); - Assert.assertTrue(msg.getValue().hasInitRequest()); - Assert.assertEquals("/test/topic", msg.getValue().getInitRequest().getPath()); - Assert.assertFalse(msg.getValue().getInitRequest().hasPartitionId()); - Assert.assertEquals("producer-1", msg.getValue().getInitRequest().getProducerId()); - Assert.assertEquals(7L, msg.getValue().getInitRequest().getPartitionWithGeneration().getPartitionId()); - Assert.assertEquals(3L, msg.getValue().getInitRequest().getPartitionWithGeneration().getGeneration()); + FromClient msg = actual.verifyNextMsg(); + Assert.assertTrue(msg.hasInitRequest()); + Assert.assertEquals("/test/topic", msg.getInitRequest().getPath()); + Assert.assertFalse(msg.getInitRequest().hasPartitionId()); + Assert.assertEquals("producer-1", msg.getInitRequest().getProducerId()); + Assert.assertEquals("", msg.getInitRequest().getMessageGroupId()); // never used for direct-write + Assert.assertEquals(7L, msg.getInitRequest().getPartitionWithGeneration().getPartitionId()); + Assert.assertEquals(3L, msg.getInitRequest().getPartitionWithGeneration().getGeneration()); } @Test public void directWriteByProducerIdProbeFailTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - GrpcReadWriteStream probeGrpc = mockGrpcStream(); + MockedStream probe = new MockedStream(); + probe.closeImmediatelly(Status.of(StatusCode.UNAUTHORIZED)); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); + + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Assert.assertEquals(Status.of(StatusCode.UNAUTHORIZED), res.join()); + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdProbeFailOnSendTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); + + MockedStream probe = new MockedStream(); + probe.responseWith(Status.of(StatusCode.PRECONDITION_FAILED)); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); + + WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() + .setTopicPath("/test/topic") + .setProducerId("producer-1") + .setDirectWrite(true) + .build()); + + WriteSession.Stream stream = factory.createNewStream("s1"); + Assert.assertTrue(stream instanceof WriteStream.Fail); + Mockito.verify(rpc).writeSession(Mockito.any(GrpcRequestSettings.class)); - mockStreamError(probeGrpc, Status.of(StatusCode.UNAUTHORIZED)); + CompletableFuture res = stream.start(null); + Assert.assertTrue(res.isDone()); + Assert.assertEquals(Status.of(StatusCode.PRECONDITION_FAILED), res.join()); + stream.close(); // no effect + } + + @Test + public void directWriteByProducerIdProbeExceptionOnSendTest() { + TopicRpc rpc = Mockito.mock(TopicRpc.class); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + MockedStream probe = new MockedStream(); + probe.responseWith(new RuntimeException("something went wrong")); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") @@ -248,7 +346,10 @@ public void directWriteByProducerIdProbeFailTest() { CompletableFuture res = stream.start(null); Assert.assertTrue(res.isDone()); - Assert.assertEquals(Status.of(StatusCode.UNAUTHORIZED), res.join()); + Status status = res.join(); + Assert.assertEquals(StatusCode.CLIENT_INTERNAL_ERROR, status.getCode()); + Assert.assertNotNull(status.getCause()); + Assert.assertEquals("something went wrong", status.getCause().getMessage()); stream.close(); // no effect } @@ -256,14 +357,12 @@ public void directWriteByProducerIdProbeFailTest() { public void directWriteByProducerIdProbeWrongResponseTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() + MockedStream probe = new MockedStream(); + probe.responseWith(FromServer.newBuilder() .setStatus(StatusCodesProtos.StatusIds.StatusCode.INTERNAL_ERROR) - .build(); - mockStreamResponse(probeGrpc, initResponse); + .build()); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") @@ -285,15 +384,13 @@ public void directWriteByProducerIdProbeWrongResponseTest() { public void directWriteByProducerIdProbeUnexpectedResponseTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() + MockedStream probe = new MockedStream(); + probe.responseWith(FromServer.newBuilder() .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) .setUpdateTokenResponse(YdbTopic.UpdateTokenResponse.newBuilder().build()) - .build(); - mockStreamResponse(probeGrpc, initResponse); + .build()); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") @@ -316,21 +413,19 @@ public void directWriteByProducerIdProbeUnexpectedResponseTest() { public void directWriteByProducerIdPartitionNotFoundTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); - GrpcReadWriteStream probeGrpc = mockGrpcStream(); - - FromServer initResponse = FromServer.newBuilder() + MockedStream probe = new MockedStream(); + probe.responseWith(FromServer.newBuilder() .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) .setInitResponse(InitResponse.newBuilder() .setLastSeqNo(0) .setPartitionId(5L) .setSessionId("session") .build()) - .build(); + .build()); - mockStreamResponse(probeGrpc, initResponse); mockDescribeResult(rpc, partition(1L, 55, 8L), partition(2L, 55, 7L)); - Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probeGrpc); + Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder() .setTopicPath("/test/topic") From 0e403215d89e1cfb520a2702d2fe3e3e92156f9d Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 15 Jun 2026 16:38:56 +0100 Subject: [PATCH 4/4] Fixed by copilot --- .../tech/ydb/topic/write/impl/WriteStreamDirectFactory.java | 4 ++-- .../ydb/topic/write/impl/WriteStreamDirectFactoryTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java index 91cc8cb3b..c3cf7e425 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriteStreamDirectFactory.java @@ -62,7 +62,7 @@ public WriteSession.Stream createNewStream(String id) { req.setProducerId(producerId); } - FromClient init = FromClient.newBuilder().setInitRequest(req).build(); + FromClient init = FromClient.newBuilder().setInitRequest(req.build()).build(); GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() .withTraceId(id) .disableDeadline() @@ -89,7 +89,7 @@ protected Result lookupLocation(String id, long targ for (YdbTopic.DescribeTopicResult.PartitionInfo partition : describeTopic.getValue().getPartitionsList()) { if (partition.getPartitionId() == targetPartitionId) { if (!partition.hasPartitionLocation()) { - logger.warn("[{}] partition {} has no valid location info", id, partitionId); + logger.warn("[{}] partition {} has no valid location info", id, targetPartitionId); Issue issue = Issue.of("Partition " + targetPartitionId + " has no location", Issue.Severity.ERROR); return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)); } diff --git a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java index c58e1dc90..9dbc2ed90 100644 --- a/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java +++ b/topic/src/test/java/tech/ydb/topic/write/impl/WriteStreamDirectFactoryTest.java @@ -69,7 +69,7 @@ public void responseWith(Exception ex) { }).when(grpc).sendNext(Mockito.any()); } - public void closeImmediatelly(Status status) { + public void closeImmediately(Status status) { result.complete(status); } @@ -283,7 +283,7 @@ public void directWriteByProducerIdProbeFailTest() { TopicRpc rpc = Mockito.mock(TopicRpc.class); MockedStream probe = new MockedStream(); - probe.closeImmediatelly(Status.of(StatusCode.UNAUTHORIZED)); + probe.closeImmediately(Status.of(StatusCode.UNAUTHORIZED)); Mockito.when(rpc.writeSession(Mockito.any(GrpcRequestSettings.class))).thenReturn(probe.grpc); WriteStreamFactory factory = new WriteStreamDirectFactory(rpc, WriterSettings.newBuilder()