diff --git a/README.md b/README.md index 499c34e..71cac38 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ Under the storage path specified by the `StorePath` configuration option, Mapped │ └── ... ├── offset │ ├── producer.offset +│ ├── producer.confirmed.offset │ └── consumer.offset ``` @@ -51,6 +52,9 @@ Details: - The `commitlog` directory stores the actual Segment files. - The `offset` directory stores the offset files for both the producer and the consumer. + - The `producer.offset` file is used to record the write offset of the producer. + - The `producer.confirmed.offset` file is used to record the offset that the producer has confirmed to be written to disk. This is used so that after a system abnormal restart, the producer can continue writing data from this offset, avoiding the situation where the consumer waits for lost data. + - The `consumer.offset` file is used to record the consumption offset of the consumer. ### Usage Example @@ -64,6 +68,8 @@ Details: - **ConsumerSpinWaitDuration**: The maximum duration for a single spin-wait for data by the consumer, default is 100 milliseconds. +- **ProducerForceFlushIntervalCount**: The number of messages after which the producer will forcibly flush data to disk after writing. The default is 1000 messages. If this number is not reached, data may be temporarily stored in memory, posing a risk of data loss until the system automatically flushes it to disk. Setting this value to 1 maximizes data safety but may impact performance. In cases of sudden power loss or other exceptions, data that has not been promptly written to disk may be lost. During recovery, the producer's offset will be rolled back to ensure that the consumer does not wait for lost data. 
+ #### Producing and Consuming Data The producer and consumer interfaces in MappedFileQueues are as follows: @@ -76,6 +82,12 @@ public interface IMappedFileProducer where T : struct // Observes the next writable offset for the current producer public long Offset { get; } + // Observes the offset that the current producer has confirmed to be written to disk + public long ConfirmedOffset { get; } + + // Adjusts the offset for the current producer + public void AdjustOffset(long offset); + public void Produce(ref T item); } diff --git a/README.zh-CN.md b/README.zh-CN.md index f33fe46..dbe60fb 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -43,6 +43,7 @@ offset 使用 long 类型存储,支持的最大值为 2^63-1。 │ └── ... ├── offset │ ├── producer.offset +│ ├── producer.confirmed.offset │ └── consumer.offset ``` @@ -51,7 +52,9 @@ offset 使用 long 类型存储,支持的最大值为 2^63-1。 - `commitlog` 目录存储实际的 Segment 文件。 - `offset` 目录存储生产者和消费者的偏移量文件。 - + - `producer.offset` 文件用于记录生产者的写入偏移量。 + - `producer.confirmed.offset` 文件用于记录生产者已确认写入磁盘的偏移量,用于在系统异常重启后,生产者可以从该偏移量继续写入数据,避免消费者陷入等待已丢失数据的情况。 + - `consumer.offset` 文件用于记录消费者的消费偏移量。 ### 使用示例 #### 配置选项(MappedFileQueueOptions) @@ -64,6 +67,8 @@ offset 使用 long 类型存储,支持的最大值为 2^63-1。 - **ConsumerSpinWaitDuration**:消费者单次自旋等待数据时的最大等待时间,默认为 100 毫秒。 +- **ProducerForceFlushIntervalCount**:生产者在写入数据后,强制将数据刷新到磁盘的间隔消息数量,默认为 1000 条消息。在未达到该数量时,数据可能会暂存在内存中,存在丢失的风险,需等待系统自动刷新到磁盘。将此值设置为 1 可最大程度地保证数据安全,但会影响性能。在突然断电等异常情况下,未及时落盘的数据可能会丢失,恢复时将对生产者的 offset 进行回退,以确保消费者不会等待已丢失的数据。 + #### 生产和消费数据 MappedFileQueues 中的生产者和消费者接口如下所示: @@ -74,6 +79,12 @@ public interface IMappedFileProducer where T : struct // 用于观察当前生产者的下一个可写入的偏移量 public long Offset { get; } + // 用于观察当前生产者已确认写入磁盘的偏移量 + public long ConfirmedOffset { get; } + + // 调整当前生产者的偏移量 + public void AdjustOffset(long offset); + public void Produce(ref T message); } diff --git a/src/MappedFileQueues/Constants.cs b/src/MappedFileQueues/Constants.cs index 3c4ba32..3c7445f 100644 --- a/src/MappedFileQueues/Constants.cs +++ 
b/src/MappedFileQueues/Constants.cs @@ -8,6 +8,8 @@ internal class Constants public const string ProducerOffsetFile = "producer.offset"; + public const string ProducerConfirmedOffsetFile = "producer.confirmed.offset"; + public const string ConsumerOffsetFile = "consumer.offset"; public const byte EndMarker = 0xFF; diff --git a/src/MappedFileQueues/IMappedFileConsumer.cs b/src/MappedFileQueues/IMappedFileConsumer.cs index 3148caf..81746a6 100644 --- a/src/MappedFileQueues/IMappedFileConsumer.cs +++ b/src/MappedFileQueues/IMappedFileConsumer.cs @@ -11,6 +11,8 @@ public interface IMappedFileConsumer where T : struct /// Adjusts the offset to consume from the mapped file queue. /// /// The new offset to set. + /// Thrown when the provided offset is negative. + /// Thrown when the consumer has already started consuming messages. public void AdjustOffset(long offset); /// @@ -23,4 +25,10 @@ public interface IMappedFileConsumer where T : struct /// Commits the offset of the last consumed message. /// public void Commit(); + + /// + /// Checks if there is a next message available to consume. + /// + /// True if there is a next message available; otherwise, false. + internal bool NextMessageAvailable(); } diff --git a/src/MappedFileQueues/IMappedFileProducer.cs b/src/MappedFileQueues/IMappedFileProducer.cs index fec9af7..9a1d932 100644 --- a/src/MappedFileQueues/IMappedFileProducer.cs +++ b/src/MappedFileQueues/IMappedFileProducer.cs @@ -7,6 +7,19 @@ public interface IMappedFileProducer where T : struct /// public long Offset { get; } + /// + /// The last offset that has been fully persisted (confirmed written to storage). + /// + public long ConfirmedOffset { get; } + + /// + /// Adjusts the offset to produce to the mapped file queue. + /// + /// The new offset to set. + /// Thrown when the provided offset is negative. + /// Thrown when the producer has already started producing messages. 
+ public void AdjustOffset(long offset); + /// /// Produces a message to the mapped file queue. /// diff --git a/src/MappedFileQueues/MappedFileConsumer.cs b/src/MappedFileQueues/MappedFileConsumer.cs index dc2f646..feaa49f 100644 --- a/src/MappedFileQueues/MappedFileConsumer.cs +++ b/src/MappedFileQueues/MappedFileConsumer.cs @@ -53,7 +53,7 @@ public void AdjustOffset(long offset) "Cannot adjust offset while there is an active segment. Please adjust the offset before consuming any messages."); } - _offsetFile.MoveTo(offset); + _offsetFile.MoveTo(offset, true); } public void Consume(out T message) @@ -120,6 +120,13 @@ public void Dispose() _segment?.Dispose(); } + public bool NextMessageAvailable() => + _segment switch + { + null when !TryFindSegmentByOffset(out _segment) => false, + _ => _segment.TryRead(_offsetFile.Offset, out _) + }; + private bool TryFindSegmentByOffset([MaybeNullWhen(false)] out MappedFileSegment segment) => MappedFileSegment.TryFind( _segmentDirectory, diff --git a/src/MappedFileQueues/MappedFileProducer.cs b/src/MappedFileQueues/MappedFileProducer.cs index cfe94bd..2a08634 100644 --- a/src/MappedFileQueues/MappedFileProducer.cs +++ b/src/MappedFileQueues/MappedFileProducer.cs @@ -9,6 +9,9 @@ internal class MappedFileProducer : IMappedFileProducer, IDisposable where // Memory mapped file to store the producer offset private readonly OffsetMappedFile _offsetFile; + // Memory mapped file to store the confirmed producer offset + private readonly OffsetMappedFile _confirmedOffsetFile; + private readonly int _payloadSize; private readonly string _segmentDirectory; @@ -17,6 +20,8 @@ internal class MappedFileProducer : IMappedFileProducer, IDisposable where private bool _disposed; + private long _producedCount = 0; + public MappedFileProducer(MappedFileQueueOptions options) { _options = options; @@ -30,6 +35,9 @@ public MappedFileProducer(MappedFileQueueOptions options) var offsetPath = Path.Combine(offsetDir, Constants.ProducerOffsetFile); 
_offsetFile = new OffsetMappedFile(offsetPath); + var confirmedOffsetPath = Path.Combine(offsetDir, Constants.ProducerConfirmedOffsetFile); + _confirmedOffsetFile = new OffsetMappedFile(confirmedOffsetPath); + _payloadSize = Unsafe.SizeOf(); _segmentDirectory = Path.Combine(options.StorePath, Constants.CommitLogDirectory); @@ -37,6 +45,26 @@ public MappedFileProducer(MappedFileQueueOptions options) public long Offset => _offsetFile.Offset; + public long ConfirmedOffset => _confirmedOffsetFile.Offset; + + public void AdjustOffset(long offset) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (offset < 0) + { + throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be greater than or equal to zero."); + } + + if (_segment != null) + { + throw new InvalidOperationException( + "Cannot adjust offset while there is an active segment. Please adjust the offset before producing any messages."); + } + + _offsetFile.MoveTo(offset, true); + } + public void Produce(ref T message) { ObjectDisposedException.ThrowIf(_disposed, this); @@ -57,6 +85,7 @@ public void Dispose() _disposed = true; _offsetFile.Dispose(); + _confirmedOffsetFile.Dispose(); _segment?.Dispose(); } @@ -71,6 +100,15 @@ private void Commit() _offsetFile.Advance(_payloadSize + Constants.EndMarkerSize); + _producedCount++; + + if (_producedCount % _options.ProducerForceFlushIntervalCount == 0) + { + // Force flush the segment and update the confirmed offset + _segment.Flush(); + _confirmedOffsetFile.MoveTo(_offsetFile.Offset, true); + } + // Check if the segment has reached its limit if (_segment.AllowedLastOffsetToWrite < _offsetFile.Offset) { diff --git a/src/MappedFileQueues/MappedFileQueueOptions.cs b/src/MappedFileQueues/MappedFileQueueOptions.cs index cd79d90..21d2837 100644 --- a/src/MappedFileQueues/MappedFileQueueOptions.cs +++ b/src/MappedFileQueues/MappedFileQueueOptions.cs @@ -21,4 +21,9 @@ public class MappedFileQueueOptions /// The maximum duration a consumer will spin-wait 
each time for an item to become available. /// public TimeSpan ConsumerSpinWaitDuration { get; set; } = TimeSpan.FromMilliseconds(100); + + /// + /// Number of produced items after which the producer will perform a forced flush. + /// + public long ProducerForceFlushIntervalCount { get; set; } = 1000; } diff --git a/src/MappedFileQueues/MappedFileQueueT.cs b/src/MappedFileQueues/MappedFileQueueT.cs index 8fad84f..7b6f6c0 100644 --- a/src/MappedFileQueues/MappedFileQueueT.cs +++ b/src/MappedFileQueues/MappedFileQueueT.cs @@ -9,26 +9,30 @@ public sealed class MappedFileQueue : IDisposable where T : struct public MappedFileQueue(MappedFileQueueOptions options) { + _options = options; + ArgumentException.ThrowIfNullOrWhiteSpace(options.StorePath, nameof(options.StorePath)); + if (options.SegmentSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(options.SegmentSize), + "SegmentSize must be greater than zero."); + } + if (File.Exists(options.StorePath)) { throw new ArgumentException($"The path '{options.StorePath}' is a file, not a directory.", nameof(options.StorePath)); } - if (!Directory.Exists(options.StorePath)) + if (Directory.Exists(options.StorePath)) { - Directory.CreateDirectory(options.StorePath); + RecoverProducerOffsetIfNeeded(); } - - if (options.SegmentSize <= 0) + else { - throw new ArgumentOutOfRangeException(nameof(options.SegmentSize), - "SegmentSize must be greater than zero."); + Directory.CreateDirectory(options.StorePath); } - - _options = options; } public IMappedFileProducer Producer => _producer ??= new MappedFileProducer(_options); @@ -39,4 +43,72 @@ public void Dispose() _producer?.Dispose(); _consumer?.Dispose(); } + + // Check the last data. If the producer's offset is greater than the consumer's offset, + // and the consumer cannot consume the next piece of data, it means that there is data + // in the queue that was not persisted before the crash. 
We need to roll back the + // producer's offset to the position of the data that has been confirmed to be persisted. + private void RecoverProducerOffsetIfNeeded() + { + // On Windows, use "Global\" prefix to make the named semaphore visible across all user sessions (system-wide on this machine). + var semName = $"Global\\MappedFileQueueSem_{_options.StorePath.GetHashCode()}"; + Semaphore? semaphore = null; + try + { + semaphore = new Semaphore(1, 1, semName); + } + catch (NotSupportedException) + { + // Named semaphores are not supported on this platform, use unnamed semaphore instead. + semaphore = new Semaphore(1, 1); + } + + using var sem = semaphore; + + semaphore.WaitOne(); + + try + { + var consumer = Consumer; + + if (consumer.NextMessageAvailable()) + { + return; + } + + var producer = Producer; + + if (producer.Offset <= consumer.Offset) + { + // the consumer can continue to consume when + // the producer produces a new message + return; + } + + var rollbackOffset = Math.Max(consumer.Offset, producer.ConfirmedOffset); + + if (producer.Offset > rollbackOffset) + { + producer.AdjustOffset(rollbackOffset); + } + + if (producer.Offset > consumer.Offset + && !consumer.NextMessageAvailable()) + { + throw new InvalidOperationException( + "After recovering the producer's offset, the consumer still cannot consume the next message, the data may be corrupted."); + } + } + finally + { + // The producer and consumer may not be used after this recovery process, + // so we dispose them here. 
+ _producer?.Dispose(); + _consumer?.Dispose(); + _producer = null; + _consumer = null; + + semaphore.Release(); + } + } } diff --git a/src/MappedFileQueues/MappedFileSegment.cs b/src/MappedFileQueues/MappedFileSegment.cs index 4ccdbef..b34eee6 100644 --- a/src/MappedFileQueues/MappedFileSegment.cs +++ b/src/MappedFileQueues/MappedFileSegment.cs @@ -130,8 +130,21 @@ public bool TryRead(long offset, out T message) return true; } + + /// + /// Forces any changes made to the memory-mapped view to be written to the underlying file. + /// It is not usually necessary to call this method, as the system will flush changes automatically, + /// but it can be useful in scenarios where data integrity is critical. + /// + public void Flush() + { + _viewAccessor.Flush(); + _fileStream.Flush(true); + } + public void Dispose() { + Flush(); _viewAccessor.Dispose(); _mmf.Dispose(); _fileStream.Dispose(); diff --git a/src/MappedFileQueues/OffsetMappedFile.cs b/src/MappedFileQueues/OffsetMappedFile.cs index 1635ed5..71813d5 100644 --- a/src/MappedFileQueues/OffsetMappedFile.cs +++ b/src/MappedFileQueues/OffsetMappedFile.cs @@ -38,7 +38,7 @@ public void Advance(long step) _vierAccessor.Write(0, _offset); } - public void MoveTo(long offset) + public void MoveTo(long offset, bool flushToDisk = false) { if (offset < 0) { @@ -47,6 +47,10 @@ public void MoveTo(long offset) _offset = offset; _vierAccessor.Write(0, _offset); + if (flushToDisk) + { + Flush(); + } } public void Dispose() @@ -55,4 +59,10 @@ public void Dispose() _mmf.Dispose(); _vierAccessor.Dispose(); } + + public void Flush() + { + _vierAccessor.Flush(); + _fileStream.Flush(true); + } } diff --git a/tests/MappedFileQueues.Tests/MappedFileQueueTests.cs b/tests/MappedFileQueues.Tests/MappedFileQueueTests.cs index 276cff4..fde1845 100644 --- a/tests/MappedFileQueues.Tests/MappedFileQueueTests.cs +++ b/tests/MappedFileQueues.Tests/MappedFileQueueTests.cs @@ -1,3 +1,5 @@ +using System.Runtime.CompilerServices; + namespace 
MappedFileQueues.Tests; public class MappedFileQueueTests @@ -262,4 +264,206 @@ public void Consumer_Offset_Can_Be_Manually_Set() Assert.Equal(i + 3, testStruct.D); } } + + [Fact] + public void Producer_Force_Flush_Works() + { + using var tempStorePath1 = TempStorePath.Create(); + + var options = new MappedFileQueueOptions + { + StorePath = tempStorePath1.Path, + SegmentSize = 32, + ProducerForceFlushIntervalCount = 5 + }; + + using var queue = MappedFileQueue.Create(options); + + var producer = queue.Producer; + + var consumer = queue.Consumer; + + for (var i = 0; i < 10; i++) + { + var testStruct = new TestStructSize16 { A = i, B = i + 1, C = i + 2, D = i + 3 }; + + producer.Produce(ref testStruct); + + if (i is 4 or 9) + { + // After producing 5 and 10 messages, the confirmed offset should be updated + Assert.Equal((i + 1) * (Unsafe.SizeOf() + Constants.EndMarkerSize), + producer.ConfirmedOffset); + } + } + + for (var i = 0; i < 10; i++) + { + consumer.Consume(out var testStruct); + consumer.Commit(); + + Assert.Equal(i, testStruct.A); + Assert.Equal(i + 1, testStruct.B); + Assert.Equal(i + 2, testStruct.C); + Assert.Equal(i + 3, testStruct.D); + } + } + + [Fact] + public void Producer_AdjustOffset_Throws_When_Producing() + { + using var tempStorePath = TempStorePath.Create(); + + var options = new MappedFileQueueOptions { StorePath = tempStorePath.Path, SegmentSize = 64 }; + + using var queue = MappedFileQueue.Create(options); + + var producer = queue.Producer; + + var testStruct = new TestStructSize16 { A = 1, B = 2, C = 3, D = 4 }; + + producer.Produce(ref testStruct); + + Assert.Throws(() => producer.AdjustOffset(0)); + } + + [Fact] + public void Consumer_AdjustOffset_Throws_When_Consuming() + { + using var tempStorePath = TempStorePath.Create(); + + var options = new MappedFileQueueOptions { StorePath = tempStorePath.Path, SegmentSize = 32 }; + + using var queue = MappedFileQueue.Create(options); + + var producer = queue.Producer; + + for (var i = 0; i < 3; 
i++) + { + var testStruct = new TestStructSize16 { A = i, B = i + 1, C = i + 2, D = i + 3 }; + producer.Produce(ref testStruct); + } + + var consumer = queue.Consumer; + + consumer.Consume(out _); + + Assert.Throws(() => consumer.AdjustOffset(0)); + } + + [Fact] + public void Dispose_Multiple_Times_Does_Not_Throw() + { + using var tempStorePath = TempStorePath.Create(); + + var options = new MappedFileQueueOptions { StorePath = tempStorePath.Path, SegmentSize = 64 }; + + var queue = MappedFileQueue.Create(options); + + // Dispose multiple times + queue.Dispose(); + queue.Dispose(); + } + + [Fact] + public void Produce_After_Dispose_Throws() + { + using var tempStorePath = TempStorePath.Create(); + var options = new MappedFileQueueOptions { StorePath = tempStorePath.Path, SegmentSize = 64 }; + using var queue = MappedFileQueue.Create(options); + var producer = queue.Producer; + queue.Dispose(); + var testStruct = new TestStructSize16 { A = 1, B = 2, C = 3, D = 4 }; + Assert.Throws(() => producer.Produce(ref testStruct)); + } + + [Fact] + public void Consume_After_Dispose_Throws() + { + using var tempStorePath = TempStorePath.Create(); + var options = new MappedFileQueueOptions { StorePath = tempStorePath.Path, SegmentSize = 64 }; + using var queue = MappedFileQueue.Create(options); + var consumer = queue.Consumer; + queue.Dispose(); + Assert.Throws(() => consumer.Consume(out _)); + } + + [Fact] + public void Recover_Producer_Offset_After_Crash() + { + using var tempStorePath = TempStorePath.Create(); + + var options = new MappedFileQueueOptions + { + StorePath = tempStorePath.Path, + SegmentSize = 32, + ProducerForceFlushIntervalCount = 4 + }; + + var messageSize = Unsafe.SizeOf() + Constants.EndMarkerSize; + + // Simulate first run + using (var queue = MappedFileQueue.Create(options)) + { + var producer = queue.Producer; + + for (var i = 0; i < 5; i++) + { + var testStruct = new TestStructSize16 { A = i, B = i + 1, C = i + 2, D = i + 3 }; + producer.Produce(ref 
testStruct); + } + + Assert.Equal(4 * messageSize, producer.ConfirmedOffset); + + var consumer = queue.Consumer; + + for (var i = 0; i < 5; i++) + { + consumer.Consume(out var testStruct); + consumer.Commit(); + + Assert.Equal(i, testStruct.A); + Assert.Equal(i + 1, testStruct.B); + Assert.Equal(i + 2, testStruct.C); + Assert.Equal(i + 3, testStruct.D); + } + } + + // Simulate crash by modifying the producer offset to be beyond the confirmed offset + using (var queue = MappedFileQueue.Create(options)) + { + var producer = queue.Producer; + var consumer = queue.Consumer; + + // Move the producer offset to the end (simulate uncommitted data) + producer.AdjustOffset(7 * messageSize); + } + + // Simulate recovery run + using (var queue = MappedFileQueue.Create(options)) + { + var producer = queue.Producer; + var consumer = queue.Consumer; + + // Producer's offset should be rolled back to the max(confirmed offset, consumer offset) = 5 * messageSize + Assert.Equal(5 * messageSize, producer.Offset); + + for (var i = 4; i < 10; i++) + { + var testStruct = new TestStructSize16 { A = i, B = i + 1, C = i + 2, D = i + 3 }; + producer.Produce(ref testStruct); + } + + for (var i = 4; i < 10; i++) + { + consumer.Consume(out var testStruct); + consumer.Commit(); + + Assert.Equal(i, testStruct.A); + Assert.Equal(i + 1, testStruct.B); + Assert.Equal(i + 2, testStruct.C); + Assert.Equal(i + 3, testStruct.D); + } + } + } }