Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tab_width = 4
trim_trailing_whitespace = true

# New line preferences
end_of_line = crlf
end_of_line = unset
insert_final_newline = unset

dotnet_style_null_propagation = true:suggestion
Expand All @@ -22,6 +22,7 @@ dotnet_style_operator_placement_when_wrapping = beginning_of_line
dotnet_style_object_initializer = true:suggestion
dotnet_style_coalesce_expression = true:suggestion
dotnet_style_collection_initializer = true:suggestion
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_style_prefer_simplified_boolean_expressions = true:suggestion
dotnet_style_prefer_conditional_expression_over_assignment = false:silent
dotnet_style_prefer_conditional_expression_over_return = false:silent
Expand Down Expand Up @@ -81,6 +82,7 @@ csharp_style_prefer_local_over_anonymous_function = true:silent
csharp_style_prefer_extended_property_pattern = true:suggestion
csharp_style_implicit_object_creation_when_type_is_apparent = true:silent
csharp_style_prefer_tuple_swap = true:silent
csharp_style_prefer_simple_property_accessors = true:suggestion

# Field preferences
dotnet_style_readonly_field = true:suggestion
Expand All @@ -98,7 +100,7 @@ csharp_style_var_elsewhere = true:suggestion
csharp_style_var_for_built_in_types = true:suggestion
csharp_style_var_when_type_is_apparent = true:suggestion

# Expression-bodied members
# Expression-bodied members preferences
csharp_style_expression_bodied_accessors = true:silent
csharp_style_expression_bodied_constructors = false:silent
csharp_style_expression_bodied_indexers = true:silent
Expand All @@ -125,10 +127,13 @@ csharp_prefer_static_local_function = true:suggestion
csharp_preferred_modifier_order = public,private,protected,internal,static,extern,new,virtual,abstract,sealed,override,readonly,unsafe,volatile,async:silent

# Code-block preferences
csharp_style_prefer_top_level_statements = true:suggestion
csharp_style_prefer_primary_constructors = true:suggestion
csharp_prefer_braces = true:silent
csharp_prefer_simple_using_statement = true:suggestion
csharp_style_namespace_declarations = file_scoped:suggestion
csharp_style_prefer_method_group_conversion = true:silent
csharp_prefer_system_threading_lock = true:suggestion

# Expression-level preferences
csharp_prefer_simple_default_expression = true:suggestion
Expand All @@ -138,12 +143,16 @@ csharp_style_pattern_local_over_anonymous_function = true:suggestion
csharp_style_prefer_index_operator = true:suggestion
csharp_style_prefer_range_operator = true:suggestion
csharp_style_throw_expression = true:suggestion
csharp_style_unused_value_assignment_preference = discard_variable:suggestion
csharp_style_unused_value_expression_statement_preference = discard_variable:suggestion
csharp_style_unused_value_assignment_preference = discard_variable:none
csharp_style_unused_value_expression_statement_preference = discard_variable:none

# 'using' directive preferences
csharp_using_directive_placement = outside_namespace:suggestion

# Struct preferences
csharp_style_prefer_readonly_struct = true:suggestion
csharp_style_prefer_readonly_struct_member = true:suggestion

#### C# Formatting Rules ####

# New line preferences
Expand All @@ -157,6 +166,8 @@ csharp_new_line_between_query_expression_clauses = true
csharp_style_allow_embedded_statements_on_same_line_experimental = false:error
csharp_style_allow_blank_lines_between_consecutive_braces_experimental = false:error
csharp_style_allow_blank_line_after_colon_in_constructor_initializer_experimental = true:silent
csharp_style_allow_blank_line_after_token_in_conditional_expression_experimental = true:silent
csharp_style_allow_blank_line_after_token_in_arrow_expression_clause_experimental = true:silent

# Indentation preferences
csharp_indent_block_contents = true
Expand Down Expand Up @@ -193,6 +204,7 @@ csharp_space_between_square_brackets = false
# Wrapping preferences
csharp_preserve_single_line_blocks = true
csharp_preserve_single_line_statements = true
csharp_style_prefer_utf8_string_literals = true:suggestion

