From 80764f2b1605c834a643dd1565044aba28b378c9 Mon Sep 17 00:00:00 2001 From: Dero Date: Wed, 1 Apr 2026 12:47:44 +0200 Subject: [PATCH] Added support for unsubscribing previously subscribed from topics --- source/Interfaces/IStompClient.cs | 4 ++-- source/StompClient.cs | 35 +++++++++++++++++++++++++------ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/source/Interfaces/IStompClient.cs b/source/Interfaces/IStompClient.cs index 7103110..799fff0 100644 --- a/source/Interfaces/IStompClient.cs +++ b/source/Interfaces/IStompClient.cs @@ -21,8 +21,8 @@ public interface IStompClient : IDisposable Task ConnectAsync(IDictionary headers); Task SendAsync(object body, string destination, IDictionary headers); Task SendAsync(string body, string destination, IDictionary headers); - Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler); - Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler); + Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler); + Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler); Task AckAsync(string id, string transaction = null); Task NackAsync(string id, string transaction = null); Task DisconnectAsync(); diff --git a/source/StompClient.cs b/source/StompClient.cs index c83e304..e7259bc 100644 --- a/source/StompClient.cs +++ b/source/StompClient.cs @@ -3,6 +3,7 @@ using System.Text; using System.Threading.Tasks; using System.Net.WebSockets; +using System.Threading; using Netina.Stomp.Client.Interfaces; using Netina.Stomp.Client.Messages; using Netina.Stomp.Client.Utils; @@ -12,6 +13,8 @@ namespace Netina.Stomp.Client { + public delegate Task StompUnsubscribeCallback(); + public class StompClient : IStompClient { public event EventHandler OnConnect; @@ -27,6 +30,7 @@ public class StompClient : IStompClient private readonly StompMessageSerializer _stompSerializer = new StompMessageSerializer(); private readonly IDictionary> _subscribers = new Dictionary>(); private readonly IDictionary _connectingHeaders = new Dictionary(); + private int _subscriptionIdSequence; /// /// StompClient Ctor @@ -120,21 +124,40 @@ public async Task SendAsync(string body, string destination, IDictionary(string topic, IDictionary headers, EventHandler handler) + public async Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler) { - await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject(message.Body))); + return await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject(message.Body))); } - public async Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler) + public async Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler) { if (StompState != StompConnectionState.Open) await Reconnect(); - headers.Add("destination", topic); - headers.Add("id", $"sub-{_subscribers.Count}"); - var subscribeMessage = new StompMessage(StompCommand.Subscribe, headers); + Dictionary headersCopy = new Dictionary(headers); + + string subscriptionId = $"sub-{Interlocked.Increment(ref _subscriptionIdSequence)}"; + headersCopy.Add("destination", topic); + headersCopy.Add("id", subscriptionId); + var subscribeMessage = new StompMessage(StompCommand.Subscribe, headersCopy); await _socket.SendInstant(_stompSerializer.Serialize(subscribeMessage)); _subscribers.Add(topic, handler); + + return () => UnsubscribeAsync(subscriptionId, topic, headers); + } + + private async Task UnsubscribeAsync(string subscriptionId, string topic, IDictionary headers) { + if (StompState != StompConnectionState.Open) { + await Reconnect(); + } + + Dictionary headersCopy = new Dictionary(headers); + + headersCopy.Add("destination", topic); + headersCopy.Add("id", subscriptionId); + + await _socket.SendInstant(_stompSerializer.Serialize(new StompMessage(StompCommand.Unsubscribe, headersCopy))); + _subscribers.Remove(topic); } public async Task AckAsync(string id, string transaction = null)