Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions sdk/webpubsub/azure-messaging-webpubsub-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,28 @@ client.addOnStoppedEventHandler(event -> {
});
```

### Invoke upstream events (preview)

`invokeEvent` sends an `invoke` request to the service, awaits the correlated `invokeResponse` payload.

```java readme-sample-invokeEvent
InvokeEventResult result = client.invokeEvent("processOrder",
BinaryData.fromString("{\"orderId\":1}"), WebPubSubDataFormat.JSON);
System.out.println("Invocation result: " + result.getData().toString());
```

You can set a timeout and a custom invocation ID via `InvokeEventOptions` so the invocation fails with an `InvocationException` if no response is received within the specified duration. By default, there is no timeout and the invocation waits indefinitely.

```java readme-sample-invokeEventWithTimeout
InvokeEventOptions options = new InvokeEventOptions().setTimeout(Duration.ofSeconds(10)).setInvocationId("my-invocation-1");
InvokeEventResult result = client.invokeEvent("processOrder",
BinaryData.fromString("{\"orderId\":1}"), WebPubSubDataFormat.JSON, options);
System.out.println("Invocation result: " + result.getData().toString());
```

_Streaming and service-initiated invocations are not yet supported._


### Operation and retry

By default, the operation such as `client.joinGroup()`, `client.leaveGroup()`, `client.sendToGroup()`, `client.sendEvent()` has three reties. You can use `WebPubSubClientBuilder.retryOptions()` to change. If all retries have failed, an error will be thrown. You can keep retrying by passing in the same `ackId` as previous retries, thus the service can help to deduplicate the operation with the same `ackId`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import com.azure.messaging.webpubsub.client.implementation.WebPubSubConnection;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubGroup;
import com.azure.messaging.webpubsub.client.implementation.models.AckMessage;
import com.azure.messaging.webpubsub.client.implementation.models.CancelInvocationMessage;
import com.azure.messaging.webpubsub.client.implementation.models.ConnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.DisconnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.GroupDataMessage;
import com.azure.messaging.webpubsub.client.implementation.models.InvokeMessage;
import com.azure.messaging.webpubsub.client.implementation.models.InvokeResponseMessage;
import com.azure.messaging.webpubsub.client.implementation.models.JoinGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.LeaveGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendEventMessage;
Expand All @@ -33,6 +36,9 @@
import com.azure.messaging.webpubsub.client.models.DisconnectedEvent;
import com.azure.messaging.webpubsub.client.models.GroupMessageEvent;
import com.azure.messaging.webpubsub.client.models.RejoinGroupFailedEvent;
import com.azure.messaging.webpubsub.client.models.InvocationException;
import com.azure.messaging.webpubsub.client.models.InvokeEventOptions;
import com.azure.messaging.webpubsub.client.models.InvokeEventResult;
import com.azure.messaging.webpubsub.client.models.SendEventOptions;
import com.azure.messaging.webpubsub.client.models.SendMessageFailedException;
import com.azure.messaging.webpubsub.client.models.SendToGroupOptions;
Expand Down Expand Up @@ -106,10 +112,15 @@ final class WebPubSubAsyncClient implements Closeable {

private Sinks.Many<RejoinGroupFailedEvent> rejoinGroupFailedEventSink
= Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
private Sinks.Many<InvokeResponseMessage> invokeResponseSink
= Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

// incremental ackId
private final AtomicLong ackId = new AtomicLong(0);

// incremental invocation ID
private final AtomicLong invocationIdCounter = new AtomicLong(0);

// connection (logic, one to one map to the connectionId)
private WebPubSubConnection webPubSubConnection;

Expand Down Expand Up @@ -459,6 +470,121 @@ public Mono<WebPubSubResult> sendEvent(String eventName, BinaryData content, Web
return responseMono.retryWhen(sendMessageRetrySpec);
}

/**
* Invokes an upstream event and waits for the correlated response.
*
* @param eventName the event name.
* @param content the data.
* @param dataFormat the data format.
* @return the result.
*/
public Mono<InvokeEventResult> invokeEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat) {
return invokeEvent(eventName, content, dataFormat, new InvokeEventOptions());
}

/**
* Invokes an upstream event and waits for the correlated response.
*
* @param eventName the event name.
* @param content the data.
* @param dataFormat the data format.
* @param options the options.
* @return the result.
*/
public Mono<InvokeEventResult> invokeEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat,
InvokeEventOptions options) {
Objects.requireNonNull(eventName);
Objects.requireNonNull(content);
Objects.requireNonNull(dataFormat);
if (options == null) {
options = new InvokeEventOptions();
}

String invocationId = options.getInvocationId() != null ? options.getInvocationId() : nextInvocationId();
Duration timeout = options.getTimeout();

InvokeMessage invokeMessage = new InvokeMessage().setInvocationId(invocationId)
.setTarget("event")
.setEvent(eventName)
.setDataType(dataFormat.toString())
.setData(content);

return invokeEventAttempt(invocationId, invokeMessage, timeout).retryWhen(sendMessageRetrySpec);
}

private Mono<InvokeEventResult> invokeEventAttempt(String invocationId, InvokeMessage invokeMessage,
Duration timeout) {
return Mono.<InvokeResponseMessage>create(sink -> {
Disposable responseDisposable
= waitForInvokeResponse(invocationId, timeout).subscribe(sink::success, sink::error);
sink.onDispose(responseDisposable);

sendMessage(invokeMessage).subscribe(null, error -> sink.error(error));
}).map(this::mapInvokeResponse).onErrorResume(throwable -> {
// If InvocationException, do not retry
if (throwable instanceof InvocationException) {
return Mono.error(throwable);
}
// Attempt to send cancelInvocation on failure
return sendCancelInvocationBestEffort(invocationId).then(
Mono.error(logSendMessageFailedException("Failed to invoke event.", throwable, true, (Long) null)));
});
}

private Mono<InvokeResponseMessage> waitForInvokeResponse(String invocationId, Duration timeout) {
Mono<InvokeResponseMessage> responseMono
= receiveInvokeResponses().filter(m -> invocationId.equals(m.getInvocationId())).next();
if (timeout != null) {
responseMono
= responseMono
.timeout(timeout,
Mono.defer(() -> Mono.error(new InvocationException(
"Invocation timed out after " + timeout.toMillis()
+ "ms. No response received for invocation '" + invocationId + "'.",
invocationId, null))));
}
return responseMono;
}

private InvokeEventResult mapInvokeResponse(InvokeResponseMessage message) {
if (Boolean.TRUE.equals(message.isSuccess())) {
return new InvokeEventResult(message.getInvocationId(), message.getDataType(), message.getData());
} else if (Boolean.FALSE.equals(message.isSuccess())) {
throw logger.logExceptionAsWarning(new InvocationException(
message.getError() != null ? message.getError().getMessage() : "Invocation failed.",
message.getInvocationId(), message.getError()));
} else {
throw logger.logExceptionAsWarning(
new InvocationException("Unsupported invoke response frame.", message.getInvocationId(), null));
}
}

/**
* Cancels a pending invocation by sending a cancel message to the server.
*
* @param invocationId the invocation ID to cancel.
* @return a {@link Mono} that completes when the cancel message has been sent, or errors if the
* cancel message could not be sent (e.g., when disconnected).
* @throws NullPointerException if {@code invocationId} is null.
*/
public Mono<Void> cancelInvocation(String invocationId) {
Objects.requireNonNull(invocationId, "'invocationId' cannot be null.");
CancelInvocationMessage cancelMessage = new CancelInvocationMessage().setInvocationId(invocationId);
return sendMessage(cancelMessage);
}

private Mono<Void> sendCancelInvocationBestEffort(String invocationId) {
CancelInvocationMessage cancelMessage = new CancelInvocationMessage().setInvocationId(invocationId);
return sendMessage(cancelMessage).onErrorResume(error -> {
logger.atVerbose().log("Failed to send cancelInvocation for " + invocationId, error);
return Mono.empty();
});
}

private Flux<InvokeResponseMessage> receiveInvokeResponses() {
return invokeResponseSink.asFlux();
}

/**
* Receives group message events.
*
Expand Down Expand Up @@ -523,6 +649,16 @@ private long nextAckId() {
});
}

private String nextInvocationId() {
return String.valueOf(invocationIdCounter.getAndUpdate(value -> {
// keep positive
if (++value < 0) {
value = 0;
}
return value;
}));
}

private Flux<AckMessage> receiveAckMessages() {
return ackMessageSink.asFlux();
}
Expand Down Expand Up @@ -776,6 +912,8 @@ private void handleMessage(WebPubSubMessage webPubSubMessage) {
}
} else if (webPubSubMessage instanceof AckMessage) {
tryEmitNext(ackMessageSink, (AckMessage) webPubSubMessage);
} else if (webPubSubMessage instanceof InvokeResponseMessage) {
tryEmitNext(invokeResponseSink, (InvokeResponseMessage) webPubSubMessage);
} else if (webPubSubMessage instanceof ConnectedMessage) {
final ConnectedMessage connectedMessage = (ConnectedMessage) webPubSubMessage;
final String connectionId = connectedMessage.getConnectionId();
Expand Down Expand Up @@ -940,6 +1078,9 @@ private void handleClientStop(boolean sendStoppedEvent) {
ackMessageSink.emitComplete(emitFailureHandler("Unable to emit Complete to ackMessageSink"));
ackMessageSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

invokeResponseSink.emitComplete(emitFailureHandler("Unable to emit Complete to invokeResponseSink"));
invokeResponseSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

updateLogger(applicationId, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import com.azure.messaging.webpubsub.client.models.DisconnectedEvent;
import com.azure.messaging.webpubsub.client.models.GroupMessageEvent;
import com.azure.messaging.webpubsub.client.models.RejoinGroupFailedEvent;
import com.azure.messaging.webpubsub.client.models.InvocationException;
import com.azure.messaging.webpubsub.client.models.InvokeEventOptions;
import com.azure.messaging.webpubsub.client.models.InvokeEventResult;
import com.azure.messaging.webpubsub.client.models.SendEventOptions;
import com.azure.messaging.webpubsub.client.models.SendMessageFailedException;
import com.azure.messaging.webpubsub.client.models.SendToGroupOptions;
Expand Down Expand Up @@ -447,6 +450,40 @@ public WebPubSubResult sendEvent(String eventName, BinaryData content, WebPubSub
return asyncClient.sendEvent(eventName, content, dataFormat, options).block();
}

/**
* Invokes an upstream event and waits for the correlated response.
* <p>
* {@link #start()} the client, before invoke event.
*
* @param eventName the event name.
* @param content the data.
* @param dataFormat the data format.
* @return the result.
* @throws InvocationException thrown if the invocation fails.
* @throws SendMessageFailedException thrown if client not connected, or send message failed.
*/
public InvokeEventResult invokeEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat) {
return asyncClient.invokeEvent(eventName, content, dataFormat).block();
}

/**
* Invokes an upstream event and waits for the correlated response.
* <p>
* {@link #start()} the client, before invoke event.
*
* @param eventName the event name.
* @param content the data.
* @param dataFormat the data format.
* @param options the options.
* @return the result.
* @throws InvocationException thrown if the invocation fails.
* @throws SendMessageFailedException thrown if client not connected, or send message failed.
*/
public InvokeEventResult invokeEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat,
InvokeEventOptions options) {
return asyncClient.invokeEvent(eventName, content, dataFormat, options).block();
}

// following API is for testing
WebPubSubClientState getClientState() {
return this.asyncClient.getClientState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.util.BinaryData;
import com.azure.json.JsonProviders;
import com.azure.json.JsonWriter;
import com.azure.messaging.webpubsub.client.implementation.models.InvokeMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendEventMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendToGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessage;
Expand All @@ -23,6 +24,8 @@ public String encode(WebPubSubMessage object) {
updateDataForType((SendToGroupMessage) object);
} else if (object instanceof SendEventMessage) {
updateDataForType((SendEventMessage) object);
} else if (object instanceof InvokeMessage) {
updateDataForType((InvokeMessage) object);
}

try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Expand All @@ -42,6 +45,10 @@ private static void updateDataForType(SendEventMessage message) {
updateDataForType(message.getDataType(), message.getData(), message::setData);
}

private static void updateDataForType(InvokeMessage message) {
updateDataForType(message.getDataType(), message.getData(), message::setData);
}

private static void updateDataForType(String dataType, Object data, Consumer<Object> dataUpdater) {
if (WebPubSubDataFormat.BINARY.toString().equals(dataType)
|| WebPubSubDataFormat.PROTOBUF.toString().equals(dataType)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.webpubsub.client.implementation.models;

import com.azure.core.annotation.Fluent;
import com.azure.json.JsonReader;
import com.azure.json.JsonToken;
import com.azure.json.JsonWriter;

import java.io.IOException;

/**
* The cancel invocation message sent upstream to cancel a pending invocation.
*/
@Fluent
public final class CancelInvocationMessage extends WebPubSubMessage {

private static final String TYPE = "cancelInvocation";

private String invocationId;

/**
* Creates a new instance of CancelInvocationMessage.
*/
public CancelInvocationMessage() {
}

/**
* Gets the type.
*
* @return the type.
*/
public String getType() {
return TYPE;
}

/**
* Gets the invocation ID.
*
* @return the invocation ID.
*/
public String getInvocationId() {
return invocationId;
}

/**
* Sets the invocation ID.
*
* @param invocationId the invocation ID.
* @return itself.
*/
public CancelInvocationMessage setInvocationId(String invocationId) {
this.invocationId = invocationId;
return this;
}

@Override
public JsonWriter toJson(JsonWriter jsonWriter) throws IOException {
return jsonWriter.writeStartObject()
.writeStringField("type", TYPE)
.writeStringField("invocationId", invocationId)
.writeEndObject();
}

public static CancelInvocationMessage fromJson(JsonReader jsonReader) throws IOException {
return jsonReader.readObject(reader -> {
CancelInvocationMessage message = new CancelInvocationMessage();

while (reader.nextToken() != JsonToken.END_OBJECT) {
String fieldName = reader.getFieldName();
reader.nextToken();
if ("invocationId".equals(fieldName)) {
message.invocationId = reader.getString();
} else {
reader.skipChildren();
}
}

return message;
});
}
}
Loading