Extended AsyncManualResetEvent #291

@StefanAdriaenssen

Hello,

I have a question about the AsyncManualResetEvent. I often need to write asynchronous loops. To explain, see the sample code below, which resembles a buffer of sorts. The buffer can be filled by many threads. It should be flushed when the number of items in it reaches a certain maximum. However, a flush must never be delayed longer than a certain amount of time. After the buffer is flushed, it again waits until it is full or the timeout occurs, and so on.

The following sample illustrates how the AsyncManualResetEvent fits nicely to do the job:

    class Sample
    {
        private ConcurrentQueue<BufferItem> _buffer;

        private int _bufferSize;
        private TimeSpan _maxWaitTime;

        private AsyncManualResetEvent _flushSignal;

        public Sample(TimeSpan maxWaitTime, int bufferSize)
        {
            _maxWaitTime = maxWaitTime;
            _bufferSize = bufferSize;

            _buffer = new ConcurrentQueue<BufferItem>();
            _flushSignal = new AsyncManualResetEvent();
        }

        public void AddToBuffer(BufferItem item)
        {
            _buffer.Enqueue(item);

            if (_buffer.Count >= _bufferSize)    // Signal as soon as the maximum is reached, not only once it is exceeded.
                _flushSignal.Set();
        }

        private void Flush()
        {
            _buffer.Clear();    // Actual code would process the items, but that's irrelevant. They're just cleared here for brevity.
            _flushSignal.Reset();
        }

        private async Task RunAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                using (CancellationTokenSource timeout = new CancellationTokenSource(_maxWaitTime))
                using (CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeout.Token))
                {
                    try
                    {
                        await _flushSignal.WaitAsync(linked.Token);                // <-- This is what my question is about
                    }
                    catch (OperationCanceledException)
                    { }
                }

                if (_buffer.Any())
                    Flush();
            }
        }

        public void StartProcessing(CancellationToken cancellationToken)
        {
            // Let's assume this method is called only once (this is just a sample)
            Task.Run(() => RunAsync(cancellationToken));
        }
    }

Note that:

  • When the number of items in the buffer reaches the maximum, the AsyncManualResetEvent is set, causing a flush.
  • When the timeout occurs, the buffer is also flushed.
  • When cancellation is requested (e.g. when the application is about to close), the buffer is flushed one last time and then the loop exits.

There are two issues with the current implementation. First, I'd like to avoid using exception handling for regular program flow. Second, because the current WaitAsync accepts only a single CancellationToken, I'm forced to create linked token sources outside of the AsyncManualResetEvent. This pattern occurs in multiple places, so I'd have to repeat it every time. I could create an extension method or a generic class, but the pattern applies in many places, not just buffers like the one above. I would much rather add some methods to AsyncManualResetEvent that make my consumer code easier to read and easier to reuse. I'd like the RunAsync method to become like this:

        private async Task RunAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await _flushSignal.TryWaitAsync(_maxWaitTime, cancellationToken);           // <-- Much cleaner

                if (_buffer.Any())
                    Flush();
            }
        }

The behavior is exactly the same, but the code is much shorter. I have also named the method TryWaitAsync rather than WaitAsync to emphasize that this overload does not throw at all: neither when the timeout occurs nor when external cancellation is requested.
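
For comparison, the same non-throwing behavior can be approximated outside the class. What follows is only a sketch, not part of Nito.AsyncEx: `WaitExtensions` and this `TryWaitAsync` helper are hypothetical names, and the wait is passed in as a `Func<CancellationToken, Task>` delegate so that the snippet compiles without the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitExtensions
{
    // Hypothetical helper (not part of Nito.AsyncEx): runs a cancelable wait and
    // reports the outcome as a bool instead of throwing on timeout or cancellation.
    public static async Task<bool> TryWaitAsync(
        Func<CancellationToken, Task> waitAsync,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // One source now covers both the timeout and external cancellation.
            linked.CancelAfter(timeout);

            try
            {
                await waitAsync(linked.Token).ConfigureAwait(false);
                return true;    // The wait completed normally: the event was set.
            }
            catch (OperationCanceledException) when (linked.IsCancellationRequested)
            {
                return false;   // Timed out or canceled; the caller sees no exception.
            }
        }
    }
}
```

A call site could then read `await WaitExtensions.TryWaitAsync(_flushSignal.WaitAsync, _maxWaitTime, cancellationToken);`. Note that this only hides the exception from the consumer. It is still thrown and caught internally, so it addresses the repetition but not the cost of using exceptions for regular flow.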

I've been trying to implement the logic myself, and I came up with the following solution. The core of it is in the 'DoWaitAsync' method.

    /// <summary>
    /// An async-compatible manual-reset event.
    /// </summary>
    [DebuggerDisplay("Id = {Id}, IsSet = {GetStateForDebugger}")]
    [DebuggerTypeProxy(typeof(DebugView))]
    public sealed class AsyncManualResetEvent
    {
        /// <summary>
        /// The object used for synchronization.
        /// </summary>
        private readonly object _mutex;

        /// <summary>
        /// The current state of the event.
        /// </summary>
        private TaskCompletionSource<object?> _tcs;

        /// <summary>
        /// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
        /// </summary>
        private int _id;

        [DebuggerNonUserCode]
        private bool GetStateForDebugger
        {
            get
            {
                return _tcs.Task.IsCompleted;
            }
        }

        /// <summary>
        /// Creates an async-compatible manual-reset event.
        /// </summary>
        /// <param name="set">Whether the manual-reset event is initially set or unset.</param>
        public AsyncManualResetEvent(bool set)
        {
            _mutex = new object();
            _tcs = TaskCompletionSourceExtensions.CreateAsyncTaskSource<object?>();
            if (set)
                _tcs.TrySetResult(null);
        }

        /// <summary>
        /// Creates an async-compatible manual-reset event that is initially unset.
        /// </summary>
        public AsyncManualResetEvent()
            : this(false)
        {
        }

        /// <summary>
        /// Gets a semi-unique identifier for this asynchronous manual-reset event.
        /// </summary>
        public int Id
        {
            get { return IdManager<AsyncManualResetEvent>.GetId(ref _id); }
        }

        /// <summary>
        /// Whether this event is currently set. This member is seldom used; code using this member has a high possibility of race conditions.
        /// </summary>
        public bool IsSet
        {
            get { lock (_mutex) return _tcs.Task.IsCompleted; }
        }

        /// <summary>
        /// Asynchronously waits for this event to be set.
        /// </summary>
        public Task WaitAsync()
        {
            lock (_mutex)
            {
                return _tcs.Task;
            }
        }

        /// <summary>
        /// Asynchronously waits for this event to be set or for the wait to be canceled.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token used to cancel the wait. If this token is already canceled, this method will first check whether the event is set.</param>
        public Task WaitAsync(CancellationToken cancellationToken)
        {
            var waitTask = WaitAsync();
            if (waitTask.IsCompleted)
                return waitTask;
            return waitTask.WaitAsync(cancellationToken);
        }

        private enum WaitResult
        {
            Set,
            Timeout,
            Canceled
        }

        /// <summary>
        /// This is a 'soft' version for waiting for the event asynchronously.
        /// It will return whether the event was set or not.
        /// If not, no exceptions will be thrown.
        /// </summary>
        public async Task<bool> TryWaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
        {
            WaitResult result = await DoWaitAsync(timeout, cancellationToken).ConfigureAwait(false);
            return result == WaitResult.Set;
        }

        /// <summary>
        /// This is a 'hard' version for waiting for the event asynchronously.
        /// It will either return when the event was set, or it will throw an exception.
        /// If the operation was cancelled explicitly by means of the cancellation token, then an OperationCanceledException is thrown.
        /// If the wait timed out before it was either set or cancelled, then a TimeoutException is thrown.
        /// </summary>
        public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
        {
            WaitResult result = await DoWaitAsync(timeout, cancellationToken).ConfigureAwait(false);
            
            if (result == WaitResult.Timeout)
                throw new TimeoutException();

            if (result == WaitResult.Canceled)
                throw new OperationCanceledException(cancellationToken);    // Attach the token so callers can tell which cancellation fired.
        }

        private async Task<WaitResult> DoWaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
                return WaitResult.Canceled;

            // The following is a task whose responsibility is to 'wait for the timeout to occur'
            TaskCompletionSource<bool> waitForTimeout = TaskCompletionSourceExtensions.CreateAsyncTaskSource<bool>();

            using (var timeoutCts = new CancellationTokenSource(timeout))
            using (CancellationTokenRegistration registration1 = timeoutCts.Token.Register(() => waitForTimeout.TrySetResult(true)))        // If timeout occurs first, then set the TCS to 'true'.
            using (CancellationTokenRegistration registration2 = cancellationToken.Register(() => waitForTimeout.TrySetResult(false)))      // If cancellation is called first, then the waitForTimeout TCS did NOT timeout, therefore 'false'.
            {
                try
                {
                    // Regardless of whether Reset was called (reinstantiating _tcs) somewhere between entering this method and this line,
                    // this method will now wait for the next Set() to occur (or timeout or cancel).
                    // Until now, we've only prepared the wait-for-timeout task.
                    var waitForSet = WaitAsync();

                    // Last chance to avoid having to wait for anything
                    if (waitForSet.IsCompleted)
                        return WaitResult.Set;

                    Task firstToComplete = await Task.WhenAny(waitForSet, waitForTimeout.Task).ConfigureAwait(false);

                    if (firstToComplete == waitForSet)
                        return WaitResult.Set;

                    bool timedOut = await waitForTimeout.Task.ConfigureAwait(false);

                    if (timedOut)
                        return WaitResult.Timeout;
                    else
                        return WaitResult.Canceled;
                }
                finally
                {
                    // Make sure the waitForTimeout task is completed, so that it can be garbage collected (it won't be hanging in the Task.WhenAny).
                    waitForTimeout.TrySetCanceled();

                    registration1.Unregister();
                    registration2.Unregister();
                }
            }
        }

        /// <summary>
        /// Synchronously waits for this event to be set. This method may block the calling thread.
        /// </summary>
        public void Wait()
        {
            WaitAsync().WaitAndUnwrapException();
        }

        /// <summary>
        /// Synchronously waits for this event to be set. This method may block the calling thread.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token used to cancel the wait. If this token is already canceled, this method will first check whether the event is set.</param>
        public void Wait(CancellationToken cancellationToken)
        {
            var ret = WaitAsync(CancellationToken.None);
            if (ret.IsCompleted)
                return;
            ret.WaitAndUnwrapException(cancellationToken);
        }

#pragma warning disable CA1200 // Avoid using cref tags with a prefix
        /// <summary>
        /// Sets the event, atomically completing every task returned by <see cref="O:Nito.AsyncEx.AsyncManualResetEvent.WaitAsync"/>. If the event is already set, this method does nothing.
        /// </summary>
#pragma warning restore CA1200 // Avoid using cref tags with a prefix
        public void Set()
        {
            lock (_mutex)
            {
                _tcs.TrySetResult(null);
            }
        }

        /// <summary>
        /// Resets the event. If the event is already reset, this method does nothing.
        /// </summary>
        public void Reset()
        {
            lock (_mutex)
            {
                if (_tcs.Task.IsCompleted)
                    _tcs = TaskCompletionSourceExtensions.CreateAsyncTaskSource<object?>();
            }
        }

        // ReSharper disable UnusedMember.Local
        [DebuggerNonUserCode]
        private sealed class DebugView
        {
            private readonly AsyncManualResetEvent _mre;

            public DebugView(AsyncManualResetEvent mre)
            {
                _mre = mre;
            }

            public int Id { get { return _mre.Id; } }

            public bool IsSet { get { return _mre.GetStateForDebugger; } }

            public Task CurrentTask { get { return _mre._tcs.Task; } }
        }
        // ReSharper restore UnusedMember.Local
    }

There are of course more overloads that could be added (such as one with a timeout but without a cancellation token, a synchronous one, etc.), but the idea should be clear.
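
The core of DoWaitAsync, racing the event's task against a timer with `Task.WhenAny`, can also be written against a plain `Task`. Below is a condensed sketch for comparison, assuming `Task.Delay` with a linked token is an acceptable timer. `WaitOutcome` and `WhenAnyWait` are hypothetical names and not part of Nito.AsyncEx.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum WaitOutcome { Set, Timeout, Canceled }

public static class WhenAnyWait
{
    // Hypothetical helper: waits for 'signal' without throwing on timeout or cancellation.
    public static async Task<WaitOutcome> WaitAsync(
        Task signal, TimeSpan timeout, CancellationToken cancellationToken)
    {
        if (signal.IsCompleted)
            return WaitOutcome.Set;
        if (cancellationToken.IsCancellationRequested)
            return WaitOutcome.Canceled;

        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // The delay completes on timeout and is canceled on external cancellation.
            Task delay = Task.Delay(timeout, linked.Token);

            // WhenAny itself never throws; it returns whichever task finished first.
            Task first = await Task.WhenAny(signal, delay).ConfigureAwait(false);

            // Stop the timer so that it does not linger when the signal wins.
            linked.Cancel();

            if (first == signal)
                return WaitOutcome.Set;

            return cancellationToken.IsCancellationRequested
                ? WaitOutcome.Canceled
                : WaitOutcome.Timeout;
        }
    }
}
```

In this variant no exception is thrown or caught on any path. If the timeout elapses and cancellation is requested at nearly the same moment, the sketch reports `Canceled`.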

Could you please review this code and let me know if you see any major issues with it? Do you believe it would run into threading or performance issues? Or memory leaks perhaps? Or is this a bad design altogether?

Thanks in advance,
Stefan Adriaenssen
