Summary

Information
Class: CounterpointCollective.Dataflow.ParallelBlock<T1, T2, T3>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ParallelBlock.cs
Line coverage
95%
Covered lines: 68
Uncovered lines: 3
Coverable lines: 71
Total lines: 149
Line coverage: 95.7%
Branch coverage
78%
Covered branches: 11
Total branches: 14
Branch coverage: 78.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method  Branch coverage  Crap Score  Cyclomatic complexity  Line coverage
get_TargetSide()100%11100%
get_SourceSide()100%11100%
get_DegreeOfParallelism()100%11100%
get_InputCount()100%11100%
get_OutputCount()100%11100%
get_Count()100%11100%
.ctor(...)50%2297.22%
MessagesFannedOut(...)100%11100%
MessagesFannedIn(...)100%11100%
Hookup(...)87.5%8895.45%
LinkWorkers(...)75%4480%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/ParallelBlock.cs

#  Line  Line coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Internal;
 5using CounterpointCollective.Dataflow.Notifying;
 6using System.Threading.Tasks.Dataflow;
 7
 8namespace CounterpointCollective.Dataflow
 9{
 10    /// <summary>
 11    /// To prevent deadlocks, Source will always accept messages.
 12    /// However, the block itself will only accept new messages if the combined count of
 13    /// Source and the joining side (using either the smallest or largest queue count,
 14    /// depending on the configured mode) is below the capacity limit.
 15    ///
 16    /// Be careful when you pass GuaranteedBroadcastBlockBoundedCapacityMode.LargestQueue in the options.
 17    /// This may trigger a deadlock in some cases, namely:
 18    ///   - Queue[1] is empty.
 19    ///   - Queue[2] has items.
 20    ///   - The ParallelBlock is full; it will not enqueue more messages before some are consumed.
 21    ///   - Worker 1 must consume more messages before it will produce its next message, e.g. because it is batching.
 22    ///   - Worker 2 cannot consume more messages at the moment because its output side is full.
 23    /// The JoiningTarget will not consume messages until all workers have output available, causing a deadlock.
 24    /// </summary>
 25    public class ParallelBlock<I, T, O> : AbstractEncapsulatedPropagatorBlock<I, O>
 26    {
 27        /// <exclude/>
 3128        protected override ITargetBlock<I> TargetSide => _boundedPropagatorBlock;
 29        /// <exclude/>
 9430        protected override ISourceBlock<O> SourceSide => _boundedPropagatorBlock;
 31
 32        private readonly GuaranteedBroadcastBlock<I> _fanOutBlock;
 33        private readonly ITargetBlock<T> _fanInBlock;
 34
 4335        private int DegreeOfParallelism { get; }
 36
 1137        public int InputCount => _fanOutBlock.Count;
 38
 339        public int OutputCount => _boundedPropagatorBlock.Count - InputCount;
 40
 41        /// <summary>
 42        /// Items currently in the block, being either in the input side or output side.
 43        /// When they leave for the workers, they are no longer counted here.
 44        /// When they return from the workers, they are counted here again.
 45        /// This can cause the Count to be temporarily higher than the BoundedCapacity, just like
 46        /// in the TransformManyBlock.
 47        /// </summary>
 348        public int Count => _boundedPropagatorBlock.Count;
 49
 50        private readonly BoundedPropagatorBlock<I,O> _boundedPropagatorBlock;
 51
 652        private readonly List<IPropagatorBlock<I, T>> _workers = [];
 53        private int i; //Index of next worker to hookup
 54
 655        public ParallelBlock(
 656            int degreeOfParallelism,
 657            Func<T[], O> recombine,
 658            GuaranteedBroadcastBlockOptions options
 659        )
 60        {
 661            if (degreeOfParallelism < 2)
 62            {
 063                throw new ArgumentException("degreeOfParallelism must be at least 2.", nameof(degreeOfParallelism));
 64            }
 665            DegreeOfParallelism = degreeOfParallelism;
 66
 667            _fanOutBlock = new GuaranteedBroadcastBlock<I>(
 668                degreeOfParallelism,
 669                new()
 670                {
 671                    CancellationToken = options.CancellationToken,
 672                    BoundedCapacityMode = options.BoundedCapacityMode,
 673                },
 2874                onQueuesShrunk : diff => MessagesFannedOut(-diff)
 675            );
 76
 677            var j = new BatchBlock<T>(degreeOfParallelism, new()
 678            {
 679                CancellationToken = options.CancellationToken,
 680                Greedy = false
 681            })
 682            .Pipeline()
 683            .Transform(
 2884                b => recombine([.. b]),
 685                new ExecutionDataflowBlockOptions
 686                {
 687                    SingleProducerConstrained = true,
 688                    CancellationToken = options.CancellationToken
 689                }
 690                )
 691            .Build();
 692            _fanInBlock = j;
 93
 694            _boundedPropagatorBlock = new BoundedPropagatorBlock<I, O>(_fanOutBlock, j, options.BoundedCapacity);
 695        }
 96
 2897        private void MessagesFannedOut(int diff) => _boundedPropagatorBlock.AdjustCount(-diff);
 98
 2899        private void MessagesFannedIn(int diff) => _boundedPropagatorBlock.AdjustCount(diff);
 100
 101        public void Hookup(IPropagatorBlock<I,T> worker, DataflowLinkOptions options)
 102        {
 20103            if (i == DegreeOfParallelism)
 104            {
 0105                throw new InvalidOperationException($"All {DegreeOfParallelism} workers have already been hooked up.");
 106            }
 20107            _fanOutBlock[i].LinkTo(worker, options);
 108
 20109            worker
 20110                = i == 0
 20111                ? worker
 20112                .Pipeline()
 20113                .WithNotification(h =>
 20114                {
 34115                    h.OnDeliveringMessages = count => MessagesFannedIn(count);
 12116                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 6117                })
 20118                .Build()
 20119                : worker;
 120
 20121            _workers.Add(worker);
 122
 20123            worker.LinkTo(_fanInBlock);
 20124            worker.PropagateCompletion(TaskContinuationOptions.OnlyOnFaulted, _fanInBlock);
 20125            i++;
 20126            if (i == DegreeOfParallelism)
 127            {
 26128                Task.WhenAll(_workers.Select(e => e.Completion))
 6129                    .PropagateCompletion(_fanInBlock);
 130            }
 20131        }
 132
 133        /// <summary>
 134        /// Convenience method to link all the workers in one go.
 135        /// </summary>
 136        public void LinkWorkers(IPropagatorBlock<I, T>[] workers, DataflowLinkOptions options)
 137        {
 3138            if (workers.Length != DegreeOfParallelism)
 139            {
 0140                throw new ArgumentException($"Exactly {DegreeOfParallelism} workers are required.", nameof(workers));
 141            }
 142
 34143            for (var i = 0; i < workers.Length; i++)
 144            {
 14145                Hookup(workers[i], options);
 146            }
 3147        }
 148    }
 149}