#### Naming styles ####

Expand All @@ -210,6 +222,14 @@ dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = suggestion
dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members
dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case

dotnet_naming_rule.constant_fields_should_be_upper_case.severity = suggestion
dotnet_naming_rule.constant_fields_should_be_upper_case.symbols = constant_fields
dotnet_naming_rule.constant_fields_should_be_upper_case.style = pascal_case

dotnet_naming_symbols.constant_fields.applicable_kinds = field
dotnet_naming_symbols.constant_fields.applicable_accessibilities = *
dotnet_naming_symbols.constant_fields.required_modifiers = const

dotnet_naming_rule.private_or_internal_field_should_be_camel_case.severity = suggestion
dotnet_naming_rule.private_or_internal_field_should_be_camel_case.symbols = private_or_internal_field
dotnet_naming_rule.private_or_internal_field_should_be_camel_case.style = camel_case
Expand All @@ -226,48 +246,48 @@ dotnet_naming_rule.async_method_should_be_ends_with_async.style = ends_with_asyn

dotnet_naming_symbols.interface.applicable_kinds = interface
dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.interface.required_modifiers =
dotnet_naming_symbols.interface.required_modifiers =

dotnet_naming_symbols.method.applicable_kinds = method
dotnet_naming_symbols.method.applicable_accessibilities = public
dotnet_naming_symbols.method.required_modifiers =
dotnet_naming_symbols.method.required_modifiers =

dotnet_naming_symbols.private_or_internal_field.applicable_kinds = field
dotnet_naming_symbols.private_or_internal_field.applicable_accessibilities = internal, private, private_protected
dotnet_naming_symbols.private_or_internal_field.required_modifiers =
dotnet_naming_symbols.private_or_internal_field.required_modifiers =

dotnet_naming_symbols.types.applicable_kinds = class, struct, interface, enum
dotnet_naming_symbols.types.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.types.required_modifiers =
dotnet_naming_symbols.types.required_modifiers =

dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, method
dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.non_field_members.required_modifiers =
dotnet_naming_symbols.non_field_members.required_modifiers =

dotnet_naming_symbols.async_method.applicable_kinds = method
dotnet_naming_symbols.async_method.applicable_accessibilities = *
dotnet_naming_symbols.async_method.required_modifiers = async

# Naming styles

dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
dotnet_naming_style.pascal_case.word_separator =
dotnet_naming_style.pascal_case.capitalization = pascal_case

dotnet_naming_style.begins_with_i.required_prefix = I
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.capitalization = pascal_case

dotnet_naming_style.camel_case.required_prefix =
dotnet_naming_style.camel_case.required_suffix =
dotnet_naming_style.camel_case.word_separator =
dotnet_naming_style.camel_case.required_prefix =
dotnet_naming_style.camel_case.required_suffix =
dotnet_naming_style.camel_case.word_separator =
dotnet_naming_style.camel_case.capitalization = camel_case

dotnet_naming_style.ends_with_async.required_prefix =
dotnet_naming_style.ends_with_async.required_prefix =
dotnet_naming_style.ends_with_async.required_suffix = Async
dotnet_naming_style.ends_with_async.word_separator =
dotnet_naming_style.ends_with_async.word_separator =
dotnet_naming_style.ends_with_async.capitalization = pascal_case

# IDE0058: Expression value is never used
Expand All @@ -278,3 +298,9 @@ dotnet_diagnostic.IDE0010.severity = none

# IDE0072: Add missing cases
dotnet_diagnostic.IDE0072.severity = none

# IDE0305: Simplify collection initialization
dotnet_diagnostic.IDE0305.severity = none

# CA1873: Avoid potentially expensive logging
dotnet_diagnostic.CA1873.severity = none
9 changes: 2 additions & 7 deletions WhiteRabbit.Messaging/Abstractions/DefaultMessagingBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@

namespace WhiteRabbit.Messaging.Abstractions;

