Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions TheUniversalCity.RedisClient/RedisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private RedisClient(RedisConfiguration configuration)

Receiver = new DnsEndPointTCPConnector(
configuration.DnsEndPoints.ToArray(),
Configuration.Logger,
configuration.ReceiveBufferSize,
configuration.SendBufferSize,
configuration.ConnectRetry,
Expand Down Expand Up @@ -102,7 +103,7 @@ private RedisClient(RedisConfiguration configuration)
private void ReceiverEnumerator_OnConnected(DnsEndPointTCPConnector.Enumerator arg1)
{
#if DEBUG
Console.WriteLine(nameof(ReceiverEnumerator_OnConnected));
Configuration.Logger(nameof(ReceiverEnumerator_OnConnected));
#endif
Interlocked.CompareExchange(ref recentlyConnected, 1, 0);
emergencyStartAutoEvent.Set();
Expand Down Expand Up @@ -147,13 +148,13 @@ public async Task StartAsync()
}
catch (RedisClientNotConectedException ex)
{
Console.WriteLine("Start Error : " + ex.Message);
Configuration.Logger("Start Error : " + ex.Message);
}
finally
{
Interlocked.CompareExchange(ref recentlyConnected, 0, 2);
ReceiverEnumerator.EndEmergency();
Console.WriteLine("Start Emergency Disposing");
Configuration.Logger("Start Emergency Disposing");
}

//startComplete.Set();
Expand Down Expand Up @@ -187,7 +188,7 @@ private void ReceiverEnumerator_OnReceiverDisconnect(Exception arg1, DnsEndPoint

var count = redisCompletedTasks.Count - arg2.bufferedMessageCounter;
#if DEBUG
Console.WriteLine($"{nameof(ReceiverEnumerator_OnReceiverDisconnect)} : Count => {count}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, arg2.bufferedMessageCounter => {arg2.bufferedMessageCounter}");
Configuration.Logger($"{nameof(ReceiverEnumerator_OnReceiverDisconnect)} : Count => {count}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, arg2.bufferedMessageCounter => {arg2.bufferedMessageCounter}");
#endif
for (int i = 0; i < count; i++)
{
Expand Down Expand Up @@ -230,35 +231,36 @@ private void ReceiveWorker(object state)

while (!disposedValue)
{
try
{
while ((obj = RedisObjectDeterminator.Determine(redisClient.ReceiverEnumerator)) is RedisPushType pushObj)
{
if (pushObj[0].ToString() == "invalidate")
{
foreach (var item in pushObj[1] as RedisArray)
{
try {
while ((obj = RedisObjectDeterminator.Determine(
redisClient.ReceiverEnumerator
#if DEBUG
,
Configuration.Logger
#endif

)) is RedisPushType pushObj) {
if (pushObj[0].ToString() == "invalidate") {
foreach (var item in pushObj[1] as RedisArray) {
InvalidateSubscription(item);
}
}

OnPushMessageReceived?.Invoke(pushObj);
}
}
catch (Exception ex)
{
} catch (Exception ex) {
OnException?.Invoke(ex, ReceiverEnumerator.Socket);

continue;
}
#if DEBUG
Console.WriteLine($"{nameof(ReceiveWorker)} :emergencyFlag =>{ReceiverEnumerator.EmergencyFlag}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, emergentTaskCompletionSourcesCount =>{emergentRedisCompletedTasks.Count}, obj=> {obj}, isnull => {obj == null}, type => {obj?.GetType().FullName}");
Configuration.Logger($"{nameof(ReceiveWorker)} :emergencyFlag =>{ReceiverEnumerator.EmergencyFlag}, taskCompletionSourcesCount => {redisCompletedTasks.Count}, emergentTaskCompletionSourcesCount =>{emergentRedisCompletedTasks.Count}, obj=> {obj}, isnull => {obj == null}, type => {obj?.GetType().FullName}");
#endif
TaskCompletionSource<RedisObject> tcs;

if (ReceiverEnumerator.EmergencyFlag)
{
Console.WriteLine("Receiver Managed Thread Id => " + Thread.CurrentThread.ManagedThreadId);
Configuration.Logger("Receiver Managed Thread Id => " + Thread.CurrentThread.ManagedThreadId);

tcs = emergentRedisCompletedTasks.Take();
}
Expand All @@ -267,7 +269,7 @@ private void ReceiveWorker(object state)
tcs = redisCompletedTasks.Take();
}
#if DEBUG
Console.WriteLine($"{nameof(ReceiveWorker)} :");
Configuration.Logger($"{nameof(ReceiveWorker)} :");
#endif

tcs.TrySetResult(obj);
Expand Down Expand Up @@ -300,7 +302,7 @@ public async Task<RedisObject> ExecuteAsync(CancellationToken cancellationToken,
var result = await ExecuteAsync(TaskCreationOptions.RunContinuationsAsynchronously, cancellationToken, segments);

#if DEBUG
Console.WriteLine($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}");
Configuration.Logger($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}");
#endif

return result;
Expand All @@ -313,7 +315,7 @@ public RedisObject Execute(CancellationToken cancellationToken, params string[]
var result = ExecuteAsync(TaskCreationOptions.None, cancellationToken, segments).ConfigureAwait(false).GetAwaiter().GetResult();

#if DEBUG
Console.WriteLine($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}");
Configuration.Logger($"{nameof(ExecuteAsync)} : Command =>{string.Join(" ", commandParams)}, Result => {result}");
#endif

return result;
Expand All @@ -333,7 +335,7 @@ public async Task<RedisObject> ExecuteAsync(TaskCreationOptions taskCreationOpti
var result = ReceiverEnumerator.SendData(commandParams, (transferredBytes) =>
{
#if DEBUG
Console.WriteLine($"{nameof(ExecuteAsync)} : ");
Configuration.Logger($"{nameof(ExecuteAsync)} : ");
#endif
redisCompletedTasks.Add(tcs);
});
Expand Down Expand Up @@ -368,12 +370,12 @@ public async Task<RedisObject> ExecuteEmergentAsync(params string[] commandParam
{
var tcs = new TaskCompletionSource<RedisObject>(TaskCreationOptions.RunContinuationsAsynchronously);
#if DEBUG
Console.WriteLine($"{nameof(ExecuteEmergentAsync)} : Command =>{string.Join(" ", commandParams)}");
Configuration.Logger($"{nameof(ExecuteEmergentAsync)} : Command =>{string.Join(" ", commandParams)}");
#endif
if (!await ReceiverEnumerator.SendEmergentDataAsync(GetSegments(commandParams), () =>
{
#if DEBUG
Console.WriteLine($"{nameof(ExecuteEmergentAsync)} : ");
Configuration.Logger($"{nameof(ExecuteEmergentAsync)} : ");
#endif
emergentRedisCompletedTasks.Add(tcs);
}))
Expand Down Expand Up @@ -929,4 +931,4 @@ public void Dispose()
}
}
}
}
}
2 changes: 2 additions & 0 deletions TheUniversalCity.RedisClient/RedisConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class RedisConfiguration
public int ReceiveBufferSize { get { return Options.ContainsKey(RECEIVE_BUFFER_SIZE) ? int.Parse(Options[RECEIVE_BUFFER_SIZE]) : 65536; } }
public int SendBufferSize { get { return Options.ContainsKey(SEND_BUFFER_SIZE) ? int.Parse(Options[SEND_BUFFER_SIZE]) : 65536; } }

public Action<string> Logger { get; set; } = Console.WriteLine;

public Dictionary<string, string> Options { get; set; } = new Dictionary<string, string>();

public RedisConfiguration(string connectionString)
Expand Down
99 changes: 77 additions & 22 deletions TheUniversalCity.RedisClient/RedisObjectDeterminator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,115 @@
using TheUniversalCity.RedisClient.RedisObjects.Numerics;
using TheUniversalCity.RedisClient.RedisObjects.SimpleStrings;

namespace TheUniversalCity.RedisClient
{
public static class RedisObjectDeterminator
{
namespace TheUniversalCity.RedisClient {
public static class RedisObjectDeterminator {
public const byte CR = (byte)'\r';
public const byte LF = (byte)'\n';

public static RedisObject Determine(IEnumerator<byte> enumerator)
{
public static RedisObject Determine(IEnumerator<byte> enumerator
#if DEBUG
,
System.Action<string> logger
#endif

) {
enumerator.MoveNext();

var determinativeChar = enumerator.Current;

switch (determinativeChar)
{
switch (determinativeChar) {
case RedisArray.DETERMINATIVE_CHAR:
return RedisCollectionObject.GetRedisCollectionObject<RedisArray>(enumerator);
case RedisAttributeType.DETERMINATIVE_CHAR:
{
var attribute = RedisDictionaryObject.GetRedisDictionaryObject<RedisAttributeType>(enumerator); ;
var afterObj = Determine(enumerator);
return RedisCollectionObject.GetRedisCollectionObject<RedisArray>(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisAttributeType.DETERMINATIVE_CHAR: {
var attribute = RedisDictionaryObject.GetRedisDictionaryObject<RedisAttributeType>(
enumerator
#if DEBUG
,
logger
#endif
);
var afterObj = Determine(
enumerator
#if DEBUG
,
logger
#endif
);

afterObj.SetAttribute(attribute);
afterObj.SetAttribute(attribute);

return afterObj;
}
return afterObj;
}
case RedisBigNumber.DETERMINATIVE_CHAR:
return RedisBigNumber.Parse(enumerator);
case RedisBlobError.DETERMINATIVE_CHAR:
return RedisBlobError.Parse(enumerator);
return RedisBlobError.Parse(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisBlobString.DETERMINATIVE_CHAR:
return RedisBlobString.Parse(enumerator);
return RedisBlobString.Parse(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisBoolean.DETERMINATIVE_CHAR:
return RedisBoolean.Parse(enumerator);
case RedisDouble.DETERMINATIVE_CHAR:
return RedisDouble.Parse(enumerator);
case RedisEndType.DETERMINATIVE_CHAR:
return RedisEndType.Parse(enumerator);
case RedisMapType.DETERMINATIVE_CHAR:
return RedisDictionaryObject.GetRedisDictionaryObject<RedisMapType>(enumerator);
return RedisDictionaryObject.GetRedisDictionaryObject<RedisMapType>(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisNull.DETERMINATIVE_CHAR:
return RedisNull.Parse(enumerator);
case RedisNumber.DETERMINATIVE_CHAR:
return RedisNumber.Parse(enumerator);
case RedisPushType.DETERMINATIVE_CHAR:
return RedisCollectionObject.GetRedisCollectionObject<RedisPushType>(enumerator);
return RedisCollectionObject.GetRedisCollectionObject<RedisPushType>(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisSetReply.DETERMINATIVE_CHAR:
return RedisCollectionObject.GetRedisCollectionObject<RedisSetReply>(enumerator);
return RedisCollectionObject.GetRedisCollectionObject<RedisSetReply>(
enumerator
#if DEBUG
,
logger
#endif
);
case RedisSimpleError.DETERMINATIVE_CHAR:
return RedisSimpleError.Parse(enumerator);
case RedisSimpleString.DETERMINATIVE_CHAR:
return RedisSimpleString.Parse(enumerator);
case RedisVerbatimString.DETERMINATIVE_CHAR:
return RedisVerbatimString.Parse(enumerator);
return RedisVerbatimString.Parse(
enumerator
#if DEBUG
,
logger
#endif
);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,31 @@ public abstract class RedisCollectionObject : RedisObject, ICollection<RedisObje

public bool IsReadOnly => false;

public static TRedisCollectionObject GetRedisCollectionObject<TRedisCollectionObject>(IEnumerator<byte> enumerator) where TRedisCollectionObject : RedisCollectionObject, new()
{
public static TRedisCollectionObject GetRedisCollectionObject<TRedisCollectionObject>(IEnumerator<byte> enumerator
#if DEBUG
,
System.Action<string> logger
#endif
) where TRedisCollectionObject : RedisCollectionObject, new() {
var length = int.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII));
var collectionObject = new TRedisCollectionObject();

if (length == -1)
{
if (length == -1) {
return collectionObject;
}

collectionObject.Items = new List<RedisObject>(length);

for (int i = 0; i < length; i++)
{
collectionObject.Add(RedisObjectDeterminator.Determine(enumerator));
for (int i = 0; i < length; i++) {
collectionObject.Add(
RedisObjectDeterminator.Determine(
enumerator
#if DEBUG
,
logger
#endif
)
);
}

return collectionObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,38 @@ public abstract class RedisDictionaryObject : RedisObject, IReadOnlyDictionary<R

public int Count => Dictionary.Count;

public static TRedisDictionaryObject GetRedisDictionaryObject<TRedisDictionaryObject>(IEnumerator<byte> enumerator) where TRedisDictionaryObject : RedisDictionaryObject, new()
{
public static TRedisDictionaryObject GetRedisDictionaryObject<TRedisDictionaryObject>(IEnumerator<byte> enumerator
#if DEBUG
,
System.Action<string> logger
#endif
) where TRedisDictionaryObject : RedisDictionaryObject, new() {
var length = int.Parse(ReadLineEofCrLf(enumerator, Encoding.ASCII));
var collectionObject = new TRedisDictionaryObject();

if (length == -1)
{
if (length == -1) {
return collectionObject;
}

collectionObject.Dictionary = new Dictionary<RedisObject, RedisObject>(length);

for (int i = 0; i < length; i++)
{
collectionObject.Dictionary.Add(RedisObjectDeterminator.Determine(enumerator), RedisObjectDeterminator.Determine(enumerator));
for (int i = 0; i < length; i++) {
collectionObject.Dictionary.Add(
RedisObjectDeterminator.Determine(
enumerator
#if DEBUG
,
logger
#endif
),
RedisObjectDeterminator.Determine(
enumerator
#if DEBUG
,
logger
#endif
)
);
}

return collectionObject;
Expand Down
Loading