< Summary

Information
Class: CounterpointCollective.Dataflow.Internal.PostponedMessages<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessages.cs
Line coverage
44%
Covered lines: 12
Uncovered lines: 15
Coverable lines: 27
Total lines: 109
Line coverage: 44.4%
Branch coverage
20%
Covered branches: 2
Total branches: 10
Branch coverage: 20%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
get_Count() | 100% | 1 | 1 | 100%
Postpone(...) | 100% | 1 | 1 | 100%
get_Increment() | 100% | 2 | 1 | 0%
Unregister(...) | 100% | 2 | 1 | 0%
Unregister(...) | 0% | 20 | 4 | 0%
get_IsEmpty() | 100% | 1 | 1 | 100%
TryPeekPostponed(...) | 0% | 6 | 2 | 0%
TryDequeuePostponed(...) | 100% | 2 | 2 | 100%
TryPeekPostponedFor(...) | 0% | 6 | 2 | 0%
Clear() | 100% | 1 | 1 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessages.cs

# | Line | Line coverage
 1using CounterpointCollective.Collections;
 2using System.Diagnostics.CodeAnalysis;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow.Internal
 6{
 7    /// <exclude/>
 8    public readonly record struct PostponedMessage<B, T> : System.IEquatable<PostponedMessage<B, T>> where B : ITargetBl
 9    {
 10        public ISourceBlock<T> SourceBlock { get; } // Source passed to the constructor; Consume, Reserve and ReleaseReservation call into it.
 11        public B Target { get; } // Target block passed to the constructor; handed to the source as the target argument of those calls.
 12        public DataflowMessageHeader MessageHeader { get; } // Header passed to the constructor; identifies the message in those calls.
 13
 14        public PostponedMessage(ISourceBlock<T> sourceBlock, B target, DataflowMessageHeader messageHeader)
 15        {
               // Store the three arguments unchanged in the get-only properties of the same names.
 16            SourceBlock = sourceBlock;
 17            Target = target;
 18            MessageHeader = messageHeader;
 19        }
 20
 21        public bool Consume([MaybeNullWhen(false)] out T messageValue)
 22        {
               // Ask SourceBlock to hand over the message identified by MessageHeader, passing Target as the
               // consuming block. The source reports through messageConsumed whether the hand-over happened.
               // Returns true with the consumed value in messageValue, or false with messageValue = default.
 23            var v = SourceBlock.ConsumeMessage(MessageHeader, Target, out var messageConsumed);
 24            if (messageConsumed)
 25            {
 26                messageValue = v!; // null-forgiving: the value is only handed out on the success path
 27                return true;
 28            }
 29            else
 30            {
 31                messageValue = default; // nothing consumed; consistent with [MaybeNullWhen(false)]
 32                return false;
 33            }
 34        }
 35
 36        public bool Reserve() => SourceBlock.ReserveMessage(MessageHeader, Target); // Forwards to SourceBlock.ReserveMessage for this header/target and returns its result.
 37        public void ReleaseReservation() => SourceBlock.ReleaseReservation(MessageHeader, Target); // Forwards to SourceBlock.ReleaseReservation for this header/target.
 38    }
 39
 40    /// <exclude />
 11841    public class PostponedMessages<B, T>(B owner)
 42    where B : ITargetBlock<T>
 43    {
 11844        private readonly UniquePriorityQueue<ISourceBlock<T>, DataflowMessageHeader, long> _priorityQueue = new(); // Holds (source, header) pairs; Postpone supplies the counter i as the long argument. NOTE(review): presumably one entry per source, ordered by that long — confirm against UniquePriorityQueue.
 45        private long i; // Running counter, pre-incremented by Postpone and by the Increment getter.
 46
 21490547        public int Count => _priorityQueue.Count; // Number of entries currently in the underlying queue.
 48
 49        public long Postpone(ISourceBlock<T> s, DataflowMessageHeader h)
 50        {
               // Record that source s offered the message with header h, and return the counter value used for it.
 10228051            _priorityQueue.Enqueue(s, h, ++i); // the counter is bumped first, then passed as the third argument
 10228052            return i; // same value that was just passed to Enqueue
 53        }
 54
 055        public long Increment => ++i; // NOTE: this getter mutates state — every read pre-increments the counter i and returns the new value.
 56
 057        public bool Unregister(ISourceBlock<T> s) => _priorityQueue.Remove(s, out var _, out var _); // Calls Remove for s, discards both out values, and returns Remove's result.
 58
 59        public bool Unregister(ISourceBlock<T> s, DataflowMessageHeader h)
 60        {
               // Remove the entry for s only when the header currently stored for s equals h.
               // Returns false when TryGet fails or the stored header differs; otherwise returns
               // the result of the single-argument Unregister. The out value p is not used.
 061            if (_priorityQueue.TryGet(s, out var current, out var p) && current == h)
 62            {
 063                return Unregister(s);
 64            } else
 65            {
 066                return false;
 67            }
 68        }
 69
 21490570        public bool IsEmpty => Count == 0; // True when Count is 0.
 71
 72        public bool TryPeekPostponed([MaybeNullWhen(false)] out PostponedMessage<B, T> postponedMessage)
 73        {
               // On a successful TryPeek, wrap the returned source and header, together with owner as the
               // target, in a PostponedMessage and return true; otherwise return false with default.
               // The third out value of TryPeek is discarded.
               // NOTE(review): presumably TryPeek leaves the entry in the queue — confirm against UniquePriorityQueue.
 074            if (_priorityQueue.TryPeek(out var s, out var h, out var _))
 75            {
 076                postponedMessage = new PostponedMessage<B, T>(s, owner, h);
 077                return true;
 78            }
 079            postponedMessage = default;
 080            return false;
 81        }
 82
 83        public bool TryDequeuePostponed(out PostponedMessage<B, T> postponedMessage)
 84        {
               // On a successful TryDequeue, wrap the returned source and header, together with owner as the
               // target, in a PostponedMessage and return true; otherwise return false with default.
               // The third out value of TryDequeue is discarded.
 10227585            if (_priorityQueue.TryDequeue(out var s, out var h, out var _))
 86            {
 10227387                postponedMessage = new PostponedMessage<B, T>(s, owner, h);
 10227388                return true;
 89            }
 290            postponedMessage = default;
 291            return false;
 92        }
 93
 94        public bool TryPeekPostponedFor(ISourceBlock<T> t, [MaybeNullWhen(false)] out PostponedMessage<B, T> postponedMe
 95        {
 096            if (_priorityQueue.TryGet(t, out var h, out var _))
 97            {
 098                postponedMessage = new PostponedMessage<B, T>(t, owner, h);
 099                return true;
 100            } else
 101            {
 0102                postponedMessage = default;
 0103                return false;
 104            }
 105        }
 106
 3107        public void Clear() => _priorityQueue.Clear(); // Forwards to _priorityQueue.Clear(); the counter i is not reset here.
 108    }
 109}