< Summary

Information
Class: CounterpointCollective.Dataflow.OrderPreservingChoiceBlock<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/OrderPreservingChoiceBlock.cs
Line coverage
95%
Covered lines: 68
Uncovered lines: 3
Coverable lines: 71
Total lines: 157
Line coverage: 95.7%
Branch coverage
96%
Covered branches: 29
Total branches: 30
Branch coverage: 96.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

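| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| get_Lock() | 100% | 1 | 1 | 100% |
| get_CurrentBranch() | 100% | 1 | 1 | 100% |
| get_ThenBranch() | 100% | 1 | 1 | 100% |
| get_ElseBranch() | 100% | 1 | 1 | 100% |
| get_TargetSide() | 100% | 1 | 1 | 100% |
| get_SourceSide() | 100% | 1 | 1 | 100% |
| get_Count() | 100% | 1 | 1 | 100% |
| get_InputCount() | 100% | 1 | 1 | 100% |
| get_OutputCount() | 100% | 1 | 1 | 100% |
| get_BoundedCapacity() | 100% | 2 | 1 | 0% |
| set_BoundedCapacity(...) | 100% | 2 | 1 | 0% |
| .ctor(...) | 100% | 14 | 14 | 100% |
| OnMessagesFanningOutTo(...) | 100% | 10 | 10 | 100% |
| OnMessagesFannedInFrom(...) | 83.33% | 6 | 6 | 92.3% |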

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/OrderPreservingChoiceBlock.cs

#  Line  Line coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Notifying;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    public class OrderPreservingChoiceBlock<TInput, TOutput> : AbstractEncapsulatedPropagatorBlock<TInput, TOutput>, IRe
 10    {
 68411        private object Lock { get; } = new();
 12
 137513        internal Branch<TInput, TOutput>? CurrentBranch { get; private set; }
 43114        internal Branch<TInput, TOutput> ThenBranch { get; private set; }
 29315        internal Branch<TInput, TOutput> ElseBranch { get; private set; }
 16
 39317        protected override ITargetBlock<TInput> TargetSide => _inner;
 18
 116119        protected override ISourceBlock<TOutput> SourceSide => _inner;
 20
 21        private readonly NonOrderPreservingChoiceBlock<TInput, TOutput> _inner;
 22
 123        public int Count => _inner.Count;
 24
 225        public int InputCount => _inner.InputCount;
 26
 327        public int OutputCount => _inner.OutputCount;
 28
 29        public int BoundedCapacity
 30        {
 031            get => _inner.BoundedCapacity;
 032            set => _inner.BoundedCapacity = value;
 33        }
 34
 1235        public OrderPreservingChoiceBlock(
 1236            Predicate<TInput> predicate,
 1237            IPropagatorBlock<TInput, TOutput> thenBlock,
 1238            IPropagatorBlock<TInput, TOutput> elseBlock,
 1239            ExecutionDataflowBlockOptions? options = null
 1240        )
 41        {
 1242            options ??= new();
 43
 1244            var conditionCheckBlock = new GroupAdjacentBlock<TInput, bool, TInput>(
 36145                h => predicate(h),
 36146                h => h,
 1247                new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = options.SingleProduce
 1248                flushOnIdle: true
 1249            )
 1250            .Pipeline()
 1251            .TransformMany(g =>
 1252            {
 31253                OnMessagesFanningOutTo(g.Key ? ThenBranch! : ElseBranch!, g.Count());
 67354                return g.Select(e => (g.Key, e));
 1255            }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = true })
 1256            .Build();
 57
 58
 1259            ThenBranch = new(BranchName.Then, options.CancellationToken);
 1260            thenBlock = thenBlock.Pipeline()
 1261                .LinkTo(ThenBranch.Gate, new() { PropagateCompletion = true })
 1262                .WithNotification(h => {
 22163                    h.OnDeliveringMessages = count => OnMessagesFannedInFrom(ThenBranch, count);
 2464                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 1265                })
 1266                .Build();
 67
 68
 1269            ElseBranch = new(BranchName.Else, options.CancellationToken);
 1270            elseBlock = elseBlock.Pipeline()
 1271                .LinkTo(ElseBranch.Gate, new() { PropagateCompletion = true })
 1272                .WithNotification(h => {
 16373                    h.OnDeliveringMessages = count => OnMessagesFannedInFrom(ElseBranch, count);
 2474                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 2475                }).Build();
 76
 77
 1278            _inner = new(conditionCheckBlock, thenBlock, elseBlock, options);
 1279        }
 80
 1281        private readonly LinkedList<(Branch<TInput, TOutput> Branch, int RequiredDeliveryQuota)> _queue = [];
 82
 83        private void OnMessagesFanningOutTo(Branch<TInput, TOutput> branch, int count)
 84        {
 31285            lock (Lock)
 86            {
 31287                CurrentBranch ??= branch;
 31288                if (CurrentBranch == branch && _queue.Count == 0)
 89                {
 12990                    branch.GrantDeliveryQuota(count);
 91                }
 18392                else if (_queue.Last != null && _queue.Last.Value.Branch == branch)
 93                {
 4994                    _queue.Last.Value = (branch, _queue.Last.Value.RequiredDeliveryQuota + count);
 95                }
 96                else
 97                {
 13498                    _queue.AddLast((branch, count));
 99                }
 134100            }
 312101        }
 102
 103        private void OnMessagesFannedInFrom(Branch<TInput, TOutput> b, int count)
 104        {
 360105            lock (Lock)
 106            {
 360107                if (CurrentBranch != b)
 108                {
 0109                    throw new InvalidOperationException($"not expecting messages from {b.Name}");
 110                }
 360111                b.OutstandingDeliveries -= count;
 360112                if (b.OutstandingDeliveries == 0)
 113                {
 262114                    var f = _queue.First;
 262115                    if (f != null)
 116                    {
 134117                        CurrentBranch = f.Value.Branch;
 134118                        f.Value.Branch.GrantDeliveryQuota(f.Value.RequiredDeliveryQuota);
 134119                        _queue.RemoveFirst();
 120                    }
 121                    else
 122                    {
 128123                        CurrentBranch = null;
 124                    }
 125                }
 226126            }
 360127        }
 128    }
 129    internal enum BranchName { Then, Else };
 130
 131    internal record Branch<I, O>
 132    {
 133        public BranchName Name { get; }
 134
 135        private readonly TokenGateBlock<O> _gate;
 136        public IPropagatorBlock<O, O> Gate { get; }
 137
 138        public int Tokens => _gate.Tokens;
 139
 140        public Branch(BranchName name, CancellationToken cancellationToken)
 141        {
 142            Name = name;
 143
 144            var b = new BufferBlock<O>(new() { CancellationToken = cancellationToken });
 145            _gate = new TokenGateBlock<O>(b);
 146            Gate = DataflowBlock.Encapsulate(_gate,b);
 147        }
 148
 149        public void GrantDeliveryQuota(int c)
 150        {
 151            OutstandingDeliveries += c;
 152            _gate.Allow(c);
 153        }
 154
 155        public int OutstandingDeliveries { get; set; }
 156    };
 157}