diff --git a/src/MappedFileQueues/CrossPlatformProcessLock.cs b/src/MappedFileQueues/CrossPlatformProcessLock.cs
new file mode 100644
index 0000000..b77986e
--- /dev/null
+++ b/src/MappedFileQueues/CrossPlatformProcessLock.cs
@@ -0,0 +1,133 @@
+using System.Runtime.InteropServices;
+using System.Security.Cryptography;
+
+namespace MappedFileQueues;
+
+///
+/// A cross-platform process lock implementation. Only Windows and Linux are supported.
+/// On Windows, a named mutex is used. On Linux, a file lock is used. On other platforms, a no-op lock is used.
+///
+internal sealed class CrossPlatformProcessLock : IDisposable
+{
+ private readonly IProcessLock _lock;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of the process lock.
+ /// The store path of the .
+ public CrossPlatformProcessLock(string name, string storePath)
+ {
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ var hashBytes = MD5.HashData(System.Text.Encoding.UTF8.GetBytes(storePath));
+ var hashString = Convert.ToHexString(hashBytes);
+ name = $"{name}_{hashString}";
+ _lock = new MutexProcessLock(name);
+ }
+ else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
+ {
+ var lockFilePath = Path.Combine(storePath, $"{name}.lock");
+ _lock = new FileProcessLock(lockFilePath);
+ }
+ else
+ {
+ _lock = new EmptyProcessLock();
+ }
+ }
+
+ public void Acquire() => _lock.Acquire();
+ public void Release() => _lock.Release();
+
+ public void Dispose() => _lock.Dispose();
+
+ #region Process Lock Interface
+
+ private interface IProcessLock : IDisposable
+ {
+ void Acquire();
+ void Release();
+ }
+
+ #endregion
+
+ #region Windows Named Mutex Lock
+
+ private class MutexProcessLock(string name) : IProcessLock
+ {
+ private Mutex? _mutex;
+ // Global\ Make the mutex available across sessions
+ private readonly string _name = $@"Global\{name}";
+
+ public void Acquire()
+ {
+ _mutex = new Mutex(false, _name);
+ _mutex.WaitOne();
+ }
+
+ public void Release()
+ {
+ _mutex?.ReleaseMutex();
+ _mutex?.Dispose();
+ }
+
+ public void Dispose() => Release();
+ }
+
+ #endregion
+
+ #region Linux File Lock
+
+ private class FileProcessLock(string lockFilePath) : IProcessLock
+ {
+ private FileStream? _lockFileStream;
+
+ public void Acquire()
+ {
+ while (true)
+ {
+ try
+ {
+ _lockFileStream = new FileStream(lockFilePath, FileMode.OpenOrCreate,
+ FileAccess.ReadWrite, FileShare.ReadWrite);
+ _lockFileStream.Lock(0, 0);
+ return;
+ }
+ catch (IOException)
+ {
+ // Lock is held by another process, wait and retry
+ Thread.Sleep(200);
+ }
+ }
+ }
+
+ public void Release()
+ {
+ _lockFileStream?.Unlock(0, 0);
+ _lockFileStream?.Dispose();
+ }
+
+ public void Dispose() => Release();
+ }
+
+ #endregion
+
+ #region Empty Lock (for unsupported platforms)
+
+ private class EmptyProcessLock : IProcessLock
+ {
+ public void Acquire()
+ {
+ }
+
+ public void Release()
+ {
+ }
+
+ public void Dispose()
+ {
+ }
+ }
+
+ #endregion
+}
diff --git a/src/MappedFileQueues/MappedFileQueueT.cs b/src/MappedFileQueues/MappedFileQueueT.cs
index 7b6f6c0..fa609cb 100644
--- a/src/MappedFileQueues/MappedFileQueueT.cs
+++ b/src/MappedFileQueues/MappedFileQueueT.cs
@@ -50,22 +50,10 @@ public void Dispose()
// producer's offset to the position of the data that has been confirmed to be persisted.
private void RecoverProducerOffsetIfNeeded()
{
- // On Windows, use "Global\" prefix to make the named semaphore visible across all user sessions (system-wide on this machine).
- var semName = $"Global\\MappedFileQueueSem_{_options.StorePath.GetHashCode()}";
- Semaphore? semaphore = null;
- try
- {
- semaphore = new Semaphore(1, 1, semName);
- }
- catch (NotSupportedException)
- {
- // Named semaphores are not supported on this platform, use unnamed semaphore instead.
- semaphore = new Semaphore(1, 1);
- }
-
- using var sem = semaphore;
+ var lockName = "recovery_lock";
+ using var processLock = new CrossPlatformProcessLock(lockName, _options.StorePath);
- semaphore.WaitOne();
+ processLock.Acquire();
try
{
@@ -108,7 +96,7 @@ private void RecoverProducerOffsetIfNeeded()
_producer = null;
_consumer = null;
- semaphore.Release();
+ processLock.Release();
}
}
}
diff --git a/src/MappedFileQueues/OffsetMappedFile.cs b/src/MappedFileQueues/OffsetMappedFile.cs
index 71813d5..adae3b2 100644
--- a/src/MappedFileQueues/OffsetMappedFile.cs
+++ b/src/MappedFileQueues/OffsetMappedFile.cs
@@ -55,6 +55,7 @@ public void MoveTo(long offset, bool flushToDisk = false)
public void Dispose()
{
+ Flush();
_fileStream.Dispose();
_mmf.Dispose();
_vierAccessor.Dispose();