< Summary

Information
Class: CounterpointCollective.Dataflow.Internal.PostponedMessagesManager<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessagesManager.cs
Line coverage
67%
Covered lines: 35
Uncovered lines: 17
Coverable lines: 52
Total lines: 111
Line coverage: 67.3%
Branch coverage
78%
Covered branches: 11
Total branches: 14
Branch coverage: 78.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_PostponedMessagesLoop()100%11100%
.ctor(...)0%620%
.ctor(...)100%22100%
get_Consume()100%11100%
get_IncomingLock()100%11100%
get_IsRunningPostponedMessages()100%11100%
ProcessPostponedMessages()100%11100%
ShutdownAsync()100%11100%
Handle()100%44100%
TryDequeue()83.33%66100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessagesManager.cs

#LineLine coverage
 1using CounterpointCollective.Threading;
 2using System.Diagnostics.CodeAnalysis;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow.Internal
 6{
 7    /// <exclude/>
 8    public delegate bool TryDequeuePostponedMessage<B, T>([MaybeNullWhen(false)] out PostponedMessage<B, T> postponedMes
 9    where B : ITargetBlock<T>;
 10
 11    internal class PostponedMessagesManager<B, T> where B : ITargetBlock<T>
 12    {
 7613        private Task PostponedMessagesLoop { get; }
 14
 15        public PostponedMessagesManager(
 16            PostponedMessages<B, T> postponedMessages,
 17            Func<bool> allowToRun,
 18            Action? postAction = null
 19        ) :
 020            this
 021            (
 022                (out PostponedMessage<B, T> postponedMessage) =>
 023                {
 024                    if (allowToRun())
 025                    {
 026                        return postponedMessages.TryDequeuePostponed(out postponedMessage);
 027                    }
 028                    else
 029                    {
 030                        postponedMessage = default;
 031                        return false;
 032                    }
 033                },
 034                postAction
 035            )
 36        {
 037        }
 38
 11339        public PostponedMessagesManager(
 11340            TryDequeuePostponedMessage<B, T> tryDequeuePostponedMessage,
 11341            Action? postAction = null
 11342        )
 43        {
 11344            _tryDequeuePostponedMessage = tryDequeuePostponedMessage;
 11345            _postAction = postAction;
 11346            Consume = (in PostponedMessage<B, T> p) => { };
 22647            PostponedMessagesLoop = Task.Run(() => Handle());
 11348        }
 49
 50        private readonly TryDequeuePostponedMessage<B, T> _tryDequeuePostponedMessage;
 51
 52        public delegate void Consumer(in PostponedMessage<B, T> value);
 53
 10249754        public Consumer Consume { get; set; }
 55        private readonly Action? _postAction;
 56
 11357        private readonly AsyncAutoResetEventSlim _sem = new(false);
 144527858        public object IncomingLock { get; } = new();
 59
 144515460        public bool IsRunningPostponedMessages { get; private set; }
 61
 31575662        public void ProcessPostponedMessages() => _sem.Set();
 63
 64        public async Task ShutdownAsync()
 65        {
 7666            _sem.Terminate();
 7667            await PostponedMessagesLoop;
 7668        }
 69
 70        private async Task Handle()
 71        {
 72
 73            while (true)
 74            {
 75                try
 76                {
 11274777                    await _sem.WaitOneAsync();
 11271078                } catch (ObjectDisposedException)
 79                {
 7680                    break;
 81                }
 82
 21490583                while (!_sem.Terminated && TryDequeue(out var postponedMessage))
 84                {
 10227185                    Consume(in postponedMessage!);
 10227186                }
 87            }
 88
 89            bool TryDequeue(out PostponedMessage<B, T> postponedMessage)
 90            {
 21490591                lock (IncomingLock)
 92                {
 21490593                    if (_tryDequeuePostponedMessage(out postponedMessage))
 94                    {
 10227195                        IsRunningPostponedMessages = true;
 10227196                        return true;
 97                    } else
 98                    {
 11263499                        if (IsRunningPostponedMessages)
 100                        {
 102271101                            IsRunningPostponedMessages = false;
 102271102                            _postAction?.Invoke();
 103                        }
 112634104                        postponedMessage = default;
 112634105                        return false;
 106                    }
 107                }
 214905108            }
 76109        }
 110    }
 111}