From f65a1d25eef327b9c8b86e43c2901c033aea04ae Mon Sep 17 00:00:00 2001 From: Event Horizon <772552754@qq.com> Date: Fri, 27 Feb 2026 19:38:55 +0800 Subject: [PATCH 1/2] fix read unwritten data --- ...moryBuferQueue.cs => MemoryBufferQueue.cs} | 0 src/BufferQueue/Memory/MemoryBufferSegment.cs | 70 +++++++++++++++---- 2 files changed, 56 insertions(+), 14 deletions(-) rename src/BufferQueue/Memory/{MemoryBuferQueue.cs => MemoryBufferQueue.cs} (100%) diff --git a/src/BufferQueue/Memory/MemoryBuferQueue.cs b/src/BufferQueue/Memory/MemoryBufferQueue.cs similarity index 100% rename from src/BufferQueue/Memory/MemoryBuferQueue.cs rename to src/BufferQueue/Memory/MemoryBufferQueue.cs diff --git a/src/BufferQueue/Memory/MemoryBufferSegment.cs b/src/BufferQueue/Memory/MemoryBufferSegment.cs index 6592281..d666a08 100644 --- a/src/BufferQueue/Memory/MemoryBufferSegment.cs +++ b/src/BufferQueue/Memory/MemoryBufferSegment.cs @@ -16,14 +16,16 @@ internal sealed class MemoryBufferSegment private readonly MemoryBufferPartitionOffset _startOffset; private readonly MemoryBufferPartitionOffset _endOffset; private readonly T[] _slots; - private volatile int _writePosition; + private volatile int _reservedWritePosition; + private volatile int _publishedWritePosition; public MemoryBufferSegment(int length, MemoryBufferPartitionOffset startOffset) { _startOffset = startOffset; _endOffset = startOffset + (ulong)(length - 1); _slots = new T[length]; - _writePosition = -1; + _reservedWritePosition = -1; + _publishedWritePosition = -1; } private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) @@ -31,7 +33,8 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) _startOffset = startOffset; _endOffset = startOffset + (ulong)(slots.Length - 1); _slots = slots; - _writePosition = -1; + _reservedWritePosition = -1; + _publishedWritePosition = -1; } public MemoryBufferSegment? NextSegment { get; set; } @@ -42,19 +45,58 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) public int Capacity => _slots.Length; - public int Count => Math.Min(Capacity, _writePosition + 1); + public int Count => Math.Min(Capacity, _publishedWritePosition + 1); public bool TryEnqueue(T item) { - var writePosition = Interlocked.Increment(ref _writePosition); - if (writePosition >= _slots.Length) + while (true) { - _writePosition = _slots.Length - 1; - return false; - } + var currentReserved = _reservedWritePosition; + var nextPosition = currentReserved + 1; + if (nextPosition >= _slots.Length) + { + // No more space to write in this segment. + return false; + } + + if (Interlocked.CompareExchange( + ref _reservedWritePosition, + nextPosition, + currentReserved) + != currentReserved) + { + // Another thread has already written to the next position, retry. + continue; + } + + // Write the item to the slot. + // It's safe to write directly without locks because each position is written by at most one thread. + _slots[nextPosition] = item; + + // Now we need to publish the new write position so that readers can see the new item. + while (true) + { + var currentPublished = _publishedWritePosition; + if (currentPublished >= nextPosition) + { + // Another thread has already published a position that is greater than our next position, + // which means our item is already visible to readers, no need to publish again. + break; + } + + if (Interlocked.CompareExchange( + ref _publishedWritePosition, + nextPosition, + currentPublished) + == currentPublished) + { + // Successfully published the new write position, now readers can see the new item. + break; + } + } - _slots[writePosition] = item; - return true; + return true; + } } public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(true)] out T[]? items) @@ -67,13 +109,13 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t var readPosition = (offset - _startOffset).ToInt32(); - if (_writePosition < 0 || readPosition > _writePosition) + if (_publishedWritePosition < 0 || readPosition > _publishedWritePosition) { items = null; return false; } - var writePosition = Math.Min(_writePosition, _slots.Length - 1); + var writePosition = Math.Min(_publishedWritePosition, _slots.Length - 1); var actualCount = Math.Min(count, writePosition - readPosition + 1); var wholeSegment = readPosition == 0 && actualCount == _slots.Length; if (wholeSegment) @@ -102,6 +144,6 @@ private class DebugView(MemoryBufferSegment segment) public int Count => segment.Count; - public T[] Items => segment._slots.Take(segment._writePosition + 1).ToArray(); + public T[] Items => segment._slots.Take(segment._publishedWritePosition + 1).ToArray(); } } From 645d479b940c4fda6f70f779c1a552e7f6f9e6a8 Mon Sep 17 00:00:00 2001 From: Event Horizon <772552754@qq.com> Date: Fri, 27 Feb 2026 20:17:23 +0800 Subject: [PATCH 2/2] fix read unwritten data --- .../Memory/MemoryBufferConsumer.cs | 2 +- .../Memory/MemoryBufferPartition.cs | 67 ++++++++----------- .../Memory/MemoryBufferProducer.cs | 2 +- src/BufferQueue/Memory/MemoryBufferSegment.cs | 7 +- 4 files changed, 33 insertions(+), 45 deletions(-) diff --git a/src/BufferQueue/Memory/MemoryBufferConsumer.cs b/src/BufferQueue/Memory/MemoryBufferConsumer.cs index baa0c74..a55c033 100644 --- a/src/BufferQueue/Memory/MemoryBufferConsumer.cs +++ b/src/BufferQueue/Memory/MemoryBufferConsumer.cs @@ -67,7 +67,7 @@ public async IAsyncEnumerable> ConsumeAsync( } // Try to pull from other partitions - IEnumerable itemsFromOtherPartition = default!; + IEnumerable itemsFromOtherPartition = null!; var hasItemFromOtherPartition = false; foreach (var t in _assignedPartitions) diff --git a/src/BufferQueue/Memory/MemoryBufferPartition.cs b/src/BufferQueue/Memory/MemoryBufferPartition.cs index 8a5ae29..921219e 100644 --- a/src/BufferQueue/Memory/MemoryBufferPartition.cs +++ b/src/BufferQueue/Memory/MemoryBufferPartition.cs @@ -116,12 +116,12 @@ private bool TryRecycleSegment( return false; } - var minConsumerPendingOffset = MinConsumerPendingOffset(); + var minConsumerReadPosition = MinConsumerReadPosition(); MemoryBufferSegment? recyclableSegment = null; for (var segment = _head; segment != _tail; segment = segment.NextSegment!) { - var wholeSegmentConsumed = segment.EndOffset < minConsumerPendingOffset; + var wholeSegmentConsumed = segment.EndOffset < minConsumerReadPosition; if (wholeSegmentConsumed) { recyclableSegment = segment; @@ -141,53 +141,47 @@ private bool TryRecycleSegment( return true; } - private MemoryBufferPartitionOffset MinConsumerPendingOffset() + private MemoryBufferPartitionOffset MinConsumerReadPosition() { - MemoryBufferPartitionOffset? minPendingOffset = null; + MemoryBufferPartitionOffset? minReadPosition = null; foreach (var reader in _consumerReaders.Values) { - var pendingOffset = reader.PendingOffset; + var readPosition = reader.ReadPosition; - if (minPendingOffset == null) + if (minReadPosition == null) { - minPendingOffset = pendingOffset; + minReadPosition = readPosition; continue; } - if (pendingOffset < minPendingOffset) + if (readPosition < minReadPosition) { - minPendingOffset = pendingOffset; + minReadPosition = readPosition; } } - return minPendingOffset ?? _head.StartOffset; + return minReadPosition ?? _head.StartOffset; } // One reader can only be used by one consumer at the same time. - private sealed class Reader + private sealed class Reader(MemoryBufferSegment currentSegment, MemoryBufferPartitionOffset currentOffset) { - private MemoryBufferSegment _currentSegment; - private MemoryBufferPartitionOffset _pendingOffset; + private MemoryBufferSegment _currentSegment = currentSegment; + private MemoryBufferPartitionOffset _readPosition = currentOffset; private int _lastReadCount; - public Reader(MemoryBufferSegment currentSegment, MemoryBufferPartitionOffset currentOffset) - { - _currentSegment = currentSegment; - _pendingOffset = currentOffset; - } - - public MemoryBufferPartitionOffset PendingOffset => _pendingOffset; + public MemoryBufferPartitionOffset ReadPosition => _readPosition; public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable? items) { var remainingCount = batchSize; - var pendingOffset = _pendingOffset; + var readPosition = _readPosition; var result = Enumerable.Empty(); var currentSegment = _currentSegment; while (true) { - if (currentSegment.EndOffset < pendingOffset) + if (currentSegment.EndOffset < readPosition) { if (currentSegment.NextSegment == null) { @@ -197,11 +191,11 @@ public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable? items currentSegment = currentSegment.NextSegment; } - var retrievalSuccess = currentSegment.TryGet(pendingOffset, remainingCount, out var segmentItems); + var retrievalSuccess = currentSegment.TryGet(readPosition, remainingCount, out var segmentItems); if (retrievalSuccess) { var length = segmentItems!.Length; - pendingOffset += (ulong)length; + readPosition += (ulong)length; remainingCount -= length; result = result.Concat(segmentItems); } @@ -240,39 +234,32 @@ public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable? items public void MoveNext() { - _pendingOffset += (ulong)_lastReadCount; - while (_currentSegment.EndOffset < _pendingOffset && _currentSegment.NextSegment != null) + _readPosition += (ulong)_lastReadCount; + while (_currentSegment.EndOffset < _readPosition && _currentSegment.NextSegment != null) { _currentSegment = _currentSegment.NextSegment!; } } } - private class DebugView + private class DebugView(MemoryBufferPartition partition) { - private readonly MemoryBufferPartition _partition; - - public DebugView(MemoryBufferPartition partition) - { - _partition = partition; - } - - public int PartitionId => _partition.PartitionId; + public int PartitionId => partition.PartitionId; - public ulong Capacity => _partition.Capacity; + public ulong Capacity => partition.Capacity; - public ulong Count => _partition.Count; + public ulong Count => partition.Count; - public HashSet> Consumers => _partition._consumers; + public HashSet> Consumers => partition._consumers; - public ConcurrentDictionary ConsumerReaders => _partition._consumerReaders; + public ConcurrentDictionary ConsumerReaders => partition._consumerReaders; public MemoryBufferSegment[] Segments { get { var segments = new List>(); - for (var segment = _partition._head; segment != null; segment = segment.NextSegment) + for (var segment = partition._head; segment != null; segment = segment.NextSegment) { segments.Add(segment); } diff --git a/src/BufferQueue/Memory/MemoryBufferProducer.cs b/src/BufferQueue/Memory/MemoryBufferProducer.cs index 1a7744e..0136ffb 100644 --- a/src/BufferQueue/Memory/MemoryBufferProducer.cs +++ b/src/BufferQueue/Memory/MemoryBufferProducer.cs @@ -15,7 +15,7 @@ internal sealed class MemoryBufferProducer( private uint _partitionIndex; private int _checkingCapacity; - public string TopicName { get; } = options.TopicName; + public string TopicName { get; } = options.TopicName!; public ValueTask ProduceAsync(T item) { diff --git a/src/BufferQueue/Memory/MemoryBufferSegment.cs b/src/BufferQueue/Memory/MemoryBufferSegment.cs index d666a08..626724e 100644 --- a/src/BufferQueue/Memory/MemoryBufferSegment.cs +++ b/src/BufferQueue/Memory/MemoryBufferSegment.cs @@ -116,15 +116,16 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t } var writePosition = Math.Min(_publishedWritePosition, _slots.Length - 1); - var actualCount = Math.Min(count, writePosition - readPosition + 1); - var wholeSegment = readPosition == 0 && actualCount == _slots.Length; + // Number of items actually available to return (bounded by requested count and written items). + var availableCount = Math.Min(count, writePosition - readPosition + 1); + var wholeSegment = readPosition == 0 && availableCount == _slots.Length; if (wholeSegment) { items = _slots; return true; } - items = _slots[readPosition..(readPosition + actualCount)]; + items = _slots[readPosition..(readPosition + availableCount)]; return true; }