< Summary

Information
Class: CounterpointCollective.Dataflow.Branch<T1, T2>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/OrderPreservingChoiceBlock.cs
Line coverage
85%
Covered lines: 12
Uncovered lines: 2
Coverable lines: 14
Total lines: 157
Line coverage: 85.7%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

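| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 2 | 1 | 0% |
| get_Name() | 100% | 2 | 1 | 0% |
| get_Gate() | 100% | 1 | 1 | 100% |
| get_Tokens() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| GrantDeliveryQuota(...) | 100% | 1 | 1 | 100% |
| get_OutstandingDeliveries() | 100% | 1 | 1 | 100% |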
get_OutstandingDeliveries()100%11100%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/OrderPreservingChoiceBlock.cs

# | Line | Line coverage
 1using CounterpointCollective.Collections;
 2using CounterpointCollective.Dataflow.Encapsulation;
 3using CounterpointCollective.Dataflow.Fluent;
 4using CounterpointCollective.Dataflow.Notifying;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    public class OrderPreservingChoiceBlock<TInput, TOutput> : AbstractEncapsulatedPropagatorBlock<TInput, TOutput>, IRe
 10    {
 11        private object Lock { get; } = new();
 12
 13        internal Branch<TInput, TOutput>? CurrentBranch { get; private set; }
 14        internal Branch<TInput, TOutput> ThenBranch { get; private set; }
 15        internal Branch<TInput, TOutput> ElseBranch { get; private set; }
 16
 17        protected override ITargetBlock<TInput> TargetSide => _inner;
 18
 19        protected override ISourceBlock<TOutput> SourceSide => _inner;
 20
 21        private readonly NonOrderPreservingChoiceBlock<TInput, TOutput> _inner;
 22
 23        public int Count => _inner.Count;
 24
 25        public int InputCount => _inner.InputCount;
 26
 27        public int OutputCount => _inner.OutputCount;
 28
 29        public int BoundedCapacity
 30        {
 31            get => _inner.BoundedCapacity;
 32            set => _inner.BoundedCapacity = value;
 33        }
 34
 35        public OrderPreservingChoiceBlock(
 36            Predicate<TInput> predicate,
 37            IPropagatorBlock<TInput, TOutput> thenBlock,
 38            IPropagatorBlock<TInput, TOutput> elseBlock,
 39            ExecutionDataflowBlockOptions? options = null
 40        )
 41        {
 42            options ??= new();
 43
 44            var conditionCheckBlock = new GroupAdjacentBlock<TInput, bool, TInput>(
 45                h => predicate(h),
 46                h => h,
 47                new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = options.SingleProduce
 48                flushOnIdle: true
 49            )
 50            .Pipeline()
 51            .TransformMany(g =>
 52            {
 53                OnMessagesFanningOutTo(g.Key ? ThenBranch! : ElseBranch!, g.Count());
 54                return g.Select(e => (g.Key, e));
 55            }, new() { CancellationToken = options.CancellationToken, SingleProducerConstrained = true })
 56            .Build();
 57
 58
 59            ThenBranch = new(BranchName.Then, options.CancellationToken);
 60            thenBlock = thenBlock.Pipeline()
 61                .LinkTo(ThenBranch.Gate, new() { PropagateCompletion = true })
 62                .WithNotification(h => {
 63                    h.OnDeliveringMessages = count => OnMessagesFannedInFrom(ThenBranch, count);
 64                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 65                })
 66                .Build();
 67
 68
 69            ElseBranch = new(BranchName.Else, options.CancellationToken);
 70            elseBlock = elseBlock.Pipeline()
 71                .LinkTo(ElseBranch.Gate, new() { PropagateCompletion = true })
 72                .WithNotification(h => {
 73                    h.OnDeliveringMessages = count => OnMessagesFannedInFrom(ElseBranch, count);
 74                    h.ConfigureDispatching = q => q.UseSynchronousDispatch();
 75                }).Build();
 76
 77
 78            _inner = new(conditionCheckBlock, thenBlock, elseBlock, options);
 79        }
 80
 81        private readonly LinkedList<(Branch<TInput, TOutput> Branch, int RequiredDeliveryQuota)> _queue = [];
 82
 83        private void OnMessagesFanningOutTo(Branch<TInput, TOutput> branch, int count)
 84        {
 85            lock (Lock)
 86            {
 87                CurrentBranch ??= branch;
 88                if (CurrentBranch == branch && _queue.Count == 0)
 89                {
 90                    branch.GrantDeliveryQuota(count);
 91                }
 92                else if (_queue.Last != null && _queue.Last.Value.Branch == branch)
 93                {
 94                    _queue.Last.Value = (branch, _queue.Last.Value.RequiredDeliveryQuota + count);
 95                }
 96                else
 97                {
 98                    _queue.AddLast((branch, count));
 99                }
 100            }
 101        }
 102
 103        private void OnMessagesFannedInFrom(Branch<TInput, TOutput> b, int count)
 104        {
 105            lock (Lock)
 106            {
 107                if (CurrentBranch != b)
 108                {
 109                    throw new InvalidOperationException($"not expecting messages from {b.Name}");
 110                }
 111                b.OutstandingDeliveries -= count;
 112                if (b.OutstandingDeliveries == 0)
 113                {
 114                    var f = _queue.First;
 115                    if (f != null)
 116                    {
 117                        CurrentBranch = f.Value.Branch;
 118                        f.Value.Branch.GrantDeliveryQuota(f.Value.RequiredDeliveryQuota);
 119                        _queue.RemoveFirst();
 120                    }
 121                    else
 122                    {
 123                        CurrentBranch = null;
 124                    }
 125                }
 126            }
 127        }
 128    }
 129    internal enum BranchName { Then, Else };
 130
 0131    internal record Branch<I, O>
 132    {
 0133        public BranchName Name { get; }
 134
 135        private readonly TokenGateBlock<O> _gate;
 24136        public IPropagatorBlock<O, O> Gate { get; }
 137
 4138        public int Tokens => _gate.Tokens;
 139
 24140        public Branch(BranchName name, CancellationToken cancellationToken)
 141        {
 24142            Name = name;
 143
 24144            var b = new BufferBlock<O>(new() { CancellationToken = cancellationToken });
 24145            _gate = new TokenGateBlock<O>(b);
 24146            Gate = DataflowBlock.Encapsulate(_gate,b);
 24147        }
 148
 149        public void GrantDeliveryQuota(int c)
 150        {
 263151            OutstandingDeliveries += c;
 263152            _gate.Allow(c);
 263153        }
 154
 1606155        public int OutstandingDeliveries { get; set; }
 156    };
 157}