Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/BufferQueue/Memory/MemoryBufferConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async IAsyncEnumerable<IEnumerable<T>> ConsumeAsync(
}

// Try to pull from other partitions
IEnumerable<T> itemsFromOtherPartition = default!;
IEnumerable<T> itemsFromOtherPartition = null!;
var hasItemFromOtherPartition = false;

foreach (var t in _assignedPartitions)
Expand Down
67 changes: 27 additions & 40 deletions src/BufferQueue/Memory/MemoryBufferPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ private bool TryRecycleSegment(
return false;
}

var minConsumerPendingOffset = MinConsumerPendingOffset();
var minConsumerReadPosition = MinConsumerReadPosition();

MemoryBufferSegment<T>? recyclableSegment = null;
for (var segment = _head; segment != _tail; segment = segment.NextSegment!)
{
var wholeSegmentConsumed = segment.EndOffset < minConsumerPendingOffset;
var wholeSegmentConsumed = segment.EndOffset < minConsumerReadPosition;
if (wholeSegmentConsumed)
{
recyclableSegment = segment;
Expand All @@ -141,53 +141,47 @@ private bool TryRecycleSegment(
return true;
}

private MemoryBufferPartitionOffset MinConsumerPendingOffset()
private MemoryBufferPartitionOffset MinConsumerReadPosition()
{
MemoryBufferPartitionOffset? minPendingOffset = null;
MemoryBufferPartitionOffset? minReadPosition = null;
foreach (var reader in _consumerReaders.Values)
{
var pendingOffset = reader.PendingOffset;
var readPosition = reader.ReadPosition;

if (minPendingOffset == null)
if (minReadPosition == null)
{
minPendingOffset = pendingOffset;
minReadPosition = readPosition;
continue;
}

if (pendingOffset < minPendingOffset)
if (readPosition < minReadPosition)
{
minPendingOffset = pendingOffset;
minReadPosition = readPosition;
}
}

return minPendingOffset ?? _head.StartOffset;
return minReadPosition ?? _head.StartOffset;
}

// One reader can only be used by one consumer at the same time.
private sealed class Reader
private sealed class Reader(MemoryBufferSegment<T> currentSegment, MemoryBufferPartitionOffset currentOffset)
{
private MemoryBufferSegment<T> _currentSegment;
private MemoryBufferPartitionOffset _pendingOffset;
private MemoryBufferSegment<T> _currentSegment = currentSegment;
private MemoryBufferPartitionOffset _readPosition = currentOffset;
private int _lastReadCount;

public Reader(MemoryBufferSegment<T> currentSegment, MemoryBufferPartitionOffset currentOffset)
{
_currentSegment = currentSegment;
_pendingOffset = currentOffset;
}

public MemoryBufferPartitionOffset PendingOffset => _pendingOffset;
public MemoryBufferPartitionOffset ReadPosition => _readPosition;

public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable<T>? items)
{
var remainingCount = batchSize;
var pendingOffset = _pendingOffset;
var readPosition = _readPosition;
var result = Enumerable.Empty<T>();
var currentSegment = _currentSegment;

while (true)
{
if (currentSegment.EndOffset < pendingOffset)
if (currentSegment.EndOffset < readPosition)
{
if (currentSegment.NextSegment == null)
{
Expand All @@ -197,11 +191,11 @@ public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable<T>? items
currentSegment = currentSegment.NextSegment;
}

var retrievalSuccess = currentSegment.TryGet(pendingOffset, remainingCount, out var segmentItems);
var retrievalSuccess = currentSegment.TryGet(readPosition, remainingCount, out var segmentItems);
if (retrievalSuccess)
{
var length = segmentItems!.Length;
pendingOffset += (ulong)length;
readPosition += (ulong)length;
remainingCount -= length;
result = result.Concat(segmentItems);
}
Expand Down Expand Up @@ -240,39 +234,32 @@ public bool TryRead(int batchSize, [NotNullWhen(true)] out IEnumerable<T>? items

public void MoveNext()
{
_pendingOffset += (ulong)_lastReadCount;
while (_currentSegment.EndOffset < _pendingOffset && _currentSegment.NextSegment != null)
_readPosition += (ulong)_lastReadCount;
while (_currentSegment.EndOffset < _readPosition && _currentSegment.NextSegment != null)
{
_currentSegment = _currentSegment.NextSegment!;
}
}
}

private class DebugView
private class DebugView(MemoryBufferPartition<T> partition)
{
private readonly MemoryBufferPartition<T> _partition;

public DebugView(MemoryBufferPartition<T> partition)
{
_partition = partition;
}

public int PartitionId => _partition.PartitionId;
public int PartitionId => partition.PartitionId;

public ulong Capacity => _partition.Capacity;
public ulong Capacity => partition.Capacity;

public ulong Count => _partition.Count;
public ulong Count => partition.Count;

public HashSet<MemoryBufferConsumer<T>> Consumers => _partition._consumers;
public HashSet<MemoryBufferConsumer<T>> Consumers => partition._consumers;

public ConcurrentDictionary<string, Reader> ConsumerReaders => _partition._consumerReaders;
public ConcurrentDictionary<string, Reader> ConsumerReaders => partition._consumerReaders;

public MemoryBufferSegment<T>[] Segments
{
get
{
var segments = new List<MemoryBufferSegment<T>>();
for (var segment = _partition._head; segment != null; segment = segment.NextSegment)
for (var segment = partition._head; segment != null; segment = segment.NextSegment)
{
segments.Add(segment);
}
Expand Down
2 changes: 1 addition & 1 deletion src/BufferQueue/Memory/MemoryBufferProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class MemoryBufferProducer<T>(
private uint _partitionIndex;
private int _checkingCapacity;

public string TopicName { get; } = options.TopicName;
public string TopicName { get; } = options.TopicName!;

public ValueTask ProduceAsync(T item)
{
Expand Down
77 changes: 60 additions & 17 deletions src/BufferQueue/Memory/MemoryBufferSegment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@ internal sealed class MemoryBufferSegment<T>
private readonly MemoryBufferPartitionOffset _startOffset;
private readonly MemoryBufferPartitionOffset _endOffset;
private readonly T[] _slots;
private volatile int _writePosition;
private volatile int _reservedWritePosition;
private volatile int _publishedWritePosition;

public MemoryBufferSegment(int length, MemoryBufferPartitionOffset startOffset)
{
_startOffset = startOffset;
_endOffset = startOffset + (ulong)(length - 1);
_slots = new T[length];
_writePosition = -1;
_reservedWritePosition = -1;
_publishedWritePosition = -1;
}

private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset)
{
_startOffset = startOffset;
_endOffset = startOffset + (ulong)(slots.Length - 1);
_slots = slots;
_writePosition = -1;
_reservedWritePosition = -1;
_publishedWritePosition = -1;
}

public MemoryBufferSegment<T>? NextSegment { get; set; }
Expand All @@ -42,19 +45,58 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset)

public int Capacity => _slots.Length;

public int Count => Math.Min(Capacity, _writePosition + 1);
public int Count => Math.Min(Capacity, _publishedWritePosition + 1);

public bool TryEnqueue(T item)
{
var writePosition = Interlocked.Increment(ref _writePosition);
if (writePosition >= _slots.Length)
while (true)
{
_writePosition = _slots.Length - 1;
return false;
}
var currentReserved = _reservedWritePosition;
var nextPosition = currentReserved + 1;
if (nextPosition >= _slots.Length)
{
// No more space to write in this segment.
return false;
}

if (Interlocked.CompareExchange(
ref _reservedWritePosition,
nextPosition,
currentReserved)
!= currentReserved)
{
// Another thread has already written to the next position, retry.
continue;
}

// Write the item to the slot.
// It's safe to write directly without locks because each position is written by at most one thread.
_slots[nextPosition] = item;

// Now we need to publish the new write position so that readers can see the new item.
while (true)
{
var currentPublished = _publishedWritePosition;
if (currentPublished >= nextPosition)
{
// Another thread has already published a position that is greater than our next position,
// which means our item is already visible to readers, no need to publish again.
break;
}

if (Interlocked.CompareExchange(
ref _publishedWritePosition,
nextPosition,
currentPublished)
== currentPublished)
{
// Successfully published the new write position, now readers can see the new item.
break;
}
}

_slots[writePosition] = item;
return true;
return true;
}
}

public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(true)] out T[]? items)
Expand All @@ -67,22 +109,23 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t

var readPosition = (offset - _startOffset).ToInt32();

if (_writePosition < 0 || readPosition > _writePosition)
if (_publishedWritePosition < 0 || readPosition > _publishedWritePosition)
{
items = null;
return false;
}

var writePosition = Math.Min(_writePosition, _slots.Length - 1);
var actualCount = Math.Min(count, writePosition - readPosition + 1);
var wholeSegment = readPosition == 0 && actualCount == _slots.Length;
var writePosition = Math.Min(_publishedWritePosition, _slots.Length - 1);
// Number of items actually available to return (bounded by requested count and written items).
var availableCount = Math.Min(count, writePosition - readPosition + 1);
var wholeSegment = readPosition == 0 && availableCount == _slots.Length;
if (wholeSegment)
{
items = _slots;
return true;
}

items = _slots[readPosition..(readPosition + actualCount)];
items = _slots[readPosition..(readPosition + availableCount)];
return true;
}

Expand All @@ -102,6 +145,6 @@ private class DebugView(MemoryBufferSegment<T> segment)

public int Count => segment.Count;

public T[] Items => segment._slots.Take(segment._writePosition + 1).ToArray();
public T[] Items => segment._slots.Take(segment._publishedWritePosition + 1).ToArray();
}
}