From 81b384aa0faecf048f77240c6007b19adb8c5e0e Mon Sep 17 00:00:00 2001 From: rishtigupta Date: Fri, 14 Jun 2024 13:17:08 -0700 Subject: [PATCH] feat: add RedisConnection interface and allow database to create connection --- src/CloudStructures/IRedisConnection.cs | 25 ++ .../Internals/EnumerableExtensions.cs | 4 +- src/CloudStructures/RedisConnection.cs | 244 ++++++++++-------- .../Structures/RedisSortedSet.cs | 2 +- 4 files changed, 164 insertions(+), 111 deletions(-) create mode 100644 src/CloudStructures/IRedisConnection.cs diff --git a/src/CloudStructures/IRedisConnection.cs b/src/CloudStructures/IRedisConnection.cs new file mode 100644 index 0000000..b1618f0 --- /dev/null +++ b/src/CloudStructures/IRedisConnection.cs @@ -0,0 +1,25 @@ +using StackExchange.Redis; + +namespace CloudStructures +{ + /// + /// Represents a Redis connection interface. + /// + public interface IRedisConnection + { + /// + /// Gets the asynchronous database accessor for Redis. + /// + IDatabaseAsync Database { get; } + + /// + /// Gets the Redis transaction object. + /// + ITransaction Transaction { get; } + + /// + /// Gets an array of Redis server instances connected to the current multiplexer. + /// + IServer[] Servers { get; } + } +} diff --git a/src/CloudStructures/Internals/EnumerableExtensions.cs b/src/CloudStructures/Internals/EnumerableExtensions.cs index de6a4e6..7440b0a 100644 --- a/src/CloudStructures/Internals/EnumerableExtensions.cs +++ b/src/CloudStructures/Internals/EnumerableExtensions.cs @@ -22,7 +22,7 @@ public static bool IsEmpty(this IEnumerable source) /// - /// Projects each element of a sequance into new form with state. + /// Projects each element of a sequance into new form with state. /// /// Element type /// State type @@ -50,7 +50,7 @@ public static IEnumerable Materialize(this IEnumerable? source, bool nu if (source is null) { if (nullToEmpty) - return []; + return new T[0]; #if NET6_0_OR_GREATER ArgumentNullException.ThrowIfNull(source); diff --git a/src/CloudStructures/RedisConnection.cs b/src/CloudStructures/RedisConnection.cs index 30b37cb..d16dc92 100644 --- a/src/CloudStructures/RedisConnection.cs +++ b/src/CloudStructures/RedisConnection.cs @@ -1,129 +1,157 @@ -using System.Diagnostics; +using System; +using System.Diagnostics; using System.IO; using System.Linq; using CloudStructures.Converters; using CloudStructures.Internals; using StackExchange.Redis; -namespace CloudStructures; - - - -/// -/// Provides connection to the server. -/// -/// This connection needs to be used w/o destroying. Please hold as static field or static property. -public sealed class RedisConnection +namespace CloudStructures { - #region Properties - /// - /// Gets configuration. - /// - public RedisConfig Config { get; } - - - /// - /// Gets value converter. - /// - internal ValueConverter Converter { get; } - - - /// - /// Gets connection event handler. - /// - private IConnectionEventHandler? Handler { get; } - - - /// - /// Gets logger. - /// - private TextWriter? Logger { get; } - - - /// - /// Gets an interactive connection to a database inside redis. - /// - internal IDatabaseAsync Database - => this.Config.Database.HasValue - ? this.GetConnection().GetDatabase(this.Config.Database.Value) - : this.GetConnection().GetDatabase(); - - - /// - /// Gets a transaction. - /// - internal ITransaction Transaction - => ((IDatabase)this.Database).CreateTransaction(); - - /// - /// Gets target servers. + /// Provides connection to the server. /// - internal IServer[] Servers - => this.Config.Options - .EndPoints - .Select(this.GetConnection(), static (x, c) => c.GetServer(x)) - .ToArray(); - #endregion + /// This connection needs to be used w/o destroying. Please hold as static field or static property. + public sealed class RedisConnection + { + #region Properties + /// + /// Gets configuration. + /// + public RedisConfig? Config { get; } + + /// + /// Gets value converter. + /// + internal ValueConverter Converter { get; } + + + /// + /// Gets connection event handler. + /// + private IConnectionEventHandler? Handler { get; } + + + /// + /// Gets logger. + /// + private TextWriter? Logger { get; } + + + /// + /// Gets an interactive connection to a database inside redis. + /// + internal IDatabaseAsync Database => _database ?? (this.Config?.Database.HasValue ?? false + ? this.GetConnection().GetDatabase(this.Config.Database.Value) + : this.GetConnection().GetDatabase()); + + /// + /// Gets a transaction. + /// + public ITransaction Transaction => ((IDatabase)Database).CreateTransaction(); + + /// + /// Gets target servers. + /// + public IServer[] Servers + { + get + { + if (this.Config != null) + { + return this.Config.Options + .EndPoints + .Select(this.GetConnection(), static (x, c) => c.GetServer(x)) + .ToArray(); + } + else + { + var multiplexer = Database.Multiplexer; + var endpoints = multiplexer.GetEndPoints(); + return endpoints.Select(endpoint => multiplexer.GetServer(endpoint)) + .ToArray(); + } + } + } + #endregion + + + #region Constructors + /// + /// Creates instance with RedisConfig. + /// + /// + /// If null, use as default. + /// + /// + public RedisConnection(RedisConfig config, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null) + { + this.Config = config; + this.Converter = new(converter); + this.Handler = handler; + this.Logger = logger; + } - #region Constructors - /// - /// Creates instance. - /// - /// - /// If null, use as default. - /// - /// - public RedisConnection(RedisConfig config, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null) - { - this.Config = config; - this.Converter = new(converter); - this.Handler = handler; - this.Logger = logger; - } - #endregion + /// + /// Creates a RedisConnection instance with a pre-existing IDatabaseAsync instance. + /// + /// Pre-existing IDatabaseAsync instance. + /// Optional value converter. + /// Connection event handler. + /// Logger. + public RedisConnection(IDatabaseAsync database, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null) + { + this._database = database ?? throw new ArgumentNullException(nameof(database)); + this.Converter = new ValueConverter(converter); + this.Handler = handler; + this.Logger = logger; + } + #endregion - #region Connection management - /// - /// Gets underlying connection. - /// - /// - public ConnectionMultiplexer GetConnection() - { - lock (this._gate) + #region Connection management + /// + /// Gets underlying connection. + /// + /// + public ConnectionMultiplexer GetConnection() { - if (this._connection is null || !this._connection.IsConnected) + lock (this._gate) { - try + if (this._connection is null || !this._connection.IsConnected) { - //--- create inner connection - var stopwatch = Stopwatch.StartNew(); - this._connection = ConnectionMultiplexer.Connect(this.Config.Options, this.Logger); - stopwatch.Stop(); - this.Handler?.OnConnectionOpened(this, new(stopwatch.Elapsed)); - - //--- attach events - this._connection.ConfigurationChanged += (_, e) => this.Handler?.OnConfigurationChanged(this, e); - this._connection.ConfigurationChangedBroadcast += (_, e) => this.Handler?.OnConfigurationChangedBroadcast(this, e); - this._connection.ConnectionFailed += (_, e) => this.Handler?.OnConnectionFailed(this, e); - this._connection.ConnectionRestored += (_, e) => this.Handler?.OnConnectionRestored(this, e); - this._connection.ErrorMessage += (_, e) => this.Handler?.OnErrorMessage(this, e); - this._connection.HashSlotMoved += (_, e) => this.Handler?.OnHashSlotMoved(this, e); - this._connection.InternalError += (_, e) => this.Handler?.OnInternalError(this, e); - this._connection.ServerMaintenanceEvent += (_, e) => this.Handler?.OnServerMaintenanceEvent(this, e); - } - catch - { - this._connection = null; - throw; + try + { + //--- create inner connection + var stopwatch = Stopwatch.StartNew(); + this._connection = ConnectionMultiplexer.Connect(this.Config!.Options, this.Logger); + stopwatch.Stop(); + this.Handler?.OnConnectionOpened(this, new(stopwatch.Elapsed)); + + //--- attach events + this._connection.ConfigurationChanged += (_, e) => this.Handler?.OnConfigurationChanged(this, e); + this._connection.ConfigurationChangedBroadcast += (_, e) => this.Handler?.OnConfigurationChangedBroadcast(this, e); + this._connection.ConnectionFailed += (_, e) => this.Handler?.OnConnectionFailed(this, e); + this._connection.ConnectionRestored += (_, e) => this.Handler?.OnConnectionRestored(this, e); + this._connection.ErrorMessage += (_, e) => this.Handler?.OnErrorMessage(this, e); + this._connection.HashSlotMoved += (_, e) => this.Handler?.OnHashSlotMoved(this, e); + this._connection.InternalError += (_, e) => this.Handler?.OnInternalError(this, e); + this._connection.ServerMaintenanceEvent += (_, e) => this.Handler?.OnServerMaintenanceEvent(this, e); + } + catch + { + this._connection = null; + throw; + } } + return this._connection; } - return this._connection; } + + private readonly object _gate = new(); + private ConnectionMultiplexer? _connection = null; + private readonly IDatabaseAsync? _database = null; + #endregion } - private readonly object _gate = new(); - private ConnectionMultiplexer? _connection = null; - #endregion } diff --git a/src/CloudStructures/Structures/RedisSortedSet.cs b/src/CloudStructures/Structures/RedisSortedSet.cs index 210f7ca..566fcac 100644 --- a/src/CloudStructures/Structures/RedisSortedSet.cs +++ b/src/CloudStructures/Structures/RedisSortedSet.cs @@ -131,7 +131,7 @@ public Task CombineAndStoreAsync(SetOperation operation, RedisSortedSet #if NETSTANDARD2_1 || NET5_0_OR_GREATER var keys = others.Select(static x => x.Key).Append(this.Key).ToArray(); #else - var keys = others.Select(static x => x.Key).Concat([this.Key]).ToArray(); + var keys = others.Select(x => x.Key).Concat(new[] { this.Key }).ToArray(); #endif return this.Connection.Database.SortedSetCombineAndStoreAsync(operation, destination.Key, keys, weights, aggregate, flags); }