< Summary

Information
Class: CounterpointCollective.Dataflow.GuaranteedBroadcastBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GuaranteedBroadcastBlock.cs
Line coverage
98%
Covered lines: 64
Uncovered lines: 1
Coverable lines: 65
Total lines: 156
Line coverage: 98.4%
Branch coverage
100%
Covered branches: 18
Total branches: 18
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Lock() | 100% | 1 | 1 | 100%
get_Item(...) | 100% | 1 | 1 | 100%
get_Count() | 100% | 2 | 2 | 100%
get_Completion() | 100% | 1 | 1 | 100%
get_TargetSide() | 100% | 1 | 1 | 100%
get_CompletionSide() | 100% | 2 | 1 | 0%
.ctor(...) | 100% | 12 | 12 | 100%
HandleMessageDelivery(...) | 100% | 4 | 4 | 100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GuaranteedBroadcastBlock.cs

# | Line | Line coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Internal;
 5using CounterpointCollective.Dataflow.Notifying;
 6using System;
 7using System.Collections.Generic;
 8using System.Linq;
 9using System.Threading.Tasks;
 10using System.Threading.Tasks.Dataflow;
 11
 12namespace CounterpointCollective.Dataflow
 13{
 14    public enum GuaranteedBroadcastBlockBoundedCapacityMode
 15    {
           // Selects which source queue's size is reported as the block's Count
           // (see GuaranteedBroadcastBlockOptions.BoundedCapacityMode).

           /// <summary>The smallest queue counts, i.e. the fastest consuming block determines the Count.</summary>
 16        SmallestQueue,
           /// <summary>The largest queue counts, i.e. the slowest consuming block determines the Count.</summary>
 17        LargestQueue
 18    }
 19
 20    public class GuaranteedBroadcastBlockOptions : DataflowBlockOptions
 21    {
           // Options for GuaranteedBroadcastBlock<T>: the inherited DataflowBlockOptions settings
           // plus the BoundedCapacityMode selector below.
 22        /// <summary>
 23        /// Whether the smallest or the largest queue size is used for bounded capacity.
 24        /// Defaults to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.SmallestQueue"/>,
 25        /// meaning the fastest consuming block will determine the Count.
 26        ///
 27        /// If you set it to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue"/>,
 28        /// the slowest consuming block will determine the Count.
 29        /// </summary>
 30        public GuaranteedBroadcastBlockBoundedCapacityMode BoundedCapacityMode { get; set; } = GuaranteedBroadcastBlockB
 31    }
 32
 33    /// <summary>
 34    /// Guaranteed delivery of every message to each of the nrOfSources sources (exposed through the indexer).
 35    /// Messages queue up in the sources until the smallest/largest queue (per BoundedCapacityMode) reaches BoundedCapacity.
 36    /// </summary>
 37
 38    public sealed class GuaranteedBroadcastBlock<T> : AbstractEncapsulatedTargetBlock<T>
 39    {
           // Taken in HandleMessageDelivery around the per-queue totals update and the count adjustment.
 40140        private object Lock { get; } = new();
           // Running total of delivered messages per queue index; the entry returned by Peek() is the one
           // whose total drives the Count adjustment in HandleMessageDelivery.
 41        private readonly UniquePriorityQueue<int, long, long> _totalMessagesDeliveredPerQueue;
           // One notifying buffer per source; every incoming message is posted to all of them.
 42        private readonly NotifyingPropagatorBlock<T, T>[] _queues;
           /// <summary>The source block backed by the queue at <paramref name="index"/>.</summary>
 3443        public IReceivableSourceBlock<T> this[int index] => _queues[index];
 44
 45        /// <summary>
 46        /// Size of the smallest/largest queue, depending on BoundedCapacityMode.
           /// Reading it first calls ProcessPendingEvents on every queue and then returns the bounded target's Count.
 47        /// </summary>
 48        public int Count
 49        {
 50            get
 51            {
                   // Presumably flushes not-yet-handled delivery notifications so the value read below is up to date.
 13852                foreach (var q in _queues)
 53                {
 4654                    q.ProcessPendingEvents();
 55                }
 2356                return _bounded.Count;
 57            }
 58        }
           // Optional callback; invoked with the same (negative) diff that is passed to _bounded.AdjustCount.
 59        private readonly Action<int>? _onQueuesShrunk;
           // Wraps the broadcasting ActionBlock together with options.BoundedCapacity; exposed as TargetSide.
 60        private readonly BoundedTargetBlock<T> _bounded;
 61
           // Backed by a TaskCompletionSource that the background task started in the constructor completes.
 62        /// <exclude/>
 363        public override Task Completion { get; }
 64
 65        /// <exclude/>
 25466        protected override ITargetBlock<T> TargetSide => _bounded;
 67
 68        /// <exclude/>
 069        protected override IDataflowBlock CompletionSide => this;
 70
           /// <summary>
           /// Creates the block with <paramref name="nrOfSources"/> source queues and wires up broadcasting,
           /// delivery bookkeeping and completion.
           /// </summary>
           /// <param name="nrOfSources">Number of source queues to create; valid indexer values are 0 .. nrOfSources - 1.</param>
           /// <param name="options">Supplies BoundedCapacityMode, BoundedCapacity and the CancellationToken used by the inner blocks.</param>
           /// <param name="onQueuesShrunk">Optional callback invoked from HandleMessageDelivery with the negative count adjustment.</param>
           // NOTE(review): arguments are not validated here; a null options is dereferenced immediately below.
 1371        public GuaranteedBroadcastBlock(
 1372            int nrOfSources,
 1373            GuaranteedBroadcastBlockOptions options,
 1374            Action<int>? onQueuesShrunk = null
 1375        )
 76        {
 1377            _onQueuesShrunk = onQueuesShrunk;
 78
 1379            _queues = new NotifyingPropagatorBlock<T,T>[nrOfSources];
 1380            if (options.BoundedCapacityMode == GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue)
 81            {
 82                //In largest queue mode, the queue with the most total messages deliveries determines the Count.
                   //NOTE(review): the largest queue belongs to the slowest consumer, which has the FEWEST deliveries,
                   //so "most" looks inverted here - confirm against UniquePriorityQueue's default ordering.
 283                _totalMessagesDeliveredPerQueue = new();
 84            }
 85            else
 86            {
 87                //In smallest queue mode, the queue with the least total messages deliveries determines the Count,
                   //so the comparer below reverses the default ordering of the totals.
                   //NOTE(review): the smallest queue belongs to the fastest consumer, which has the MOST deliveries,
                   //so "least" looks inverted here - confirm against UniquePriorityQueue's default ordering.
 108988                _totalMessagesDeliveredPerQueue = new(Comparer<long>.Create((x, y) => y.CompareTo(x)));
 89            }
 90
 9291            for (var i = 0; i < nrOfSources; i++)
 92            {
                   //Copy the loop variable so each delivery callback captures its own queue index.
 3393                var j = i;
 3394                _queues[i] =
 3395                    new BufferBlock<T>(new() { CancellationToken = options.CancellationToken })
 3396                    .WithNotification(h =>
 3397                    {
 42198                        h.OnDeliveringMessages = count => HandleMessageDelivery(count, j);
 6699                        h.ConfigureDispatching = q => q.UseInlineFirstThenAsyncDispatch();
 66100                    });
 101
 102
                    //Every queue starts with zero delivered messages.
 33103                _totalMessagesDeliveredPerQueue.Enqueue(i, 0, 0);
 104            }
 105
                //Broadcast step: each message accepted by the target side is posted to every queue.
                //The local block is only handed to _bounded, which fits SingleProducerConstrained.
 13106            var a = new ActionBlock<T>(m =>
 13107            {
 1484108                for (var i = 0; i < nrOfSources; i++)
 13109                {
 500110                    _queues[i].PostAsserted(m);
 13111                }
 255112            }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = true });
 13113            _bounded = new(a, options.BoundedCapacity);
 114
                //NOTE(review): created without TaskCreationOptions.RunContinuationsAsynchronously, so continuations
                //on Completion may run inline on the thread that completes tcs - confirm this is intended.
 13115            var tcs = new TaskCompletionSource();
 13116            Completion = tcs.Task;
                //Fire-and-forget completion pipeline: broadcaster finishes -> queues are completed -> tcs is completed.
                //NOTE(review): the returned task is not observed; if the lambda throws, tcs is never completed.
 13117            Task.Run(async () =>
 13118            {
                    //Task.WhenAny hands back the finished task instead of rethrowing its fault/cancellation.
 13119                var t = await Task.WhenAny(a.Completion);
                    //Presumably forwards the broadcaster's final state to every queue - see PropagateCompletion.
 6120                _ = t.PropagateCompletion(_queues);
                    //Wait until all queues have completed; WhenAny again keeps a fault from being thrown here.
 25121                await Task.WhenAny(Task.WhenAll(_queues.Select(b => b.Completion)));
 6122                if (options.CancellationToken.IsCancellationRequested)
 13123                {
 2124                    tcs.SetCanceled();
 13125                }
 13126                else
 13127                {
 4128                    await t.PropagateCompletionAsync(tcs);
 13129                }
 19130            });
 13131        }
 132
            /// <summary>
            /// Delivery callback wired to each queue's OnDeliveringMessages: adds <paramref name="count"/> to that
            /// queue's running total and, when the total at the head of the priority queue grew, lowers the bounded
            /// target's count by the difference and notifies the optional onQueuesShrunk callback.
            /// </summary>
            /// <param name="count">Number of messages the queue is delivering.</param>
            /// <param name="queueIndex">Index of the delivering queue in _queues.</param>
 133        private void HandleMessageDelivery(int count, int queueIndex)
 134        {
 388135            lock (Lock)
 136            {
 137                //Remember the total delivered of the smallest/largest queue (the head) before this update
 388138                var (_, totalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek();
 139
 140                //Update my total delivered count and re-enqueue it under the new total
 388141                var (myTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Get(queueIndex);
 388142                myTotalDelivered += count;
 388143                _totalMessagesDeliveredPerQueue.Enqueue(queueIndex, myTotalDelivered, myTotalDelivered);
 144
 145                //If the smallest/largest queue's total delivered changed, update the block's Count property.
 388146                (_, var newTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek();
                    //diff is negative when the head's total grew; both calls below receive that negative value.
 388147                var diff = (int)(totalDelivered - newTotalDelivered);
 388148                if (diff < 0)
 149                {
 237150                    _bounded.AdjustCount(diff);
 237151                    _onQueuesShrunk?.Invoke(diff);
 152                }
 179153            }
 388154        }
 155    }
 156}