Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, S
}

protected abstract TopicStream<R, W> createNewStream(String debugId);
protected abstract W getInitRequest();

protected abstract void onNext(R message);

Expand All @@ -53,7 +52,7 @@ public void start() {
return;
}

stream.start(getInitRequest(), this::onNext).whenComplete((status, th) -> {
stream.start(this::onNext).whenComplete((status, th) -> {
realStream.compareAndSet(stream, null);
if (status != null) {
onStreamStop(status, retryConfig.getStatusRetryPolicy(status));
Expand Down
4 changes: 2 additions & 2 deletions topic/src/main/java/tech/ydb/topic/impl/TopicStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import tech.ydb.core.Status;

public interface TopicStream<R extends Message, W extends Message> {
CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler);
void send(W request);
CompletableFuture<Status> start(Consumer<R> messageHandler);

void send(W request);
void close();
}
8 changes: 5 additions & 3 deletions topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ public abstract class TopicStreamBase<R extends Message, W extends Message> impl
private final Logger logger;
private final String debugId;
private final GrpcReadWriteStream<R, W> stream;
private final W initRequest;
private final CompletableFuture<Status> streamStatus = new CompletableFuture<>();
private volatile String token;

public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream<R, W> stream) {
public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream<R, W> stream, W initRequest) {
this.logger = logger;
this.debugId = debugId;
this.stream = stream;
this.initRequest = initRequest;
this.token = stream.authToken();
}

protected abstract W updateTokenMessage(String token);
protected abstract Status parseMessageStatus(R message);

@Override
public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
public CompletableFuture<Status> start(Consumer<R> messageHandler) {
this.logger.debug("[{}] is about to start", debugId);
this.stream.start((R msg) -> {
Status messageStatus = parseMessageStatus(msg);
Expand All @@ -48,7 +50,7 @@ public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
});

if (!streamStatus.isDone()) {
stream.sendNext(initReq);
stream.sendNext(initRequest);
}

return streamStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public TopicStreamFail(Logger logger, String debugId, Status status) {
}

@Override
public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
public CompletableFuture<Status> start(Consumer<R> messageHandler) {
return CompletableFuture.completedFuture(status);
}

Expand Down
14 changes: 5 additions & 9 deletions topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.write.impl;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
Expand All @@ -10,7 +11,6 @@
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.impl.TopicRetryableStream;
import tech.ydb.topic.impl.TopicStream;
import tech.ydb.topic.settings.WriterSettings;
Expand Down Expand Up @@ -38,10 +38,11 @@ public interface Listener {
private final MessageSender sender;
private final BiConsumer<Status, Throwable> errorsHandler;

public WriteSession(String debugId, TopicRpc rpc, WriterSettings settings, Listener controller) {
super(logger, debugId, settings.getRetryConfig(), rpc.getScheduler());
public WriteSession(String debugId, WriteStreamFactory factory, WriterSettings settings,
ScheduledExecutorService scheduler, Listener controller) {
super(logger, debugId, settings.getRetryConfig(), scheduler);
this.listener = controller;
this.streamFactory = WriteStreamFactory.of(rpc, settings);
this.streamFactory = factory;
this.sender = new MessageSender(debugId, settings.getCodec(), this::send);
this.errorsHandler = settings.getErrorsHandler();
}
Expand All @@ -51,11 +52,6 @@ protected Stream createNewStream(String id) {
return streamFactory.createNewStream(id);
}

@Override
protected FromClient getInitRequest() {
return streamFactory.initRequest();
}

public void sendAll(List<SentMessage> list) {
for (SentMessage msg: list) {
sender.sendMessage(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
public class WriteStream extends TopicStreamBase<FromServer, FromClient> implements WriteSession.Stream {
private static final Logger logger = LoggerFactory.getLogger(WriteStream.class);

public WriteStream(String id, GrpcReadWriteStream<FromServer, FromClient> stream) {
super(logger, id, stream);
public WriteStream(String id, GrpcReadWriteStream<FromServer, FromClient> stream, FromClient initRequest) {
super(logger, id, stream, initRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package tech.ydb.topic.write.impl;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromClient;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage.FromServer;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.settings.WriterSettings;

/**
*
* @author Aleksandr Gorshenin {@literal <alexandr268@ydb.tech>}
*/
public class WriteStreamDirectFactory extends WriteStreamFactory {
private static final Logger logger = LoggerFactory.getLogger(WriteStreamDirectFactory.class);

public WriteStreamDirectFactory(TopicRpc rpc, WriterSettings settings) {
super(rpc, settings);

if (settings.getPartitionId() == null && settings.getProducerId() == null) {
throw new IllegalArgumentException("Direct writing requires PartitionId or ProducerId in WriterSettings");
}
}

@Override
public WriteSession.Stream createNewStream(String id) {
Long targetPartitionId = partitionId;
if (targetPartitionId == null) {
Result<Long> pid = lookupPartitionId(id);
if (!pid.isSuccess()) {
return new WriteStream.Fail(id, pid.getStatus());
}
targetPartitionId = pid.getValue();
}

Result<YdbTopic.PartitionLocation> location = lookupLocation(id, targetPartitionId);
if (!location.isSuccess()) {
return new WriteStream.Fail(id, location.getStatus());
}

StreamWriteMessage.InitRequest.Builder req = StreamWriteMessage.InitRequest.newBuilder()
.setPath(topicPath)
.setPartitionWithGeneration(YdbTopic.PartitionWithGeneration.newBuilder()
.setPartitionId(targetPartitionId)
.setGeneration(location.getValue().getGeneration())
.build());

if (producerId != null) {
req.setProducerId(producerId);
}

FromClient init = FromClient.newBuilder().setInitRequest(req.build()).build();
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(id)
.disableDeadline()
.withDirectMode(true)
.withPreferredNodeID(location.getValue().getNodeId())
.build();

return new WriteStream(id, rpc.writeSession(settings), init);
}

protected Result<YdbTopic.PartitionLocation> lookupLocation(String id, long targetPartitionId) {
logger.info("[{}] describe topic {} to look up node for partition {}", id, topicPath, targetPartitionId);
Result<YdbTopic.DescribeTopicResult> describeTopic = rpc.describeTopic(
YdbTopic.DescribeTopicRequest.newBuilder().setIncludeLocation(true).setPath(topicPath).build(),
GrpcRequestSettings.newBuilder().withDeadline(Duration.ofMinutes(1)).build()
).join();

if (!describeTopic.isSuccess()) {
logger.warn("[{}] describe topic {} failed with status {}", id, topicPath, describeTopic.getStatus());
return Result.fail(describeTopic.getStatus());
}

// lookup for partition location
for (YdbTopic.DescribeTopicResult.PartitionInfo partition : describeTopic.getValue().getPartitionsList()) {
if (partition.getPartitionId() == targetPartitionId) {
if (!partition.hasPartitionLocation()) {
logger.warn("[{}] partition {} has no valid location info", id, targetPartitionId);
Issue issue = Issue.of("Partition " + targetPartitionId + " has no location", Issue.Severity.ERROR);
return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue));
}

return Result.success(partition.getPartitionLocation());
Comment thread
alex268 marked this conversation as resolved.
}
}

logger.warn("[{}] topic {} doesn't have partition {}, direct writing failed", id, topicPath, targetPartitionId);
Issue issue = Issue.of("Cannot find partition " + targetPartitionId, Issue.Severity.ERROR);
return Result.fail(Status.of(StatusCode.BAD_REQUEST, issue));
}

private Result<Long> lookupPartitionId(String id) {
CompletableFuture<Result<Long>> pidFuture = new CompletableFuture<>();

// create one-shot stream to detect partitionID for this producer
logger.info("[{}] create probe stream for topic {} with producer {}", id, topicPath, producerId);
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(id + "-probe")
.withDeadline(Duration.ofMinutes(1))
.build();
GrpcReadWriteStream<FromServer, FromClient> stream = rpc.writeSession(settings);

CompletableFuture<Status> streamFuture = stream.start(resp -> {
if (resp.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
Status status = Status.of(StatusCode.fromProto(resp.getStatus()), Issue.fromPb(resp.getIssuesList()));
logger.warn("[{}] probe stream to topic {} with producer {} got error {}", id, topicPath,
producerId, status);
pidFuture.complete(Result.fail(status));
return;
}

if (resp.hasInitResponse()) {
long pid = resp.getInitResponse().getPartitionId();
logger.info("[{}] probe stream to topic {} with producer {} has partition {}", id, topicPath,
producerId, pid);
pidFuture.complete(Result.success(pid));
return;
}

logger.warn("[{}] probe stream to topic {} with producer {} got unexpected message {}", id, topicPath,
producerId, resp.getClass().getName());

Issue issue = Issue.of("Unexpected message from stream with producer " + producerId, Issue.Severity.ERROR);
pidFuture.complete(Result.fail(Status.of(StatusCode.BAD_REQUEST, issue)));
});

if (streamFuture.isDone()) {
logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath,
producerId, streamFuture.join());
return Result.fail(streamFuture.join());
}

try {
streamFuture.whenComplete((st, th) -> {
Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th);
if (pidFuture.complete(Result.fail(status))) {
logger.warn("[{}] probe stream to topic {} with producer {} failed with status {}", id, topicPath,
producerId, status);
}
});
YdbTopic.StreamWriteMessage.FromClient init = YdbTopic.StreamWriteMessage.FromClient.newBuilder()
.setInitRequest(buildInitRequest())
.build();
stream.sendNext(init);
return pidFuture.join();
} finally {
if (!streamFuture.isDone()) {
stream.close();
}
}
}
}
Loading