< Summary

Information
Class: CounterpointCollective.Dataflow.Internal.PostponedMessage<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessages.cs
Line coverage
73%
Covered lines: 11
Uncovered lines: 4
Coverable lines: 15
Total lines: 109
Line coverage: 73.3%
Branch coverage
50%
Covered branches: 1
Total branches: 2
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

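| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| get_SourceBlock() | 100% | 1 | 1 | 100% |
| get_Target() | 100% | 1 | 1 | 100% |
| get_MessageHeader() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Consume(...) | 50% | 2 | 2 | 66.66% |
| Reserve() | 100% | 2 | 1 | 0% |
| ReleaseReservation() | 100% | 2 | 1 | 0% |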

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/Internal/PostponedMessages.cs

# | Line | Line coverage
 1using CounterpointCollective.Collections;
 2using System.Diagnostics.CodeAnalysis;
 3using System.Threading.Tasks.Dataflow;
 4
 5namespace CounterpointCollective.Dataflow.Internal
 6{
 7    /// <exclude/>
 8    public readonly record struct PostponedMessage<B, T> : System.IEquatable<PostponedMessage<B, T>> where B : ITargetBl
 9    {
 10227510        public ISourceBlock<T> SourceBlock { get; }
 10227111        public B Target { get; }
 10227312        public DataflowMessageHeader MessageHeader { get; }
 13
 14        public PostponedMessage(ISourceBlock<T> sourceBlock, B target, DataflowMessageHeader messageHeader)
 15        {
 10227316            SourceBlock = sourceBlock;
 10227317            Target = target;
 10227318            MessageHeader = messageHeader;
 10227319        }
 20
 21        public bool Consume([MaybeNullWhen(false)] out T messageValue)
 22        {
 10227123            var v = SourceBlock.ConsumeMessage(MessageHeader, Target, out var messageConsumed);
 10227124            if (messageConsumed)
 25            {
 10227126                messageValue = v!;
 10227127                return true;
 28            }
 29            else
 30            {
 031                messageValue = default;
 032                return false;
 33            }
 34        }
 35
 036        public bool Reserve() => SourceBlock.ReserveMessage(MessageHeader, Target);
 037        public void ReleaseReservation() => SourceBlock.ReleaseReservation(MessageHeader, Target);
 38    }
 39
 40    /// <exclude />
 41    public class PostponedMessages<B, T>(B owner)
 42    where B : ITargetBlock<T>
 43    {
 44        private readonly UniquePriorityQueue<ISourceBlock<T>, DataflowMessageHeader, long> _priorityQueue = new();
 45        private long i;
 46
 47        public int Count => _priorityQueue.Count;
 48
 49        public long Postpone(ISourceBlock<T> s, DataflowMessageHeader h)
 50        {
 51            _priorityQueue.Enqueue(s, h, ++i);
 52            return i;
 53        }
 54
 55        public long Increment => ++i;
 56
 57        public bool Unregister(ISourceBlock<T> s) => _priorityQueue.Remove(s, out var _, out var _);
 58
 59        public bool Unregister(ISourceBlock<T> s, DataflowMessageHeader h)
 60        {
 61            if (_priorityQueue.TryGet(s, out var current, out var p) && current == h)
 62            {
 63                return Unregister(s);
 64            } else
 65            {
 66                return false;
 67            }
 68        }
 69
 70        public bool IsEmpty => Count == 0;
 71
 72        public bool TryPeekPostponed([MaybeNullWhen(false)] out PostponedMessage<B, T> postponedMessage)
 73        {
 74            if (_priorityQueue.TryPeek(out var s, out var h, out var _))
 75            {
 76                postponedMessage = new PostponedMessage<B, T>(s, owner, h);
 77                return true;
 78            }
 79            postponedMessage = default;
 80            return false;
 81        }
 82
 83        public bool TryDequeuePostponed(out PostponedMessage<B, T> postponedMessage)
 84        {
 85            if (_priorityQueue.TryDequeue(out var s, out var h, out var _))
 86            {
 87                postponedMessage = new PostponedMessage<B, T>(s, owner, h);
 88                return true;
 89            }
 90            postponedMessage = default;
 91            return false;
 92        }
 93
 94        public bool TryPeekPostponedFor(ISourceBlock<T> t, [MaybeNullWhen(false)] out PostponedMessage<B, T> postponedMe
 95        {
 96            if (_priorityQueue.TryGet(t, out var h, out var _))
 97            {
 98                postponedMessage = new PostponedMessage<B, T>(t, owner, h);
 99                return true;
 100            } else
 101            {
 102                postponedMessage = default;
 103                return false;
 104            }
 105        }
 106
 107        public void Clear() => _priorityQueue.Clear();
 108    }
 109}