diff --git a/TheUniversalCity.RedisClient/RedisClient.cs b/TheUniversalCity.RedisClient/RedisClient.cs index 12d03e1..0df56f1 100644 --- a/TheUniversalCity.RedisClient/RedisClient.cs +++ b/TheUniversalCity.RedisClient/RedisClient.cs @@ -61,6 +61,7 @@ private RedisClient(RedisConfiguration configuration) Receiver = new DnsEndPointTCPConnector( configuration.DnsEndPoints.ToArray(), + Configuration.Logger, configuration.ReceiveBufferSize, configuration.SendBufferSize, configuration.ConnectRetry, @@ -102,7 +103,7 @@ private RedisClient(RedisConfiguration configuration) private void ReceiverEnumerator_OnConnected(DnsEndPointTCPConnector.Enumerator arg1) { #if DEBUG - Console.WriteLine(nameof(ReceiverEnumerator_OnConnected)); + Configuration.Logger(nameof(ReceiverEnumerator_OnConnected)); #endif Interlocked.CompareExchange(ref recentlyConnected, 1, 0); emergencyStartAutoEvent.Set(); @@ -147,13 +148,13 @@ public async Task StartAsync() } catch (RedisClientNotConectedException ex) { - Console.WriteLine("Start Error : " + ex.Message); + Configuration.Logger("Start Error : " + ex.Message); } finally { Interlocked.CompareExchange(ref recentlyConnected, 0, 2); ReceiverEnumerator.EndEmergency(); - Console.WriteLine("Start Emergency Disposing"); + Configuration.Logger("Start Emergency Disposing"); } //startComplete.Set(); @@ -187,7 +188,7 @@ private void ReceiverEnumerator_OnReceiverDisconnect(Exception arg1, DnsEndPoint var count = redisCompletedTasks.Count - arg2.bufferedMessageCounter; #if DEBUG - Console.WriteLine($"{nameof(ReceiverEnumerator_OnReceiverDisconnect)} : Count => {count}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, arg2.bufferedMessageCounter => {arg2.bufferedMessageCounter}"); + Configuration.Logger($"{nameof(ReceiverEnumerator_OnReceiverDisconnect)} : Count => {count}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, arg2.bufferedMessageCounter => {arg2.bufferedMessageCounter}"); #endif for (int i = 0; i < count; i++) { @@ -230,35 +231,36 @@ private void ReceiveWorker(object state) while (!disposedValue) { - try - { - while ((obj = RedisObjectDeterminator.Determine(redisClient.ReceiverEnumerator)) is RedisPushType pushObj) - { - if (pushObj[0].ToString() == "invalidate") - { - foreach (var item in pushObj[1] as RedisArray) - { + try { + while ((obj = RedisObjectDeterminator.Determine( + redisClient.ReceiverEnumerator +#if DEBUG + , + Configuration.Logger +#endif + + )) is RedisPushType pushObj) { + if (pushObj[0].ToString() == "invalidate") { + foreach (var item in pushObj[1] as RedisArray) { InvalidateSubscription(item); } } OnPushMessageReceived?.Invoke(pushObj); } - } - catch (Exception ex) - { + } catch (Exception ex) { OnException?.Invoke(ex, ReceiverEnumerator.Socket); continue; } #if DEBUG - Console.WriteLine($"{nameof(ReceiveWorker)} :emergencyFlag =>{ReceiverEnumerator.EmergencyFlag}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, emergentTaskCompletionSourcesCount =>{emergentRedisCompletedTasks.Count}, obj=> {obj}, isnull => {obj == null}, type => {obj?.GetType().FullName}"); + Configuration.Logger($"{nameof(ReceiveWorker)} :emergencyFlag =>{ReceiverEnumerator.EmergencyFlag}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, emergentTaskCompletionSourcesCount =>{emergentRedisCompletedTasks.Count}, obj=> {obj}, isnull => {obj == null}, type => {obj?.GetType().FullName}"); #endif TaskCompletionSource tcs; if (ReceiverEnumerator.EmergencyFlag) { - Console.WriteLine("Receiver Managed Thread Id => " + Thread.CurrentThread.ManagedThreadId); + Configuration.Logger("Receiver Managed Thread Id => " + Thread.CurrentThread.ManagedThreadId); tcs = emergentRedisCompletedTasks.Take(); } @@ -267,7 +269,7 @@ private void ReceiveWorker(object state) tcs = redisCompletedTasks.Take(); } #if DEBUG - Console.WriteLine($"{nameof(ReceiveWorker)} :"); + Configuration.Logger($"{nameof(ReceiveWorker)} :"); #endif tcs.TrySetResult(obj); @@ -300,7 +302,7 @@ public async Task ExecuteAsync(CancellationToken cancellationToken, var result = await ExecuteAsync(TaskCreationOptions.RunContinuationsAsynchronously, cancellationToken, segments); #if DEBUG - Console.WriteLine($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}"); + Configuration.Logger($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}"); #endif return result; @@ -313,7 +315,7 @@ public RedisObject Execute(CancellationToken cancellationToken, params string[] var result = ExecuteAsync(TaskCreationOptions.None, cancellationToken, segments).ConfigureAwait(false).GetAwaiter().GetResult(); #if DEBUG - Console.WriteLine($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}"); + Configuration.Logger($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}"); #endif return result; @@ -333,7 +335,7 @@ public async Task ExecuteAsync(TaskCreationOptions taskCreationOpti var result = ReceiverEnumerator.SendData(commandParams, (transferredBytes) => { #if DEBUG - Console.WriteLine($"{nameof(ExecuteAsync)} : "); + Configuration.Logger($"{nameof(ExecuteAsync)} : "); #endif redisCompletedTasks.Add(tcs); }); @@ -368,12 +370,12 @@ public async Task ExecuteEmergentAsync(params string[] commandParam { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); #if DEBUG - Console.WriteLine($"{nameof(ExecuteEmergentAsync)} : Command =>{string.Join(" ", commandParams)}"); + Configuration.Logger($"{nameof(ExecuteEmergentAsync)} : Command =>{string.Join(" ", commandParams)}"); #endif if (!await ReceiverEnumerator.SendEmergentDataAsync(GetSegments(commandParams), () => { #if DEBUG - Console.WriteLine($"{nameof(ExecuteEmergentAsync)} : "); + Configuration.Logger($"{nameof(ExecuteEmergentAsync)} : "); #endif emergentRedisCompletedTasks.Add(tcs); })) @@ -929,4 +931,4 @@ public void Dispose() } } } -} \ No newline at end of file +} diff --git a/TheUniversalCity.RedisClient/RedisConfiguration.cs b/TheUniversalCity.RedisClient/RedisConfiguration.cs index f1cc1d7..515a651 100644 --- a/TheUniversalCity.RedisClient/RedisConfiguration.cs +++ b/TheUniversalCity.RedisClient/RedisConfiguration.cs @@ -24,6 +24,8 @@ public class RedisConfiguration public int ReceiveBufferSize { get { return Options.ContainsKey(RECEIVE_BUFFER_SIZE) ? int.Parse(Options[RECEIVE_BUFFER_SIZE]) : 65536; } } public int SendBufferSize { get { return Options.ContainsKey(SEND_BUFFER_SIZE) ? int.Parse(Options[SEND_BUFFER_SIZE]) : 65536; } } + public Action Logger { get; set; } = Console.WriteLine; + public Dictionary Options { get; set; } = new Dictionary(); public RedisConfiguration(string connectionString) diff --git a/TheUniversalCity.RedisClient/RedisObjectDeterminator.cs b/TheUniversalCity.RedisClient/RedisObjectDeterminator.cs index 5b49cab..a0a7941 100644 --- a/TheUniversalCity.RedisClient/RedisObjectDeterminator.cs +++ b/TheUniversalCity.RedisClient/RedisObjectDeterminator.cs @@ -6,38 +6,69 @@ using TheUniversalCity.RedisClient.RedisObjects.Numerics; using TheUniversalCity.RedisClient.RedisObjects.SimpleStrings; -namespace TheUniversalCity.RedisClient -{ - public static class RedisObjectDeterminator - { +namespace TheUniversalCity.RedisClient { + public static class RedisObjectDeterminator { public const byte CR = (byte)'\r'; public const byte LF = (byte)'\n'; - public static RedisObject Determine(IEnumerator enumerator) - { + public static RedisObject Determine(IEnumerator enumerator +#if DEBUG + , + System.Action logger +#endif + + ) { enumerator.MoveNext(); var determinativeChar = enumerator.Current; - switch (determinativeChar) - { + switch (determinativeChar) { case RedisArray.DETERMINATIVE_CHAR: - return RedisCollectionObject.GetRedisCollectionObject(enumerator); - case RedisAttributeType.DETERMINATIVE_CHAR: - { - var attribute = RedisDictionaryObject.GetRedisDictionaryObject(enumerator); ; - var afterObj = Determine(enumerator); + return RedisCollectionObject.GetRedisCollectionObject( + enumerator +#if DEBUG + , + logger +#endif + ); + case RedisAttributeType.DETERMINATIVE_CHAR: { + var attribute = RedisDictionaryObject.GetRedisDictionaryObject( + enumerator +#if DEBUG + , + logger +#endif + ); + var afterObj = Determine( + enumerator +#if DEBUG + , + logger +#endif + ); - afterObj.SetAttribute(attribute); + afterObj.SetAttribute(attribute); - return afterObj; - } + return afterObj; + } case RedisBigNumber.DETERMINATIVE_CHAR: return RedisBigNumber.Parse(enumerator); case RedisBlobError.DETERMINATIVE_CHAR: - return RedisBlobError.Parse(enumerator); + return RedisBlobError.Parse( + enumerator +#if DEBUG + , + logger +#endif + ); case RedisBlobString.DETERMINATIVE_CHAR: - return RedisBlobString.Parse(enumerator); + return RedisBlobString.Parse( + enumerator +#if DEBUG + , + logger +#endif + ); case RedisBoolean.DETERMINATIVE_CHAR: return RedisBoolean.Parse(enumerator); case RedisDouble.DETERMINATIVE_CHAR: @@ -45,21 +76,45 @@ public static RedisObject Determine(IEnumerator enumerator) case RedisEndType.DETERMINATIVE_CHAR: return RedisEndType.Parse(enumerator); case RedisMapType.DETERMINATIVE_CHAR: - return RedisDictionaryObject.GetRedisDictionaryObject(enumerator); + return RedisDictionaryObject.GetRedisDictionaryObject( + enumerator +#if DEBUG + , + logger +#endif + ); case RedisNull.DETERMINATIVE_CHAR: return RedisNull.Parse(enumerator); case RedisNumber.DETERMINATIVE_CHAR: return RedisNumber.Parse(enumerator); case RedisPushType.DETERMINATIVE_CHAR: - return RedisCollectionObject.GetRedisCollectionObject(enumerator); + return RedisCollectionObject.GetRedisCollectionObject( + enumerator +#if DEBUG + , + logger +#endif + ); case RedisSetReply.DETERMINATIVE_CHAR: - return RedisCollectionObject.GetRedisCollectionObject(enumerator); + return RedisCollectionObject.GetRedisCollectionObject( + enumerator +#if DEBUG + , + logger +#endif + ); case RedisSimpleError.DETERMINATIVE_CHAR: return RedisSimpleError.Parse(enumerator); case RedisSimpleString.DETERMINATIVE_CHAR: return RedisSimpleString.Parse(enumerator); case RedisVerbatimString.DETERMINATIVE_CHAR: - return RedisVerbatimString.Parse(enumerator); + return RedisVerbatimString.Parse( + enumerator +#if DEBUG + , + logger +#endif + ); default: return null; } diff --git a/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisCollectionObject.cs b/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisCollectionObject.cs index 7dc2fd4..d465212 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisCollectionObject.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisCollectionObject.cs @@ -13,21 +13,31 @@ public abstract class RedisCollectionObject : RedisObject, ICollection false; - public static TRedisCollectionObject GetRedisCollectionObject(IEnumerator enumerator) where TRedisCollectionObject : RedisCollectionObject, new() - { + public static TRedisCollectionObject GetRedisCollectionObject(IEnumerator enumerator +#if DEBUG + , + System.Action logger +#endif + ) where TRedisCollectionObject : RedisCollectionObject, new() { var length = int.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII)); var collectionObject = new TRedisCollectionObject(); - if (length == -1) - { + if (length == -1) { return collectionObject; } collectionObject.Items = new List(length); - for (int i = 0; i < length; i++) - { - collectionObject.Add(RedisObjectDeterminator.Determine(enumerator)); + for (int i = 0; i < length; i++) { + collectionObject.Add( + RedisObjectDeterminator.Determine( + enumerator +#if DEBUG + , + logger +#endif + ) + ); } return collectionObject; diff --git a/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisDictionaryObject.cs b/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisDictionaryObject.cs index 8f9f5a6..58a8bc5 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisDictionaryObject.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/Agregates/Abstract/RedisDictionaryObject.cs @@ -16,21 +16,38 @@ public abstract class RedisDictionaryObject : RedisObject, IReadOnlyDictionary Dictionary.Count; - public static TRedisDictionaryObject GetRedisDictionaryObject(IEnumerator enumerator) where TRedisDictionaryObject : RedisDictionaryObject, new() - { + public static TRedisDictionaryObject GetRedisDictionaryObject(IEnumerator enumerator +#if DEBUG + , + System.Action logger +#endif + ) where TRedisDictionaryObject : RedisDictionaryObject, new() { var length = int.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII)); var collectionObject = new TRedisDictionaryObject(); - if (length == -1) - { + if (length == -1) { return collectionObject; } collectionObject.Dictionary = new Dictionary(length); - for (int i = 0; i < length; i++) - { - collectionObject.Dictionary.Add(RedisObjectDeterminator.Determine(enumerator), RedisObjectDeterminator.Determine(enumerator)); + for (int i = 0; i < length; i++) { + collectionObject.Dictionary.Add( + RedisObjectDeterminator.Determine( + enumerator +#if DEBUG + , + logger +#endif + ), + RedisObjectDeterminator.Determine( + enumerator +#if DEBUG + , + logger +#endif + ) + ); } return collectionObject; diff --git a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobError.cs b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobError.cs index ed13afd..be60895 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobError.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobError.cs @@ -9,16 +9,29 @@ public class RedisBlobError : RedisBlobObject { public const byte DETERMINATIVE_CHAR = (byte)'!'; - public static RedisBlobError Parse(IEnumerator enumerator) - { + public static RedisBlobError Parse(IEnumerator enumerator +#if DEBUG + , + System.Action logger +#endif + ) { var length = long.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII)); - if (length == -1) - { + if (length == -1) { return new RedisBlobError(); } - return new RedisBlobError { Values = ReadBlobEofCrLf(enumerator, length, Encoding.UTF8) }; + return new RedisBlobError { + Values = ReadBlobEofCrLf( + enumerator, + length, + Encoding.UTF8 +#if DEBUG + , + logger +#endif + ) + }; } public static implicit operator string(RedisBlobError redisBlobError) diff --git a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobString.cs b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobString.cs index d568f6b..aefe7ac 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobString.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisBlobString.cs @@ -10,18 +10,31 @@ public class RedisBlobString : RedisBlobObject { public const byte DETERMINATIVE_CHAR = (byte)'$'; - public static RedisBlobString Parse(IEnumerator enumerator) - { + public static RedisBlobString Parse(IEnumerator enumerator +#if DEBUG + , + Action logger +#endif + ) { var length = int.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII)); #if DEBUG - Console.WriteLine($"RedisBlobString Parse : Length =>{length}"); + logger($"RedisBlobString Parse : Length =>{length}"); #endif - if (length == -1) - { + if (length == -1) { return new RedisBlobString(); } - return new RedisBlobString { Values = ReadBlobEofCrLf(enumerator, length, Encoding.UTF8) }; + return new RedisBlobString { + Values = ReadBlobEofCrLf( + enumerator, + length, + Encoding.UTF8 +#if DEBUG + , + logger +#endif + ) + }; } public static implicit operator string(RedisBlobString redisBlobString) diff --git a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisVerbatimString.cs b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisVerbatimString.cs index 4b483fa..5b0c890 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisVerbatimString.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/BlobStrings/RedisVerbatimString.cs @@ -13,12 +13,15 @@ public class RedisVerbatimString : RedisBlobObject public string Prefix { get; set; } - public static RedisVerbatimString Parse(IEnumerator enumerator) - { + public static RedisVerbatimString Parse(IEnumerator enumerator +#if DEBUG + , + Action logger +#endif + ) { var length = long.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII)); - if (length == -1) - { + if (length == -1) { return new RedisVerbatimString(); } @@ -35,11 +38,21 @@ public static RedisVerbatimString Parse(IEnumerator enumerator) enumerator.MoveNext(); // : character - if (enumerator.Current != COLON) { throw new InvalidOperationException(); } + if (enumerator.Current != COLON) { + throw new InvalidOperationException(); + } - return new RedisVerbatimString { - Prefix = Encoding.ASCII.GetString(bytesPrefix), - Values = ReadBlobEofCrLf(enumerator, length - 4, Encoding.UTF8) + return new RedisVerbatimString { + Prefix = Encoding.ASCII.GetString(bytesPrefix), + Values = ReadBlobEofCrLf( + enumerator, + length - 4, + Encoding.UTF8 +#if DEBUG + , + logger +#endif + ) }; } diff --git a/TheUniversalCity.RedisClient/RedisObjects/RedisObject.cs b/TheUniversalCity.RedisClient/RedisObjects/RedisObject.cs index 6c65368..5bcb45b 100644 --- a/TheUniversalCity.RedisClient/RedisObjects/RedisObject.cs +++ b/TheUniversalCity.RedisClient/RedisObjects/RedisObject.cs @@ -48,16 +48,21 @@ public static byte[][] ReadBlobEofCrLf(IEnumerator enumerator, long length } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static string[] ReadBlobEofCrLf(IEnumerator enumerator, long length, Encoding encoding) - { + public static string[] ReadBlobEofCrLf(IEnumerator enumerator, + long length, + Encoding encoding +#if DEBUG + , + Action logger +#endif + ) { var byteContainer = ReadBlobEofCrLf(enumerator, length); var stringContainer = new string[byteContainer.Length]; - for (int i = 0; i < byteContainer.Length; i++) - { + for (int i = 0; i < byteContainer.Length; i++) { stringContainer[i] = encoding.GetString(byteContainer[i]); #if DEBUG - Console.WriteLine($"{nameof(ReadBlobEofCrLf)} : Value =>{stringContainer[i]}, i=> {i}"); + logger($"{nameof(ReadBlobEofCrLf)} : Value =>{stringContainer[i]}, i=> {i}"); #endif } diff --git a/TheUniversalCity.RedisClient/Streaming/DnsEndPointTCPConnector.cs b/TheUniversalCity.RedisClient/Streaming/DnsEndPointTCPConnector.cs index 79c5398..6039b63 100644 --- a/TheUniversalCity.RedisClient/Streaming/DnsEndPointTCPConnector.cs +++ b/TheUniversalCity.RedisClient/Streaming/DnsEndPointTCPConnector.cs @@ -16,6 +16,7 @@ public sealed class DnsEndPointTCPConnector : IEnumerable private const int DEFAULT_SEND_BUFFER_SIZE = short.MaxValue; private const int DEFAULT_CONNECT_RETRY_COUNT = 3; private const int DEFAULT_CONNECT_RETRY_INTERVAL = 300; + private readonly Action _logger; public event Action OnException; public event Action OnConnectionTryFailed; @@ -25,11 +26,13 @@ public sealed class DnsEndPointTCPConnector : IEnumerable public DnsEndPointTCPConnector( DnsEndPoint[] endPointList, + Action logger, int receiverBufferSize = DEFAULT_RECEIVER_BUFFER_SIZE, int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE, int connectRetry = DEFAULT_CONNECT_RETRY_COUNT, - int connectRetryInterval = DEFAULT_CONNECT_RETRY_INTERVAL) + int connectRetryInterval = DEFAULT_CONNECT_RETRY_INTERVAL) { + _logger = logger; EndPointList = endPointList; ReceiverBufferSize = receiverBufferSize; SendBufferSize = sendBufferSize; @@ -220,7 +223,7 @@ private bool ReceiveBuffer() Receiver.OnReceiverDisconnect?.Invoke(exception, this); - Console.WriteLine("Not Connected on Receive Buffer"); + Receiver._logger("Not Connected on Receive Buffer"); Connect(); @@ -240,13 +243,13 @@ private int Send(SocketAsyncEventArgs e, ManualResetEvent manualResetEvent) manualResetEvent.WaitOne(); } #if DEBUG - Console.WriteLine($"{nameof(Send)} Enter, senderOffset => {e.Offset}, BytesTransferred => {e.BytesTransferred}"); + Receiver._logger($"{nameof(Send)} Enter, senderOffset => {e.Offset}, BytesTransferred => {e.BytesTransferred}"); #endif if (e.SocketError != SocketError.Success) { var exception = new RedisClientNotConectedException("Not Connected", new SocketException((int)e.SocketError)); #if DEBUG - Console.WriteLine("Send Exception: " + exception.Message); + Receiver._logger("Send Exception: " + exception.Message); #endif Receiver.OnException?.Invoke(exception, this); } @@ -283,7 +286,7 @@ private SendResults SendBuffer() Receiver.OnSenderDisconnect?.Invoke(new RedisClientNotConectedException("Not Connected"), this); - Console.WriteLine("Not Connected on Send Buffer"); + Receiver._logger("Not Connected on Send Buffer"); //connectSignalEvent.WaitOne(); //Thread.Sleep(300); @@ -548,7 +551,7 @@ private void Connect() connectSignalEvent.Set(); - Console.WriteLine($"Socket connected. Endpoint => {Receiver.EndPointList[selectedEndpointIndex]}"); + Receiver._logger($"Socket connected. Endpoint => {Receiver.EndPointList[selectedEndpointIndex]}"); Receiver.OnConnected?.Invoke(this); return;