Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions source/Interfaces/IStompClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public interface IStompClient : IDisposable
Task ConnectAsync(IDictionary<string, string> headers);
Task SendAsync(object body, string destination, IDictionary<string, string> headers);
Task SendAsync(string body, string destination, IDictionary<string, string> headers);
Task SubscribeAsync<T>(string topic, IDictionary<string, string> headers, EventHandler<T> handler);
Task SubscribeAsync(string topic, IDictionary<string, string> headers, EventHandler<StompMessage> handler);
Task<StompUnsubscribeCallback> SubscribeAsync<T>(string topic, IDictionary<string, string> headers, EventHandler<T> handler);
Task<StompUnsubscribeCallback> SubscribeAsync(string topic, IDictionary<string, string> headers, EventHandler<StompMessage> handler);
Task AckAsync(string id, string transaction = null);
Task NackAsync(string id, string transaction = null);
Task DisconnectAsync();
Expand Down
35 changes: 29 additions & 6 deletions source/StompClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Text;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Threading;
using Netina.Stomp.Client.Interfaces;
using Netina.Stomp.Client.Messages;
using Netina.Stomp.Client.Utils;
Expand All @@ -12,6 +13,8 @@

namespace Netina.Stomp.Client
{
public delegate Task StompUnsubscribeCallback();

public class StompClient : IStompClient
{
public event EventHandler<StompMessage> OnConnect;
Expand All @@ -27,6 +30,7 @@ public class StompClient : IStompClient
private readonly StompMessageSerializer _stompSerializer = new StompMessageSerializer();
private readonly IDictionary<string, EventHandler<StompMessage>> _subscribers = new Dictionary<string, EventHandler<StompMessage>>();
private readonly IDictionary<string, string> _connectingHeaders = new Dictionary<string, string>();
private int _subscriptionIdSequence;

/// <summary>
/// StompClient Ctor
Expand Down Expand Up @@ -120,21 +124,40 @@ public async Task SendAsync(string body, string destination, IDictionary<string,
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
}

public async Task SubscribeAsync<T>(string topic, IDictionary<string, string> headers, EventHandler<T> handler)
public async Task<StompUnsubscribeCallback> SubscribeAsync<T>(string topic, IDictionary<string, string> headers, EventHandler<T> handler)
{
await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject<T>(message.Body)));
return await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject<T>(message.Body)));
}

public async Task SubscribeAsync(string topic, IDictionary<string, string> headers, EventHandler<StompMessage> handler)
public async Task<StompUnsubscribeCallback> SubscribeAsync(string topic, IDictionary<string, string> headers, EventHandler<StompMessage> handler)
{
if (StompState != StompConnectionState.Open)
await Reconnect();

headers.Add("destination", topic);
headers.Add("id", $"sub-{_subscribers.Count}");
var subscribeMessage = new StompMessage(StompCommand.Subscribe, headers);
Dictionary<string, string> headersCopy = new Dictionary<string, string>(headers);

string subscriptionId = $"sub-{Interlocked.Increment(ref _subscriptionIdSequence)}";
headersCopy.Add("destination", topic);
headersCopy.Add("id", subscriptionId);
var subscribeMessage = new StompMessage(StompCommand.Subscribe, headersCopy);
await _socket.SendInstant(_stompSerializer.Serialize(subscribeMessage));
_subscribers.Add(topic, handler);

return () => UnsubscribeAsync(subscriptionId, topic, headers);
}

private async Task UnsubscribeAsync(string subscriptionId, string topic, IDictionary<string, string> headers) {
if (StompState != StompConnectionState.Open) {
await Reconnect();
}

Dictionary<string, string> headersCopy = new Dictionary<string, string>(headers);

headersCopy.Add("destination", topic);
headersCopy.Add("id", subscriptionId);

await _socket.SendInstant(_stompSerializer.Serialize(new StompMessage(StompCommand.Unsubscribe, headersCopy)));
_subscribers.Remove(topic);
}

public async Task AckAsync(string id, string transaction = null)
Expand Down