< Summary

Information
Class: CounterpointCollective.Dataflow.PriorityBufferBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/PriorityBufferBlock.cs
Line coverage
99%
Covered lines: 115
Uncovered lines: 1
Coverable lines: 116
Total lines: 172
Line coverage: 99.1%
Branch coverage
100%
Covered branches: 6
Total branches: 6
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%66100%
get_ElectionBufferContainsOutOfOrderItem()100%11100%
get_ElectedBufferCount()100%210%
get_TargetSide()100%11100%
get_SourceSide()100%11100%
AwaitElectionsHeldAsync()100%11100%
get_Count()100%11100%
.ctor()100%11100%
.ctor(...)100%11100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/PriorityBufferBlock.cs

#LineLine coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Internal;
 5using CounterpointCollective.Dataflow.Notifying;
 6using CounterpointCollective.Threading;
 7using System.Collections.Generic;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using System.Threading.Tasks.Dataflow;
 11
 12namespace CounterpointCollective.Dataflow
 13{
 14    public class PriorityBufferBlock<T, TPriority>
 15        : AbstractEncapsulatedPropagatorBlock<(T Message, TPriority Priority), (T Message, TPriority Priority)>
 16    {
 17        private readonly BoundedPropagatorBlock<(T Message, TPriority Priority),(T Message, TPriority Priority)> _bounde
 718        private readonly AsyncAutoResetEventSlim _holdElectionsSemaphore = new(false);
 719        private readonly AsyncManualResetEventSlim _electionsHeld = new();
 20
 21        private TPriority worstPrioInElected = default!;
 13022        internal bool ElectionBufferContainsOutOfOrderItem { get; private set; }
 23        private int electedBufferCount;
 024        internal int ElectedBufferCount => Volatile.Read(ref electedBufferCount);
 25
 2726        protected override ITargetBlock<(T Message, TPriority Priority)> TargetSide => _boundedState;
 27
 6428        protected override ISourceBlock<(T Message, TPriority Priority)> SourceSide => _boundedState;
 29
 30        private readonly NotifyingSourceBlock<(T Message, TPriority Priority)> _outputBlock;
 31
 32        public async Task AwaitElectionsHeldAsync()
 33        {
 1234            await _outputBlock.ProcessPendingEventsAsync();
 1235            await _electionsHeld.WaitAsync();
 1236        }
 37
 1838        public int Count => _boundedState.Count;
 39
 440        public PriorityBufferBlock() : this(new())
 441        { }
 42
 43        /// <param name="electedBufferSize">
 44        /// The number of items to eagerly buffer for immediate consumption.
 45        ///
 46        /// A larger buffer can improve throughput when consuming messages, because more items are ready to be delivered
 47        /// However, a larger buffer also increases the likelihood of re-elections if new messages with higher priority 
 48        /// Choose a value that balances consumption throughput against priority accuracy for your scenario.
 49        /// </param>
 50        public PriorityBufferBlock(DataflowBlockOptions options, int electedBufferSize = 1)
 751            : this(Comparer<TPriority>.Default, options, electedBufferSize)
 752        { }
 53
 754        public PriorityBufferBlock(IComparer<TPriority> comparer, DataflowBlockOptions options, int electedBufferSize = 
 55        {
 56            void TriggerElection()
 57            {
 58                _electionsHeld.Reset();
 59                _holdElectionsSemaphore.Set();
 60            }
 61
 762            var inputQueue = new BufferBlock<(T Message, TPriority Priority)>();
 63
 764            var queuedItems = new PriorityQueue<T, TPriority>(comparer);
 765            var electionsCanTerminateGracefullyEvent = new AsyncManualResetEventSlim();
 766            electionsCanTerminateGracefullyEvent.Set();
 67
 768            electedBufferCount = 0;
 769            var electedBuffer =
 770                new BufferBlock<(T Message, TPriority Priority)>(
 771                    new()
 772                    {
 773                        BoundedCapacity = electedBufferSize,
 774                        CancellationToken = options.CancellationToken
 775                    });
 76
 77
 778            var electionTask =
 779                Task.Run(async () =>
 780                {
 3881                    while (true)
 782                    {
 4583                        var t = await Task.WhenAny(_holdElectionsSemaphore.WaitOneAsync());
 784
 4085                        if (!t.IsCompletedSuccessfully)
 786                        {
 787                            break;
 788                        }
 789
 790                        //Add new items to the queue
 3891                        if (inputQueue.TryReceiveAll(out var ii))
 792                        {
 2293                            queuedItems.EnqueueRange(ii);
 794                        }
 795
 796                        //Check if the queue has an item that should go before the electedBuffer's last item.
 3897                        ElectionBufferContainsOutOfOrderItem = electedBufferCount == 0
 3898                            ? false
 3899                            : ElectionBufferContainsOutOfOrderItem ||
 38100                                (
 38101                                    queuedItems.TryPeek(out var _, out var bestPriorityInQueue)
 38102                                    && comparer.Compare(bestPriorityInQueue, worstPrioInElected) < 0
 38103                                );
 7104
 7105                        //If so, try to reclaim electedBuffer's items and add them to the queue too.
 38106                        if (ElectionBufferContainsOutOfOrderItem && electedBuffer.TryReceiveAll(out var oldItems))
 7107                        {
 4108                            ElectionBufferContainsOutOfOrderItem = false;
 4109                            Interlocked.Add(ref electedBufferCount, -oldItems.Count);
 4110                            queuedItems.EnqueueRange(oldItems); //O(m log(n+m)), m is expected to be small.
 7111                        }
 7112
 38113                        if (queuedItems.Count == 0 && !ElectionBufferContainsOutOfOrderItem)
 7114                        {
 5115                            electionsCanTerminateGracefullyEvent.Set();
 7116                        }
 7117                        else
 7118                        {
 33119                            electionsCanTerminateGracefullyEvent.Reset();
 33120                            if (!ElectionBufferContainsOutOfOrderItem)
 7121                            {
 59122                                while (electedBufferCount < electedBufferSize && queuedItems.TryDequeue(out var m, out v
 7123                                {
 29124                                    worstPrioInElected = p;
 29125                                    await electedBuffer.SendAsync((m, p));
 29126                                    Interlocked.Increment(ref electedBufferCount);
 29127                                }
 7128                            }
 7129                        }
 38130                        _electionsHeld.Set();
 7131                    }
 9132                });
 133
 7134            _outputBlock = electedBuffer.WithNotification(h =>
 7135                {
 7136                    h.OnDeliveringMessages = count =>
 7137                    {
 20138                        Interlocked.Add(ref electedBufferCount, -count);
 20139                        TriggerElection();
 27140                    };
 9141                    h.OnReservationReleased = () => TriggerElection();
 14142                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 7143                }
 7144            );
 145
 7146            inputQueue.Completion.ContinueWith(async t =>
 7147            {
 2148                if (t.IsCompletedSuccessfully)
 7149                {
 2150                    await electionsCanTerminateGracefullyEvent.WaitAsync();
 7151                }
 2152                _holdElectionsSemaphore.Terminate();
 2153                queuedItems.Clear();
 9154            });
 155
 7156            electionTask.ContinueWith(_ =>
 7157            {
 2158                _ = inputQueue.PropagateCompletion(electedBuffer);
 9159            });
 160
 7161            _boundedState = new BoundedPropagatorBlock<
 7162                (T Message, TPriority Priority),
 7163                (T Message, TPriority Priority)>
 7164            (
 7165                inputQueue,
 7166                _outputBlock,
 7167                options.BoundedCapacity,
 7168                onEntered: TriggerElection
 7169            );
 7170        }
 171    }
 172}