Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/CloudStructures/IRedisConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using StackExchange.Redis;

namespace CloudStructures
{
/// <summary>
/// Represents a Redis connection interface.
/// </summary>
public interface IRedisConnection

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does this get used?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RedisConnection class implements this interface

{
/// <summary>
/// Gets the asynchronous database accessor for Redis.
/// </summary>
IDatabaseAsync Database { get; }

/// <summary>
/// Gets the Redis transaction object.
/// </summary>
ITransaction Transaction { get; }

/// <summary>
/// Gets an array of Redis server instances connected to the current multiplexer.
/// </summary>
IServer[] Servers { get; }
}
}
4 changes: 2 additions & 2 deletions src/CloudStructures/Internals/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static bool IsEmpty<T>(this IEnumerable<T> source)


/// <summary>
/// Projects each element of a sequance into new form with state.
/// Projects each element of a sequance into new form with state.
/// </summary>
/// <typeparam name="T">Element type</typeparam>
/// <typeparam name="TState">State type</typeparam>
Expand Down Expand Up @@ -50,7 +50,7 @@ public static IEnumerable<T> Materialize<T>(this IEnumerable<T>? source, bool nu
if (source is null)
{
if (nullToEmpty)
return [];
return new T[0];

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code was not compiling since it gave errors on returning an empty array like so.
So, I google searched and found this solution of returning an empty array


#if NET6_0_OR_GREATER
ArgumentNullException.ThrowIfNull(source);
Expand Down
244 changes: 136 additions & 108 deletions src/CloudStructures/RedisConnection.cs
Original file line number Diff line number Diff line change
@@ -1,129 +1,157 @@
using System.Diagnostics;
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using CloudStructures.Converters;
using CloudStructures.Internals;
using StackExchange.Redis;

namespace CloudStructures;



/// <summary>
/// Provides connection to the server.
/// </summary>
/// <remarks>This connection needs to be used w/o destroying. Please hold as static field or static property.</remarks>
public sealed class RedisConnection
namespace CloudStructures
{
#region Properties
/// <summary>
/// Gets configuration.
/// </summary>
public RedisConfig Config { get; }


/// <summary>
/// Gets value converter.
/// </summary>
internal ValueConverter Converter { get; }


/// <summary>
/// Gets connection event handler.
/// </summary>
private IConnectionEventHandler? Handler { get; }


/// <summary>
/// Gets logger.
/// </summary>
private TextWriter? Logger { get; }


/// <summary>
/// Gets an interactive connection to a database inside redis.
/// </summary>
internal IDatabaseAsync Database
=> this.Config.Database.HasValue
? this.GetConnection().GetDatabase(this.Config.Database.Value)
: this.GetConnection().GetDatabase();


/// <summary>
/// Gets a transaction.
/// </summary>
internal ITransaction Transaction
=> ((IDatabase)this.Database).CreateTransaction();


/// <summary>
/// Gets target servers.
/// Provides connection to the server.
/// </summary>
internal IServer[] Servers
=> this.Config.Options
.EndPoints
.Select(this.GetConnection(), static (x, c) => c.GetServer(x))
.ToArray();
#endregion
/// <remarks>This connection needs to be used w/o destroying. Please hold as static field or static property.</remarks>
public sealed class RedisConnection
{
#region Properties
/// <summary>
/// Gets configuration.
/// </summary>
public RedisConfig? Config { get; }

/// <summary>
/// Gets value converter.
/// </summary>
internal ValueConverter Converter { get; }


/// <summary>
/// Gets connection event handler.
/// </summary>
private IConnectionEventHandler? Handler { get; }


/// <summary>
/// Gets logger.
/// </summary>
private TextWriter? Logger { get; }


/// <summary>
/// Gets an interactive connection to a database inside redis.
/// </summary>
internal IDatabaseAsync Database => _database ?? (this.Config?.Database.HasValue ?? false
? this.GetConnection().GetDatabase(this.Config.Database.Value)
: this.GetConnection().GetDatabase());

/// <summary>
/// Gets a transaction.
/// </summary>
public ITransaction Transaction => ((IDatabase)Database).CreateTransaction();

/// <summary>
/// Gets target servers.
/// </summary>
public IServer[] Servers
{
get
{
if (this.Config != null)
{
return this.Config.Options
.EndPoints
.Select(this.GetConnection(), static (x, c) => c.GetServer(x))
.ToArray();
}
else
{
var multiplexer = Database.Multiplexer;
var endpoints = multiplexer.GetEndPoints();

return endpoints.Select(endpoint => multiplexer.GetServer(endpoint))
.ToArray();
}
}
}
#endregion


#region Constructors
/// <summary>
/// Creates instance with RedisConfig.
/// </summary>
/// <param name="config"></param>
/// <param name="converter">If null, use <see cref="SystemTextJsonConverter"/> as default.</param>
/// <param name="handler"></param>
/// <param name="logger"></param>
public RedisConnection(RedisConfig config, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null)
{
this.Config = config;
this.Converter = new(converter);
this.Handler = handler;
this.Logger = logger;
}

#region Constructors
/// <summary>
/// Creates instance.
/// </summary>
/// <param name="config"></param>
/// <param name="converter">If null, use <see cref="SystemTextJsonConverter"/> as default.</param>
/// <param name="handler"></param>
/// <param name="logger"></param>
public RedisConnection(RedisConfig config, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null)
{
this.Config = config;
this.Converter = new(converter);
this.Handler = handler;
this.Logger = logger;
}
#endregion
/// <summary>
/// Creates a RedisConnection instance with a pre-existing IDatabaseAsync instance.
/// </summary>
/// <param name="database">Pre-existing IDatabaseAsync instance.</param>
/// <param name="converter">Optional value converter.</param>
/// <param name="handler">Connection event handler.</param>
/// <param name="logger">Logger.</param>
public RedisConnection(IDatabaseAsync database, IValueConverter? converter = null, IConnectionEventHandler? handler = null, TextWriter? logger = null)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah okay so this is the main change in this PR then? Allowing the database to be passed in to the constructor. That makes sense.

In the code samples where you saw CloudStructures being used (both the customer one and the hello world one), I thought I saw some interactions with this class through static methods (meaning that the caller wouldn't necessarily have even called the constructor first). Is that accurate or am I misremembering?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hello world example, I have been calling the constructor to make a connection while in the customer code, they were directly using the "GetConnection" call from the RedisConnection class. But due to lack of full code, I couldn't determine if they were making a call to the same GetConnection as present in this RedisConnection class.

{
this._database = database ?? throw new ArgumentNullException(nameof(database));
this.Converter = new ValueConverter(converter);
this.Handler = handler;
this.Logger = logger;
}
#endregion


#region Connection management
/// <summary>
/// Gets underlying connection.
/// </summary>
/// <returns></returns>
public ConnectionMultiplexer GetConnection()
{
lock (this._gate)
#region Connection management
/// <summary>
/// Gets underlying connection.
/// </summary>
/// <returns></returns>
public ConnectionMultiplexer GetConnection()
{
if (this._connection is null || !this._connection.IsConnected)
lock (this._gate)
{
try
if (this._connection is null || !this._connection.IsConnected)
{
//--- create inner connection
var stopwatch = Stopwatch.StartNew();
this._connection = ConnectionMultiplexer.Connect(this.Config.Options, this.Logger);
stopwatch.Stop();
this.Handler?.OnConnectionOpened(this, new(stopwatch.Elapsed));

//--- attach events
this._connection.ConfigurationChanged += (_, e) => this.Handler?.OnConfigurationChanged(this, e);
this._connection.ConfigurationChangedBroadcast += (_, e) => this.Handler?.OnConfigurationChangedBroadcast(this, e);
this._connection.ConnectionFailed += (_, e) => this.Handler?.OnConnectionFailed(this, e);
this._connection.ConnectionRestored += (_, e) => this.Handler?.OnConnectionRestored(this, e);
this._connection.ErrorMessage += (_, e) => this.Handler?.OnErrorMessage(this, e);
this._connection.HashSlotMoved += (_, e) => this.Handler?.OnHashSlotMoved(this, e);
this._connection.InternalError += (_, e) => this.Handler?.OnInternalError(this, e);
this._connection.ServerMaintenanceEvent += (_, e) => this.Handler?.OnServerMaintenanceEvent(this, e);
}
catch
{
this._connection = null;
throw;
try
{
//--- create inner connection
var stopwatch = Stopwatch.StartNew();
this._connection = ConnectionMultiplexer.Connect(this.Config!.Options, this.Logger);
stopwatch.Stop();
this.Handler?.OnConnectionOpened(this, new(stopwatch.Elapsed));

//--- attach events
this._connection.ConfigurationChanged += (_, e) => this.Handler?.OnConfigurationChanged(this, e);
this._connection.ConfigurationChangedBroadcast += (_, e) => this.Handler?.OnConfigurationChangedBroadcast(this, e);
this._connection.ConnectionFailed += (_, e) => this.Handler?.OnConnectionFailed(this, e);
this._connection.ConnectionRestored += (_, e) => this.Handler?.OnConnectionRestored(this, e);
this._connection.ErrorMessage += (_, e) => this.Handler?.OnErrorMessage(this, e);
this._connection.HashSlotMoved += (_, e) => this.Handler?.OnHashSlotMoved(this, e);
this._connection.InternalError += (_, e) => this.Handler?.OnInternalError(this, e);
this._connection.ServerMaintenanceEvent += (_, e) => this.Handler?.OnServerMaintenanceEvent(this, e);
}
catch
{
this._connection = null;
throw;
}
}
return this._connection;
}
return this._connection;
}

private readonly object _gate = new();
private ConnectionMultiplexer? _connection = null;
private readonly IDatabaseAsync? _database = null;
#endregion
}
private readonly object _gate = new();
private ConnectionMultiplexer? _connection = null;
#endregion
}
2 changes: 1 addition & 1 deletion src/CloudStructures/Structures/RedisSortedSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Task<long> CombineAndStoreAsync(SetOperation operation, RedisSortedSet<T>
#if NETSTANDARD2_1 || NET5_0_OR_GREATER
var keys = others.Select(static x => x.Key).Append(this.Key).ToArray();
#else
var keys = others.Select(static x => x.Key).Concat([this.Key]).ToArray();
var keys = others.Select(x => x.Key).Concat(new[] { this.Key }).ToArray();
#endif
return this.Connection.Database.SortedSetCombineAndStoreAsync(operation, destination.Key, keys, weights, aggregate, flags);
}
Expand Down