< Summary

Information
Class: CounterpointCollective.Dataflow.GuaranteedBroadcastBlockOptions
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GuaranteedBroadcastBlock.cs
Line coverage
100%
Covered lines: 1
Uncovered lines: 0
Coverable lines: 1
Total lines: 156
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_BoundedCapacityMode()100%11100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/GuaranteedBroadcastBlock.cs

#LineLine coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Internal;
 5using CounterpointCollective.Dataflow.Notifying;
 6using System;
 7using System.Collections.Generic;
 8using System.Linq;
 9using System.Threading.Tasks;
 10using System.Threading.Tasks.Dataflow;
 11
 12namespace CounterpointCollective.Dataflow
 13{
 14    public enum GuaranteedBroadcastBlockBoundedCapacityMode
 15    {
 16        SmallestQueue,
 17        LargestQueue
 18    }
 19
 20    public class GuaranteedBroadcastBlockOptions : DataflowBlockOptions
 21    {
 22        /// <summary>
 23        /// Whether to use the smallest or largest queue size for bounded capacity.
 24        /// Defaults to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.SmallestQueue"/>,
 25        /// meaning the fastest consuming block will determine the Count.
 26        ///
 27        /// If you set it to <see cref="GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue"/>,
 28        /// the slowest consuming block will determine when the Count.
 29        /// </summary>
 2930        public GuaranteedBroadcastBlockBoundedCapacityMode BoundedCapacityMode { get; set; } = GuaranteedBroadcastBlockB
 31    }
 32
 33    /// <summary>
 34    /// Guaranteed delivery of messages to NrOfSources sources.
 35    /// Queueing up messages in the sources until the smallest queue reached BoundedCapacity.
 36    /// </summary>
 37
 38    public sealed class GuaranteedBroadcastBlock<T> : AbstractEncapsulatedTargetBlock<T>
 39    {
 40        private object Lock { get; } = new();
 41        private readonly UniquePriorityQueue<int, long, long> _totalMessagesDeliveredPerQueue;
 42        private readonly NotifyingPropagatorBlock<T, T>[] _queues;
 43        public IReceivableSourceBlock<T> this[int index] => _queues[index];
 44
 45        /// <summary>
 46        /// Size of the smallest/largest queue, depending on BoundedCapacityMode.
 47        /// </summary>
 48        public int Count
 49        {
 50            get
 51            {
 52                foreach (var q in _queues)
 53                {
 54                    q.ProcessPendingEvents();
 55                }
 56                return _bounded.Count;
 57            }
 58        }
 59        private readonly Action<int>? _onQueuesShrunk;
 60        private readonly BoundedTargetBlock<T> _bounded;
 61
 62        /// <exclude/>
 63        public override Task Completion { get; }
 64
 65        /// <exclude/>
 66        protected override ITargetBlock<T> TargetSide => _bounded;
 67
 68        /// <exclude/>
 69        protected override IDataflowBlock CompletionSide => this;
 70
 71        public GuaranteedBroadcastBlock(
 72            int nrOfSources,
 73            GuaranteedBroadcastBlockOptions options,
 74            Action<int>? onQueuesShrunk = null
 75        )
 76        {
 77            _onQueuesShrunk = onQueuesShrunk;
 78
 79            _queues = new NotifyingPropagatorBlock<T,T>[nrOfSources];
 80            if (options.BoundedCapacityMode == GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue)
 81            {
 82                //In largest queue mode, the queue with the most total messages deliveries determines the Count.
 83                _totalMessagesDeliveredPerQueue = new();
 84            }
 85            else
 86            {
 87                //In smallest queue mode, the queue with the least total messages deliveries determines the Count,
 88                _totalMessagesDeliveredPerQueue = new(Comparer<long>.Create((x, y) => y.CompareTo(x)));
 89            }
 90
 91            for (var i = 0; i < nrOfSources; i++)
 92            {
 93                var j = i;
 94                _queues[i] =
 95                    new BufferBlock<T>(new() { CancellationToken = options.CancellationToken })
 96                    .WithNotification(h =>
 97                    {
 98                        h.OnDeliveringMessages = count => HandleMessageDelivery(count, j);
 99                        h.ConfigureDispatching = q => q.UseInlineFirstThenAsyncDispatch();
 100                    });
 101
 102
 103                _totalMessagesDeliveredPerQueue.Enqueue(i, 0, 0);
 104            }
 105
 106            var a = new ActionBlock<T>(m =>
 107            {
 108                for (var i = 0; i < nrOfSources; i++)
 109                {
 110                    _queues[i].PostAsserted(m);
 111                }
 112            }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = true });
 113            _bounded = new(a, options.BoundedCapacity);
 114
 115            var tcs = new TaskCompletionSource();
 116            Completion = tcs.Task;
 117            Task.Run(async () =>
 118            {
 119                var t = await Task.WhenAny(a.Completion);
 120                _ = t.PropagateCompletion(_queues);
 121                await Task.WhenAny(Task.WhenAll(_queues.Select(b => b.Completion)));
 122                if (options.CancellationToken.IsCancellationRequested)
 123                {
 124                    tcs.SetCanceled();
 125                }
 126                else
 127                {
 128                    await t.PropagateCompletionAsync(tcs);
 129                }
 130            });
 131        }
 132
 133        private void HandleMessageDelivery(int count, int queueIndex)
 134        {
 135            lock (Lock)
 136            {
 137                //Remember the previous smallest/ largest queue total delivery
 138                var (_, totalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek();
 139
 140                //Update my total delivered count
 141                var (myTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Get(queueIndex);
 142                myTotalDelivered += count;
 143                _totalMessagesDeliveredPerQueue.Enqueue(queueIndex, myTotalDelivered, myTotalDelivered);
 144
 145                //If the smallest/ largest queue total delivery changed, update blocks Count property.
 146                (_, var newTotalDelivered, _) = _totalMessagesDeliveredPerQueue.Peek();
 147                var diff = (int)(totalDelivered - newTotalDelivered);
 148                if (diff < 0)
 149                {
 150                    _bounded.AdjustCount(diff);
 151                    _onQueuesShrunk?.Invoke(diff);
 152                }
 153            }
 154        }
 155    }
 156}

Methods/Properties

get_BoundedCapacityMode()