< Summary

Information
Class: CounterpointCollective.Dataflow.SynchronousFilterBlock<T>
Assembly: Dataflow.Composable
File(s): /builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/SynchronousFilterBlock.cs
Line coverage
85%
Covered lines: 58
Uncovered lines: 10
Coverable lines: 68
Total lines: 177
Line coverage: 85.2%
Branch coverage
85%
Covered branches: 29
Total branches: 34
Branch coverage: 85.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Predicate() | 100% | 1 | 1 | 100%
get_OutgoingLock() | 100% | 1 | 1 | 100%
get_Mode() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
LinkTo(...) | 100% | 1 | 1 | 100%
get_Completion() | 100% | 1 | 1 | 100%
Complete() | 100% | 2 | 1 | 0%
Fault(...) | 100% | 2 | 1 | 0%
ConsumeMessage(...) | 100% | 4 | 4 | 100%
ReleaseReservation(...) | 100% | 2 | 2 | 100%
ReserveMessage(...) | 100% | 6 | 6 | 100%
TryReceive(...) | 62.5% | 8 | 8 | 83.33%
TryReceiveAll(...) | 100% | 4 | 4 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
get_Completion() | 100% | 2 | 1 | 0%
Complete() | 100% | 1 | 1 | 100%
Fault(...) | 100% | 1 | 1 | 100%
OfferMessage(...) | 80% | 12 | 10 | 75%
.ctor(...) | 100% | 1 | 1 | 100%
get_Completion() | 100% | 2 | 1 | 0%
Complete() | 100% | 2 | 1 | 0%
Fault(...) | 100% | 2 | 1 | 0%
OfferMessage(...) | 100% | 2 | 1 | 0%

File(s)

/builds/counterpointcollective/composabledataflowblocks/Source/Dataflow.Composable/DataFlow/SynchronousFilterBlock.cs

# | Line | Line coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Diagnostics.CodeAnalysis;
 4using System.Threading.Tasks;
 5using System.Threading.Tasks.Dataflow;
 6
 7namespace CounterpointCollective.Dataflow
 8{
 9    /// <summary>
 10    /// Block means not to accept messages if the predicate does not hold.
 11    /// Drop means the messages are accepted (and disappear) if the predicate does not hold.
 12    /// </summary>
 13    public enum FilterMode
 14    {
 15        Block,
 16        Drop
 17    }
 18
 19    public sealed class SynchronousFilterBlock<T> : IReceivableSourceBlock<T>
 20    {
 21
 22        private readonly ISourceBlock<T> _sourceBlock;
 15264723        private Predicate<T> Predicate { get; }
 24        private readonly DummyTarget _target;
 421425        private object OutgoingLock { get; } = new();
 26        private ITargetBlock<T>? reservationTarget;
 5231127        public FilterMode Mode { get; set; }
 28
 3629        public SynchronousFilterBlock(ISourceBlock<T> sourceBlock, Predicate<T> predicate, FilterMode mode = FilterMode.
 30        {
 3631            Mode = mode;
 3632            _sourceBlock = sourceBlock;
 3633            _target = new(this);
 3634            Predicate = predicate;
 3635        }
 36
 37        public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
 38        {
 3139            var t = new LinkTarget(this, target, linkOptions.MaxMessages);
 3140            return _sourceBlock.LinkTo(t, new() { PropagateCompletion = linkOptions.PropagateCompletion, Append = linkOp
 41        }
 42
 543        public Task Completion => _sourceBlock.Completion;
 44
 045        public void Complete() => _sourceBlock.Complete();
 46
 047        public void Fault(Exception exception) => _sourceBlock.Fault(exception);
 48
 49        public T? ConsumeMessage(
 50            DataflowMessageHeader messageHeader,
 51            ITargetBlock<T> target,
 52            out bool messageConsumed
 53        )
 54        {
 417155            messageConsumed = false;
 417156            T? ret = default;
 417157            lock (OutgoingLock)
 58            {
 417159                if (reservationTarget == null || reservationTarget == target)
 60                {
 417161                    ret = _sourceBlock.ConsumeMessage(messageHeader, _target, out messageConsumed);
 417162                    reservationTarget = null;
 63                }
 417164            }
 417165            return ret;
 66        }
 67
 68        public void ReleaseReservation(
 69            DataflowMessageHeader messageHeader,
 70            ITargetBlock<T> target
 71        )
 72        {
 173            lock (OutgoingLock)
 74            {
 175                if (reservationTarget == target)
 76                {
 177                    _sourceBlock.ReleaseReservation(messageHeader, _target);
 178                    reservationTarget = null;
 79                }
 180            }
 181        }
 82
 83        public bool ReserveMessage(
 84            DataflowMessageHeader messageHeader,
 85            ITargetBlock<T> target
 86        )
 87        {
 688            lock (OutgoingLock)
 89            {
 690                if (reservationTarget == null || reservationTarget == target)
 91                {
 592                    var res = _sourceBlock.ReserveMessage(messageHeader, _target);
 593                    if (res)
 94                    {
 595                        reservationTarget = target;
 96                    }
 597                    return res;
 98                }
 99                else
 100                {
 1101                    return false; //other target already holds a reservation.
 102                }
 103            }
 6104        }
 105
 106        public bool TryReceive(Predicate<T>? filter, [MaybeNullWhen(false)] out T item)
 107        {
 8108            if (_sourceBlock is IReceivableSourceBlock<T> receivableSource &&
 0109                receivableSource.TryReceive((filter == null) ? Predicate : i => Predicate(i) && filter(i), out item)
 8110            )
 111            {
 3112                return true;
 113            }
 114            else
 115            {
 5116                item = default;
 5117                return false;
 118            }
 119        }
 120
 121        public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items)
 122        {
 4123            List<T> l = [];
 6124            while(TryReceive(null, out var item))
 125            {
 2126                l.Add(item);
 2127            }
 128
 4129            if (l.Count > 0)
 130            {
 2131                items = l;
 2132                return true;
 133            }
 134            else
 135            {
 2136                items = null;
 2137                return false;
 138            }
 139        }
 140
 31141        private sealed class LinkTarget(SynchronousFilterBlock<T> outer, ITargetBlock<T> target, int maxMessages) : ITar
 142        {
 0143            public Task Completion => target.Completion;
 144
 17145            public void Complete() => target.Complete();
 8146            public void Fault(Exception exception) => target.Fault(exception);
 147            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<
 148            {
 152639149                if (maxMessages == 0)
 150                {
 0151                    return DataflowMessageStatus.DecliningPermanently;
 152639152                } else if (!outer.Predicate(messageValue))
 153                {
 52275154                    return outer.Mode == FilterMode.Drop ? DataflowMessageStatus.Accepted : DataflowMessageStatus.Declin
 155                }
 156                else
 157                {
 100364158                    var ret = target.OfferMessage(messageHeader, messageValue, outer, consumeToAccept);
 100364159                    if (ret == DataflowMessageStatus.Accepted && maxMessages > 0)
 160                    {
 0161                        maxMessages--;
 162                    }
 100364163                    return ret;
 164                }
 165            }
 166        }
 167
 36168        private sealed class DummyTarget(SynchronousFilterBlock<T> outer) : ITargetBlock<T>
 169        {
 0170            public Task Completion => outer.Completion;
 0171            public void Complete() => outer.Complete();
 0172            public void Fault(Exception exception) => outer.Fault(exception);
 173            public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<
 0174                => DataflowMessageStatus.Postponed;
 175        }
 176    }
 177}