From 7059299ba8f4e942b11f10dd9f5a4341e621ec09 Mon Sep 17 00:00:00 2001 From: Event Horizon <772552754@qq.com> Date: Fri, 14 Nov 2025 06:04:00 +0800 Subject: [PATCH] add CrossPlatformProcessLock --- .../CrossPlatformProcessLock.cs | 133 ++++++++++++++++++ src/MappedFileQueues/MappedFileQueueT.cs | 20 +-- src/MappedFileQueues/OffsetMappedFile.cs | 1 + 3 files changed, 138 insertions(+), 16 deletions(-) create mode 100644 src/MappedFileQueues/CrossPlatformProcessLock.cs diff --git a/src/MappedFileQueues/CrossPlatformProcessLock.cs b/src/MappedFileQueues/CrossPlatformProcessLock.cs new file mode 100644 index 0000000..b77986e --- /dev/null +++ b/src/MappedFileQueues/CrossPlatformProcessLock.cs @@ -0,0 +1,133 @@ +using System.Runtime.InteropServices; +using System.Security.Cryptography; + +namespace MappedFileQueues; + +/// +/// A cross-platform process lock implementation. Only Windows and Linux are supported. +/// On Windows, a named mutex is used. On Linux, a file lock is used. On other platforms, a no-op lock is used. +/// +internal sealed class CrossPlatformProcessLock : IDisposable +{ + private readonly IProcessLock _lock; + + /// + /// Initializes a new instance of the class. + /// + /// The name of the process lock. + /// The store path of the . + public CrossPlatformProcessLock(string name, string storePath) + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + var hashBytes = MD5.HashData(System.Text.Encoding.UTF8.GetBytes(storePath)); + var hashString = Convert.ToHexString(hashBytes); + name = $"{name}_{hashString}"; + _lock = new MutexProcessLock(name); + } + else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + { + var lockFilePath = Path.Combine(storePath, $"{name}.lock"); + _lock = new FileProcessLock(lockFilePath); + } + else + { + _lock = new EmptyProcessLock(); + } + } + + public void Acquire() => _lock.Acquire(); + public void Release() => _lock.Release(); + + public void Dispose() => _lock.Dispose(); + + #region Process Lock Interface + + private interface IProcessLock : IDisposable + { + void Acquire(); + void Release(); + } + + #endregion + + #region Windows Named Mutex Lock + + private class MutexProcessLock(string name) : IProcessLock + { + private Mutex? _mutex; + // Global\ Make the mutex available across sessions + private readonly string _name = $@"Global\{name}"; + + public void Acquire() + { + _mutex = new Mutex(false, _name); + _mutex.WaitOne(); + } + + public void Release() + { + _mutex?.ReleaseMutex(); + _mutex?.Dispose(); + } + + public void Dispose() => Release(); + } + + #endregion + + #region Linux File Lock + + private class FileProcessLock(string lockFilePath) : IProcessLock + { + private FileStream? _lockFileStream; + + public void Acquire() + { + while (true) + { + try + { + _lockFileStream = new FileStream(lockFilePath, FileMode.OpenOrCreate, + FileAccess.ReadWrite, FileShare.ReadWrite); + _lockFileStream.Lock(0, 0); + return; + } + catch (IOException) + { + // Lock is held by another process, wait and retry + Thread.Sleep(200); + } + } + } + + public void Release() + { + _lockFileStream?.Unlock(0, 0); + _lockFileStream?.Dispose(); + } + + public void Dispose() => Release(); + } + + #endregion + + #region Empty Lock (for unsupported platforms) + + private class EmptyProcessLock : IProcessLock + { + public void Acquire() + { + } + + public void Release() + { + } + + public void Dispose() + { + } + } + + #endregion +} diff --git a/src/MappedFileQueues/MappedFileQueueT.cs b/src/MappedFileQueues/MappedFileQueueT.cs index 7b6f6c0..fa609cb 100644 --- a/src/MappedFileQueues/MappedFileQueueT.cs +++ b/src/MappedFileQueues/MappedFileQueueT.cs @@ -50,22 +50,10 @@ public void Dispose() // producer's offset to the position of the data that has been confirmed to be persisted. private void RecoverProducerOffsetIfNeeded() { - // On Windows, use "Global\" prefix to make the named semaphore visible across all user sessions (system-wide on this machine). - var semName = $"Global\\MappedFileQueueSem_{_options.StorePath.GetHashCode()}"; - Semaphore? semaphore = null; - try - { - semaphore = new Semaphore(1, 1, semName); - } - catch (NotSupportedException) - { - // Named semaphores are not supported on this platform, use unnamed semaphore instead. - semaphore = new Semaphore(1, 1); - } - - using var sem = semaphore; + var lockName = "recovery_lock"; + using var processLock = new CrossPlatformProcessLock(lockName, _options.StorePath); - semaphore.WaitOne(); + processLock.Acquire(); try { @@ -108,7 +96,7 @@ private void RecoverProducerOffsetIfNeeded() _producer = null; _consumer = null; - semaphore.Release(); + processLock.Release(); } } } diff --git a/src/MappedFileQueues/OffsetMappedFile.cs b/src/MappedFileQueues/OffsetMappedFile.cs index 71813d5..adae3b2 100644 --- a/src/MappedFileQueues/OffsetMappedFile.cs +++ b/src/MappedFileQueues/OffsetMappedFile.cs @@ -55,6 +55,7 @@ public void MoveTo(long offset, bool flushToDisk = false) public void Dispose() { + Flush(); _fileStream.Dispose(); _mmf.Dispose(); _vierAccessor.Dispose();