< Summary

Information
Class: CounterpointCollective.Dataflow.NonOrderPreservingChoiceBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/NonOrderPreservingChoiceBlock.cs
Line coverage
96%
Covered lines: 87
Uncovered lines: 3
Coverable lines: 90
Total lines: 135
Line coverage: 96.6%
Branch coverage
85%
Covered branches: 24
Total branches: 28
Branch coverage: 85.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_TargetSide() | 100% | 1 | 1 | 100%
get_SourceSide() | 100% | 1 | 1 | 100%
get_Count() | 100% | 1 | 1 | 100%
get_InputCount() | 100% | 1 | 1 | 100%
get_OutputCount() | 100% | 1 | 1 | 100%
get_BoundedCapacity() | 100% | 2 | 1 | 0%
set_BoundedCapacity(...) | 100% | 2 | 1 | 0%
.ctor(...) | 50% | 4 | 4 | 100%
.ctor(...) | 94.44% | 18 | 18 | 100%
SetupCompletion(...) | 83.33% | 6 | 6 | 94.11%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/NonOrderPreservingChoiceBlock.cs

# | Line | Line coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Internal;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    public class NonOrderPreservingChoiceBlock<TInput, TOutput> : AbstractEncapsulatedPropagatorBlock<TInput, TOutput>, 
 10    {
 10040211        protected override ITargetBlock<TInput> TargetSide => _boundedPropagatorBlock;
 12
 20173813        protected override ISourceBlock<TOutput> SourceSide => _boundedPropagatorBlock;
 14
 15        private readonly BoundedPropagatorBlock<TInput, TOutput> _boundedPropagatorBlock;
 16        private readonly BufferBlock<TOutput> _outputBuffer;
 17
 318        public int Count => _boundedPropagatorBlock.Count;
 19
 220        public int InputCount => Count - OutputCount;
 21
 522        public int OutputCount => _outputBuffer.Count;
 23
 24        public int BoundedCapacity
 25        {
 026            get => _boundedPropagatorBlock.BoundedCapacity;
 027            set => _boundedPropagatorBlock.BoundedCapacity = value;
 28        }
 29
 30        public NonOrderPreservingChoiceBlock(
 31            Predicate<TInput> predicate,
 32            IPropagatorBlock<TInput, TOutput> thenBlock,
 33            IPropagatorBlock<TInput, TOutput> elseBlock,
 34            ExecutionDataflowBlockOptions? options = null
 135        ) : this(
 10000036            new TransformBlock<TInput, (bool, TInput)>(i => (predicate(i), i),
 137            new() {
 138                CancellationToken = options?.CancellationToken ?? CancellationToken.None,
 139                SingleProducerConstrained = options?.SingleProducerConstrained ?? false
 140            }),
 141            thenBlock,
 142            elseBlock,
 143            options
 144        )
 45        {
 146        }
 47
 1348        public NonOrderPreservingChoiceBlock(
 1349            IPropagatorBlock<TInput, (bool P, TInput V)> conditionCheckBlock,
 1350            IPropagatorBlock<TInput, TOutput> thenBlock,
 1351            IPropagatorBlock<TInput, TOutput> elseBlock,
 1352            DataflowBlockOptions? options = null
 1353        )
 54        {
 1355            options ??= new();
 56
 1357            _outputBuffer = new BufferBlock<TOutput>(new() { CancellationToken = options.CancellationToken });
 58
 1359            _boundedPropagatorBlock = new BoundedPropagatorBlock<TInput, TOutput>(
 1360                DataflowBlock.Encapsulate(conditionCheckBlock, _outputBuffer),
 1361                options.BoundedCapacity
 1362            );
 63
 1364            conditionCheckBlock =
 1365                conditionCheckBlock
 1366                .Pipeline()
 1367                .SelectLast(e => _boundedPropagatorBlock.CreateExit(e))
 1368                .Build();
 69
 1370            conditionCheckBlock.Pipeline()
 10036171            .SynchronousFilter(t => t.P)
 5020972            .SynchronousTransform(t => t.V)
 1373            .LinkTo(thenBlock, new() { PropagateCompletion = true });
 74
 1375            conditionCheckBlock.Pipeline()
 5227076            .SynchronousFilter(t => !t.P)
 5010277            .SynchronousTransform(t => t.V)
 1378            .LinkTo(elseBlock, new() { PropagateCompletion = true });
 79
 1380            thenBlock
 1381                .Pipeline()
 1382                .WithNotification(h => {
 5022283                    h.OnDeliveringMessages = count => _boundedPropagatorBlock.AdjustCount(count);
 2684                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 1385                })
 1386                .LinkTo(_outputBuffer, new());
 87
 1388            elseBlock
 1389                .Pipeline()
 1390                .WithNotification(h => {
 5016491                    h.OnDeliveringMessages = count => _boundedPropagatorBlock.AdjustCount(count);
 2692                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 1393                })
 1394                .LinkTo(_outputBuffer, new());
 95
 96
 1397            SetupCompletion(conditionCheckBlock.Completion, thenBlock, options.CancellationToken);
 1398            SetupCompletion(conditionCheckBlock.Completion, elseBlock, options.CancellationToken);
 99
 13100            Task.Run(async () =>
 13101            {
 13102                var t = Task.WhenAll(
 13103                    conditionCheckBlock.Completion,
 13104                    thenBlock.Completion,
 13105                    elseBlock.Completion
 13106                );
 13107                await Task.WhenAny(t);
 12108                if (!options.CancellationToken.IsCancellationRequested)
 13109                {
 11110                    _ = t.PropagateCompletion(_outputBuffer);
 13111                }
 25112            });
 13113        }
 114
 115        private void SetupCompletion(Task completionAllowed, IPropagatorBlock<TInput, TOutput> myBranch, CancellationTok
 116        {
 26117            completionAllowed.PropagateCompletion(myBranch);
 26118            myBranch.Completion.ContinueWith(t =>
 26119            {
 22120                if (cancellationToken.IsCancellationRequested)
 26121                {
 0122                    return;
 26123                }
 22124                else if (t.IsFaulted)
 26125                {
 6126                    Fault(t.Exception);
 26127                }
 16128                else if (!completionAllowed.IsCompleted)
 26129                {
 2130                    Fault(new InvalidOperationException("Branch completed prematurely"));
 26131                }
 42132            }, cancellationToken);
 26133        }
 134    }
 135}