internal class DefaultMessagingBuilder : IMessagingBuilder
internal class DefaultMessagingBuilder(IServiceCollection services) : IMessagingBuilder
{
public IServiceCollection Services { get; }

public DefaultMessagingBuilder(IServiceCollection services)
{
Services = services;
}
public IServiceCollection Services { get; } = services;
}
75 changes: 43 additions & 32 deletions WhiteRabbit.Messaging/RabbitMq/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,86 +6,97 @@

namespace WhiteRabbit.Messaging.RabbitMq;

internal class MessageManager : IMessageSender, IDisposable
internal class MessageManager : IMessageSender, IAsyncDisposable
{
private const string MaxPriorityHeader = "x-max-priority";

internal IConnection Connection { get; private set; }

internal IModel Channel { get; private set; }
internal IChannel Channel { get; private set; }

private readonly MessageManagerSettings messageManagerSettings;
private readonly QueueSettings queueSettings;

public MessageManager(MessageManagerSettings messageManagerSettings, QueueSettings queueSettings)
private MessageManager(MessageManagerSettings messageManagerSettings, QueueSettings queueSettings)
{
this.messageManagerSettings = messageManagerSettings;
this.queueSettings = queueSettings;
}

public static async Task<MessageManager> CreateAsync(MessageManagerSettings messageManagerSettings, QueueSettings queueSettings)
{
var factory = new ConnectionFactory { Uri = new Uri(messageManagerSettings.ConnectionString) };
Connection = factory.CreateConnection();
var manager = new MessageManager(messageManagerSettings, queueSettings)
{
Connection = await factory.CreateConnectionAsync().ConfigureAwait(false)
};

Channel = Connection.CreateModel();
manager.Channel = await manager.Connection.CreateChannelAsync().ConfigureAwait(false);

if (messageManagerSettings.QueuePrefetchCount > 0)
{
Channel.BasicQos(0, messageManagerSettings.QueuePrefetchCount, false);
await manager.Channel.BasicQosAsync(0, messageManagerSettings.QueuePrefetchCount, false).ConfigureAwait(false);
}

Channel.ExchangeDeclare(messageManagerSettings.ExchangeName, ExchangeType.Direct, durable: true);
await manager.Channel.ExchangeDeclareAsync(messageManagerSettings.ExchangeName, ExchangeType.Direct, durable: true).ConfigureAwait(false);

foreach (var queue in queueSettings.Queues)
foreach (var (queue, args)
in from (string Name, Type Type) queue in queueSettings.Queues
let args = new Dictionary<string, object>
{
[MaxPriorityHeader] = 10
}
select (queue, args))
{
var args = new Dictionary<string, object>
{
[MaxPriorityHeader] = 10
};

Channel.QueueDeclare(queue.Name, durable: true, exclusive: false, autoDelete: false, args);
Channel.QueueBind(queue.Name, messageManagerSettings.ExchangeName, queue.Name, null);
await manager.Channel.QueueDeclareAsync(queue.Name, durable: true, exclusive: false, autoDelete: false, args).ConfigureAwait(false);
await manager.Channel.QueueBindAsync(queue.Name, messageManagerSettings.ExchangeName, queue.Name, null).ConfigureAwait(false);
}

this.messageManagerSettings = messageManagerSettings;
this.queueSettings = queueSettings;
return manager;
}

public Task PublishAsync<T>(T message, int priority = 1) where T : class
{
var sendBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize<object>(message, messageManagerSettings.JsonSerializerOptions ?? JsonOptions.Default));

var routingKey = queueSettings.Queues.First(q => q.Type == typeof(T)).Name;

return PublishAsync(sendBytes.AsMemory(), routingKey, priority);
}

private Task PublishAsync(ReadOnlyMemory<byte> body, string routingKey, int priority = 1)
{
var properties = Channel.CreateBasicProperties();
properties.Persistent = true;
properties.Priority = Convert.ToByte(priority);
var props = new BasicProperties
{
Persistent = true,
Priority = Convert.ToByte(priority)
};

Channel.BasicPublishAsync(messageManagerSettings.ExchangeName, routingKey, true, props, body);

Channel.BasicPublish(messageManagerSettings.ExchangeName, routingKey, properties, body);
return Task.CompletedTask;
}

