From 850aaf180d2d2f5ea425f86daba278a4b8e74f71 Mon Sep 17 00:00:00 2001 From: Event Horizon <772552754@qq.com> Date: Mon, 11 Aug 2025 22:22:31 +0800 Subject: [PATCH 1/2] support bounded capacity --- README.md | 35 ++++- README.zh-CN.md | 33 +++-- samples/WebAPI/Controllers/TestController.cs | 2 + samples/WebAPI/Program.cs | 25 +++- src/BufferQueue/BufferQueue.csproj | 2 +- src/BufferQueue/IBufferProducer.cs | 2 + src/BufferQueue/Memory/MemoryBuferQueue.cs | 8 +- ...oryBufferBufferOptionsBuilderExtensions.cs | 30 ---- .../Memory/MemoryBufferConsumer.cs | 2 +- src/BufferQueue/Memory/MemoryBufferOptions.cs | 15 -- .../Memory/MemoryBufferOptionsBuilder.cs | 38 +++++ .../MemoryBufferOptionsBuilderExtensions.cs | 23 +++ .../Memory/MemoryBufferPartition.cs | 10 +- .../Memory/MemoryBufferProducer.cs | 64 ++++++++- .../Memory/MemoryBufferQueueFullException.cs | 8 ++ .../Memory/MemoryBufferQueueOptions.cs | 34 +++++ .../BufferQueue.Benchmarks/BenchmarkConfig.cs | 19 --- .../MemoryBufferQueueConsumeBenchmark.cs | 6 +- .../MemoryBufferQueueProduceBenchmark.cs | 12 +- tests/BufferQueue.Benchmarks/Program.cs | 8 +- .../BufferQueue.Tests.csproj | 3 +- .../Memory/MemoryBufferQueueTests.cs | 131 +++++++++++++++--- ...yBufferServiceCollectionExtensionsTests.cs | 84 ++++++++--- .../PushConsumer/PushConsumerTests.cs | 22 ++- 24 files changed, 472 insertions(+), 144 deletions(-) delete mode 100644 src/BufferQueue/Memory/MemoryBufferBufferOptionsBuilderExtensions.cs delete mode 100644 src/BufferQueue/Memory/MemoryBufferOptions.cs create mode 100644 src/BufferQueue/Memory/MemoryBufferOptionsBuilder.cs create mode 100644 src/BufferQueue/Memory/MemoryBufferOptionsBuilderExtensions.cs create mode 100644 src/BufferQueue/Memory/MemoryBufferQueueFullException.cs create mode 100644 src/BufferQueue/Memory/MemoryBufferQueueOptions.cs delete mode 100644 tests/BufferQueue.Benchmarks/BenchmarkConfig.cs diff --git a/README.md b/README.md index 0d947cc..7938ff0 100644 --- a/README.md +++ b/README.md @@ -71,21 +71,39 @@ BufferQueue supports two consumption modes: pull mode and push mode. Pull mode consumer example: ```csharp - -builder.Services.AddBufferQueue(options => +builder.Services.AddBufferQueue(bufferOptionsBuilder => { - options.UseMemory(bufferOptions => + bufferOptionsBuilder + .UseMemory(memoryBufferOptionsBuilder => { // Each pair of Topic and data type corresponds to an independent buffer, and partitionNumber can be set - bufferOptions.AddTopic("topic-foo1", partitionNumber: 6); - bufferOptions.AddTopic("topic-foo2", partitionNumber: 4); - bufferOptions.AddTopic("topic-bar", partitionNumber: 8); + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic-foo1"; + options.PartitionNumber = 6; + }) + .AddTopic(options => + { + options.TopicName = "topic-foo2"; + options.PartitionNumber = 4; + }) + .AddTopic(options => + { + options.TopicName = "topic-bar"; + options.PartitionNumber = 8; + // 可以设置缓冲区的最大容量 + options.BoundedCapacity = 100_000; + }); }) // Add push mode consumers, // scan the specified assembly for classes marked with // BufferPushCustomerAttribute and register them as push mode consumers .AddPushCustomers(typeof(Program).Assembly); }); + +// Pull mode consumers can be implemented as HostedService. +builder.Services.AddHostedService(); ``` Pull mode consumer example: @@ -197,6 +215,9 @@ Producer example: Get the specified producer through the IBufferQueue service and send the data by calling the ProduceAsync method. +If bounded capacity is set, when the buffer is full, the ProduceAsync method will discard the data and throw a MemoryBufferQueueFullException. +You can use the TryProduceAsync method to check if the data was successfully sent. + ```csharp [ApiController] [Route("/api/[controller]")] @@ -223,6 +244,8 @@ public class TestController(IBufferQueue bufferQueue) : ControllerBase { var producer = bufferQueue.GetProducer("topic-bar"); await producer.ProduceAsync(bar); + // TryProduceAsync will return a boolean indicating whether the data was successfully sent. + // bool success = await producer.TryProduceAsync(bar); return Ok(); } } diff --git a/README.zh-CN.md b/README.zh-CN.md index b562fd4..8d4adf3 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -64,15 +64,30 @@ dotnet add package BufferQueue BufferQueue 支持两种消费模式:pull 模式和 push 模式。 ```csharp - -builder.Services.AddBufferQueue(options => +builder.Services.AddBufferQueue(bufferOptionsBuilder => { - options.UseMemory(bufferOptions => + bufferOptionsBuilder + .UseMemory(memoryBufferOptionsBuilder => { // 每一对 Topic 和数据类型对应一个独立的缓冲区,可以设置 partitionNumber - bufferOptions.AddTopic("topic-foo1", partitionNumber: 6); - bufferOptions.AddTopic("topic-foo2", partitionNumber: 4); - bufferOptions.AddTopic("topic-bar", partitionNumber: 8); + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic-foo1"; + options.PartitionNumber = 6; + }) + .AddTopic(options => + { + options.TopicName = "topic-foo2"; + options.PartitionNumber = 4; + }) + .AddTopic(options => + { + options.TopicName = "topic-bar"; + options.PartitionNumber = 8; + // 可以设置缓冲区的最大容量 + options.BoundedCapacity = 100_000; + }); }) // 添加 push 模式的消费者 // 扫描指定程序集中的标记了 BufferPushCustomerAttribute 的类, @@ -141,9 +156,7 @@ push consumer 会被注册到 DI 容器中,可以通过构造函数注入其 BufferPushCustomerAttribute 中的 concurrency 参数用于设置 push consumer 的消费并发数,对应 pull consumer 的 consumerNumber。 - ```csharp - [BufferPushCustomer( topicName: "topic-foo2", groupName: "group-foo2", @@ -194,6 +207,8 @@ Producer 示例: 通过 IBufferQueue 获取到指定的 Producer,然后调用 ProduceAsync 方法发送数据。 +如果设置了 BoundedCapacity,当缓冲区满时,ProduceAsync 方法会丢弃数据并抛出 MemoryBufferQueueFullException。可以使用 TryProduceAsync 方法来检查数据是否成功发送。 + ```csharp [ApiController] [Route("/api/[controller]")] @@ -220,6 +235,8 @@ public class TestController(IBufferQueue bufferQueue) : ControllerBase { var producer = bufferQueue.GetProducer("topic-bar"); await producer.ProduceAsync(bar); + // TryProduceAsync 会返回一个布尔值,表示数据是否成功发送 + // bool success = await producer.TryProduceAsync(bar); return Ok(); } } diff --git a/samples/WebAPI/Controllers/TestController.cs b/samples/WebAPI/Controllers/TestController.cs index fa06f59..b8bed07 100644 --- a/samples/WebAPI/Controllers/TestController.cs +++ b/samples/WebAPI/Controllers/TestController.cs @@ -29,6 +29,8 @@ public async Task PostBar([FromBody] Bar bar) { var producer = bufferQueue.GetProducer("topic-bar"); await producer.ProduceAsync(bar); + // TryProduceAsync can be used if you want to check if the item was produced successfully + // bool success = await producer.TryProduceAsync(bar); return Ok(); } } diff --git a/samples/WebAPI/Program.cs b/samples/WebAPI/Program.cs index 9f7fb8f..f9c3abb 100644 --- a/samples/WebAPI/Program.cs +++ b/samples/WebAPI/Program.cs @@ -14,13 +14,28 @@ builder.Services.AddSwaggerGen(); // Configure the BufferQueue -builder.Services.AddBufferQueue(options => +builder.Services.AddBufferQueue(bufferOptionsBuilder => { - options.UseMemory(bufferOptions => + bufferOptionsBuilder + .UseMemory(memoryBufferOptionsBuilder => { - bufferOptions.AddTopic("topic-foo1", partitionNumber: 6); - bufferOptions.AddTopic("topic-foo2", partitionNumber: 4); - bufferOptions.AddTopic("topic-bar", partitionNumber: 8); + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic-foo1"; + options.PartitionNumber = 6; + }) + .AddTopic(options => + { + options.TopicName = "topic-foo2"; + options.PartitionNumber = 4; + }) + .AddTopic(options => + { + options.TopicName = "topic-bar"; + options.PartitionNumber = 8; + options.BoundedCapacity = 100_000; // Set a bounded capacity for the Bar topic + }); }) .AddPushCustomers(typeof(Program).Assembly); }); diff --git a/src/BufferQueue/BufferQueue.csproj b/src/BufferQueue/BufferQueue.csproj index 10c1d70..ddb0244 100644 --- a/src/BufferQueue/BufferQueue.csproj +++ b/src/BufferQueue/BufferQueue.csproj @@ -1,10 +1,10 @@ + net8.0 enable latest true - net6.0;net7.0;net8.0 diff --git a/src/BufferQueue/IBufferProducer.cs b/src/BufferQueue/IBufferProducer.cs index df90dda..ca6df4c 100644 --- a/src/BufferQueue/IBufferProducer.cs +++ b/src/BufferQueue/IBufferProducer.cs @@ -10,4 +10,6 @@ public interface IBufferProducer string TopicName { get; } ValueTask ProduceAsync(T item); + + ValueTask TryProduceAsync(T item); } diff --git a/src/BufferQueue/Memory/MemoryBuferQueue.cs b/src/BufferQueue/Memory/MemoryBuferQueue.cs index b67994c..19705c2 100644 --- a/src/BufferQueue/Memory/MemoryBuferQueue.cs +++ b/src/BufferQueue/Memory/MemoryBuferQueue.cs @@ -11,6 +11,7 @@ internal sealed class MemoryBufferQueue : IBufferQueue { private readonly MemoryBufferPartition[] _partitions; private readonly int _partitionNumber; + private readonly int _segmentSize = 1024; private readonly IBufferProducer _producer; @@ -19,8 +20,11 @@ internal sealed class MemoryBufferQueue : IBufferQueue private readonly object _consumersLock; private readonly Dictionary>> _consumers; - public MemoryBufferQueue(string topicName, int partitionNumber) + public MemoryBufferQueue(MemoryBufferQueueOptions options) { + var topicName = options.TopicName; + var partitionNumber = options.PartitionNumber; + TopicName = topicName; _partitionNumber = partitionNumber; _partitions = new MemoryBufferPartition[partitionNumber]; @@ -29,7 +33,7 @@ public MemoryBufferQueue(string topicName, int partitionNumber) _partitions[i] = new MemoryBufferPartition(i); } - _producer = new MemoryBufferProducer(topicName, _partitions); + _producer = new MemoryBufferProducer(options, _partitions); _consumers = new Dictionary>>(); _consumersLock = new object(); diff --git a/src/BufferQueue/Memory/MemoryBufferBufferOptionsBuilderExtensions.cs b/src/BufferQueue/Memory/MemoryBufferBufferOptionsBuilderExtensions.cs deleted file mode 100644 index d6f7281..0000000 --- a/src/BufferQueue/Memory/MemoryBufferBufferOptionsBuilderExtensions.cs +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the .NET Core Community under one or more agreements. -// The .NET Core Community licenses this file to you under the MIT license. - -using System; -using BufferQueue.Memory; - -namespace BufferQueue; - -public static class MemoryBufferBufferOptionsBuilderExtensions -{ - public static BufferOptionsBuilder UseMemory( - this BufferOptionsBuilder builder, - Action configure) - { - if (builder is null) - { - throw new ArgumentNullException(nameof(builder)); - } - - if (configure is null) - { - throw new ArgumentNullException(nameof(configure)); - } - - var options = new MemoryBufferOptions(builder.Services); - configure(options); - - return builder; - } -} diff --git a/src/BufferQueue/Memory/MemoryBufferConsumer.cs b/src/BufferQueue/Memory/MemoryBufferConsumer.cs index 4710c86..baa0c74 100644 --- a/src/BufferQueue/Memory/MemoryBufferConsumer.cs +++ b/src/BufferQueue/Memory/MemoryBufferConsumer.cs @@ -25,7 +25,7 @@ internal sealed class MemoryBufferConsumer : IBufferPullConsumer public MemoryBufferConsumer(BufferPullConsumerOptions options) { _options = options; - _assignedPartitions = Array.Empty>(); + _assignedPartitions = []; _pendingDataValueTaskSource = new PendingDataValueTaskSource>(); _pendingDataVersion = 0; _pendingDataLock = new ReaderWriterLockSlim(); diff --git a/src/BufferQueue/Memory/MemoryBufferOptions.cs b/src/BufferQueue/Memory/MemoryBufferOptions.cs deleted file mode 100644 index 33ca8d0..0000000 --- a/src/BufferQueue/Memory/MemoryBufferOptions.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Licensed to the .NET Core Community under one or more agreements. -// The .NET Core Community licenses this file to you under the MIT license. - -using Microsoft.Extensions.DependencyInjection; - -namespace BufferQueue.Memory; - -public class MemoryBufferOptions(IServiceCollection services) -{ - public MemoryBufferOptions AddTopic(string topicName, int partitionNumber) - { - services.AddKeyedSingleton>(topicName, new MemoryBufferQueue(topicName, partitionNumber)); - return this; - } -} diff --git a/src/BufferQueue/Memory/MemoryBufferOptionsBuilder.cs b/src/BufferQueue/Memory/MemoryBufferOptionsBuilder.cs new file mode 100644 index 0000000..828b48c --- /dev/null +++ b/src/BufferQueue/Memory/MemoryBufferOptionsBuilder.cs @@ -0,0 +1,38 @@ +// Licensed to the .NET Core Community under one or more agreements. +// The .NET Core Community licenses this file to you under the MIT license. + +using System; +using Microsoft.Extensions.DependencyInjection; + +namespace BufferQueue.Memory; + +public class MemoryBufferOptionsBuilder(IServiceCollection services) +{ + public MemoryBufferOptionsBuilder AddTopic( + Action configure) + where T : notnull + { + ArgumentNullException.ThrowIfNull(configure); + + var options = new MemoryBufferQueueOptions(); + configure(options); + + var topicName = options.TopicName; + var partitionNumber = options.PartitionNumber; + + if (string.IsNullOrWhiteSpace(topicName)) + { + throw new ArgumentException("Topic name cannot be null or whitespace.", nameof(options.TopicName)); + } + + if (partitionNumber <= 0) + { + throw new ArgumentOutOfRangeException(nameof(options.PartitionNumber), + "Partition number must be greater than zero."); + } + + services.AddKeyedSingleton>( + topicName, new MemoryBufferQueue(options)); + return this; + } +} diff --git a/src/BufferQueue/Memory/MemoryBufferOptionsBuilderExtensions.cs b/src/BufferQueue/Memory/MemoryBufferOptionsBuilderExtensions.cs new file mode 100644 index 0000000..0529478 --- /dev/null +++ b/src/BufferQueue/Memory/MemoryBufferOptionsBuilderExtensions.cs @@ -0,0 +1,23 @@ +// Licensed to the .NET Core Community under one or more agreements. +// The .NET Core Community licenses this file to you under the MIT license. + +using System; +using BufferQueue.Memory; + +namespace BufferQueue; + +public static class MemoryBufferOptionsBuilderExtensions +{ + public static BufferOptionsBuilder UseMemory( + this BufferOptionsBuilder builder, + Action configure) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentNullException.ThrowIfNull(configure); + + var options = new MemoryBufferOptionsBuilder(builder.Services); + configure(options); + + return builder; + } +} diff --git a/src/BufferQueue/Memory/MemoryBufferPartition.cs b/src/BufferQueue/Memory/MemoryBufferPartition.cs index bf01ec0..2fb87fb 100644 --- a/src/BufferQueue/Memory/MemoryBufferPartition.cs +++ b/src/BufferQueue/Memory/MemoryBufferPartition.cs @@ -15,7 +15,7 @@ namespace BufferQueue.Memory; internal sealed class MemoryBufferPartition { // internal for test - internal readonly int _segmentLength; + internal readonly int _segmentSize; private volatile MemoryBufferSegment _head; private volatile MemoryBufferSegment _tail; @@ -26,11 +26,11 @@ internal sealed class MemoryBufferPartition private readonly object _createSegmentLock; - public MemoryBufferPartition(int id, int segmentLength = 1024) + public MemoryBufferPartition(int id, int segmentSize = 1024) { - _segmentLength = segmentLength; + _segmentSize = segmentSize; PartitionId = id; - _head = _tail = new MemoryBufferSegment(_segmentLength, default); + _head = _tail = new MemoryBufferSegment(_segmentSize, default); _consumerReaders = new ConcurrentDictionary(); _consumers = new HashSet>(); @@ -79,7 +79,7 @@ public void Enqueue(T item) var newSegmentStartOffset = tail.EndOffset + 1; var newSegment = TryRecycleSegment(newSegmentStartOffset, out var recycledSegment) ? recycledSegment - : new MemoryBufferSegment(_segmentLength, newSegmentStartOffset); + : new MemoryBufferSegment(_segmentSize, newSegmentStartOffset); tail.NextSegment = newSegment; _tail = newSegment; } diff --git a/src/BufferQueue/Memory/MemoryBufferProducer.cs b/src/BufferQueue/Memory/MemoryBufferProducer.cs index a185b3e..1a7744e 100644 --- a/src/BufferQueue/Memory/MemoryBufferProducer.cs +++ b/src/BufferQueue/Memory/MemoryBufferProducer.cs @@ -1,25 +1,83 @@ // Licensed to the .NET Core Community under one or more agreements. // The .NET Core Community licenses this file to you under the MIT license. +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace BufferQueue.Memory; -internal sealed class MemoryBufferProducer(string topicName, MemoryBufferPartition[] partitions) +internal sealed class MemoryBufferProducer( + MemoryBufferQueueOptions options, + MemoryBufferPartition[] partitions) : IBufferProducer { private uint _partitionIndex; + private int _checkingCapacity; - public string TopicName { get; } = topicName; + public string TopicName { get; } = options.TopicName; public ValueTask ProduceAsync(T item) + { + if (!TryEnqueue(item)) + { + throw new MemoryBufferQueueFullException( + $"The queue '{TopicName}' is full, and the item cannot be produced."); + } + return ValueTask.CompletedTask; + } + + public ValueTask TryProduceAsync(T item) + { + var succeeded = TryEnqueue(item); + return new ValueTask(succeeded); + } + + private bool TryEnqueue(T item) + { + if (!options.BoundedCapacity.HasValue) + { + Enqueue(item); + return true; + } + + while (true) + { + if (Interlocked.CompareExchange(ref _checkingCapacity, 1, 0) != 0) + { + continue; + } + + try + { + var count = 0UL; + foreach (var partition in partitions) + { + count += partition.Count; + if (count >= options.BoundedCapacity.Value) + { + return false; + } + } + + Enqueue(item); + return true; + } + finally + { + Interlocked.Exchange(ref _checkingCapacity, 0); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void Enqueue(T item) { var partition = SelectPartition(); partition.Enqueue(item); - return ValueTask.CompletedTask; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private MemoryBufferPartition SelectPartition() { var index = (Interlocked.Increment(ref _partitionIndex) - 1) % partitions.Length; diff --git a/src/BufferQueue/Memory/MemoryBufferQueueFullException.cs b/src/BufferQueue/Memory/MemoryBufferQueueFullException.cs new file mode 100644 index 0000000..61777b3 --- /dev/null +++ b/src/BufferQueue/Memory/MemoryBufferQueueFullException.cs @@ -0,0 +1,8 @@ +// Licensed to the .NET Core Community under one or more agreements. +// The .NET Core Community licenses this file to you under the MIT license. + +using System; + +namespace BufferQueue.Memory; + +public class MemoryBufferQueueFullException(string message) : Exception(message); diff --git a/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs b/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs new file mode 100644 index 0000000..df7bc65 --- /dev/null +++ b/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs @@ -0,0 +1,34 @@ +// Licensed to the .NET Core Community under one or more agreements. +// The .NET Core Community licenses this file to you under the MIT license. + +using System; + +namespace BufferQueue.Memory; + +public class MemoryBufferQueueOptions +{ + /// + /// The topic name for the buffer queue. + /// + public string? TopicName { get; set; } + + /// + /// The number of partitions for the topic. Default is 1. + /// + public int PartitionNumber { get; set; } = 1; + + /// + /// The segment size for each segment. Default is 1024. + /// + public long SegmentSize { get; set; } = 1024; + + /// + /// The maximum capacity of the bounded memory buffer queue. Default is null, which means unbounded. + /// + /// + /// If set, will throw a + /// when the queue is full, and + /// will return false when the queue is full. + /// + public ulong? BoundedCapacity { get; set; } +} diff --git a/tests/BufferQueue.Benchmarks/BenchmarkConfig.cs b/tests/BufferQueue.Benchmarks/BenchmarkConfig.cs deleted file mode 100644 index 32f7691..0000000 --- a/tests/BufferQueue.Benchmarks/BenchmarkConfig.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed to the .NET Core Community under one or more agreements. -// The .NET Core Community licenses this file to you under the MIT license. - -using BenchmarkDotNet.Configs; -using BenchmarkDotNet.Diagnosers; - -namespace BufferQueue.Benchmarks; - -public class BenchmarkConfig : ManualConfig -{ - public BenchmarkConfig() - { - Add(DefaultConfig.Instance); - AddDiagnoser(MemoryDiagnoser.Default); - - ArtifactsPath = Path.Combine(AppContext.BaseDirectory, "artifacts", - DateTime.Now.ToString("yyyy-mm-dd_hh-MM-ss")); - } -} diff --git a/tests/BufferQueue.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs b/tests/BufferQueue.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs index 3c3e2cb..e181059 100644 --- a/tests/BufferQueue.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs +++ b/tests/BufferQueue.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs @@ -20,7 +20,11 @@ public class MemoryBufferQueueConsumeBenchmark public void Setup() { _blockingCollection = new BlockingCollection(); - _memoryBufferQueue = new MemoryBufferQueue("test", Environment.ProcessorCount); + _memoryBufferQueue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = Environment.ProcessorCount + }); var producer = _memoryBufferQueue.GetProducer(); for (var i = 0; i < MessageSize; i++) diff --git a/tests/BufferQueue.Benchmarks/MemoryBufferQueueProduceBenchmark.cs b/tests/BufferQueue.Benchmarks/MemoryBufferQueueProduceBenchmark.cs index 3eabff6..daa0aa5 100644 --- a/tests/BufferQueue.Benchmarks/MemoryBufferQueueProduceBenchmark.cs +++ b/tests/BufferQueue.Benchmarks/MemoryBufferQueueProduceBenchmark.cs @@ -38,7 +38,11 @@ public async Task BlockingCollection_Concurrent_Producing() [Benchmark] public async Task MemoryBufferQueue_Concurrent_Producing_Partition_1() { - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var producer = queue.GetProducer(); var tasks = _chunks.Select(chunk => Task.Run(async () => { @@ -58,7 +62,11 @@ public async Task MemoryBufferQueue_Concurrent_Producing_Partition_1() [Benchmark] public async Task MemoryBufferQueue_Concurrent_Producing_Partition_ProcessorCount() { - var queue = new MemoryBufferQueue("test", Environment.ProcessorCount); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = Environment.ProcessorCount + }); var producer = queue.GetProducer(); var tasks = _chunks.Select(chunk => Task.Run(async () => { diff --git a/tests/BufferQueue.Benchmarks/Program.cs b/tests/BufferQueue.Benchmarks/Program.cs index 97d6a5a..5870ab1 100644 --- a/tests/BufferQueue.Benchmarks/Program.cs +++ b/tests/BufferQueue.Benchmarks/Program.cs @@ -1,13 +1,19 @@ // Licensed to the .NET Core Community under one or more agreements. // The .NET Core Community licenses this file to you under the MIT license. +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Running; using BufferQueue.Benchmarks; +var config = ManualConfig + .Create(DefaultConfig.Instance) + .AddDiagnoser(MemoryDiagnoser.Default); + var allBenchmarks = new[] { typeof(MemoryBufferQueueProduceBenchmark), typeof(MemoryBufferQueueConsumeBenchmark), }; -new BenchmarkSwitcher(allBenchmarks).Run(args, new BenchmarkConfig()); +new BenchmarkSwitcher(allBenchmarks).Run(args, config); diff --git a/tests/BufferQueue.Tests/BufferQueue.Tests.csproj b/tests/BufferQueue.Tests/BufferQueue.Tests.csproj index 68cd8f8..9f2cfeb 100644 --- a/tests/BufferQueue.Tests/BufferQueue.Tests.csproj +++ b/tests/BufferQueue.Tests/BufferQueue.Tests.csproj @@ -1,12 +1,11 @@ + net8.0 enable enable - false true - net8.0;net6.0;net7.0 diff --git a/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs b/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs index 27967ca..0a56396 100644 --- a/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs +++ b/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs @@ -8,12 +8,16 @@ namespace BufferQueue.Tests.Memory; public class MemoryBufferQueueTests { - private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition(0)._segmentLength; + private static int MemoryBufferPartitionSegmentSize => new MemoryBufferPartition(0)._segmentSize; [Fact] public async Task Produce_And_Consume() { - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var producer = queue.GetProducer(); var consumer = queue.CreateConsumer(new BufferPullConsumerOptions { @@ -54,7 +58,11 @@ public async Task Produce_And_Consume() [Fact] public async Task Produce_And_Consume_AutoCommit() { - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var producer = queue.GetProducer(); var consumer = queue.CreateConsumer( new BufferPullConsumerOptions @@ -90,7 +98,11 @@ public async Task Produce_And_Consume_AutoCommit() [Fact] public async Task Produce_And_Consume_With_Multiple_Partitions() { - var queue = new MemoryBufferQueue("test", 2); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 2 + }); var producer = queue.GetProducer(); var consumer = queue.CreateConsumer( new BufferPullConsumerOptions @@ -131,7 +143,11 @@ public async Task Produce_And_Consume_With_Multiple_Partitions() [Fact] public async Task Produce_And_Consume_With_Multiple_Consumers() { - var queue = new MemoryBufferQueue("test", 2); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 2 + }); var producer = queue.GetProducer(); var consumers = queue .CreateConsumers( @@ -167,7 +183,11 @@ public async Task Produce_And_Consume_With_Multiple_Consumers() [Fact] public void Throw_If_Wrong_Consumer_Number() { - var queue = new MemoryBufferQueue("test", 2); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 2 + }); Assert.Throws(() => queue.CreateConsumers( new BufferPullConsumerOptions @@ -194,7 +214,11 @@ public void Throw_If_Wrong_Consumer_Number() [Fact] public async Task Offset_Will_Not_Change_If_Consumer_Not_Commit() { - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var producer = queue.GetProducer(); var consumer = queue.CreateConsumer( new BufferPullConsumerOptions @@ -238,7 +262,11 @@ public async Task Offset_Will_Not_Change_If_Consumer_Not_Commit() [Fact] public async Task Consumer_Will_Wait_Until_Produce() { - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var producer = queue.GetProducer(); var consumer = queue.CreateConsumer(new BufferPullConsumerOptions @@ -267,13 +295,15 @@ public async Task Consumer_Will_Wait_Until_Produce() [Fact] public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers() { - var queue = new MemoryBufferQueue("test", 18); + var queue = new MemoryBufferQueue( + new MemoryBufferQueueOptions { TopicName = "test", PartitionNumber = 18 }); var assignedPartitionsFieldInfo = typeof(MemoryBufferConsumer) .GetField("_assignedPartitions", BindingFlags.Instance | BindingFlags.NonPublic)!; var group1Consumers = queue.CreateConsumers( - new BufferPullConsumerOptions { TopicName = "test", GroupName = "TestGroup1", AutoCommit = false }, 3) + new BufferPullConsumerOptions { TopicName = "test", GroupName = "TestGroup1", AutoCommit = false }, + 3) .ToList(); var group2Consumers = queue @@ -323,9 +353,13 @@ public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers() [Fact] public void Concurrent_Producer_Single_Partition() { - var messageSize = MemoryBufferPartitionSegmentLength * 4; + var messageSize = MemoryBufferPartitionSegmentSize * 4; - var queue = new MemoryBufferQueue("test", 1); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 1 + }); var countDownEvent = new CountdownEvent(messageSize); var consumer = @@ -370,9 +404,13 @@ public void Concurrent_Producer_Single_Partition() [Fact] public void Concurrent_Producer_Multiple_Partition() { - var messageSize = MemoryBufferPartitionSegmentLength * 4; + var messageSize = MemoryBufferPartitionSegmentSize * 4; - var queue = new MemoryBufferQueue("test", Environment.ProcessorCount); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = Environment.ProcessorCount + }); var consumer = queue.CreateConsumer(new BufferPullConsumerOptions @@ -432,11 +470,15 @@ public void Concurrent_Producer_Multiple_Partition() [InlineData(3, 10000)] public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize) { - var messageSize = MemoryBufferPartitionSegmentLength * 4; + var messageSize = MemoryBufferPartitionSegmentSize * 4; var partitionNumber = Environment.ProcessorCount * 2; var consumerNumberPerGroup = Environment.ProcessorCount; - var queue = new MemoryBufferQueue("test", partitionNumber); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = partitionNumber + }); var countdownEvent = new CountdownEvent(messageSize * groupNumber); @@ -492,11 +534,15 @@ public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize) [InlineData(3)] public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int groupNumber) { - var messageSize = MemoryBufferPartitionSegmentLength * 4; + var messageSize = MemoryBufferPartitionSegmentSize * 4; var partitionNumber = Environment.ProcessorCount * 2; var consumerNumberPerGroup = Environment.ProcessorCount; - var queue = new MemoryBufferQueue("test", partitionNumber); + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = partitionNumber + }); var countdownEvent = new CountdownEvent(messageSize * groupNumber); @@ -549,4 +595,53 @@ public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int grou countdownEvent.Wait(); } + + [Fact] + public async Task Bounded_Capacity() + { + var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions + { + TopicName = "test", + PartitionNumber = 3, + SegmentSize = 2, + BoundedCapacity = 10 + }); + + var producer = queue.GetProducer(); + var consumer = queue.CreateConsumer( + new BufferPullConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = true, + BatchSize = 2 + }); + bool produceResult; + for (var i = 0; i < 10; i++) + { + produceResult = await producer.TryProduceAsync(i); + Assert.True(produceResult); + } + + produceResult = await producer.TryProduceAsync(10); + Assert.False(produceResult); + + var consumedItems = new List(); + var index = 0; + await foreach (var items in consumer.ConsumeAsync()) + { + foreach (var item in items) + { + consumedItems.Add(item); + index++; + } + + if (index == 10) + { + break; + } + } + + Assert.Equal(Enumerable.Range(0, 10), consumedItems.OrderBy(x => x)); + } } diff --git a/tests/BufferQueue.Tests/Memory/MemoryBufferServiceCollectionExtensionsTests.cs b/tests/BufferQueue.Tests/Memory/MemoryBufferServiceCollectionExtensionsTests.cs index 9a77ffd..a21f019 100644 --- a/tests/BufferQueue.Tests/Memory/MemoryBufferServiceCollectionExtensionsTests.cs +++ b/tests/BufferQueue.Tests/Memory/MemoryBufferServiceCollectionExtensionsTests.cs @@ -12,21 +12,35 @@ public class MemoryBufferServiceCollectionExtensionsTests public void AddMemoryBuffer() { var services = new ServiceCollection(); - services.AddBufferQueue(options => options.UseMemory(builder => - builder - .AddTopic("topic1", 1) - .AddTopic("topic2", 2) - )); + services.AddBufferQueue(bufferOptionsBuilder => + bufferOptionsBuilder.UseMemory(memoryBufferOptionsBuilder => + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic1"; + options.PartitionNumber = 1; + }) + .AddTopic(options => + { + options.TopicName = "topic2"; + options.PartitionNumber = 2; + }) + )); var provider = services.BuildServiceProvider(); var bufferQueue = provider.GetRequiredService(); var topic1Producer = bufferQueue.GetProducer("topic1"); var topic1Consumer = - bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions { TopicName = "topic1", GroupName = "test" }); + bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions + { + TopicName = "topic1", + GroupName = "test" + }); var topic2Producer = bufferQueue.GetProducer("topic2"); var topic2Consumers = - bufferQueue.CreatePullConsumers(new BufferPullConsumerOptions { TopicName = "topic2", GroupName = "test" }, 2) + bufferQueue.CreatePullConsumers( + new BufferPullConsumerOptions { TopicName = "topic2", GroupName = "test" }, 2) .ToList(); Assert.Equal("topic1", topic1Producer.TopicName); @@ -43,20 +57,37 @@ public void AddMemoryBuffer() public async Task No_Consumption_If_TopicName_Not_Match() { var services = new ServiceCollection(); - services.AddBufferQueue(options => options.UseMemory(builder => - builder - .AddTopic("topic1", 1) - .AddTopic("topic2", 2) - )); + services.AddBufferQueue(bufferOptionsBuilder => + bufferOptionsBuilder.UseMemory(memoryBufferOptionsBuilder => + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic1"; + options.PartitionNumber = 1; + }) + .AddTopic(options => + { + options.TopicName = "topic2"; + options.PartitionNumber = 2; + }) + )); var provider = services.BuildServiceProvider(); var bufferQueue = provider.GetRequiredService(); var topic1Producer = bufferQueue.GetProducer("topic1"); var topic1Consumer = - bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions { TopicName = "topic1", GroupName = "test" }); + bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions + { + TopicName = "topic1", + GroupName = "test" + }); var topic2Consumer = - bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions { TopicName = "topic2", GroupName = "test" }); + bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions + { + TopicName = "topic2", + GroupName = "test" + }); await topic1Producer.ProduceAsync(1); @@ -81,18 +112,31 @@ public async Task No_Consumption_If_TopicName_Not_Match() public void Throw_If_TopicName_Not_Registered() { var services = new ServiceCollection(); - services.AddBufferQueue(options => options.UseMemory(builder => - builder - .AddTopic("topic1", 1) - .AddTopic("topic2", 2) - )); + services.AddBufferQueue(bufferOptionsBuilder => + bufferOptionsBuilder.UseMemory(memoryBufferOptionsBuilder => + memoryBufferOptionsBuilder + .AddTopic(options => + { + options.TopicName = "topic1"; + options.PartitionNumber = 1; + }) + .AddTopic(options => + { + options.TopicName = "topic2"; + options.PartitionNumber = 2; + }) + )); var provider = services.BuildServiceProvider(); var bufferQueue = provider.GetRequiredService(); Assert.Throws(() => bufferQueue.GetProducer("topic3")); Assert.Throws(() => - bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions { TopicName = "topic3", GroupName = "test" })); + bufferQueue.CreatePullConsumer(new BufferPullConsumerOptions + { + TopicName = "topic3", + GroupName = "test" + })); Assert.Throws(() => bufferQueue.CreatePullConsumers( new BufferPullConsumerOptions { TopicName = "topic3", GroupName = "test" }, diff --git a/tests/BufferQueue.Tests/PushConsumer/PushConsumerTests.cs b/tests/BufferQueue.Tests/PushConsumer/PushConsumerTests.cs index f5ab8fd..4be89b4 100644 --- a/tests/BufferQueue.Tests/PushConsumer/PushConsumerTests.cs +++ b/tests/BufferQueue.Tests/PushConsumer/PushConsumerTests.cs @@ -22,13 +22,25 @@ public async Task Customers_Should_Be_Able_To_Consume_Messages() services .AddSingleton>(NullLogger.Instance) .AddBufferQueue( - options => + bufferOptionsBuilder => { - options.UseMemory(bufferOptions => + bufferOptionsBuilder.UseMemory(memoryBufferOptionsBuilder => { - bufferOptions.AddTopic("topic-foo1", 2); - bufferOptions.AddTopic("topic-foo2", 4); - bufferOptions.AddTopic("topic-bar", 2); + memoryBufferOptionsBuilder.AddTopic(options => + { + options.TopicName = "topic-foo1"; + options.PartitionNumber = 2; + }); + memoryBufferOptionsBuilder.AddTopic(options => + { + options.TopicName = "topic-foo2"; + options.PartitionNumber = 3; + }); + memoryBufferOptionsBuilder.AddTopic(options => + { + options.TopicName = "topic-bar"; + options.PartitionNumber = 2; + }); }) .AddPushCustomers(typeof(PushConsumerTests).Assembly); } From b4661ec9b413709ce8896f072c7a68dde56b06c7 Mon Sep 17 00:00:00 2001 From: Event Horizon <772552754@qq.com> Date: Mon, 11 Aug 2025 22:40:00 +0800 Subject: [PATCH 2/2] fix --- src/BufferQueue/BufferPullConsumerOptions.cs | 8 +++----- src/BufferQueue/Memory/MemoryBuferQueue.cs | 5 ++--- .../Memory/MemoryBufferPartition.cs | 2 +- .../Memory/MemoryBufferQueueOptions.cs | 4 +--- src/BufferQueue/Memory/MemoryBufferSegment.cs | 19 ++++++------------- .../Memory/MemoryBufferQueueTests.cs | 10 +++++----- 6 files changed, 18 insertions(+), 30 deletions(-) diff --git a/src/BufferQueue/BufferPullConsumerOptions.cs b/src/BufferQueue/BufferPullConsumerOptions.cs index 67aab2d..5e5521a 100644 --- a/src/BufferQueue/BufferPullConsumerOptions.cs +++ b/src/BufferQueue/BufferPullConsumerOptions.cs @@ -1,17 +1,15 @@ // Licensed to the .NET Core Community under one or more agreements. // The .NET Core Community licenses this file to you under the MIT license. -using System.Diagnostics.CodeAnalysis; - namespace BufferQueue; public class BufferPullConsumerOptions { - public string TopicName { get; init; } = string.Empty; + public required string TopicName { get; init; } - public string GroupName { get; init; } = string.Empty; + public required string GroupName { get; init; } - public bool AutoCommit { get; init; } = false; + public bool AutoCommit { get; init; } public int BatchSize { get; init; } = 100; } diff --git a/src/BufferQueue/Memory/MemoryBuferQueue.cs b/src/BufferQueue/Memory/MemoryBuferQueue.cs index 19705c2..0dfaad8 100644 --- a/src/BufferQueue/Memory/MemoryBuferQueue.cs +++ b/src/BufferQueue/Memory/MemoryBuferQueue.cs @@ -11,7 +11,6 @@ internal sealed class MemoryBufferQueue : IBufferQueue { private readonly MemoryBufferPartition[] _partitions; private readonly int _partitionNumber; - private readonly int _segmentSize = 1024; private readonly IBufferProducer _producer; @@ -22,7 +21,7 @@ internal sealed class MemoryBufferQueue : IBufferQueue public MemoryBufferQueue(MemoryBufferQueueOptions options) { - var topicName = options.TopicName; + var topicName = options.TopicName!; var partitionNumber = options.PartitionNumber; TopicName = topicName; @@ -30,7 +29,7 @@ public MemoryBufferQueue(MemoryBufferQueueOptions options) _partitions = new MemoryBufferPartition[partitionNumber]; for (var i = 0; i < partitionNumber; i++) { - _partitions[i] = new MemoryBufferPartition(i); + _partitions[i] = new MemoryBufferPartition(i, options.SegmentSize); } _producer = new MemoryBufferProducer(options, _partitions); diff --git a/src/BufferQueue/Memory/MemoryBufferPartition.cs b/src/BufferQueue/Memory/MemoryBufferPartition.cs index 2fb87fb..8a5ae29 100644 --- a/src/BufferQueue/Memory/MemoryBufferPartition.cs +++ b/src/BufferQueue/Memory/MemoryBufferPartition.cs @@ -26,7 +26,7 @@ internal sealed class MemoryBufferPartition private readonly object _createSegmentLock; - public MemoryBufferPartition(int id, int segmentSize = 1024) + public MemoryBufferPartition(int id, int segmentSize) { _segmentSize = segmentSize; PartitionId = id; diff --git a/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs b/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs index df7bc65..d7fbc31 100644 --- a/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs +++ b/src/BufferQueue/Memory/MemoryBufferQueueOptions.cs @@ -1,8 +1,6 @@ // Licensed to the .NET Core Community under one or more agreements. // The .NET Core Community licenses this file to you under the MIT license. -using System; - namespace BufferQueue.Memory; public class MemoryBufferQueueOptions @@ -20,7 +18,7 @@ public class MemoryBufferQueueOptions /// /// The segment size for each segment. Default is 1024. /// - public long SegmentSize { get; set; } = 1024; + public int SegmentSize { get; set; } = 1024; /// /// The maximum capacity of the bounded memory buffer queue. Default is null, which means unbounded. diff --git a/src/BufferQueue/Memory/MemoryBufferSegment.cs b/src/BufferQueue/Memory/MemoryBufferSegment.cs index feb18b1..6592281 100644 --- a/src/BufferQueue/Memory/MemoryBufferSegment.cs +++ b/src/BufferQueue/Memory/MemoryBufferSegment.cs @@ -92,23 +92,16 @@ public MemoryBufferSegment RecycleSlots(MemoryBufferPartitionOffset startOffs return new MemoryBufferSegment(_slots, startOffset); } - private class DebugView + private class DebugView(MemoryBufferSegment segment) { - private readonly MemoryBufferSegment _segment; + public MemoryBufferPartitionOffset StartOffset => segment._startOffset; - public DebugView(MemoryBufferSegment segment) - { - _segment = segment; - } - - public MemoryBufferPartitionOffset StartOffset => _segment._startOffset; - - public MemoryBufferPartitionOffset EndOffset => _segment._endOffset; + public MemoryBufferPartitionOffset EndOffset => segment._endOffset; - public int Capacity => _segment.Capacity; + public int Capacity => segment.Capacity; - public int Count => _segment.Count; + public int Count => segment.Count; - public T[] Items => _segment._slots.Take(_segment._writePosition + 1).ToArray(); + public T[] Items => segment._slots.Take(segment._writePosition + 1).ToArray(); } } diff --git a/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs b/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs index 0a56396..305aa9f 100644 --- a/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs +++ b/tests/BufferQueue.Tests/Memory/MemoryBufferQueueTests.cs @@ -8,7 +8,7 @@ namespace BufferQueue.Tests.Memory; public class MemoryBufferQueueTests { - private static int MemoryBufferPartitionSegmentSize => new MemoryBufferPartition(0)._segmentSize; + private static int DefaultSegmentSize => new MemoryBufferQueueOptions().SegmentSize; [Fact] public async Task Produce_And_Consume() @@ -353,7 +353,7 @@ public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers() [Fact] public void Concurrent_Producer_Single_Partition() { - var messageSize = MemoryBufferPartitionSegmentSize * 4; + var messageSize = DefaultSegmentSize * 4; var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions { @@ -404,7 +404,7 @@ public void Concurrent_Producer_Single_Partition() [Fact] public void Concurrent_Producer_Multiple_Partition() { - var messageSize = MemoryBufferPartitionSegmentSize * 4; + var messageSize = DefaultSegmentSize * 4; var queue = new MemoryBufferQueue(new MemoryBufferQueueOptions { @@ -470,7 +470,7 @@ public void Concurrent_Producer_Multiple_Partition() [InlineData(3, 10000)] public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize) { - var messageSize = MemoryBufferPartitionSegmentSize * 4; + var messageSize = DefaultSegmentSize * 4; var partitionNumber = Environment.ProcessorCount * 2; var consumerNumberPerGroup = Environment.ProcessorCount; @@ -534,7 +534,7 @@ public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize) [InlineData(3)] public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int groupNumber) { - var messageSize = MemoryBufferPartitionSegmentSize * 4; + var messageSize = DefaultSegmentSize * 4; var partitionNumber = Environment.ProcessorCount * 2; var consumerNumberPerGroup = Environment.ProcessorCount;