From f4e96775650a54126f0d54abff811b5330ab91b7 Mon Sep 17 00:00:00 2001 From: Sergey Filippov Date: Wed, 10 May 2023 16:56:03 +0200 Subject: [PATCH] - binary serialization/deserialization - existing qeue subscription - web proxy support - hearbeat message type added - nugets updated --- .../Messages/StompBinaryMessageSerializer.cs | 128 ++++++++++++++++++ source/Messages/StompMessage.cs | 28 +++- source/Messages/StompMessageBodyType.cs | 9 ++ ...lizer.cs => StompTextMessageSerializer.cs} | 4 +- source/Netina.Stomp.Client.csproj | 4 +- source/StompClient.cs | 81 ++++++++--- source/Utils/StompCommand.cs | 3 + source/Utils/StompHeader.cs | 23 ++++ 8 files changed, 249 insertions(+), 31 deletions(-) create mode 100644 source/Messages/StompBinaryMessageSerializer.cs create mode 100644 source/Messages/StompMessageBodyType.cs rename source/Messages/{StompMessageSerializer.cs => StompTextMessageSerializer.cs} (94%) create mode 100644 source/Utils/StompHeader.cs diff --git a/source/Messages/StompBinaryMessageSerializer.cs b/source/Messages/StompBinaryMessageSerializer.cs new file mode 100644 index 0000000..b2c9502 --- /dev/null +++ b/source/Messages/StompBinaryMessageSerializer.cs @@ -0,0 +1,128 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Http; +using System.Runtime.CompilerServices; +using System.Text; +using Netina.Stomp.Client.Utils; +using System.Linq; + +namespace Netina.Stomp.Client.Messages +{ + public class StompBinaryMessageSerializer + { + public byte[] Serialize(StompMessage message) + { + var resultBuffer = new List(); + resultBuffer.AddRange(Encoding.UTF8.GetBytes($"{message.Command}\n")); + + if (message.Headers?.Count > 0) + { + foreach (var messageHeader in message.Headers) + { + resultBuffer.AddRange(Encoding.UTF8.GetBytes($"{messageHeader.Key}:{messageHeader.Value}\n")); + } + } + + resultBuffer.Add((byte)'\n'); + resultBuffer.AddRange(message.BinaryBody); + resultBuffer.Add((byte)'\0'); + + return resultBuffer.ToArray(); + } + + public StompMessage Deserialize(byte[] message) + { + var headerBuffer = new List(); + var bodyBuffer = new List(); + byte previousByte = 0; + var isBodyStarted = false; + + // Building header and body buffers + foreach (var currentByte in message) + { + if (!isBodyStarted && currentByte == previousByte && previousByte == (byte)'\n') + { + isBodyStarted = true; + } + else + { + if (isBodyStarted) + { + bodyBuffer.Add(currentByte); + } + else + { + headerBuffer.Add(currentByte); + } + } + + previousByte = currentByte; + } + + // Doing a cleanup of a body buffer according to a frame description: + // "The body is then followed by the NULL octet. The NULL octet can be optionally followed by multiple EOLs" + var ignoredChars = new byte[] { (byte)'\n', (byte)'\r' }; + var messageEnd = (byte)'\0'; + for (var index = bodyBuffer.Count - 1; index >= 0; index--) + { + var currentByte = bodyBuffer[index]; + if (ignoredChars.Contains(currentByte)) + { + bodyBuffer.RemoveAt(index); + continue; + } + + if (currentByte == messageEnd) + { + bodyBuffer.RemoveAt(index); + } + + break; + } + + var command = string.Empty; + var headers = new Dictionary(); + + // Parse headers + if (headerBuffer.Count > 0) + { + var stringHeader = Encoding.UTF8.GetString(headerBuffer.ToArray()); + + using (var reader = new StringReader(stringHeader)) + { + command = reader.ReadLine(); + var header = reader.ReadLine(); + while (!string.IsNullOrEmpty(header)) + { + var separatorIndex = header.IndexOf(':'); + if (separatorIndex != -1) + { + var name = header.Substring(0, separatorIndex); + var value = header.Substring(separatorIndex + 1); + headers[name] = value; + } + + header = reader.ReadLine() ?? string.Empty; + } + } + } + + // Check body content length is present + if (headers.TryGetValue(StompHeader.ContentLength, out var contentLength)) + { + if (long.TryParse(contentLength, out var length)) + { + if (length != bodyBuffer.Count) + { + throw new ApplicationException( + "STOMP: Content length header value is different then actual length of bytes received."); + } + } + } + + + return new StompMessage(command, bodyBuffer.ToArray(), headers); + } + } +} \ No newline at end of file diff --git a/source/Messages/StompMessage.cs b/source/Messages/StompMessage.cs index 4df83bd..e60ffb4 100644 --- a/source/Messages/StompMessage.cs +++ b/source/Messages/StompMessage.cs @@ -5,29 +5,47 @@ namespace Netina.Stomp.Client.Messages public class StompMessage { public IDictionary Headers { get; } - public string Body { get; } + public string TextBody { get; } + public byte[] BinaryBody { get; } public string Command { get; } + public StompMessageBodyType BodyType { get; } + public StompMessage(string command) : this(command, string.Empty) { } - public StompMessage(string command, string body) - : this(command, body, new Dictionary()) + public StompMessage(string command, string textBody) + : this(command, textBody, new Dictionary()) { } public StompMessage(string command, IDictionary headers) : this(command, string.Empty, headers) { + + } + + public StompMessage(string command, string textBody, IDictionary headers) + { + Command = command; + TextBody = textBody; + Headers = headers; + BodyType = string.IsNullOrEmpty(textBody) ? StompMessageBodyType.Empty : StompMessageBodyType.Text; + } + + public StompMessage(string command, byte[] binBody) + : this(command, binBody, new Dictionary()) + { } - public StompMessage(string command, string body, IDictionary headers) + public StompMessage(string command, byte[] binBody, IDictionary headers) { Command = command; - Body = body; + BinaryBody = binBody; Headers = headers; + BodyType = binBody == null || binBody.Length == 0 ? StompMessageBodyType.Empty : StompMessageBodyType.Binary; } } } diff --git a/source/Messages/StompMessageBodyType.cs b/source/Messages/StompMessageBodyType.cs new file mode 100644 index 0000000..1cee04c --- /dev/null +++ b/source/Messages/StompMessageBodyType.cs @@ -0,0 +1,9 @@ +namespace Netina.Stomp.Client.Messages +{ + public enum StompMessageBodyType + { + Text, + Binary, + Empty + } +} \ No newline at end of file diff --git a/source/Messages/StompMessageSerializer.cs b/source/Messages/StompTextMessageSerializer.cs similarity index 94% rename from source/Messages/StompMessageSerializer.cs rename to source/Messages/StompTextMessageSerializer.cs index d2a9afc..9dd1113 100644 --- a/source/Messages/StompMessageSerializer.cs +++ b/source/Messages/StompTextMessageSerializer.cs @@ -4,7 +4,7 @@ namespace Netina.Stomp.Client.Messages { - public class StompMessageSerializer + public class StompTextMessageSerializer { public string Serialize(StompMessage message) { @@ -21,7 +21,7 @@ public string Serialize(StompMessage message) } buffer.Append('\n'); - buffer.Append(message.Body); + buffer.Append(message.TextBody); buffer.Append('\0'); return buffer.ToString(); diff --git a/source/Netina.Stomp.Client.csproj b/source/Netina.Stomp.Client.csproj index fc165ac..6b1a32b 100644 --- a/source/Netina.Stomp.Client.csproj +++ b/source/Netina.Stomp.Client.csproj @@ -22,8 +22,8 @@ Add ACK & NACK Commands - - + + diff --git a/source/StompClient.cs b/source/StompClient.cs index c83e304..50abdcc 100644 --- a/source/StompClient.cs +++ b/source/StompClient.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Net; +using System.Net.Http; using System.Text; using System.Threading.Tasks; using System.Net.WebSockets; @@ -24,7 +26,8 @@ public class StompClient : IStompClient public string Version { get; private set; } private readonly WebsocketClient _socket; - private readonly StompMessageSerializer _stompSerializer = new StompMessageSerializer(); + private readonly StompTextMessageSerializer _stompTextSerializer = new StompTextMessageSerializer(); + private readonly StompBinaryMessageSerializer _binaryMessageSerializer = new StompBinaryMessageSerializer(); private readonly IDictionary> _subscribers = new Dictionary>(); private readonly IDictionary _connectingHeaders = new Dictionary(); @@ -36,11 +39,17 @@ public class StompClient : IStompClient /// Add stomp version in header for connecting, set "1.0,1.1,1.2" if nothing specified /// Time range in ms, how long to wait before reconnecting if last reconnection failed. Set null to disable this feature /// Set 0,1000 if nothing specified - public StompClient(string url, bool reconnectEnable = true, string stompVersion = null, TimeSpan? reconnectTimeOut = null, string heartBeat = null) + public StompClient(string url, bool reconnectEnable = true, string stompVersion = null, TimeSpan? reconnectTimeOut = null, string heartBeat = null, IWebProxy proxy = null) { _socket = new WebsocketClient(new Uri(url), () => { var ws = new ClientWebSocket(); ws.Options.AddSubProtocol("stomp"); + + if (proxy != null) + { + ws.Options.Proxy = proxy; + } + return ws; }) { @@ -65,12 +74,12 @@ public StompClient(string url, bool reconnectEnable = true, string stompVersion await Reconnect(); }); - _connectingHeaders.Add("accept-version", string.IsNullOrEmpty(stompVersion) ? "1.0,1.1,1.2" : stompVersion); - _connectingHeaders.Add("heart-beat", string.IsNullOrEmpty(heartBeat) ? "0,1000" : heartBeat); + _connectingHeaders.Add(StompHeader.AcceptVersion, string.IsNullOrEmpty(stompVersion) ? "1.0,1.1,1.2" : stompVersion); + _connectingHeaders.Add(StompHeader.Heartbeat, string.IsNullOrEmpty(heartBeat) ? "0,1000" : heartBeat); OnConnect += (sender, message) => { - Version = message.Headers["version"]; + Version = message.Headers[StompHeader.Version]; }; } @@ -87,7 +96,7 @@ public async Task ConnectAsync(IDictionary headers) _connectingHeaders.Add(header); } var connectMessage = new StompMessage(StompCommand.Connect, _connectingHeaders); - await _socket.SendInstant(_stompSerializer.Serialize(connectMessage)); + await _socket.SendInstant(_stompTextSerializer.Serialize(connectMessage)); StompState = StompConnectionState.Open; } @@ -98,15 +107,15 @@ public async Task Reconnect() if (StompState == StompConnectionState.Open) return; var connectMessage = new StompMessage(StompCommand.Connect, _connectingHeaders); - await _socket.SendInstant(_stompSerializer.Serialize(connectMessage)); + await _socket.SendInstant(_stompTextSerializer.Serialize(connectMessage)); StompState = StompConnectionState.Open; } public async Task SendAsync(object body, string destination, IDictionary headers) { var jsonPayload = JsonConvert.SerializeObject(body); - headers.Add("content-type", "application/json;charset=UTF-8"); - headers.Add("content-length", Encoding.UTF8.GetByteCount(jsonPayload).ToString()); + headers.Add(StompHeader.ContentType, "application/json;charset=UTF-8"); + headers.Add(StompHeader.ContentLength, Encoding.UTF8.GetByteCount(jsonPayload).ToString()); await SendAsync(jsonPayload, destination, headers); } @@ -115,14 +124,14 @@ public async Task SendAsync(string body, string destination, IDictionary(string topic, IDictionary headers, EventHandler handler) { - await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject(message.Body))); + await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject(message.TextBody))); } public async Task SubscribeAsync(string topic, IDictionary headers, EventHandler handler) @@ -130,10 +139,10 @@ public async Task SubscribeAsync(string topic, IDictionary heade if (StompState != StompConnectionState.Open) await Reconnect(); - headers.Add("destination", topic); - headers.Add("id", $"sub-{_subscribers.Count}"); + headers.Add(StompHeader.Destination, topic); + headers.Add(StompHeader.Id, $"sub-{_subscribers.Count}"); var subscribeMessage = new StompMessage(StompCommand.Subscribe, headers); - await _socket.SendInstant(_stompSerializer.Serialize(subscribeMessage)); + await _socket.SendInstant(_stompTextSerializer.Serialize(subscribeMessage)); _subscribers.Add(topic, handler); } @@ -150,7 +159,7 @@ public async Task NackAsync(string id, string transaction = null) public async Task DisconnectAsync() { var connectMessage = new StompMessage(StompCommand.Disconnect); - await _socket.SendInstant(_stompSerializer.Serialize(connectMessage)); + await _socket.SendInstant(_stompTextSerializer.Serialize(connectMessage)); StompState = StompConnectionState.Closed; _socket.Dispose(); _subscribers.Clear(); @@ -170,24 +179,52 @@ private async Task Acknowledge(bool isPositive, string id, string transaction = var headers = new Dictionary() { - { "id", id } + { StompHeader.Id, id } }; if (!string.IsNullOrEmpty(transaction)) - headers.Add("transaction", transaction); + headers.Add(StompHeader.Transaction, transaction); var connectMessage = new StompMessage(isPositive ? StompCommand.Ack : StompCommand.Nack, headers); - await _socket.SendInstant(_stompSerializer.Serialize(connectMessage)); + await _socket.SendInstant(_stompTextSerializer.Serialize(connectMessage)); } private void HandleMessage(ResponseMessage messageEventArgs) { - var message = _stompSerializer.Deserialize(messageEventArgs.Text); + StompMessage message = null; + if (messageEventArgs.MessageType == WebSocketMessageType.Text) + { + message = _stompTextSerializer.Deserialize(messageEventArgs.Text); + } + else + { + if (messageEventArgs.Binary.Length > 1) + { + message = _binaryMessageSerializer.Deserialize(messageEventArgs.Binary); + } + else + { + message = new StompMessage(StompCommand.HeartBeat); + } + } + OnMessage?.Invoke(this, message); if (message.Command == StompCommand.Connected) OnConnect?.Invoke(this, message); if (message.Command == StompCommand.Error) OnError?.Invoke(this, message); - if (message.Headers.ContainsKey("destination")) - _subscribers[message.Headers["destination"]](this, message); + if (message.Headers.TryGetValue(StompHeader.Destination, out var header)) + { + if (!_subscribers.ContainsKey(header)) + { + // Workaround for RabbitMQ subscription to a queue created outside the STOMP gateway + // https://www.rabbitmq.com/stomp.html#d + header = "/amq" + header; + } + + if (_subscribers.TryGetValue(header, out var subscriber)) + { + subscriber(this, message); + } + } } } } diff --git a/source/Utils/StompCommand.cs b/source/Utils/StompCommand.cs index e73a0b9..cf15608 100644 --- a/source/Utils/StompCommand.cs +++ b/source/Utils/StompCommand.cs @@ -15,5 +15,8 @@ public static class StompCommand public const string Connected = "CONNECTED"; public const string Message = "MESSAGE"; public const string Error = "ERROR"; + + //Fictional + public const string HeartBeat = "HEARTBEAT"; } } diff --git a/source/Utils/StompHeader.cs b/source/Utils/StompHeader.cs new file mode 100644 index 0000000..b3c466b --- /dev/null +++ b/source/Utils/StompHeader.cs @@ -0,0 +1,23 @@ +namespace Netina.Stomp.Client.Utils +{ + public static class StompHeader + { + public const string ContentLength = "content-length"; + public const string ContentType = "content-type"; + public const string AcceptVersion = "accept-version"; + public const string Version = "version"; + public const string Host = "host"; + public const string Login = "login"; + public const string Passcode = "passcode"; + public const string Heartbeat = "heart-beat"; + public const string Id = "id"; + public const string Acknowledge = "ack"; + public const string Transaction = "transaction"; + public const string MessageId = "message-id"; + public const string ReceiptId = "receipt-id"; + public const string Receipt = "receipt"; + + public const string Destination = "destination"; + public const string Subscription = "subscription"; + } +} \ No newline at end of file