public void MarkAsComplete(BasicDeliverEventArgs message) => Channel.BasicAck(message.DeliveryTag, false);

public void MarkAsRejected(BasicDeliverEventArgs message) => Channel.BasicReject(message.DeliveryTag, false);
public void MarkAsComplete(BasicDeliverEventArgs message) => Channel.BasicAckAsync(message.DeliveryTag, false);
public void MarkAsRejected(BasicDeliverEventArgs message) => Channel.BasicRejectAsync(message.DeliveryTag, false);

public void Dispose()
public async ValueTask DisposeAsync()
{
try
{
if (Channel.IsOpen)
if (Channel?.IsOpen == true)
{
Channel.Close();
await Channel.CloseAsync().ConfigureAwait(false);
}

if (Connection.IsOpen)
if (Connection?.IsOpen == true)
{
Connection.Close();
await Connection.CloseAsync().ConfigureAwait(false);
}
}
catch
{
// Ignore exceptions on dispose
}

GC.SuppressFinalize(this);
}
}
}
27 changes: 7 additions & 20 deletions WhiteRabbit.Messaging/RabbitMq/QueueListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,16 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using WhiteRabbit.Messaging.Abstractions;

namespace WhiteRabbit.Messaging.RabbitMq;

internal class QueueListener<T> : BackgroundService where T : class
internal class QueueListener<T>(MessageManager messageManager, MessageManagerSettings messageManagerSettings, QueueSettings settings,
ILogger<QueueListener<T>> logger, IServiceProvider serviceProvider) : BackgroundService where T : class
{
private readonly MessageManager messageManager;
private readonly MessageManagerSettings messageManagerSettings;
private readonly ILogger logger;
private readonly IServiceProvider serviceProvider;
private readonly string queueName;

public QueueListener(MessageManager messageManager, MessageManagerSettings messageManagerSettings, QueueSettings settings, ILogger<QueueListener<T>> logger, IServiceProvider serviceProvider)
{
this.messageManager = messageManager;
this.messageManagerSettings = messageManagerSettings;
this.logger = logger;
this.serviceProvider = serviceProvider;

queueName = settings.Queues.First(q => q.Type == typeof(T)).Name;
}
private readonly ILogger logger = logger;
private readonly string queueName = settings.Queues.First(q => q.Type == typeof(T)).Name;

public override Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -45,8 +32,8 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();

var consumer = new EventingBasicConsumer(messageManager.Channel);
consumer.Received += async (_, message) =>
var consumer = new AsyncEventingBasicConsumer(messageManager.Channel);
consumer.ReceivedAsync += async (_, message) =>
{
try
{
Expand All @@ -71,7 +58,7 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
stoppingToken.ThrowIfCancellationRequested();
};

messageManager.Channel.BasicConsume(queueName, autoAck: false, consumer);
messageManager.Channel.BasicConsumeAsync(queueName, false, null, false, false, null, consumer, stoppingToken);

return Task.CompletedTask;
}
Expand Down
3 changes: 2 additions & 1 deletion WhiteRabbit.Messaging/RabbitMq/RabbitMqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ namespace WhiteRabbit.Messaging.RabbitMq;

public static class RabbitMQExtensions
{
public static IMessagingBuilder AddRabbitMq(this IServiceCollection services, Action<MessageManagerSettings> messageManagerConfiguration, Action<QueueSettings> queuesConfiguration)
public static IMessagingBuilder AddRabbitMq(this IServiceCollection services, Action<MessageManagerSettings> messageManagerConfiguration,
Action<QueueSettings> queuesConfiguration)
{
services.AddSingleton<MessageManager>();
services.AddSingleton<IMessageSender>(provider => provider.GetService<MessageManager>());
Expand Down
Loading