From fe44663a0d1cbd404b9a31da6cda6d68e94869bf Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 07:44:45 +0100 Subject: [PATCH 1/9] Add initial Msgq implementation with message queue support Introduces core Msgq components, including `MsgqDeep`, `MessageQueue`, and `MsgqMessage`, along with interfaces like `IMsgqMessage`. Also updates the test application to integrate basic message queue initialization. --- TestApp/Program.cs | 53 +++++--- msgqNET/implementations/msgq/IMsgqMessage.cs | 8 ++ msgqNET/implementations/msgq/MessageQueue.cs | 107 ++++++++++++++++ msgqNET/implementations/msgq/MsgqContext.cs | 11 ++ msgqNET/implementations/msgq/MsgqDeep.cs | 121 ++++++++++++++++++ msgqNET/implementations/msgq/MsgqMessage.cs | 32 +++++ msgqNET/implementations/msgq/MsgqSubSocket.cs | 30 +++++ msgqNET/interfaces/IMessage.cs | 6 +- 8 files changed, 347 insertions(+), 21 deletions(-) create mode 100644 msgqNET/implementations/msgq/IMsgqMessage.cs create mode 100644 msgqNET/implementations/msgq/MessageQueue.cs create mode 100644 msgqNET/implementations/msgq/MsgqContext.cs create mode 100644 msgqNET/implementations/msgq/MsgqDeep.cs create mode 100644 msgqNET/implementations/msgq/MsgqMessage.cs create mode 100644 msgqNET/implementations/msgq/MsgqSubSocket.cs diff --git a/TestApp/Program.cs b/TestApp/Program.cs index dc26fa8..724efb7 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using msgqNET; +using msgqNET.implementations.msgq; using msgqNET.implementations.zmq; using msgqNET.interfaces; using TestApp; @@ -15,8 +16,8 @@ var context = messagingFactory.CreateContext(); -// var subsocket = messagingFactory.CreateSubSocket(context, "42979", "127.0.0.1", false, false); -// subsocket.SetTimeout(TimeSpan.FromSeconds(5)); +var subsocket = messagingFactory.CreateSubSocket(context, "carState", "127.0.0.1"); +subsocket.SetTimeout(TimeSpan.FromSeconds(30)); // // long counter = 0; // // Event processing loop @@ -33,23 +34,39 @@ -var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint"); -var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint"); +// var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint"); +// var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint"); +// +// var publisherTask = Task.Run(async () => +// { +// long counter = 0; +// // Event processing loop +// while (true) +// { +// pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]")); +// await Task.Delay(3000); +// } +// }); +// +// +// subsocket.SetTimeout(TimeSpan.FromSeconds(5)); +// while (true) +// { +// HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); +// } + -var publisherTask = Task.Run(async () => +var queue = new MessageQueue(); +if (queue.Initialize("example_queue", 1024) == 0) { - long counter = 0; - // Event processing loop - while (true) - { - pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]")); - await Task.Delay(3000); - } -}); - - -subsocket.SetTimeout(TimeSpan.FromSeconds(5)); -while (true) + Console.WriteLine("Message queue initialized successfully."); +} +else { - HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); + Console.WriteLine("Failed to initialize message queue."); } + + +// Perform operations... + +queue.Close(); \ No newline at end of file diff --git a/msgqNET/implementations/msgq/IMsgqMessage.cs b/msgqNET/implementations/msgq/IMsgqMessage.cs new file mode 100644 index 0000000..d04b1a9 --- /dev/null +++ b/msgqNET/implementations/msgq/IMsgqMessage.cs @@ -0,0 +1,8 @@ +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public interface IMsgqMessage : IMessage +{ + void TakeOwnership(byte[] data, int size); +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MessageQueue.cs b/msgqNET/implementations/msgq/MessageQueue.cs new file mode 100644 index 0000000..64b32c7 --- /dev/null +++ b/msgqNET/implementations/msgq/MessageQueue.cs @@ -0,0 +1,107 @@ +using System.IO.MemoryMappedFiles; +using System.Runtime.InteropServices; + +namespace msgqNET.implementations.msgq; + + + +public class MessageQueue +{ + private const int HeaderSize = 128; // Example size for metadata header (adjust as needed) + private MemoryMappedFile memoryMappedFile; + private MemoryMappedViewAccessor accessor; + private string ShmPath => OperatingSystem.IsLinux() ? "/dev/shm/" : "/tmp/"; + + public string Path { get; private set; } + public int Size { get; private set; } + public byte[] Data { get; private set; } + + private struct MsgQueueHeader + { + public long NumReaders; + public long WritePointer; + public long[] ReadPointers; + public long[] ReadValids; + public long[] ReadUIDs; + + public MsgQueueHeader(int numReaders) + { + NumReaders = 0; + WritePointer = 0; + ReadPointers = new long[numReaders]; + ReadValids = new long[numReaders]; + ReadUIDs = new long[numReaders]; + } + } + + private MsgQueueHeader header; + + public int Initialize(string path, int size) + { + if (size < 0 || size > int.MaxValue) + { + Console.WriteLine("Size must be between 0 and 2^32-1."); + return -1; + } + + try + { + Path = path; + Size = size; + + // Construct full shared memory path + var fullPath = ShmPath + path; // Adjust for platform-specific paths + + // Create or open the memory-mapped file + memoryMappedFile = MemoryMappedFile.CreateFromFile(fullPath, FileMode.OpenOrCreate, null, 1024); + + // Create a view accessor for the entire file + accessor = memoryMappedFile.CreateViewAccessor(); + + // Initialize or read the header + header = ReadHeader(); + + // Map the data portion + Data = new byte[size]; + } + catch (Exception ex) + { + Console.WriteLine($"Failed to initialize message queue: {ex.Message}"); + return -1; + } + + return 0; + } + + private MsgQueueHeader ReadHeader() + { + // Read the header structure from the memory-mapped file + var headerBytes = new byte[HeaderSize]; + accessor.ReadArray(0, headerBytes, 0, HeaderSize); + + // Deserialize the header + var handle = GCHandle.Alloc(headerBytes, GCHandleType.Pinned); + var result = (MsgQueueHeader)Marshal.PtrToStructure(handle.AddrOfPinnedObject(), typeof(MsgQueueHeader)); + handle.Free(); + + return result; + } + + private void WriteHeader() + { + // Serialize the header to bytes + var headerBytes = new byte[HeaderSize]; + var handle = GCHandle.Alloc(headerBytes, GCHandleType.Pinned); + Marshal.StructureToPtr(header, handle.AddrOfPinnedObject(), false); + handle.Free(); + + // Write the header back to the memory-mapped file + accessor.WriteArray(0, headerBytes, 0, HeaderSize); + } + + public void Close() + { + accessor?.Dispose(); + memoryMappedFile?.Dispose(); + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqContext.cs b/msgqNET/implementations/msgq/MsgqContext.cs new file mode 100644 index 0000000..f15e477 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqContext.cs @@ -0,0 +1,11 @@ +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public class MsgqContext : IContext +{ + public IntPtr GetRawContext() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqDeep.cs b/msgqNET/implementations/msgq/MsgqDeep.cs new file mode 100644 index 0000000..ae7d177 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqDeep.cs @@ -0,0 +1,121 @@ +// using System; +// using System.Diagnostics; +// using System.Threading; +// +// namespace msgqNET.implementations.msgq; +// +// public static class Msgq +// { +// public const int NUM_READERS = 32; // Assuming a constant value +// +// public static void MsgqInitSubscriber(ref MsgqQueue q) +// { +// Debug.Assert(q != null); +// Debug.Assert(q.NumReaders != null); +// +// ulong uid = MsgqGetUid(); +// +// while (true) +// { +// ulong curNumReaders = q.NumReaders.Value; +// ulong newNumReaders = curNumReaders + 1; +// +// // No more slots available. Reset all subscribers to kick out inactive ones +// if (newNumReaders > NUM_READERS) +// { +// Console.WriteLine("Warning, evicting all subscribers!"); +// q.NumReaders.Value = 0; +// +// for (int i = 0; i < NUM_READERS; i++) +// { +// q.ReadValids[i].Value = false; +// +// ulong oldUid = q.ReadUids[i].Value; +// q.ReadUids[i].Value = 0; +// +// // Wake up reader in case they are in a poll +// ThreadSignal((int)(oldUid & 0xFFFFFFFF)); +// } +// +// continue; +// } +// +// // Use atomic compare and exchange to handle race condition where two subscribers start at the same time +// if (Interlocked.CompareExchange(ref q.NumReaders.Value, newNumReaders, curNumReaders) == curNumReaders) +// { +// q.ReaderId = curNumReaders; +// q.ReadUidLocal = uid; +// +// q.ReadValids[curNumReaders].Value = false; +// q.ReadPointers[curNumReaders].Value = 0; +// q.ReadUids[curNumReaders].Value = uid; +// break; +// } +// } +// +// Console.WriteLine($"New subscriber id: {q.ReaderId} uid: {q.ReadUidLocal} {q.Endpoint}"); +// MsgqResetReader(ref q); +// } +// +// // Assuming implementation of GetUid, ThreadSignal, ResetReader and supporting classes +// private static ulong MsgqGetUid() +// { +// // Example UUID generation +// return (ulong)DateTime.Now.Ticks; +// } +// +// private static void ThreadSignal(int uid) +// { +// // Example signal to a specific thread (Placeholder implementation) +// Console.WriteLine($"ThreadSignal called for uid: {uid}"); +// } +// +// private static void MsgqResetReader(ref MsgqQueue q) +// { +// // Reset reader logic (Placeholder implementation) +// Console.WriteLine($"Resetting reader for {q.Endpoint}"); +// } +// } +// +// public class MsgqQueue +// { +// public AtomicUlong NumReaders { get; set; } = new AtomicUlong(); +// public ulong ReaderId { get; set; } +// public ulong ReadUidLocal { get; set; } +// public AtomicBool[] ReadValids { get; set; } = new AtomicBool[Msgq.NUM_READERS]; +// public AtomicUlong[] ReadPointers { get; set; } = new AtomicUlong[Msgq.NUM_READERS]; +// public AtomicUlong[] ReadUids { get; set; } = new AtomicUlong[Msgq.NUM_READERS]; +// public string Endpoint { get; set; } +// +// public MsgqQueue() +// { +// for (int i = 0; i < Msgq.NUM_READERS; i++) +// { +// ReadValids[i] = new AtomicBool(); +// ReadPointers[i] = new AtomicUlong(); +// ReadUids[i] = new AtomicUlong(); +// } +// } +// } +// +// public class AtomicBool +// { +// private int value; +// +// public bool Value +// { +// get => value == 1; +// set => Interlocked.Exchange(ref this.value, value ? 1 : 0); +// } +// } +// +// public class AtomicUlong +// { +// private long value; +// +// public ulong Value +// { +// get => (ulong)Interlocked.Read(ref value); +// set => Interlocked.Exchange(ref this.value, (long)value); +// } +// } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqMessage.cs b/msgqNET/implementations/msgq/MsgqMessage.cs new file mode 100644 index 0000000..40f7cae --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqMessage.cs @@ -0,0 +1,32 @@ +namespace msgqNET.implementations.msgq; + +public class MsgqMessage : IMsgqMessage +{ + private int _size; + private byte[] _data; + + // Constructor to initialize with size + public MsgqMessage(int size) + { + _size = size; + _data = new byte[size]; + } + + // Constructor to initialize with data and size + public MsgqMessage(byte[] data, int size) + { + _size = size; + _data = new byte[size]; + Array.Copy(data, _data, size); + } + + public void TakeOwnership(byte[] data, int size) + { + _size = size; + _data = data; + } + + public int GetSize() => _size; + + public byte[] GetData() => _data; +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqSubSocket.cs b/msgqNET/implementations/msgq/MsgqSubSocket.cs new file mode 100644 index 0000000..cf89e69 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqSubSocket.cs @@ -0,0 +1,30 @@ +using System.Diagnostics; +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public class MsgqSubSocket : ISubSocket +{ + public bool Connect(IContext context, string endpoint, string address = "127.0.0.1", bool conflate = false, bool checkEndpoint = true) + { + Debug.Assert(context is not null); + Debug.Assert(address == "127.0.0.1"); + + throw new NotImplementedException(); + } + + public void SetTimeout(TimeSpan timeout) + { + throw new NotImplementedException(); + } + + public IMessage? Receive(bool nonBlocking = false) + { + throw new NotImplementedException(); + } + + public IntPtr GetRawSocket() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/msgqNET/interfaces/IMessage.cs b/msgqNET/interfaces/IMessage.cs index b2bde3b..736d1e8 100644 --- a/msgqNET/interfaces/IMessage.cs +++ b/msgqNET/interfaces/IMessage.cs @@ -2,9 +2,9 @@ namespace msgqNET.interfaces; public interface IMessage { - void Init(int size); - void Init(byte[] data, int size); - void Close(); + // void Init(int size); + // void Init(byte[] data, int size); + // void Close(); int GetSize(); byte[] GetData(); } \ No newline at end of file From 28784106b2e63c74bb0744292790160921bfe7f8 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 09:30:08 +0100 Subject: [PATCH 2/9] Add subscriber functionality to MessageQueue Introduced `MessageQueueSubscriber` class to manage subscriber initialization and reader handling. Enhanced message queue header structure for multi-reader support. Added platform-specific signal handler stubs for future use in Linux/macOS environments. --- msgqNET/implementations/msgq/MessageQueue.cs | 124 +++++++++++++++++-- 1 file changed, 115 insertions(+), 9 deletions(-) diff --git a/msgqNET/implementations/msgq/MessageQueue.cs b/msgqNET/implementations/msgq/MessageQueue.cs index 64b32c7..9dda58f 100644 --- a/msgqNET/implementations/msgq/MessageQueue.cs +++ b/msgqNET/implementations/msgq/MessageQueue.cs @@ -1,27 +1,107 @@ using System.IO.MemoryMappedFiles; using System.Runtime.InteropServices; +using System.Runtime.Versioning; namespace msgqNET.implementations.msgq; +public class MessageQueueSubscriber : MessageQueue +{ + public int ReaderId { get; private set; } + public long ReaderUidLocal { get; private set; } + public bool ReadConflage { get; private set; } + + public void InitSubscriber() + { + if (accessor == null) + throw new InvalidOperationException("Message queue not initialized"); + + long uid = GetUid(); + + while (true) + { + long curNumReaders = header.NumReaders; + long newNumReaders = curNumReaders + 1; + + // No more slots available. Reset all subscribers to kick out inactive ones + if (newNumReaders > MaxReaders) + { + header.NumReaders = 0; + + for (int i = 0; i < MaxReaders; i++) + { + header.ReadValids[i] = 0; // false + + long oldUid = header.ReadUIDs[i]; + header.ReadUIDs[i] = 0; + +#if RELEASE + // Wake up reader if they are in a poll + if (oldUid != 0) + { + throw new NotImplementedException("ThreadSignal is not implemented because SIGUSR2 handling is not implemented."); + // SignalThread((int)(oldUid & 0xFFFFFFFF)); + } +#endif + } + + WriteHeader(); + continue; + } + + // Use atomic compare and exchange to handle race condition + long originalValue = curNumReaders; + if (Interlocked.CompareExchange(ref header.NumReaders, newNumReaders, curNumReaders) == originalValue) + { + ReaderId = (int)curNumReaders; + ReaderUidLocal = uid; + + // Start with read_valid = false + header.ReadValids[ReaderId] = 0; // false + header.ReadPointers[ReaderId] = 0; + header.ReadUIDs[ReaderId] = uid; + + WriteHeader(); + break; + } + } + + ResetReader(); + } + private void ResetReader() + { + // On first read, sync read pointer with write pointer + header.ReadValids[ReaderId] = 0; // false + WriteHeader(); + } + + private static long GetUid() => ((long)Environment.ProcessId << 32) | (uint)Environment.CurrentManagedThreadId; +} -public class MessageQueue +public class MessageQueue { - private const int HeaderSize = 128; // Example size for metadata header (adjust as needed) + private static readonly int HeaderSize = Marshal.SizeOf(); + protected const int MaxReaders = 15; private MemoryMappedFile memoryMappedFile; - private MemoryMappedViewAccessor accessor; + protected MemoryMappedViewAccessor accessor; private string ShmPath => OperatingSystem.IsLinux() ? "/dev/shm/" : "/tmp/"; public string Path { get; private set; } public int Size { get; private set; } public byte[] Data { get; private set; } - private struct MsgQueueHeader + protected struct MsgQueueHeader { public long NumReaders; public long WritePointer; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] public long[] ReadPointers; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] public long[] ReadValids; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] public long[] ReadUIDs; public MsgQueueHeader(int numReaders) @@ -34,15 +114,26 @@ public MsgQueueHeader(int numReaders) } } - private MsgQueueHeader header; + protected MsgQueueHeader header; + /// + /// + /// + /// + /// Size in bytes + /// public int Initialize(string path, int size) { - if (size < 0 || size > int.MaxValue) + if (size < 0) { - Console.WriteLine("Size must be between 0 and 2^32-1."); + Console.WriteLine($"Size must be between 0 and {int.MaxValue}"); return -1; } + +#if RELEASE + if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS()) + RegisterSigUsr2Handler(); +#endif try { @@ -53,7 +144,7 @@ public int Initialize(string path, int size) var fullPath = ShmPath + path; // Adjust for platform-specific paths // Create or open the memory-mapped file - memoryMappedFile = MemoryMappedFile.CreateFromFile(fullPath, FileMode.OpenOrCreate, null, 1024); + memoryMappedFile = MemoryMappedFile.CreateFromFile(fullPath, FileMode.OpenOrCreate, null, size); // Create a view accessor for the entire file accessor = memoryMappedFile.CreateViewAccessor(); @@ -87,7 +178,7 @@ private MsgQueueHeader ReadHeader() return result; } - private void WriteHeader() + protected void WriteHeader() { // Serialize the header to bytes var headerBytes = new byte[HeaderSize]; @@ -104,4 +195,19 @@ public void Close() accessor?.Dispose(); memoryMappedFile?.Dispose(); } + + /// + /// Registers a handler for the SIGUSR2 signal. + /// + [UnsupportedOSPlatform(nameof(OSPlatform.Windows))] + public static void RegisterSigUsr2Handler() + { + throw new NotImplementedException("sigusr2 handling is yet implemented. Maybe .net doesn't support it."); + // PosixSignalRegistration.Create(PosixSignal.SIGUSR2, context => + // { + // Console.WriteLine("SIGUSR2 received. Handling the signal..."); + // // Implement your signal handling logic here + // context.Cancel = true; // Prevents default termination + // }); + } } \ No newline at end of file From 3a24c1140a8022f7375261490d81ded47f8a4fd5 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 09:30:12 +0100 Subject: [PATCH 3/9] Update message queue initialization and add backup queue Replaced `MessageQueue` with `MessageQueueSubscriber` and updated initialization parameters. Added a secondary queue setup for redundancy with identical configuration. These changes enhance robustness and ensure better handling of backup operations. --- TestApp/Program.cs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 724efb7..fc1b044 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -56,9 +56,10 @@ // } -var queue = new MessageQueue(); -if (queue.Initialize("example_queue", 1024) == 0) +var queue = new MessageQueueSubscriber(); +if (queue.Initialize("backupManagerSP", 10486144) == 0) { + queue.InitSubscriber(); Console.WriteLine("Message queue initialized successfully."); } else @@ -66,6 +67,16 @@ Console.WriteLine("Failed to initialize message queue."); } +var queue2 = new MessageQueueSubscriber(); +if (queue2.Initialize("backupManagerSP", 10486144) == 0) +{ + queue2.InitSubscriber(); + Console.WriteLine("Message queue initialized successfully."); +} +else +{ + Console.WriteLine("Failed to initialize message queue. ??"); +} // Perform operations... From ed778e3137d337ff566fff8b1cb0ea6d48581480 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 09:51:43 +0100 Subject: [PATCH 4/9] Refactor MessageQueue implementation to separate classes Moved `MessageQueueSubscriber` and `MessageQueuePublisher` to their own dedicated files for better structure and readability. Adjusted namespaces and updated field naming for consistency (e.g., `ReadUIDs` to `ReaderUIDs`). This change simplifies maintenance and future extensions of the messaging system. --- .../msgq/{ => queues}/MessageQueue.cs | 85 ++----------------- .../msgq/queues/MessageQueuePublisher.cs | 34 ++++++++ .../msgq/queues/MessageQueueSubscriber.cs | 79 +++++++++++++++++ msgqNET/msgqNET.csproj | 4 - 4 files changed, 120 insertions(+), 82 deletions(-) rename msgqNET/implementations/msgq/{ => queues}/MessageQueue.cs (62%) create mode 100644 msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs create mode 100644 msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs diff --git a/msgqNET/implementations/msgq/MessageQueue.cs b/msgqNET/implementations/msgq/queues/MessageQueue.cs similarity index 62% rename from msgqNET/implementations/msgq/MessageQueue.cs rename to msgqNET/implementations/msgq/queues/MessageQueue.cs index 9dda58f..ad94baa 100644 --- a/msgqNET/implementations/msgq/MessageQueue.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueue.cs @@ -2,83 +2,9 @@ using System.Runtime.InteropServices; using System.Runtime.Versioning; -namespace msgqNET.implementations.msgq; +namespace msgqNET.implementations.msgq.queues; -public class MessageQueueSubscriber : MessageQueue -{ - public int ReaderId { get; private set; } - public long ReaderUidLocal { get; private set; } - public bool ReadConflage { get; private set; } - - public void InitSubscriber() - { - if (accessor == null) - throw new InvalidOperationException("Message queue not initialized"); - - long uid = GetUid(); - - while (true) - { - long curNumReaders = header.NumReaders; - long newNumReaders = curNumReaders + 1; - - // No more slots available. Reset all subscribers to kick out inactive ones - if (newNumReaders > MaxReaders) - { - header.NumReaders = 0; - - for (int i = 0; i < MaxReaders; i++) - { - header.ReadValids[i] = 0; // false - - long oldUid = header.ReadUIDs[i]; - header.ReadUIDs[i] = 0; - -#if RELEASE - // Wake up reader if they are in a poll - if (oldUid != 0) - { - throw new NotImplementedException("ThreadSignal is not implemented because SIGUSR2 handling is not implemented."); - // SignalThread((int)(oldUid & 0xFFFFFFFF)); - } -#endif - } - - WriteHeader(); - continue; - } - - // Use atomic compare and exchange to handle race condition - long originalValue = curNumReaders; - if (Interlocked.CompareExchange(ref header.NumReaders, newNumReaders, curNumReaders) == originalValue) - { - ReaderId = (int)curNumReaders; - ReaderUidLocal = uid; - - // Start with read_valid = false - header.ReadValids[ReaderId] = 0; // false - header.ReadPointers[ReaderId] = 0; - header.ReadUIDs[ReaderId] = uid; - - WriteHeader(); - break; - } - } - - ResetReader(); - } - - private void ResetReader() - { - // On first read, sync read pointer with write pointer - header.ReadValids[ReaderId] = 0; // false - WriteHeader(); - } - - private static long GetUid() => ((long)Environment.ProcessId << 32) | (uint)Environment.CurrentManagedThreadId; -} - -public class MessageQueue +public abstract class MessageQueue { private static readonly int HeaderSize = Marshal.SizeOf(); protected const int MaxReaders = 15; @@ -94,6 +20,7 @@ protected struct MsgQueueHeader { public long NumReaders; public long WritePointer; + public long WriterUid; [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] public long[] ReadPointers; @@ -102,7 +29,7 @@ protected struct MsgQueueHeader public long[] ReadValids; [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] - public long[] ReadUIDs; + public long[] ReaderUIDs; public MsgQueueHeader(int numReaders) { @@ -110,7 +37,7 @@ public MsgQueueHeader(int numReaders) WritePointer = 0; ReadPointers = new long[numReaders]; ReadValids = new long[numReaders]; - ReadUIDs = new long[numReaders]; + ReaderUIDs = new long[numReaders]; } } @@ -210,4 +137,6 @@ public static void RegisterSigUsr2Handler() // context.Cancel = true; // Prevents default termination // }); } + + protected static long GetUid() => ((long)Environment.ProcessId << 32) | (uint)Environment.CurrentManagedThreadId; } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs new file mode 100644 index 0000000..49d39ff --- /dev/null +++ b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs @@ -0,0 +1,34 @@ +namespace msgqNET.implementations.msgq.queues; + +public class MessageQueuePublisher : MessageQueue +{ + + + public MessageQueuePublisher(string path, int size) + { + Initialize(path, size); + InitPublisher(); + } + + public long WriterUidLocal { get; private set; } + public bool ReadConflate { get; private set; } + + private void InitPublisher() + { + + if (accessor == null) + throw new InvalidOperationException("Message queue not initialized"); + + long uid = GetUid(); + header.WriterUid = uid; + header.NumReaders = 0; + for (var i = 0; i < MaxReaders; i++) + { + header.ReadValids[i] = 0; // false + header.ReaderUIDs[i] = 0; + } + + WriterUidLocal = uid; + WriteHeader(); + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs new file mode 100644 index 0000000..8d91721 --- /dev/null +++ b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs @@ -0,0 +1,79 @@ +namespace msgqNET.implementations.msgq.queues; + +public class MessageQueueSubscriber : MessageQueue +{ + public int ReaderId { get; private set; } + public long ReaderUidLocal { get; private set; } + public bool ReadConflate { get; private set; } + + public MessageQueueSubscriber(string path, int size) + { + Initialize(path, size); + InitSubscriber(); + } + + private void InitSubscriber() + { + if (accessor == null) + throw new InvalidOperationException("Message queue not initialized"); + + long uid = GetUid(); + + while (true) + { + long curNumReaders = header.NumReaders; + long newNumReaders = curNumReaders + 1; + + // No more slots available. Reset all subscribers to kick out inactive ones + if (newNumReaders > MaxReaders) + { + header.NumReaders = 0; + + for (int i = 0; i < MaxReaders; i++) + { + header.ReadValids[i] = 0; // false + + long oldUid = header.ReaderUIDs[i]; + header.ReaderUIDs[i] = 0; + +#if RELEASE + // Wake up reader if they are in a poll + if (oldUid != 0) + { + throw new NotImplementedException("ThreadSignal is not implemented because SIGUSR2 handling is not implemented."); + // SignalThread((int)(oldUid & 0xFFFFFFFF)); + } +#endif + } + + WriteHeader(); + continue; + } + + // Use atomic compare and exchange to handle race condition + long originalValue = curNumReaders; + if (Interlocked.CompareExchange(ref header.NumReaders, newNumReaders, curNumReaders) == originalValue) + { + ReaderId = (int)curNumReaders; + ReaderUidLocal = uid; + + // Start with read_valid = false + header.ReadValids[ReaderId] = 0; // false + header.ReadPointers[ReaderId] = 0; + header.ReaderUIDs[ReaderId] = uid; + + WriteHeader(); + break; + } + } + + ResetReader(); + } + + private void ResetReader() + { + // On first read, sync read pointer with write pointer + header.ReadValids[ReaderId] = 0; // false + WriteHeader(); + } +} \ No newline at end of file diff --git a/msgqNET/msgqNET.csproj b/msgqNET/msgqNET.csproj index f90d2c1..b94238d 100644 --- a/msgqNET/msgqNET.csproj +++ b/msgqNET/msgqNET.csproj @@ -12,8 +12,4 @@ - - - - From ca79aab1076ab5bc533f07385eb12b0c8c02b5a0 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 09:51:47 +0100 Subject: [PATCH 5/9] Refactor message queue initialization logic. Simplified the initialization process by directly instantiating message queue objects with required parameters. Updated to use `MessageQueuePublisher` and improved code readability by removing duplicate initialization blocks. --- TestApp/Program.cs | 32 +++++++------------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/TestApp/Program.cs b/TestApp/Program.cs index fc1b044..6eb4a3f 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using msgqNET; -using msgqNET.implementations.msgq; -using msgqNET.implementations.zmq; +using msgqNET.implementations.msgq.queues; using msgqNET.interfaces; using TestApp; @@ -55,29 +54,12 @@ // HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); // } - -var queue = new MessageQueueSubscriber(); -if (queue.Initialize("backupManagerSP", 10486144) == 0) -{ - queue.InitSubscriber(); - Console.WriteLine("Message queue initialized successfully."); -} -else -{ - Console.WriteLine("Failed to initialize message queue."); -} - -var queue2 = new MessageQueueSubscriber(); -if (queue2.Initialize("backupManagerSP", 10486144) == 0) -{ - queue2.InitSubscriber(); - Console.WriteLine("Message queue initialized successfully."); -} -else -{ - Console.WriteLine("Failed to initialize message queue. ??"); -} +var pubqueue = new MessageQueuePublisher("backupManagerSP", 10486144); +var queue = new MessageQueueSubscriber("backupManagerSP", 10486144); +var queue2 = new MessageQueueSubscriber("backupManagerSP", 10486144); // Perform operations... -queue.Close(); \ No newline at end of file +pubqueue.Close(); +queue.Close(); +queue2.Close(); \ No newline at end of file From 54e6a0f443d18b506c9d6af5fd555d411054161c Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 10:00:27 +0100 Subject: [PATCH 6/9] Implement MsgqSubSocket and add Msgq service registration Introduced the MsgqSubSocket class with a basic implementation of the `Connect` method using `MessageQueueSubscriber`. Updated `MessagingServiceExtensions` to include a separate registration method for Msgq services and integrated it into the service pipeline. Additional methods remain partially implemented or as placeholders for further development. --- msgqNET/MessagingServiceExtensions.cs | 20 ++++++++++++++++--- msgqNET/implementations/msgq/MsgqSubSocket.cs | 7 ++++++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/msgqNET/MessagingServiceExtensions.cs b/msgqNET/MessagingServiceExtensions.cs index a87898e..8956c63 100644 --- a/msgqNET/MessagingServiceExtensions.cs +++ b/msgqNET/MessagingServiceExtensions.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using msgqNET.implementations.fake; +using msgqNET.implementations.msgq; using msgqNET.implementations.zmq; using msgqNET.interfaces; @@ -20,13 +21,26 @@ public static IServiceCollection AddMessaging( // Register your specific implementations // You would replace these with your actual implementations - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); + // RegisterZmqSocket(services); + RegisterMsqSocket(services); services.AddTransient(); return services; } + + private static void RegisterZmqSocket(IServiceCollection services) + { + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + } + + private static void RegisterMsqSocket(IServiceCollection services) + { + services.AddTransient(); + services.AddTransient(); + // services.AddTransient(); + } } public class MessagingOptions diff --git a/msgqNET/implementations/msgq/MsgqSubSocket.cs b/msgqNET/implementations/msgq/MsgqSubSocket.cs index cf89e69..6ac8929 100644 --- a/msgqNET/implementations/msgq/MsgqSubSocket.cs +++ b/msgqNET/implementations/msgq/MsgqSubSocket.cs @@ -1,16 +1,20 @@ using System.Diagnostics; +using msgqNET.implementations.msgq.queues; using msgqNET.interfaces; namespace msgqNET.implementations.msgq; public class MsgqSubSocket : ISubSocket { + private MessageQueueSubscriber _socket; + private const int size = 10486144; public bool Connect(IContext context, string endpoint, string address = "127.0.0.1", bool conflate = false, bool checkEndpoint = true) { Debug.Assert(context is not null); Debug.Assert(address == "127.0.0.1"); - throw new NotImplementedException(); + _socket = new MessageQueueSubscriber(endpoint, size); + return true; } public void SetTimeout(TimeSpan timeout) @@ -20,6 +24,7 @@ public void SetTimeout(TimeSpan timeout) public IMessage? Receive(bool nonBlocking = false) { + var x = _socket.Data; //DUMMY EXAMPLE NOT YET IMPLEMENTED throw new NotImplementedException(); } From 6eee62fe6e533a90072618a8fb02b34bba7fef63 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 14:51:21 +0100 Subject: [PATCH 7/9] Refactor message queue classes for improved structure. Reorganized member variables, constructors, and methods to follow a cleaner and more consistent structure. Added nullable annotations, introduced proper encapsulation, and implemented previously unhandled operations like timeouts for subscribers. Simplified initialization and reset logic for both subscriber and publisher classes, ensuring better maintainability and future extension. --- msgqNET/implementations/msgq/MsgqSubSocket.cs | 16 +- .../msgq/queues/MessageQueue.cs | 20 +-- .../msgq/queues/MessageQueuePublisher.cs | 27 ++-- .../msgq/queues/MessageQueueSubscriber.cs | 144 ++++++++++++------ 4 files changed, 121 insertions(+), 86 deletions(-) diff --git a/msgqNET/implementations/msgq/MsgqSubSocket.cs b/msgqNET/implementations/msgq/MsgqSubSocket.cs index 6ac8929..139fc0f 100644 --- a/msgqNET/implementations/msgq/MsgqSubSocket.cs +++ b/msgqNET/implementations/msgq/MsgqSubSocket.cs @@ -6,25 +6,23 @@ namespace msgqNET.implementations.msgq; public class MsgqSubSocket : ISubSocket { - private MessageQueueSubscriber _socket; - private const int size = 10486144; + private MessageQueueSubscriber? _queue; + private const int Size = 10486144; + private TimeSpan _timeout = TimeSpan.FromMilliseconds(500); + public bool Connect(IContext context, string endpoint, string address = "127.0.0.1", bool conflate = false, bool checkEndpoint = true) { Debug.Assert(context is not null); Debug.Assert(address == "127.0.0.1"); - - _socket = new MessageQueueSubscriber(endpoint, size); + + _queue = new MessageQueueSubscriber(endpoint, Size); return true; } - public void SetTimeout(TimeSpan timeout) - { - throw new NotImplementedException(); - } + public void SetTimeout(TimeSpan timeout) => _timeout = timeout; public IMessage? Receive(bool nonBlocking = false) { - var x = _socket.Data; //DUMMY EXAMPLE NOT YET IMPLEMENTED throw new NotImplementedException(); } diff --git a/msgqNET/implementations/msgq/queues/MessageQueue.cs b/msgqNET/implementations/msgq/queues/MessageQueue.cs index ad94baa..38d4e81 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueue.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueue.cs @@ -41,22 +41,15 @@ public MsgQueueHeader(int numReaders) } } - protected MsgQueueHeader header; + protected MsgQueueHeader Header; - /// - /// - /// - /// - /// Size in bytes - /// - public int Initialize(string path, int size) + protected MessageQueue(string path, int size) { if (size < 0) { Console.WriteLine($"Size must be between 0 and {int.MaxValue}"); - return -1; } - + #if RELEASE if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS()) RegisterSigUsr2Handler(); @@ -77,7 +70,7 @@ public int Initialize(string path, int size) accessor = memoryMappedFile.CreateViewAccessor(); // Initialize or read the header - header = ReadHeader(); + Header = ReadHeader(); // Map the data portion Data = new byte[size]; @@ -85,10 +78,7 @@ public int Initialize(string path, int size) catch (Exception ex) { Console.WriteLine($"Failed to initialize message queue: {ex.Message}"); - return -1; } - - return 0; } private MsgQueueHeader ReadHeader() @@ -110,7 +100,7 @@ protected void WriteHeader() // Serialize the header to bytes var headerBytes = new byte[HeaderSize]; var handle = GCHandle.Alloc(headerBytes, GCHandleType.Pinned); - Marshal.StructureToPtr(header, handle.AddrOfPinnedObject(), false); + Marshal.StructureToPtr(Header, handle.AddrOfPinnedObject(), false); handle.Free(); // Write the header back to the memory-mapped file diff --git a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs index 49d39ff..079452c 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs @@ -2,33 +2,24 @@ namespace msgqNET.implementations.msgq.queues; public class MessageQueuePublisher : MessageQueue { - - - public MessageQueuePublisher(string path, int size) - { - Initialize(path, size); - InitPublisher(); - } - - public long WriterUidLocal { get; private set; } - public bool ReadConflate { get; private set; } - - private void InitPublisher() + public MessageQueuePublisher(string path, int size) : base(path, size) { - if (accessor == null) throw new InvalidOperationException("Message queue not initialized"); long uid = GetUid(); - header.WriterUid = uid; - header.NumReaders = 0; + Header.WriterUid = uid; + Header.NumReaders = 0; for (var i = 0; i < MaxReaders; i++) { - header.ReadValids[i] = 0; // false - header.ReaderUIDs[i] = 0; + Header.ReadValids[i] = 0; // false + Header.ReaderUIDs[i] = 0; } - + WriterUidLocal = uid; WriteHeader(); } + + public long WriterUidLocal { get; private set; } + public bool ReadConflate { get; private set; } } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs index 8d91721..13e76c0 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs @@ -2,40 +2,84 @@ namespace msgqNET.implementations.msgq.queues; public class MessageQueueSubscriber : MessageQueue { - public int ReaderId { get; private set; } - public long ReaderUidLocal { get; private set; } + private static readonly long Uid = GetUid(); + public long ReaderId { get; private set; } public bool ReadConflate { get; private set; } + + // Expose the underlying buffer and pointers through properties. + // Assume Header is an object that holds the shared state. + public long WritePointer => Header.WritePointer; + + public long ReadPointer + { + get => Header.ReadPointers[ReaderId]; + private set => Header.ReadPointers[ReaderId] = value; + } - public MessageQueueSubscriber(string path, int size) + public bool ReadValid { - Initialize(path, size); - InitSubscriber(); + get => Header.ReadValids[ReaderId] != 0; + private set => Header.ReadValids[ReaderId] = value ? 1 : 0; } + + public long ReaderUid + { + get => Header.ReaderUIDs[ReaderId]; + private set => Header.ReaderUIDs[ReaderId] = value; + } + + private readonly object _syncLock = new(); + + public MessageQueueSubscriber(string path, int size) : base(path, size) => InitializeSubscriber(); - private void InitSubscriber() + private void InitializeSubscriber() { if (accessor == null) throw new InvalidOperationException("Message queue not initialized"); - - long uid = GetUid(); - + while (true) { - long curNumReaders = header.NumReaders; - long newNumReaders = curNumReaders + 1; + var curNumReaders = Header.NumReaders; + var newNumReaders = curNumReaders + 1; // No more slots available. Reset all subscribers to kick out inactive ones if (newNumReaders > MaxReaders) { - header.NumReaders = 0; + KickAllReaders(); + continue; + } + + // Use atomic compare and exchange to handle race condition + if (Interlocked.CompareExchange(ref Header.NumReaders, newNumReaders, curNumReaders) != curNumReaders) + continue; - for (int i = 0; i < MaxReaders; i++) - { - header.ReadValids[i] = 0; // false + ReaderId = curNumReaders; + // Start with read_valid = false + ReadValid = false; // false + ReadPointer = 0; + ReaderUid = Uid; + + WriteHeader(); + break; + } + + ResetReader(); + } + + /// + /// This method feels like madness. I am a subscriber. Why am I writing to the header (other than my own fields)? + /// + private void KickAllReaders() + { + Header.NumReaders = 0; + + for (var i = 0; i < MaxReaders; i++) + { + Header.ReadValids[i] = 0; // false + Header.ReaderUIDs[i] = 0; + + var oldUid = Header.ReaderUIDs[i]; - long oldUid = header.ReaderUIDs[i]; - header.ReaderUIDs[i] = 0; - #if RELEASE // Wake up reader if they are in a poll if (oldUid != 0) @@ -44,36 +88,48 @@ private void InitSubscriber() // SignalThread((int)(oldUid & 0xFFFFFFFF)); } #endif - } - - WriteHeader(); - continue; - } - - // Use atomic compare and exchange to handle race condition - long originalValue = curNumReaders; - if (Interlocked.CompareExchange(ref header.NumReaders, newNumReaders, curNumReaders) == originalValue) - { - ReaderId = (int)curNumReaders; - ReaderUidLocal = uid; - - // Start with read_valid = false - header.ReadValids[ReaderId] = 0; // false - header.ReadPointers[ReaderId] = 0; - header.ReaderUIDs[ReaderId] = uid; - - WriteHeader(); - break; - } } - ResetReader(); + WriteHeader(); } - private void ResetReader() + + public void ResetReader() { - // On first read, sync read pointer with write pointer - header.ReadValids[ReaderId] = 0; // false - WriteHeader(); + lock (_syncLock) + { + // Sync read pointer with write pointer on reset + Header.ReadPointers[ReaderId] = Header.WritePointer; + Header.ReadValids[ReaderId] = 0; // mark as invalid, to be revalidated externally if needed + WriteHeader(); + } } + + // private static T Align(T value) where T : INumber, IBitwiseOperators + // { + // // Constants + // var seven = T.CreateChecked(7); + // // Use the bitwise operator directly through the ~ operator + // var mask = ~seven; + // + // // (value + 7) & ~7 + // return (value + seven) & mask; + // } + + // private IMsgqMessage? ReadMessage() + // { + // while (ReaderUid != Uid) + // { + // Console.WriteLine("Reader was evicted. Reconnecting..."); + // InitializeSubscriber(); + // } + // + // while (!ReadValid) + // { + // Console.WriteLine("Reader is not valid. Resetting..."); + // ResetReader(); + // } + // + // + // } } \ No newline at end of file From 6139795c7441a22116dea70bea89773d2e91fca7 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 18:45:28 +0100 Subject: [PATCH 8/9] Refactor message handling and improve type consistency Replaced `int` with `long` for message size across interfaces and implementations to support larger data sizes. Implemented the `Packed64` struct for efficient pointer management and refactored `MessageQueueSubscriber` to streamline data handling. Additionally, modified the test application program flow and project configuration for better modularity and usability. --- TestApp/Program.cs | 60 ++++++---- TestApp/TestApp.csproj | 8 +- msgqNET/implementations/fake/FakeMessage.cs | 2 +- msgqNET/implementations/msgq/IMsgqMessage.cs | 2 +- msgqNET/implementations/msgq/MsgqMessage.cs | 17 ++- msgqNET/implementations/msgq/MsgqSubSocket.cs | 2 +- .../msgq/queues/MessageQueue.cs | 2 +- .../msgq/queues/MessageQueueSubscriber.cs | 112 ++++++++++++------ .../implementations/msgq/queues/Packed64.cs | 39 ++++++ msgqNET/implementations/zmq/ZmqMessage.cs | 2 +- msgqNET/interfaces/IMessage.cs | 4 +- 11 files changed, 173 insertions(+), 77 deletions(-) create mode 100644 msgqNET/implementations/msgq/queues/Packed64.cs diff --git a/TestApp/Program.cs b/TestApp/Program.cs index 6eb4a3f..3f82db1 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -5,30 +5,36 @@ using TestApp; // Set up the dependency injection container -var serviceProvider = new ServiceCollection() - .AddSingleton() - .AddMessaging() - .BuildServiceProvider(); + +internal class Program +{ + public static void Main(string endpoint = "backupManagerSP") + { + var serviceProvider = new ServiceCollection() + .AddSingleton() + .AddMessaging() + .BuildServiceProvider(); // Get the event processor from the service provider -var messagingFactory = serviceProvider.GetRequiredService(); -var context = messagingFactory.CreateContext(); + var messagingFactory = serviceProvider.GetRequiredService(); + var context = messagingFactory.CreateContext(); -var subsocket = messagingFactory.CreateSubSocket(context, "carState", "127.0.0.1"); -subsocket.SetTimeout(TimeSpan.FromSeconds(30)); + var subsocket = messagingFactory.CreateSubSocket(context, endpoint, "127.0.0.1"); + subsocket.SetTimeout(TimeSpan.FromSeconds(30)); // -// long counter = 0; -// // Event processing loop -// while (true) -// { -// Console.WriteLine($"Processing event {counter++}"); -// // pubsocket.SendMessage(new FakeMessage()); -// // await Task.Delay(1000); // Sleep for 1 second -// var x = subsocket.Receive(); -// HexDumper.PrintHexDump(x?.GetData() ?? []); -// Console.WriteLine(); -// } + long counter = 0; +// Event processing loop + while (true) + { + Console.WriteLine($"Processing event {counter++}"); + // pubsocket.SendMessage(new FakeMessage()); + // await Task.Delay(1000); // Sleep for 1 second + var x = subsocket.Receive(); + HexDumper.PrintHexDump(x?.GetData() ?? []); + Console.WriteLine(); + Thread.Sleep(1); + } @@ -54,12 +60,16 @@ // HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); // } -var pubqueue = new MessageQueuePublisher("backupManagerSP", 10486144); -var queue = new MessageQueueSubscriber("backupManagerSP", 10486144); -var queue2 = new MessageQueueSubscriber("backupManagerSP", 10486144); + var pubqueue = new MessageQueuePublisher("backupManagerSP", 10486144); + var queue = new MessageQueueSubscriber("backupManagerSP", 10486144); + var queue2 = new MessageQueueSubscriber("backupManagerSP", 10486144); // Perform operations... -pubqueue.Close(); -queue.Close(); -queue2.Close(); \ No newline at end of file + pubqueue.Close(); + queue.Close(); + queue2.Close(); + } +} + + diff --git a/TestApp/TestApp.csproj b/TestApp/TestApp.csproj index 54fdfa2..b4c5e8c 100644 --- a/TestApp/TestApp.csproj +++ b/TestApp/TestApp.csproj @@ -5,17 +5,23 @@ net8.0 enable enable - true + true + true + + + + + diff --git a/msgqNET/implementations/fake/FakeMessage.cs b/msgqNET/implementations/fake/FakeMessage.cs index 4030d72..4f8fba3 100644 --- a/msgqNET/implementations/fake/FakeMessage.cs +++ b/msgqNET/implementations/fake/FakeMessage.cs @@ -16,7 +16,7 @@ public void Init(byte[] data, int size) public void Close() => _data = []; - public int GetSize() => _data.Length; + public long GetSize() => _data.Length; public byte[] GetData() => _data; diff --git a/msgqNET/implementations/msgq/IMsgqMessage.cs b/msgqNET/implementations/msgq/IMsgqMessage.cs index d04b1a9..bba5e83 100644 --- a/msgqNET/implementations/msgq/IMsgqMessage.cs +++ b/msgqNET/implementations/msgq/IMsgqMessage.cs @@ -4,5 +4,5 @@ namespace msgqNET.implementations.msgq; public interface IMsgqMessage : IMessage { - void TakeOwnership(byte[] data, int size); + void TakeOwnership(byte[] data, long size); } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqMessage.cs b/msgqNET/implementations/msgq/MsgqMessage.cs index 40f7cae..76e0040 100644 --- a/msgqNET/implementations/msgq/MsgqMessage.cs +++ b/msgqNET/implementations/msgq/MsgqMessage.cs @@ -2,31 +2,38 @@ namespace msgqNET.implementations.msgq; public class MsgqMessage : IMsgqMessage { - private int _size; + private long _size; private byte[] _data; // Constructor to initialize with size - public MsgqMessage(int size) + public MsgqMessage(long size) { _size = size; _data = new byte[size]; } // Constructor to initialize with data and size - public MsgqMessage(byte[] data, int size) + public MsgqMessage(byte[] data, long size) { _size = size; _data = new byte[size]; Array.Copy(data, _data, size); } - public void TakeOwnership(byte[] data, int size) + public void TakeOwnership(byte[] data, long size) { _size = size; _data = data; } - public int GetSize() => _size; + public void Close() + { + // Release the data and reset size + _data = []; + _size = 0; + } + + public long GetSize() => _size; public byte[] GetData() => _data; } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqSubSocket.cs b/msgqNET/implementations/msgq/MsgqSubSocket.cs index 139fc0f..3840838 100644 --- a/msgqNET/implementations/msgq/MsgqSubSocket.cs +++ b/msgqNET/implementations/msgq/MsgqSubSocket.cs @@ -23,7 +23,7 @@ public bool Connect(IContext context, string endpoint, string address = "127.0.0 public IMessage? Receive(bool nonBlocking = false) { - throw new NotImplementedException(); + return _queue?.Read(nonBlocking); } public IntPtr GetRawSocket() diff --git a/msgqNET/implementations/msgq/queues/MessageQueue.cs b/msgqNET/implementations/msgq/queues/MessageQueue.cs index 38d4e81..674a7e3 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueue.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueue.cs @@ -16,7 +16,7 @@ public abstract class MessageQueue public int Size { get; private set; } public byte[] Data { get; private set; } - protected struct MsgQueueHeader + protected record struct MsgQueueHeader { public long NumReaders; public long WritePointer; diff --git a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs index 13e76c0..a0374de 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs @@ -3,29 +3,29 @@ namespace msgqNET.implementations.msgq.queues; public class MessageQueueSubscriber : MessageQueue { private static readonly long Uid = GetUid(); - public long ReaderId { get; private set; } - public bool ReadConflate { get; private set; } + private long ReaderId { get; set; } + private bool ReadConflate { get; set; } // Expose the underlying buffer and pointers through properties. // Assume Header is an object that holds the shared state. - public long WritePointer => Header.WritePointer; + private long WritePointer => Header.WritePointer; - public long ReadPointer + private long ReadPointer { get => Header.ReadPointers[ReaderId]; - private set => Header.ReadPointers[ReaderId] = value; + set => Header.ReadPointers[ReaderId] = value; } - - public bool ReadValid + + private bool ReadValid { get => Header.ReadValids[ReaderId] != 0; - private set => Header.ReadValids[ReaderId] = value ? 1 : 0; + set => Header.ReadValids[ReaderId] = value ? 1 : 0; } - - public long ReaderUid + + private long ReaderUid { get => Header.ReaderUIDs[ReaderId]; - private set => Header.ReaderUIDs[ReaderId] = value; + set => Header.ReaderUIDs[ReaderId] = value; } private readonly object _syncLock = new(); @@ -36,7 +36,7 @@ private void InitializeSubscriber() { if (accessor == null) throw new InvalidOperationException("Message queue not initialized"); - + while (true) { var curNumReaders = Header.NumReaders; @@ -105,31 +105,65 @@ public void ResetReader() } } - // private static T Align(T value) where T : INumber, IBitwiseOperators - // { - // // Constants - // var seven = T.CreateChecked(7); - // // Use the bitwise operator directly through the ~ operator - // var mask = ~seven; - // - // // (value + 7) & ~7 - // return (value + seven) & mask; - // } - - // private IMsgqMessage? ReadMessage() - // { - // while (ReaderUid != Uid) - // { - // Console.WriteLine("Reader was evicted. Reconnecting..."); - // InitializeSubscriber(); - // } - // - // while (!ReadValid) - // { - // Console.WriteLine("Reader is not valid. Resetting..."); - // ResetReader(); - // } - // - // - // } + public MsgqMessage? Read(bool nonBlocking = false) + { + do + { + lock (_syncLock) + { + // Read the message + var readPointer = new Packed64(ReadPointer); + var writePointer = new Packed64(WritePointer); + + if (readPointer == writePointer) + { + Console.WriteLine($"No message to read because read pointer [{ReadPointer}] is equal to write pointer [{WritePointer}]."); + continue; + } + + // Get size + var size = BitConverter.ToInt64(Data, readPointer.ReadPointer); + + // Check if valid + if (!ReadValid) + { + Console.WriteLine("Read pointer is invalid. Resetting reader."); + ResetReader(); + continue; + } + + if (size == -1) + { + Console.WriteLine("Size is -1. Skipping."); + readPointer.CycleCounter++; + continue; + } + + // // crashing is better than passing garbage data to the consumer + // // the size will have weird value if it was overwritten by data accidentally + // assert((uint64_t)size < q->size); + // assert(size > 0); + + // uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size); + var newReadPointer = new Packed64(readPointer.ReadPointer + sizeof(long) + size); + if (ReadConflate && writePointer != newReadPointer) + { + Console.WriteLine("Read conflate is enabled. Skipping."); + ReadPointer = newReadPointer.Value; + continue; + } + + var spn = new Span(Data, readPointer.ReadPointer + sizeof(long), (int)size); + var msg = new MsgqMessage(spn.ToArray(), size); + + ReadPointer = newReadPointer.Value; + if (ReadValid) return msg; + + msg.Close(); + ResetReader(); + } + } while (!nonBlocking); + + return null; + } } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/Packed64.cs b/msgqNET/implementations/msgq/queues/Packed64.cs new file mode 100644 index 0000000..e3105dd --- /dev/null +++ b/msgqNET/implementations/msgq/queues/Packed64.cs @@ -0,0 +1,39 @@ +namespace msgqNET.implementations.msgq.queues; + +public struct Packed64(long Value) : IEquatable +{ + private long _value = Value; + + // Constructor to initialize with a 64-bit value + + // Getter for the higher 32 bits + public int CycleCounter + { + get => (int)(_value >> 32); // Shift right by 32 bits to get the higher part + set => _value = ((long)value << 32) | (_value & 0xFFFFFFFF); // Set the higher part + } + + // Getter for the lower 32 bits + public int ReadPointer + { + get => (int)(_value & 0xFFFFFFFF); + set => _value = ((long)CycleCounter << 32) | (value & 0xFFFFFFFF); + } + + // Optionally, allow setting the entire 64-bit value + public long Value + { + get => _value; + set => _value = value; + } + + public bool Equals(Packed64 other) => _value == other._value; + + public override bool Equals(object? obj) => obj is Packed64 other && Equals(other); + + public override int GetHashCode() => _value.GetHashCode(); + + public static bool operator ==(Packed64 left, Packed64 right) => left.Equals(right); + + public static bool operator !=(Packed64 left, Packed64 right) => !left.Equals(right); +} \ No newline at end of file diff --git a/msgqNET/implementations/zmq/ZmqMessage.cs b/msgqNET/implementations/zmq/ZmqMessage.cs index f386e81..a1b4622 100644 --- a/msgqNET/implementations/zmq/ZmqMessage.cs +++ b/msgqNET/implementations/zmq/ZmqMessage.cs @@ -46,7 +46,7 @@ public void Close() _netMqMessage.Clear(); } - public int GetSize() + public long GetSize() { return _netMqMessage?.First.BufferSize ?? 0; } diff --git a/msgqNET/interfaces/IMessage.cs b/msgqNET/interfaces/IMessage.cs index 736d1e8..79b8ac2 100644 --- a/msgqNET/interfaces/IMessage.cs +++ b/msgqNET/interfaces/IMessage.cs @@ -4,7 +4,7 @@ public interface IMessage { // void Init(int size); // void Init(byte[] data, int size); - // void Close(); - int GetSize(); + void Close(); + long GetSize(); byte[] GetData(); } \ No newline at end of file From 289a10658b67a292a5c2f74ed4d0ee2b1602db73 Mon Sep 17 00:00:00 2001 From: devtekve Date: Mon, 24 Mar 2025 18:59:06 +0100 Subject: [PATCH 9/9] Convert queue pointers and IDs from long to ulong. Updated all relevant structures, properties, and methods across queue components to use `ulong` instead of `long`. This ensures better consistency and compatibility for unsigned values, particularly for pointer and ID handling in shared message queue operations. --- .../msgq/queues/MessageQueue.cs | 18 ++++++++--------- .../msgq/queues/MessageQueuePublisher.cs | 4 ++-- .../msgq/queues/MessageQueueSubscriber.cs | 20 ++++++++++--------- .../implementations/msgq/queues/Packed64.cs | 10 +++++----- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/msgqNET/implementations/msgq/queues/MessageQueue.cs b/msgqNET/implementations/msgq/queues/MessageQueue.cs index 674a7e3..7031a22 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueue.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueue.cs @@ -18,26 +18,26 @@ public abstract class MessageQueue protected record struct MsgQueueHeader { - public long NumReaders; - public long WritePointer; - public long WriterUid; + public ulong NumReaders; + public ulong WritePointer; + public ulong WriterUid; [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] - public long[] ReadPointers; + public ulong[] ReadPointers; [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] - public long[] ReadValids; + public ulong[] ReadValids; [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] - public long[] ReaderUIDs; + public ulong[] ReaderUIDs; public MsgQueueHeader(int numReaders) { NumReaders = 0; WritePointer = 0; - ReadPointers = new long[numReaders]; - ReadValids = new long[numReaders]; - ReaderUIDs = new long[numReaders]; + ReadPointers = new ulong[numReaders]; + ReadValids = new ulong[numReaders]; + ReaderUIDs = new ulong[numReaders]; } } diff --git a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs index 079452c..3cca6a1 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs @@ -7,8 +7,8 @@ public MessageQueuePublisher(string path, int size) : base(path, size) if (accessor == null) throw new InvalidOperationException("Message queue not initialized"); - long uid = GetUid(); - Header.WriterUid = uid; + var uid = GetUid(); + Header.WriterUid = (ulong)uid; Header.NumReaders = 0; for (var i = 0; i < MaxReaders; i++) { diff --git a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs index a0374de..da0d8cf 100644 --- a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs +++ b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs @@ -3,14 +3,14 @@ namespace msgqNET.implementations.msgq.queues; public class MessageQueueSubscriber : MessageQueue { private static readonly long Uid = GetUid(); - private long ReaderId { get; set; } + private ulong ReaderId { get; set; } private bool ReadConflate { get; set; } // Expose the underlying buffer and pointers through properties. // Assume Header is an object that holds the shared state. - private long WritePointer => Header.WritePointer; + private ulong WritePointer => Header.WritePointer; - private long ReadPointer + private ulong ReadPointer { get => Header.ReadPointers[ReaderId]; set => Header.ReadPointers[ReaderId] = value; @@ -19,10 +19,10 @@ private long ReadPointer private bool ReadValid { get => Header.ReadValids[ReaderId] != 0; - set => Header.ReadValids[ReaderId] = value ? 1 : 0; + set => Header.ReadValids[ReaderId] = value ? 1UL : 0UL; } - private long ReaderUid + private ulong ReaderUid { get => Header.ReaderUIDs[ReaderId]; set => Header.ReaderUIDs[ReaderId] = value; @@ -57,7 +57,7 @@ private void InitializeSubscriber() // Start with read_valid = false ReadValid = false; // false ReadPointer = 0; - ReaderUid = Uid; + ReaderUid = (ulong)Uid; WriteHeader(); break; @@ -99,8 +99,10 @@ public void ResetReader() lock (_syncLock) { // Sync read pointer with write pointer on reset - Header.ReadPointers[ReaderId] = Header.WritePointer; - Header.ReadValids[ReaderId] = 0; // mark as invalid, to be revalidated externally if needed + // Header.ReadPointers[ReaderId] = Header.WritePointer; + // Header.ReadValids[ReaderId] = 0; // mark as invalid, to be revalidated externally if needed + ReadValid = true; + ReadPointer = WritePointer; WriteHeader(); } } @@ -145,7 +147,7 @@ public void ResetReader() // assert(size > 0); // uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size); - var newReadPointer = new Packed64(readPointer.ReadPointer + sizeof(long) + size); + var newReadPointer = new Packed64((ulong)(readPointer.ReadPointer + sizeof(ulong) + size)); if (ReadConflate && writePointer != newReadPointer) { Console.WriteLine("Read conflate is enabled. Skipping."); diff --git a/msgqNET/implementations/msgq/queues/Packed64.cs b/msgqNET/implementations/msgq/queues/Packed64.cs index e3105dd..c3c51b4 100644 --- a/msgqNET/implementations/msgq/queues/Packed64.cs +++ b/msgqNET/implementations/msgq/queues/Packed64.cs @@ -1,8 +1,8 @@ namespace msgqNET.implementations.msgq.queues; -public struct Packed64(long Value) : IEquatable +public struct Packed64(ulong value) : IEquatable { - private long _value = Value; + private ulong _value = value; // Constructor to initialize with a 64-bit value @@ -10,18 +10,18 @@ public struct Packed64(long Value) : IEquatable public int CycleCounter { get => (int)(_value >> 32); // Shift right by 32 bits to get the higher part - set => _value = ((long)value << 32) | (_value & 0xFFFFFFFF); // Set the higher part + set => _value = ((ulong)value << 32) | (_value & 0xFFFFFFFF); // Set the higher part } // Getter for the lower 32 bits public int ReadPointer { get => (int)(_value & 0xFFFFFFFF); - set => _value = ((long)CycleCounter << 32) | (value & 0xFFFFFFFF); + set => _value = ((ulong)CycleCounter << 32) | (ulong)(value & 0xFFFFFFFF); } // Optionally, allow setting the entire 64-bit value - public long Value + public ulong Value { get => _value; set => _value = value;